DABC (Data Acquisition Backbone Core)  2.9.9
Thread.cxx
Go to the documentation of this file.
1 /************************************************************
2  * The Data Acquisition Backbone Core (DABC) *
3  ************************************************************
4  * Copyright (C) 2009 - *
5  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
6  * Planckstr. 1, 64291 Darmstadt, Germany *
7  * Contact: http://dabc.gsi.de *
8  ************************************************************
9  * This software can be used under the GPL license *
10  * agreements as stated in LICENSE.txt file *
11  * which is part of the distribution. *
12  ************************************************************/
13 
14 #include "verbs/Thread.h"
15 
16 #include <sys/poll.h>
17 #include <unistd.h>
18 
19 #include <infiniband/verbs.h>
20 
21 #include "dabc/logging.h"
22 #include "dabc/Manager.h"
23 #include "dabc/Command.h"
24 
25 #include "verbs/Device.h"
26 #include "verbs/QueuePair.h"
27 #include "verbs/ComplQueue.h"
28 #include "verbs/MemoryPool.h"
29 #include "verbs/Worker.h"
30 
31 #ifndef VERBS_USING_PIPE
32 
33 // ____________________________________________________________________
34 
35 namespace verbs {
36 
// Timeout producer for verbs::Thread when the pipe cannot be used.
// The class is only compiled when VERBS_USING_PIPE is not defined.
// NOTE: this listing omits several source lines of the class (42, 46, 53,
// 55-56 and 69). According to the index at the end of the page they hold the
// fVerbsThrd member (42), the TimeoutWorker(Thread *thrd) constructor
// signature (46) and the OnThreadAssigned() signature (53).
39  class TimeoutWorker : public dabc::Worker {
40 
41  protected:
 // last timeout value passed to DoTimeout(); -1 until the first request
43  double fLastTm;
44 
45  public:
 // constructor initializer list: stores the owning verbs thread pointer and
 // marks that no timeout has been requested yet
47  dabc::Worker(),
48  fVerbsThrd(thrd),
49  fLastTm(-1)
50  {
51  }
52 
 // body of OnThreadAssigned(); its statements are not part of this listing
54  {
57  }
58 
59 
 // Remembers the requested timeout and, if the worker is already assigned
 // to a thread, activates it immediately.
 // presumably the stored value is picked up in OnThreadAssigned() when the
 // worker had no thread yet - TODO confirm against the full source
60  void DoTimeout(double tmout)
61  {
62  fLastTm = tmout;
63  if (HasThread())
64  ActivateTimeout(tmout);
65  }
66 
 // Timeout callback; the statement on source line 69 is not part of this
 // listing. Always returns -1.
 // presumably -1 means "do not re-arm the timeout" - TODO confirm
67  virtual double ProcessTimeout(double)
68  {
70  return -1;
71  }
72 
 // class name reported for this object
73  virtual const char* ClassName() const { return "verbs::TimeoutWorker"; }
74 
75  };
76 
77 }
78 
79 #endif
80 
81 // ____________________________________________________________________
82 
83 verbs::Thread::Thread(dabc::Reference parent, const std::string &name, dabc::Command cmd, ContextRef ctx) :
84  dabc::Thread(parent, name, cmd),
85  fContext(ctx),
86  fChannel(0),
87 #ifndef VERBS_USING_PIPE
88  fLoopBackQP(0),
89  fLoopBackPool(0),
90  fLoopBackCnt(0),
91  fTimeout(0),
92 #endif
93  fMainCQ(0),
94  fWaitStatus(wsWorking),
95  fWCSize(0),
96  fWCs(0),
97  fFastModus(0),
98  fCheckNewEvents(true)
99 {
100  fChannel = ibv_create_comp_channel(fContext.context());
101  if (fChannel==0) {
102  EOUT("Cannot create completion channel - HALT");
103  exit(143);
104  }
105 
106  fWCSize = 128;
107  fWCs = new struct ibv_wc[fWCSize];
108 
109  #ifdef VERBS_USING_PIPE
110 
111  fPipe[0] = 0;
112  fPipe[1] = 0;
113  pipe(fPipe);
114 
115  #else
116 
117  fLoopBackQP = new QueuePair(fContext, IBV_QPT_RC,
118  MakeCQ(), LoopBackQueueSize, 1,
119  MakeCQ(), LoopBackQueueSize, 1);
120 
121  if (!fLoopBackQP->Connect(fContext.lid(), fLoopBackQP->qp_num(), fLoopBackQP->local_psn())) {
122  EOUT("fLoopBackQP CONNECTION FAILED");
123  exit(144);
124  }
125 
126  fLoopBackPool = new MemoryPool(fContext, "LoopBackPool", 2, 16, false);
127 
128  #endif
129 
130  DOUT3("Verbs thread %s %p is created", GetName(), this);
131 }
132 
// Body of the destructor (virtual ~Thread(); its signature on source line 133
// is not part of this listing). Releases all thread resources through
// CloseThread() and logs the destruction.
134 {
135  CloseThread();
136 
137  DOUT3("Verbs thread %p %s destroyed", this, GetName());
138 }
139 
140 bool verbs::Thread::CompatibleClass(const std::string &clname) const
141 {
142  if (dabc::Thread::CompatibleClass(clname)) return true;
143  return clname == verbs::typeThread;
144 }
145 
147 {
148  DOUT2("verbs::Thread::CloseThread() %s", GetName());
149 
150  if (!IsItself())
151  Stop(2.);
152  else
153  EOUT("Bad idea - close thread from itself");
154 
155  DOUT2("verbs::Thread::CloseThread() %s - did stop", GetName());
156 
157 
158  #ifdef VERBS_USING_PIPE
159  if (fPipe[0] != 0) {
160  close(fPipe[0]);
161  fPipe[0] = 0;
162  }
163  if (fPipe[1] != 0) {
164  close(fPipe[1]);
165  fPipe[1] = 0;
166  }
167  #else
168  if (fTimeout) { delete fTimeout; fTimeout = nullptr; }
169  delete fLoopBackQP; fLoopBackQP = nullptr;
170  delete fLoopBackPool; fLoopBackPool = nullptr;
171  #endif
172 
173  if (fMainCQ) { delete fMainCQ; fMainCQ = nullptr; }
174 
175  ibv_destroy_comp_channel(fChannel);
176 
177  if (fWCs) {
178  delete [] fWCs;
179  fWCs = nullptr;
180  fWCSize = 0;
181  }
182 
183  DOUT2("verbs::Thread::CloseThread() %s done", GetName());
184 }
185 
// Body of MakeCQ() (ComplQueue * MakeCQ(); its signature on source line 186
// is not part of this listing).
// Returns the completion queue of the thread, creating it on the first call.
// All later calls return the same object, which is attached to fChannel.
187 {
188  if (fMainCQ!=0) return fMainCQ;
189 
 // presumably 10000 is the requested queue size - TODO confirm in ComplQueue
190  fMainCQ = new ComplQueue(fContext, 10000, fChannel);
191 
192  return fMainCQ;
193 }
194 
// Body of ExecuteThreadCommand(dabc::Command cmd) (its signature on source
// line 195 is not part of this listing).
// Handles the two commands which switch the fast (busy polling) modus:
//   "EnableFastModus"  - fFastModus is set to the integer command field
//                        "PoolingCounter" (default 1000)
//   "DisableFastModus" - fFastModus is set to 0
// Both return dabc::cmd_true.
196 {
197  if (cmd.IsName("EnableFastModus")) {
198  fFastModus = cmd.GetInt("PoolingCounter", 1000);
199  return dabc::cmd_true;
200  } else
201  if (cmd.IsName("DisableFastModus")) {
202  fFastModus = 0;
203  return dabc::cmd_true;
204  }
205 
 // source line 206 (handling of all other commands) is not part of this
 // listing; presumably it forwards to dabc::Thread - TODO confirm
207 }
208 
209 void verbs::Thread::_Fire(const dabc::EventId& evnt, int nq)
210 {
211  DOUT4("verbs::Thread %s ::_Fire %s status %d", GetName(), evnt.asstring().c_str(), fWaitStatus);
212 
213  _PushEvent(evnt, nq);
214  if (fWaitStatus == wsWaiting)
215  #ifdef VERBS_USING_PIPE
216  {
217  write(fPipe[1],"w", 1);
218  fWaitStatus = wsFired;
219  }
220  #else
221  if (fLoopBackCnt<LoopBackQueueSize*2) {
222  fLoopBackQP->Post_Recv(fLoopBackPool->GetRecvWR(0));
223  fLoopBackQP->Post_Send(fLoopBackPool->GetSendWR(1, 0));
224  fLoopBackCnt+=2;
225  fWaitStatus = wsFired;
226  }
227  #endif
228 }
229 
// Waits for the next event of the thread and returns it in evid.
//
// Steps:
//  1. Under the thread mutex: if events are queued and fCheckNewEvents is
//     false, the next queued event is returned at once; if events are queued
//     but new verbs events should be checked first, the timeout is set to 0.
//     Then fWaitStatus becomes wsWaiting.
//  2. In fast modus the main completion queue is polled up to fFastModus
//     times without blocking.
//  3. If nothing was polled, the thread waits for a completion channel event
//     (with VERBS_USING_PIPE via poll() on the channel and the wake-up pipe,
//     otherwise directly in ibv_get_cq_event() with a TimeoutWorker armed).
//  4. Under the thread mutex again: all available work completions are read,
//     translated via fMap into worker events and pushed into the queue.
//
// tmout_sec < 0 means wait without timeout in pipe mode (poll timeout -1).
// Returns the result of _GetNextEvent(evid).
//
// NOTE: source lines 367, 372 and 374 are not part of this listing.
230 bool verbs::Thread::WaitEvent(dabc::EventId& evid, double tmout_sec)
231 {
232 
233 // if (tmout_sec>=0) EOUT("Non-empty timeout");
234 
235  {
236  dabc::LockGuard lock(ThreadMutex());
237 
238  // if we already have events in the queue,
239  // check if we take them out or first check if new verbs events there
240 
241  if (_TotalNumberOfEvents() > 0) {
242 
243  if (!fCheckNewEvents) return _GetNextEvent(evid);
244 
245  // we have events in the queue, therefore do not wait - just check new events
246  tmout_sec = 0.;
247  }
248 
 // from now on _Fire() has to wake this thread up
249  fWaitStatus = wsWaiting;
250  }
251 
252  struct ibv_cq *ev_cq = 0;
253  int nevents = 0;
254 
 // fast modus: busy polling of the main queue, at most fFastModus attempts
255  if ((fFastModus>0) && (fMainCQ!=0))
256  for (int n=0;n<fFastModus;n++) {
257  nevents = ibv_poll_cq(fMainCQ->cq(), fWCSize, fWCs);
258  if (nevents>0) {
259  ev_cq = fMainCQ->cq();
260  break;
261  }
262  }
263 
 // nothing polled - wait for a notification on the completion channel
264  if (nevents == 0) {
265 
266 #ifdef VERBS_USING_PIPE
267 
 // wait on two descriptors: [0] completion channel, [1] read end of the
 // wake-up pipe written by _Fire()
268  struct pollfd ufds[2];
269 
270  ufds[0].fd = fChannel->fd;
271  ufds[0].events = POLLIN;
272  ufds[0].revents = 0;
273 
274  ufds[1].fd = fPipe[0];
275  ufds[1].events = POLLIN;
276  ufds[1].revents = 0;
277 
 // poll() takes milliseconds; negative value means no timeout
278  int tmout = tmout_sec < 0 ? -1 : int(tmout_sec*1000.);
279 
280  DOUT5("VerbsThrd:%s start poll tmout:%d", GetName(), tmout);
281 
282  int res = poll(ufds, 2, tmout);
283 
284  DOUT5("VerbsThrd:%s did poll res:%d", GetName(), res);
285 
286  // if no events on the main channel
287  if ((res <= 0) || (ufds[0].revents == 0)) {
288 
289  dabc::LockGuard lock(ThreadMutex());
 // remove the wake-up byte written by _Fire()
290  if (fWaitStatus == wsFired) {
291  char sbuf;
292  read(fPipe[0], &sbuf, 1);
293  }
294 
295  fWaitStatus = wsWorking;
296 
297  return _GetNextEvent(evid);
298  }
299 
300 #else
301 
 // without pipe the wait below cannot time out by itself, therefore a
 // TimeoutWorker is created on first use and assigned to the manager thread
302  if (tmout_sec>=0.) {
303  if (fTimeout==0) {
304  fTimeout = new TimeoutWorker(this);
305  fTimeout->AssignToThread(dabc::mgr()->thread(), false);
306  }
307  fTimeout->DoTimeout(tmout_sec);
308  }
309 
310 #endif
311 
312  void *ev_ctx = 0;
313 
314 // DOUT1("Call ibv_get_cq_event");
315 
 // NOTE(review): on failure only an error is logged; ev_cq may still be 0
 // when it is passed to ibv_req_notify_cq() below - confirm this is intended
316  if (ibv_get_cq_event(fChannel, &ev_cq, &ev_ctx)) {
317  EOUT("ERROR when waiting for cq event");
318  }
319 
 // request the next notification for this queue
320  ibv_req_notify_cq(ev_cq, 0);
321 
322  if(ev_ctx!=0) {
323  ComplQueueContext* cq_ctx = (ComplQueueContext*) ev_ctx;
324  if (cq_ctx->own_cq != ev_cq) {
325  EOUT("Mismatch in cq context");
326  exit(145);
327  }
328 
 // events are counted and acknowledged in one call only after more than
 // 1000000 of them were received
329  cq_ctx->events_get++;
330  if (cq_ctx->events_get>1000000) {
331  ibv_ack_cq_events(ev_cq, cq_ctx->events_get);
332  cq_ctx->events_get = 0;
333  }
334  } else
335  ibv_ack_cq_events(ev_cq, 1);
336 
337  } // nevents==0
338 
 // the event queue and fWaitStatus are modified below under the thread mutex
339  dabc::LockGuard lock(ThreadMutex());
340 
341 #ifdef VERBS_USING_PIPE
 // remove the wake-up byte written by _Fire()
342  if (fWaitStatus == wsFired) {
343  char sbuf;
344  read(fPipe[0], &sbuf, 1);
345  }
346 #endif
347 
348  fWaitStatus = wsWorking;
349 
 // set when at least one work completion was turned into a worker event
350  bool isany = false;
351 
 // read completions until the queue is empty; a batch already obtained in
 // fast modus is processed first
352  while (true) {
353  if (nevents==0)
354  nevents = ibv_poll_cq(ev_cq, fWCSize, fWCs);
355 
356  if (nevents<=0) break;
357 
358  struct ibv_wc* wc = fWCs;
359 
360  while (nevents-->0) {
361 
 // fMap (filled in WorkersSetChanged) gives the worker index for a queue
 // pair number; 0 is treated as "no worker"
362  uint32_t procid = fMap[wc->qp_num];
363 
364  if (procid!=0) {
365  uint16_t evnt = 0;
366  if (wc->status != IBV_WC_SUCCESS) {
368  EOUT("Verbs error %s isrecv %s operid %u", StatusStr(wc->status), DBOOL(wc->opcode & IBV_WC_RECV), wc->wr_id);
369  }
370  else
 // the statements selecting the event code for receive and for other
 // completions (source lines 372 and 374) are not part of this listing
371  if (wc->opcode & IBV_WC_RECV)
373  else
375 
376  _PushEvent(dabc::EventId(evnt, procid, wc->wr_id), 1);
377 
378  isany = true;
379 
380  // FIXME: we should increase number of fired events by worker
381  IncWorkerFiredEvents(fWorkers[procid]->work);
382 #ifdef VERBS_USING_PIPE
383  }
384 #else
385  } else {
 // completion of a loop-back request posted by _Fire()
386  if (fLoopBackQP->qp_num()==wc->qp_num)
387  fLoopBackCnt--;
388  }
389 #endif
390  wc++;
391  }
392  }
393 
394  // we put additional event to enable again events checking after current events are processed
395  if (isany) {
396  fCheckNewEvents = false;
397  _PushEvent(evntEnableCheck, 1);
398  }
399 
400  return _GetNextEvent(evid);
401 }
402 
// Body of ProcessExtraThreadEvent(const dabc::EventId &) (its signature on
// source line 403 is not part of this listing).
// Processes evntEnableCheck, which WaitEvent() puts behind a batch of verbs
// events: checking for new verbs events is enabled again.
404 {
405  switch (evid.GetCode()) {
406  case evntEnableCheck:
407  fCheckNewEvents = true;
408  break;
409  default:
 // source line 410 (handling of all other events) is not part of this
 // listing; presumably it forwards to dabc::Thread - TODO confirm
411  break;
412  }
413 }
414 
415 
416 
417 
// Body of WorkersSetChanged() (its signature on source line 418 is not part
// of this listing). Called from the thread context when the set of workers
// has changed.
// Rebuilds fMap, which gives the worker index for a queue pair number and is
// used in WaitEvent() to address work completions to workers. Workers
// without verbs::WorkerAddon or without queue pair are skipped.
// Exits the process (code 44) if a worker id differs from its index.
419 {
420  // we do not need locks while fWorkers and fMap can be changed only inside the thread
421 
 // NOTE(review): %u is used with fWorkers.size() - confirm that the size
 // type of the container matches
422  DOUT5("WorkersNumberChanged started size:%u", fWorkers.size());
423 
424  fMap.clear();
425 
426  for (unsigned indx=0;indx<fWorkers.size();indx++) {
427  verbs::WorkerAddon* addon = dynamic_cast<verbs::WorkerAddon*> (fWorkers[indx]->addon);
428 
429  DOUT5("Test processor %u: work %p addon %p", indx, fWorkers[indx]->work, addon);
430 
431  if ((addon==0) || (addon->QP()==0)) continue;
432 
 // WaitEvent() treats index 0 as "no worker"
 // presumably slot 0 never holds a worker with queue pair - TODO confirm
433  fMap[addon->QP()->qp_num()] = indx;
434 
435  if (fWorkers[indx]->work->WorkerId() != indx) {
436  EOUT("Mismatch of worker id");
437  exit(44);
438  }
439  }
440 
 // queue pairs may have changed - check for verbs events at the next wait
441  fCheckNewEvents = true;
442 
443  DOUT5("WorkersNumberChanged finished");
444 }
445 
446 const char* verbs::Thread::StatusStr(int code)
447 {
448  static struct {
449  const char *name;
450  ibv_wc_status value;
451  } verbs_status[] = {
452 # define VERBSstatus(x) { # x, x }
453 
454  VERBSstatus (IBV_WC_SUCCESS),
455  VERBSstatus (IBV_WC_LOC_LEN_ERR),
456  VERBSstatus (IBV_WC_LOC_QP_OP_ERR),
457  VERBSstatus (IBV_WC_LOC_EEC_OP_ERR),
458  VERBSstatus (IBV_WC_LOC_PROT_ERR),
459  VERBSstatus (IBV_WC_WR_FLUSH_ERR),
460  VERBSstatus (IBV_WC_MW_BIND_ERR),
461  VERBSstatus (IBV_WC_BAD_RESP_ERR),
462  VERBSstatus (IBV_WC_LOC_ACCESS_ERR),
463  VERBSstatus (IBV_WC_REM_INV_REQ_ERR),
464  VERBSstatus (IBV_WC_REM_ACCESS_ERR),
465  VERBSstatus (IBV_WC_REM_OP_ERR),
466  VERBSstatus (IBV_WC_RETRY_EXC_ERR),
467  VERBSstatus (IBV_WC_RNR_RETRY_EXC_ERR),
468  VERBSstatus (IBV_WC_LOC_RDD_VIOL_ERR),
469  VERBSstatus (IBV_WC_REM_INV_RD_REQ_ERR),
470  VERBSstatus (IBV_WC_REM_ABORT_ERR),
471  VERBSstatus (IBV_WC_INV_EECN_ERR),
472  VERBSstatus (IBV_WC_INV_EEC_STATE_ERR),
473  VERBSstatus (IBV_WC_FATAL_ERR),
474  VERBSstatus (IBV_WC_RESP_TIMEOUT_ERR),
475  VERBSstatus (IBV_WC_GENERAL_ERR)
476 #undef VERBSstatus
477  };
478 
479  for (unsigned i = 0; i < sizeof(verbs_status)/sizeof(verbs_status[0]); i++)
480  if (verbs_status[i].value == code)
481  return verbs_status[i].name;
482 
483  return "Invalid_VERBS_Status_Code";
484 }
Represents command with its arguments.
Definition: Command.h:99
int GetInt(const std::string &name, int dflt=0) const
Definition: Command.h:139
Lock guard for posix mutex.
Definition: threads.h:127
const char * GetName() const
Returns name of the object, thread safe
Definition: Object.h:295
Reference to an arbitrary object
Definition: Reference.h:73
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
Definition: Reference.cxx:177
friend class Thread
Definition: threads.h:321
Represent thread functionality.
Definition: Thread.h:109
virtual bool CompatibleClass(const std::string &clname) const
Definition: Thread.cxx:428
void FireDoNothingEvent()
Definition: Thread.cxx:826
virtual int ExecuteThreadCommand(Command cmd)
Definition: Thread.cxx:833
virtual void ProcessExtraThreadEvent(const EventId &)
Method to process events which are not processed by the Thread class itself. Should be used in derived cla...
Definition: Thread.h:390
Active object, which is working inside dabc::Thread.
Definition: Worker.h:116
bool ActivateTimeout(double tmout_sec)
Method used to produce timeout events in the worker.
Definition: Worker.cxx:385
virtual void OnThreadAssigned()
Definition: Worker.h:392
Worker(const ConstructorPair &pair)
Special constructor, designed for inherited classes.
Definition: Worker.cxx:109
bool HasThread() const
Indicates if pointer on thread is not zero; thread-safe.
Definition: Worker.cxx:151
Wrapper for IB VERBS completion queue
Definition: ComplQueue.h:35
Reference to verbs::Context
Definition: Context.h:74
uint32_t qp_num() const
Definition: QueuePair.h:48
VERBS thread.
Definition: Thread.h:46
virtual int ExecuteThreadCommand(dabc::Command cmd)
Definition: Thread.cxx:195
virtual void _Fire(const dabc::EventId &evnt, int nq)
Definition: Thread.cxx:209
virtual bool CompatibleClass(const std::string &clname) const
Definition: Thread.cxx:140
virtual bool WaitEvent(dabc::EventId &evid, double tmout)
Definition: Thread.cxx:230
void CloseThread()
Definition: Thread.cxx:146
virtual void WorkersSetChanged()
Virtual method, called from the thread context to signal that the set of workers has changed.
Definition: Thread.cxx:418
virtual ~Thread()
Definition: Thread.cxx:133
virtual void ProcessExtraThreadEvent(const dabc::EventId &)
Method to process events which are not processed by the Thread class itself. Should be used in derived cla...
Definition: Thread.cxx:403
static const char * StatusStr(int code)
Definition: Thread.cxx:446
ComplQueue * MakeCQ()
Definition: Thread.cxx:186
Timeout producer for verbs::Thread when pipe cannot be used.
Definition: Thread.cxx:39
virtual const char * ClassName() const
Returns class name of the object instance.
Definition: Thread.cxx:73
TimeoutWorker(Thread *thrd)
Definition: Thread.cxx:46
virtual double ProcessTimeout(double)
Definition: Thread.cxx:67
void DoTimeout(double tmout)
Definition: Thread.cxx:60
void OnThreadAssigned()
Definition: Thread.cxx:53
Thread * fVerbsThrd
Definition: Thread.cxx:42
Addon for VERBS thread
Definition: Worker.h:39
QueuePair * QP() const
Definition: Worker.h:63
#define DOUT2(args ...)
Definition: logging.h:170
#define DOUT5(args ...)
Definition: logging.h:188
#define DOUT3(args ...)
Definition: logging.h:176
#define EOUT(args ...)
Definition: logging.h:150
#define DBOOL(arg)
Definition: logging.h:191
#define DOUT4(args ...)
Definition: logging.h:182
Event manipulation API.
Definition: api.h:23
ManagerRef mgr
Definition: Manager.cxx:42
@ cmd_true
Definition: Command.h:38
Support of InfiniBand verbs.
Definition: Device.cxx:54
const char * typeThread
Definition: Context.cxx:36
const int LoopBackQueueSize
Definition: Device.cxx:45
#define VERBSstatus(x)
#define VERBS_USING_PIPE
Definition: Thread.h:31
Event structure, exchanged between DABC threads.
Definition: Thread.h:70
std::string asstring() const
Definition: Thread.cxx:37
uint16_t GetCode() const
Definition: Thread.h:92