28 void CPU_ZERO(cpu_set_t *arg) { arg->flag = 0; }
30 void CPU_SET(
int cpu, cpu_set_t *arg) { arg->flag |= (1<<cpu); }
32 bool CPU_ISSET(
int cpu, cpu_set_t *arg) {
return arg->flag & (1<<cpu); }
34 void CPU_CLR(
int cpu, cpu_set_t *arg) { arg->flag = arg->flag & ~(1<<cpu); }
36 int sched_getaffinity(
int,
int, cpu_set_t* set) { set->flag = 0xFFFF;
return 0; }
38 int sched_setaffinity(
int,
int, cpu_set_t*) {
return 0; }
47 pthread_mutexattr_t attr;
48 pthread_mutexattr_init(&attr);
49 pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
50 pthread_mutex_init(&
fMutex, &attr);
51 pthread_mutexattr_destroy(&attr);
53 pthread_mutex_init(&
fMutex, 0);
59 return pthread_mutex_trylock(&fMutex) != EBUSY;
65 int res = pthread_mutex_trylock(&fMutex);
66 if (res==EBUSY)
return true;
67 pthread_mutex_unlock(&fMutex);
75 return fMutex ? pthread_mutex_trylock(fMutex) != EBUSY :
false;
81 if (fMutex==0)
return false;
82 int res = pthread_mutex_trylock(fMutex);
83 if (res==EBUSY)
return true;
84 pthread_mutex_unlock(fMutex);
93 fCondMutex(ext_mtx ? ext_mtx : &fInternCondMutex),
97 pthread_cond_init(&
fCond, 0);
102 pthread_cond_destroy(&fCond);
115 if (fFiredCounter==0) {
116 if (wait_seconds < 0.) {
118 pthread_cond_wait(&fCond, &fCondMutex->fMutex);
121 if (wait_seconds > 0.) {
123 gettimeofday(&tp, 0);
125 long wait_microsec = long(wait_seconds*1e6);
127 tp.tv_sec += (wait_microsec + tp.tv_usec) / 1000000;
128 tp.tv_usec = (wait_microsec + tp.tv_usec) % 1000000;
130 struct timespec tsp = { tp.tv_sec, tp.tv_usec*1000 };
133 pthread_cond_timedwait(&fCond, &fCondMutex->fMutex, &tsp);
138 if (fFiredCounter > 0) {
150 DOUT5(
"dabc::Runnable::~Runnable destructor %p",
this);
172 pthread_cleanup_pop(1);
185 CPU_ZERO(&fSpecialSet);
188 if ((aff==0) || (*aff==0))
return true;
190 if ((*aff==
'-') && (strlen(aff)>1)) {
191 unsigned numspecial(0);
192 if (!
str_to_uint(aff+1, &numspecial) || (numspecial==0)) {
193 EOUT(
"Wrong default affinity format %s", aff);
197 int res = sched_getaffinity(0,
sizeof(fDfltSet), &fDfltSet);
200 EOUT(
"sched_getaffinity res = %d", res);
205 for (
int cpu=0;cpu<CPU_SETSIZE;cpu++)
206 if (CPU_ISSET(cpu, &fDfltSet)) numset++;
208 if (numset<=numspecial) {
209 EOUT(
"Cannot reduce affinity on %u processors - only %u assigned for process", numspecial, numset);
214 for (
int cpu=0;cpu<CPU_SETSIZE;cpu++)
215 if (CPU_ISSET(cpu, &fDfltSet)) {
216 if (++cnt>numset-numspecial) {
217 CPU_CLR(cpu, &fDfltSet);
218 CPU_SET(cpu, &fSpecialSet);
222 res = sched_setaffinity(0,
sizeof(fDfltSet), &fDfltSet);
223 if (res!=0) {
EOUT(
"sched_setaffinity failed res = %d", res);
return false; }
228 if ((*aff==
'o') || (*aff==
'x') || (*aff==
's')) {
230 const char* curr = aff;
233 while ((*curr!=0) && (cpu<CPU_SETSIZE)) {
235 case 'x': CPU_SET(cpu, &fDfltSet); isany =
true;
break;
236 case 'o': CPU_CLR(cpu, &fDfltSet);
break;
237 case 's': CPU_SET(cpu, &fSpecialSet);
break;
238 default:
EOUT(
"Wrong default affinity format %s", aff);
return false;
244 int res = sched_setaffinity(0,
sizeof(fDfltSet), &fDfltSet);
245 if (res!=0) {
EOUT(
"sched_setaffinity failed res = %d", res);
return false; }
255 EOUT(
"Wrong default affinity format %s", aff);
259 if (mask==0)
return true;
261 for (
unsigned cpu = 0; (cpu <
sizeof(mask)*8) && (cpu<CPU_SETSIZE); cpu++)
262 if ((mask & (1 << cpu)) != 0)
263 CPU_SET(cpu, &fDfltSet);
265 int res = sched_setaffinity(0,
sizeof(fDfltSet), &fDfltSet);
266 if (res!=0) {
EOUT(
"sched_setaffinity failed res = %d", res);
return false; }
277 for (
unsigned cpu=0;cpu<CPU_SETSIZE;cpu++)
290 if ((aff==0) || (*aff==0)) {
291 for (
unsigned cpu=0;cpu<CPU_SETSIZE;cpu++)
292 if (CPU_ISSET(cpu, &fDfltSet))
293 CPU_SET(cpu, &fCpuSet);
297 if ((*aff==
'+') && (strlen(aff)>1)) {
299 unsigned specialid(0), numspecial(0);
301 EOUT(
"Wrong affinity format %s", aff);
305 for (
unsigned cpu=0;cpu<CPU_SETSIZE;cpu++)
306 if (CPU_ISSET(cpu, &fSpecialSet)) {
307 if (specialid == numspecial++)
308 CPU_SET(cpu, &fCpuSet);
311 if (specialid >= numspecial) {
312 EOUT(
"Where are only %u special processors, cannot assigned id %u", numspecial, specialid);
319 if ((*aff==
'o') || (*aff==
'x')) {
321 const char* curr = aff;
324 while ((*curr!=0) && (cpu<CPU_SETSIZE)) {
326 case 'x': CPU_SET(cpu, &fCpuSet); isany =
true;
break;
327 case 'o': CPU_CLR(cpu, &fCpuSet);
break;
328 default:
EOUT(
"Wrong affinity format %s", aff);
return false;
333 if (!isany) {
EOUT(
"Wrong affinity format %s", aff);
return false; }
341 EOUT(
"Wrong affinity format %s", aff);
346 EOUT(
"Zero affinity mask specified %s", aff);
350 for (
unsigned cpu = 0; (cpu <
sizeof(mask)*8) && (cpu<CPU_SETSIZE); cpu++)
351 if ((mask & (1 << cpu)) != 0)
352 CPU_SET(cpu, &fCpuSet);
363 for (
unsigned cpu=0;cpu<CPU_SETSIZE;cpu++)
364 if (CPU_ISSET(cpu, &fCpuSet)) isany =
true;
370 pthread_attr_init(&attr);
372 #if !defined(__MACH__) && _POSIX_C_SOURCE >= 200112L
373 pthread_attr_setaffinity_np(&attr,
sizeof(cpu_set_t), &fCpuSet);
378 pthread_attr_destroy(&attr);
386 pthread_create(&fThrd, NULL, func, args);
392 pthread_join(fThrd, &res);
397 struct sched_param thread_param;
399 memset(&thread_param, 0,
sizeof(thread_param));
400 thread_param.sched_priority = prio;
401 ret = pthread_setschedparam(fThrd, (prio>0) ? SCHED_FIFO : SCHED_OTHER,
404 EOUT(
"pthread_setschedparam ret = %d %d %d %d %d\n", ret, (ret==EPERM), (ret==ESRCH), (ret==EINVAL), (ret==EFAULT));
409 pthread_kill(fThrd, sig);
414 pthread_cancel(fThrd);
419 #if !defined(__MACH__)
420 pthread_setname_np(fThrd, thrdname);
428 if (maxbuf==0)
return false;
430 for (
unsigned cpu=0;cpu<CPU_SETSIZE;cpu++) {
432 if (CPU_ISSET(cpu, &fDfltSet)) symb =
'x';
else
433 if (CPU_ISSET(cpu, &fSpecialSet)) symb =
's';
435 if (symb!=
'o') last = cpu;
436 if (cpu<maxbuf) buf[cpu] = symb;
439 if (last+1 < maxbuf) {
440 unsigned wrap = (last / 8 + 1) * 8;
441 if (wrap < maxbuf) buf[wrap] = 0;
442 else buf[last+1] = 0;
454 if (maxbuf==0)
return false;
457 cpu_set_t *arg =
nullptr;
464 #if !defined(__MACH__) && _POSIX_C_SOURCE >= 200112L
467 s = pthread_getattr_np(pthread_self(), &attr);
468 if (s != 0) {
EOUT(
"pthread_getattr_np failed");
return false; }
470 s = pthread_attr_getaffinity_np(&attr,
sizeof(cpu_set_t), &mask);
471 if (s != 0)
EOUT(
"pthread_attr_getaffinity_np failed");
473 s = pthread_attr_destroy(&attr);
474 if (s != 0)
EOUT(
"pthread_attr_destroy failed");
485 for (
unsigned cpu=0;cpu<CPU_SETSIZE;cpu++) {
487 if (CPU_ISSET(cpu, arg)) symb =
'x';
488 if (symb!=
'o') last = cpu;
489 if (cpu<maxbuf) buf[cpu] = symb;
492 if (last+1 < maxbuf) {
493 unsigned wrap = (last / 8 + 1) * 8;
494 if (wrap < maxbuf) buf[wrap] = 0;
495 else buf[last+1] = 0;
Condition(Mutex *ext_mtx=nullptr)
bool _DoWait(double wait_seconds)
Mutex(bool recursive=false)
static bool SetDfltAffinity(const char *aff=nullptr)
Sets default affinity for next threads to be created and for main process.
cpu_set_t fCpuSet
affinity property of the thread
void Start(Runnable *run)
Start thread with provided runnable.
bool SetAffinity(const char *aff)
Sets affinity mask for the thread.
void Join()
Join thread - method waits until thread execution is completed.
bool GetAffinity(bool actual, char *buf, unsigned maxbuf)
Provides thread affinity in form of "xxxooooo".
void Kill(int sig=9)
Kill thread with specified signal.
static cpu_set_t fDfltSet
default affinity for new thread
void Cancel()
Try to cancel thread execution.
static bool GetDfltAffinity(char *buf, unsigned maxbuf)
Returns default affinity mask in form "xxxooosss".
static cpu_set_t fSpecialSet
set of processors, which can be used for special threads
void SetPriority(int prio)
Change thread priority.
void SetThreadName(const char *thrdname)
Set thread name, which can be seen from htop.
Object which could be run inside the dabc::PosixThread
virtual void * MainLoop()=0
virtual void RunnableCancelled()
bool str_to_uint(const char *val, unsigned *res)
Convert string to unsigned integer value One could use hexadecimal (in form 0xabc100) or decimal form...
void CleanupRunnable(void *abc)
void * StartTRunnable(void *abc)