DABC (Data Acquisition Backbone Core)  2.9.9
SocketDevice.cxx
Go to the documentation of this file.
1 // $Id: SocketDevice.cxx 4746 2021-03-23 14:40:35Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #include "dabc/SocketDevice.h"
17 
18 #include <unistd.h>
19 
20 #include "dabc/SocketTransport.h"
21 #include "dabc/Manager.h"
22 #include "dabc/Configuration.h"
23 #include "dabc/ConnectionManager.h"
24 
25 #define SocketServerTmout 0.2
26 
27 namespace dabc {
28 
29  class SocketProtocolAddon;
30 
31  // TODO: Can we use here connection manager name - all information already there
32 
33  class NewConnectRec {
34  public:
35  std::string fReqItem;
38  double fTmOut;
39  std::string fConnId;
41 
43  fReqItem(),
44  fClient(0),
45  fProtocol(0),
46  fTmOut(1.),
47  fConnId(),
48  fLocalCmd()
49  {
50  }
51 
52  NewConnectRec(const std::string &item,
54  SocketClientAddon* clnt) :
55  fReqItem(item),
56  fClient(clnt),
57  fProtocol(0),
58  fTmOut(0.),
59  fLocalCmd()
60  {
62  fConnId = req.GetConnId();
63  }
64 
65  const char* ConnId() const { return fConnId.c_str(); }
66  bool IsConnId(const char* id) { return fConnId.compare(id) == 0; }
67  };
68 
69  // this class is used to perform initial protocol
70  // when socket connection is established
71  // it also used to transport commands on remote side and execute them
72 
74 
75  friend class SocketDevice;
76 
78 
79  protected:
80 
82 
88  public:
89 
90  SocketProtocolAddon(int connfd, SocketDevice* dev, NewConnectRec* rec, void* redirect = 0) :
91  dabc::SocketIOAddon(connfd),
92  fDevice(dev),
93  fRec(rec),
95  {
96  if (redirect!=0) {
98  memcpy(fInBuf, redirect, SocketDevice::ProtocolMsgSize); // as it was received
99  }
100  }
101 
103  {
104  }
105 
106  void FinishWork(bool res)
107  {
108  fState = res ? stDone : stError;
109  fDevice->RemoveProtocolAddon(this, res);
110  DeleteWorker();
111  }
112 
113  virtual void OnSocketError(int, const std::string&)
114  {
115  FinishWork(false);
116  }
117 
118  virtual void OnThreadAssigned()
119  {
121 
122  uint32_t header = SocketDevice::headerConnect;
123 
124  switch (fState) {
125  case stServerProto:
127  break;
128  case stClientProto:
129  // we can start both send and recv operations simultaneously,
130  // while buffer will be received only when server answer on request
131 
132  memcpy(fOutBuf, &header, sizeof(header));
133  strncpy(fOutBuf + sizeof(header), fRec->ConnId(), sizeof(fOutBuf) - sizeof(header) - 1);
134  strncpy(fInBuf, "denied", sizeof(fInBuf)-1);
135 
138  break;
139  case stRedirect:
140  // do like we receive input buffer ourself
143  break;
144  default:
145  EOUT("Wrong state %d", fState);
146  FinishWork(false);
147  }
148  }
149 
150  virtual void OnSendCompleted()
151  {
152  switch (fState) {
153  case stServerProto:
154  case stRedirect:
155  // DOUT5("Server job finished");
156  if (fDevice->ProtocolCompleted(this, 0))
157  DeleteWorker();
158  break;
159  case stClientProto:
160  // DOUT5("Client send request, wait reply");
161  break;
162  default:
163  EOUT("Wrong state %d", fState);
164  FinishWork(false);
165  }
166  }
167 
168  virtual void OnRecvCompleted()
169  {
170  switch (fState) {
171  case stServerProto:
174  break;
175  case stClientProto:
176  DOUT5("Client job finished");
177  if (fDevice->ProtocolCompleted(this, fInBuf))
178  DeleteWorker();
179  break;
180  default:
181  EOUT("Wrong state %d", fState);
182  FinishWork(false);
183  }
184  }
185  };
186 
187 }
188 
189 // ______________________________________________________________________
190 
191 dabc::SocketDevice::SocketDevice(const std::string &name, Command cmd) :
192  dabc::Device(name),
193  fConnRecs(),
194  fProtocols(),
195  fConnCounter(0),
196  fCmdChannelId()
197 {
198  fBindHost = Cfg("host", cmd).AsStr();
199  fBindPort = Cfg("port", cmd).AsInt(-1);
200 
201  if (fBindHost.empty() && (fBindPort<0)) {
203  if (!chl.null()) {
204  dabc::Command cmd("RedirectSocketConnect");
205  cmd.SetStr("Device", ItemName());
206  if (chl.Execute(cmd)) fCmdChannelId = cmd.GetStr("ServerId");
207  }
208 
209  if (!fCmdChannelId.empty()) DOUT0("Socket device %s reuses %s for connections", GetName(), fCmdChannelId.c_str());
210  }
211 
212  if (fCmdChannelId.empty() && fBindHost.empty())
214 }
215 
217 {
218  // FIXME: cleanup should be done much earlier
219 
220  CleanupRecs(-1);
221 
222  while (fProtocols.size()>0) {
223  SocketProtocolAddon* pr = (SocketProtocolAddon*) fProtocols[0];
224  fProtocols.remove_at(0);
226  }
227 
228 }
229 
231 {
232  // in standard case use server socket of command channel
233  if (!fCmdChannelId.empty()) return fCmdChannelId;
234 
235  SocketServerAddon* serv = dynamic_cast<SocketServerAddon*> (fAddon());
236 
237  if (serv == 0) {
238 
239  int port0 = fBindPort, portmin(7000), portmax(9000);
240  if (port0 > 0) portmin = portmax = 0;
241 
242  serv = dabc::SocketThread::CreateServerAddon(fBindHost, port0, portmin, portmax);
243 
244  DOUT0("SocketDevice creates server with ID %s", serv->ServerId().c_str());
245 
246  AssignAddon(serv);
247  }
248 
249  if (!serv) return std::string();
250 
251  return serv->ServerId();
252 }
253 
254 
256 {
257  if (rec==0) return;
258 
259  bool firetmout = false;
260  {
261  LockGuard guard(DeviceMutex());
262  fConnRecs.push_back(rec);
263  firetmout = (fConnRecs.size() == 1);
264  }
265  if (firetmout) ActivateTimeout(0.);
266 }
267 
269 {
270  if (rec==0) return;
271  if (rec->fClient) EOUT("Is client %p is destroyed?", rec->fClient);
272  if (rec->fProtocol) EOUT("Is protocol %p is destroyed?", rec->fProtocol);
273 
274  rec->fLocalCmd.ReplyBool(res);
275 
276  delete rec;
277 }
278 
280 {
281  for (unsigned n=0; n<fConnRecs.size();n++) {
282  NewConnectRec* rec = (NewConnectRec*) fConnRecs.at(n);
283 
284  if (rec->IsConnId(connid)) return rec;
285  }
286 
287  return 0;
288 }
289 
291 {
292  PointersVector del_recs;
293 
294  bool more_timeout = false;
295 
296  {
297  LockGuard guard(DeviceMutex());
298 
299  unsigned n = 0;
300 
301  while (n<fConnRecs.size()) {
302  NewConnectRec* rec = (NewConnectRec*) fConnRecs.at(n);
303  rec->fTmOut -= tmout;
304 
305  if ((rec->fTmOut<0) || (tmout<0)) {
306  if (tmout>0) EOUT("Record %u timedout", n);
307  fConnRecs.remove_at(n);
308  del_recs.push_back(rec);
309  } else
310  n++;
311  }
312 
313  more_timeout = fConnRecs.size() > 0;
314  }
315 
316  for (unsigned n=0;n<del_recs.size();n++)
317  DestroyRec((NewConnectRec*) del_recs.at(n), false);
318 
319  return more_timeout;
320 }
321 
322 double dabc::SocketDevice::ProcessTimeout(double last_diff)
323 {
324  return CleanupRecs(last_diff) ? SocketServerTmout : -1;
325 }
326 
328 {
329  std::string reqitem = cmd.GetStr(CmdConnectionManagerHandle::ReqArg());
330 
332 
333  if (req.null()) return cmd_false;
334 
335  switch (req.progress()) {
336 
337  // here on initializes connection
339  if (req.IsServerSide()) {
340  std::string serverid = StartServerAddon();
341  if (serverid.empty()) return cmd_false;
342  req.SetServerId(serverid);
343  } else
344  req.SetClientId("");
345 
346  break;
347  }
348 
350  // one should register request and start connection here
351 
352  DOUT2("****** SOCKET START: %s %s CONN: %s *******", (req.IsServerSide() ? "SERVER" : "CLIENT"), req.GetConnId().c_str(), req.GetConnInfo().c_str());
353 
354  NewConnectRec* rec = 0;
355 
356  if (req.IsServerSide()) {
357 
358  rec = new NewConnectRec(reqitem, req, 0);
359 
360  AddRec(rec);
361  } else {
362 
364  if (client!=0) {
365 
366  // try to make little bit faster than timeout expire why we need
367  // some time for the connection protocol
368  client->SetRetryOpt(5, req.GetConnTimeout());
369  client->SetConnHandler(this, req.GetConnId());
370 
371  rec = new NewConnectRec(reqitem, req, client);
372  AddRec(rec);
373 
374  thread().MakeWorkerFor(client);
375  }
376  }
377 
378  // reply remote command that one other side can start connection
379 
380  req.ReplyRemoteCommand(rec!=0);
381 
382  if (rec==0) return cmd_false;
383 
384  rec->fLocalCmd = cmd;
385 
386  return cmd_postponed;
387  }
388 
389  default:
390  EOUT("Request from connection manager in undefined situation progress = %d ???", req.progress());
391  break;
392  }
393 
394  return cmd_true;
395 }
396 
397 
399 {
400  int cmd_res = cmd_true;
401 
402  if (cmd.IsName(CmdConnectionManagerHandle::CmdName())) {
403 
404  cmd_res = HandleManagerConnectionRequest(cmd);
405 
406  } else
407 
408  if (cmd.IsName("SocketConnect")) {
409  std::string typ = cmd.GetStr("Type");
410  std::string connid = cmd.GetStr("ConnId");
411  int fd = cmd.GetInt("fd", -1);
412 
413  if (typ == "Server") {
414  DOUT2("SocketDevice:: create server protocol for socket %d connid %s", fd, connid.c_str());
415 
416  SocketProtocolAddon* proto = new SocketProtocolAddon(fd, this, 0);
417 
418  thread().MakeWorkerFor(proto, connid);
419 
420  LockGuard guard(DeviceMutex());
421  fProtocols.push_back(proto);
422  } else
423  if (typ == "Client") {
424  SocketProtocolAddon* proto = 0;
425 
426  {
427  LockGuard guard(DeviceMutex());
428  NewConnectRec* rec = _FindRec(connid.c_str());
429  if (rec==0) {
430  EOUT("Client connected for not exiting rec %s", connid.c_str());
431  close(fd);
432  cmd_res = cmd_false;
433  } else {
434  DOUT2("SocketDevice:: create client protocol for socket %d connid:%s", fd, connid.c_str());
435 
436  proto = new SocketProtocolAddon(fd, this, rec);
437  rec->fClient = 0; // if we get command, client is destroyed
438  rec->fProtocol = proto;
439  }
440  }
441  if (proto) thread().MakeWorkerFor(proto, connid);
442  } else
443  if (typ == "Error") {
444  NewConnectRec* rec = 0;
445  {
446  LockGuard guard(DeviceMutex());
447  rec = _FindRec(connid.c_str());
448  if (rec==0) {
449  EOUT("Client error for not existing rec %s", connid.c_str());
450  cmd_res = cmd_false;
451  } else {
452  EOUT("Client error for connid %s", connid.c_str());
453  rec->fClient = 0; // if we get command, client is destroyed
454  fConnRecs.remove(rec);
455  }
456  }
457 
458  if (rec) DestroyRec(rec, false);
459  } else
460  cmd_res = cmd_false;
461  } else
462  if (cmd.IsName("RedirectConnect")) {
463  int fd = cmd.GetInt("Socket");
464  Buffer buf = cmd.GetRawData();
465 
466  SocketProtocolAddon* proto = new SocketProtocolAddon(fd, this, 0, buf.SegmentPtr());
467 
468  thread().MakeWorkerFor(proto, fCmdChannelId);
469 
470  LockGuard guard(DeviceMutex());
471  fProtocols.push_back(proto);
472 
473  cmd_res = cmd_true;
474 
475  } else {
476  cmd_res = dabc::Device::ExecuteCommand(cmd);
477  }
478 
479  return cmd_res;
480 }
481 
482 void dabc::SocketDevice::ServerProtocolRequest(SocketProtocolAddon* proc, const char* inmsg, char* outmsg)
483 {
484  strcpy(outmsg, "denied");
485 
486  uint32_t *header = (uint32_t*) inmsg;
487  if (*header != SocketDevice::headerConnect) {
488  EOUT("Wrong header identifier in the SOCKET connect");
489  return;
490  }
491 
492  NewConnectRec* rec = 0;
493 
494  {
495  LockGuard guard(DeviceMutex());
496  rec = _FindRec(inmsg+sizeof(uint32_t));
497  if (rec==0) return;
498  }
499 
500  strcpy(outmsg, "accepted");
501 
502  LockGuard guard(DeviceMutex());
503  fProtocols.remove(proc);
504  rec->fProtocol = proc;
505  proc->fRec = rec;
506 }
507 
509 {
510  NewConnectRec* rec = proc->fRec;
511 
512  bool destr = false;
513 
514  {
515  LockGuard guard(DeviceMutex());
516  if ((rec==0) || !fConnRecs.has_ptr(rec)) {
517  EOUT("Protocol completed without rec %p", rec);
518  fProtocols.remove(proc);
519  destr = true;
520  }
521  }
522 
523  if (destr) return true;
524 
525  bool res = true;
526  if (inmsg) res = (strcmp(inmsg, "accepted")==0);
527 
528  if (inmsg) DOUT3("Reply from server: %s", inmsg);
529 
530  if (res) {
531  // create transport for the established connection
532  int fd = proc->TakeSocket();
533 
535 
537 
538  res = dabc::NetworkTransport::Make(req, addon, ThreadName());
539 
540  DOUT0("Create socket transport for fd %d res %s", fd, DBOOL(res));
541  }
542 
543  RemoveProtocolAddon(proc, res);
544 
545  return true;
546 }
547 
549 {
550  if (proc==0) return;
551 
552  NewConnectRec* rec = proc->fRec;
553 
554  {
555  LockGuard guard(DeviceMutex());
556  if (rec!=0) {
557  fConnRecs.remove(rec);
558  rec->fProtocol = 0;
559  } else
560  fProtocols.remove(proc);
561  }
562 
563  DestroyRec(rec, res);
564 }
#define SocketServerTmout
Reference on memory from memory pool.
Definition: Buffer.h:135
void * SegmentPtr(unsigned n=0) const
Returns pointer on the segment, no any boundary checks.
Definition: Buffer.h:171
Represents command with its arguments.
Definition: Command.h:99
void ReplyBool(bool res)
Reply on the command with true or false value.
Definition: Command.h:238
bool SetStr(const std::string &name, const char *value)
Definition: Command.h:134
std::string GetStr(const std::string &name, const std::string &dflt="") const
Definition: Command.h:136
int GetInt(const std::string &name, int dflt=0) const
Definition: Command.h:139
Buffer GetRawData()
Returns reference on raw data Can be called only once - raw data reference will be cleaned.
Definition: Command.cxx:347
static std::string GetLocalHost()
@ progrDoingConnect
at this state device should drive connection itself and inform about completion or failure
@ progrDoingInit
state when record should be prepared by device
Full description of connection request.
std::string GetConnId() const
std::string GetServerId() const
void SetClientId(const std::string &id)
void SetServerId(const std::string &id)
double GetConnTimeout() const
time required to establish connection, if expired connection will be switched to "failed" state
bool IsServerSide() const
Indicates if local node in connection is server or client.
Base class for device implementation.
Definition: Device.h:43
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
Definition: Device.cxx:43
Lock guard for posix mutex.
Definition: threads.h:127
Parameter FindPar(const std::string &parname)
Definition: Manager.cxx:2069
WorkerRef GetCommandChannel()
Definition: Manager.h:690
static bool Make(const ConnectionRequest &req, WorkerAddon *addon, const std::string &devthrdname="")
bool IsConnId(const char *id)
const char * ConnId() const
double fTmOut
used by device to process connection timeouts
std::string fReqItem
reference in connection request
SocketClientAddon * fClient
client-side processor, to establish connection
NewConnectRec(const std::string &item, ConnectionRequestFull &req, SocketClientAddon *clnt)
SocketProtocolAddon * fProtocol
protocol processor, to verify connection id
Command fLocalCmd
connection id
std::string ItemName(bool compact=true) const
Produce string, which can be used as name argument in dabc::mgr.FindItem(name) call.
Definition: Object.cxx:1076
const char * GetName() const
Returns name of the object, thread safe
Definition: Object.h:295
static void Destroy(Object *obj)
User method for object destroyment.
Definition: Object.cxx:921
Specialized vector with pointers.
Definition: Queue.h:340
std::string AsStr(const std::string &dflt="") const
Definition: Record.cxx:749
int64_t AsInt(int64_t dflt=0) const
Definition: Record.cxx:501
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
Definition: Reference.cxx:177
bool null() const
Returns true if reference contains nullptr.
Definition: Reference.h:151
@ evntSocketLast
from this event number one can add more socket system events
Definition: SocketThread.h:94
Socket addon for handling connection on client side.
Definition: SocketThread.h:322
void SetRetryOpt(int nretry, double tmout=1.)
void SetConnHandler(const WorkerRef &rcv, const std::string &connid)
Set connection handler.
Definition: SocketThread.h:265
Device for establishing socket connections
Definition: SocketDevice.h:33
void ServerProtocolRequest(SocketProtocolAddon *proc, const char *inmsg, char *outmsg)
bool ProtocolCompleted(SocketProtocolAddon *proc, const char *inmsg)
void DestroyRec(NewConnectRec *rec, bool res)
std::string StartServerAddon()
std::string fBindHost
Definition: SocketDevice.h:43
int HandleManagerConnectionRequest(Command cmd)
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
virtual double ProcessTimeout(double last_diff)
bool CleanupRecs(double tmout)
void RemoveProtocolAddon(SocketProtocolAddon *proc, bool res)
SocketDevice(const std::string &name, Command cmd)
NewConnectRec * _FindRec(const char *connid)
std::string fCmdChannelId
Definition: SocketDevice.h:45
void AddRec(NewConnectRec *rec)
Socket addon for handling I/O events.
Definition: SocketThread.h:144
bool StartSend(const void *buf, unsigned size, const void *buf2=0, unsigned size2=0, const void *buf3=0, unsigned size3=0)
bool StartRecv(void *buf, size_t size)
Specific implementation of network transport for socket.
virtual void OnSendCompleted()
Method called when send operation is completed.
virtual void OnRecvCompleted()
Method called when receive operation is completed.
char fInBuf[SocketDevice::ProtocolMsgSize]
SocketProtocolAddon(int connfd, SocketDevice *dev, NewConnectRec *rec, void *redirect=0)
virtual void OnThreadAssigned()
char fOutBuf[SocketDevice::ProtocolMsgSize]
virtual void OnSocketError(int, const std::string &)
Generic error handler.
Socket addon for handling connection requests on server side.
Definition: SocketThread.h:285
std::string ServerId()
Definition: SocketThread.h:308
static SocketClientAddon * CreateClientAddon(const std::string &servid, int dflt_port=-1)
static SocketServerAddon * CreateServerAddon(const std::string &host, int nport, int portmin=-1, int portmax=-1)
Create handle for server-side connection If hostname == 0, any available address will be selected If ...
void DeleteWorker()
This is way to delete worker with addon inclusive.
Definition: Worker.cxx:44
virtual void OnThreadAssigned()
Definition: Worker.h:73
Reference on dabc::Worker
Definition: Worker.h:466
bool Execute(Command cmd, double tmout=-1.)
Definition: Worker.cxx:1147
RecordField Cfg(const std::string &name, Command cmd=nullptr) const
Returns configuration field of specified name Configuration value of specified name searched in follo...
Definition: Worker.cxx:521
#define DOUT2(args ...)
Definition: logging.h:170
#define DOUT0(args ...)
Definition: logging.h:156
#define DOUT5(args ...)
Definition: logging.h:188
#define DOUT3(args ...)
Definition: logging.h:176
#define EOUT(args ...)
Definition: logging.h:150
#define DBOOL(arg)
Definition: logging.h:191
Event manipulation API.
Definition: api.h:23
ManagerRef mgr
Definition: Manager.cxx:42
@ cmd_postponed
Definition: Command.h:42
@ cmd_false
Definition: Command.h:37
@ cmd_true
Definition: Command.h:38