26 fInputQueueCapacity(0),
27 fOutputQueueCapacity(0),
34 fAcknSendBufBusy(false),
47 EOUT(
"Cannot obtain network addon for the NetworkTransport");
69 EOUT(
"No memory pool specified to provided buffers for network transport");
104 EOUT(
"Pool required for input transport or for the acknowledge queue");
114 DOUT2(
"#### ~NetworkTransport fRecs %p", fRecs);
121 DOUT3(
"NetworkTransport::TransportCleanup");
126 fAcknSendQueue.Reset();
129 for (uint32_t n=0;n<fNumRecs;n++) {
130 fRecs[n].used =
false;
131 fRecs[n].buf.Release();
144 fRecs[recid].header = header;
145 if (fInlineDataSize > 0)
146 fRecs[recid].inlinebuf = (
char*) header +
sizeof(
NetworkHeader);
151 if (fNumRecs == 0)
return 0;
153 uint32_t cnt = fNumRecs;
156 recid = fRecsCounter;
157 fRecsCounter = (fRecsCounter+1) % fNumRecs;
158 if (!fRecs[recid].used) {
159 fRecs[recid].used =
true;
160 fRecs[recid].kind = kind;
161 fRecs[recid].buf << buf;
162 fRecs[recid].extras = extras;
168 EOUT(
"Cannot allocate NetIORec. Halt");
169 EOUT(
"SendQueue %u RecvQueue %u NumRecs %u used %u", fOutputQueueSize, fInputQueueSize, fNumRecs, fNumUsedRecs);
175 if (recid<fNumRecs) {
176 if (!fRecs[recid].buf.null())
EOUT(
"Buffer is not empty when record is released !!!!");
177 fRecs[recid].used =
false;
180 EOUT(
"Error recid %u", recid);
191 if (hdr==0)
return 0;
194 hdr->
kind = fRecs[recid].kind;
195 if (fRecs[recid].buf.null()) {
199 if (hdr->
kind & netot_HdrSend) {
201 hdr->
size = fRecs[recid].extras;
207 hdr->
typid = fRecs[recid].buf.GetTypeId();
208 hdr->
size = fRecs[recid].buf.GetTotalSize();
211 if ((hdr->
size>0) && fRecs[recid].inlinebuf && (hdr->
size<=fInlineDataSize)) {
212 fRecs[recid].buf.CopyTo(fRecs[recid].inlinebuf, hdr->
size);
228 if (isTransportError())
return;
230 unsigned newitems(0), numcansubmit(0);
232 if (IsInputTransport()) {
234 EOUT(
"No memory pool in input transport");
235 CloseTransport(
true);
238 if (NumOutputs()==0) {
239 EOUT(
"No output port for input transport");
240 CloseTransport(
true);
243 numcansubmit = NumCanSend();
249 numcansubmit = fInputQueueCapacity;
252 while (fInputQueueSize < numcansubmit) {
255 if (IsInputTransport()) {
257 if (freebuf) buf << *freebuf;
259 if (buf.
null()) buf = TakeBuffer();
262 if (IsAutoPool()) ShootTimer(
"SysTimer", 0.001);
267 uint32_t recvrec = TakeRec(buf, netot_Recv);
270 fNet->SubmitRecv(recvrec);
273 if (freebuf && onlyfreebuf)
break;
279 CheckAcknReadyCounter(newitems);
287 DOUT5(
"CheckAcknReadyCounter ackn:%s pool:%s inp:%s",
DBOOL(fUseAckn), PoolName().c_str(),
DBOOL(IsInputTransport()));
289 if (!fUseAckn || (NumPools()==0) || !IsInputTransport())
return false;
291 fAcknReadyCounter+=newitems;
293 if (fAcknSendBufBusy)
return false;
295 unsigned ackn_limit = fFirstAckn ? fInputQueueCapacity : fInputQueueCapacity/2;
296 if (ackn_limit<1) ackn_limit = 1;
298 DOUT5(
"fAcknReadyCounter = %d limit = %d", fAcknReadyCounter, ackn_limit);
301 if (fAcknReadyCounter<ackn_limit)
return false;
303 fAcknSendBufBusy =
true;
305 fAcknReadyCounter -= ackn_limit;
311 uint32_t recid = TakeRec(buf, netot_HdrSend, ackn_limit);
313 fNet->SubmitSend(recid);
320 while ((fAcknAllowedOper>0) && (fAcknSendQueue.Size()>0)) {
321 uint32_t recid = fAcknSendQueue.Pop();
323 fNet->SubmitSend(recid);
329 if (recid>=fNumRecs) {
EOUT(
"Recid fail %u %u", recid, fNumRecs);
return; }
331 bool checkackn(
false);
333 fRecs[recid].buf.Release();
335 if (fRecs[recid].kind & netot_Send) {
340 EOUT(
"One cannot recieve buffer!!!!");
347 if (fRecs[recid].kind & netot_HdrSend) {
348 fAcknSendBufBusy =
false;
351 EOUT(
"Wrong kind=%u in ProcessSendCompl", fRecs[recid].kind);
359 if (checkackn) CheckAcknReadyCounter(0);
368 if (recid>=fNumRecs) {
369 EOUT(
"Recid fail tr %p %u %u",
this, recid, fNumRecs);
377 EOUT(
"Error in network header magic number");
379 CloseTransport(
true);
385 if (hdr->
kind & netot_HdrSend) {
386 uint32_t extras = hdr->
size;
388 fAcknAllowedOper += extras;
389 SubmitAllowedSendOperations();
395 buf << fRecs[recid].buf;
407 buf << fRecs[recid].buf;
412 if ((hdr->
size>0) && (hdr->
size <= fInlineDataSize))
433 unsigned numbufs = NumCanRecv(port);
435 while (fOutputQueueSize < numbufs) {
438 Buffer buf = RecvQueueItem(port, fOutputQueueSize);
440 uint32_t recid = TakeRec(buf, netot_Send);
441 if (recid==fNumRecs) {
442 EOUT(
"No available recs!!!");
449 if (fAcknSendQueue.Capacity() > 0) {
450 fAcknSendQueue.Push(recid);
452 fNet->SubmitSend(recid);
456 SubmitAllowedSendOperations();
510 EOUT(
"Port or request disappear while connection is ready");
526 if (newthrdname.empty()) newthrdname = devthrdname;
540 DOUT0(
"!!!!!! NETWORK TRANSPORT IS CREATED !!!!");
545 EOUT(
"No thread for transport");
Reference on memory from memory pool.
void SetTotalSize(BufferSize_t len)
Set total length of the buffer to the specified value. Size cannot be bigger than original size of the buf...
void SetTypeId(unsigned tid)
void SetPoolName(const std::string &name)
Represents command with its arguments.
bool GetUseAckn() const
Use of acknowledge in protocol.
std::string GetPoolName() const
std::string GetConnThread() const
Thread name for transport.
Reference GetPort() const
static int ConnectPorts(Reference port1ref, Reference port2ref, Command cmd=nullptr)
bool ConnectPoolHandles()
Method called by manager to establish connection to pools. TODO: while used from devices,...
unsigned NumPools() const
virtual void OnThreadAssigned()
bool IsAutoPool(unsigned indx=0) const
Returns true when handle automatically delivers buffers via the connection.
unsigned CreateTimer(const std::string &name, double period_sec=-1., bool synchron=false)
virtual void AllocateNet(unsigned fulloutputqueue, unsigned fullinputqueue)=0
void SetRecHeader(uint32_t recid, void *header)
uint32_t TakeRec(Buffer &buf, uint32_t kind=0, uint32_t extras=0)
virtual void ProcessPoolEvent(unsigned pool)
Method called by framework when pool event is produced.
void ReleaseRec(uint32_t recid)
virtual void ProcessTimerEvent(unsigned timer)
Method called by framework when timer event is produced.
virtual ~NetworkTransport()
void SubmitAllowedSendOperations()
virtual void TransportCleanup()
virtual bool StartTransport()
Methods activated by Port, when transport starts/stops.
void FillRecvQueue(Buffer *freebuf=0, bool onlyfreebuf=false)
virtual void ProcessInputEvent(unsigned port)
Method called by framework when input event is produced.
void ProcessRecvCompl(uint32_t recid)
virtual void OnThreadAssigned()
bool CheckAcknReadyCounter(unsigned newitems=0)
BufferSize_t fFullHeaderSize
BufferSize_t fInlineDataSize
void ProcessSendCompl(uint32_t recid)
unsigned fInputQueueCapacity
NetIORecsQueue fAcknSendQueue
NetworkTransport(dabc::Command cmd, const PortRef &inpport, const PortRef &outport, bool useackn, WorkerAddon *addon)
int PackHeader(uint32_t recid)
virtual bool StopTransport()
static bool Make(const ConnectionRequest &req, WorkerAddon *addon, const std::string &devthrdname="")
unsigned fAcknReadyCounter
virtual void ProcessOutputEvent(unsigned port)
Method called by framework when output event is produced.
unsigned fOutputQueueCapacity
static void GetRequiredQueuesSizes(const PortRef &port, unsigned &input_size, unsigned &output_size)
Manipulator with dabc::Buffer class.
BufferSize_t copyfrom(const Pointer &src, BufferSize_t sz=0)
Returns actual size copied.
Reference on the dabc::Port class
bool IsInput() const
Returns true if it is input port.
PortRef GetBindPort()
Return reference on the bind port.
unsigned QueueCapacity() const
Returns queue capacity of the port.
void Allocate(unsigned capacity)
bool null() const
Returns true if reference contains nullptr.
void Destroy()
Releases the reference and starts destruction of the referenced object.
Reference on dabc::Transport class
Base class for transport implementations.
bool IsInputTransport() const
virtual bool StopTransport()
virtual bool StartTransport()
Methods activated by Port, when transport starts/stops.
bool IsOutputTransport() const
virtual void TransportCleanup()
long Notify(const std::string &cmd, int arg=0)
Generic addon for dabc::Worker.
bool MakeThreadForWorker(const std::string &thrdname="")
void AssignAddon(WorkerAddon *addon)
Assigns addon to the worker. Should be called before the worker is assigned to the thread.
WorkerAddonRef fAddon
extension of worker for some special events
const unsigned AcknoledgeQueueLength