DABC (Data Acquisition Backbone Core)  2.9.9
SocketThread.h
Go to the documentation of this file.
1 // $Id: SocketThread.h 4469 2020-04-15 12:53:59Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #ifndef DABC_SocketThread
17 #define DABC_SocketThread
18 
19 #ifndef DABC_Thread
20 #include "dabc/Thread.h"
21 #endif
22 
23 #ifndef DABC_Worker
24 #include "dabc/Worker.h"
25 #endif
26 
27 #include <netdb.h>
28 
29 struct pollfd;
30 
31 // #define SOCKET_PROFILING
32 
33 
34 namespace dabc {
35 
36  class SocketThread;
37 
52  class SocketAddon : public WorkerAddon {
53 
54  friend class SocketThread;
55 
56  protected:
57 
58  int fSocket;
59  bool fDoingInput;
60  bool fDoingOutput;
64 
65  virtual void ProcessEvent(const EventId&);
66 
69  inline void SetDoingInput(bool on = true) { fDoingInput = on; }
70 
73  inline void SetDoingOutput(bool on = true) { fDoingOutput = on; }
74 
76  virtual void OnSocketError(int msg, const std::string &info);
77 
78  ssize_t DoRecvBuffer(void* buf, ssize_t len);
79  ssize_t DoRecvBufferHdr(void* hdr, ssize_t hdrlen, void* buf, ssize_t len, void* srcaddr = 0, unsigned srcaddrlen = 0);
80  ssize_t DoSendBuffer(void* buf, ssize_t len);
81  ssize_t DoSendBufferHdr(void* hdr, ssize_t hdrlen, void* buf, ssize_t len, void* tgtaddr = 0, unsigned tgtaddrlen = 0);
82 
84  void SetDeleteWorkerOnClose(bool on = true) { fDeleteWorkerOnClose = on; }
85 
86 
87  public:
88 
100  };
101 
102  SocketAddon(int fd = -1);
103  virtual ~SocketAddon();
104 
105  virtual const char* ClassName() const { return "SocketAddon"; }
106  virtual std::string RequiredThrdClass() const { return typeSocketThread; }
107 
108  inline int Socket() const { return fSocket; }
109  inline bool IsSocket() const { return Socket() >= 0; }
110 
111  inline bool IsDoingInput() const { return fDoingInput; }
112  inline bool IsDoingOutput() const { return fDoingOutput; }
113 
114  void CloseSocket();
115  void SetSocket(int fd);
116 
117  int TakeSocket();
118  int TakeSocketError();
119 
121  void SetDeliverEventsToWorker(bool on = true) { fDeliverEventsToWorker = on; }
122 
125  inline void SetIOPriority(int prior = 1) { fIOPriority = prior; }
126  };
127 
128 
131 
132  bool IsSocket() const
133  { return GetObject() ? GetObject()->IsSocket() : false; }
134  };
135 
136 
137  // ______________________________________________________________________
138 
144  class SocketIOAddon : public SocketAddon {
145  protected:
147  bool fUseMsgOper;
148 
149  // sending data
150  bool fSendUseMsg;
151  struct iovec* fSendIOV;
152  unsigned fSendIOVSize;
153  unsigned fSendIOVFirst;
154  unsigned fSendIOVNumber;
155  struct sockaddr_in fSendAddr;
157 
158  // receiving data
159  bool fRecvUseMsg;
160  struct iovec* fRecvIOV;
161  unsigned fRecvIOVSize;
162  unsigned fRecvIOVFirst;
163  unsigned fRecvIOVNumber;
164  struct sockaddr_in fRecvAddr;
165  unsigned fLastRecvSize;
166 
167 #ifdef SOCKET_PROFILING
168  long fSendOper;
169  double fSendTime;
170  long fSendSize;
171  long fRecvOper;
172  double fRecvTime;
173  long fRecvSize;
174 #endif
175 
176  virtual const char* ClassName() const { return "SocketIOAddon"; }
177 
178  bool IsDatagramSocket() const { return fDatagramSocket; }
179 
180  virtual void ProcessEvent(const EventId&);
181 
182  void AllocateSendIOV(unsigned size);
183  void AllocateRecvIOV(unsigned size);
184 
185  inline bool IsDoingSend() const { return fSendIOVNumber>0; }
186  inline bool IsDoingRecv() const { return fRecvIOVNumber>0; }
187 
189  virtual void OnSendCompleted()
190  {
192  }
193 
195  virtual void OnRecvCompleted()
196  {
198  }
199 
201  struct sockaddr_in& GetRecvAddr() { return fRecvAddr; }
202 
205  unsigned GetRecvSize() const { return fLastRecvSize; }
206 
207  public:
208 
210  SocketIOAddon(int fd = 0, bool isdatagram = false, bool usemsg = true);
211 
213  virtual ~SocketIOAddon();
214 
217  void SetSendAddr(const std::string &host = "", int port = 0);
218 
219  bool StartSend(const void* buf, unsigned size,
220  const void* buf2 = 0, unsigned size2 = 0,
221  const void* buf3 = 0, unsigned size3 = 0);
222  bool StartRecv(void* buf, size_t size);
223 
224  bool StartRecvHdr(void* hdr, unsigned hdrsize, void* buf, size_t size);
225 
226  bool StartSend(const Buffer& buf);
227  bool StartRecv(Buffer& buf, BufferSize_t datasize);
228 
229  bool StartNetRecv(void* hdr, unsigned hdrsize, Buffer& buf, BufferSize_t datasize);
230  bool StartNetSend(void* hdr, unsigned hdrsize, const Buffer& buf);
231 
234  void CancelIOOperations();
235  };
236 
237  // ______________________________________________________________________
238 
245  protected:
247  std::string fConnId;
248 
249  public:
251  SocketAddon(fd),
252  fConnRcv(),
253  fConnId()
254  {}
255 
256  virtual void ObjectCleanup()
257  {
258  fConnRcv.Release();
260  }
261 
262  virtual ~SocketConnectAddon() {}
263 
265  void SetConnHandler(const WorkerRef& rcv, const std::string &connid)
266  {
267  fConnRcv = rcv;
268  fConnId = connid;
269  }
270 
271  virtual const char* ClassName() const { return "SocketConnectAddon"; }
272 
273  };
274 
275  // ________________________________________________________________
276 
286  protected:
287  std::string fServerHostName;
290  int ai_family{0};
291  int ai_socktype{0};
292  int ai_protocol{0};
293  socklen_t ai_addrlen{0};
294  struct sockaddr_storage ai_addr;
295 
296  virtual void OnClientConnected(int fd);
297 
298  bool RecreateSocket();
299 
300  public:
301  SocketServerAddon(int serversocket, const char* hostname = nullptr, int portnum = -1, struct addrinfo *info = nullptr);
302  virtual ~SocketServerAddon() {}
303 
304  virtual void ProcessEvent(const EventId&);
305 
306  std::string ServerHostName() { return fServerHostName; }
307  int ServerPortNumber() const { return fServerPortNumber; }
308  std::string ServerId() { return dabc::format("%s:%d", fServerHostName.c_str(), fServerPortNumber); }
309 
310  virtual const char* ClassName() const { return "SocketServerAddon"; }
311 
312  };
313 
314  // ______________________________________________________________
315 
323  public:
324  SocketClientAddon(const struct addrinfo* serv_addr, int fd = -1);
325  virtual ~SocketClientAddon();
326 
327  void SetRetryOpt(int nretry, double tmout = 1.);
328 
329  virtual void ProcessEvent(const EventId&);
330 
331  virtual double ProcessTimeout(double);
332 
333  virtual const char* ClassName() const { return "SocketClientAddon"; }
334 
335  protected:
336  virtual void OnConnectionEstablished(int fd);
337  virtual void OnConnectionFailed();
338 
339  virtual void OnThreadAssigned();
340 
341  struct addrinfo fServAddr; // own copy of server address
342  int fRetry;
343  double fRetryTmout;
344  };
345 
346  // ______________________________________________________________
347 
355  class SocketThread : public Thread {
356  protected:
360  };
361 
362  struct ProcRec {
363  bool use;
364  uint32_t indx;
365  };
366 
367  int fPipe[2];
368  long fPipeFired;
369  bool fWaitFire;
371  unsigned f_sizeufds;
372  pollfd *f_ufds;
377 
378 #ifdef SOCKET_PROFILING
379  long fWaitCalls;
380  long fWaitDone;
381  long fPipeCalled;
382  double fWaitTime;
383  double fFillTime;
384 #endif
385 
386  virtual bool WaitEvent(EventId&, double tmout);
387 
388  virtual void _Fire(const EventId& evnt, int nq);
389 
390  virtual void WorkersSetChanged();
391 
392  virtual void ProcessExtraThreadEvent(const EventId& evid);
393 
394  public:
395 
396  // list of all events for all kind of socket processors
397 
398  SocketThread(Reference parent, const std::string &name, Command cmd);
399  virtual ~SocketThread();
400 
401  virtual const char* ClassName() const { return typeSocketThread; }
402  virtual bool CompatibleClass(const std::string &clname) const;
403 
404  static bool SetNonBlockSocket(int fd);
405  static bool SetNoDelaySocket(int fd);
406 
407  static int StartClient(const std::string &host, int nport, bool nonblocking = true);
408 
410  static int SendBuffer(int fd, void* buf, int len);
411 
413  static int RecvBuffer(int fd, void* buf, int len);
414 
415 
418  static int CreateUdp();
419 
425  static int BindUdp(int fd, int nport, int portmin=-1, int portmax=-1);
426 
428  static void CloseUdp(int fd);
429 
431  static bool AttachMulticast(int handle, const std::string &addr);
432 
434  static void DettachMulticast(int handle, const std::string &addr);
435 
437  static std::string DefineHostName(bool force = true);
438 
439  static int ConnectUdp(int fd, const std::string &remhost, int remport);
440 
446  static SocketServerAddon* CreateServerAddon(const std::string &host, int nport, int portmin=-1, int portmax=-1);
447 
448  static SocketClientAddon* CreateClientAddon(const std::string &servid, int dflt_port = -1);
449  };
450 }
451 
452 #endif
#define DABC_REFERENCE(RefClass, ParentClass, T)
Definition: Reference.h:222
Reference on memory from memory pool.
Definition: Buffer.h:135
Represents command with its arguments.
Definition: Command.h:99
Reference on the arbitrary object
Definition: Reference.h:73
void Release()
Releases reference on the object.
Definition: Reference.cxx:138
Object * GetObject() const
Return pointer on the object.
Definition: Reference.h:129
bool IsSocket() const
Definition: SocketThread.h:132
Special addon class for handling of socket and socket events.
Definition: SocketThread.h:52
virtual void ProcessEvent(const EventId &)
bool fDoingOutput
true if data need to be send
Definition: SocketThread.h:60
bool IsDoingOutput() const
Definition: SocketThread.h:112
virtual void OnSocketError(int msg, const std::string &info)
Generic error handler.
bool IsDeliverEventsToWorker() const
Definition: SocketThread.h:120
void SetIOPriority(int prior=1)
Method defines priority level for socket IO events.
Definition: SocketThread.h:125
void SetDoingInput(bool on=true)
Call method to indicate that object wants to read data from the socket.
Definition: SocketThread.h:69
@ evntSocketLast
from this event number one can add more socket system events
Definition: SocketThread.h:94
@ evntSocketSendInfo
event delivered to worker when write is completed
Definition: SocketThread.h:96
@ evntSocketRecvInfo
event delivered to worker when read is completed
Definition: SocketThread.h:95
@ evntSocketLastInfo
last system event, used by sockets
Definition: SocketThread.h:99
@ evntSocketCloseInfo
event delivered to worker when socket is closed
Definition: SocketThread.h:98
@ evntSocketErrorInfo
event delivered to worker when error is detected
Definition: SocketThread.h:97
int Socket() const
Definition: SocketThread.h:108
bool fDoingInput
true if input data are expected
Definition: SocketThread.h:59
virtual const char * ClassName() const
Returns class name of the object instance.
Definition: SocketThread.h:105
int fIOPriority
priority of socket I/O events, default 1
Definition: SocketThread.h:61
void SetDeliverEventsToWorker(bool on=true)
Definition: SocketThread.h:121
bool IsSocket() const
Definition: SocketThread.h:109
virtual std::string RequiredThrdClass() const
Definition: SocketThread.h:106
virtual ~SocketAddon()
bool IsDeleteWorkerOnClose() const
Definition: SocketThread.h:83
ssize_t DoSendBuffer(void *buf, ssize_t len)
bool IsDoingInput() const
Definition: SocketThread.h:111
void SetDeleteWorkerOnClose(bool on=true)
Definition: SocketThread.h:84
int fSocket
socket handle
Definition: SocketThread.h:58
ssize_t DoSendBufferHdr(void *hdr, ssize_t hdrlen, void *buf, ssize_t len, void *tgtaddr=0, unsigned tgtaddrlen=0)
void SetSocket(int fd)
SocketAddon(int fd=-1)
bool fDeleteWorkerOnClose
if true, worker will be deleted when socket closed or socket in error
Definition: SocketThread.h:63
bool fDeliverEventsToWorker
if true, completion events will be delivered to the worker
Definition: SocketThread.h:62
ssize_t DoRecvBufferHdr(void *hdr, ssize_t hdrlen, void *buf, ssize_t len, void *srcaddr=0, unsigned srcaddrlen=0)
void SetDoingOutput(bool on=true)
Call method to indicate that worker wants to write data to the socket.
Definition: SocketThread.h:73
ssize_t DoRecvBuffer(void *buf, ssize_t len)
Socket addon for handling connection on client side.
Definition: SocketThread.h:322
virtual const char * ClassName() const
Returns class name of the object instance.
Definition: SocketThread.h:333
virtual void OnConnectionFailed()
struct addrinfo fServAddr
Definition: SocketThread.h:341
void SetRetryOpt(int nretry, double tmout=1.)
virtual void OnThreadAssigned()
SocketClientAddon(const struct addrinfo *serv_addr, int fd=-1)
virtual double ProcessTimeout(double)
virtual void OnConnectionEstablished(int fd)
virtual void ProcessEvent(const EventId &)
Socket addon for handling connection events.
Definition: SocketThread.h:244
virtual const char * ClassName() const
Returns class name of the object instance.
Definition: SocketThread.h:271
virtual void ObjectCleanup()
User method to cleanup object content before it will be destroyed Main motivation is to release any r...
Definition: SocketThread.h:256
void SetConnHandler(const WorkerRef &rcv, const std::string &connid)
Set connection handler.
Definition: SocketThread.h:265
Socket addon for handling I/O events.
Definition: SocketThread.h:144
unsigned fRecvIOVNumber
number of elements in current recv operation
Definition: SocketThread.h:163
unsigned fSendIOVSize
total number of elements in send vector
Definition: SocketThread.h:152
void AllocateSendIOV(unsigned size)
struct iovec * fRecvIOV
receive io vector for scatter list
Definition: SocketThread.h:160
unsigned fLastRecvSize
size of last recv operation
Definition: SocketThread.h:165
bool StartSend(const void *buf, unsigned size, const void *buf2=0, unsigned size2=0, const void *buf3=0, unsigned size3=0)
unsigned GetRecvSize() const
Method return size of last buffer read from socket.
Definition: SocketThread.h:205
struct iovec * fSendIOV
sending io vector for gather list
Definition: SocketThread.h:151
bool fUseMsgOper
indicate if sendmsg, recvmsg operations should be used, it is must for the datagram sockets
Definition: SocketThread.h:147
unsigned fSendIOVNumber
number of elements in current send operation
Definition: SocketThread.h:154
virtual ~SocketIOAddon()
Destructor of SocketIOAddon class.
void SetSendAddr(const std::string &host="", int port=0)
Set destination address for all send operations,.
unsigned fRecvIOVFirst
number of element in recv IOV where transfer is started
Definition: SocketThread.h:162
bool fDatagramSocket
indicate if socket is datagram and all operations should be finished with single call
Definition: SocketThread.h:146
bool IsDoingRecv() const
Definition: SocketThread.h:186
struct sockaddr_in fSendAddr
optional send address for next send operation
Definition: SocketThread.h:155
bool IsDatagramSocket() const
Definition: SocketThread.h:178
unsigned fRecvIOVSize
number of elements in recv vector
Definition: SocketThread.h:161
bool fSendUseMsg
use sendmsg for transport
Definition: SocketThread.h:150
struct sockaddr_in fRecvAddr
source address of last receive operation
Definition: SocketThread.h:164
virtual void OnSendCompleted()
Method called when send operation is completed.
Definition: SocketThread.h:189
void AllocateRecvIOV(unsigned size)
unsigned fSendIOVFirst
number of element in send IOV where transfer is started
Definition: SocketThread.h:153
virtual const char * ClassName() const
Returns class name of the object instance.
Definition: SocketThread.h:176
virtual void OnRecvCompleted()
Method called when receive operation is completed.
Definition: SocketThread.h:195
struct sockaddr_in & GetRecvAddr()
Method provide address of last receive operation.
Definition: SocketThread.h:201
bool StartRecv(void *buf, size_t size)
bool StartNetSend(void *hdr, unsigned hdrsize, const Buffer &buf)
bool StartRecvHdr(void *hdr, unsigned hdrsize, void *buf, size_t size)
bool fSendUseAddr
if true, fSendAddr will be used
Definition: SocketThread.h:156
bool fRecvUseMsg
use recvmsg for transport
Definition: SocketThread.h:159
virtual void ProcessEvent(const EventId &)
SocketIOAddon(int fd=0, bool isdatagram=false, bool usemsg=true)
Constructor of SocketIOAddon class.
void CancelIOOperations()
Method should be used to cancel all running I/O operation of the socket.
bool StartNetRecv(void *hdr, unsigned hdrsize, Buffer &buf, BufferSize_t datasize)
bool IsDoingSend() const
Definition: SocketThread.h:185
Socket addon for handling connection requests on server side.
Definition: SocketThread.h:285
std::string ServerId()
Definition: SocketThread.h:308
std::string fServerHostName
Definition: SocketThread.h:287
virtual const char * ClassName() const
Returns class name of the object instance.
Definition: SocketThread.h:310
struct sockaddr_storage ai_addr
Definition: SocketThread.h:294
std::string ServerHostName()
Definition: SocketThread.h:306
virtual void OnClientConnected(int fd)
SocketServerAddon(int serversocket, const char *hostname=nullptr, int portnum=-1, struct addrinfo *info=nullptr)
virtual void ProcessEvent(const EventId &)
int ServerPortNumber() const
Definition: SocketThread.h:307
Special thread class for handling sockets.
Definition: SocketThread.h:355
bool fWaitFire
indicates if pipe firing is awaited
Definition: SocketThread.h:369
static std::string DefineHostName(bool force=true)
Return current host name.
int fBalanceCnt
counter for balancing of input events
Definition: SocketThread.h:376
int fScalerCounter
variable used to test time to time sockets even if there are events in the queue
Definition: SocketThread.h:370
virtual bool WaitEvent(EventId &, double tmout)
int fPipe[2]
array with i/o pipes handles
Definition: SocketThread.h:367
static void CloseUdp(int fd)
Close datagram (udp) socket.
virtual void ProcessExtraThreadEvent(const EventId &evid)
Method to process events which are not processed by Thread class itself Should be used in derived cla...
static SocketClientAddon * CreateClientAddon(const std::string &servid, int dflt_port=-1)
virtual void _Fire(const EventId &evnt, int nq)
static void DettachMulticast(int handle, const std::string &addr)
Detach datagram socket from multicast group.
virtual void WorkersSetChanged()
Virtual method, called from thread context to inform that number of workers are changed.
pollfd * f_ufds
list of file descriptors for poll call
Definition: SocketThread.h:372
bool fCheckNewEvents
flag indicate if sockets should be checked for new events even if there are already events in the que...
Definition: SocketThread.h:375
static int SendBuffer(int fd, void *buf, int len)
Wrapper for send method, should be used for blocking sockets.
static int StartClient(const std::string &host, int nport, bool nonblocking=true)
static int CreateUdp()
Create datagram (udp) socket.
virtual bool CompatibleClass(const std::string &clname) const
bool fIsAnySocket
indicates that at least one socket processors in the list
Definition: SocketThread.h:374
static bool AttachMulticast(int handle, const std::string &addr)
Attach datagram socket to multicast group to make receiving.
static int RecvBuffer(int fd, void *buf, int len)
Wrapper for recv method, should be used for blocking sockets.
static int BindUdp(int fd, int nport, int portmin=-1, int portmax=-1)
Bind UDP socket to specified port.
@ evntEnableCheck
event to enable again checking sockets for new events
Definition: SocketThread.h:358
@ evntLastSocketThrdEvent
last event, which can be used by socket
Definition: SocketThread.h:359
static int ConnectUdp(int fd, const std::string &remhost, int remport)
unsigned f_sizeufds
size of the structure, which was allocated
Definition: SocketThread.h:371
SocketThread(Reference parent, const std::string &name, Command cmd)
ProcRec * f_recs
identify used processors
Definition: SocketThread.h:373
virtual const char * ClassName() const
Returns class name of the object instance.
Definition: SocketThread.h:401
static bool SetNonBlockSocket(int fd)
static SocketServerAddon * CreateServerAddon(const std::string &host, int nport, int portmin=-1, int portmax=-1)
Create handle for server-side connection If hostname == 0, any available address will be selected If ...
long fPipeFired
indicate if something was written in pipe
Definition: SocketThread.h:368
static bool SetNoDelaySocket(int fd)
Represent thread functionality.
Definition: Thread.h:109
@ evntLastThrd
Definition: Thread.h:324
Reference on dabc::WorkerAddon object
Definition: Worker.h:101
Generic addon for dabc::Worker.
Definition: Worker.h:49
void FireWorkerEvent(unsigned evid)
Definition: Worker.cxx:56
virtual void ObjectCleanup()
User method to cleanup object content before it will be destroyed Main motivation is to release any r...
Definition: Worker.cxx:39
Reference on dabc::Worker
Definition: Worker.h:466
@ evntFirstSystem
Definition: Worker.h:202
@ evntFirstAddOn
Definition: Worker.h:201
Event manipulation API.
Definition: api.h:23
const char * typeSocketThread
Definition: Object.cxx:80
uint32_t BufferSize_t
Definition: Buffer.h:32
std::string format(const char *fmt,...)
Definition: string.cxx:49
Event structure, exchanged between DABC threads.
Definition: Thread.h:70
uint32_t indx
index for dereference of processor from ufds structure
Definition: SocketThread.h:364
bool use
indicates if processor is used for poll
Definition: SocketThread.h:363