DABC (Data Acquisition Backbone Core)  2.9.9
ComplQueue.cxx
Go to the documentation of this file.
1 /************************************************************
2  * The Data Acquisition Backbone Core (DABC) *
3  ************************************************************
4  * Copyright (C) 2009 - *
5  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
6  * Planckstr. 1, 64291 Darmstadt, Germany *
7  * Contact: http://dabc.gsi.de *
8  ************************************************************
9  * This software can be used under the GPL license *
10  * agreements as stated in LICENSE.txt file *
11  * which is part of the distribution. *
12  ************************************************************/
13 
14 #include "verbs/ComplQueue.h"
15 
16 #include "dabc/logging.h"
17 #include "dabc/timing.h"
18 
19 #include <poll.h>
20 #include <cmath>
21 #include <cerrno>
22 
24  struct ibv_comp_channel *channel, bool useownchannel) :
25  fContext(ctx),
26  f_cq(0),
27  f_channel(channel),
28  f_ownchannel(false)
29 {
// Constructor: selects (or, on request, creates) a completion channel,
// creates a completion queue of `size` entries bound to that channel and
// arms it so that the next completion raises an event on the channel.
30  if ((f_channel==0) && useownchannel) {
// no external channel supplied: create a private one; f_ownchannel makes
// the destructor destroy it again
31  f_ownchannel = true;
32  f_channel = ibv_create_comp_channel(fContext.context());
33  }
34 
35  if (f_channel==0) {
// no channel given and none created (or creation failed): the object is
// left with f_cq == 0
36  EOUT("Completion channel not specified ???");
37  return;
38  }
39 
// &fCQContext is registered as the CQ context; it is the pointer that
// ibv_get_cq_event() hands back in Wait()
40  f_cq = ibv_create_cq(fContext.context(), size, &fCQContext, f_channel, 0);
41  if (f_cq==0)
42  EOUT("Couldn't allocate completion queue (CQ)");
// NOTE(review): on failure f_cq stays 0 but execution continues, so
// ibv_req_notify_cq(f_cq, 0) below is called with a null CQ - consider
// returning right after the error message.
43 
45  fCQContext.itself = this;
47 
// request a channel event for the next completion on this CQ
48  ibv_req_notify_cq(f_cq, 0);
49 }
50 
51 
53 {
// Destructor: releases the completion queue and, if this object created
// it, the completion channel.
// Events counted but not yet acknowledged are acked first, since
// libibverbs expects all CQ events to be acknowledged before ibv_destroy_cq().
54  AcknoledgeEvents();
55 
// NOTE(review): f_cq is 0 when construction failed (no channel, or
// ibv_create_cq failed); ibv_destroy_cq is still called with it here.
56  if (ibv_destroy_cq(f_cq))
57  EOUT("Fail to destroy CQ");
58 
59  f_cq = 0;
60 
// NOTE(review): when f_ownchannel is set but ibv_create_comp_channel
// failed, f_channel is 0 here and is still passed to the destroy call.
61  if (f_ownchannel)
62  ibv_destroy_comp_channel(f_channel);
63  f_channel = 0;
64 }
65 
67 {
// Non-blocking check for a single completion on the CQ.
// At most one work completion is fetched per call; on success it is kept
// in f_wc for the caller.
68  // return 0 if no events
69  // 1 if something exist
70  // 2 error
71 
72  int ne = ibv_poll_cq(cq(), 1, &f_wc);
73 
74  if (ne==0) return 0;
75 
76  if (ne<0) {
77  EOUT("ibv_poll_cq error");
78  return 2;
79  }
80 
// a completion was retrieved but finished with an error status
// NOTE(review): wr_id is uint64_t in libibverbs' struct ibv_wc, which is
// `unsigned long` on LP64 platforms - "%llu" needs an explicit
// (unsigned long long) cast to be well-defined; verify and fix.
81  if (f_wc.status != IBV_WC_SUCCESS) {
82  EOUT("Completion error=%d %s wr_id=%llu syndrom 0x%x qpnum=%x src_qp=%x",
83  f_wc.status, GetStrError(f_wc.status), f_wc.wr_id,
84  f_wc.vendor_err, f_wc.qp_num, f_wc.src_qp);
85  return 2;
86  }
87 
88  return 1;
89 }
90 
91 int verbs::ComplQueue::Wait(double timeout, double fasttm)
92 {
93  int res = Poll();
94 
95  if ((res!=0) || (timeout<=0.) || (f_channel==0)) return res;
96 
97  dabc::TimeStamp now = dabc::Now();
98  dabc::TimeStamp finish = now + timeout;
99  dabc::TimeStamp fastfinish = now + fasttm;
100 
101  bool is_event(false);
102 
103  while (now < finish) {
104 
105  if (now < fastfinish) {
106  // polling some portion of time in the beginning
107  res = Poll();
108  if (res!=0) return res;
109  } else {
110 
111  int timeout_ms = lrint((finish-now)*1000);
112 
113  if (timeout_ms<0) { EOUT("Negative timeout!!!"); timeout_ms = 0; }
114 
115  // no need to wait while no timeout is remaining
116  // if (timeout_ms==0) return Poll();
117 
118  timeout_ms = 1;
119 
120  struct pollfd ufds;
121 
122  ufds.fd = f_channel->fd;
123  ufds.events = POLLIN;
124  ufds.revents = 0;
125 
126  int status = poll(&ufds, 1, timeout_ms);
127 
128  if (status==0) return Poll();
129 
130  if (status>0) { is_event = true; break; }
131 
132  if ((status==-1) && (errno != EINTR)) {
133  EOUT("Error when waiting IB event");
134  return 2;
135  }
136 
137 /*
138  int timeout_micros = lrint((finish-now)*1000000);
139  if (timeout_micros<0) { EOUT("Negative timeout!!!"); timeout_micros = 0; }
140  timeout_micros = 0;
141 
142  fd_set rfds;
143  struct timeval tv;
144 
145  FD_ZERO(&rfds);
146  FD_SET(f_channel->fd, &rfds);
147 
148  tv.tv_sec = 0;
149  tv.tv_usec = timeout_micros;
150 
151  int retval = select(1, &rfds, NULL, NULL, &tv);
152 
153  if (retval<0) {
154  EOUT("Error when waiting IB event");
155  return 2;
156  }
157 
158  if (retval==0) return Poll();
159 
160  break;
161 */
162  }
163 
164  now = dabc::Now();
165  }
166 
167 // DOUT((3,"After wait revents = %d expects %d",ufds.revents,ufds.events));
168 
169  if (!is_event) return Poll();
170 
171  struct ibv_cq *ev_cq;
172  ComplQueueContext *ev_ctx(0);
173  if (ibv_get_cq_event(f_channel, &ev_cq, (void**)&ev_ctx)) {
174  EOUT("Failed to get cq_event");
175  return 2;
176  }
177 
178  if ((ev_ctx==0) || (ev_ctx->own_cq != ev_cq)) {
179  EOUT("Error with getting context after ibv_get_cq_event");
180  return 2;
181  }
182 
183  // instead of acknowledging every event, do it very rearly
184  // ibv_ack_cq_events(ev_cq, 1);
185 
186  if (ev_ctx->events_get++ > 1000) {
187  ibv_ack_cq_events(ev_cq, ev_ctx->events_get);
188  ev_ctx->events_get = 0;
189  }
190 
191  if (ibv_req_notify_cq(ev_cq, 0)) {
192  EOUT("Couldn't request CQ notification");
193  return 2;
194  }
195 
196  return Poll();
197 }
198 
200 {
// Acknowledge all CQ events that Wait() counted in fCQContext.events_get
// but has not acknowledged yet (Wait() only acks in batches of ~1000).
// Called from the destructor before the CQ is destroyed.
201  if (fCQContext.events_get > 0) {
202  ibv_ack_cq_events(f_cq, fCQContext.events_get);
203  fCQContext.events_get = 0;
204  }
205 }
206 
207 const char* verbs::ComplQueue::GetStrError(int err)
208 {
209  struct discr_t {
210  int code;
211  const char* name;
212  };
213 
214  static discr_t errors[] = {
215  { code:IBV_WC_LOC_LEN_ERR, name:"IBV_WC_LOC_LEN_ERR" },
216  { code:IBV_WC_LOC_QP_OP_ERR, name:"IBV_WC_LOC_QP_OP_ERR" },
217  { code:IBV_WC_LOC_EEC_OP_ERR, name:"IBV_WC_LOC_EEC_OP_ERR" },
218  { code:IBV_WC_LOC_PROT_ERR, name:"IBV_WC_LOC_PROT_ERR" },
219  { code:IBV_WC_WR_FLUSH_ERR, name:"IBV_WC_WR_FLUSH_ERR" },
220  { code:IBV_WC_MW_BIND_ERR, name:"IBV_WC_MW_BIND_ERR" },
221  { code:IBV_WC_BAD_RESP_ERR, name:"IBV_WC_BAD_RESP_ERR" },
222  { code:IBV_WC_LOC_ACCESS_ERR, name:"IBV_WC_LOC_ACCESS_ERR" },
223  { code:IBV_WC_REM_INV_REQ_ERR, name:"IBV_WC_REM_INV_REQ_ERR" },
224  { code:IBV_WC_REM_ACCESS_ERR, name:"IBV_WC_REM_ACCESS_ERR" },
225  { code:IBV_WC_REM_OP_ERR, name:"IBV_WC_REM_OP_ERR" },
226  { code:IBV_WC_RETRY_EXC_ERR, name:"IBV_WC_RETRY_EXC_ERR" },
227  { code:IBV_WC_RNR_RETRY_EXC_ERR, name:"IBV_WC_RNR_RETRY_EXC_ERR" },
228  { code:IBV_WC_LOC_RDD_VIOL_ERR, name:"IBV_WC_LOC_RDD_VIOL_ERR" },
229  { code:IBV_WC_REM_INV_RD_REQ_ERR, name:"IBV_WC_REM_INV_RD_REQ_ERR" },
230  { code:IBV_WC_REM_ABORT_ERR, name:"IBV_WC_REM_ABORT_ERR" },
231  { code:IBV_WC_INV_EECN_ERR, name:"IBV_WC_INV_EECN_ERR" },
232  { code:IBV_WC_INV_EEC_STATE_ERR, name:"IBV_WC_INV_EEC_STATE_ERR" },
233  { code:IBV_WC_FATAL_ERR, name:"IBV_WC_FATAL_ERR" },
234  { code:IBV_WC_RESP_TIMEOUT_ERR, name:"IBV_WC_RESP_TIMEOUT_ERR" },
235  { code:IBV_WC_GENERAL_ERR, name:"IBV_WC_GENERAL_ERR" },
236  { code:0, name:0 }
237  };
238 
239  for (const discr_t* d = errors; d->name!=0; d++) {
240  if (err==d->code) return d->name;
241  }
242 
243  return "noerror";
244 }
245 
ComplQueueContext fCQContext
Definition: ComplQueue.h:44
struct ibv_cq * f_cq
Definition: ComplQueue.h:40
ComplQueue(ContextRef ctx, int size, struct ibv_comp_channel *channel, bool use_own_channel=false)
Definition: ComplQueue.cxx:23
int Wait(double timeout, double fasttm=0.)
Definition: ComplQueue.cxx:91
struct ibv_comp_channel * f_channel
Definition: ComplQueue.h:41
static const char * GetStrError(int err)
Definition: ComplQueue.cxx:207
virtual ~ComplQueue()
Definition: ComplQueue.cxx:52
ContextRef fContext
Definition: ComplQueue.h:38
Reference to verbs::Context
Definition: Context.h:74
struct ibv_context * context() const
Definition: Context.h:87
#define EOUT(args ...)
Definition: logging.h:150
TimeStamp Now()
Definition: timing.h:260
Class for acquiring and holding timestamps.
Definition: timing.h:40
Context object for completion queue operations.
Definition: ComplQueue.h:25
struct ibv_cq * own_cq
Definition: ComplQueue.h:28