18 #include <sys/types.h>
19 #include <sys/socket.h>
25 #include <netinet/in.h>
26 #include <netinet/tcp.h>
27 #include <arpa/inet.h>
32 #if !defined(MSG_NOSIGNAL)
33 #define MSG_NOSIGNAL SO_NOSIGPIPE
40 case -1:
return "Internal";
41 case 0:
return "Close";
42 case EAGAIN:
return "EAGAIN";
43 case EBADF:
return "EBADF";
44 case ECONNREFUSED:
return "ECONNREFUSED";
45 case EFAULT:
return "EFAULT";
46 case EINTR:
return "EINTR";
47 case EINVAL:
return "EINVAL";
48 case ENOMEM:
return "ENOMEM";
49 case ENOTCONN:
return "ENOTCONN";
50 case ENOTSOCK:
return "ENOTSOCK";;
52 case EACCES:
return "EACCES";
53 case ECONNRESET:
return "ECONNRESET";
54 case EDESTADDRREQ:
return "EDESTADDRREQ";
55 case EISCONN:
return "EISCONN";
56 case EMSGSIZE:
return "EMSGSIZE";
57 case ENOBUFS:
return "ENOBUFS";
58 case EOPNOTSUPP:
return "EOPNOTSUPP";
59 case EPIPE:
return "EPIPE";
61 case EPERM:
return "EPERM";
62 case EADDRINUSE:
return "EADDRINUSE";
63 case EAFNOSUPPORT:
return "EAFNOSUPPORT";
64 case EALREADY:
return "EALREADY";
65 case EINPROGRESS:
return "EINPROGRESS";
66 case ENETUNREACH:
return "ENETUNREACH";
67 case ETIMEDOUT:
return "ETIMEDOUT";
81 fDeliverEventsToWorker(false),
82 fDeleteWorkerOnClose(false)
101 case evntSocketError:
102 OnSocketError(-1,
"get error event");
125 if (fSocket<0)
return;
127 DOUT3(
"~~~~~~~~~~~~~~~~ Close socket %d", fSocket);
134 if (Socket()<0)
return -1;
136 int myerrno = 753642;
137 socklen_t optlen =
sizeof(myerrno);
139 int res = getsockopt(Socket(), SOL_SOCKET, SO_ERROR, &myerrno, &optlen);
141 if ((res<0) || (myerrno == 753642))
return -1;
148 if (IsDeliverEventsToWorker()) {
149 DOUT2(
"Addon:%p Connection closed - worker should process",
this);
150 FireWorkerEvent(msg==0 ? evntSocketCloseInfo : evntSocketErrorInfo);
152 if (fDeleteWorkerOnClose) {
153 DOUT2(
"Connection closed - destroy socket");
157 DOUT2(
"Connection closed - destroy addon");
165 ssize_t res = recv(fSocket, buf, len, MSG_DONTWAIT | MSG_NOSIGNAL);
167 if (res==0) OnSocketError(0,
"closed during recv()");
else
169 if (errno!=EAGAIN) OnSocketError(errno,
"when recv()");
179 iov[0].iov_base = hdr;
180 iov[0].iov_len = hdrlen;
182 iov[1].iov_base = buf;
183 iov[1].iov_len = len;
187 msg.msg_name = srcaddr;
188 msg.msg_namelen = srcaddrlen;
190 msg.msg_iovlen = buf ? 2 : 1;
192 msg.msg_controllen = 0;
195 ssize_t res = recvmsg(fSocket, &msg, MSG_DONTWAIT | MSG_NOSIGNAL);
197 if (res==0) OnSocketError(0,
"when recvmsg()");
else
199 if (errno!=EAGAIN) OnSocketError(errno,
"when recvmsg()");
207 ssize_t res = send(fSocket, buf, len, MSG_DONTWAIT | MSG_NOSIGNAL);
209 if (res==0) OnSocketError(0,
"when send()");
else
211 if (errno!=EAGAIN) OnSocketError(errno,
"When send()");
222 iov[0].iov_base = hdr;
223 iov[0].iov_len = hdrlen;
225 iov[1].iov_base = buf;
226 iov[1].iov_len = len;
230 msg.msg_name = tgtaddr;
231 msg.msg_namelen = tgtaddrlen;
233 msg.msg_iovlen = buf ? 2 : 1;
235 msg.msg_controllen = 0;
238 ssize_t res = sendmsg(fSocket, &msg, MSG_DONTWAIT | MSG_NOSIGNAL);
240 if (res==0) OnSocketError(0,
"when sendmsg()");
else
242 if (errno!=EAGAIN) OnSocketError(errno,
"When sendmsg()");
253 fDatagramSocket(isdatagram),
269 EOUT(
"Dangerous - datagram socket MUST use sendmsg()/recvmsg() operation to be able send/recv segmented buffers, force");
275 #ifdef SOCKET_PROFILING
287 #ifdef SOCKET_PROFILING
288 DOUT1(
"SocketIOAddon::~SocketIOAddon Send:%ld Recv:%ld", fSendOper, fRecvOper);
290 DOUT1(
" Send time:%5.1f microsec sz:%7.1f", fSendTime*1e6/fSendOper, 1.*fSendSize/fSendOper);
292 DOUT1(
" Recv time:%5.1f microsec sz:%7.1f", fRecvTime*1e6/fRecvOper, 1.*fRecvSize/fRecvOper);
295 DOUT4(
"Destroying SocketIOAddon %p fd:%d",
this, Socket());
303 if (!IsDatagramSocket()) {
304 EOUT(
"Cannot specify send addr for non-datagram sockets");
307 memset(&fSendAddr, 0,
sizeof(fSendAddr));
308 fSendUseAddr =
false;
318 struct hostent *h = gethostbyname(host.c_str());
319 if ((h==0) || (h->h_addrtype!=AF_INET) || host.empty()) {
320 EOUT(
"Cannot get host information for %s", host.c_str());
324 fSendAddr.sin_family = AF_INET;
325 memcpy(&fSendAddr.sin_addr.s_addr, h->h_addr_list[0], h->h_length);
326 fSendAddr.sin_port = htons (port);
335 if (fSendIOV!=0)
delete [] fSendIOV;
345 fSendIOV =
new struct iovec [size];
350 if (fRecvIOV!=0)
delete [] fRecvIOV;
360 fRecvIOV =
new struct iovec [size];
364 const void* buf2,
unsigned size2,
365 const void* buf3,
unsigned size3)
367 if (fSendIOVNumber>0) {
368 EOUT(
"Current send operation not yet completed");
372 if (fSendIOVSize<3) AllocateSendIOV(8);
375 if (buf1 && (size1>0)) {
376 fSendIOV[indx].iov_base = (
void*) buf1;
377 fSendIOV[indx].iov_len = size1;
381 if (buf2 && (size2>0)) {
382 fSendIOV[indx].iov_base = (
void*) buf2;
383 fSendIOV[indx].iov_len = size2;
387 if (buf3 && (size3>0)) {
388 fSendIOV[indx].iov_base = (
void*) buf3;
389 fSendIOV[indx].iov_len = size3;
394 EOUT(
"No buffer specified");
398 fSendUseMsg = fUseMsgOper;
400 fSendIOVNumber = indx;
403 SetDoingOutput(
true);
411 return StartRecvHdr(0, 0, buf, size);
419 return StartNetSend(0, 0, buf);
424 if (fRecvIOVNumber>0) {
425 EOUT(
"Current recv operation not yet completed");
429 if (fRecvIOVSize<2) AllocateRecvIOV(8);
433 if ((hdr!=0) && (hdrsize>0)) {
434 fRecvIOV[indx].iov_base = hdr;
435 fRecvIOV[indx].iov_len = hdrsize;
439 fRecvIOV[indx].iov_base = buf;
440 fRecvIOV[indx].iov_len = size;
443 fRecvUseMsg = fUseMsgOper;
445 fRecvIOVNumber = indx;
456 return StartNetRecv(0, 0, buf, datasize);
463 if (fRecvIOVNumber>0) {
464 EOUT(
"Current recv operation not yet completed");
468 if (buf.
null())
return false;
472 fRecvUseMsg = fUseMsgOper;
477 if ((hdr!=0) && (hdrsize>0)) {
478 fRecvIOV[indx].iov_base = hdr;
479 fRecvIOV[indx].iov_len = hdrsize;
483 for (
unsigned nseg=0; nseg<buf.
NumSegments(); nseg++) {
485 if (segsize>datasize) segsize = datasize;
486 if (segsize==0)
break;
488 fRecvIOV[indx].iov_base = buf.
SegmentPtr(nseg);
489 fRecvIOV[indx].iov_len = segsize;
495 fRecvIOVNumber = indx;
505 if (fSendIOVNumber>0) {
506 EOUT(
"Current send operation not yet completed");
510 if (buf.
null())
return false;
514 fSendUseMsg = fUseMsgOper;
519 if ((hdr!=0) && (hdrsize>0)) {
520 fSendIOV[indx].iov_base = hdr;
521 fSendIOV[indx].iov_len = hdrsize;
525 for (
unsigned nseg=0; nseg<buf.
NumSegments(); nseg++) {
526 fSendIOV[indx].iov_base = buf.
SegmentPtr(nseg);
531 fSendIOVNumber = indx;
534 SetDoingOutput(
true);
544 case evntSocketRead: {
547 DOUT0(
"Socket %d wants to receive number %d usemsg %s", Socket(), fRecvIOVNumber,
DBOOL(fRecvUseMsg));
550 if (fRecvIOVNumber==0)
return;
552 if ((fRecvIOV==0) || (fSocket<0)) {
553 EOUT(
"HARD PROBLEM when reading socket");
554 OnSocketError(-1,
"Missing socket when evntSocketRead fired");
558 #ifdef SOCKET_PROFILING
574 msg.msg_name = &fRecvAddr;
575 msg.msg_namelen =
sizeof(fRecvAddr);
576 msg.msg_iov = &(fRecvIOV[fRecvIOVFirst]);
577 msg.msg_iovlen = fRecvIOVNumber - fRecvIOVFirst;
579 msg.msg_controllen = 0;
582 res = recvmsg(fSocket, &msg, MSG_DONTWAIT | MSG_NOSIGNAL);
584 socklen_t addrlen =
sizeof(fRecvAddr);
585 res = recvfrom(fSocket, fRecvIOV[fRecvIOVFirst].iov_base, fRecvIOV[fRecvIOVFirst].iov_len, MSG_DONTWAIT | MSG_NOSIGNAL, (sockaddr*) &fRecvAddr, &addrlen);
591 #ifdef SOCKET_PROFILING
593 fRecvTime += (tm2-tm1);
594 if (res>0) fRecvSize += res;
599 OnSocketError(0,
"when recvmsg()");
605 OnSocketError(errno,
"when recvmsg()");
610 EOUT(
"Why socket read message produce but we do not get any data??");
618 if (IsDatagramSocket()) {
632 struct iovec* rec = &(fRecvIOV[fRecvIOVFirst]);
634 if (rec->iov_len <= (
unsigned) res) {
639 if (fRecvIOVFirst==fRecvIOVNumber) {
640 if (res!=0)
EOUT(
"Internal error - length after recvmsg() not zero");
654 rec->iov_base = (
char*)rec->iov_base + res;
665 case evntSocketWrite: {
667 if (fSendIOVNumber==0)
return;
669 #ifdef SOCKET_PROFILING
674 if ((fSocket<0) || (fSendIOV==0)) {
675 EOUT(
"HARD PROBLEM when trying write socket");
684 msg.msg_name = fSendUseAddr ? &fSendAddr : 0;
685 msg.msg_namelen = fSendUseAddr ?
sizeof(fSendAddr) : 0;;
686 msg.msg_iov = &(fSendIOV[fSendIOVFirst]);
687 msg.msg_iovlen = fSendIOVNumber - fSendIOVFirst;
689 msg.msg_controllen = 0;
692 res = sendmsg(fSocket, &msg, MSG_DONTWAIT | MSG_NOSIGNAL);
694 res = send(fSocket, fSendIOV[fSendIOVFirst].iov_base, fSendIOV[fSendIOVFirst].iov_len, MSG_DONTWAIT | MSG_NOSIGNAL);
696 #ifdef SOCKET_PROFILING
698 fSendTime += (tm2-tm1);
699 if (res>0) fSendSize += res;
704 OnSocketError(0,
"when sendmsg()");
709 DOUT2(
"Error when sending via socket %d usemsg %s first %d number %d", fSocket,
DBOOL(fSendUseMsg), fSendIOVFirst, fSendIOVNumber);
712 OnSocketError(errno,
"when sendmsg()");
716 SetDoingOutput(
true);
717 EOUT(
"Why socket write message produce but we did not send any bytes?");
722 DOUT5(
"Socket %d send %d bytes", Socket(), res);
725 struct iovec* rec = &(fSendIOV[fSendIOVFirst]);
727 if (rec->iov_len <= (
unsigned) res) {
732 if (fSendIOVFirst==fSendIOVNumber) {
733 if (res!=0)
EOUT(
"Internal error - length after sendmsg() not zero");
744 rec->iov_base = (
char*)rec->iov_base + res;
750 SetDoingOutput(
true);
769 fServerHostName(hostname ? hostname :
""),
770 fServerPortNumber(portnum)
780 memcpy(&
ai_addr, info->ai_addr, info->ai_addrlen);
786 DOUT2(
"Create dabc::SocketServerAddon");
792 case evntSocketRead: {
797 int connfd = accept(Socket(), 0, 0);
800 EOUT(
"Error with accept");
802 if (fAcceptErrors >= 1000) {
804 if (fAcceptErrors % 100 == 0)
807 if (fAcceptErrors > 2000) {
808 EOUT(
"Fatal - too many accept errors, abort application");
816 listen(Socket(), 10);
819 EOUT(
"Cannot set nonblocking flag for connected socket");
824 DOUT2(
"We get new connection with fd: %d", connfd);
826 OnClientConnected(connfd);
839 cmd.
SetStr(
"Type",
"Server");
842 if (!fConnRcv.null() && !fConnId.empty()) {
843 cmd.
SetStr(
"ConnId", fConnId);
844 fConnRcv.Submit(cmd);
846 if (!fWorker.null()) {
847 ((
Worker*) fWorker())->Submit(cmd);
849 EOUT(
"Method not implemented - socked will be closed");
856 if (ai_addrlen == 0)
return false;
859 int sockfd = socket(ai_family, ai_socktype, ai_protocol);
860 if (sockfd < 0)
return false;
863 setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt,
sizeof(opt));
865 if (bind(sockfd, (
struct sockaddr *) &ai_addr, ai_addrlen) == 0) {
886 EOUT(
"Server address not specified");
890 fServAddr.ai_flags = serv_addr->ai_flags;
891 fServAddr.ai_family = serv_addr->ai_family;
892 fServAddr.ai_socktype = serv_addr->ai_socktype;
893 fServAddr.ai_protocol = serv_addr->ai_protocol;
894 fServAddr.ai_addrlen = serv_addr->ai_addrlen;
896 if ((serv_addr->ai_addrlen>0) && (serv_addr->ai_addr!=0)) {
901 EOUT(
"Memory allocation error");
905 if (serv_addr->ai_canonname) {
906 size_t len = strlen(serv_addr->ai_canonname);
907 fServAddr.ai_canonname = (
char*) malloc(len + 1);
909 strncpy(
fServAddr.ai_canonname, serv_addr->ai_canonname, len+1);
911 EOUT(
"Memory allocation error");
920 free(fServAddr.ai_addr); fServAddr.ai_addr =
nullptr;
921 free(fServAddr.ai_canonname); fServAddr.ai_canonname =
nullptr;
934 FireWorkerEvent(evntSocketStartConnect);
940 case evntSocketWrite: {
943 if (Socket()<=0)
return;
947 int myerrno = TakeSocketError();
951 int fd = TakeSocket();
952 OnConnectionEstablished(fd);
956 DOUT3(
"Postponed connect socket err:%d %s", myerrno,
SocketErr(myerrno));
961 case evntSocketError: {
967 case evntSocketStartConnect: {
970 DOUT3(
"Start next connect attempt sock:%d", Socket());
973 int fd = socket(fServAddr.ai_family, fServAddr.ai_socktype, fServAddr.ai_protocol);
976 EOUT(
"Cannot create socket with given address");
984 int res = connect(Socket(), fServAddr.ai_addr, fServAddr.ai_addrlen);
987 int fd = TakeSocket();
988 OnConnectionEstablished(fd);
992 if (errno==EINPROGRESS) {
994 SetDoingOutput(
true);
998 DOUT3(
"When calling connection socket err:%d %s", errno,
SocketErr(errno));
1009 SetDoingOutput(
false);
1013 DOUT3(
"Try connect after %5.1f s n:%d", fRetryTmout, fRetry);
1015 ActivateTimeout(fRetryTmout > 0. ? fRetryTmout : 0.);
1017 OnConnectionFailed();
1025 FireWorkerEvent(evntSocketStartConnect);
1032 cmd.
SetStr(
"Type",
"Client");
1034 cmd.
SetStr(
"ConnId", fConnId);
1036 if (!fWorker.null() && IsDeliverEventsToWorker()) {
1037 ((
Worker*) fWorker())->Submit(cmd);
1041 if (!fConnRcv.null()) {
1042 fConnRcv.Submit(cmd);
1044 EOUT(
"Connection established, but not processed - close socket");
1054 cmd.
SetStr(
"Type",
"Error");
1055 cmd.
SetStr(
"ConnId", fConnId);
1057 if (!fWorker.null() && IsDeliverEventsToWorker()) {
1058 ((
Worker*) fWorker())->Submit(cmd);
1062 if (!fConnRcv.null()) {
1063 fConnRcv.Submit(cmd);
1065 EOUT(
"Connection failed to establish, error not processed");
1082 fIsAnySocket(false),
1083 fCheckNewEvents(true),
1087 #ifdef SOCKET_PROFILING
1097 auto res = pipe(
fPipe);
1109 DOUT3(
"~~~~~~~~~~~~~~ SOCKThread %s destructor with timeout %3.1fs", GetName(), GetStopTimeout());
1110 Stop(GetStopTimeout());
1112 if (fPipe[0]!=0) { close(fPipe[0]); fPipe[0] = 0; }
1113 if (fPipe[1]!=0) { close(fPipe[1]); fPipe[1] = 0; }
1123 #ifdef SOCKET_PROFILING
1124 DOUT1(
"Thrd:%s Wait called %ld done %ld ratio %5.3f %s Pipe:%ld", GetName(), fWaitCalls, fWaitDone, (fWaitCalls>0 ? 100.*fWaitDone/fWaitCalls : 0.) ,
"%", fPipeCalled);
1126 DOUT1(
"Aver times fill:%5.1f microsec wait:%5.1f microsec", fFillTime*1e6/fWaitDone, fWaitTime*1e6/fWaitDone);
1138 int opts = fcntl(fd, F_GETFL);
1140 EOUT(
"fcntl(F_GETFL) failed");
1143 opts = (opts | O_NONBLOCK);
1144 if (fcntl(fd, F_SETFL,opts) < 0) {
1145 EOUT(
"fcntl(F_SETFL) failed");
1155 int res = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &one,
sizeof(one));
1163 if (host.empty() && force) {
1165 if (gethostname(sbuf,
sizeof(sbuf))) {
1166 EOUT(
"Error to get local host name");
1177 char nameinfo[1024], serviceinfo[1024];
1180 if ((portmin>0) && (portmax>0) && (portmin<=portmax)) numtests+=(portmax-portmin+1);
1182 const char* hostname = host.empty() ? 0 : host.c_str();
1186 for(
int ntest=0;ntest<numtests;ntest++) {
1188 int serviceid = (ntest==0) ? nport : portmin - 1 + ntest;
1190 if (serviceid < 0)
continue;
1192 struct addrinfo hints, *info =
nullptr;
1194 memset(&hints, 0,
sizeof(hints));
1195 hints.ai_flags = AI_PASSIVE;
1196 hints.ai_family = AF_UNSPEC;
1197 hints.ai_socktype = SOCK_STREAM;
1200 sprintf(service,
"%d", serviceid);
1202 int n = getaddrinfo(hostname, service, &hints, &info);
1204 DOUT2(
"GetAddrInfo %s:%s res = %d", host.c_str(), service, n);
1207 EOUT(
"Cannot get addr info for service %s:%s", host.c_str(), service);
1213 for (
struct addrinfo *t = info; t; t = t->ai_next) {
1215 sockfd = socket(t->ai_family, t->ai_socktype, t->ai_protocol);
1216 if (sockfd < 0)
continue;
1219 setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt,
sizeof(opt));
1221 if (bind(sockfd, t->ai_addr, t->ai_addrlen) == 0) {
1222 int ni = getnameinfo(t->ai_addr, t->ai_addrlen,
1223 nameinfo,
sizeof(nameinfo),
1224 serviceinfo,
sizeof(serviceinfo),
1225 NI_NOFQDN | NI_NUMERICSERV);
1227 if (host.empty() && (ni==0) && (strcmp(nameinfo,
"0.0.0.0")!=0)) hostname = nameinfo;
1229 addon =
new SocketServerAddon(sockfd, hostname ? hostname :
"localhost", serviceid, t);
1240 if (addon)
return addon;
1243 EOUT(
"Cannot bind server socket to port %d or find its in range %d:%d", nport, portmin, portmax);
1250 sprintf(service,
"%d", nport);
1252 struct addrinfo hints, *info = 0;
1253 memset(&hints, 0,
sizeof(hints));
1254 hints.ai_family = AF_UNSPEC;
1255 hints.ai_socktype = SOCK_STREAM;
1258 if (getaddrinfo(host.c_str(), service, &hints, &info)!=0)
return sockfd;
1260 for (
struct addrinfo *t = info; t; t = t->ai_next) {
1262 sockfd = socket(t->ai_family, t->ai_socktype, t->ai_protocol);
1264 if (sockfd<=0) { sockfd = -1;
continue; }
1266 if (connect(sockfd, t->ai_addr, t->ai_addrlen)==0) {
1267 if (!nonblocking)
break;
1271 EOUT(
"Cannot set non-blocking flag for client socket");
1286 return send(fd, buf, len, MSG_NOSIGNAL);
1291 return recv(fd, buf, len, MSG_NOSIGNAL);
1296 if (host.empty() || (socket_descriptor<=0)) {
1297 EOUT(
"Multicast address or socket handle not specified");
1301 struct hostent *server_host_name = gethostbyname(host.c_str());
1302 if (server_host_name==0) {
1303 EOUT(
"Cannot get host information for %s", host.c_str());
1307 struct ip_mreq command;
1314 if (setsockopt (socket_descriptor, IPPROTO_IP, IP_MULTICAST_LOOP,
1315 &loop,
sizeof (loop)) < 0) {
1316 EOUT(
"Fail setsockopt IP_MULTICAST_LOOP");
1321 command.imr_multiaddr.s_addr = inet_addr (host.c_str());
1322 command.imr_interface.s_addr = htonl (INADDR_ANY);
1323 if (command.imr_multiaddr.s_addr == (in_addr_t)-1) {
1324 EOUT(
"%s is not valid address", host.c_str());
1327 if (setsockopt(socket_descriptor, IPPROTO_IP, IP_ADD_MEMBERSHIP,
1328 &command,
sizeof (command)) < 0) {
1329 EOUT(
"File setsockopt IP_ADD_MEMBERSHIP");
1339 if ((handle<0) || host.empty())
return;
1341 struct ip_mreq command;
1343 command.imr_multiaddr.s_addr = inet_addr (host.c_str());
1344 command.imr_interface.s_addr = htonl (INADDR_ANY);
1347 if (setsockopt (handle, IPPROTO_IP, IP_DROP_MEMBERSHIP,
1348 &command,
sizeof (command)) < 0 ) {
1349 EOUT(
"Fail setsockopt:IP_DROP_MEMBERSHIP");
1355 int fd = socket(PF_INET, SOCK_DGRAM, 0);
1356 if (fd<0)
return -1;
1358 if (SetNonBlockSocket(fd))
return fd;
1365 if (fd>0) close(fd);
1371 if (fd<0)
return -1;
1373 struct sockaddr_in m_addr;
1375 if ((portmin>0) && (portmax>0) && (portmin<=portmax)) numtests+=(portmax-portmin+1);
1377 for(
int ntest=0;ntest<numtests;ntest++) {
1378 if ((ntest==0) && (nport<0))
continue;
1379 if (ntest>0) nport = portmin - 1 + ntest;
1381 memset(&m_addr, 0,
sizeof(m_addr));
1382 m_addr.sin_family = AF_INET;
1384 m_addr.sin_port = htons(nport);
1386 if (bind(fd, (
struct sockaddr *)&m_addr,
sizeof(m_addr))==0)
return nport;
1394 if (fd<0)
return fd;
1396 struct hostent *host = gethostbyname(remhost.c_str());
1397 if ((host==0) || (host->h_addrtype!=AF_INET) || remhost.empty()) {
1398 EOUT(
"Cannot get host information for %s", remhost.c_str());
1403 struct sockaddr_in address;
1405 memset (&address, 0,
sizeof (address));
1406 address.sin_family = AF_INET;
1407 memcpy(&address.sin_addr.s_addr, host->h_addr_list[0], host->h_length);
1408 address.sin_port = htons (remport);
1410 if (connect(fd, (
struct sockaddr *) &address,
sizeof (address)) < 0) {
1411 EOUT(
"Fail to connect to host %s port %d", remhost.c_str(), remport);
1420 if (serverid.empty())
return 0;
1422 std::string host, service;
1424 size_t pos = serverid.find(
':');
1425 if (pos != std::string::npos) {
1426 host = serverid.substr(0, pos);
1427 service = serverid.substr(pos+1, serverid.length()-pos);
1429 if (dflt_port > 0) {
1438 DOUT5(
"CreateClientAddon %s:%s", host.c_str(), service.c_str());
1440 struct addrinfo *info;
1441 struct addrinfo hints;
1442 memset(&hints, 0,
sizeof(hints));
1443 hints.ai_family = AF_UNSPEC;
1444 hints.ai_socktype = SOCK_STREAM;
1446 if (getaddrinfo(host.c_str(), service.c_str(), &hints, &info)==0) {
1447 for (
struct addrinfo *t = info; t; t = t->ai_next) {
1449 int sockfd = socket(t->ai_family, t->ai_socktype, t->ai_protocol);
1451 if (sockfd<=0)
continue;
1459 DOUT5(
"CreateClientAddon %s:%s done res = %p", host.c_str(), service.c_str(), addon);
1467 DOUT5(
"SocketThread::_Fire %s nq:%d numq:%d waiting:%s", GetName(), nq, fNumQueues,
DBOOL(fWaitFire));
1469 _PushEvent(arg, nq);
1471 if (fWaitFire && !fPipeFired) {
1472 auto res = write(fPipe[1],
"w", 1);
1476 #ifdef SOCKET_PROFILING
1486 #ifdef SOCKET_PROFILING
1493 #ifdef DABC_EXTRA_CHECKS
1494 unsigned sizebefore(0), sizeafter(0);
1500 #ifdef DABC_EXTRA_CHECKS
1501 sizebefore = _TotalNumberOfEvents();
1507 if (_TotalNumberOfEvents()>0) {
1509 if (!fCheckNewEvents)
return _GetNextEvent(evnt);
1515 if (f_ufds==0)
return false;
1524 f_ufds[0].fd = fPipe[0];
1525 f_ufds[0].events = POLLIN;
1526 f_ufds[0].revents = 0;
1528 for(
unsigned n=1; n<fWorkers.size(); n++) {
1529 if (!f_recs[n].use)
continue;
1532 if (addon->
Socket()<=0)
continue;
1542 if (events==0)
continue;
1544 f_ufds[numufds].fd = addon->
Socket();
1545 f_ufds[numufds].events = events;
1546 f_ufds[numufds].revents = 0;
1548 f_recs[numufds].indx = n;
1553 int tmout = tmout_sec < 0. ? -1 : int(tmout_sec*1000.);
1555 #ifdef SOCKET_PROFILING
1559 fFillTime += (tm2-tm1);
1566 int poll_res = poll(f_ufds, numufds, tmout);
1568 #ifdef SOCKET_PROFILING
1570 fWaitTime += (tm3-tm2);
1583 auto res = read(fPipe[0], &sbuf, 1);
1592 for (
int imn=1; imn<numufds;imn++) {
1595 int n = 1 + (imn + fBalanceCnt) % (numufds-1);
1597 if (f_ufds[n].revents==0)
continue;
1600 Worker* worker = fWorkers[f_recs[n].indx]->work;
1602 if ((addon==0) || (worker==0)) {
1603 EOUT(
"Something went wrong - socket addon=%p worker = %p, something is gone", addon, worker);
1608 if (f_ufds[n].revents & (POLLERR | POLLHUP | POLLNVAL)) {
1613 IncWorkerFiredEvents(worker);
1617 if (f_ufds[n].revents & (POLLIN | POLLPRI)) {
1620 IncWorkerFiredEvents(worker);
1624 if (f_ufds[n].revents & POLLOUT) {
1627 IncWorkerFiredEvents(worker);
1636 fCheckNewEvents =
false;
1637 _PushEvent(evntEnableCheck, 1);
1638 fBalanceCnt = (fBalanceCnt + 1) % 100000;
1641 #ifdef DABC_EXTRA_CHECKS
1642 sizeafter = _TotalNumberOfEvents();
1646 return _GetNextEvent(evnt);
1651 if (evid.
GetCode() == evntEnableCheck) {
1652 fCheckNewEvents =
true;
1662 unsigned new_sz = fWorkers.size();
1664 if (new_sz > f_sizeufds) {
1668 f_ufds =
new pollfd [new_sz];
1669 f_recs =
new ProcRec [new_sz];
1671 f_sizeufds = new_sz;
1674 memset(f_ufds, 0,
sizeof(pollfd) * f_sizeufds);
1675 memset(f_recs, 0,
sizeof(
ProcRec) * f_sizeufds);
1677 f_recs[0].use =
true;
1679 fIsAnySocket =
false;
1683 for (
unsigned indx=1;indx<fWorkers.size();indx++) {
1686 f_recs[indx].use = addon!=0;
1689 fIsAnySocket =
true;
1695 fCheckNewEvents = fIsAnySocket;
const char * SocketErr(int err)
Reference on memory from memory pool.
unsigned NumSegments() const
Returns number of segment in buffer.
unsigned SegmentSize(unsigned n=0) const
Returns size on the segment, no any boundary checks.
void * SegmentPtr(unsigned n=0) const
Returns pointer on the segment, no any boundary checks.
Represents command with its arguments.
bool SetStr(const std::string &name, const char *value)
bool SetInt(const std::string &name, int v)
static std::string GetLocalHost()
Lock guard for posix mutex.
Reference on the arbitrary object
bool null() const
Returns true if reference contains nullptr.
Special addon class for handling of socket and socket events.
virtual void ProcessEvent(const EventId &)
bool IsDoingOutput() const
virtual void OnSocketError(int msg, const std::string &info)
Generic error handler.
void SetDoingInput(bool on=true)
Call method to indicate that object wants to read data from the socket.
int fIOPriority
priority of socket I/O events, default 1
ssize_t DoSendBuffer(void *buf, ssize_t len)
bool IsDoingInput() const
ssize_t DoSendBufferHdr(void *hdr, ssize_t hdrlen, void *buf, ssize_t len, void *tgtaddr=0, unsigned tgtaddrlen=0)
ssize_t DoRecvBufferHdr(void *hdr, ssize_t hdrlen, void *buf, ssize_t len, void *srcaddr=0, unsigned srcaddrlen=0)
void SetDoingOutput(bool on=true)
Call method to indicate that worker wants to write data to the socket.
ssize_t DoRecvBuffer(void *buf, ssize_t len)
Socket addon for handling connection on client side.
virtual void OnConnectionFailed()
struct addrinfo fServAddr
void SetRetryOpt(int nretry, double tmout=1.)
virtual void OnThreadAssigned()
SocketClientAddon(const struct addrinfo *serv_addr, int fd=-1)
virtual double ProcessTimeout(double)
virtual void OnConnectionEstablished(int fd)
virtual void ProcessEvent(const EventId &)
virtual ~SocketClientAddon()
Socket addon for handling connection events.
void AllocateSendIOV(unsigned size)
bool StartSend(const void *buf, unsigned size, const void *buf2=0, unsigned size2=0, const void *buf3=0, unsigned size3=0)
bool fUseMsgOper
indicate if sendmsg, recvmsg operations should be used, it is must for the datagram sockets
virtual ~SocketIOAddon()
Destructor of SocketIOAddon class.
void SetSendAddr(const std::string &host="", int port=0)
Set destination address for all send operations,.
struct sockaddr_in fSendAddr
optional send address for next send operation
bool IsDatagramSocket() const
void AllocateRecvIOV(unsigned size)
bool StartRecv(void *buf, size_t size)
bool StartNetSend(void *hdr, unsigned hdrsize, const Buffer &buf)
bool StartRecvHdr(void *hdr, unsigned hdrsize, void *buf, size_t size)
virtual void ProcessEvent(const EventId &)
SocketIOAddon(int fd=0, bool isdatagram=false, bool usemsg=true)
Constructor of SocketIOAddon class.
void CancelIOOperations()
Method should be used to cancel all running I/O operation of the socket.
bool StartNetRecv(void *hdr, unsigned hdrsize, Buffer &buf, BufferSize_t datasize)
Socket addon for handling connection requests on server side.
std::string fServerHostName
struct sockaddr_storage ai_addr
virtual void OnClientConnected(int fd)
SocketServerAddon(int serversocket, const char *hostname=nullptr, int portnum=-1, struct addrinfo *info=nullptr)
virtual void ProcessEvent(const EventId &)
static std::string DefineHostName(bool force=true)
Return current host name.
virtual bool WaitEvent(EventId &, double tmout)
int fPipe[2]
array with i/o pipes handles
static void CloseUdp(int fd)
Close datagram (udp) socket.
virtual void ProcessExtraThreadEvent(const EventId &evid)
Method to process events which are not processed by Thread class itself Should be used in derived cla...
static SocketClientAddon * CreateClientAddon(const std::string &servid, int dflt_port=-1)
virtual void _Fire(const EventId &evnt, int nq)
static void DettachMulticast(int handle, const std::string &addr)
Detach datagram socket from multicast group.
virtual void WorkersSetChanged()
Virtual method, called from thread context to inform that number of workers are changed.
static int SendBuffer(int fd, void *buf, int len)
Wrapper for send method, should be used for blocking sockets.
static int StartClient(const std::string &host, int nport, bool nonblocking=true)
static int CreateUdp()
Create datagram (udp) socket.
virtual bool CompatibleClass(const std::string &clname) const
static bool AttachMulticast(int handle, const std::string &addr)
Attach datagram socket to multicast group to make receiving.
static int RecvBuffer(int fd, void *buf, int len)
Wrapper for recv method, should be used for blocking sockets.
static int BindUdp(int fd, int nport, int portmin=-1, int portmax=-1)
Bind UDP socket to specified port.
static int ConnectUdp(int fd, const std::string &remhost, int remport)
SocketThread(Reference parent, const std::string &name, Command cmd)
static bool SetNonBlockSocket(int fd)
static SocketServerAddon * CreateServerAddon(const std::string &host, int nport, int portmin=-1, int portmax=-1)
Create handle for server-side connection If hostname == 0, any available address will be selected If ...
static bool SetNoDelaySocket(int fd)
Represent thread functionality.
virtual bool CompatibleClass(const std::string &clname) const
virtual void ProcessExtraThreadEvent(const EventId &)
Method to process events which are not processed by Thread class itself Should be used in derived cla...
Generic addon for dabc::Worker.
virtual void ProcessEvent(const EventId &)
virtual void OnThreadAssigned()
Active object, which is working inside dabc::Thread.
const char * typeSocketThread
std::string format(const char *fmt,...)
Event structure, exchanged between DABC threads.
Class for acquiring and holding timestamps.