DABC (Data Acquisition Backbone Core)  2.9.9
ServerTransport.cxx
Go to the documentation of this file.
1 // $Id: ServerTransport.cxx 4482 2020-04-15 14:47:18Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #include "mbs/ServerTransport.h"
17 
18 #include <unistd.h>
19 
20 #include "dabc/Manager.h"
21 #include "dabc/DataTransport.h"
22 
23 mbs::ServerOutputAddon::ServerOutputAddon(int fd, int kind, dabc::EventsIteratorRef& iter, uint32_t subid) :
24  dabc::SocketIOAddon(fd, false, true),
25  dabc::DataOutput(dabc::Url()),
26  fState(oInit),
27  fKind(kind),
28  fSendBuffers(0),
29  fDroppedBuffers(0),
30  fIter(iter),
31  fEvHdr(),
32  fSubHdr(),
33  fEvCounter(0),
34  fSubevId(subid),
35  fHasExtraRequest(false)
36 {
37  DOUT3("Create MBS server addon fd:%d kind:%s", fd, mbs::ServerKindToStr(kind));
38 }
39 
41 {
42  DOUT3("Destroy ServerOutputAddon %p", this);
43 }
44 
45 void mbs::ServerOutputAddon::FillServInfo(int32_t maxbytes, bool isnewformat)
46 {
47  memset(&fServInfo, 0, sizeof(fServInfo));
48 
49  fServInfo.iEndian = 1; // byte order. Set to 1 by sender
50  fServInfo.iMaxBytes = maxbytes; // maximum buffer size
51  fServInfo.iBuffers = 1; // buffers per stream
52  fServInfo.iStreams = isnewformat ? 0 : 1; // number of streams (could be set to -1 to indicate variable length buffers, size l_free[1])
53 }
54 
// NOTE(review): the declarator line (source line 55) and the first statement
// (source line 57) of this body are absent from this listing; the member index
// at the end of the page lists ServerOutputAddon::OnThreadAssigned(), which is
// presumably this function - confirm against the real source.
//
// Visible behaviour: while the addon is still in oInit the server-info record is
// sent, then the request buffer is cleared and a 12-byte receive is started for
// the first client request.
56 {
58 
59  if (fState == oInit)
60  StartSend(&fServInfo, sizeof(fServInfo));
61 
// clear the request buffer before arming the receive of the next request
62  memset(f_sbuf, 0, sizeof(f_sbuf));
63  StartRecv(f_sbuf, 12);
64 
65 // ActivateTimeout(1.);
66 }
67 
69 {
70 // DOUT0("mbs::ServerOutputAddon::ProcessTimeout inp:%s out:%s", DBOOL(IsDoingInput()), DBOOL(IsDoingOutput()));
71  return 1.;
72 }
73 
74 
76 {
77 // DOUT0("mbs::ServerOutputAddon::OnSendCompleted inp:%s out:%s", DBOOL(IsDoingInput()), DBOOL(IsDoingOutput()));
78 
79  switch (fState) {
80  case oInit:
81  DOUT4("Send info completed");
82  if (fKind == mbs::StreamServer)
83  fState = oWaitingReq;
84  else
85  fState = oWaitingBuffer;
86  return;
87 
88  case oInitReq:
89  fState = oWaitingBuffer;
90  return;
91 
92  case oSendingEvents: {
93  dabc::EventsIterator* iter = fIter();
94 
95  unsigned evsize = iter->EventSize();
96  if (evsize % 2) evsize++;
97 
98  fSubHdr.InitFull(fSubevId);
99  fSubHdr.SetRawDataSize(evsize);
100 
101  fEvHdr.Init(fEvCounter++);
102  fEvHdr.SetFullSize(evsize + sizeof(fEvHdr) + sizeof(fSubHdr));
103 
104  StartSend(&fEvHdr, sizeof(fEvHdr), &fSubHdr, sizeof(fSubHdr), iter->Event(), evsize);
105 
106  if (!iter->NextEvent())
107  // if there are no move event - close iterator and wait for completion
108  fState = oSendingLastEvent;
109 
110  return;
111  }
112 
113  case oSendingLastEvent:
114  fIter()->Close(); // close here to ensure memory for send operation
115  /* no break */
116 
117  case oSendingBuffer:
118  if (fKind == mbs::StreamServer)
119  fState = fHasExtraRequest ? oWaitingBuffer : oWaitingReq;
120  else
121  fState = oWaitingBuffer;
122  fHasExtraRequest = false;
123  MakeCallback(dabc::do_Ok);
124  return;
125 
126  case oDoingClose:
127  case oError:
128  // ignore all input events in such case
129  return;
130 
131  default:
132  EOUT("Send complete at wrong state %d", fState);
133  break;
134  }
135 }
136 
138 {
139 // DOUT0("mbs::ServerOutputAddon::OnRecvCompleted %s inp:%s out:%s", f_sbuf, DBOOL(IsDoingInput()), DBOOL(IsDoingOutput()));
140 
141  if (strcmp(f_sbuf, "CLOSE")==0) {
142  OnSocketError(0, "get CLOSE event"); // do same as connection was closed
143  return;
144  }
145 
146  if (strcmp(f_sbuf, "GETEVT")!=0)
147  EOUT("Wrong request string %s", f_sbuf);
148 
149  memset(f_sbuf, 0, sizeof(f_sbuf));
150  StartRecv(f_sbuf, 12);
151  fHasExtraRequest = false;
152 
153  switch (fState) {
154  case oInit:
155  // get request before send of server info was completed
156  EOUT("Get data request before send server info was completed");
157  fState = oInitReq;
158  break;
159  case oWaitingBuffer:
160  // second request - ignore it
161  break;
162  case oWaitingReq:
163  // normal situation
164  fState = oWaitingBuffer;
165  break;
166  case oWaitingReqBack:
167  MakeCallback(dabc::do_Ok);
168  fState = oWaitingBuffer;
169  break;
170  case oSendingEvents:
171  case oSendingLastEvent:
172  case oSendingBuffer:
173  fHasExtraRequest = true;
174  break;
175  default:
176  EOUT("Get request at wrong state %d", fState);
177  fState = oError;
178  SubmitWorkerCmd(dabc::Command("CloseTransport"));
179  }
180 }
181 
183 {
184  dabc::OutputTransport* tr = dynamic_cast<dabc::OutputTransport*> (fWorker());
185 
186  if (tr==0) {
187  EOUT("Didnot found OutputTransport on other side worker %p", fWorker());
188  fState = oError;
189  SubmitWorkerCmd(dabc::Command("CloseTransport"));
190  } else {
191  // DOUT0("Activate CallBack with arg %u", arg);
192  tr->Write_CallBack(arg);
193  }
194 }
195 
196 
198 {
199 // DOUT0("mbs::ServerOutputAddon::Write_Check at state %d", fState);
200 
201  switch (fState) {
202  // when initialization was not completed, try after timeout
203  case oInit:
204  case oInitReq:
205  return dabc::do_RepeatTimeOut;
206 
207  // when stream server waits request, we need to inform transport as soon as possible
208  case oWaitingReq:
209  fState = oWaitingReqBack;
210  return dabc::do_CallBack;
211 
212  case oWaitingBuffer:
213  return dabc::do_Ok;
214 
215  case oError:
216  return dabc::do_Error;
217 
218  default:
219  EOUT("Write_Check at wrong state %d", fState);
220  fState = oError;
221  return dabc::do_Error;
222  }
223 
224  return dabc::do_RepeatTimeOut;
225 }
226 
228 {
229  if (fState != oWaitingBuffer) return dabc::do_Error;
230 
231 // DOUT0("mbs::ServerOutputAddon::Write_Buffer %u at state %d", buf.GetTotalSize(), fState);
232 
233  unsigned sendsize = buf.GetTotalSize();
234 
235  if (sendsize == 0) return dabc::do_Skip;
236 
237  if (!fIter.null()) {
238  dabc::EventsIterator *iter = fIter();
239  sendsize = 0;
240  iter->Assign(buf);
241  unsigned rawsize = 0;
242 
243  while (iter->NextEvent()) {
244  sendsize += sizeof(mbs::EventHeader) + sizeof(mbs::SubeventHeader);
245  unsigned evsize = iter->EventSize();
246  if (evsize % 2) evsize++;
247  sendsize += evsize;
248  rawsize += evsize;
249  }
250  iter->Close();
251 
252  if (rawsize == 0) return dabc::do_Skip;
253 
254  iter->Assign(buf);
255  iter->NextEvent(); // shift to first event
256  }
257 
258 
259  fHeader.Init(true);
260  fHeader.SetUsedBufferSize(sendsize);
261 
262  // error in evapi, must be + sizeof(mbs::BufferHeader)
263  fHeader.SetFullSize(sendsize - sizeof(mbs::BufferHeader));
264 
265  if (fIter.null()) {
266  fState = oSendingBuffer;
267  StartNetSend(&fHeader, sizeof(fHeader), buf);
268  } else {
269  fState = oSendingEvents;
270  StartSend(&fHeader, sizeof(fHeader));
271  }
272 
273  return dabc::do_CallBack;
274 }
275 
276 
277 void mbs::ServerOutputAddon::OnSocketError(int err, const std::string &info)
278 {
279  switch (fState) {
280  case oSendingEvents: // only at this states callback is required to inform transport that data should be closed
281  case oSendingBuffer:
282  case oWaitingReqBack:
283  fState = (err==0) ? oDoingClose : oError;
284  MakeCallback(dabc::do_Close);
285  return;
286 
287  case oDoingClose: return;
288  case oError: return;
289 
290  default:
291  fState = (err==0) ? oDoingClose : oError;
292  SubmitWorkerCmd(dabc::Command("CloseTransport"));
293  }
294 }
295 
296 // ===============================================================================
297 
// Constructs the server transport: stores kind and port number, assigns the
// connection-handling addon and reads the optional URL settings "limit",
// "iter", "subid", "nonblock"/"blocking" and "deliverall".
//
// NOTE(review): source lines 329, 331 and 339 are absent from this listing
// (see the gaps in the embedded numbering), so the statements that set the
// kind-dependent defaults and the argument list of the DOUT0 call below are not
// visible here - confirm against the real source before editing.
298 mbs::ServerTransport::ServerTransport(dabc::Command cmd, const dabc::PortRef& outport, int kind, int portnum, dabc::SocketServerAddon* connaddon, const dabc::Url& url) :
299  dabc::Transport(cmd, 0, outport),
300  fKind(kind),
301  fPortNum(portnum),
302  fSlaveQueueLength(5),
303  fClientsLimit(0),
304  fDoingClose(0),
305  fBlocking(false),
306  fDeliverAll(false),
307  fIterKind(),
308  fSubevId(0x1f)
309 {
310  // this addon handles connection
311  AssignAddon(connaddon);
312 
313  // TODO: queue length one can take from configuration
314  fSlaveQueueLength = 5;
315 
// "limit": maximum number of simultaneous clients (0 keeps it unlimited, see ExecuteCommand)
316  if (url.HasOption("limit"))
317  fClientsLimit = url.GetOptionInt("limit", fClientsLimit);
318 
// "iter": class name of the events iterator created for every new client
319  if (url.HasOption("iter"))
320  fIterKind = url.GetOptionStr("iter");
321 
// "subid": subevent id passed to every client addon
322  if (url.HasOption("subid"))
323  fSubevId = (unsigned) url.GetOptionInt("subid", fSubevId);
324 
325  // by default transport server is blocking and stream is unblocking
326  // blocking has two meaning:
327  // - when no connections are there, either block input or not
328  // - when at least one connection established, block input until all output are ready
330 
332 
// "nonblock" has priority over "blocking" when both are given
333  if (url.HasOption("nonblock")) fBlocking = false; else
334  if (url.HasOption("blocking")) fBlocking = true;
335 
336  if (url.HasOption("deliverall")) fDeliverAll = true;
337 
338  DOUT0("Create MBS server fd:%d kind:%s port:%d limit:%d blocking:%s deliverall:%s",
340 
341  if (fClientsLimit>0) DOUT0("Set client limit for MBS server to %d", fClientsLimit);
342 
343 // DOUT0("mbs::ServerTransport isinp=%s", DBOOL(connaddon->IsDoingInput()));
344 }
345 
// NOTE(review): the declarator line (source line 346) of this empty body is
// absent from this listing; following the constructor it is presumably the
// ServerTransport destructor - confirm against the real source.
347 {
348 }
349 
// NOTE(review): both the declarator (source line 350) and the only statement
// (source line 352) of this body are absent from this listing. The member index
// at the end of the page lists ServerTransport::StartTransport() and
// StopTransport(); this is presumably the first of them - confirm.
351 {
353 }
354 
// NOTE(review): both the declarator (source line 355) and the only statement
// (source line 357) of this body are absent from this listing. The member index
// at the end of the page lists ServerTransport::StartTransport() and
// StopTransport(); this is presumably the second of them - confirm.
356 {
358 }
359 
// NOTE(review): the declarator (source line 360) and source lines 385, 396 and
// 442 are absent from this listing. The member index at the end of the page
// lists ServerTransport::ExecuteCommand(dabc::Command cmd), presumably this
// function - confirm against the real source before editing.
//
// Visible behaviour:
//  - "SocketConnect": accepts a new client socket (field "fd"), creates the
//    output addon and an OutputTransport for it and connects that transport to
//    a free or newly created output port.
//  - "GetTransportStatistic": fills the command with client/queue counters and
//    the server kind and port.
361 {
362  if (cmd.IsName("SocketConnect")) {
363 
364  int fd = cmd.GetInt("fd", -1);
365 
// a missing or non-positive descriptor is rejected
366  if (fd<=0) return dabc::cmd_false;
367 
// count connected outputs and remember the first unconnected one for reuse
368  int numconn(0);
369  int portindx = -1;
370  for (unsigned n=0;n<NumOutputs();n++)
371  if (IsOutputConnected(n)) {
372  numconn++;
373  } else {
374  if (portindx<0) portindx = n;
375  }
376 
// client limit reached: close the socket, the command itself still succeeds
377  if ((fClientsLimit>0) && (numconn>=fClientsLimit)) {
378  DOUT0("Reject connection %d, maximum number %d is achieved ", fd, numconn);
379  close(fd);
380  return dabc::cmd_true;
381  }
382 
383  DOUT3("Get new connection request with fd %d canrecv %s", fd, DBOOL(CanRecv()));
384 
// NOTE(review): `iter` is used below but its declaration (source line 385) is not visible in this listing
386 
// an events iterator is created only when an iterator kind was configured
387  if (!fIterKind.empty()) {
388  iter = dabc::mgr.CreateObject(fIterKind,"iter");
389  if (iter.null()) {
390  EOUT("Fail to create events iterator %s", fIterKind.c_str());
391  close(fd);
392  return dabc::cmd_true;
393  }
394  }
395 
397 
398  ServerOutputAddon *addon = new ServerOutputAddon(fd, fKind, iter, fSubevId);
399  // FIXME: should we configure buffer size or could one ignore it???
400  addon->FillServInfo(0x400000, true);
401 
402 
// no free output found above - create a new one named after the current number of outputs
403  if (portindx<0) portindx = CreateOutput(dabc::format("Slave%u",NumOutputs()), fSlaveQueueLength);
404 
405  dabc::TransportRef tr = new dabc::OutputTransport(dabc::Command(), FindPort(OutputName(portindx)), addon, true);
406 
// the new transport is assigned to the thread returned by thread()
407  tr()->AssignToThread(thread(), true);
408 
409  // transport will be started automatically
410 
411  dabc::LocalTransport::ConnectPorts(FindPort(OutputName(portindx)), tr.InputPort(), cmd);
412 
413  DOUT3("mbs::ServerTransport create new connection at running=%s", DBOOL(isTransportRunning()));
414 
415  return dabc::cmd_true;
416  }
417 
418  if (cmd.IsName("GetTransportStatistic")) {
419 
// collect number of connected clients and the NumCanSend() value of each of them
420  int cnt = 0;
421  std::vector<uint64_t> cansend;
422  for(unsigned n=0;n<NumOutputs();n++) {
423  if (IsOutputConnected(n)) {
424  cnt++; cansend.push_back(NumCanSend(n));
425  }
426  }
427 
428  cmd.SetInt("NumClients", cnt);
429  cmd.SetInt("NumCanRecv", NumCanRecv());
430 
// "NumCanSend" is a scalar for one client, a vector for several and 0 without clients
431  if (cnt==1) cmd.SetField("NumCanSend", cansend[0]); else
432  if (cnt>1) cmd.SetField("NumCanSend", cansend); else
433  cmd.SetField("NumCanSend", 0);
434 
435  cmd.SetStr("MbsKind", mbs::ServerKindToStr(fKind));
436  cmd.SetInt("MbsPort", fPortNum);
437  cmd.SetStr("MbsInfo", dabc::format("%s:%d NumClients:%d", mbs::ServerKindToStr(fKind), fPortNum, cnt));
438 
439  return dabc::cmd_true;
440  }
441 
// NOTE(review): the final statement of the function (source line 442) is not visible in this listing
443 }
444 
446 {
447  if (!CanRecv()) return false;
448 
449  // unconnected transport server will block until any connection is established
450  if ((NumOutputs()==0) && fBlocking /*&& (fKind == mbs::TransportServer) */) return false;
451 
452  bool allcansend = CanSendToAllOutputs(true);
453 
454 // if (!allcansend)
455 // DOUT1("mbs::ServerTransport::ProcessRecv numout:%u cansend:%s numwork:%u", NumOutputs(), DBOOL(allcansend), numoutputs);
456 
457  // in case of transport buffer all outputs should be
458  // ready to get next buffer. Otherwise input port will be blocked
459  if (!allcansend) {
460  // if server must deliver all events, than wait (default for transport, can be enabled for stream)
461  if (fDeliverAll) return false;
462  // if server do not blocks, first wait until input queue will be filled
463  if (!RecvQueueFull()) {
464  // DOUT0("mbs::ServerTransport::ProcessRecv let input queue to be filled size:%u", NumCanRecv());
465  // dabc::SetDebugLevel(1);
466  SignalRecvWhenFull();
467  return false;
468  }
469  // DOUT0("TRY TO SEND EVEN WHEN NOT POSSIBLE");
470  }
471 
472  // this is normal situation when buffer can be send to all outputs
473 
474  dabc::Buffer buf = Recv();
475 
476  bool iseof = (buf.GetTypeId() == dabc::mbt_EOF);
477 
478  SendToAllOutputs(buf);
479 
480  if (iseof) {
481  DOUT2("Server transport saw EOF buffer");
482  fDoingClose = 1;
483 
484  if ((NumOutputs()==0) || !fBlocking) {
485  DOUT2("One could close server transport immediately");
486  CloseTransport(false);
487  fDoingClose = 2;
488  return false;
489  }
490  }
491 
492  return fDoingClose == 0;
493 }
494 
495 
// Reacts on connect/disconnect of a port.
//   name - name of the port that changed its state
//   on   - true when the port got connected, false when disconnected
// For an output port: a new client triggers input processing; a disconnected
// client is detached from its port and, when an EOF was seen before
// (fDoingClose == 1) and no client is left, the transport is closed.
//
// NOTE(review): source line 499 (the body of the input-port branch) is absent
// from this listing - confirm against the real source before editing.
496 void mbs::ServerTransport::ProcessConnectionActivated(const std::string &name, bool on)
497 {
498  if (name==InputName()) {
500  } else {
501  DOUT2("mbs::ServerTransport detect new client on %s %s", name.c_str(), (on ? "CONNECTED" : "DISCONNECTED") );
502 
// new client: try to process pending input right away
503  if (on) {
504  ProcessInputEvent(0);
505  return;
506  }
507 
// from here on `on` is always false (the connected case returned above)
508  if (!on) FindPort(name).Disconnect();
509 
// EOF already seen: close the transport as soon as the last client is gone
510  if (fDoingClose == 1) {
511  bool isany = false;
512  for (unsigned n=0;n<NumOutputs();n++)
513  if (IsOutputConnected(n)) isany = true;
514  if (!isany) {
515  DOUT2("Close server transport while all clients are closed");
516  CloseTransport(false);
517  fDoingClose = 2;
518  }
519  }
520  }
521 }
Reference on memory from memory pool.
Definition: Buffer.h:135
unsigned GetTypeId() const
Definition: Buffer.h:152
BufferSize_t GetTotalSize() const
Return total size of all buffer segments.
Definition: Buffer.cxx:91
Represents command with its arguments.
Definition: Command.h:99
bool SetStr(const std::string &name, const char *value)
Definition: Command.h:134
bool SetInt(const std::string &name, int v)
Definition: Command.h:138
int GetInt(const std::string &name, int dflt=0) const
Definition: Command.h:139
Iterator over events in dabc::Buffer class.
Definition: eventsapi.h:44
virtual void Close()=0
virtual bool Assign(const Buffer &buf)=0
virtual BufferSize_t EventSize()=0
virtual bool NextEvent()=0
virtual void * Event()=0
static int ConnectPorts(Reference port1ref, Reference port2ref, Command cmd=nullptr)
Reference CreateObject(const std::string &classname, const std::string &objname)
Definition: Manager.cxx:2251
Base class for output transport implementations.
void Write_CallBack(unsigned arg)
Reference on the dabc::Port class
Definition: Port.h:195
bool SetField(const std::string &name, const RecordField &v)
Definition: Record.h:516
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
Definition: Reference.cxx:177
bool null() const
Returns true if reference contains nullptr.
Definition: Reference.h:151
int Socket() const
Definition: SocketThread.h:108
Socket addon for handling connection requests on server side.
Definition: SocketThread.h:285
static bool SetNoDelaySocket(int fd)
Reference on dabc::Transport class
Definition: Transport.h:109
PortRef InputPort()
Definition: Transport.h:114
virtual bool StopTransport()
Definition: Transport.cxx:231
virtual bool StartTransport()
Methods activated by Port, when transport starts/stops.
Definition: Transport.cxx:220
virtual void ProcessConnectionActivated(const std::string &name, bool on)
Method called when module on other side is started.
Definition: Transport.cxx:141
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
Definition: Transport.cxx:202
Uniform Resource Locator interpreter.
Definition: Url.h:33
std::string GetOptionStr(const std::string &optname, const std::string &dflt="") const
Definition: Url.cxx:281
bool HasOption(const std::string &optname) const
Definition: Url.h:70
int GetOptionInt(const std::string &optname, int dflt=0) const
Definition: Url.cxx:290
virtual void OnThreadAssigned()
Definition: Worker.h:73
void AssignAddon(WorkerAddon *addon)
Assigns addon to the worker Should be called before worker assigned to the thread.
Definition: Worker.cxx:277
Addon for output of server-side different kinds of MBS server
ServerOutputAddon(int fd, int kind, dabc::EventsIteratorRef &iter, uint32_t subid)
virtual unsigned Write_Buffer(dabc::Buffer &buf)
Start writing of buffer to output.
virtual void OnRecvCompleted()
Method called when receive operation is completed.
virtual double ProcessTimeout(double last_diff)
virtual void OnSocketError(int err, const std::string &info)
Generic error handler.
virtual unsigned Write_Check()
Check if output can be done.
virtual void OnThreadAssigned()
void FillServInfo(int32_t maxbytes, bool isnewformat)
void MakeCallback(unsigned sz)
virtual void OnSendCompleted()
Method called when send operation is completed.
int fKind
kind: stream or transport
uint32_t fSubevId
subevent id when non-MBS events are used
int fPortNum
used port number (only for info)
ServerTransport(dabc::Command cmd, const dabc::PortRef &outport, int kind, int portnum, dabc::SocketServerAddon *connaddon, const dabc::Url &url)
int fClientsLimit
maximum number of simultaneous clients
bool fBlocking
if true, server will block buffers until it can be delivered
bool fDeliverAll
if true, server will try deliver all events when clients are there (default for transport)
virtual bool StopTransport()
virtual int ExecuteCommand(dabc::Command cmd)
Main method where commands are executed.
int fSlaveQueueLength
queue length, used for slaves connections
void ProcessConnectionActivated(const std::string &name, bool on)
Method called when module on other side is started.
virtual bool StartTransport()
Methods activated by Port, when transport starts/stops.
std::string fIterKind
iterator kind when non-mbs events should be delivered to clients
#define DOUT2(args ...)
Definition: logging.h:170
#define DOUT0(args ...)
Definition: logging.h:156
#define DOUT3(args ...)
Definition: logging.h:176
#define EOUT(args ...)
Definition: logging.h:150
#define DBOOL(arg)
Definition: logging.h:191
#define DOUT4(args ...)
Definition: logging.h:182
Event manipulation API.
Definition: api.h:23
ManagerRef mgr
Definition: Manager.cxx:42
std::string format(const char *fmt,...)
Definition: string.cxx:49
@ do_RepeatTimeOut
Definition: DataIO.h:145
@ do_CallBack
Definition: DataIO.h:146
@ do_Ok
Definition: DataIO.h:142
@ do_Skip
Definition: DataIO.h:143
@ do_Close
Definition: DataIO.h:148
@ do_Error
Definition: DataIO.h:147
@ mbt_EOF
Definition: Buffer.h:45
@ cmd_false
Definition: Command.h:37
@ cmd_true
Definition: Command.h:38
@ StreamServer
Definition: MbsTypeDefs.h:355
@ TransportServer
Definition: MbsTypeDefs.h:354
const char * ServerKindToStr(int kind)
MBS buffer header.
Definition: MbsTypeDefs.h:152
MBS subevent
Definition: MbsTypeDefs.h:40