DABC (Data Acquisition Backbone Core)  2.9.9
ClientOutput.cxx
Go to the documentation of this file.
1 // $Id: ClientOutput.cxx 4721 2021-03-13 13:52:01Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #include "aqua/ClientOutput.h"
17 
18 #include "dabc/DataTransport.h"
19 
20 
22 {
23  // one could connect server already here??
24 
26 }
27 
29 {
30  fState = oCompleteSend;
31  MakeCallBack(dabc::do_Ok);
32 }
33 
35 {
36  // method not used - socket only send data
37 
38  // StartRecv(fRecvBuf, 16);
39 }
40 
41 void aqua::ClientOutput::OnSocketError(int err, const std::string &info)
42 {
43  if (fState == oSendingBuffer) MakeCallBack(dabc::do_Ok);
44  DOUT1("Connection to AQUA broken %s:%d - %d:%s", fServerName.c_str(), fServerPort, err, info.c_str());
45 
46  CancelIOOperations();
47 
48  fState = err!=0 ? oError : oDisconnected;
49 }
50 
51 double aqua::ClientOutput::ProcessTimeout(double lastdiff)
52 {
53  return -1;
54 }
55 
57  dabc::SocketIOAddon(),
58  dabc::DataOutput(url),
59  fLastConnect(),
60  fState(oDisconnected),
61  fBufCounter(0)
62 {
63  fServerName = url.GetHostName();
64  fServerPort = url.GetPort();
65 
66  fReconnectTmout = url.GetOptionInt("tmout", 3.);
67 }
68 
69 
71 {
72  DOUT0("Destroy AQUA output");
73 }
74 
76 {
77  dabc::OutputTransport* tr = dynamic_cast<dabc::OutputTransport*> (fWorker());
78 
79  if (tr==0) {
80  EOUT("Did not found OutputTransport on other side worker %s", fWorker.GetName());
81  fState = oError;
82  SubmitWorkerCmd(dabc::Command("CloseTransport"));
83  } else {
85  }
86 
87 }
88 
89 
91 {
92  CloseSocket();
93 
94  // do not try connection request too often
95  if (!fLastConnect.Expired(fReconnectTmout)) return false;
96 
97  fLastConnect.GetNow();
98 
99  int fd = dabc::SocketThread::StartClient(fServerName, fServerPort);
100  if (fd < 0) return false;
101 
102  SetSocket(fd);
103 
104  DOUT1("Connect AQUA server %s:%d", fServerName.c_str(), fServerPort);
105 
106  // receiving not used in the transport
107  // StartRecv(fRecvBuf, 16);
108 
109  return true;
110 }
111 
112 
114 {
115  if (fState != oReady)
116  if (ConnectAquaServer()) fState = oReady;
117 
118  return true;
119 }
120 
122 {
123  switch (fState) {
124  case oDisconnected: // when server not connected
125  case oError: // error state
126  if (!ConnectAquaServer()) {
127  fBufCounter++;
128  return dabc::do_Skip;
129  }
130  fState = oReady;
131  return dabc::do_Ok; // state changed, can continue
132 
133  case oReady: // ready to send next buffer
134  return dabc::do_Ok;
135 
136  default:
137  EOUT("Write_Check at wrong state %d", fState);
138  return dabc::do_Error;
139  }
140 
141  return dabc::do_RepeatTimeOut;
142 }
143 
145 {
146  if (fState != oReady) {
147  EOUT("Write_Buffer at wrong state %d", fState);
148  return dabc::do_Error;
149  }
150 
151  fState = oSendingBuffer;
152 
153  fSendHdr.bufsize = buf.GetTotalSize();
154  fSendHdr.counter = fBufCounter++;
155 
156  StartNetSend(&fSendHdr, sizeof(fSendHdr), buf);
157 
158  return dabc::do_CallBack;
159 }
160 
162 {
163  if (fState == oCompleteSend) {
164  fState = oReady;
165  return dabc::do_Ok;
166  }
167 
168  // this is not normal, but return OK to try start from beginning
169  return dabc::do_Ok;
170 }
171 
173 {
174  switch (fState) {
175  case oReady:
176  case oSendingBuffer:
177  case oCompleteSend: return 0.001;
178  default: break;
179  }
180 
181  return 1.;
182 }
virtual unsigned Write_Check()
Check if output can be done.
virtual void OnThreadAssigned()
virtual void OnSocketError(int errnum, const std::string &info)
Generic error handler.
virtual ~ClientOutput()
ClientOutput(dabc::Url &url)
std::string fServerName
Definition: ClientOutput.h:53
virtual unsigned Write_Complete()
Complete writing of the buffer.
virtual void OnRecvCompleted()
Method called when receive operation is completed.
void MakeCallBack(unsigned arg)
virtual double Write_Timeout()
Timeout in seconds for write operation.
virtual double ProcessTimeout(double lastdiff)
virtual bool Write_Init()
This is generic virtual method to initialize output before real work is started.
virtual unsigned Write_Buffer(dabc::Buffer &buf)
Start writing of buffer to output.
virtual void OnSendCompleted()
Method called when send operation is completed.
OutputState fState
Definition: ClientOutput.h:58
Reference on memory from memory pool.
Definition: Buffer.h:135
BufferSize_t GetTotalSize() const
Return total size of all buffer segments.
Definition: Buffer.cxx:91
Represents command with its arguments.
Definition: Command.h:99
Base class for output transport implementations.
void Write_CallBack(unsigned arg)
static int StartClient(const std::string &host, int nport, bool nonblocking=true)
Uniform Resource Locator interpreter.
Definition: Url.h:33
int GetOptionInt(const std::string &optname, int dflt=0) const
Definition: Url.cxx:290
std::string GetHostName() const
Definition: Url.h:58
int GetPort() const
Definition: Url.h:59
#define DOUT0(args ...)
Definition: logging.h:156
#define EOUT(args ...)
Definition: logging.h:150
#define DOUT1(args ...)
Definition: logging.h:162
Event manipulation API.
Definition: api.h:23
@ do_RepeatTimeOut
Definition: DataIO.h:145
@ do_CallBack
Definition: DataIO.h:146
@ do_Ok
Definition: DataIO.h:142
@ do_Skip
Definition: DataIO.h:143
@ do_Error
Definition: DataIO.h:147