DABC (Data Acquisition Backbone Core)  2.9.9
threads.cxx
Go to the documentation of this file.
1 // $Id: threads.cxx 4477 2020-04-15 14:19:58Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #include "dabc/threads.h"
17 
18 #include <signal.h>
19 #include <sys/time.h>
20 #include <cerrno>
21 #include <cstring>
22 
23 
#if defined(__MACH__) /* Apple OSX section */

// Dummy replacements for the affinity functions used in this file.
// All of them operate on the single 'flag' member of cpu_set_t,
// where bit number N stands for processor N.

/** Remove all processors from the set */
void CPU_ZERO(cpu_set_t *arg)
{
   arg->flag = 0;
}

/** Add processor \a cpu to the set */
void CPU_SET(int cpu, cpu_set_t *arg)
{
   arg->flag |= (1<<cpu);
}

/** Returns true when processor \a cpu belongs to the set */
bool CPU_ISSET(int cpu, cpu_set_t *arg)
{
   return (arg->flag & (1<<cpu)) != 0;
}

/** Remove processor \a cpu from the set */
void CPU_CLR(int cpu, cpu_set_t *arg)
{
   arg->flag &= ~(1<<cpu);
}

/** Dummy: always reports the fixed mask 0xFFFF and returns 0 */
int sched_getaffinity(int, int, cpu_set_t* set)
{
   set->flag = 0xFFFF;
   return 0;
}

/** Dummy: ignores all arguments and returns 0 */
int sched_setaffinity(int, int, cpu_set_t*)
{
   return 0;
}

#endif
41 
42 
43 
44 dabc::Mutex::Mutex(bool recursive)
45 {
46  if (recursive) {
47  pthread_mutexattr_t attr;
48  pthread_mutexattr_init(&attr);
49  pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
50  pthread_mutex_init(&fMutex, &attr);
51  pthread_mutexattr_destroy(&attr);
52  } else {
53  pthread_mutex_init(&fMutex, 0);
54  }
55 }
56 
58 {
59  return pthread_mutex_trylock(&fMutex) != EBUSY;
60 }
61 
62 
64 {
65  int res = pthread_mutex_trylock(&fMutex);
66  if (res==EBUSY) return true;
67  pthread_mutex_unlock(&fMutex);
68  return false;
69 }
70 
71 //_____________________________________________________________
72 
74 {
75  return fMutex ? pthread_mutex_trylock(fMutex) != EBUSY : false;
76 }
77 
78 
80 {
81  if (fMutex==0) return false;
82  int res = pthread_mutex_trylock(fMutex);
83  if (res==EBUSY) return true;
84  pthread_mutex_unlock(fMutex);
85  return false;
86 }
87 
88 
89 //_____________________________________________________________
90 
92  fInternCondMutex(),
93  fCondMutex(ext_mtx ? ext_mtx : &fInternCondMutex),
94  fFiredCounter(0),
95  fWaiting(false)
96 {
97  pthread_cond_init(&fCond, 0);
98 }
99 
101 {
102  pthread_cond_destroy(&fCond);
103 }
104 
105 bool dabc::Condition::_DoWait(double wait_seconds)
106 {
107  // mutex must be already locked at this point
108 
109  // meaning of argument
110  // wait_seconds > 0 - time to wait
111  // wait_seconds = 0 - do not wait, just check if condition was fired
112  // wait_seconds < 0 - wait forever until condition is fired
113 
114 
115  if (fFiredCounter==0) {
116  if (wait_seconds < 0.) {
117  fWaiting = true;
118  pthread_cond_wait(&fCond, &fCondMutex->fMutex);
119  fWaiting = false;
120  } else
121  if (wait_seconds > 0.) {
122  struct timeval tp;
123  gettimeofday(&tp, 0);
124 
125  long wait_microsec = long(wait_seconds*1e6);
126 
127  tp.tv_sec += (wait_microsec + tp.tv_usec) / 1000000;
128  tp.tv_usec = (wait_microsec + tp.tv_usec) % 1000000;
129 
130  struct timespec tsp = { tp.tv_sec, tp.tv_usec*1000 };
131 
132  fWaiting = true;
133  pthread_cond_timedwait(&fCond, &fCondMutex->fMutex, &tsp);
134  fWaiting = false;
135  }
136  }
137 
138  if (fFiredCounter > 0) {
139  fFiredCounter--;
140  return true;
141  }
142 
143  return false;
144 }
145 
146 //_____________________________________________________________
147 
149 {
150  DOUT5("dabc::Runnable::~Runnable destructor %p", this);
151 }
152 
153 extern "C" void CleanupRunnable(void* abc)
154 {
155  dabc::Runnable *run = (dabc::Runnable*) abc;
156 
157  if (run!=0) run->RunnableCancelled();
158 }
159 
160 extern "C" void* StartTRunnable(void* abc)
161 {
162  dabc::Runnable *run = (dabc::Runnable*) abc;
163 
164  void* res = 0;
165 
166  if (run!=0) {
167 
168  pthread_cleanup_push(CleanupRunnable, abc);
169 
170  res = run->MainLoop();
171 
172  pthread_cleanup_pop(1);
173  }
174 
175  pthread_exit(res);
176 }
177 
178 // ====================================================================
179 
182 
184 {
185  CPU_ZERO(&fSpecialSet);
186  CPU_ZERO(&fDfltSet);
187 
188  if ((aff==0) || (*aff==0)) return true;
189 
190  if ((*aff=='-') && (strlen(aff)>1)) {
191  unsigned numspecial(0);
192  if (!str_to_uint(aff+1, &numspecial) || (numspecial==0)) {
193  EOUT("Wrong default affinity format %s", aff);
194  return false;
195  }
196 
197  int res = sched_getaffinity(0, sizeof(fDfltSet), &fDfltSet);
198 
199  if (res!=0) {
200  EOUT("sched_getaffinity res = %d", res);
201  return false;
202  }
203 
204  unsigned numset(0);
205  for (int cpu=0;cpu<CPU_SETSIZE;cpu++)
206  if (CPU_ISSET(cpu, &fDfltSet)) numset++;
207 
208  if (numset<=numspecial) {
209  EOUT("Cannot reduce affinity on %u processors - only %u assigned for process", numspecial, numset);
210  return false;
211  }
212 
213  unsigned cnt(0);
214  for (int cpu=0;cpu<CPU_SETSIZE;cpu++)
215  if (CPU_ISSET(cpu, &fDfltSet)) {
216  if (++cnt>numset-numspecial) {
217  CPU_CLR(cpu, &fDfltSet);
218  CPU_SET(cpu, &fSpecialSet);
219  }
220  }
221 
222  res = sched_setaffinity(0, sizeof(fDfltSet), &fDfltSet);
223  if (res!=0) { EOUT("sched_setaffinity failed res = %d", res); return false; }
224  return true;
225  }
226 
227 
228  if ((*aff=='o') || (*aff=='x') || (*aff=='s')) {
229  unsigned cpu = 0;
230  const char* curr = aff;
231  bool isany(false);
232 
233  while ((*curr!=0) && (cpu<CPU_SETSIZE)) {
234  switch (*curr) {
235  case 'x': CPU_SET(cpu, &fDfltSet); isany = true; break;
236  case 'o': CPU_CLR(cpu, &fDfltSet); break;
237  case 's': CPU_SET(cpu, &fSpecialSet); break;
238  default: EOUT("Wrong default affinity format %s", aff); return false;
239  }
240  curr++; cpu++;
241  }
242 
243  if (isany) {
244  int res = sched_setaffinity(0, sizeof(fDfltSet), &fDfltSet);
245  if (res!=0) { EOUT("sched_setaffinity failed res = %d", res); return false; }
246  }
247 
248  return true;
249 
250  }
251 
252  unsigned mask(0);
253 
254  if (!str_to_uint(aff, &mask)) {
255  EOUT("Wrong default affinity format %s", aff);
256  return false;
257  }
258 
259  if (mask==0) return true;
260 
261  for (unsigned cpu = 0; (cpu < sizeof(mask)*8) && (cpu<CPU_SETSIZE); cpu++)
262  if ((mask & (1 << cpu)) != 0)
263  CPU_SET(cpu, &fDfltSet);
264 
265  int res = sched_setaffinity(0, sizeof(fDfltSet), &fDfltSet);
266  if (res!=0) { EOUT("sched_setaffinity failed res = %d", res); return false; }
267 
268  return true;
269 }
270 
271 
273  fThrd(),
274  fCpuSet()
275 {
276  CPU_ZERO(&fCpuSet);
277  for (unsigned cpu=0;cpu<CPU_SETSIZE;cpu++)
278  if (CPU_ISSET(cpu, &fDfltSet))
279  CPU_SET(cpu, &fCpuSet);
280 }
281 
283 {
284 }
285 
286 bool dabc::PosixThread::SetAffinity(const char* aff)
287 {
288  CPU_ZERO(&fCpuSet);
289 
290  if ((aff==0) || (*aff==0)) {
291  for (unsigned cpu=0;cpu<CPU_SETSIZE;cpu++)
292  if (CPU_ISSET(cpu, &fDfltSet))
293  CPU_SET(cpu, &fCpuSet);
294  return true;
295  }
296 
297  if ((*aff=='+') && (strlen(aff)>1)) {
298 
299  unsigned specialid(0), numspecial(0);
300  if (!str_to_uint(aff+1, &specialid)) {
301  EOUT("Wrong affinity format %s", aff);
302  return false;
303  }
304 
305  for (unsigned cpu=0;cpu<CPU_SETSIZE;cpu++)
306  if (CPU_ISSET(cpu, &fSpecialSet)) {
307  if (specialid == numspecial++)
308  CPU_SET(cpu, &fCpuSet);
309  }
310 
311  if (specialid >= numspecial) {
312  EOUT("Where are only %u special processors, cannot assigned id %u", numspecial, specialid);
313  return false;
314  }
315 
316  return true;
317  }
318 
319  if ((*aff=='o') || (*aff=='x')) {
320  unsigned cpu = 0;
321  const char* curr = aff;
322  bool isany(false);
323 
324  while ((*curr!=0) && (cpu<CPU_SETSIZE)) {
325  switch (*curr) {
326  case 'x': CPU_SET(cpu, &fCpuSet); isany = true; break;
327  case 'o': CPU_CLR(cpu, &fCpuSet); break;
328  default: EOUT("Wrong affinity format %s", aff); return false;
329  }
330  curr++; cpu++;
331  }
332 
333  if (!isany) { EOUT("Wrong affinity format %s", aff); return false; }
334 
335  return true;
336  }
337 
338  unsigned mask(0);
339 
340  if (!str_to_uint(aff, &mask)) {
341  EOUT("Wrong affinity format %s", aff);
342  return false;
343  }
344 
345  if (mask==0) {
346  EOUT("Zero affinity mask specified %s", aff);
347  return false;
348  }
349 
350  for (unsigned cpu = 0; (cpu < sizeof(mask)*8) && (cpu<CPU_SETSIZE); cpu++)
351  if ((mask & (1 << cpu)) != 0)
352  CPU_SET(cpu, &fCpuSet);
353 
354  return true;
355 }
356 
357 
359 {
360  if (run==0) return;
361 
362  bool isany(false);
363  for (unsigned cpu=0;cpu<CPU_SETSIZE;cpu++)
364  if (CPU_ISSET(cpu, &fCpuSet)) isany = true;
365 
366  if (!isany) {
367  pthread_create(&fThrd, NULL, StartTRunnable, run);
368  } else {
369  pthread_attr_t attr;
370  pthread_attr_init(&attr);
371 
372 #if !defined(__MACH__) && _POSIX_C_SOURCE >= 200112L
373  pthread_attr_setaffinity_np(&attr, sizeof(cpu_set_t), &fCpuSet);
374 #endif
375 
376  pthread_create(&fThrd, &attr, StartTRunnable, run);
377 
378  pthread_attr_destroy(&attr);
379  }
380 }
381 
382 void dabc::PosixThread::Start(StartRoutine* func, void* args)
383 {
384  if (func==0) return;
385 
386  pthread_create(&fThrd, NULL, func, args);
387 }
388 
390 {
391  void* res = 0;
392  pthread_join(fThrd, &res);
393 }
394 
396 {
397  struct sched_param thread_param;
398  int ret = 0;
399  memset(&thread_param, 0, sizeof(thread_param));
400  thread_param.sched_priority = prio;
401  ret = pthread_setschedparam(fThrd, (prio>0) ? SCHED_FIFO : SCHED_OTHER,
402  &thread_param);
403  if (ret!=0)
404  EOUT("pthread_setschedparam ret = %d %d %d %d %d\n", ret, (ret==EPERM), (ret==ESRCH), (ret==EINVAL), (ret==EFAULT));
405 }
406 
408 {
409  pthread_kill(fThrd, sig);
410 }
411 
413 {
414  pthread_cancel(fThrd);
415 }
416 
417 void dabc::PosixThread::SetThreadName(const char *thrdname)
418 {
419 #if !defined(__MACH__)
420  pthread_setname_np(fThrd, thrdname);
421 #endif
422 }
423 
424 bool dabc::PosixThread::GetDfltAffinity(char* buf, unsigned maxbuf)
425 {
426  unsigned last(0);
427 
428  if (maxbuf==0) return false;
429 
430  for (unsigned cpu=0;cpu<CPU_SETSIZE;cpu++) {
431  char symb = 'o';
432  if (CPU_ISSET(cpu, &fDfltSet)) symb = 'x'; else
433  if (CPU_ISSET(cpu, &fSpecialSet)) symb = 's';
434 
435  if (symb!='o') last = cpu;
436  if (cpu<maxbuf) buf[cpu] = symb;
437  }
438 
439  if (last+1 < maxbuf) {
440  unsigned wrap = (last / 8 + 1) * 8;
441  if (wrap < maxbuf) buf[wrap] = 0;
442  else buf[last+1] = 0;
443  return true;
444  }
445 
446  // it was not enough place to keep all active-special threads
447  buf[maxbuf-1] = 0;
448  return false;
449 }
450 
451 bool dabc::PosixThread::GetAffinity(bool actual, char* buf, unsigned maxbuf)
452 {
453 
454  if (maxbuf==0) return false;
455 
456  cpu_set_t mask;
457  cpu_set_t *arg = nullptr;
458  CPU_ZERO(&mask);
459 
460  if (!actual) {
461  arg = &fCpuSet;
462  } else {
463 
464 #if !defined(__MACH__) && _POSIX_C_SOURCE >= 200112L
465  int s;
466  pthread_attr_t attr;
467  s = pthread_getattr_np(pthread_self(), &attr);
468  if (s != 0) { EOUT("pthread_getattr_np failed"); return false; }
469 
470  s = pthread_attr_getaffinity_np(&attr, sizeof(cpu_set_t), &mask);
471  if (s != 0) EOUT("pthread_attr_getaffinity_np failed");
472 
473  s = pthread_attr_destroy(&attr);
474  if (s != 0) EOUT("pthread_attr_destroy failed");
475 
476  arg = &mask;
477 #else
478  buf[0] = 0;
479  return false;
480 #endif
481  }
482 
483  unsigned last(0);
484 
485  for (unsigned cpu=0;cpu<CPU_SETSIZE;cpu++) {
486  char symb = 'o';
487  if (CPU_ISSET(cpu, arg)) symb = 'x';
488  if (symb!='o') last = cpu;
489  if (cpu<maxbuf) buf[cpu] = symb;
490  }
491 
492  if (last+1 < maxbuf) {
493  unsigned wrap = (last / 8 + 1) * 8;
494  if (wrap < maxbuf) buf[wrap] = 0;
495  else buf[last+1] = 0;
496  return true;
497  }
498 
499  // it was not enough place to keep all active-special threads
500  buf[maxbuf-1] = 0;
501  return false;
502 }
503 
virtual ~Condition()
Definition: threads.cxx:100
Condition(Mutex *ext_mtx=nullptr)
Definition: threads.cxx:91
pthread_cond_t fCond
Definition: threads.h:265
bool _DoWait(double wait_seconds)
Definition: threads.cxx:105
bool TryLock()
Definition: threads.cxx:73
bool IsLocked()
Definition: threads.cxx:79
posix pthread mutex
Definition: threads.h:61
bool TryLock()
Definition: threads.cxx:57
bool IsLocked()
Definition: threads.cxx:63
Mutex(bool recursive=false)
Definition: threads.cxx:44
pthread_mutex_t fMutex
Definition: threads.h:67
static bool SetDfltAffinity(const char *aff=nullptr)
Sets default affinity for next threads to be created and for main process.
Definition: threads.cxx:183
cpu_set_t fCpuSet
affinity property of the thread
Definition: threads.h:343
void Start(Runnable *run)
Start thread with provided runnable.
Definition: threads.cxx:358
bool SetAffinity(const char *aff)
Sets affinity mask for the thread.
Definition: threads.cxx:286
void Join()
Join thread - method waits until thread execution is completed.
Definition: threads.cxx:389
bool GetAffinity(bool actual, char *buf, unsigned maxbuf)
Provides thread affinity in form of "xxxooooo".
Definition: threads.cxx:451
void Kill(int sig=9)
Kill thread with specified signal.
Definition: threads.cxx:407
virtual ~PosixThread()
Definition: threads.cxx:282
static cpu_set_t fDfltSet
default affinity for new thread
Definition: threads.h:344
void Cancel()
Try to cancel thread execution.
Definition: threads.cxx:412
static bool GetDfltAffinity(char *buf, unsigned maxbuf)
Returns default affinity mask in form "xxxooosss".
Definition: threads.cxx:424
static cpu_set_t fSpecialSet
set of processors, which can be used for special threads
Definition: threads.h:345
void SetPriority(int prio)
Change thread priority.
Definition: threads.cxx:395
void SetThreadName(const char *thrdname)
Set thread name, which can be seen from htop.
Definition: threads.cxx:417
Object which could be run inside the dabc::PosixThread
Definition: threads.h:320
virtual void * MainLoop()=0
virtual void RunnableCancelled()
Definition: threads.h:326
virtual ~Runnable()
Definition: threads.cxx:148
#define DOUT5(args ...)
Definition: logging.h:188
#define EOUT(args ...)
Definition: logging.h:150
bool str_to_uint(const char *val, unsigned *res)
Convert string to unsigned integer value One could use hexadecimal (in form 0xabc100) or decimal form...
Definition: string.cxx:184
void CleanupRunnable(void *abc)
Definition: threads.cxx:153
void * StartTRunnable(void *abc)
Definition: threads.cxx:160