28 fPoolChangeCounter(0),
31 fActivateWorkaround(false),
60 EOUT(
"Transport %s cannot assign addon while owner flag is not specified", GetName());
73 EOUT(
"Cannot find memory pool, associated with handle %s, monitoring will not work", PoolName().c_str());
81 DOUT2(
"============================= Start InputTransport %s isrunning %s", ItemName().c_str(),
DBOOL(IsRunning()));
87 EOUT(
"Input object is not assigned");
93 fStopRequested =
false;
95 if (!SuitableStateForStartStop()) {
96 EOUT(
"Start transport %s at not optimal state %u", GetName(), (
unsigned) fInpState);
100 ProduceOutputEvent();
109 DOUT2(
"Stopping InputTransport %s isrunning %s", GetName(),
DBOOL(IsRunning()));
110 if (SuitableStateForStartStop()) {
111 fStopRequested =
false;
115 if (!fStopRequested) {
116 DOUT2(
"%s Try to wait until suitable state is achieved, now state = %u", GetName(), (
unsigned) fInpState);
117 fStopRequested =
true;
118 fAddon.Notify(
"TransportWantToStop");
127 if (fInpState != inpWaitBuffer)
return false;
130 fCurrentBuf = TakeBuffer(pool);
134 ChangeState(inpCheckBuffer);
135 ProcessOutputEvent(0);
138 return fInpState == inpWaitBuffer;
143 if ((fInput!=0) && fInputOwner) {
149 AssignAddon(
nullptr);
154 if (fInpState != inpClosed) {
156 fInpState = inpClosed;
159 fCurrentBuf.Release();
169 if (fInpState == inpInitTimeout)
170 ChangeState(inpInit);
172 if (fInpState == inpComplitTimeout)
173 ChangeState(inpCompleting);
175 ProcessOutputEvent(0);
180 bool isfailure = cmd.
IsName(CmdDataInputFailed::CmdName());
182 if (isfailure || cmd.
IsName(CmdDataInputClosed::CmdName())) {
184 if (fInpState != inpClosed) {
186 ChangeState(inpClosed);
189 if (fReconnect.empty()) {
190 CloseTransport(isfailure);
192 ChangeState(inpReconnect);
193 ShootTimer(
"SysTimer", 1.);
199 if (cmd.
IsName(
"GetTransportStatistic")) {
201 if (fInput) fInput->Read_Stat(cmd);
205 if (cmd.
IsName(dabc::CmdGetBinary::CmdName()) && (cmd.
GetStr(
"Kind")==
"transport.json")) {
208 info.
SetField(
"IsInput", IsInputTransport());
209 info.
SetField(
"IsOutput", IsOutputTransport());
210 info.
SetField(
"HasInput", fInput ?
true :
false);
211 info.
SetField(
"InpState", (
int) fInpState);
225 if (fExtraBufs) fInpState = inpCallBack;
232 EOUT(
"Call back at init state not with extra mode");
235 fInpState = inpCompleting;
238 case inpSizeCallBack:
239 fInpState = inpCheckSize;
245 fInpState = inpCompleting;
249 EOUT(
"Get callback at wrong state %d", fInpState);
250 fInpState = inpError;
253 if (fActivateWorkaround)
254 ProcessOutputEvent(0);
264 if (fStopRequested && SuitableStateForStartStop()) {
265 DOUT2(
"%s Stop transport at suitable state", GetName());
266 fStopRequested =
false;
277 if (fInpState == inpClosed)
return false;
280 EOUT(
"InputTransport %s - no memory pool!!!!", GetName());
281 CloseTransport(
true);
286 if (fTransportDevice.DeviceDestroyed()) {
287 if (fInpState != inpClosed) {
289 ChangeState(inpClosed);
291 CloseTransport(
false);
295 if (fInpState == inpReconnect) {
298 EOUT(
"Reconnect when input non 0 - ABORT");
299 CloseTransport(
true);
303 if (fReconnect.empty()) {
304 EOUT(
"Reconnect not specified - ABORT");
305 CloseTransport(
true);
312 ShootTimer(
"SysTimer", 1.);
316 SetDataInput(inp,
true);
318 ChangeState(inpInit);
322 EOUT(
"InputTransport %s - no input object!!!!", GetName());
323 CloseTransport(
true);
327 if ((fInpState == inpSizeCallBack) || (fInpState == inpInitTimeout) || (fInpState == inpComplitTimeout)) {
338 if (fInpState == inpInit) {
341 if (!isTransportRunning())
return false;
344 if ((fExtraBufs > 0) && (NumCanSend(port) <= fExtraBufs)) {
346 ChangeState(inpCallBack);
350 if (NumCanSend(port) == 0) {
EOUT(
"Logical failure in input transport"); exit(333); }
352 fInpState = inpSizeCallBack;
355 unsigned size_res = fInput->Read_Size();
366 ChangeState(inpInitTimeout);
367 ShootTimer(
"SysTimer", fInput->Read_Timeout());
371 return fInpState != inpSizeCallBack;
374 fNextDataSize = size_res;
376 ChangeState(inpCheckSize);
379 if (fInpState == inpCheckSize) {
383 switch (fNextDataSize) {
386 EOUT(
"Wrong place for callback");
387 ChangeState(inpError);
395 ChangeState(inpNeedBuffer);
401 ChangeState(inpNeedBuffer);
403 DOUT0(
"Tr:%s Reading error nextsz = 0x%08x", GetName(), fNextDataSize);
404 ChangeState(inpError);
411 if (fInpState == inpNeedBuffer) {
413 if (!fPoolRef.null() && fPoolRef()->CheckChangeCounter(fPoolChangeCounter))
414 ProcessPoolChanged(fPoolRef());
416 fCurrentBuf = TakeBuffer();
418 if (!fCurrentBuf.null()) {
419 ChangeState(inpCheckBuffer);
420 }
else if (IsAutoPool()) {
421 ChangeState(inpWaitBuffer);
424 EOUT(
"Did not get buffer and pool queue is not configured - use minimal timeout");
425 ShootTimer(
"SysTimer", 0.001);
430 if (fInpState == inpCheckBuffer) {
434 if (fCurrentBuf.GetTotalSize() < fNextDataSize) {
435 EOUT(
"Requested buffer smaller than actual data size");
436 ChangeState(inpError);
438 if (fNextDataSize>0) fCurrentBuf.SetTotalSize(fNextDataSize);
439 ChangeState(inpHasBuffer);
443 if (fInpState == inpHasBuffer) {
449 fInpState = inpCallBack;
451 unsigned start_res = fInput->Read_Start(fCurrentBuf);
459 ChangeState(inpCompleting);
465 ChangeState(inpInit);
470 return (fInpState != inpCallBack);
474 return (fInpState != inpCallBack);
477 ChangeState(inpError);
482 if (fInpState == inpCallBack) {
487 if (fInpState == inpCompleting) {
489 if (fExtraBufs && !fCurrentBuf.null()) {
490 EOUT(
"Internal error - currbuf not null when completing");
494 if (!fExtraBufs && fCurrentBuf.null()) {
495 EOUT(
"Internal error - currbuf null when completing");
499 unsigned res = fInput->Read_Complete(fCurrentBuf);
502 if (fCurrentBuf.null())
EOUT(
"Transport does not return buffer!!!");
508 ChangeState(inpReady);
512 if (NumCanSend(port) == 0) {
EOUT(
"Logical failure in input transport"); exit(333); }
516 fCurrentBuf.Release();
518 ChangeState(inpInit);
521 fCurrentBuf.Release();
522 DOUT4(
"End of stream");
528 ChangeState(inpComplitTimeout);
529 ShootTimer(
"SysTimer", fInput->Read_Timeout());
532 EOUT(
"Error when do buffer reading res = %d", res);
533 ChangeState(inpError);
537 if (fInpState == inpReady) {
540 if (NumCanSend(port) == 0) {
EOUT(
"Logical failure in input transport"); exit(333); }
543 fCurrentBuf.Release();
544 ChangeState(inpInit);
547 if ((fInpState == inpError) || (fInpState == inpEnd)) {
549 DOUT2(
"InputTransport:: Generate EOF packet");
553 fCurrentBuf.MakeEmpty();
555 if (fCurrentBuf.null()) {
556 EOUT(
"Fatal error - cannot get empty buffer, try after 1 sec");
557 ShootTimer(
"SysTimer", 1.);
561 if (NumCanSend(port) == 0) {
EOUT(
"Logical failure in input transport"); exit(333); }
563 ChangeState(inpClosed);
567 if (fInpState == inpClosed) {
568 CloseTransport(
false);
583 fStopRequested(false),
612 fOutputOwner =
false;
616 fOutputOwner = owner;
621 EOUT(
"Cannot assigned addon while owner flag is not specified");
627 if ((fOutput!=0) && fOutputOwner)
631 fOutputOwner =
false;
633 fCurrentBuf.Release();
640 if (fStopRequested && SuitableStateForStartStop()) {
648 DOUT2(
"Starting OutputTransport %s isrunning %s", GetName(),
DBOOL(IsRunning()));
652 fStopRequested =
false;
655 EOUT(
"Output was not specified!!!");
659 if (!SuitableStateForStartStop()) {
660 EOUT(
"Start transport %s at not optimal state %u", GetName(), (
unsigned) fOutState);
668 DOUT2(
"Stopping OutputTransport %s isrunning %s", GetName(),
DBOOL(IsRunning()));
670 if (SuitableStateForStartStop()) {
671 fStopRequested =
false;
675 if (!fStopRequested) {
676 fStopRequested =
true;
677 DOUT2(
"%s Try to wait until suitable state is achieved now %u", GetName(), (
unsigned) fOutState);
693 if ((fRetryPeriod < 0.) || !fOutput || !fOutput->Write_Retry()) {
694 ChangeState(outClosed);
696 CloseTransport(
true);
699 ChangeState(outRetry);
700 ShootTimer(
"SysTimer", fRetryPeriod);
708 if (evnt.
GetCode() == evCallBack) {
716 if (fOutState == outWaitCallback) {
717 ChangeState(outReady);
718 ProcessInputEvent(0);
722 if (fOutState == outWaitFinishCallback) {
723 ChangeState(outFinishWriting);
727 ProcessInputEvent(0);
732 EOUT(
"Call-back at wrong state!!");
744 EOUT(
"Output object not specified");
745 ChangeState(outError);
748 if (fTransportDevice.DeviceDestroyed()) {
749 ChangeState(outClosed);
751 CloseTransport(
false);
755 if (fOutState == outReady) {
759 unsigned buftyp = RecvQueueItem(port,0).GetTypeId();
762 DOUT0(
"EOF - close output transport");
763 Recv(port).Release();
767 fOutput->Write_Flush();
768 Recv(port).Release();
776 ret = fOutput->Write_Check();
781 ChangeState(outStartWriting);
786 ChangeState(outInitTimeout);
787 ShootTimer(
"SysTimer", fOutput->Write_Timeout());
790 ChangeState(outWaitCallback);
793 Recv(port).Release();
796 ChangeState(outClosing);
799 ChangeState(outError);
802 EOUT(
"Wrong return value %u for the Write_Check", ret);
803 ChangeState(outError);
807 if (fOutState == outInitTimeout) {
811 if (fOutState == outWaitCallback) {
815 if (fOutState == outStartWriting) {
817 fCurrentBuf = Recv(port);
819 unsigned ret = fOutput->Write_Buffer(fCurrentBuf);
823 ChangeState(outFinishWriting);
826 ChangeState(outWaitFinishCallback);
829 ChangeState(outReady);
832 ChangeState(outClosing);
835 DOUT0(
"Error when writing buffer in transport %s", GetName());
836 ChangeState(outError);
839 EOUT(
"Wrong return value %u for the Write_Buffer", ret);
840 ChangeState(outError);
844 if (fOutState == outWaitFinishCallback) {
849 if (fOutState == outFinishWriting) {
851 fCurrentBuf.Release();
853 unsigned ret = fOutput->Write_Complete();
857 ChangeState(outReady);
860 ChangeState(outClosing);
863 ChangeState(outError);
866 EOUT(
"%s Wrong return value %u for the Write_Complete", GetName(), ret);
867 ChangeState(outError);
871 if (fOutState == outClosing) {
872 ChangeState(outClosed);
874 CloseTransport(
false);
878 if (fOutState == outError) {
883 if (fOutput && InfoExpected()) {
884 std::string info = fOutput->ProvideInfo();
885 ProvideInfo(0, info);
893 if (fOutState == outInitTimeout)
894 ChangeState(outReady);
896 if (fOutState == outRetry) {
897 if (fOutput && fOutput->Write_Init())
898 ChangeState(outReady);
900 ShootTimer(
"SysTimer", fRetryPeriod);
905 ProcessInputEvent(0);
910 if (cmd.
IsName(
"GetTransportStatistic")) {
912 cmd.
SetStr(
"OutputState", StateAsStr());
913 if (fOutput) fOutput->Write_Stat(cmd);
915 }
else if (cmd.
IsName(
"RestartTransport")) {
916 bool res = fOutput ? fOutput->Write_Restart(cmd) :
false;
917 return cmd_bool(res);
Represents command with its arguments.
bool SetStr(const std::string &name, const char *value)
std::string GetStr(const std::string &name, const std::string &dflt="") const
Interface for implementing any kind of data output.
void SetInfoParName(const std::string &name)
Methods set parameter name, which could be used for debug output.
virtual WorkerAddon * Write_GetAddon()
Returns addon, provided by data output If specified, supposed that I/O object is double-derived from ...
DataInput * CreateDataInput(const std::string &kind)
Create data input, using factories methods.
Reference FindPool(const std::string &name)
virtual void ProcessEvent(const EventId &)
unsigned CreateTimer(const std::string &name, double period_sec=-1., bool synchron=false)
std::string ItemName(bool compact=true) const
Produce string, which can be used as name argument in dabc::mgr.FindItem(name) call.
const char * GetName() const
Returns name of the object, thread safe
void ChangeState(EOutputStates state)
virtual ~OutputTransport()
virtual int ExecuteCommand(dabc::Command cmd)
Main method where commands are executed.
void SetDataOutput(DataOutput *out, bool owner)
virtual void ProcessEvent(const EventId &)
virtual void TransportCleanup()
double fRetryPeriod
if retry option enabled, transport will try to reinit output
OutputTransport(dabc::Command cmd, const PortRef &outport, DataOutput *out, bool owner)
virtual bool StartTransport()
Methods activated by Port, when transport starts/stops.
virtual bool ProcessRecv(unsigned port)
Method called by framework when at least one buffer available in input port.
virtual bool StopTransport()
virtual void ProcessTimerEvent(unsigned)
Method called by framework when timer event is produced.
Reference on the dabc::Port class
double AsDouble(double dflt=0.) const
bool SetField(const std::string &name, const RecordField &v)
std::string SaveToJson(unsigned mask=0)
Store record in JSON form.
virtual void CreateRecord(const std::string &name)
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
Base class for transport implementations.
virtual bool StopTransport()
virtual bool StartTransport()
Methods activated by Port, when transport starts/stops.
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
std::string fTransportInfoName
virtual void TransportCleanup()
Generic addon for dabc::Worker.
RecordField Cfg(const std::string &name, Command cmd=nullptr) const
Returns configuration record of specified name.
Event structure, exchanged between DABC threads.