DABC (Data Acquisition Backbone Core)  2.9.9
SocketThread.cxx
Go to the documentation of this file.
1 // $Id: SocketThread.cxx 4726 2021-03-13 18:09:15Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #include "dabc/SocketThread.h"
17 
18 #include <sys/types.h> /* See NOTES */
19 #include <sys/socket.h>
20 #include <sys/poll.h>
21 #include <fcntl.h>
22 #include <cerrno>
23 #include <cstdlib>
24 #include <unistd.h>
25 #include <netinet/in.h>
26 #include <netinet/tcp.h>
27 #include <arpa/inet.h>
28 
29 #include "dabc/Configuration.h"
30 
31 #if defined(__MACH__) /* Apple OSX section */
32 #if !defined(MSG_NOSIGNAL)
33 #define MSG_NOSIGNAL SO_NOSIGPIPE
34 #endif
35 #endif
36 
/** Return a short symbolic name for a socket status code.
 *
 * \param err  -1 (reported as "Internal"), 0 (reported as "Close")
 *             or one of the errno values listed below
 * \returns    pointer to a string literal; "UNKNOWN" is returned for every
 *             code which is not listed in the switch
 */
const char* SocketErr(int err)
{
   switch (err) {
      case -1: return "Internal";
      case 0: return "Close";
      case EAGAIN: return "EAGAIN";
      case EBADF: return "EBADF";
      case ECONNREFUSED: return "ECONNREFUSED";
      case EFAULT: return "EFAULT";
      case EINTR: return "EINTR";
      case EINVAL: return "EINVAL";
      case ENOMEM: return "ENOMEM";
      case ENOTCONN: return "ENOTCONN";
      case ENOTSOCK: return "ENOTSOCK";

      case EACCES: return "EACCES";
      case ECONNRESET: return "ECONNRESET";
      case EDESTADDRREQ: return "EDESTADDRREQ";
      case EISCONN: return "EISCONN";
      case EMSGSIZE: return "EMSGSIZE";
      case ENOBUFS: return "ENOBUFS";
      case EOPNOTSUPP: return "EOPNOTSUPP";
      case EPIPE: return "EPIPE";

      case EPERM: return "EPERM";
      case EADDRINUSE: return "EADDRINUSE";
      case EAFNOSUPPORT: return "EAFNOSUPPORT";
      case EALREADY: return "EALREADY";
      case EINPROGRESS: return "EINPROGRESS";
      case ENETUNREACH: return "ENETUNREACH";
      case ETIMEDOUT: return "ETIMEDOUT";
   }

   // code is not part of the table above
   return "UNKNOWN";
}
72 
73 // _____________________________________________________________________
74 
76  WorkerAddon("socket"),
77  fSocket(fd),
78  fDoingInput(false),
79  fDoingOutput(false),
80  fIOPriority(1),
81  fDeliverEventsToWorker(false),
82  fDeleteWorkerOnClose(false)
83 {
84 }
85 
87 {
88  CloseSocket();
89 }
90 
91 
93 {
94  switch (evnt.GetCode()) {
95  case evntSocketRead:
96  break;
97 
98  case evntSocketWrite:
99  break;
100 
101  case evntSocketError:
102  OnSocketError(-1, "get error event");
103  break;
104 
105  default:
107  }
108 }
109 
111 {
112  CloseSocket();
113  fSocket = fd;
114 }
115 
117 {
118  int fd = fSocket;
119  fSocket = -1;
120  return fd;
121 }
122 
124 {
125  if (fSocket<0) return;
126 
127  DOUT3("~~~~~~~~~~~~~~~~ Close socket %d", fSocket);
128  close(fSocket);
129  fSocket = -1;
130 }
131 
133 {
134  if (Socket()<0) return -1;
135 
136  int myerrno = 753642;
137  socklen_t optlen = sizeof(myerrno);
138 
139  int res = getsockopt(Socket(), SOL_SOCKET, SO_ERROR, &myerrno, &optlen);
140 
141  if ((res<0) || (myerrno == 753642)) return -1;
142 
143  return myerrno;
144 }
145 
146 void dabc::SocketAddon::OnSocketError(int msg, const std::string &info)
147 {
148  if (IsDeliverEventsToWorker()) {
149  DOUT2("Addon:%p Connection closed - worker should process", this);
150  FireWorkerEvent(msg==0 ? evntSocketCloseInfo : evntSocketErrorInfo);
151  } else
152  if (fDeleteWorkerOnClose) {
153  DOUT2("Connection closed - destroy socket");
154  CloseSocket();
155  DeleteWorker();
156  } else {
157  DOUT2("Connection closed - destroy addon");
158  CloseSocket();
159  DeleteAddonItself();
160  }
161 }
162 
163 ssize_t dabc::SocketAddon::DoRecvBuffer(void* buf, ssize_t len)
164 {
165  ssize_t res = recv(fSocket, buf, len, MSG_DONTWAIT | MSG_NOSIGNAL);
166 
167  if (res==0) OnSocketError(0, "closed during recv()"); else
168  if (res<0) {
169  if (errno!=EAGAIN) OnSocketError(errno, "when recv()");
170  }
171 
172  return res;
173 }
174 
175 ssize_t dabc::SocketAddon::DoRecvBufferHdr(void* hdr, ssize_t hdrlen, void* buf, ssize_t len, void* srcaddr, unsigned srcaddrlen)
176 {
177  struct iovec iov[2];
178 
179  iov[0].iov_base = hdr;
180  iov[0].iov_len = hdrlen;
181 
182  iov[1].iov_base = buf;
183  iov[1].iov_len = len;
184 
185  struct msghdr msg;
186 
187  msg.msg_name = srcaddr;
188  msg.msg_namelen = srcaddrlen;
189  msg.msg_iov = iov;
190  msg.msg_iovlen = buf ? 2 : 1;
191  msg.msg_control = 0;
192  msg.msg_controllen = 0;
193  msg.msg_flags = 0;
194 
195  ssize_t res = recvmsg(fSocket, &msg, MSG_DONTWAIT | MSG_NOSIGNAL);
196 
197  if (res==0) OnSocketError(0, "when recvmsg()"); else
198  if (res<0) {
199  if (errno!=EAGAIN) OnSocketError(errno, "when recvmsg()");
200  }
201 
202  return res;
203 }
204 
205 ssize_t dabc::SocketAddon::DoSendBuffer(void* buf, ssize_t len)
206 {
207  ssize_t res = send(fSocket, buf, len, MSG_DONTWAIT | MSG_NOSIGNAL);
208 
209  if (res==0) OnSocketError(0, "when send()"); else
210  if (res<0) {
211  if (errno!=EAGAIN) OnSocketError(errno, "When send()");
212  }
213 
214  return res;
215 }
216 
217 
218 ssize_t dabc::SocketAddon::DoSendBufferHdr(void* hdr, ssize_t hdrlen, void* buf, ssize_t len, void* tgtaddr, unsigned tgtaddrlen)
219 {
220  struct iovec iov[2];
221 
222  iov[0].iov_base = hdr;
223  iov[0].iov_len = hdrlen;
224 
225  iov[1].iov_base = buf;
226  iov[1].iov_len = len;
227 
228  struct msghdr msg;
229 
230  msg.msg_name = tgtaddr;
231  msg.msg_namelen = tgtaddrlen;
232  msg.msg_iov = iov;
233  msg.msg_iovlen = buf ? 2 : 1;
234  msg.msg_control = 0;
235  msg.msg_controllen = 0;
236  msg.msg_flags = 0;
237 
238  ssize_t res = sendmsg(fSocket, &msg, MSG_DONTWAIT | MSG_NOSIGNAL);
239 
240  if (res==0) OnSocketError(0, "when sendmsg()"); else
241  if (res<0) {
242  if (errno!=EAGAIN) OnSocketError(errno, "When sendmsg()");
243  }
244 
245  return res;
246 }
247 
248 
249 // _____________________________________________________________________
250 
251 dabc::SocketIOAddon::SocketIOAddon(int fd, bool isdatagram, bool usemsg) :
252  SocketAddon(fd),
253  fDatagramSocket(isdatagram),
254  fUseMsgOper(usemsg),
255  fSendUseMsg(true),
256  fSendIOV(0),
257  fSendIOVSize(0),
258  fSendIOVFirst(0),
259  fSendIOVNumber(0),
260  fSendUseAddr(false),
261  fRecvUseMsg(true),
262  fRecvIOV(0),
263  fRecvIOVSize(0),
264  fRecvIOVFirst(0),
265  fRecvIOVNumber(0),
266  fLastRecvSize(0)
267 {
268  if (IsDatagramSocket() && !fUseMsgOper) {
269  EOUT("Dangerous - datagram socket MUST use sendmsg()/recvmsg() operation to be able send/recv segmented buffers, force");
270  fUseMsgOper = true;
271  }
272 
273  memset(&fSendAddr, 0, sizeof(fSendAddr));
274 
275  #ifdef SOCKET_PROFILING
276  fSendOper = 0;
277  fSendTime = 0.;
278  fSendSize = 0;
279  fRecvOper = 0;
280  fRecvTime = 0.;
281  fRecvSize = 0.;
282  #endif
283 }
284 
286 {
287  #ifdef SOCKET_PROFILING
288  DOUT1("SocketIOAddon::~SocketIOAddon Send:%ld Recv:%ld", fSendOper, fRecvOper);
289  if (fSendOper>0)
290  DOUT1(" Send time:%5.1f microsec sz:%7.1f", fSendTime*1e6/fSendOper, 1.*fSendSize/fSendOper);
291  if (fRecvOper>0)
292  DOUT1(" Recv time:%5.1f microsec sz:%7.1f", fRecvTime*1e6/fRecvOper, 1.*fRecvSize/fRecvOper);
293  #endif
294 
295  DOUT4("Destroying SocketIOAddon %p fd:%d", this, Socket());
296 
297  AllocateSendIOV(0);
298  AllocateRecvIOV(0);
299 }
300 
301 void dabc::SocketIOAddon::SetSendAddr(const std::string &host, int port)
302 {
303  if (!IsDatagramSocket()) {
304  EOUT("Cannot specify send addr for non-datagram sockets");
305  return;
306  }
307  memset(&fSendAddr, 0, sizeof(fSendAddr));
308  fSendUseAddr = false;
309 
310 /* if (!host.empty() && (port>0)) {
311  fSendUseAddr = true;
312  fSendAddr.sin_family = AF_INET;
313  fSendAddr.sin_addr.s_addr = inet_addr(host.c_str());
314  fSendAddr.sin_port = htons(port);
315  }
316 */
317 
318  struct hostent *h = gethostbyname(host.c_str());
319  if ((h==0) || (h->h_addrtype!=AF_INET) || host.empty()) {
320  EOUT("Cannot get host information for %s", host.c_str());
321  return;
322  }
323 
324  fSendAddr.sin_family = AF_INET;
325  memcpy(&fSendAddr.sin_addr.s_addr, h->h_addr_list[0], h->h_length);
326  fSendAddr.sin_port = htons (port);
327 
328  fSendUseAddr = true;
329 
330 }
331 
332 
334 {
335  if (fSendIOV!=0) delete [] fSendIOV;
336 
337  fSendIOV = 0;
338  fSendIOVSize = 0;
339  fSendIOVFirst = 0;
340  fSendIOVNumber = 0;
341 
342  if (size<=0) return;
343 
344  fSendIOVSize = size;
345  fSendIOV = new struct iovec [size];
346 }
347 
349 {
350  if (fRecvIOV!=0) delete [] fRecvIOV;
351 
352  fRecvIOV = 0;
353  fRecvIOVSize = 0;
354  fRecvIOVFirst = 0;
355  fRecvIOVNumber = 0;
356 
357  if (size<=0) return;
358 
359  fRecvIOVSize = size;
360  fRecvIOV = new struct iovec [size];
361 }
362 
363 bool dabc::SocketIOAddon::StartSend(const void* buf1, unsigned size1,
364  const void* buf2, unsigned size2,
365  const void* buf3, unsigned size3)
366 {
367  if (fSendIOVNumber>0) {
368  EOUT("Current send operation not yet completed");
369  return false;
370  }
371 
372  if (fSendIOVSize<3) AllocateSendIOV(8);
373 
374  int indx = 0;
375  if (buf1 && (size1>0)) {
376  fSendIOV[indx].iov_base = (void*) buf1;
377  fSendIOV[indx].iov_len = size1;
378  indx++;
379  }
380 
381  if (buf2 && (size2>0)) {
382  fSendIOV[indx].iov_base = (void*) buf2;
383  fSendIOV[indx].iov_len = size2;
384  indx++;
385  }
386 
387  if (buf3 && (size3>0)) {
388  fSendIOV[indx].iov_base = (void*) buf3;
389  fSendIOV[indx].iov_len = size3;
390  indx++;
391  }
392 
393  if (indx==0) {
394  EOUT("No buffer specified");
395  return false;
396  }
397 
398  fSendUseMsg = fUseMsgOper;
399  fSendIOVFirst = 0;
400  fSendIOVNumber = indx;
401 
402  // TODO: Should we inform thread directly that we want to send data??
403  SetDoingOutput(true);
404 
405  return true;
406 }
407 
408 
409 bool dabc::SocketIOAddon::StartRecv(void* buf, size_t size)
410 {
411  return StartRecvHdr(0, 0, buf, size);
412 }
413 
415 {
416  // this is simple version,
417  // where only buffer itself without header is transported
418 
419  return StartNetSend(0, 0, buf);
420 }
421 
422 bool dabc::SocketIOAddon::StartRecvHdr(void* hdr, unsigned hdrsize, void* buf, size_t size)
423 {
424  if (fRecvIOVNumber>0) {
425  EOUT("Current recv operation not yet completed");
426  return false;
427  }
428 
429  if (fRecvIOVSize<2) AllocateRecvIOV(8);
430 
431  int indx = 0;
432 
433  if ((hdr!=0) && (hdrsize>0)) {
434  fRecvIOV[indx].iov_base = hdr;
435  fRecvIOV[indx].iov_len = hdrsize;
436  indx++;
437  }
438 
439  fRecvIOV[indx].iov_base = buf;
440  fRecvIOV[indx].iov_len = size;
441  indx++;
442 
443  fRecvUseMsg = fUseMsgOper;
444  fRecvIOVFirst = 0;
445  fRecvIOVNumber = indx;
446 
447  // TODO: Should we inform thread directly that we want to recv data??
448  SetDoingInput(true);
449 
450  return true;
451 }
452 
453 
455 {
456  return StartNetRecv(0, 0, buf, datasize);
457 }
458 
459 bool dabc::SocketIOAddon::StartNetRecv(void* hdr, unsigned hdrsize, Buffer& buf, BufferSize_t datasize)
460 {
461  // datasize==0 here really means that there is no data to get !!!!
462 
463  if (fRecvIOVNumber>0) {
464  EOUT("Current recv operation not yet completed");
465  return false;
466  }
467 
468  if (buf.null()) return false;
469 
470  if (fRecvIOVSize<=buf.NumSegments()) AllocateRecvIOV(buf.NumSegments()+1);
471 
472  fRecvUseMsg = fUseMsgOper;
473  fRecvIOVFirst = 0;
474 
475  int indx = 0;
476 
477  if ((hdr!=0) && (hdrsize>0)) {
478  fRecvIOV[indx].iov_base = hdr;
479  fRecvIOV[indx].iov_len = hdrsize;
480  indx++;
481  }
482 
483  for (unsigned nseg=0; nseg<buf.NumSegments(); nseg++) {
484  BufferSize_t segsize = buf.SegmentSize(nseg);
485  if (segsize>datasize) segsize = datasize;
486  if (segsize==0) break;
487 
488  fRecvIOV[indx].iov_base = buf.SegmentPtr(nseg);
489  fRecvIOV[indx].iov_len = segsize;
490  indx++;
491 
492  datasize-=segsize;
493  }
494 
495  fRecvIOVNumber = indx;
496 
497  // TODO: Should we inform thread directly that we want to recv data??
498  SetDoingInput(true);
499 
500  return true;
501 }
502 
503 bool dabc::SocketIOAddon::StartNetSend(void* hdr, unsigned hdrsize, const Buffer& buf)
504 {
505  if (fSendIOVNumber>0) {
506  EOUT("Current send operation not yet completed");
507  return false;
508  }
509 
510  if (buf.null()) return false;
511 
512  if (fSendIOVSize<=buf.NumSegments()) AllocateSendIOV(buf.NumSegments()+1);
513 
514  fSendUseMsg = fUseMsgOper;
515  fSendIOVFirst = 0;
516 
517  int indx = 0;
518 
519  if ((hdr!=0) && (hdrsize>0)) {
520  fSendIOV[indx].iov_base = hdr;
521  fSendIOV[indx].iov_len = hdrsize;
522  indx++;
523  }
524 
525  for (unsigned nseg=0; nseg<buf.NumSegments(); nseg++) {
526  fSendIOV[indx].iov_base = buf.SegmentPtr(nseg);
527  fSendIOV[indx].iov_len = buf.SegmentSize(nseg);
528  indx++;
529  }
530 
531  fSendIOVNumber = indx;
532 
533  // TODO: Should we inform thread that we want to send data??
534  SetDoingOutput(true);
535 
536  return true;
537 }
538 
540 {
541 // DOUT0("IO addon:%p process event %u", this, evnt.GetCode());
542 
543  switch (evnt.GetCode()) {
544  case evntSocketRead: {
545 
546  if (IsLogging())
547  DOUT0("Socket %d wants to receive number %d usemsg %s", Socket(), fRecvIOVNumber, DBOOL(fRecvUseMsg));
548 
549  // nothing to recv
550  if (fRecvIOVNumber==0) return;
551 
552  if ((fRecvIOV==0) || (fSocket<0)) {
553  EOUT("HARD PROBLEM when reading socket");
554  OnSocketError(-1, "Missing socket when evntSocketRead fired");
555  return;
556  }
557 
558  #ifdef SOCKET_PROFILING
559  fRecvOper++;
560  TimeStamp tm1 = dabc::Now();
561  #endif
562 
563  fLastRecvSize = 0;
564  ssize_t res = 0;
565 
566 // DOUT1("Socket %d fRecvIOV = %p fRecvIOVFirst = %u number %u iov: %p %u",
567 // fSocket, fRecvIOV, fRecvIOVFirst, fRecvIOVNumber,
568 // fRecvIOV[fRecvIOVFirst].iov_base, fRecvIOV[fRecvIOVFirst].iov_len);
569 
570  if (fRecvUseMsg) {
571 
572  struct msghdr msg;
573 
574  msg.msg_name = &fRecvAddr;
575  msg.msg_namelen = sizeof(fRecvAddr);
576  msg.msg_iov = &(fRecvIOV[fRecvIOVFirst]);
577  msg.msg_iovlen = fRecvIOVNumber - fRecvIOVFirst;
578  msg.msg_control = 0;
579  msg.msg_controllen = 0;
580  msg.msg_flags = 0;
581 
582  res = recvmsg(fSocket, &msg, MSG_DONTWAIT | MSG_NOSIGNAL);
583  } else {
584  socklen_t addrlen = sizeof(fRecvAddr);
585  res = recvfrom(fSocket, fRecvIOV[fRecvIOVFirst].iov_base, fRecvIOV[fRecvIOVFirst].iov_len, MSG_DONTWAIT | MSG_NOSIGNAL, (sockaddr*) &fRecvAddr, &addrlen);
586  }
587 
588 // if (IsLogging())
589 // DOUT0("Socket %d get receive %d", Socket(), res);
590 
591  #ifdef SOCKET_PROFILING
592  TimeStamp tm2 = dabc::Now();
593  fRecvTime += (tm2-tm1);
594  if (res>0) fRecvSize += res;
595  #endif
596 
597  if (res==0) {
598  // DOUT0("Addon:%p socket:%d res==0 when doing read usemsg %s numseg %u seg0.len %u", this, fSocket, DBOOL(fRecvUseMsg), fRecvIOVNumber - fRecvIOVFirst, fRecvIOV[fRecvIOVFirst].iov_len);
599  OnSocketError(0, "when recvmsg()");
600  return;
601  }
602 
603  if (res<0) {
604  if (errno!=EAGAIN) {
605  OnSocketError(errno, "when recvmsg()");
606  } else {
607  // we indicating that we want to receive data but there is nothing to read
608  // why we get message at all?
609  SetDoingInput(true);
610  EOUT("Why socket read message produce but we do not get any data??");
611  }
612 
613  return;
614  }
615 
616  fLastRecvSize = res;
617 
618  if (IsDatagramSocket()) {
619  // for datagram the only recv message is possible
620  fRecvIOVFirst = 0;
621  fRecvIOVNumber = 0;
622 
623 // if (IsLogging())
624 // DOUT0("Socket %d signals COMPL", Socket());
625 
626  OnRecvCompleted();
627  return;
628  }
629 
630  while (res>0) {
631 
632  struct iovec* rec = &(fRecvIOV[fRecvIOVFirst]);
633 
634  if (rec->iov_len <= (unsigned) res) {
635  // just skip current rec, jump to next
636  res -= rec->iov_len;
637  fRecvIOVFirst++;
638 
639  if (fRecvIOVFirst==fRecvIOVNumber) {
640  if (res!=0) EOUT("Internal error - length after recvmsg() not zero");
641 
642  fRecvIOVFirst = 0;
643  fRecvIOVNumber = 0;
644 
645 // if (IsLogging())
646 // DOUT0("Socket %d signals COMPL", Socket());
647 
648  OnRecvCompleted();
649 
650  return;
651  }
652  } else {
653  rec->iov_len -= res;
654  rec->iov_base = (char*)rec->iov_base + res;
655  res = 0;
656  }
657  }
658 
659  // there is still some portion of data should be read from the socket, indicate this for the thread
660  SetDoingInput(true);
661 
662  break;
663  }
664 
665  case evntSocketWrite: {
666 
667  if (fSendIOVNumber==0) return; // nothing to send
668 
669  #ifdef SOCKET_PROFILING
670  fSendOper++;
671  TimeStamp tm1 = dabc::Now();
672  #endif
673 
674  if ((fSocket<0) || (fSendIOV==0)) {
675  EOUT("HARD PROBLEM when trying write socket");
676  }
677 
678  ssize_t res = 0;
679 
680  if (fSendUseMsg) {
681 
682  struct msghdr msg;
683 
684  msg.msg_name = fSendUseAddr ? &fSendAddr : 0;
685  msg.msg_namelen = fSendUseAddr ? sizeof(fSendAddr) : 0;;
686  msg.msg_iov = &(fSendIOV[fSendIOVFirst]);
687  msg.msg_iovlen = fSendIOVNumber - fSendIOVFirst;
688  msg.msg_control = 0;
689  msg.msg_controllen = 0;
690  msg.msg_flags = 0;
691 
692  res = sendmsg(fSocket, &msg, MSG_DONTWAIT | MSG_NOSIGNAL);
693  } else
694  res = send(fSocket, fSendIOV[fSendIOVFirst].iov_base, fSendIOV[fSendIOVFirst].iov_len, MSG_DONTWAIT | MSG_NOSIGNAL);
695 
696  #ifdef SOCKET_PROFILING
697  TimeStamp tm2 = dabc::Now();
698  fSendTime += (tm2-tm1);
699  if (res>0) fSendSize += res;
700  #endif
701 
702 
703  if (res==0) {
704  OnSocketError(0, "when sendmsg()");
705  return;
706  }
707 
708  if (res<0) {
709  DOUT2("Error when sending via socket %d usemsg %s first %d number %d", fSocket, DBOOL(fSendUseMsg), fSendIOVFirst, fSendIOVNumber);
710 
711  if (errno!=EAGAIN) {
712  OnSocketError(errno, "when sendmsg()");
713  } else {
714  // we indicating that we want to receive data but there is nothing to read
715  // why we get message at all?
716  SetDoingOutput(true);
717  EOUT("Why socket write message produce but we did not send any bytes?");
718  }
719  return;
720  }
721 
722  DOUT5("Socket %d send %d bytes", Socket(), res);
723 
724  while (res>0) {
725  struct iovec* rec = &(fSendIOV[fSendIOVFirst]);
726 
727  if (rec->iov_len <= (unsigned) res) {
728  // just skip current rec, jump to next
729  res -= rec->iov_len;
730  fSendIOVFirst++;
731 
732  if (fSendIOVFirst==fSendIOVNumber) {
733  if (res!=0) EOUT("Internal error - length after sendmsg() not zero");
734 
735  fSendIOVFirst = 0;
736  fSendIOVNumber = 0;
737 
738  OnSendCompleted();
739 
740  return;
741  }
742  } else {
743  rec->iov_len -= res;
744  rec->iov_base = (char*)rec->iov_base + res;
745  res = 0;
746  }
747  }
748 
749  // we are informing that there is some data still to send
750  SetDoingOutput(true);
751 
752  break;
753  }
754  default:
756  }
757 }
758 
760 {
761  fSendIOVNumber = 0;
762  fRecvIOVNumber = 0;
763 }
764 
765 // ___________________________________________________________________
766 
767 dabc::SocketServerAddon::SocketServerAddon(int serversocket, const char* hostname, int portnum, struct addrinfo *info) :
768  SocketConnectAddon(serversocket),
769  fServerHostName(hostname ? hostname : ""),
770  fServerPortNumber(portnum)
771 {
772  if (fServerHostName.empty())
774 
775  if (info) {
776  ai_family = info->ai_family;
777  ai_socktype = info->ai_socktype;
778  ai_protocol = info->ai_protocol;
779  ai_addrlen = info->ai_addrlen;
780  memcpy(&ai_addr, info->ai_addr, info->ai_addrlen);
781  }
782 
783  SetDoingInput(true);
784  listen(Socket(), 10);
785 
786  DOUT2("Create dabc::SocketServerAddon");
787 }
788 
790 {
791  switch (evnt.GetCode()) {
792  case evntSocketRead: {
793 
794  // we accept more connections
795  SetDoingInput(true);
796 
797  int connfd = accept(Socket(), 0, 0);
798 
799  if (connfd < 0) {
800  EOUT("Error with accept");
801  fAcceptErrors++;
802  if (fAcceptErrors >= 1000) {
803  // try to recreate socket every 100 failure
804  if (fAcceptErrors % 100 == 0)
805  RecreateSocket();
806  }
807  if (fAcceptErrors > 2000) {
808  EOUT("Fatal - too many accept errors, abort application");
809  exit(1);
810  }
811  return;
812  }
813 
814  fAcceptErrors = 0;
815 
816  listen(Socket(), 10);
817 
819  EOUT("Cannot set nonblocking flag for connected socket");
820  close(connfd);
821  return;
822  }
823 
824  DOUT2("We get new connection with fd: %d", connfd);
825 
826  OnClientConnected(connfd);
827 
828  break;
829  }
830 
831  default:
833  }
834 }
835 
837 {
838  Command cmd("SocketConnect");
839  cmd.SetStr("Type", "Server");
840  cmd.SetInt("fd", fd);
841 
842  if (!fConnRcv.null() && !fConnId.empty()) {
843  cmd.SetStr("ConnId", fConnId);
844  fConnRcv.Submit(cmd);
845  } else
846  if (!fWorker.null()) {
847  ((Worker*) fWorker())->Submit(cmd);
848  } else {
849  EOUT("Method not implemented - socked will be closed");
850  close(fd);
851  }
852 }
853 
855 {
856  if (ai_addrlen == 0) return false;
857 
858  CloseSocket();
859  int sockfd = socket(ai_family, ai_socktype, ai_protocol);
860  if (sockfd < 0) return false;
861 
862  int opt = 1;
863  setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
864 
865  if (bind(sockfd, (struct sockaddr *) &ai_addr, ai_addrlen) == 0) {
867  SetSocket(sockfd);
868  return true;
869  }
870  }
871 
872  close(sockfd);
873  return false;
874 }
875 
876 
877 
878 // _____________________________________________________________________
879 
880 dabc::SocketClientAddon::SocketClientAddon(const struct addrinfo* serv_addr, int fd) :
881  SocketConnectAddon(fd),
882  fRetry(1),
883  fRetryTmout(-1)
884 {
885  if (serv_addr==0) {
886  EOUT("Server address not specified");
887  return;
888  }
889 
890  fServAddr.ai_flags = serv_addr->ai_flags;
891  fServAddr.ai_family = serv_addr->ai_family;
892  fServAddr.ai_socktype = serv_addr->ai_socktype;
893  fServAddr.ai_protocol = serv_addr->ai_protocol;
894  fServAddr.ai_addrlen = serv_addr->ai_addrlen;
895  fServAddr.ai_addr = nullptr;
896  if ((serv_addr->ai_addrlen>0) && (serv_addr->ai_addr!=0)) {
897  fServAddr.ai_addr = (sockaddr*) malloc(fServAddr.ai_addrlen);
898  if (fServAddr.ai_addr)
899  memcpy(fServAddr.ai_addr, serv_addr->ai_addr, fServAddr.ai_addrlen);
900  else
901  EOUT("Memory allocation error");
902  }
903 
904  fServAddr.ai_canonname = nullptr;
905  if (serv_addr->ai_canonname) {
906  size_t len = strlen(serv_addr->ai_canonname);
907  fServAddr.ai_canonname = (char*) malloc(len + 1);
908  if (fServAddr.ai_canonname)
909  strncpy(fServAddr.ai_canonname, serv_addr->ai_canonname, len+1);
910  else
911  EOUT("Memory allocation error");
912  }
913  fServAddr.ai_next = 0;
914 }
915 
917 {
918 // DOUT0("Actual destroy of SocketClientAddon %p worker %p", this, fWorker());
919 
920  free(fServAddr.ai_addr); fServAddr.ai_addr = nullptr;
921  free(fServAddr.ai_canonname); fServAddr.ai_canonname = nullptr;
922 }
923 
924 void dabc::SocketClientAddon::SetRetryOpt(int nretry, double tmout)
925 {
926  fRetry = nretry;
927  fRetryTmout = tmout;
928 }
929 
931 {
933 
934  FireWorkerEvent(evntSocketStartConnect);
935 }
936 
938 {
939  switch (evnt.GetCode()) {
940  case evntSocketWrite: {
941 
942  // we could get write event after socket was closed by error - ignore such event
943  if (Socket()<=0) return;
944 
945  // we can check if connection established
946 
947  int myerrno = TakeSocketError();
948 
949  if (myerrno==0) {
950  DOUT5("Connection done %7.5f", dabc::Now().AsDouble());
951  int fd = TakeSocket();
952  OnConnectionEstablished(fd);
953  return;
954  }
955 
956  DOUT3("Postponed connect socket err:%d %s", myerrno, SocketErr(myerrno));
957 
958  break;
959  }
960 
961  case evntSocketError: {
962  // int myerrno = TakeSocketError();
963  // DOUT3("Doing connect socket err:%d %s", myerrno, SocketErr(myerrno));
964  break;
965  }
966 
967  case evntSocketStartConnect: {
968  // start next attempt for connection
969 
970  DOUT3("Start next connect attempt sock:%d", Socket());
971 
972  if (Socket()<=0) {
973  int fd = socket(fServAddr.ai_family, fServAddr.ai_socktype, fServAddr.ai_protocol);
974 
975  if (fd<=0)
976  EOUT("Cannot create socket with given address");
977  else
978  SetSocket(fd);
979  }
980 
981  if (Socket()>0) {
983 
984  int res = connect(Socket(), fServAddr.ai_addr, fServAddr.ai_addrlen);
985 
986  if (res==0) {
987  int fd = TakeSocket();
988  OnConnectionEstablished(fd);
989  return;
990  }
991 
992  if (errno==EINPROGRESS) {
993  DOUT3("Connection in progress %7.5f", dabc::Now().AsDouble());
994  SetDoingOutput(true);
995  return;
996  }
997 
998  DOUT3("When calling connection socket err:%d %s", errno, SocketErr(errno));
999  }
1000 
1001  break;
1002  }
1003 
1004  default:
1006  return;
1007  }
1008 
1009  SetDoingOutput(false);
1010  CloseSocket();
1011 
1012  if (--fRetry>0) {
1013  DOUT3("Try connect after %5.1f s n:%d", fRetryTmout, fRetry);
1014 
1015  ActivateTimeout(fRetryTmout > 0. ? fRetryTmout : 0.);
1016  } else {
1017  OnConnectionFailed();
1018  }
1019 }
1020 
1022 {
1023  // activate connection start again
1024 
1025  FireWorkerEvent(evntSocketStartConnect);
1026  return -1.;
1027 }
1028 
1030 {
1031  Command cmd("SocketConnect");
1032  cmd.SetStr("Type", "Client");
1033  cmd.SetInt("fd", fd);
1034  cmd.SetStr("ConnId", fConnId);
1035 
1036  if (!fWorker.null() && IsDeliverEventsToWorker()) {
1037  ((Worker*) fWorker())->Submit(cmd);
1038  return;
1039  }
1040 
1041  if (!fConnRcv.null()) {
1042  fConnRcv.Submit(cmd);
1043  } else {
1044  EOUT("Connection established, but not processed - close socket");
1045  close(fd);
1046  }
1047 
1048  DeleteWorker();
1049 }
1050 
1052 {
1053  Command cmd("SocketConnect");
1054  cmd.SetStr("Type", "Error");
1055  cmd.SetStr("ConnId", fConnId);
1056 
1057  if (!fWorker.null() && IsDeliverEventsToWorker()) {
1058  ((Worker*) fWorker())->Submit(cmd);
1059  return;
1060  }
1061 
1062  if (!fConnRcv.null()) {
1063  fConnRcv.Submit(cmd);
1064  } else {
1065  EOUT("Connection failed to establish, error not processed");
1066  }
1067 
1068  DeleteWorker();
1069 }
1070 
1071 
1072 // _______________________________________________________________________
1073 
1074 dabc::SocketThread::SocketThread(Reference parent, const std::string &name, Command cmd) :
1075  dabc::Thread(parent, name, cmd),
1076  fPipeFired(false),
1077  fWaitFire(false),
1078  fScalerCounter(10),
1079  f_sizeufds(0),
1080  f_ufds(0),
1081  f_recs(0),
1082  fIsAnySocket(false),
1083  fCheckNewEvents(true),
1084  fBalanceCnt(0)
1085 {
1086 
1087 #ifdef SOCKET_PROFILING
1088  fWaitCalls = 0;
1089  fWaitDone = 0;
1090  fWaitTime = 0;
1091  fFillTime = 0;
1092  fPipeCalled = 0;
1093 #endif
1094 
1095  fPipe[0] = 0;
1096  fPipe[1] = 0;
1097  auto res = pipe(fPipe);
1098  (void) res; // ignore compiler warnings
1099 
1100  // by this call we rebuild ufds array, for now only for the pipe
1102 
1103 }
1104 
1106 {
1107  // !!!!!! we should stop thread before destroying anything while
1108  // thread may use some structures in the MainLoop !!!!!!!!
1109  DOUT3("~~~~~~~~~~~~~~ SOCKThread %s destructor with timeout %3.1fs", GetName(), GetStopTimeout());
1110  Stop(GetStopTimeout()); // JAM 6.7.2017 - try with larger timeout for ltsm
1111 
1112  if (fPipe[0]!=0) { close(fPipe[0]); fPipe[0] = 0; }
1113  if (fPipe[1]!=0) { close(fPipe[1]); fPipe[1] = 0; }
1114 
1115  if (f_ufds!=0) {
1116  delete[] f_ufds;
1117  delete[] f_recs;
1118  f_ufds = 0;
1119  f_recs = 0;
1120  f_sizeufds = 0;
1121  }
1122 
1123  #ifdef SOCKET_PROFILING
1124  DOUT1("Thrd:%s Wait called %ld done %ld ratio %5.3f %s Pipe:%ld", GetName(), fWaitCalls, fWaitDone, (fWaitCalls>0 ? 100.*fWaitDone/fWaitCalls : 0.) ,"%", fPipeCalled);
1125  if (fWaitDone>0)
1126  DOUT1("Aver times fill:%5.1f microsec wait:%5.1f microsec", fFillTime*1e6/fWaitDone, fWaitTime*1e6/fWaitDone);
1127  #endif
1128 }
1129 
1130 bool dabc::SocketThread::CompatibleClass(const std::string &clname) const
1131 {
1132  if (Thread::CompatibleClass(clname)) return true;
1133  return clname == typeSocketThread;
1134 }
1135 
1137 {
1138  int opts = fcntl(fd, F_GETFL);
1139  if (opts < 0) {
1140  EOUT("fcntl(F_GETFL) failed");
1141  return false;
1142  }
1143  opts = (opts | O_NONBLOCK);
1144  if (fcntl(fd, F_SETFL,opts) < 0) {
1145  EOUT("fcntl(F_SETFL) failed");
1146  return false;
1147  }
1148  return true;
1149 }
1150 
1151 
1153 {
1154  int one = 1;
1155  int res = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
1156  return res == 0;
1157 }
1158 
1159 std::string dabc::SocketThread::DefineHostName(bool force)
1160 {
1161  std::string host = dabc::Configuration::GetLocalHost();
1162 
1163  if (host.empty() && force) {
1164  char sbuf[500];
1165  if (gethostname(sbuf, sizeof(sbuf))) {
1166  EOUT("Error to get local host name");
1167  host = "localhost";
1168  } else
1169  host = sbuf;
1170  }
1171 
1172  return host;
1173 }
1174 
1175 dabc::SocketServerAddon* dabc::SocketThread::CreateServerAddon(const std::string &host, int nport, int portmin, int portmax)
1176 {
1177  char nameinfo[1024], serviceinfo[1024];
1178 
1179  int numtests = 1; // at least test value of nport
1180  if ((portmin>0) && (portmax>0) && (portmin<=portmax)) numtests+=(portmax-portmin+1);
1181 
1182  const char* hostname = host.empty() ? 0 : host.c_str();
1183 
1184  SocketServerAddon *addon = nullptr;
1185 
1186  for(int ntest=0;ntest<numtests;ntest++) {
1187 
1188  int serviceid = (ntest==0) ? nport : portmin - 1 + ntest;
1189 
1190  if (serviceid < 0) continue;
1191 
1192  struct addrinfo hints, *info = nullptr;
1193 
1194  memset(&hints, 0, sizeof(hints));
1195  hints.ai_flags = AI_PASSIVE;
1196  hints.ai_family = AF_UNSPEC; //AF_INET;
1197  hints.ai_socktype = SOCK_STREAM;
1198 
1199  char service[100];
1200  sprintf(service, "%d", serviceid);
1201 
1202  int n = getaddrinfo(hostname, service, &hints, &info);
1203 
1204  DOUT2("GetAddrInfo %s:%s res = %d", host.c_str(), service, n);
1205 
1206  if (n < 0) {
1207  EOUT("Cannot get addr info for service %s:%s", host.c_str(), service);
1208  continue;
1209  }
1210 
1211  int sockfd = -1;
1212 
1213  for (struct addrinfo *t = info; t; t = t->ai_next) {
1214 
1215  sockfd = socket(t->ai_family, t->ai_socktype, t->ai_protocol);
1216  if (sockfd < 0) continue;
1217 
1218  int opt = 1;
1219  setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
1220 
1221  if (bind(sockfd, t->ai_addr, t->ai_addrlen) == 0) {
1222  int ni = getnameinfo(t->ai_addr, t->ai_addrlen,
1223  nameinfo, sizeof(nameinfo),
1224  serviceinfo, sizeof(serviceinfo),
1225  /*NI_NUMERICHOST | NI_NUMERICSERV */ NI_NOFQDN | NI_NUMERICSERV);
1226 
1227  if (host.empty() && (ni==0) && (strcmp(nameinfo,"0.0.0.0")!=0)) hostname = nameinfo;
1229  addon = new SocketServerAddon(sockfd, hostname ? hostname : "localhost", serviceid, t);
1230  break;
1231  }
1232  }
1233 
1234  close(sockfd);
1235  sockfd = -1;
1236  }
1237 
1238  freeaddrinfo(info);
1239 
1240  if (addon) return addon;
1241  }
1242 
1243  EOUT("Cannot bind server socket to port %d or find its in range %d:%d", nport, portmin, portmax);
1244  return nullptr;
1245 }
1246 
1247 int dabc::SocketThread::StartClient(const std::string &host, int nport, bool nonblocking)
1248 {
1249  char service[100];
1250  sprintf(service, "%d", nport);
1251 
1252  struct addrinfo hints, *info = 0;
1253  memset(&hints, 0, sizeof(hints));
1254  hints.ai_family = AF_UNSPEC;
1255  hints.ai_socktype = SOCK_STREAM;
1256 
1257  int sockfd(-1);
1258  if (getaddrinfo(host.c_str(), service, &hints, &info)!=0) return sockfd;
1259 
1260  for (struct addrinfo *t = info; t; t = t->ai_next) {
1261 
1262  sockfd = socket(t->ai_family, t->ai_socktype, t->ai_protocol);
1263 
1264  if (sockfd<=0) { sockfd = -1; continue; }
1265 
1266  if (connect(sockfd, t->ai_addr, t->ai_addrlen)==0) {
1267  if (!nonblocking) break;
1269  break; // socket is initialized - one could return it
1270  else
1271  EOUT("Cannot set non-blocking flag for client socket");
1272  }
1273 
1274  close(sockfd);
1275  sockfd = -1;
1276  }
1277 
1278  // always must be called
1279  freeaddrinfo(info);
1280 
1281  return sockfd;
1282 }
1283 
1284 int dabc::SocketThread::SendBuffer(int fd, void* buf, int len)
1285 {
1286  return send(fd, buf, len, MSG_NOSIGNAL);
1287 }
1288 
1289 int dabc::SocketThread::RecvBuffer(int fd, void* buf, int len)
1290 {
1291  return recv(fd, buf, len, MSG_NOSIGNAL);
1292 }
1293 
1294 bool dabc::SocketThread::AttachMulticast(int socket_descriptor, const std::string &host)
1295 {
1296  if (host.empty() || (socket_descriptor<=0)) {
1297  EOUT("Multicast address or socket handle not specified");
1298  return false;
1299  }
1300 
1301  struct hostent *server_host_name = gethostbyname(host.c_str());
1302  if (server_host_name==0) {
1303  EOUT("Cannot get host information for %s", host.c_str());
1304  return false;
1305  }
1306 
1307  struct ip_mreq command;
1308 
1309  // Allow to use same port by many processes
1310 
1311 
1312  // Allow to receive broadcast to this port
1313  int loop = 1;
1314  if (setsockopt (socket_descriptor, IPPROTO_IP, IP_MULTICAST_LOOP,
1315  &loop, sizeof (loop)) < 0) {
1316  EOUT("Fail setsockopt IP_MULTICAST_LOOP");
1317  return false;
1318  }
1319 
1320  // Join the multicast group
1321  command.imr_multiaddr.s_addr = inet_addr (host.c_str());
1322  command.imr_interface.s_addr = htonl (INADDR_ANY);
1323  if (command.imr_multiaddr.s_addr == (in_addr_t)-1) {
1324  EOUT("%s is not valid address", host.c_str());
1325  return false;
1326  }
1327  if (setsockopt(socket_descriptor, IPPROTO_IP, IP_ADD_MEMBERSHIP,
1328  &command, sizeof (command)) < 0) {
1329  EOUT("File setsockopt IP_ADD_MEMBERSHIP");
1330  return false;
1331  }
1332 
1333  return true;
1334 }
1335 
1336 
1337 void dabc::SocketThread::DettachMulticast(int handle, const std::string &host)
1338 {
1339  if ((handle<0) || host.empty()) return;
1340 
1341  struct ip_mreq command;
1342 
1343  command.imr_multiaddr.s_addr = inet_addr (host.c_str());
1344  command.imr_interface.s_addr = htonl (INADDR_ANY);
1345 
1346  // Remove socket from multicast group
1347  if (setsockopt (handle, IPPROTO_IP, IP_DROP_MEMBERSHIP,
1348  &command, sizeof (command)) < 0 ) {
1349  EOUT("Fail setsockopt:IP_DROP_MEMBERSHIP");
1350  }
1351 }
1352 
1354 {
1355  int fd = socket(PF_INET, SOCK_DGRAM, 0);
1356  if (fd<0) return -1;
1357 
1358  if (SetNonBlockSocket(fd)) return fd;
1359  close(fd);
1360  return -1;
1361 }
1362 
1364 {
1365  if (fd>0) close(fd);
1366 }
1367 
1368 
1369 int dabc::SocketThread::BindUdp(int fd, int nport, int portmin, int portmax)
1370 {
1371  if (fd<0) return -1;
1372 
1373  struct sockaddr_in m_addr;
1374  int numtests = 1; // at least test value of portnum
1375  if ((portmin>0) && (portmax>0) && (portmin<=portmax)) numtests+=(portmax-portmin+1);
1376 
1377  for(int ntest=0;ntest<numtests;ntest++) {
1378  if ((ntest==0) && (nport<0)) continue;
1379  if (ntest>0) nport = portmin - 1 + ntest;
1380 
1381  memset(&m_addr, 0, sizeof(m_addr));
1382  m_addr.sin_family = AF_INET;
1383  // m_addr.s_addr = htonl (INADDR_ANY);
1384  m_addr.sin_port = htons(nport);
1385 
1386  if (bind(fd, (struct sockaddr *)&m_addr, sizeof(m_addr))==0) return nport;
1387  }
1388 
1389  return -1;
1390 }
1391 
1392 int dabc::SocketThread::ConnectUdp(int fd, const std::string &remhost, int remport)
1393 {
1394  if (fd<0) return fd;
1395 
1396  struct hostent *host = gethostbyname(remhost.c_str());
1397  if ((host==0) || (host->h_addrtype!=AF_INET) || remhost.empty()) {
1398  EOUT("Cannot get host information for %s", remhost.c_str());
1399  close(fd);
1400  return -1;
1401  }
1402 
1403  struct sockaddr_in address;
1404 
1405  memset (&address, 0, sizeof (address));
1406  address.sin_family = AF_INET;
1407  memcpy(&address.sin_addr.s_addr, host->h_addr_list[0], host->h_length);
1408  address.sin_port = htons (remport);
1409 
1410  if (connect(fd, (struct sockaddr *) &address, sizeof (address)) < 0) {
1411  EOUT("Fail to connect to host %s port %d", remhost.c_str(), remport);
1412  close(fd);
1413  return -1;
1414  }
1415  return fd;
1416 }
1417 
1418 dabc::SocketClientAddon* dabc::SocketThread::CreateClientAddon(const std::string &serverid, int dflt_port)
1419 {
1420  if (serverid.empty()) return 0;
1421 
1422  std::string host, service;
1423 
1424  size_t pos = serverid.find(':');
1425  if (pos != std::string::npos) {
1426  host = serverid.substr(0, pos);
1427  service = serverid.substr(pos+1, serverid.length()-pos);
1428  } else
1429  if (dflt_port > 0) {
1430  host = serverid;
1431  service = dabc::format("%d", dflt_port);
1432  } else {
1433  return 0;
1434  }
1435 
1436  SocketClientAddon* addon = 0;
1437 
1438  DOUT5("CreateClientAddon %s:%s", host.c_str(), service.c_str());
1439 
1440  struct addrinfo *info;
1441  struct addrinfo hints;
1442  memset(&hints, 0, sizeof(hints));
1443  hints.ai_family = AF_UNSPEC;
1444  hints.ai_socktype = SOCK_STREAM;
1445 
1446  if (getaddrinfo(host.c_str(), service.c_str(), &hints, &info)==0) {
1447  for (struct addrinfo *t = info; t; t = t->ai_next) {
1448 
1449  int sockfd = socket(t->ai_family, t->ai_socktype, t->ai_protocol);
1450 
1451  if (sockfd<=0) continue;
1452 
1453  addon = new SocketClientAddon(t, sockfd);
1454  break;
1455  }
1456  freeaddrinfo(info);
1457  }
1458 
1459  DOUT5("CreateClientAddon %s:%s done res = %p", host.c_str(), service.c_str(), addon);
1460 
1461  return addon;
1462 }
1463 
1464 
1466 {
1467  DOUT5("SocketThread::_Fire %s nq:%d numq:%d waiting:%s", GetName(), nq, fNumQueues, DBOOL(fWaitFire));
1468 
1469  _PushEvent(arg, nq);
1470 
1471  if (fWaitFire && !fPipeFired) {
1472  auto res = write(fPipe[1], "w", 1);
1473  (void) res; // suppress compiler warnings
1474  fPipeFired = true;
1475 
1476  #ifdef SOCKET_PROFILING
1477  fPipeCalled++;
1478  #endif
1479  }
1480 }
1481 
// Deliver next event of the thread through evnt.
// Events already in the queue are returned directly, unless fCheckNewEvents
// requests to look at the sockets first. Otherwise poll() is executed over the
// wake-up pipe and all sockets whose addon asks for input or output, the
// resulting socket events are pushed into the queue and next event is extracted.
// tmout_sec - poll timeout in seconds, negative value is passed to poll() as -1;
//             forced to 0 when events are already queued.
// Returns result of _GetNextEvent(), or false when poll array is not allocated.
1482 bool dabc::SocketThread::WaitEvent(EventId& evnt, double tmout_sec)
1483 {
1484  // first check, if we have already event, which must be processed
1485 
1486  #ifdef SOCKET_PROFILING
1487  fWaitCalls++;
1488 
1489  TimeStamp tm1 = dabc::Now();
1490  #endif
1491 
1492 
1493  #ifdef DABC_EXTRA_CHECKS
1494  unsigned sizebefore(0), sizeafter(0);
1495  #endif
1496 
  // first locked section: inspect queue and announce that thread goes into poll()
1497  {
1498  dabc::LockGuard lock(ThreadMutex());
1499 
1500  #ifdef DABC_EXTRA_CHECKS
1501  sizebefore = _TotalNumberOfEvents();
1502  #endif
1503 
1504  // if we already have events in the queue,
1505  // check if we take them out or first check if new sockets events there
1506 
1507  if (_TotalNumberOfEvents()>0) {
1508 
1509  if (!fCheckNewEvents) return _GetNextEvent(evnt);
1510 
1511  // we have events in the queue, therefore do not wait - just check new events
1512  tmout_sec = 0.;
1513  }
1514 
1515  if (f_ufds==0) return false;
1516 
  // while this flag is set, _Fire() writes into the pipe to interrupt poll()
1517  fWaitFire = true;
1518  }
1519 
1520  // here we wait for next event from any socket, including pipe
1521 
1522  int numufds = 1;
1523 
  // slot 0 of poll array is always the read end of the wake-up pipe
1524  f_ufds[0].fd = fPipe[0];
1525  f_ufds[0].events = POLLIN;
1526  f_ufds[0].revents = 0;
1527 
  // collect all socket addons which currently request input and/or output
1528  for(unsigned n=1; n<fWorkers.size(); n++) {
1529  if (!f_recs[n].use) continue;
1530  SocketAddon* addon = (SocketAddon*) fWorkers[n]->addon;
1531 
1532  if (addon->Socket()<=0) continue;
1533 
1534  short events = 0;
1535 
1536  if (addon->IsDoingInput())
1537  events |= POLLIN;
1538 
1539  if (addon->IsDoingOutput())
1540  events |= POLLOUT;
1541 
1542  if (events==0) continue;
1543 
1544  f_ufds[numufds].fd = addon->Socket();
1545  f_ufds[numufds].events = events;
1546  f_ufds[numufds].revents = 0;
1547 
1548  f_recs[numufds].indx = n; // this is for dereferencing of the value
1549 
1550  numufds++;
1551  }
1552 
  // poll() takes milliseconds, negative timeout is passed as -1
1553  int tmout = tmout_sec < 0. ? -1 : int(tmout_sec*1000.);
1554 
1555  #ifdef SOCKET_PROFILING
1556  fWaitDone++;
1557  TimeStamp tm2 = dabc::Now();
1558 
1559  fFillTime += (tm2-tm1);
1560  #endif
1561 
1562 // DOUT2("SOCKETTHRD: start waiting %d", tmout);
1563 
1564 // DOUT0("SocketThread %s (%d) wait with timeout %d ms numufds %d", GetName(), entry_cnt, tmout, numufds);
1565 
  // poll() is called without holding the thread mutex
1566  int poll_res = poll(f_ufds, numufds, tmout);
1567 
1568  #ifdef SOCKET_PROFILING
1569  TimeStamp tm3 = dabc::Now();
1570  fWaitTime += (tm3-tm2);
1571  #endif
1572 
  // second locked section lasts until return: flags and event queue are modified
1573  dabc::LockGuard lock(ThreadMutex());
1574 
1575 // DOUT2("SOCKETTHRD: did waiting %d numevents %u", tmout, _TotalNumberOfEvents());
1576 
1577 
1578  fWaitFire = false;
1579 
1580  // cleanup pipe in bigger steps
1581  if (fPipeFired) {
1582  char sbuf;
1583  auto res = read(fPipe[0], &sbuf, 1);
1584  (void) res; // suppress compiler warnings
1585  fPipeFired = false;
1586  }
1587 
1588  bool isany = false;
1589 
1590  // if we really has any events, analyze all of them and push in the queue
1591  if (poll_res>0)
1592  for (int imn=1; imn<numufds;imn++) {
1593 
1594  // we use shifted index, that sockets at the end of list has chance to come at right time
1595  int n = 1 + (imn + fBalanceCnt) % (numufds-1);
1596 
1597  if (f_ufds[n].revents==0) continue;
1598 
1599  SocketAddon* addon = (SocketAddon*) fWorkers[f_recs[n].indx]->addon;
1600  Worker* worker = fWorkers[f_recs[n].indx]->work;
1601 
1602  if ((addon==0) || (worker==0)) {
1603  EOUT("Something went wrong - socket addon=%p worker = %p, something is gone", addon, worker);
1604  exit(543);
1605  }
1606 
1607 
  // error condition: event goes into queue 0 and both I/O directions are switched off
1608  if (f_ufds[n].revents & (POLLERR | POLLHUP | POLLNVAL)) {
1609 // EOUT("Error on the socket %d", f_ufds[n].fd);
1610  _PushEvent(EventId(SocketAddon::evntSocketError, f_recs[n].indx), 0);
1611  addon->SetDoingInput(false);
1612  addon->SetDoingOutput(false);
1613  IncWorkerFiredEvents(worker);
1614  isany = true;
1615  }
1616 
  // read/write requests are cleared once the event is queued, using addon I/O priority
1617  if (f_ufds[n].revents & (POLLIN | POLLPRI)) {
1618  _PushEvent(EventId(SocketAddon::evntSocketRead, f_recs[n].indx), addon->fIOPriority);
1619  addon->SetDoingInput(false);
1620  IncWorkerFiredEvents(worker);
1621  isany = true;
1622  }
1623 
1624  if (f_ufds[n].revents & POLLOUT) {
1625  _PushEvent(EventId(SocketAddon::evntSocketWrite, f_recs[n].indx), addon->fIOPriority);
1626  addon->SetDoingOutput(false);
1627  IncWorkerFiredEvents(worker);
1628  isany = true;
1629  }
1630  }
1631 
1632 // DOUT0("SocketThread %s (%d) did wait with res %d isany %s", GetName(), entry_cnt, poll_res, DBOOL(isany));
1633 
1634  // we put additional event to enable again sockets checking
1635  if (isany) {
1636  fCheckNewEvents = false;
1637  _PushEvent(evntEnableCheck, 1);
1638  fBalanceCnt = (fBalanceCnt + 1) % 100000; // just use big number, fWorker.size() is not proper here
1639  }
1640 
1641  #ifdef DABC_EXTRA_CHECKS
1642  sizeafter = _TotalNumberOfEvents();
1643 // if (sizeafter-sizebefore > 1) DOUT0("Thread:%s before:%u after:%u diff:%u", GetName(), sizebefore, sizeafter, sizeafter - sizebefore);
1644  #endif
1645 
1646  return _GetNextEvent(evnt);
1647 }
1648 
1650 {
1651  if (evid.GetCode() == evntEnableCheck) {
1652  fCheckNewEvents = true;
1653  return;
1654  }
1655 
1657 }
1658 
1659 
1661 {
1662  unsigned new_sz = fWorkers.size();
1663 
1664  if (new_sz > f_sizeufds) {
1665 
1666  delete[] f_ufds;
1667  delete[] f_recs;
1668  f_ufds = new pollfd [new_sz];
1669  f_recs = new ProcRec [new_sz];
1670 
1671  f_sizeufds = new_sz;
1672  }
1673 
1674  memset(f_ufds, 0, sizeof(pollfd) * f_sizeufds);
1675  memset(f_recs, 0, sizeof(ProcRec) * f_sizeufds);
1676 
1677  f_recs[0].use = true;
1678  f_recs[0].indx = 0;
1679  fIsAnySocket = false;
1680 
1681 // DOUT0("SocketThread %s WorkersNumberChanged size %u", GetName(), fWorkers.size());
1682 
1683  for (unsigned indx=1;indx<fWorkers.size();indx++) {
1684  SocketAddon* addon = dynamic_cast<SocketAddon*> (fWorkers[indx]->addon);
1685 
1686  f_recs[indx].use = addon!=0;
1687 
1688  if (addon!=0) {
1689  fIsAnySocket = true;
1690 // DOUT0("Socket %d doing input=%s output=%s", addon->Socket(), DBOOL(addon->IsDoingInput()), DBOOL(addon->IsDoingOutput()));
1691  }
1692  }
1693 
1694  // any time new processor is added, check for new socket events
1695  fCheckNewEvents = fIsAnySocket;
1696 
1697 // DOUT0("SocketThread %s WorkersNumberChanged %u done", GetName(), fWorkers.size());
1698 
1699 }
const char * SocketErr(int err)
Reference on memory from memory pool.
Definition: Buffer.h:135
unsigned NumSegments() const
Returns number of segment in buffer.
Definition: Buffer.h:163
unsigned SegmentSize(unsigned n=0) const
Returns size on the segment, no any boundary checks.
Definition: Buffer.h:174
void * SegmentPtr(unsigned n=0) const
Returns pointer on the segment, no any boundary checks.
Definition: Buffer.h:171
Represents command with its arguments.
Definition: Command.h:99
bool SetStr(const std::string &name, const char *value)
Definition: Command.h:134
bool SetInt(const std::string &name, int v)
Definition: Command.h:138
static std::string GetLocalHost()
Lock guard for posix mutex.
Definition: threads.h:127
Reference on the arbitrary object
Definition: Reference.h:73
bool null() const
Returns true if reference contains nullptr.
Definition: Reference.h:151
Special addon class for handling of socket and socket events.
Definition: SocketThread.h:52
virtual void ProcessEvent(const EventId &)
bool IsDoingOutput() const
Definition: SocketThread.h:112
virtual void OnSocketError(int msg, const std::string &info)
Generic error handler.
void SetDoingInput(bool on=true)
Call method to indicate that object wants to read data from the socket.
Definition: SocketThread.h:69
int Socket() const
Definition: SocketThread.h:108
int fIOPriority
priority of socket I/O events, default 1
Definition: SocketThread.h:61
virtual ~SocketAddon()
ssize_t DoSendBuffer(void *buf, ssize_t len)
bool IsDoingInput() const
Definition: SocketThread.h:111
ssize_t DoSendBufferHdr(void *hdr, ssize_t hdrlen, void *buf, ssize_t len, void *tgtaddr=0, unsigned tgtaddrlen=0)
void SetSocket(int fd)
SocketAddon(int fd=-1)
ssize_t DoRecvBufferHdr(void *hdr, ssize_t hdrlen, void *buf, ssize_t len, void *srcaddr=0, unsigned srcaddrlen=0)
void SetDoingOutput(bool on=true)
Call method to indicate that worker wants to write data to the socket.
Definition: SocketThread.h:73
ssize_t DoRecvBuffer(void *buf, ssize_t len)
Socket addon for handling connection on client side.
Definition: SocketThread.h:322
virtual void OnConnectionFailed()
struct addrinfo fServAddr
Definition: SocketThread.h:341
void SetRetryOpt(int nretry, double tmout=1.)
virtual void OnThreadAssigned()
SocketClientAddon(const struct addrinfo *serv_addr, int fd=-1)
virtual double ProcessTimeout(double)
virtual void OnConnectionEstablished(int fd)
virtual void ProcessEvent(const EventId &)
Socket addon for handling connection events.
Definition: SocketThread.h:244
void AllocateSendIOV(unsigned size)
bool StartSend(const void *buf, unsigned size, const void *buf2=0, unsigned size2=0, const void *buf3=0, unsigned size3=0)
bool fUseMsgOper
indicate if sendmsg, recvmsg operations should be used, it is must for the datagram sockets
Definition: SocketThread.h:147
virtual ~SocketIOAddon()
Destructor of SocketIOAddon class.
void SetSendAddr(const std::string &host="", int port=0)
Set destination address for all send operations,.
struct sockaddr_in fSendAddr
optional send address for next send operation
Definition: SocketThread.h:155
bool IsDatagramSocket() const
Definition: SocketThread.h:178
void AllocateRecvIOV(unsigned size)
bool StartRecv(void *buf, size_t size)
bool StartNetSend(void *hdr, unsigned hdrsize, const Buffer &buf)
bool StartRecvHdr(void *hdr, unsigned hdrsize, void *buf, size_t size)
virtual void ProcessEvent(const EventId &)
SocketIOAddon(int fd=0, bool isdatagram=false, bool usemsg=true)
Constructor of SocketIOAddon class.
void CancelIOOperations()
Method should be used to cancel all running I/O operation of the socket.
bool StartNetRecv(void *hdr, unsigned hdrsize, Buffer &buf, BufferSize_t datasize)
Socket addon for handling connection requests on server side.
Definition: SocketThread.h:285
std::string fServerHostName
Definition: SocketThread.h:287
struct sockaddr_storage ai_addr
Definition: SocketThread.h:294
virtual void OnClientConnected(int fd)
SocketServerAddon(int serversocket, const char *hostname=nullptr, int portnum=-1, struct addrinfo *info=nullptr)
virtual void ProcessEvent(const EventId &)
static std::string DefineHostName(bool force=true)
Return current host name.
virtual bool WaitEvent(EventId &, double tmout)
int fPipe[2]
array with i/o pipes handles
Definition: SocketThread.h:367
static void CloseUdp(int fd)
Close datagram (udp) socket.
virtual void ProcessExtraThreadEvent(const EventId &evid)
Method to process events which are not processed by Thread class itself Should be used in derived cla...
static SocketClientAddon * CreateClientAddon(const std::string &servid, int dflt_port=-1)
virtual void _Fire(const EventId &evnt, int nq)
static void DettachMulticast(int handle, const std::string &addr)
Detach datagram socket from multicast group.
virtual void WorkersSetChanged()
Virtual method, called from thread context to inform that number of workers are changed.
static int SendBuffer(int fd, void *buf, int len)
Wrapper for send method, should be used for blocking sockets.
static int StartClient(const std::string &host, int nport, bool nonblocking=true)
static int CreateUdp()
Create datagram (udp) socket.
virtual bool CompatibleClass(const std::string &clname) const
static bool AttachMulticast(int handle, const std::string &addr)
Attach datagram socket to multicast group to make receiving.
static int RecvBuffer(int fd, void *buf, int len)
Wrapper for recv method, should be used for blocking sockets.
static int BindUdp(int fd, int nport, int portmin=-1, int portmax=-1)
Bind UDP socket to specified port.
static int ConnectUdp(int fd, const std::string &remhost, int remport)
SocketThread(Reference parent, const std::string &name, Command cmd)
static bool SetNonBlockSocket(int fd)
static SocketServerAddon * CreateServerAddon(const std::string &host, int nport, int portmin=-1, int portmax=-1)
Create handle for server-side connection If hostname == 0, any available address will be selected If ...
static bool SetNoDelaySocket(int fd)
Represent thread functionality.
Definition: Thread.h:109
virtual bool CompatibleClass(const std::string &clname) const
Definition: Thread.cxx:428
virtual void ProcessExtraThreadEvent(const EventId &)
Method to process events which are not processed by Thread class itself Should be used in derived cla...
Definition: Thread.h:390
Generic addon for dabc::Worker.
Definition: Worker.h:49
virtual void ProcessEvent(const EventId &)
Definition: Worker.h:60
virtual void OnThreadAssigned()
Definition: Worker.h:73
Active object, which is working inside dabc::Thread.
Definition: Worker.h:116
#define DOUT2(args ...)
Definition: logging.h:170
#define DOUT0(args ...)
Definition: logging.h:156
#define DOUT5(args ...)
Definition: logging.h:188
#define DOUT3(args ...)
Definition: logging.h:176
#define EOUT(args ...)
Definition: logging.h:150
#define DOUT1(args ...)
Definition: logging.h:162
#define DBOOL(arg)
Definition: logging.h:191
#define DOUT4(args ...)
Definition: logging.h:182
Event manipulation API.
Definition: api.h:23
const char * typeSocketThread
Definition: Object.cxx:80
TimeStamp Now()
Definition: timing.h:260
uint32_t BufferSize_t
Definition: Buffer.h:32
std::string format(const char *fmt,...)
Definition: string.cxx:49
Event structure, exchanged between DABC threads.
Definition: Thread.h:70
uint16_t GetCode() const
Definition: Thread.h:92
Class for acquiring and holding timestamps.
Definition: timing.h:40