DABC (Data Acquisition Backbone Core)  2.9.9
Device.cxx
Go to the documentation of this file.
1 /************************************************************
2  * The Data Acquisition Backbone Core (DABC) *
3  ************************************************************
4  * Copyright (C) 2009 - *
5  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
6  * Planckstr. 1, 64291 Darmstadt, Germany *
7  * Contact: http://dabc.gsi.de *
8  ************************************************************
9  * This software can be used under the GPL license *
10  * agreements as stated in LICENSE.txt file *
11  * which is part of the distribution. *
12  ************************************************************/
13 
14 #include "verbs/Device.h"
15 
16 #include <sys/poll.h>
17 #include <cerrno>
18 #include <cstdio>
19 #include <cstdlib>
20 #include <unistd.h>
21 #include <cstring>
22 #include <netinet/in.h>
23 
24 
25 const char* verbs::xmlMcastAddr = "McastAddr";
26 
27 #ifndef __NO_MULTICAST__
28 #include "verbs/OpenSM.h"
29 #endif
30 
31 #include "dabc/timing.h"
32 #include "dabc/logging.h"
33 
34 #include "dabc/MemoryPool.h"
35 #include "dabc/Manager.h"
36 #include "dabc/ConnectionRequest.h"
37 #include "dabc/ConnectionManager.h"
38 
39 #include "verbs/ComplQueue.h"
40 #include "verbs/QueuePair.h"
41 #include "verbs/Thread.h"
42 #include "verbs/Transport.h"
44 
// Loop-back queue depth and buffer size. Not referenced in the visible part of
// this file; presumably used for loop-back/self-connection setup (TODO confirm).
constexpr int LoopBackQueueSize = 8;
constexpr int LoopBackBufferSize = 64;
47 
48 // this boolean indicates whether verbs calls may be made from different threads;
49 // if not, all post/recv/completion operations for all QPs/CQs happen in the same thread
50 
52 
53 
54 namespace verbs {
55 
58  class ProtocolAddon : public WorkerAddon {
59  public:
60  std::string fReqItem;
61 
63 
65 
67 
68  // address data of remote side
69  unsigned fRemoteLID;
70  unsigned fRemoteQPN;
71  unsigned fRemotePSN;
72 
74 
75  bool fConnected;
76 
77  public:
79  virtual ~ProtocolAddon();
80 
81  virtual void VerbsProcessSendCompl(uint32_t);
82  virtual void VerbsProcessRecvCompl(uint32_t);
83  virtual void VerbsProcessOperError(uint32_t);
84 
85  virtual void OnThreadAssigned();
86  virtual double ProcessTimeout(double last_diff);
87 
88  void Finish(bool res);
89  };
90 
91 }
92 
93 
95  WorkerAddon(qp),
96  fReqItem(),
97  fLocalCmd(),
98  fPortThrd(),
99  fRemoteLID(0),
100  fRemoteQPN(0),
101  fRemotePSN(0),
102  fPool(nullptr),
103  fConnected(false)
104 {
105 }
106 
108 {
109  fReqItem.clear();
110 
111  if (fPool) {
112  delete fPool;
113  fPool = nullptr;
114  }
115 
116  CloseQP();
117 }
118 
120 {
122 
124  if (req.null()) {
125  Finish(false);
126  return;
127  }
128 
129  unsigned bufid = 0;
130 
131  if (req.IsServerSide()) {
132  QP()->Post_Recv(fPool->GetRecvWR(bufid));
133  } else {
134  std::string connid = req.GetConnId();
135 
136  strcpy((char*) fPool->GetSendBufferLocation(bufid), connid.c_str());
137  QP()->Post_Send(fPool->GetSendWR(bufid, connid.length()+1));
138  }
139 
140  ActivateTimeout(req.GetConnTimeout());
141 }
142 
144 {
145  fConnected = res;
146 
147  fReqItem.clear();
148 
149  fLocalCmd.ReplyBool(res);
150 
151  fIbContext.Release();
152 
153  DeleteWorker();
154 }
155 
156 
157 
158 double verbs::ProtocolAddon::ProcessTimeout(double last_diff)
159 {
160  if (fConnected) return -1;
161 
162  EOUT("HANDSHAKE is timedout");
163 
164  Finish(false);
165 
166  return -1;
167 }
168 
170 {
171  if (bufid!=0) {
172  EOUT("Wrong buffer id %u", bufid);
173  return;
174  }
175 
177  if (req.null()) {
178  EOUT("Connection request disappear");
179  Finish(true);
180  return;
181  }
182 
183  const char* connid = (const char*) fPool->GetSendBufferLocation(bufid);
184 
185  if (req.GetConnId().compare(connid)!=0) {
186  EOUT("AAAAA !!!!! Mismatch with connid %s %s", connid, req.GetConnId().c_str());
187  }
188 
189  // Here we are sure, that other side receive handshake packet,
190  // therefore we can declare connection as done
191 
192 
193  VerbsNetworkInetrface* addon = new VerbsNetworkInetrface(fIbContext, TakeQP());
194 
195  dabc::NetworkTransport::Make(req, addon, fPortThrd.GetName());
196 
197  DOUT0("CREATE VERBS CLIENT %s", req.GetConnId().c_str());
198 
199  Finish(true);
200 }
201 
203 {
204  if (bufid!=0) {
205  EOUT("Wrong buffer id %u", bufid);
206  return;
207  }
208 
210 
211  const char* connid = (const char*) fPool->GetBufferLocation(bufid);
212 
213  if (req.GetConnId().compare(connid)!=0) {
214  EOUT("AAAAA !!!!! Mismatch with connid %s %s", connid, req.GetConnId().c_str());
215  }
216 
217  // from here we knew, that client is ready and we also can complete connection
218 
219  VerbsNetworkInetrface* addon = new VerbsNetworkInetrface(fIbContext, TakeQP());
220 
221  dabc::NetworkTransport::Make(req, addon, fPortThrd.GetName());
222 
223  DOUT0("CREATE VERBS SERVER %s", req.GetConnId().c_str());
224 
225  Finish(true);
226 }
227 
229 {
230  EOUT("VerbsProtocolAddon error");
231 
232  Finish(false);
233 }
234 
235 // ____________________________________________________________________
236 
237 verbs::Device::Device(const std::string &name) :
238  dabc::Device(name),
239  fIbContext(),
240  fAllocateIndividualCQ(false)
241 {
242  DOUT1("Creating VERBS device %s refcnt %u", GetName(), NumReferences());
243 
244  if (!fIbContext.OpenVerbs(true)) {
245  EOUT("FATAL. Cannot start VERBS device");
246  exit(139);
247  }
248 
249  DOUT1("Creating thread for device %s", GetName());
250 
252 
253  DOUT1("Creating VERBS device %s done refcnt %u", GetName(), NumReferences());
254 }
255 
257 {
258  DOUT5("verbs::Device::~Device()");
259 }
260 
261 verbs::QueuePair* verbs::Device::CreatePortQP(const std::string &thrd_name, dabc::Reference port, int conn_type, dabc::ThreadRef& thrd)
262 {
263  ibv_qp_type qp_type = IBV_QPT_RC;
264 
265  if (conn_type>0) qp_type = (ibv_qp_type) conn_type;
266 
267  thrd = MakeThread(thrd_name.c_str(), true);
268 
269  verbs::Thread* thrd_ptr = dynamic_cast<verbs::Thread*> (thrd());
270 
271  if (thrd_ptr == 0) return 0;
272 
273  unsigned input_size(0), output_size(0);
274 
275  dabc::NetworkTransport::GetRequiredQueuesSizes(port, input_size, output_size);
276 
277  ComplQueue* port_cq = thrd_ptr->MakeCQ();
278 
279  int num_send_seg = fIbContext.max_sge() - 1;
280  if (conn_type==IBV_QPT_UD) num_send_seg = fIbContext.max_sge() - 5; // I do not now why, but otherwise it fails
281  if (num_send_seg<2) num_send_seg = 2;
282 
283  verbs::QueuePair* port_qp = new QueuePair(IbContext(), qp_type,
284  port_cq, output_size + 1, num_send_seg,
285  port_cq, input_size + 1, 2);
286 
287  if (port_qp->qp()==0) {
288  delete port_qp;
289  port_qp = 0;
290  }
291 
292  return port_qp;
293 }
294 
295 dabc::ThreadRef verbs::Device::MakeThread(const char* name, bool force)
296 {
297  ThreadRef thrd = dabc::mgr.FindThread(name, verbs::typeThread);
298 
299  if (!thrd.null() || !force) return thrd;
300 
301  return dabc::mgr.CreateThread(name, verbs::typeThread, GetName());
302 }
303 
305 {
306  // TODO: implement multicast transport for IB
307 
308  dabc::PortRef portref = port;
309 
310  std::string maddr = portref.Cfg(xmlMcastAddr, cmd).AsStr();
311 
312  if (!maddr.empty()) {
313  std::string thrdname = portref.Cfg(dabc::xmlThreadAttr,cmd).AsStr();
314 
315  if (thrdname.empty())
316  switch (dabc::mgr.GetThreadsLayout()) {
317  case dabc::layoutMinimalistic: thrdname = ThreadName(); break;
318  case dabc::layoutPerModule: thrdname = portref.GetModule().ThreadName(); break;
319  case dabc::layoutBalanced: thrdname = portref.GetModule().ThreadName() + (portref.IsInput() ? "Inp" : "Out"); break;
320  case dabc::layoutMaximal: thrdname = portref.GetModule().ThreadName() + portref.GetName(); break;
321  default: thrdname = portref.GetModule().ThreadName(); break;
322  }
323 
324  ibv_gid multi_gid;
325 
326  if (!ConvertStrToGid(maddr, multi_gid)) {
327  EOUT("Cannot convert address %s to ibv_gid type", maddr.c_str());
328  return 0;
329  }
330 
331  std::string buf = ConvertGidToStr(multi_gid);
332  if (buf!=maddr) {
333  EOUT("Addresses not the same: %s - %s", maddr.c_str(), buf.c_str());
334  return 0;
335  }
336 
337 
338 
339 // QueuePair* port_qp(0);
340 // ThreadRef thrd;
341 // if (CreatePortQP(thrdname.c_str(), port, IBV_QPT_UD, thrd, port_qp))
342 // return new Transport(fIbContext, port_cq, port_qp, portref, false, &multi_gid);
343  }
344 
345  return 0;
346 }
347 
349 {
350  std::string reqitem = cmd.GetStr(dabc::CmdConnectionManagerHandle::ReqArg());
351 
353 
354  if (req.null()) return dabc::cmd_false;
355 
356  switch (req.progress()) {
357 
358  // here on initializes connection
360 
361  dabc::PortRef port = req.GetPort();
362  if (port.null()) {
363  EOUT("No port is available for the request");
364  return dabc::cmd_false;
365  }
366 
367  dabc::ThreadRef thrd;
368 
369  // FIXME: ConnectionRequest should be used
370  QueuePair* port_qp = CreatePortQP(req.GetConnThread(), port, 0, thrd);
371  if (port_qp==0) return dabc::cmd_false;
372 
373  std::string portid = dabc::format("%04X:%08X:%08X", (unsigned) fIbContext.lid(), (unsigned) port_qp->qp_num(), (unsigned) port_qp->local_psn());
374  DOUT0("CREATE CONNECTION %s", portid.c_str());
375 
376  ProtocolAddon* addon = new ProtocolAddon(port_qp);
377  addon->fPortThrd << thrd;
378 
379  if (req.IsServerSide())
380  req.SetServerId(portid);
381  else
382  req.SetClientId(portid);
383 
384  // make backpointers, fCustomData is reference, automatically cleaned up by the connection manager
385  req.SetCustomData(dabc::Reference(addon));
386 
387  addon->fReqItem = reqitem;
388 
389  addon->fIbContext = fIbContext;
390 
391  return dabc::cmd_true;
392  }
393 
395  // one should register request and start connection here
396 
397  DOUT2("****** VERBS START: %s %s CONN: %s *******", (req.IsServerSide() ? "SERVER" : "CLIENT"), req.GetConnId().c_str(), req.GetConnInfo().c_str());
398 
399  // once connection is started, custom data is no longer necessary by connection record
400  // protocol worker will be cleaned up automatically either when connection is done or when connection is timedout
401 
402  // by coping of the reference source reference will be cleaned
403  // we use reference on addon while it will remain even if worker will be destroyed
404  dabc::Reference prot_ref = req.TakeCustomData();
405 
406  ProtocolAddon* proto = dynamic_cast<ProtocolAddon*> (prot_ref());
407 
408  if (proto == 0) {
409  EOUT("SOMETHING WRONG - NO PROTOCOL addon for the connection request");
410  return dabc::cmd_false;
411  }
412 
413  std::string remoteid;
414  bool res = true;
415 
416  if (req.IsServerSide())
417  remoteid = req.GetClientId();
418  else
419  remoteid = req.GetServerId();
420 
421  if (sscanf(remoteid.c_str(),"%X:%X:%X", &proto->fRemoteLID, &proto->fRemoteQPN, &proto->fRemotePSN)!=3) {
422  EOUT("Cannot decode remote id string %s", remoteid.c_str());
423  res = false;
424  }
425 
426  // reply remote command that one other side can start connection
427  req.ReplyRemoteCommand(res);
428 
429  if (!res) {
430  prot_ref.Release();
432  }
433 
434  DOUT0("CONNECT TO REMOTE %04x:%08x:%08x - %s", proto->fRemoteLID, proto->fRemoteQPN, proto->fRemotePSN, remoteid.c_str());
435 
436  // FIXME: remote port should be handled correctly
437  if (proto->QP()->Connect(proto->fRemoteLID, proto->fRemoteQPN, proto->fRemotePSN)) {
438 
439  proto->fPool = new verbs::MemoryPool(fIbContext, "HandshakePool", 1, 1024, false);
440 
441  proto->fLocalCmd = cmd;
442 
443  // we need to preserve thread reference until transport itself will be created
444  proto->fPortThrd.MakeWorkerFor(proto);
445 
446  return dabc::cmd_postponed;
447  }
448 
449  return dabc::cmd_false;
450  }
451 
452  default:
453  EOUT("Request from connection manager in undefined situation progress = %d ???", req.progress());
454  }
455 
456  return dabc::cmd_false;
457 }
458 
459 
461 {
462  int cmd_res = dabc::cmd_true;
463 
464  DOUT5("Execute command %s", cmd.GetName());
465 
466 
467  if (cmd.IsName(dabc::CmdConnectionManagerHandle::CmdName())) {
468 
469  cmd_res = HandleManagerConnectionRequest(cmd);
470 
471  } else
472 
473  cmd_res = dabc::Device::ExecuteCommand(cmd);
474 
475  return cmd_res;
476 }
477 
478 double verbs::Device::ProcessTimeout(double last_diff)
479 {
480  return -1;
481 }
482 
483 bool verbs::ConvertStrToGid(const std::string &s, ibv_gid &gid)
484 {
485  unsigned raw[16];
486 
487  if (sscanf(s.c_str(),
488  "%2X:%2X:%2X:%2X:%2X:%2X:%2X:%2X:%2X:%2X:%2X:%2X:%2X:%2X:%2X:%2X",
489  raw, raw+1, raw+2, raw+3, raw+4, raw+5, raw+6, raw+7,
490  raw+8, raw+9, raw+10, raw+11, raw+12, raw+13, raw+14, raw+15) != 16) return false;
491  for (unsigned n=0;n<16;n++)
492  gid.raw[n] = raw[n];
493  return true;
494 }
495 
496 std::string verbs::ConvertGidToStr(ibv_gid &gid)
497 {
498  unsigned raw[16];
499  for (unsigned n=0;n<16;n++)
500  raw[n] = gid.raw[n];
501 
502  std::string res;
503 
504  dabc::formats(res, "%02X:%02X:%02X:%02X:%02X:%02X:%02X:%02X:%02X:%02X:%02X:%02X:%02X:%02X:%02X:%02X",
505  raw[0], raw[1], raw[2], raw[3], raw[4], raw[5], raw[6], raw[7],
506  raw[8], raw[9], raw[10], raw[11], raw[12], raw[13], raw[14], raw[15]);
507  return res;
508 }
Represents command with its arguments.
Definition: Command.h:99
std::string GetStr(const std::string &name, const std::string &dflt="") const
Definition: Command.h:136
@ progrDoingConnect
at this state device should drive connection itself and inform about completion or failure
@ progrDoingInit
state when record should be prepared by device
Full description of connection request.
std::string GetConnId() const
void SetCustomData(Reference ref)
std::string GetServerId() const
std::string GetClientId() const
void SetClientId(const std::string &id)
void SetServerId(const std::string &id)
std::string GetConnThread() const
Thread name for transport.
double GetConnTimeout() const
time required to establish connection, if expired connection will be switched to "failed" state
Reference GetPort() const
bool IsServerSide() const
Indicates if local node in connection is server or client.
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
Definition: Device.cxx:43
Parameter FindPar(const std::string &parname)
Definition: Manager.cxx:2069
ThreadRef FindThread(const std::string &name, const std::string &required_class="")
Definition: Manager.h:655
ThreadRef CreateThread(const std::string &thrdname, const std::string &classname="", const std::string &devname="")
Definition: Manager.cxx:1988
WorkerRef GetModule() const
Definition: ModuleItem.cxx:66
static bool Make(const ConnectionRequest &req, WorkerAddon *addon, const std::string &devthrdname="")
static void GetRequiredQueuesSizes(const PortRef &port, unsigned &input_size, unsigned &output_size)
unsigned NumReferences()
Return number of references on the object.
Definition: Object.cxx:557
const char * GetName() const
Returns name of the object, thread safe
Definition: Object.h:295
Reference on the dabc::Port class
Definition: Port.h:195
bool IsInput() const
Returns true if it is input port.
Definition: Port.h:199
std::string AsStr(const std::string &dflt="") const
Definition: Record.cxx:749
Reference on the arbitrary object
Definition: Reference.h:73
void Release()
Releases reference on the object.
Definition: Reference.cxx:138
const char * GetName() const
Return name of referenced object, if object not assigned, returns "---".
Definition: Reference.cxx:167
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
Definition: Reference.cxx:177
bool null() const
Returns true if reference contains nullptr.
Definition: Reference.h:151
Reference on the dabc::Thread class
Definition: Thread.h:482
Base class for transport implementations.
Definition: Transport.h:37
virtual void OnThreadAssigned()
Definition: Worker.h:73
std::string ThreadName() const
Returns thread name of worker assigned.
Definition: Worker.h:489
RecordField Cfg(const std::string &name, Command cmd=nullptr) const
Returns configuration record of specified name.
Definition: Worker.h:482
bool MakeThreadForWorker(const std::string &thrdname="")
Creates appropriate thread for worker and assign worker to the thread.
Definition: Worker.cxx:302
Wrapper for IB VERBS completion queue
Definition: ComplQueue.h:35
Reference to verbs::Context
Definition: Context.h:74
bool OpenVerbs(bool withmulticast=false, const char *devicename=0, int ibport=-1)
Definition: Context.cxx:294
Device for VERBS
Definition: Device.h:42
virtual double ProcessTimeout(double last_diff)
Definition: Device.cxx:478
virtual int ExecuteCommand(dabc::Command cmd)
Main method where commands are executed.
Definition: Device.cxx:460
virtual dabc::Transport * CreateTransport(dabc::Command cmd, const dabc::Reference &port)
Definition: Device.cxx:304
ContextRef fIbContext
Definition: Device.h:49
QueuePair * CreatePortQP(const std::string &thrd_name, dabc::Reference port, int conn_type, dabc::ThreadRef &thrd)
Definition: Device.cxx:261
virtual ~Device()
Definition: Device.cxx:256
dabc::ThreadRef MakeThread(const char *name, bool force=false)
Definition: Device.cxx:295
int HandleManagerConnectionRequest(dabc::Command cmd)
Definition: Device.cxx:348
static bool fThreadSafeVerbs
Definition: Device.h:64
Device(const std::string &name)
Definition: Device.cxx:237
Special memory pool, which automatically includes PoolRegistry.
Definition: MemoryPool.h:31
Addon to establish and verify QP connection with remote node
Definition: Device.cxx:58
dabc::ThreadRef fPortThrd
Definition: Device.cxx:66
virtual ~ProtocolAddon()
Definition: Device.cxx:107
unsigned fRemoteQPN
Definition: Device.cxx:70
virtual double ProcessTimeout(double last_diff)
Definition: Device.cxx:158
virtual void VerbsProcessOperError(uint32_t)
Definition: Device.cxx:228
ProtocolAddon(QueuePair *qp)
Definition: Device.cxx:94
unsigned fRemoteLID
Definition: Device.cxx:69
void Finish(bool res)
Definition: Device.cxx:143
ContextRef fIbContext
Definition: Device.cxx:64
virtual void OnThreadAssigned()
Definition: Device.cxx:119
virtual void VerbsProcessRecvCompl(uint32_t)
Definition: Device.cxx:202
std::string fReqItem
Definition: Device.cxx:60
virtual void VerbsProcessSendCompl(uint32_t)
Definition: Device.cxx:169
unsigned fRemotePSN
Definition: Device.cxx:71
MemoryPool * fPool
Definition: Device.cxx:73
dabc::Command fLocalCmd
command which should be replied when connection established or failed
Definition: Device.cxx:62
Represent VERBS queue pair functionality.
Definition: QueuePair.h:37
struct ibv_qp * qp() const
Definition: QueuePair.h:44
VERBS thread.
Definition: Thread.h:46
ComplQueue * MakeCQ()
Definition: Thread.cxx:186
Implementation of NetworkTransport for VERBS.
Definition: Transport.h:38
Addon for VERBS thread
Definition: Worker.h:39
#define DOUT2(args ...)
Definition: logging.h:170
#define DOUT0(args ...)
Definition: logging.h:156
#define DOUT5(args ...)
Definition: logging.h:188
#define EOUT(args ...)
Definition: logging.h:150
#define DOUT1(args ...)
Definition: logging.h:162
Event manipulation API.
Definition: api.h:23
@ layoutMaximal
Definition: Manager.h:278
@ layoutPerModule
Definition: Manager.h:276
@ layoutBalanced
Definition: Manager.h:277
@ layoutMinimalistic
Definition: Manager.h:275
void formats(std::string &sbuf, const char *fmt,...)
Definition: string.cxx:26
ManagerRef mgr
Definition: Manager.cxx:42
std::string format(const char *fmt,...)
Definition: string.cxx:49
const char * xmlThreadAttr
Definition: ConfigBase.cxx:38
@ cmd_postponed
Definition: Command.h:42
@ cmd_false
Definition: Command.h:37
@ cmd_true
Definition: Command.h:38
Support of InfiniBand verbs.
Definition: Device.cxx:54
const char * xmlMcastAddr
Definition: Device.cxx:25
std::string ConvertGidToStr(ibv_gid &gid)
Definition: Device.cxx:496
bool ConvertStrToGid(const std::string &s, ibv_gid &gid)
Definition: Device.cxx:483
const char * typeThread
Definition: Context.cxx:36
const int LoopBackBufferSize
Definition: Device.cxx:46
const int LoopBackQueueSize
Definition: Device.cxx:45