19 #include <infiniband/verbs.h>
31 #ifndef VERBS_USING_PIPE
/** Returns the class name of this object instance,
 *  which is the fixed string "verbs::TimeoutWorker". */
73 virtual const char*
ClassName()
const {
return "verbs::TimeoutWorker"; }
94 fWaitStatus(wsWorking),
100 fChannel = ibv_create_comp_channel(fContext.context());
102 EOUT(
"Cannot create completion channel - HALT");
107 fWCs =
new struct ibv_wc[fWCSize];
109 #ifdef VERBS_USING_PIPE
117 fLoopBackQP =
new QueuePair(fContext, IBV_QPT_RC,
121 if (!fLoopBackQP->Connect(fContext.lid(), fLoopBackQP->qp_num(), fLoopBackQP->local_psn())) {
122 EOUT(
"fLoopBackQP CONNECTION FAILED");
126 fLoopBackPool =
new MemoryPool(fContext,
"LoopBackPool", 2, 16,
false);
130 DOUT3(
"Verbs thread %s %p is created", GetName(),
this);
148 DOUT2(
"verbs::Thread::CloseThread() %s", GetName());
153 EOUT(
"Bad idea - close thread from itself");
155 DOUT2(
"verbs::Thread::CloseThread() %s - did stop", GetName());
158 #ifdef VERBS_USING_PIPE
168 if (fTimeout) {
delete fTimeout; fTimeout =
nullptr; }
169 delete fLoopBackQP; fLoopBackQP =
nullptr;
170 delete fLoopBackPool; fLoopBackPool =
nullptr;
173 if (fMainCQ) {
delete fMainCQ; fMainCQ =
nullptr; }
175 ibv_destroy_comp_channel(fChannel);
183 DOUT2(
"verbs::Thread::CloseThread() %s done", GetName());
188 if (fMainCQ!=0)
return fMainCQ;
190 fMainCQ =
new ComplQueue(fContext, 10000, fChannel);
197 if (cmd.
IsName(
"EnableFastModus")) {
198 fFastModus = cmd.
GetInt(
"PoolingCounter", 1000);
201 if (cmd.
IsName(
"DisableFastModus")) {
211 DOUT4(
"verbs::Thread %s ::_Fire %s status %d", GetName(), evnt.
asstring().c_str(), fWaitStatus);
213 _PushEvent(evnt, nq);
214 if (fWaitStatus == wsWaiting)
215 #ifdef VERBS_USING_PIPE
217 write(fPipe[1],
"w", 1);
218 fWaitStatus = wsFired;
222 fLoopBackQP->Post_Recv(fLoopBackPool->GetRecvWR(0));
223 fLoopBackQP->Post_Send(fLoopBackPool->GetSendWR(1, 0));
225 fWaitStatus = wsFired;
241 if (_TotalNumberOfEvents() > 0) {
243 if (!fCheckNewEvents)
return _GetNextEvent(evid);
249 fWaitStatus = wsWaiting;
252 struct ibv_cq *ev_cq = 0;
255 if ((fFastModus>0) && (fMainCQ!=0))
256 for (
int n=0;n<fFastModus;n++) {
257 nevents = ibv_poll_cq(fMainCQ->cq(), fWCSize, fWCs);
259 ev_cq = fMainCQ->cq();
266 #ifdef VERBS_USING_PIPE
268 struct pollfd ufds[2];
270 ufds[0].fd = fChannel->fd;
271 ufds[0].events = POLLIN;
274 ufds[1].fd = fPipe[0];
275 ufds[1].events = POLLIN;
278 int tmout = tmout_sec < 0 ? -1 : int(tmout_sec*1000.);
280 DOUT5(
"VerbsThrd:%s start poll tmout:%d", GetName(), tmout);
282 int res = poll(ufds, 2, tmout);
284 DOUT5(
"VerbsThrd:%s did poll res:%d", GetName(), res);
287 if ((res <= 0) || (ufds[0].revents == 0)) {
290 if (fWaitStatus == wsFired) {
292 read(fPipe[0], &sbuf, 1);
295 fWaitStatus = wsWorking;
297 return _GetNextEvent(evid);
304 fTimeout =
new TimeoutWorker(
this);
305 fTimeout->AssignToThread(
dabc::mgr()->thread(),
false);
307 fTimeout->DoTimeout(tmout_sec);
316 if (ibv_get_cq_event(fChannel, &ev_cq, &ev_ctx)) {
317 EOUT(
"ERROR when waiting for cq event");
320 ibv_req_notify_cq(ev_cq, 0);
323 ComplQueueContext* cq_ctx = (ComplQueueContext*) ev_ctx;
324 if (cq_ctx->own_cq != ev_cq) {
325 EOUT(
"Mismatch in cq context");
329 cq_ctx->events_get++;
330 if (cq_ctx->events_get>1000000) {
331 ibv_ack_cq_events(ev_cq, cq_ctx->events_get);
332 cq_ctx->events_get = 0;
335 ibv_ack_cq_events(ev_cq, 1);
341 #ifdef VERBS_USING_PIPE
342 if (fWaitStatus == wsFired) {
344 read(fPipe[0], &sbuf, 1);
348 fWaitStatus = wsWorking;
354 nevents = ibv_poll_cq(ev_cq, fWCSize, fWCs);
356 if (nevents<=0)
break;
358 struct ibv_wc* wc = fWCs;
360 while (nevents-->0) {
362 uint32_t procid = fMap[wc->qp_num];
366 if (wc->status != IBV_WC_SUCCESS) {
368 EOUT(
"Verbs error %s isrecv %s operid %u", StatusStr(wc->status),
DBOOL(wc->opcode & IBV_WC_RECV), wc->wr_id);
371 if (wc->opcode & IBV_WC_RECV)
381 IncWorkerFiredEvents(fWorkers[procid]->work);
382 #ifdef VERBS_USING_PIPE
386 if (fLoopBackQP->qp_num()==wc->qp_num)
396 fCheckNewEvents =
false;
397 _PushEvent(evntEnableCheck, 1);
400 return _GetNextEvent(evid);
406 case evntEnableCheck:
407 fCheckNewEvents =
true;
422 DOUT5(
"WorkersNumberChanged started size:%u", fWorkers.size());
426 for (
unsigned indx=0;indx<fWorkers.size();indx++) {
429 DOUT5(
"Test processor %u: work %p addon %p", indx, fWorkers[indx]->work, addon);
431 if ((addon==0) || (addon->
QP()==0))
continue;
435 if (fWorkers[indx]->work->WorkerId() != indx) {
436 EOUT(
"Mismatch of worker id");
441 fCheckNewEvents =
true;
443 DOUT5(
"WorkersNumberChanged finished");
452 # define VERBSstatus(x) { # x, x }
479 for (
unsigned i = 0; i <
sizeof(verbs_status)/
sizeof(verbs_status[0]); i++)
480 if (verbs_status[i].value == code)
481 return verbs_status[i].name;
483 return "Invalid_VERBS_Status_Code";
Represents command with its arguments.
int GetInt(const std::string &name, int dflt=0) const
Lock guard for posix mutex.
const char * GetName() const
Returns name of the object, thread safe
Reference to an arbitrary object
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
Represent thread functionality.
virtual bool CompatibleClass(const std::string &clname) const
void FireDoNothingEvent()
virtual int ExecuteThreadCommand(Command cmd)
virtual void ProcessExtraThreadEvent(const EventId &)
Method to process events which are not processed by the Thread class itself. Should be used in derived cla...
Active object, which is working inside dabc::Thread.
bool ActivateTimeout(double tmout_sec)
Method used to produce timeout events in the worker.
virtual void OnThreadAssigned()
Worker(const ConstructorPair &pair)
Special constructor, designed for inherited classes.
bool HasThread() const
Indicates whether the pointer to the thread is non-null; thread-safe.
Wrapper for IB VERBS completion queue
Reference to verbs::Context
virtual int ExecuteThreadCommand(dabc::Command cmd)
virtual void _Fire(const dabc::EventId &evnt, int nq)
virtual bool CompatibleClass(const std::string &clname) const
virtual bool WaitEvent(dabc::EventId &evid, double tmout)
virtual void WorkersSetChanged()
Virtual method, called from the thread context to inform that the number of workers has changed.
virtual void ProcessExtraThreadEvent(const dabc::EventId &)
Method to process events which are not processed by the Thread class itself. Should be used in derived cla...
static const char * StatusStr(int code)
Timeout producer for verbs::Thread when pipe cannot be used.
virtual const char * ClassName() const
Returns class name of the object instance.
TimeoutWorker(Thread *thrd)
virtual double ProcessTimeout(double)
void DoTimeout(double tmout)
Support of InfiniBand verbs.
const int LoopBackQueueSize
Event structure, exchanged between DABC threads.
std::string asstring() const