DABC (Data Acquisition Backbone Core)  2.9.9
ClientTransport.cxx
Go to the documentation of this file.
1 // $Id: ClientTransport.cxx 4479 2020-04-15 14:30:52Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #include "mbs/ClientTransport.h"
17 
18 #include "dabc/DataTransport.h"
19 
20 #include "mbs/Iterator.h"
21 
23  dabc::SocketIOAddon(fd),
24  dabc::DataInput(),
25  fState(ioInit),
26  fSwapping(false),
27  fSpanning(false),
28  fKind(kind),
29  fPendingStart(false),
30  fSpanBuffer()
31 {
32  fServInfo.iStreams = 0; // by default, new format
33 
34  DOUT3("Create mbs::ClientTransport::ClientTransport() %p fd:%d kind:%d", this, fd, kind);
35 }
36 
38 {
39  DOUT3("Destroy mbs::ClientTransport::~ClientTransport() %p", this);
40 }
41 
43 {
44  DOUT3("mbs::ClientTransport::ObjectCleanup\n");
45 
46  if ((fState!=ioError) && (fState!=ioClosed)) {
47  strcpy(fSendBuf, "CLOSE");
48  DoSendBuffer(fSendBuf, 12);
49  fState = ioClosing;
50  }
51 
53 }
54 
55 
57 {
58  return fServInfo.iStreams==0;
59 }
60 
62 {
63  uint32_t sz = fHeader.BufferLength();
64  if (sz < sizeof(fHeader)) {
65  EOUT("Wrong buffer length %u", sz);
66  return 0;
67  }
68 
69  return sz - sizeof(mbs::BufferHeader);
70 }
71 
72 
74 {
76 }
77 
79 {
80 }
81 
83 {
84 // DOUT0("mbs::ClientTransport::OnRecvCompleted() state = %d", fState);
85 
86  switch (fState) {
87  case ioRecvInfo:
88 
89  fState = ioReady;
90  if (fServInfo.iEndian != 1) {
91  mbs::SwapData(&fServInfo, sizeof(fServInfo));
92  fSwapping = true;
93  }
94 
95  if (fServInfo.iEndian != 1) {
96  EOUT("Cannot correctly define server endian");
97  fState = ioError;
98  }
99 
100  if ((fState != ioError) && (fServInfo.iStreams != 0) && (fServInfo.iBuffers != 1)) {
101  DOUT0("Number of buffers %u per stream bigger than 1", fServInfo.iBuffers);
102  DOUT0("This will lead to event spanning which is not optimal for DABC");
103  DOUT0("Set buffers number to 1 or call 'enable dabc' on mbs side");
104  fSpanning = true;
105  // fState = ioError;
106  }
107 
108  if (fState != ioError) {
109  std::string info = "";
110  if (fServInfo.iStreams > 0) dabc::formats(info, "streams = %u", fServInfo.iStreams);
111 
112  DOUT0("Get MBS server info: %s buf_per_stream = %u, swap = %s spanning %s",
113  info.c_str(), fServInfo.iBuffers, DBOOL(fSwapping), DBOOL(fSpanning));
114  }
115 
116  if (fPendingStart) {
117  fPendingStart = false;
118  SubmitRequest();
119  }
120 
121  break;
122 
123  case ioRecvHeader:
124 
125  if (fSwapping) mbs::SwapData(&fHeader, sizeof(fHeader));
126 
127 // DOUT0("MbsClient:: Header received, size %u, rest size = %u used %u", fHeader.BufferLength(), ReadBufferSize(), fHeader.UsedBufferSize());
128 
129  if (ReadBufferSize() > (unsigned) fServInfo.iMaxBytes) {
130  EOUT("Buffer size %u bigger than allowed by info record %d", ReadBufferSize(), fServInfo.iMaxBytes);
131  fState = ioError;
132  MakeCallback(dabc::di_Error);
133  } else
134  if (ReadBufferSize() == 0) {
135  fState = ioReady;
136  DOUT0("Keep alive buffer from MBS side");
137  MakeCallback(dabc::di_SkipBuffer);
138  } else {
139  fState = ioWaitBuffer;
140 
141  // when spanning is used, we need normal-size buffer
142  MakeCallback(fSpanning ? dabc::di_DfltBufSize : ReadBufferSize());
143  }
144 
145  break;
146 
147  case ioRecvBuffer:
148 
149 // DOUT1("Provide recv buffer %p to transport", fRecvBuffer);
150 
151  // DOUT0("RECV BUFFER Used size %u readsize %u", fHeader.UsedBufferSize(), ReadBufferSize());
152 
153  if (fHeader.UsedBufferSize() > 0) {
154  fState = ioComplBuffer;
155  MakeCallback(dabc::di_Ok);
156 
157  } else
158  if (IsDabcEnabledOnMbsSide()) {
159  EOUT("Empty buffer from mbs when dabc enabled?");
160  fState = ioError;
161  MakeCallback(dabc::di_Error);
162  } else {
163  DOUT1("Keep alive buffer from MBS");
164  fState = ioReady;
165  MakeCallback(dabc::di_SkipBuffer);
166  }
167 
168  break;
169  default:
170  EOUT("One should not complete recv in such state %d", fState);
171  return;
172  }
173 }
174 
175 void mbs::ClientTransport::OnSocketError(int err, const std::string &info)
176 {
177  if (err==0) {
178  DOUT3("MBS client Socket close\n");
179 
180  fState = ioClosed;
181 
182  SubmitWorkerCmd(dabc::CmdDataInputClosed());
183 
184  } else {
185 
186  DOUT3("MBS client Socket Error\n");
187 
188  fState = ioError;
189 
190  SubmitWorkerCmd(dabc::CmdDataInputFailed());
191  }
192 }
193 
195 {
197 
198  if (fState != ioInit) {
199  EOUT("Get thread assigned in not-init state - check why");
200  exit(234);
201  }
202 
203  StartRecv(&fServInfo, sizeof(fServInfo));
204  fState = ioRecvInfo;
205 
206  // check that server info is received in reasonable time
207  ActivateTimeout(5.);
208 
209 // DOUT0("Try to recv server info at the begin");
210 }
211 
212 double mbs::ClientTransport::ProcessTimeout(double last_diff)
213 {
214  if (fState == ioRecvInfo) {
215  EOUT("Did not get server info in reasonable time");
216  SubmitWorkerCmd(dabc::Command("CloseTransport"));
217  }
218 
219  return -1;
220 }
221 
223 {
224  if (Kind() == mbs::StreamServer) {
225  strcpy(fSendBuf, "GETEVT");
226  StartSend(fSendBuf, 12);
227  }
228 
229  StartRecv(&fHeader, sizeof(fHeader));
230  fState = ioRecvHeader;
231 }
232 
234 {
235  dabc::InputTransport* tr = dynamic_cast<dabc::InputTransport*> (fWorker());
236 
237  if (tr==0) {
238  EOUT("Didnot found DataInputTransport on other side worker %p", fWorker());
239  fState = ioError;
240  SubmitWorkerCmd(dabc::Command("CloseTransport"));
241  } else {
242  // DOUT0("Activate CallBack with arg %u", arg);
243  tr->Read_CallBack(arg);
244  }
245 }
246 
247 
249 {
250  switch (fState) {
251  case ioRecvInfo:
252  if (fPendingStart) EOUT("Start already pending???");
253  fPendingStart = true;
254  return dabc::di_CallBack;
255  case ioReady:
256  SubmitRequest();
257  return dabc::di_CallBack;
258  default:
259  EOUT("Get read_size at wrong state %d", fState);
260  }
261 
262  return dabc::di_Error;
263 }
264 
266 {
267 // DOUT0("mbs::ClientTransport::Read_Start");
268 
269  DOUT4("BUFFER_START %u USED %u h_beg %u h_end %u", ReadBufferSize(), fHeader.UsedBufferSize(), fHeader.h_begin, fHeader.h_end);
270 
271  if (fState != ioWaitBuffer) {
272  EOUT("Start reading at wrong place");
273  return dabc::di_Error;
274  }
275 
276  if (buf.GetTotalSize() < ReadBufferSize()) {
277  EOUT("Provided buffer size too small %u, required %u",
278  buf.GetTotalSize(), ReadBufferSize());
279  return dabc::di_Error;
280  }
281 
282  bool started = false;
283 
284  if (!fSpanBuffer.null()) {
285 
286  if (fHeader.h_begin==0) {
287  EOUT("We expecting spanned buffer in the begin, but didnot get it");
288  return dabc::di_Error;
289  }
290 
291  dabc::Buffer extra = buf.Duplicate();
292 
293  if (extra.GetTotalSize() < ReadBufferSize() + fSpanBuffer.GetTotalSize()) {
294  EOUT("Buffer size %u not enough to read %u and add spanned buffer %u", extra.GetTotalSize(), ReadBufferSize(), fSpanBuffer.GetTotalSize());
295  return dabc::di_Error;
296  }
297 
298  // we keep place in main buffer, but header for additional peace will be cutted
299  extra.CutFromBegin(fSpanBuffer.GetTotalSize() - sizeof(mbs::Header));
300 
301  // extra buffer required only here, later normal buffer can be used
302  started = StartRecv(extra, ReadBufferSize());
303  } else {
304  started = StartRecv(buf, ReadBufferSize());
305  }
306 
307  if (!started) return dabc::di_Error;
308 
309  fState = ioRecvBuffer;
310 
311  return dabc::di_CallBack;
312 }
313 
315 {
316 // DOUT0("mbs::ClientTransport::Read_Complete");
317 
318  if (fState!=ioComplBuffer) {
319  EOUT("Reading complete at strange place!!!");
320  return dabc::di_Error;
321  }
322 
323  unsigned read_shift = 0;
324  if (!fSpanBuffer.null()) read_shift = fSpanBuffer.GetTotalSize() - sizeof(mbs::Header);
325 
326 
327  // first of all, swap data where it was received
328  if (fSwapping) {
329  dabc::Pointer ptr(buf);
330  if (read_shift>0) ptr.shift(read_shift);
331  ptr.setfullsize(fHeader.UsedBufferSize());
332 
333  while (!ptr.null()) {
334  mbs::SwapData(ptr(), ptr.rawsize());
335  ptr.shift(ptr.rawsize());
336  }
337  }
338 
339  // now we should find how big is block in the beginning
340  // and copy spanned buffer to the beginning
341  if (!fSpanBuffer.null()) {
342  dabc::Pointer ptr(buf);
343  ptr.shift(read_shift);
344 
345  mbs::Header* hdr = (mbs::Header*) ptr();
346  unsigned new_block_size = hdr->FullSize();
347 
348  buf.CopyFrom(fSpanBuffer);
349 
350  DOUT4("Copy block %u to begin", fSpanBuffer.GetTotalSize());
351 
352  hdr = (mbs::Header*) buf.SegmentPtr();
353 
354  hdr->SetFullSize(read_shift + new_block_size);
355 
356  fSpanBuffer.Release();
357  }
358 
359  // in any case release extra buffers
361  buf.SetTotalSize(read_shift + fHeader.UsedBufferSize());
362 
363  if (fSpanning) {
364 
365  // if there is block at the end, keep copy for the next operation
366  if (fHeader.h_end != 0) {
367 
368  mbs::ReadIterator iter(buf);
369 
370  unsigned useful_sz(0), last_sz(0);
371 
372  while (iter.NextEvent()) {
373  useful_sz += last_sz;
374  last_sz = iter.evnt()->FullSize();
375  }
376 
377  fSpanBuffer = buf.Duplicate();
378  if (fSpanBuffer.null()) {
379  EOUT("FAIL to duplicate buffer!!!");
380  return dabc::di_Error;
381  }
382 
383  buf.SetTotalSize(useful_sz);
384 
385  // span buffer remained until next request
386  fSpanBuffer.CutFromBegin(useful_sz);
387 
388  DOUT4("Left block %u from the end", fSpanBuffer.GetTotalSize());
389  }
390  }
391 
392  fState = ioReady;
393 
394  if (buf.GetTotalSize()==0) {
395  DOUT0("EXTREME CASE - FULL BUFFER IS JUST PEACE FROM THE MIDDLE");
396  return dabc::di_SkipBuffer;
397  }
398 
399  return dabc::di_Ok;
400 }
Reference on memory from memory pool.
Definition: Buffer.h:135
Buffer Duplicate() const
Duplicates instance of Buffer with new segments list independent from source.
Definition: Buffer.cxx:192
void CutFromBegin(BufferSize_t len)
Remove part of buffer from the beginning.
Definition: Buffer.cxx:139
void SetTotalSize(BufferSize_t len)
Set total length of the buffer to specified value Size cannot be bigger than original size of the buf...
Definition: Buffer.cxx:99
BufferSize_t CopyFrom(const Buffer &srcbuf, BufferSize_t len=0)
Copy content from source buffer.
Definition: Buffer.cxx:333
BufferSize_t GetTotalSize() const
Return total size of all buffer segments.
Definition: Buffer.cxx:91
void SetTypeId(unsigned tid)
Definition: Buffer.h:151
void * SegmentPtr(unsigned n=0) const
Returns pointer on the segment, no any boundary checks.
Definition: Buffer.h:171
Base class for input transport implementations.
Definition: DataTransport.h:41
Represents command with its arguments.
Definition: Command.h:99
void Read_CallBack(unsigned compl_res=di_Ok)
Manipulator with dabc::Buffer class.
Definition: Pointer.h:34
BufferSize_t shift(BufferSize_t sz)
Definition: Pointer.h:153
void setfullsize(BufferSize_t sz)
Definition: Pointer.h:168
bool null() const
Definition: Pointer.h:147
BufferSize_t rawsize() const
Definition: Pointer.h:148
virtual void ProcessEvent(const EventId &)
virtual void ObjectCleanup()
User method to cleanup object content before it will be destroyed Main motivation is to release any r...
Definition: Worker.cxx:39
virtual void OnThreadAssigned()
Definition: Worker.h:73
void MakeCallback(unsigned sz)
ClientTransport(int fd, int kind)
virtual unsigned Read_Size()
Defines required buffer size for next operation.
virtual unsigned Read_Complete(dabc::Buffer &buf)
Complete reading of the buffer from source,.
virtual void OnSocketError(int err, const std::string &info)
Generic error handler.
virtual void OnSendCompleted()
Method called when send operation is completed.
virtual unsigned Read_Start(dabc::Buffer &buf)
Prepare buffer for reading (if required)
virtual double ProcessTimeout(double last_diff)
mbs::TransportInfo fServInfo
virtual void OnThreadAssigned()
virtual void OnRecvCompleted()
Method called when receive operation is completed.
virtual void ProcessEvent(const dabc::EventId &)
virtual void ObjectCleanup()
User method to cleanup object content before it will be destroyed Main motivation is to release any r...
Read iterator for MBS events/subevents.
Definition: Iterator.h:40
EventHeader * evnt() const
Definition: Iterator.h:75
#define DOUT0(args ...)
Definition: logging.h:156
#define DOUT3(args ...)
Definition: logging.h:176
#define EOUT(args ...)
Definition: logging.h:150
#define DOUT1(args ...)
Definition: logging.h:162
#define DBOOL(arg)
Definition: logging.h:191
#define DOUT4(args ...)
Definition: logging.h:182
Event manipulation API.
Definition: api.h:23
void formats(std::string &sbuf, const char *fmt,...)
Definition: string.cxx:26
@ di_Ok
Definition: DataIO.h:38
@ di_CallBack
Definition: DataIO.h:39
@ di_DfltBufSize
Definition: DataIO.h:42
@ di_SkipBuffer
Definition: DataIO.h:41
@ di_Error
Definition: DataIO.h:40
void SwapData(void *data, unsigned bytessize)
@ StreamServer
Definition: MbsTypeDefs.h:355
@ mbt_MbsEvents
Definition: MbsTypeDefs.h:348
Event structure, exchanged between DABC threads.
Definition: Thread.h:70
MBS buffer header.
Definition: MbsTypeDefs.h:152
Structure of any entry in LMD file.
Definition: LmdTypeDefs.h:39
void SetFullSize(uint32_t sz)
Definition: LmdTypeDefs.h:56
uint32_t FullSize() const
Definition: LmdTypeDefs.h:55