DABC (Data Acquisition Backbone Core)  2.9.9
DataTransport.cxx
Go to the documentation of this file.
1 // $Id: DataTransport.cxx 4764 2021-04-23 07:01:04Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #include "dabc/DataTransport.h"
17 
18 #include "dabc/Manager.h"
19 #include "dabc/Publisher.h"
20 
21 dabc::InputTransport::InputTransport(dabc::Command cmd, const PortRef& inpport, DataInput* inp, bool owner) :
22  dabc::Transport(cmd, inpport, 0),
23  fInput(0),
24  fInputOwner(false),
25  fInpState(inpInit),
26  fCurrentBuf(),
27  fNextDataSize(0),
28  fPoolChangeCounter(0),
29  fPoolRef(),
30  fExtraBufs(0),
31  fActivateWorkaround(false),
32  fReconnect(),
33  fStopRequested(false)
34 {
35  if (inp!=0) SetDataInput(inp, owner);
36 
37  CreateTimer("SysTimer");
38 // DOUT5("Create InputTransport %s", GetName());
39 }
40 
// NOTE(review): the line carrying this block's signature (listing line 41) is missing
// from this extract; its position right after the constructor suggests this is the
// InputTransport destructor - confirm against the original file.
42 {
 // empty body - no statements are executed here
43 }
44 
46 {
47  CloseInput();
48 
49  if (!inp) return;
50 
51  fInput = inp;
52  fInputOwner = false;
53  WorkerAddon* addon = inp->Read_GetAddon();
54 
55  if (addon==0) {
56  fInputOwner = owner;
57  } else if (owner) {
58  AssignAddon(addon);
59  } else {
60  EOUT("Transport %s cannot assign addon while owner flag is not specified", GetName());
61  }
62 }
63 
64 void dabc::InputTransport::EnableReconnect(const std::string &reconn)
65 {
66  fReconnect = reconn;
67 }
68 
70 {
71  fPoolRef = dabc::mgr.FindPool(PoolName());
72  if (fPoolRef.null())
73  EOUT("Cannot find memory pool, associated with handle %s, monitoring will not work", PoolName().c_str());
74 }
75 
76 
78 {
79  bool res = Transport::StartTransport();
80 
81  DOUT2("============================= Start InputTransport %s isrunning %s", ItemName().c_str(), DBOOL(IsRunning()));
82 
83  // if we are doing input transport,
84  // generate artificial event for the port to start transport
85  // TODO: should it be default for the module itself???
86  if (fInput==0) {
87  EOUT("Input object is not assigned");
88 
89  return false;
90  }
91 
92  // clear any existing previous request
93  fStopRequested = false;
94 
95  if (!SuitableStateForStartStop()) {
96  EOUT("Start transport %s at not optimal state %u", GetName(), (unsigned) fInpState);
97  }
98 
99  // fNextDataSize = 0;
100  ProduceOutputEvent();
101 
102  return res;
103 }
104 
106 {
107 // DOUT0("Stopping InputTransport %s isrunning %s", GetName(), DBOOL(IsRunning()));
108 
109  DOUT2("Stopping InputTransport %s isrunning %s", GetName(), DBOOL(IsRunning()));
110  if (SuitableStateForStartStop()) {
111  fStopRequested = false;
112  return Transport::StopTransport();
113  }
114 
115  if (!fStopRequested) {
116  DOUT2("%s Try to wait until suitable state is achieved, now state = %u", GetName(), (unsigned) fInpState);
117  fStopRequested = true;
118  fAddon.Notify("TransportWantToStop");
119  }
120  return true;
121 }
122 
124 {
125  // we only need buffer when we explicitly request it
126 
127  if (fInpState != inpWaitBuffer) return false;
128 
129  // only when buffer is awaited, react on buffer event and continue
130  fCurrentBuf = TakeBuffer(pool);
131 
132 // DOUT0("@@@@@@@@@@ Process buffer null %s size %u", DBOOL(fCurrentBuf.null()), fCurrentBuf.GetTotalSize());
133 
134  ChangeState(inpCheckBuffer);
135  ProcessOutputEvent(0);
136 
137  // we are interesting for next buffer event if we really waiting for the buffer
138  return fInpState == inpWaitBuffer;
139 }
140 
142 {
143  if ((fInput!=0) && fInputOwner) {
144  delete fInput;
145  }
146  fInput = nullptr;
147  fInputOwner = false;
148 
149  AssignAddon(nullptr);
150 }
151 
// NOTE(review): the signature line (listing line 152) is missing from this extract; the
// member index lists "virtual void TransportCleanup()" for InputTransport.
153 {
 // close the input once and mark the state machine as closed
154  if (fInpState != inpClosed) {
155  CloseInput();
156  fInpState = inpClosed;
157  }
158 
 // release the buffer currently held and the reference on the memory pool
159  fCurrentBuf.Release();
160 
161  fPoolRef.Release();
162 
 // NOTE(review): listing line 163 is missing here - presumably the base-class cleanup call; confirm.
164 }
165 
166 
168 {
169  if (fInpState == inpInitTimeout)
170  ChangeState(inpInit);
171 
172  if (fInpState == inpComplitTimeout)
173  ChangeState(inpCompleting);
174 
175  ProcessOutputEvent(0);
176 }
177 
// NOTE(review): the signature line (listing line 178) is missing from this extract; the
// member index lists "virtual int ExecuteCommand(Command cmd)" for InputTransport.
// Handles the input closed/failed commands, "GetTransportStatistic" and the
// "transport.json" binary request; each handled command returns cmd_true.
179 {
180  bool isfailure = cmd.IsName(CmdDataInputFailed::CmdName());
181 
182  if (isfailure || cmd.IsName(CmdDataInputClosed::CmdName())) {
183 
 // input is gone - close it once
184  if (fInpState != inpClosed) {
185  CloseInput();
186  ChangeState(inpClosed);
187  }
188 
 // without reconnect URL the transport is closed (with error flag on failure),
 // otherwise a reconnect attempt is scheduled after 1 s
189  if (fReconnect.empty()) {
190  CloseTransport(isfailure);
191  } else {
192  ChangeState(inpReconnect);
193  ShootTimer("SysTimer", 1.);
194  }
195 
196  return cmd_true;
197  }
198 
199  if (cmd.IsName("GetTransportStatistic")) {
200  // take statistic from output element
201  if (fInput) fInput->Read_Stat(cmd);
202  return cmd_true;
203  }
204 
 // reply with a JSON record describing this transport
205  if (cmd.IsName(dabc::CmdGetBinary::CmdName()) && (cmd.GetStr("Kind")=="transport.json")) {
206  dabc::Record info;
207  info.CreateRecord(GetName());
208  info.SetField("IsInput", IsInputTransport());
209  info.SetField("IsOutput", IsOutputTransport());
210  info.SetField("HasInput", fInput ? true : false);
211  info.SetField("InpState", (int) fInpState);
212  cmd.SetStr("StringReply", info.SaveToJson());
213  return cmd_true;
214  }
215 
 // NOTE(review): listing line 216 is missing here - presumably the return that delegates
 // all other commands to the base class; confirm against the original file.
217 }
218 
219 
221 {
222  // this inform that we are
223  if (sz == dabc::di_QueueBufReady) {
224  // in case when transport waiting for next buffer, switch state to receiving of next ready buffer
225  if (fExtraBufs) fInpState = inpCallBack;
226  }
227 
228  switch (fInpState) {
229 
230  case inpInit:
231  if (!fExtraBufs) {
232  EOUT("Call back at init state not with extra mode");
233  exit(333);
234  }
235  fInpState = inpCompleting;
236  break;
237 
238  case inpSizeCallBack:
239  fInpState = inpCheckSize;
240  // DOUT0("dabc::InputTransport::Get request for buffer %u", sz);
241  fNextDataSize = sz;
242  break;
243 
244  case inpCallBack:
245  fInpState = inpCompleting;
246  break;
247 
248  default:
249  EOUT("Get callback at wrong state %d", fInpState);
250  fInpState = inpError;
251  }
252 
253  if (fActivateWorkaround)
254  ProcessOutputEvent(0);
255  else
256  ActivateOutput(0);
257 }
258 
259 
// NOTE(review): the signature line (listing line 260) is missing from this extract; the
// member index lists "void ChangeState(EInputStates state)" for InputTransport.
261 {
 // store the new state of the input state machine
262  fInpState = state;
263 
 // a stop request recorded earlier is handled as soon as the state is suitable for it
264  if (fStopRequested && SuitableStateForStartStop()) {
265  DOUT2("%s Stop transport at suitable state", GetName());
266  fStopRequested = false;
 // NOTE(review): listing line 267 is missing here - presumably the call which really
 // stops the transport; confirm against the original file.
268  }
269 }
270 
271 
// NOTE(review): the signature line (listing line 272) is missing from this extract; the
// member index lists "virtual bool ProcessSend(unsigned port)" for InputTransport.
// Drives the input state machine kept in fInpState. The "if (fInpState == ...)" stages
// below are passed in sequence, so a stage which changes the state lets the following
// stage run within the same call. true is returned where processing should continue or
// be repeated, false where the transport waits for a timer, a buffer or a call-back,
// or where it was closed.
273 {
274  // DOUT0("dabc::InputTransport %s ProcessSend state %d", ItemName().c_str(), fInpState);
275 
276  // if transport was already closed, one should ignore any other events
277  if (fInpState == inpClosed) return false;
278 
279  if (NumPools()==0) {
280  EOUT("InputTransport %s - no memory pool!!!!", GetName());
281  CloseTransport(true);
282  return false;
283  }
284 
285  // if transport was created via device and device is destroyed - close transport as well
286  if (fTransportDevice.DeviceDestroyed()) {
287  if (fInpState != inpClosed) {
288  CloseInput();
289  ChangeState(inpClosed);
290  }
291  CloseTransport(false);
292  return false;
293  }
294 
 // reconnect: try to create a new input from the stored URL, retry every second on failure
295  if (fInpState == inpReconnect) {
296 
297  if (fInput!=0) {
298  EOUT("Reconnect when input non 0 - ABORT");
299  CloseTransport(true);
300  return false;
301  }
302 
303  if (fReconnect.empty()) {
304  EOUT("Reconnect not specified - ABORT");
305  CloseTransport(true);
306  return false;
307  }
308 
309  DataInput* inp = dabc::mgr.CreateDataInput(fReconnect);
310 
311  if (inp==0) {
312  ShootTimer("SysTimer", 1.);
313  return false;
314  }
315 
316  SetDataInput(inp, true);
317 
318  ChangeState(inpInit);
319  }
320 
321  if (fInput==0) {
322  EOUT("InputTransport %s - no input object!!!!", GetName());
323  CloseTransport(true);
324  return false;
325  }
326 
327  if ((fInpState == inpSizeCallBack) || (fInpState == inpInitTimeout) || (fInpState == inpComplitTimeout)) {
328  // at these states one should get event from other source first
329  return false;
330  }
331 
332 // EOUT("Tr:%08x Process event", this);
333 
334 
335 
336  // first step - request size of the buffer
337 
338  if (fInpState == inpInit) {
339 
340  // if transport not running, do not start acquire new buffer
341  if (!isTransportRunning()) return false;
342 
343  // if internal queue already acquire as many buffers, wait
344  if ((fExtraBufs > 0) && (NumCanSend(port) <= fExtraBufs)) {
345 // DOUT0("There are too many buffers in the transport queue - start wait for the call-back buf %u", fCurrentBuf.GetTotalSize());
346  ChangeState(inpCallBack);
347  return false;
348  }
349 
350  if (NumCanSend(port) == 0) { EOUT("Logical failure in input transport"); exit(333); }
351 
 // state is set before Read_Size() is called and restored afterwards unless the input answers di_CallBack
352  fInpState = inpSizeCallBack;
353  fNextDataSize = 0;
354 
355  unsigned size_res = fInput->Read_Size();
356 
357  if (size_res != di_CallBack) fInpState = inpInit;
358 
359  // this is case when input want to repeat operation
360  // when we return true, we say that we want to continue processing
361 
362  switch (size_res) {
363  case di_Repeat:
364  return true;
365  case di_RepeatTimeOut:
366  ChangeState(inpInitTimeout);
367  ShootTimer("SysTimer", fInput->Read_Timeout());
368  return false;
369  case di_CallBack:
370  // if state already changed, process it
371  return fInpState != inpSizeCallBack;
372  }
373 
 // any other value is kept as data size and validated in the next stage
374  fNextDataSize = size_res;
375 
376  ChangeState(inpCheckSize);
377  }
378 
 // second step - interpret the size value delivered by the input
379  if (fInpState == inpCheckSize) {
380 
381 // DOUT0("InputTransport process fInpState == inpCheckSize sz %u", fNextDataSize);
382 
383  switch (fNextDataSize) {
384 
385  case di_CallBack:
386  EOUT("Wrong place for callback");
387  ChangeState(inpError);
388  break;
389 
390  case di_EndOfStream:
391  ChangeState(inpEnd);
392  break;
393 
394  case di_DfltBufSize:
395  ChangeState(inpNeedBuffer);
396  fNextDataSize = 0;
397  break;
398 
399  default:
400  if (fNextDataSize <= di_ValidSize) {
401  ChangeState(inpNeedBuffer);
402  } else {
403  DOUT0("Tr:%s Reading error nextsz = 0x%08x", GetName(), fNextDataSize);
404  ChangeState(inpError);
405  }
406  }
407  }
408 
409  // here we request buffer
410 
411  if (fInpState == inpNeedBuffer) {
412 
413  if (!fPoolRef.null() && fPoolRef()->CheckChangeCounter(fPoolChangeCounter))
414  ProcessPoolChanged(fPoolRef());
415 
416  fCurrentBuf = TakeBuffer();
417 
418  if (!fCurrentBuf.null()) {
419  ChangeState(inpCheckBuffer);
420  } else if (IsAutoPool()) {
421  ChangeState(inpWaitBuffer);
422  return false;
423  } else {
424  EOUT("Did not get buffer and pool queue is not configured - use minimal timeout");
425  ShootTimer("SysTimer", 0.001);
426  return false;
427  }
428  }
429 
430  if (fInpState == inpCheckBuffer) {
431 
432 // DOUT0("Check buffer null %s size %u", DBOOL(fCurrentBuf.null()), fCurrentBuf.GetTotalSize());
433  // if buffer was provided, use it
434  if (fCurrentBuf.GetTotalSize() < fNextDataSize) {
435  EOUT("Requested buffer smaller than actual data size");
436  ChangeState(inpError);
437  } else {
 // fNextDataSize == 0 (default size was requested) keeps the buffer size untouched
438  if (fNextDataSize>0) fCurrentBuf.SetTotalSize(fNextDataSize);
439  ChangeState(inpHasBuffer);
440  }
441  }
442 
443  if (fInpState == inpHasBuffer) {
444 
445  // special handling for call back
446  // when function returns di_CallBack, it already can invoke callback
447  // therefore change state temporary before correspondent call
448 
449  fInpState = inpCallBack;
450 
451  unsigned start_res = fInput->Read_Start(fCurrentBuf);
452 
453  // current value in state variable may be incorrect
454  // therefore one should change it in any case
455 
456  switch (start_res) {
457  case di_Ok:
458  // this will allows to call Read_Complete method in next iteration
459  ChangeState(inpCompleting);
460  break;
461 
462  case di_NeedMoreBuf:
463  // this is case when transport internally uses queue and need more buffers
464  fExtraBufs++;
465  ChangeState(inpInit);
466  return true;
467 
468  case di_HasEnoughBuf:
469  fExtraBufs++;
470  return (fInpState != inpCallBack);
471 
472  case di_CallBack:
473  // if state already change, process such change
474  return (fInpState != inpCallBack);
475 
476  default:
477  ChangeState(inpError);
478  }
479  }
480 
481 
482  if (fInpState == inpCallBack) {
483  // this is state when transport fills buffer and should invoke CallBack
484  return false;
485  }
486 
487  if (fInpState == inpCompleting) {
488 
 // in extra-buffers mode the buffer is delivered by Read_Complete(), otherwise it must exist already
489  if (fExtraBufs && !fCurrentBuf.null()) {
490  EOUT("Internal error - currbuf not null when completing");
491  return false;
492  }
493 
494  if (!fExtraBufs && fCurrentBuf.null()) {
495  EOUT("Internal error - currbuf null when completing");
496  return false;
497  }
498 
499  unsigned res = fInput->Read_Complete(fCurrentBuf);
500 
501  if (fExtraBufs) {
502  if (fCurrentBuf.null()) EOUT("Transport does not return buffer!!!");
503  fExtraBufs--;
504  }
505 
506  switch (res) {
507  case di_Ok:
508  ChangeState(inpReady);
509  break;
510  case di_MoreBufReady:
511  // we send immediately buffer and will try to take more buffers out of transport
512  if (NumCanSend(port) == 0) { EOUT("Logical failure in input transport"); exit(333); }
513  Send(fCurrentBuf);
514  return true;
515  case di_SkipBuffer:
516  fCurrentBuf.Release();
517  // DOUT4("Skip input buffer");
518  ChangeState(inpInit);
519  break;
520  case di_EndOfStream:
521  fCurrentBuf.Release();
522  DOUT4("End of stream");
523  ChangeState(inpEnd);
524  break;
525  case di_Repeat:
526  return true;
527  case di_RepeatTimeOut:
528  ChangeState(inpComplitTimeout);
529  ShootTimer("SysTimer", fInput->Read_Timeout());
530  return false;
531  default:
532  EOUT("Error when do buffer reading res = %d", res);
533  ChangeState(inpError);
534  }
535  }
536 
 // buffer is filled - send it and start the next cycle
537  if (fInpState == inpReady) {
538  // DOUT0("Input transport sends buf %u", (unsigned) fCurrentBuf.SegmentId(0));
539 
540  if (NumCanSend(port) == 0) { EOUT("Logical failure in input transport"); exit(333); }
541 
542  Send(fCurrentBuf);
543  fCurrentBuf.Release();
544  ChangeState(inpInit);
545  }
546 
 // error or end of stream - close the input and send an EOF buffer downstream
547  if ((fInpState == inpError) || (fInpState == inpEnd)) {
548 
549  DOUT2("InputTransport:: Generate EOF packet");
550  CloseInput();
551  fNextDataSize = 0;
552 
553  fCurrentBuf.MakeEmpty();
554 
555  if (fCurrentBuf.null()) {
 // NOTE(review): CloseInput() above already reset fInput, so if the timer leads back into
 // this method the "no input object" check near the top closes the transport before this
 // branch can be retried - confirm that this is intended.
556  EOUT("Fatal error - cannot get empty buffer, try after 1 sec");
557  ShootTimer("SysTimer", 1.);
558  return false;
559  } else {
560  fCurrentBuf.SetTypeId(dabc::mbt_EOF);
561  if (NumCanSend(port) == 0) { EOUT("Logical failure in input transport"); exit(333); }
562  Send(fCurrentBuf);
563  ChangeState(inpClosed);
564  }
565  }
566 
567  if (fInpState == inpClosed) {
568  CloseTransport(false);
569  return false;
570  }
571 
572  return true;
573 }
574 
575 // ====================================================================================
576 
// NOTE(review): the constructor signature (listing line 577) is missing from this extract; the
// member index lists "OutputTransport(dabc::Command cmd, const PortRef &outport, DataOutput *out, bool owner)".
// Members start from their idle values, the output object is attached, the "SysTimer"
// timer is created and the retry period is read from the "retry" port configuration.
578  dabc::Transport(cmd, 0, outport),
579  fOutput(0),
580  fOutputOwner(false),
581  fOutState(outReady),
582  fCurrentBuf(),
583  fStopRequested(false),
584  fRetryPeriod(-1.)
585 {
586  SetDataOutput(out, owner);
587 
588  CreateTimer("SysTimer");
589 
 // default -1 keeps the retry disabled (CloseOnError() tests for a negative period)
590  fRetryPeriod = outport.Cfg("retry", cmd).AsDouble(-1);
591 
592  if (!fTransportInfoName.empty() && fOutput)
 // NOTE(review): listing line 593 - the statement governed by this if - is missing from the
 // extract; the index mentions DataOutput::SetInfoParName(), presumably called here - confirm.
594 
595  DOUT2("Create out transport %s %s", GetName(), ItemName().c_str());
596 }
597 
599 {
600  // DOUT0("DESTROY OUTPUT TRANSPORT %s", GetName());
601  CloseOutput();
602 }
603 
605 {
606 
607  CloseOutput();
608 
609  if (out==0) return;
610 
611  fOutput = out;
612  fOutputOwner = false;
613  WorkerAddon* addon = out->Write_GetAddon();
614 
615  if (addon==0) {
616  fOutputOwner = owner;
617  } else
618  if (owner)
619  AssignAddon(addon);
620  else
621  EOUT("Cannot assigned addon while owner flag is not specified");
622 
623 }
624 
626 {
627  if ((fOutput!=0) && fOutputOwner)
628  delete fOutput;
629 
630  fOutput = 0;
631  fOutputOwner = false;
632 
633  fCurrentBuf.Release();
634 }
635 
637 {
638  fOutState = state;
639 
640  if (fStopRequested && SuitableStateForStartStop()) {
641  StopTransport();
642  }
643 }
644 
645 
647 {
648  DOUT2("Starting OutputTransport %s isrunning %s", GetName(), DBOOL(IsRunning()));
649 
650  bool res = Transport::StartTransport();
651 
652  fStopRequested = false;
653 
654  if (fOutput==0) {
655  EOUT("Output was not specified!!!");
656  return false;
657  }
658 
659  if (!SuitableStateForStartStop()) {
660  EOUT("Start transport %s at not optimal state %u", GetName(), (unsigned) fOutState);
661  }
662 
663  return res;
664 }
665 
667 {
668  DOUT2("Stopping OutputTransport %s isrunning %s", GetName(), DBOOL(IsRunning()));
669 
670  if (SuitableStateForStartStop()) {
671  fStopRequested = false;
672  return Transport::StopTransport();
673  }
674 
675  if (!fStopRequested) {
676  fStopRequested = true;
677  DOUT2("%s Try to wait until suitable state is achieved now %u", GetName(), (unsigned) fOutState);
678  }
679 
680  return true;
681 }
682 
683 
// NOTE(review): the signature line (listing line 684) is missing from this extract; the
// member index lists "virtual void TransportCleanup()" for OutputTransport.
685 {
 // release the output object and the buffer possibly still held
686  CloseOutput();
687 
 // NOTE(review): listing line 688 is missing here - presumably the base-class cleanup call; confirm.
689 }
690 
692 {
693  if ((fRetryPeriod < 0.) || !fOutput || !fOutput->Write_Retry()) {
694  ChangeState(outClosed);
695  CloseOutput();
696  CloseTransport(true);
697  }
698 
699  ChangeState(outRetry);
700  ShootTimer("SysTimer", fRetryPeriod);
701 }
702 
703 
// NOTE(review): the signature line (listing line 704) is missing from this extract; the member
// index lists "virtual void ProcessEvent(const EventId &)" for OutputTransport, the argument
// is named evnt in the body.
// Handles the evCallBack event, by which an output signals completion of an asynchronous step.
705 {
706  // DOUT0("%s dabc::OutputTransport::ProcessEvent %u state %u", GetName(), (unsigned) evnt.GetCode(), fOutState);
707 
708  if (evnt.GetCode() == evCallBack) {
709 
 // every argument other than do_Ok is treated as failure
710  if (evnt.GetArg() != do_Ok) {
711  if (evnt.GetArg() == do_Error) EOUT("Callback with error argument");
712  CloseOnError();
713  return;
714  }
715 
 // call-back completes the check of the output - continue with normal input processing
716  if (fOutState == outWaitCallback) {
717  ChangeState(outReady);
718  ProcessInputEvent(0);
719  return;
720  }
721 
 // call-back completes writing of the buffer
722  if (fOutState == outWaitFinishCallback) {
723  ChangeState(outFinishWriting);
724 
725  // we need to call ProcessRecv directly at least once before entering into normal loop
726  if (ProcessRecv(0))
727  ProcessInputEvent(0);
728 
729  return;
730  }
731 
732  EOUT("Call-back at wrong state!!");
733  }
734 
 // NOTE(review): listing line 735 is missing here - presumably all events are forwarded
 // to the base-class ProcessEvent(); confirm against the original file.
736 }
737 
// NOTE(review): the signature line (listing line 738) is missing from this extract; the
// member index lists "virtual bool ProcessRecv(unsigned port)" for OutputTransport.
// Drives the output state machine kept in fOutState. The "if (fOutState == ...)" stages
// below are passed in sequence, so a stage which changes the state lets the following
// stage run within the same call. false is returned where the transport waits for a
// timer or call-back or where it was closed, true otherwise.
739 {
740 // if (IsName("_OnlineServer_Output0_Transport_Slave0_Transport"))
741 // DOUT0("dabc::OutputTransport::ProcessRecv %s state %u", GetName(), fOutState);
742 
 // without output object the error state is set, which is handled at the end of the method
743  if (!fOutput) {
744  EOUT("Output object not specified");
745  ChangeState(outError);
746  }
747 
748  if (fTransportDevice.DeviceDestroyed()) {
749  ChangeState(outClosed);
750  CloseOutput();
751  CloseTransport(false);
752  return false;
753  }
754 
 // first step - inspect the type of the next queued buffer and ask the output if it can write
755  if (fOutState == outReady) {
756 
757  unsigned ret(do_Ok);
758 
759  unsigned buftyp = RecvQueueItem(port,0).GetTypeId();
760 
761  if (buftyp == dabc::mbt_EOF) {
762  DOUT0("EOF - close output transport");
763  Recv(port).Release();
764  ret = do_Close;
765  } else
766  if (buftyp == dabc::mbt_EOL) {
767  fOutput->Write_Flush();
768  Recv(port).Release();
769  return true;
770 // TODO: should we stop output when transport is closed?????
771 // } else
772 // if (!isTransportRunning()) {
773 // // when transport not running, ignore all input buffers anyway
774 // return false;
775  } else {
776  ret = fOutput->Write_Check();
777  }
778 
779  switch (ret) {
780  case do_Ok:
781  ChangeState(outStartWriting);
782  break;
783  case do_Repeat:
784  return true;
785  case do_RepeatTimeOut:
786  ChangeState(outInitTimeout);
787  ShootTimer("SysTimer", fOutput->Write_Timeout());
788  return false;
789  case do_CallBack:
790  ChangeState(outWaitCallback);
791  return false;
792  case do_Skip:
793  Recv(port).Release();
794  return true;
795  case do_Close:
796  ChangeState(outClosing);
797  break;
798  case do_Error:
799  ChangeState(outError);
800  break;
801  default:
802  EOUT("Wrong return value %u for the Write_Check", ret);
803  ChangeState(outError);
804  }
805  }
806 
 // in the following two states an event from other source (timer, call-back) is awaited
807  if (fOutState == outInitTimeout) {
808  return false;
809  }
810 
811  if (fOutState == outWaitCallback) {
812  return false;
813  }
814 
 // second step - take the buffer from the port and hand it over to the output
815  if (fOutState == outStartWriting) {
816 
817  fCurrentBuf = Recv(port);
818 
819  unsigned ret = fOutput->Write_Buffer(fCurrentBuf);
820 
821  switch (ret) {
822  case do_Ok:
823  ChangeState(outFinishWriting);
824  break;
825  case do_CallBack:
826  ChangeState(outWaitFinishCallback);
827  return false;
828  case do_Skip:
829  ChangeState(outReady);
830  return true;
831  case do_Close:
832  ChangeState(outClosing);
833  break;
834  case do_Error:
835  DOUT0("Error when writing buffer in transport %s", GetName());
836  ChangeState(outError);
837  break;
838  default:
839  EOUT("Wrong return value %u for the Write_Buffer", ret);
840  ChangeState(outError);
841  }
842  }
843 
844  if (fOutState == outWaitFinishCallback) {
845  // if we wait for call back, ignore all possible events
846  return false;
847  }
848 
 // third step - release the buffer and let the output complete the write operation
849  if (fOutState == outFinishWriting) {
850 
851  fCurrentBuf.Release();
852 
853  unsigned ret = fOutput->Write_Complete();
854 
855  switch (ret) {
856  case do_Ok:
857  ChangeState(outReady);
858  break;
859  case do_Close:
860  ChangeState(outClosing);
861  break;
862  case do_Error:
863  ChangeState(outError);
864  break;
865  default:
866  EOUT("%s Wrong return value %u for the Write_Complete", GetName(), ret);
867  ChangeState(outError);
868  }
869  }
870 
 // regular close requested by the output or by an EOF buffer
871  if (fOutState == outClosing) {
872  ChangeState(outClosed);
873  CloseOutput();
874  CloseTransport(false);
875  return false;
876  }
877 
 // error handling is delegated to CloseOnError()
878  if (fOutState == outError) {
879  CloseOnError();
880  return false;
881  }
882 
 // forward the info string of the output when InfoExpected() reports it is wanted
883  if (fOutput && InfoExpected()) {
884  std::string info = fOutput->ProvideInfo();
885  ProvideInfo(0, info);
886  }
887 
888  return true;
889 }
890 
892 {
893  if (fOutState == outInitTimeout)
894  ChangeState(outReady);
895 
896  if (fOutState == outRetry) {
897  if (fOutput && fOutput->Write_Init())
898  ChangeState(outReady);
899  else {
900  ShootTimer("SysTimer", fRetryPeriod);
901  return;
902  }
903  }
904 
905  ProcessInputEvent(0);
906 }
907 
// NOTE(review): the signature line (listing line 908) is missing from this extract; the
// member index lists "virtual int ExecuteCommand(dabc::Command cmd)" for OutputTransport.
// Handles "GetTransportStatistic" and "RestartTransport" commands.
909 {
910  if (cmd.IsName("GetTransportStatistic")) {
911  // take statistic from output element
912  cmd.SetStr("OutputState", StateAsStr());
913  if (fOutput) fOutput->Write_Stat(cmd);
914  return cmd_true;
915  } else if (cmd.IsName("RestartTransport")) {
 // without output object the restart is reported as failed
916  bool res = fOutput ? fOutput->Write_Restart(cmd) : false;
917  return cmd_bool(res);
918  }
919 
 // NOTE(review): listing line 920 is missing here - presumably the return that delegates
 // all other commands to the base class; confirm against the original file.
921 }
Represents command with its arguments.
Definition: Command.h:99
bool SetStr(const std::string &name, const char *value)
Definition: Command.h:134
std::string GetStr(const std::string &name, const std::string &dflt="") const
Definition: Command.h:136
Interface for implementing any kind of data input.
Definition: DataIO.h:61
virtual WorkerAddon * Read_GetAddon()
Returns the addon provided by the data input. If specified, the I/O object is supposed to be double-derived from D...
Definition: DataIO.h:68
Interface for implementing any kind of data output.
Definition: DataIO.h:158
void SetInfoParName(const std::string &name)
Sets the parameter name, which can be used for debug output.
Definition: DataIO.cxx:55
virtual WorkerAddon * Write_GetAddon()
Returns the addon provided by the data output. If specified, the I/O object is supposed to be double-derived from ...
Definition: DataIO.h:169
void EnableReconnect(const std::string &reconn)
Set the URL used to reconnect the data input.
virtual void TransportCleanup()
void Read_CallBack(unsigned compl_res=di_Ok)
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
void ChangeState(EInputStates state)
void RequestPoolMonitoring()
Method can be used in custom transport to start pool monitoring.
virtual void ProcessTimerEvent(unsigned timer)
Method called by framework when timer event is produced.
void SetDataInput(DataInput *inp, bool owner)
Assign input object, set addon if exists.
virtual bool StartTransport()
Methods activated by Port, when transport starts/stops.
virtual bool StopTransport()
InputTransport(dabc::Command cmd, const PortRef &inpport, DataInput *inp=0, bool owner=false)
virtual bool ProcessSend(unsigned port)
Method called by framework when at least one buffer can be sent to output port.
virtual bool ProcessBuffer(unsigned pool)
Method called by framework when at least one buffer available in pool handle.
DataInput * CreateDataInput(const std::string &kind)
Create data input, using factories methods.
Definition: Manager.cxx:2260
Reference FindPool(const std::string &name)
Definition: Manager.cxx:2064
virtual void ProcessEvent(const EventId &)
Definition: Module.cxx:822
unsigned CreateTimer(const std::string &name, double period_sec=-1., bool synchron=false)
Definition: Module.cxx:109
std::string ItemName(bool compact=true) const
Produce string, which can be used as name argument in dabc::mgr.FindItem(name) call.
Definition: Object.cxx:1076
const char * GetName() const
Returns name of the object, thread safe
Definition: Object.h:295
void ChangeState(EOutputStates state)
virtual int ExecuteCommand(dabc::Command cmd)
Main method where commands are executed.
void SetDataOutput(DataOutput *out, bool owner)
virtual void ProcessEvent(const EventId &)
virtual void TransportCleanup()
double fRetryPeriod
if retry option enabled, transport will try to reinit output
OutputTransport(dabc::Command cmd, const PortRef &outport, DataOutput *out, bool owner)
virtual bool StartTransport()
Methods activated by Port, when transport starts/stops.
virtual bool ProcessRecv(unsigned port)
Method called by framework when at least one buffer available in input port.
virtual bool StopTransport()
virtual void ProcessTimerEvent(unsigned)
Method called by framework when timer event is produced.
Reference on the dabc::Port class
Definition: Port.h:195
double AsDouble(double dflt=0.) const
Definition: Record.cxx:549
bool SetField(const std::string &name, const RecordField &v)
Definition: Record.h:516
std::string SaveToJson(unsigned mask=0)
Store record in JSON form.
Definition: Record.cxx:1658
virtual void CreateRecord(const std::string &name)
Definition: Record.cxx:1674
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
Definition: Reference.cxx:177
Base class for transport implementations.
Definition: Transport.h:37
virtual bool StopTransport()
Definition: Transport.cxx:231
virtual bool StartTransport()
Methods activated by Port, when transport starts/stops.
Definition: Transport.cxx:220
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
Definition: Transport.cxx:202
std::string fTransportInfoName
Definition: Transport.h:53
virtual void TransportCleanup()
Definition: Transport.h:77
Generic addon for dabc::Worker.
Definition: Worker.h:49
RecordField Cfg(const std::string &name, Command cmd=nullptr) const
Returns configuration record of specified name.
Definition: Worker.h:482
#define DOUT2(args ...)
Definition: logging.h:170
#define DOUT0(args ...)
Definition: logging.h:156
#define EOUT(args ...)
Definition: logging.h:150
#define DBOOL(arg)
Definition: logging.h:191
#define DOUT4(args ...)
Definition: logging.h:182
Event manipulation API.
Definition: api.h:23
ManagerRef mgr
Definition: Manager.cxx:42
@ do_RepeatTimeOut
Definition: DataIO.h:145
@ do_CallBack
Definition: DataIO.h:146
@ do_Repeat
Definition: DataIO.h:144
@ do_Ok
Definition: DataIO.h:142
@ do_Skip
Definition: DataIO.h:143
@ do_Close
Definition: DataIO.h:148
@ do_Error
Definition: DataIO.h:147
@ mbt_EOF
Definition: Buffer.h:45
@ mbt_EOL
Definition: Buffer.h:46
@ di_Ok
Definition: DataIO.h:38
@ di_QueueBufReady
Definition: DataIO.h:46
@ di_HasEnoughBuf
Definition: DataIO.h:44
@ di_ValidSize
Definition: DataIO.h:33
@ di_CallBack
Definition: DataIO.h:39
@ di_NeedMoreBuf
Definition: DataIO.h:43
@ di_MoreBufReady
Definition: DataIO.h:45
@ di_DfltBufSize
Definition: DataIO.h:42
@ di_RepeatTimeOut
Definition: DataIO.h:36
@ di_SkipBuffer
Definition: DataIO.h:41
@ di_EndOfStream
Definition: DataIO.h:37
@ di_Repeat
Definition: DataIO.h:35
@ cmd_true
Definition: Command.h:38
Event structure, exchanged between DABC threads.
Definition: Thread.h:70
uint32_t GetArg() const
Definition: Thread.h:94
uint16_t GetCode() const
Definition: Thread.h:92