DABC (Data Acquisition Backbone Core)  2.9.9
Transport.cxx
Go to the documentation of this file.
1 /************************************************************
2  * The Data Acquisition Backbone Core (DABC) *
3  ************************************************************
4  * Copyright (C) 2009 - *
5  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
6  * Planckstr. 1, 64291 Darmstadt, Germany *
7  * Contact: http://dabc.gsi.de *
8  ************************************************************
9  * This software can be used under the GPL license *
10  * agreements as stated in LICENSE.txt file *
11  * which is part of the distribution. *
12  ************************************************************/
13 
14 #include "verbs/Transport.h"
15 
16 #include "dabc/logging.h"
17 #include "dabc/Manager.h"
18 
19 #include "verbs/Device.h"
20 #include "verbs/QueuePair.h"
21 #include "verbs/ComplQueue.h"
22 #include "verbs/MemoryPool.h"
23 
25  verbs::WorkerAddon(qp),
27  fContext(ctx),
28  fInitOk(false),
29  fPoolReg(),
30  f_rwr(0),
31  f_swr(0),
32  f_sge(0),
33  fHeadersPool(0),
34  fSegmPerOper(2),
35  f_ud_ah(0),
36  f_ud_qpn(0),
37  f_ud_qkey(0),
38  f_multi(0),
39  f_multi_lid(0),
40  f_multi_attch(false)
41 {
42 }
43 
45 {
46  if (f_multi) {
47  if (f_multi_attch)
49 
51  f_multi = 0;
52  }
53 
54  if(f_ud_ah!=0) {
55  ibv_destroy_ah(f_ud_ah);
56  f_ud_ah = 0;
57  }
58 
59  delete fQP; fQP = 0;
60 
61  delete[] f_rwr; f_rwr = 0;
62  delete[] f_swr; f_swr = 0;
63  delete[] f_sge; f_sge = 0;
64 
66 }
67 
68 long verbs::VerbsNetworkInetrface::Notify(const std::string &cmd, int arg)
69 {
70  if (cmd == "GetNetworkTransportInetrface") return (long) ((dabc::NetworkInetrface*) this);
71 
72  return verbs::WorkerAddon::Notify(cmd, arg);
73 }
74 
75 void verbs::VerbsNetworkInetrface::SetUdAddr(struct ibv_ah *ud_ah, uint32_t ud_qpn, uint32_t ud_qkey)
76 {
77  f_ud_ah = ud_ah;
78  f_ud_qpn = ud_qpn;
79  f_ud_qkey = ud_qkey;
80 }
81 
83 {
85  if (tr==0) return false;
86 
87  if (multi_gid == 0) return false;
88 
89  if (!QP()->InitUD()) return false;
90 
91  memcpy(f_multi_gid.raw, multi_gid->raw, sizeof(f_multi_gid.raw));
92 
93  f_multi = fContext.ManageMulticast(ContextRef::mcst_Register, f_multi_gid, f_multi_lid);
94 
95  DOUT3("Init multicast group LID:%x %s", f_multi_lid, ConvertGidToStr(f_multi_gid).c_str());
96 
97  if (!f_multi) return false;
98 
99  f_ud_ah = fContext.CreateMAH(f_multi_gid, f_multi_lid);
100  if (f_ud_ah==0) return false;
101 
102  f_ud_qpn = VERBS_MCAST_QPN;
103  f_ud_qkey = VERBS_DEFAULT_QKEY;
104 
105  f_multi_attch = tr->IsInputTransport();
106 
107  if (f_multi_attch)
108  if (!QP()->AttachMcast(&f_multi_gid, f_multi_lid)) return false;
109 
110  return true;
111 }
112 
113 
114 
115 void verbs::VerbsNetworkInetrface::AllocateNet(unsigned fulloutputqueue, unsigned fullinputqueue)
116 {
118 
119  if (tr==0) return;
120 
121  DOUT3("++++ verbs::VerbsNetworkInetrface::AllocateNet tr = %p poolname %s", tr, tr->TransportPoolName().c_str());
122 
124 
125  if (!pool.null()) {
126  fPoolReg = fContext.RegisterPool(pool());
127 // dabc::mgr()->RegisterDependency(this, fPoolReg());
128  } else {
129  EOUT("Cannot make verbs transport without memory pool");
130  return;
131  }
132 
133  if (tr->NumRecs()>0) {
134  fHeadersPool = new MemoryPool(fContext, "HeadersPool", tr->NumRecs(),
135  tr->GetFullHeaderSize() + (IsUD() ? VERBS_UD_MEMADDON : 0), IsUD(), true);
136 
137  // we use at least 2 segments per operation, one for header and one for buffer itself
138  fSegmPerOper = fQP->NumSendSegs();
139  if (fSegmPerOper<2) fSegmPerOper = 2;
140 
141  f_rwr = new ibv_recv_wr [tr->NumRecs()];
142  f_swr = new ibv_send_wr [tr->NumRecs()];
143  f_sge = new ibv_sge [tr->NumRecs()*fSegmPerOper];
144 
145  for (uint32_t n=0;n<tr->NumRecs();n++) {
146 
147  tr->SetRecHeader(n, fHeadersPool->GetSendBufferLocation(n));
148 
149  for (unsigned seg_cnt=0; seg_cnt<fSegmPerOper; seg_cnt++) {
150  unsigned nseg = n*fSegmPerOper + seg_cnt;
151  f_sge[nseg].addr = (uintptr_t) 0; // must be specified later
152  f_sge[nseg].length = 0; // must be specified later
153  f_sge[nseg].lkey = 0; // must be specified later
154  }
155 
156  f_swr[n].wr_id = 0; // must be set later
157  f_swr[n].sg_list = 0;
158  f_swr[n].num_sge = 1;
159  f_swr[n].opcode = IBV_WR_SEND;
160  f_swr[n].next = NULL;
161  f_swr[n].send_flags = IBV_SEND_SIGNALED;
162 
163  f_rwr[n].wr_id = 0; // must be set later
164  f_rwr[n].sg_list = 0;
165  f_rwr[n].num_sge = 1;
166  f_rwr[n].next = NULL;
167  }
168  }
169 
170  fInitOk = true;
171 }
172 
174 {
176  if (tr==0) return;
177 
178  uint32_t segid = recid*fSegmPerOper;
179  int senddtyp = tr->PackHeader(recid);
180 
181  dabc::NetworkTransport::NetIORec* rec = tr->GetRec(recid);
182 
183  if ((rec==0) || (f_sge==0)) {
184  EOUT("Did not found rec %p or f_sge %p - abort", rec, f_sge);
185  exit(7);
186  }
187 
188  f_sge[segid].addr = (uintptr_t) rec->header;
189  f_sge[segid].length = tr->GetFullHeaderSize();
190  f_sge[segid].lkey = fHeadersPool->GetLkey(recid);
191 
192  f_swr[recid].wr_id = recid;
193  f_swr[recid].sg_list = &(f_sge[segid]);
194  f_swr[recid].num_sge = 1;
195  f_swr[recid].opcode = IBV_WR_SEND;
196  f_swr[recid].next = NULL;
197  f_swr[recid].send_flags = IBV_SEND_SIGNALED;
198 
199  if (f_ud_ah) {
200  f_swr[recid].wr.ud.ah = f_ud_ah;
201  f_swr[recid].wr.ud.remote_qpn = f_ud_qpn;
202  f_swr[recid].wr.ud.remote_qkey = f_ud_qkey;
203  }
204 
205  if ((senddtyp==2) && !fPoolReg.null()) {
206 
207  if (rec->buf.NumSegments() >= fSegmPerOper) {
208  EOUT("Too many segments");
209  exit(147);
210  }
211 
212  // FIXME: dangerous, acquire memory pool mutex when transport mutex is locked
213  fPoolReg()->CheckMRStructure();
214 
215  for (unsigned seg=0;seg<rec->buf.NumSegments();seg++) {
216  f_sge[segid+1+seg].addr = (uintptr_t) rec->buf.SegmentPtr(seg);
217  f_sge[segid+1+seg].length = rec->buf.SegmentSize(seg);
218  f_sge[segid+1+seg].lkey = fPoolReg()->GetLkey(rec->buf.SegmentId(seg));
219  }
220 
221  f_swr[recid].num_sge += rec->buf.NumSegments();
222  }
223 
224  if ((f_swr[recid].num_sge==1) && (f_sge[segid].length<=256))
225  // try to send small portion of data as inline
226  f_swr[recid].send_flags = (ibv_send_flags) (IBV_SEND_SIGNALED | IBV_SEND_INLINE);
227 
228  fQP->Post_Send(&(f_swr[recid]));
229 }
230 
232 {
234  if (tr==0) return;
235 
236  uint32_t segid = recid*fSegmPerOper;
237 
238  dabc::NetworkTransport::NetIORec* rec = tr->GetRec(recid);
239 
240  f_rwr[recid].wr_id = recid;
241  f_rwr[recid].sg_list = &(f_sge[segid]);
242  f_rwr[recid].num_sge = 1;
243  f_rwr[recid].next = NULL;
244 
245  if (IsUD()) {
246  f_sge[segid].addr = (uintptr_t) fHeadersPool->GetBufferLocation(recid);
247  f_sge[segid].length = tr->GetFullHeaderSize() + VERBS_UD_MEMADDON;
248  } else {
249  f_sge[segid].addr = (uintptr_t) rec->header;
250  f_sge[segid].length = tr->GetFullHeaderSize();
251  }
252  f_sge[segid].lkey = fHeadersPool->GetLkey(recid);
253 
254  if (!rec->buf.null() && !fPoolReg.null()) {
255 
256  if (rec->buf.NumSegments() >= fSegmPerOper) {
257  EOUT("Too many segments");
258  exit(146);
259  }
260 
261  fPoolReg()->CheckMRStructure();
262 
263  for (unsigned seg=0;seg<rec->buf.NumSegments();seg++) {
264  f_sge[segid+1+seg].addr = (uintptr_t) rec->buf.SegmentPtr(seg);
265  f_sge[segid+1+seg].length = rec->buf.SegmentSize(seg);
266  f_sge[segid+1+seg].lkey = fPoolReg()->GetLkey(rec->buf.SegmentId(seg));
267  }
268 
269  f_rwr[recid].num_sge += rec->buf.NumSegments();
270  }
271 
272  fQP->Post_Recv(&(f_rwr[recid]));
273 }
274 
275 
276 
278 {
280 
281  if (tr) tr->ProcessSendCompl(arg);
282 }
283 
285 {
287 
288  if (tr) tr->ProcessRecvCompl(arg);
289 }
290 
292 {
293  EOUT("Verbs error");
294 }
#define VERBS_DEFAULT_QKEY
Definition: QueuePair.h:18
#define VERBS_UD_MEMADDON
Definition: QueuePair.h:20
#define VERBS_MCAST_QPN
Definition: QueuePair.h:19
unsigned NumSegments() const
Returns number of segment in buffer.
Definition: Buffer.h:163
unsigned SegmentSize(unsigned n=0) const
Returns size on the segment, no any boundary checks.
Definition: Buffer.h:174
unsigned SegmentId(unsigned n=0) const
Returns id of the segment, no any boundary checks.
Definition: Buffer.h:168
void * SegmentPtr(unsigned n=0) const
Returns pointer on the segment, no any boundary checks.
Definition: Buffer.h:171
Reference FindPool(const std::string &name)
Definition: Manager.cxx:2064
Reference on dabc::MemoryPool class
Definition: MemoryPool.h:245
Network interface.
Network transport.
std::string TransportPoolName() const
Provides name of memory pool, used by transport.
void SetRecHeader(uint32_t recid, void *header)
NetIORec * GetRec(unsigned id) const
unsigned GetFullHeaderSize() const
void ProcessRecvCompl(uint32_t recid)
void ProcessSendCompl(uint32_t recid)
int PackHeader(uint32_t recid)
unsigned NumRecs() const
static void Destroy(Object *obj)
User method for object destroyment.
Definition: Object.cxx:921
bool null() const
Returns true if reference contains nullptr.
Definition: Reference.h:151
bool IsInputTransport() const
Definition: Transport.h:94
virtual long Notify(const std::string &, int)
Light-weight command interface, which can be used from worker.
Definition: Worker.h:82
Reference to verbs::Context
Definition: Context.h:74
int ManageMulticast(int action, ibv_gid &mgid, uint16_t &mlid)
Definition: Context.cxx:316
Represent VERBS queue pair functionality.
Definition: QueuePair.h:37
bool DetachMcast(ibv_gid *mgid, uint16_t mlid)
Definition: QueuePair.cxx:290
struct ibv_send_wr * f_swr
Definition: Transport.h:54
virtual void VerbsProcessSendCompl(uint32_t)
Definition: Transport.cxx:277
virtual void AllocateNet(unsigned fulloutputqueue, unsigned fullinputqueue)
Definition: Transport.cxx:115
void SetUdAddr(struct ibv_ah *ud_ah, uint32_t ud_qpn, uint32_t ud_qkey)
Definition: Transport.cxx:75
virtual void SubmitSend(uint32_t recid)
Definition: Transport.cxx:173
virtual void VerbsProcessOperError(uint32_t)
Definition: Transport.cxx:291
virtual void VerbsProcessRecvCompl(uint32_t)
Definition: Transport.cxx:284
virtual void SubmitRecv(uint32_t recid)
Definition: Transport.cxx:231
VerbsNetworkInetrface(verbs::ContextRef ctx, QueuePair *qp)
Definition: Transport.cxx:24
verbs::ContextRef fContext
Definition: Transport.h:49
struct ibv_recv_wr * f_rwr
Definition: Transport.h:53
virtual long Notify(const std::string &, int)
Light-weight command interface, which can be used from worker.
Definition: Transport.cxx:68
bool AssignMultiGid(ibv_gid *multi_gid)
Definition: Transport.cxx:82
struct ibv_ah * f_ud_ah
Definition: Transport.h:58
struct ibv_sge * f_sge
Definition: Transport.h:55
Addon for VERBS thread
Definition: Worker.h:39
QueuePair * QP() const
Definition: Worker.h:63
QueuePair * fQP
Definition: Worker.h:45
#define DOUT3(args ...)
Definition: logging.h:176
#define EOUT(args ...)
Definition: logging.h:150
ManagerRef mgr
Definition: Manager.cxx:42
Support of InfiniBand verbs.
Definition: Device.cxx:54
std::string ConvertGidToStr(ibv_gid &gid)
Definition: Device.cxx:496