DABC (Data Acquisition Backbone Core)  2.9.9
SocketTransport.cxx
Go to the documentation of this file.
1 // $Id: SocketTransport.cxx 3862 2018-05-11 10:06:18Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #include "dabc/SocketTransport.h"
17 
19  SocketIOAddon(fd, datagram, true),
21  fHeaders(0),
22  fSendQueue(),
23  fRecvQueue(),
24  fRecvStatus(0),
25  fRecvRecid(0),
26  fSendStatus(0),
27  fSendRecid(0),
28  fMcastAddr()
29 {
30 }
31 
33 {
34  if (!fMcastAddr.empty())
35  SocketThread::DettachMulticast(Socket(), fMcastAddr);
36 
37  delete [] fHeaders; fHeaders = 0;
38 }
39 
40 void dabc::SocketNetworkInetrface::AllocateNet(unsigned fulloutputqueue, unsigned fullinputqueue)
41 {
42  NetworkTransport* tr = (NetworkTransport*) fWorker();
43 
44  fHeaders = new char[tr->NumRecs() * tr->GetFullHeaderSize()];
45  for (uint32_t n=0;n<tr->NumRecs();n++)
46  tr->SetRecHeader(n, fHeaders + n * tr->GetFullHeaderSize());
47 
48  // 16 is default value, if required, it will be dynamically increased
49  AllocateSendIOV(16);
50  AllocateRecvIOV(16);
51 
52  fSendQueue.Allocate(fulloutputqueue); // +2 for sending and recv ackn
53  fRecvQueue.Allocate(fullinputqueue);
54 
55  DOUT5("Create queues inp: %d out: %d", fullinputqueue, fulloutputqueue);
56 }
57 
58 
59 long dabc::SocketNetworkInetrface::Notify(const std::string &cmd, int arg)
60 {
61  if (cmd == "GetNetworkTransportInetrface") return (long) ((NetworkInetrface*) this);
62 
63  return dabc::SocketIOAddon::Notify(cmd, arg);
64 }
65 
66 
68 {
69  fSendQueue.Push(recid);
70 
71  // we are in transport thread and can call event-processing methods directly
72  if ((fSendQueue.Size()==1) && (fSendStatus==0)) OnSendCompleted();
73 }
74 
76 {
77 // DOUT0("dabc::SocketNetworkInetrface::SubmitRecv %u", recid);
78 
79  fRecvQueue.Push(recid);
80 
81  // we are in transport thread and can call event-processing methods directly
82  if ((fRecvQueue.Size()==1) && (fRecvStatus==0)) OnRecvCompleted();
83 }
84 
85 
86 void dabc::SocketNetworkInetrface::OnSocketError(int msg, const std::string &info)
87 {
88  NetworkTransport* tr = (NetworkTransport*) fWorker();
89  if (tr) tr->CloseTransport(msg!=0);
90  else EOUT("Socket msg without transport %d %s", msg, info.c_str());
91 }
92 
93 
95 {
96  NetworkTransport* tr = (NetworkTransport*) fWorker();
97 
98  if (tr==0) {
99  EOUT("Transport not available!!!");
100  return;
101  }
102 
103 // DOUT0("SocketNetworkInetrface::OnSendCompleted status %d ", fSendStatus);
104 
105  if (fSendStatus==1) {
106  tr->ProcessSendCompl(fSendRecid);
107  fSendRecid = 0;
108  fSendStatus = 0;
109  }
110 
111  // nothing to do, just wait for new submitted recv operation
112  if (fSendQueue.Size() == 0) return;
113 
114  fSendRecid = fSendQueue.Pop();
115 
116  fSendStatus = 1;
117 
118  int sendtyp = tr->PackHeader(fSendRecid);
119 
120  if (sendtyp==0) {
121  EOUT("record %u failed", fSendRecid);
122  throw dabc::Exception("send record failed - should never happen");
123  }
124 
125  NetworkTransport::NetIORec* rec = tr->GetRec(fSendRecid);
126 
127  if (rec==0) {
128  EOUT("Completely wrong send recid %u", fSendRecid);
129  exit(432);
130  }
131 
132 // DOUT0("Start sending rec %u typ %d buf %u", fSendRecid, sendtyp, rec->buf.GetTotalSize());
133 
134  if (sendtyp == 1)
135  StartSend(rec->header, tr->GetFullHeaderSize());
136  else
137  StartNetSend(rec->header, tr->GetFullHeaderSize(), rec->buf);
138 
139 }
140 
142 {
143  NetworkTransport* tr = (NetworkTransport*) fWorker();
144 
145 // DOUT0("dabc::SocketNetworkInetrface::OnRecvCompleted %p", tr);
146 
147  if (tr==0) {
148  EOUT("Transport not available!!!");
149  return;
150  }
151 
152 do_compl:
153 
154  if (fRecvStatus==2) {
155  // if we complete receiving of the buffer
156 
157  tr->ProcessRecvCompl(fRecvRecid);
158  fRecvRecid = 0;
159  fRecvStatus = 0;
160  }
161 
162  if (fRecvStatus==1) {
163  // analyze header, set new recv operation and so on
164 
165  fRecvStatus = 2;
166 
167  NetworkTransport::NetIORec* rec = tr->GetRec(fRecvRecid);
168 
169  if (rec==0) {
170  EOUT("Completely wrong recv rec id %u", fRecvRecid);
171  exit(75);
172  }
173 
175 
176  if (nethdr->typid == dabc::mbt_EOL) {
177  DOUT1("Receive buffer with EOL bufsize = %u resthdr = %u",
178  nethdr->size, tr->GetFullHeaderSize() - sizeof(NetworkTransport::NetworkHeader));
179  }
180 
181  if (nethdr->kind & NetworkTransport::netot_HdrSend) {
182  goto do_compl;
183  }
184 
185  if (nethdr->size > rec->buf.GetTotalSize()) {
186  EOUT("Fatal - no buffer to receive data rec %d sz1:%d sz2:%d",
187  fRecvRecid, nethdr->size, rec->buf.GetTotalSize());
188 
189  tr->CloseTransport(true);
190  return;
191  }
192 
193  if (!StartRecv(rec->buf, nethdr->size)) {
194  EOUT("Cannot start recv - fatal error");
195  tr->CloseTransport(true);
196  return;
197  }
198 
199  // DOUT0("Start receiving of buffer size %u", nethdr->size);
200  } else {
201  // nothing to do, just wait for new submitted recv operation
202  if (fRecvQueue.Size() == 0) return;
203  fRecvRecid = fRecvQueue.Pop();
204 
205  NetworkTransport::NetIORec* rec = tr->GetRec(fRecvRecid);
206 
207  if (rec==0) {
208  EOUT("Completely wrong recv recid %u", fRecvRecid);
209  exit(432);
210  }
211 
212 // DOUT0("SocketNetworkInetrface::OnRecvCompleted start receiving socket %d thrd %s", Socket(), tr->ThreadName().c_str());
213 
214  if (IsDatagramSocket()) {
215 // DOUT0("Start recv from datagram socket");
216  fRecvStatus = 2;
217  StartNetRecv(rec->header, tr->GetFullHeaderSize(), rec->buf, rec->buf.GetTotalSize());
218  } else {
219  fRecvStatus = 1;
220  // do normal recv of the header data without evolving messages
221  StartRecv(rec->header, tr->GetFullHeaderSize());
222  }
223  }
224 }
225 
BufferSize_t GetTotalSize() const
Return total size of all buffer segments.
Definition: Buffer.cxx:91
DABC exception.
Definition: Exception.h:57
Network interface.
Network transport.
void SetRecHeader(uint32_t recid, void *header)
NetIORec * GetRec(unsigned id) const
unsigned GetFullHeaderSize() const
void ProcessRecvCompl(uint32_t recid)
void ProcessSendCompl(uint32_t recid)
int PackHeader(uint32_t recid)
unsigned NumRecs() const
Socket addon for handling I/O events.
Definition: SocketThread.h:144
virtual void OnSendCompleted()
Method called when send operation is completed.
virtual void AllocateNet(unsigned fulloutputqueue, unsigned fullinputqueue)
virtual void SubmitRecv(uint32_t recid)
SocketNetworkInetrface(int fd, bool datagram=false)
virtual long Notify(const std::string &, int)
Light-weight command interface, which can be used from worker.
virtual void OnSocketError(int msg, const std::string &info)
Generic error handler.
virtual void SubmitSend(uint32_t recid)
virtual void OnRecvCompleted()
Method called when receive operation is completed.
static void DettachMulticast(int handle, const std::string &addr)
Detach datagram socket from multicast group.
virtual void CloseTransport(bool witherr=false)
Definition: Transport.cxx:196
virtual long Notify(const std::string &, int)
Light-weight command interface, which can be used from worker.
Definition: Worker.h:82
#define DOUT5(args ...)
Definition: logging.h:188
#define EOUT(args ...)
Definition: logging.h:150
#define DOUT1(args ...)
Definition: logging.h:162
@ mbt_EOL
Definition: Buffer.h:46