22 #include <sys/types.h>
23 #include <sys/syscall.h>
24 #include <sys/resource.h>
31 #ifdef DABC_EXTRA_CHECKS
32 unsigned dabc::Thread::maxlimit = 1000;
85 DOUT3(
"Destroy EXEC worker %p",
this);
// Virtual accessor for the class name: always returns the string literal "Thread".
// NOTE(review): the enclosing class is not visible in this listing - confirm which class this override belongs to.
97 virtual const char*
ClassName()
const {
return "Thread"; }
104 res =
fThread()->ExecuteThreadCommand(cmd);
111 virtual bool Find(ConfigIO &cfg)
160 std::vector<std::string> names;
162 for (
unsigned n=1;n<
fThread()->fWorkers.size();n++) {
164 if (work==0)
continue;
166 names.push_back(work->GetName());
176 double real_tm =
fThread()->fLastProfileTime.SpentTillNow(
true);
182 if (getrusage(RUSAGE_THREAD, &
usage) == 0) {
185 usage.ru_utime.tv_sec * 1. +
186 usage.ru_utime.tv_usec * 1e-6 +
187 usage.ru_stime.tv_sec * 1. +
188 usage.ru_stime.tv_usec * 1e-6;
190 run_tm = curr -
fThread()->fThreadRunTime;
191 fThread()->fThreadRunTime = curr;
195 if ((real_tm>0) && (run_tm>0)) {
196 double load = run_tm / real_tm;
197 if (load > 1) load = 1.;
212 Object(parent, name),
218 fWorkCond(fObjectMutex),
222 fProcessingTimeouts(0),
226 fDidDecRefCnt(false),
227 fCheckThrdCleanup(false),
236 SetFlag(flChildsHidden,
true);
238 DOUT3(
"---------- CNT:%2d Thread %s %p created", fThreadInstances, GetName(),
this);
240 fWorkers.push_back(
new WorkerRec(0,0));
242 fExec =
new ExecWorker(
this, cmd);
249 fQueues =
new EventsQueue[fNumQueues];
250 for (
int n=0;n<fNumQueues;n++) {
251 fQueues[n].Init(256);
252 fQueues[n].scaler = 8;
256 std::string affinity = fExec->Cfg(
xmlAffinity, cmd).AsStr();
258 if (!affinity.empty()) {
259 SetAffinity(affinity.c_str());
261 if (GetAffinity(
false, sbuf,
sizeof(sbuf)))
262 DOUT0(
"Thread %s specified affinity %s mask %s", GetName(), affinity.c_str(), sbuf);
266 if ((fThrdStopTimeout <= 0) && !
dabc::mgr.
null()) fThrdStopTimeout =
dabc::mgr()->cfg()->GetThrdStopTime();
267 if (fThrdStopTimeout <= 0) fThrdStopTimeout = 5.;
269 fWorkers.push_back(
new WorkerRec(fExec,0));
273 DOUT2(
"-------- THRD %s Constructed -------------- ", GetName());
300 if (exec->DirtyWorkaround())
301 EOUT(
"Execution instance for the thread %s is blocked - KEEP EXEC ALIVE AND FIX LATER",
GetName());
305 for (
unsigned n=0;n<
fWorkers.size();n++) {
315 EOUT(
"Kill thread in error state, nothing better can be done");
320 #ifdef DABC_EXTRA_CHECKS
321 unsigned totalsize = 0;
326 DOUT0(
"THRD %s Queue:%u Item:%u Event:%s",
GetName(), n, k, evnt.asstring().c_str());
330 EOUT(
"THRD %s %u events are not processed",
GetName(), totalsize);
344 work->fWorkerFiredEvents++;
349 unsigned totalsize = 0;
350 for (
int n=0;n<fNumQueues;n++)
351 totalsize+=fQueues[n].Size();
357 LockGuard lock(ThreadMutex());
358 return _TotalNumberOfEvents();
363 LockGuard lock(ThreadMutex());
365 if (!fCheckThrdCleanup)
return;
367 fCheckThrdCleanup =
false;
369 DOUT3(
"THREAD %s check cleanup", GetName());
373 unsigned new_size(fWorkers.size());
375 while ((new_size>1) && (fWorkers[new_size-1]->work==0)) {
377 delete fWorkers[new_size];
378 fWorkers[new_size] = 0;
381 DOUT3(
"THREAD %s oldsize %u newsize %u", GetName(), fWorkers.size(), new_size);
383 if (new_size==fWorkers.size())
return;
385 fWorkers.resize(new_size);
386 DOUT3(
"Thrd:%s Shrink processors size to %u normal state %s refcnt %d", GetName(), new_size,
DBOOL(_IsNormalState()), fObjectRefCnt);
390 if ((new_size==2) && _IsNormalState()) {
391 DOUT3(
"THREAD %s generate cleanup", GetName());
392 _Fire(EventId(evntCleanupThrd, 0, 0), priorityLowest);
407 for(
int nq=0; nq<fNumQueues; nq++)
408 if (fQueues[nq].Size()>0) {
409 if (--(fQueues[nq].scaler)>0) {
410 evnt = fQueues[nq].Pop();
413 fQueues[nq].scaler = 8;
418 for(
int nq=0; nq<fNumQueues; nq++)
419 if (fQueues[nq].Size()>0) {
420 evnt = fQueues[nq].Pop();
430 if (clname.empty())
return true;
440 DOUT3(
"*** Thrd:%s Starting MainLoop PROCID = %u THRDID %u", GetName(), (
unsigned) getpid(), (
unsigned) syscall(SYS_gettid));
442 while (fThrdWorking) {
444 DOUT5(
"*** Thrd:%s Checking timeouts", GetName());
446 tmout = CheckTimeouts();
448 DOUT5(
"*** Thrd:%s Check timeouts %5.3f", GetName(), tmout);
450 if (WaitEvent(evid, tmout)) {
452 DOUT5(
"*** Thrd:%s GetEvent %s", GetName(), evid.asstring().c_str());
456 DOUT5(
"*** Thrd:%s DidEvent %s", GetName(), evid.asstring().c_str());
460 if (fExplicitLoop!=0) RunExplicitLoop();
463 DOUT3(
"*** Thrd:%s Leaving MainLoop", GetName());
470 DOUT5(
"*** Thrd:%s SingleLoop user_tmout %5.3f", GetName(), tmout_user);
474 if ((workerid>0) && (workerid<fWorkers.size())) {
475 if (fWorkers[workerid]->doinghalt)
return false;
478 double tmout = CheckTimeouts();
480 if ((tmout<0) || (tmout_user<tmout)) tmout = tmout_user;
484 if (WaitEvent(evid, tmout))
495 EOUT(
"Cannot run thread %s event loop outer own thread", GetName());
500 EOUT(
"negative (endless) timeout specified - set default 0 sec (single event)");
506 while (fThrdWorking) {
510 SingleLoop(0, remain>0 ? remain : 0.);
512 if (remain <= 0)
break;
519 if (cmd.null() || (dest==0)) {
531 if (caller==0) caller = fExec;
537 EOUT(
"Cannot execute command in wrong thread context!!!");
549 bool exe_ready =
false;
551 cmd.AddCaller(caller, &exe_ready);
555 DOUT3(
"********** Calling ExecteIn in thread %s %p", GetName(),
this);
562 if (dest->Submit(cmd)) {
570 double tmout = cmd.TimeTillTimeout(0.5);
577 DOUT3(
"ExecuteIn - cmd:%s singleLoop proc %u time %4.1f", cmd.GetName(), caller->fWorkerId, ((tmout<=0) ? 0.1 : tmout));
579 if (!SingleLoop(caller->fWorkerId, (tmout<=0) ? 0.1 : tmout)) {
584 }
while (!exe_ready);
585 DOUT3(
"------------ Proc %p Cmd %s ready = %s", caller, cmd.GetName(),
DBOOL(exe_ready));
587 if (exe_ready) res = cmd.GetResult();
595 DOUT0(
"Worker %s refuse to submit command - we do it as well", dest->GetName());
601 cmd.RemoveCaller(caller, &exe_ready);
606 cmd.RemoveCaller(caller, &exe_ready);
611 DOUT3(
"------------ Proc %p Cmd %s res = %d", caller, cmd.GetName(), res);
622 bool needkill =
false;
624 LockGuard guard(ThreadMutex());
628 fRealThrd = real_thread;
633 EOUT(
"Restart from error state, may be dangerous");
634 needkill = fRealThrd;
635 fRealThrd = real_thread;
638 EOUT(
"Status is changing from other thread. Not supported");
641 EOUT(
"Forgot something???");
648 DOUT3(
"Thread %s starting kill:%s", GetName(),
DBOOL(needkill));
660 std::string tname = GetName();
661 if (tname.length()>15) tname.resize(15);
666 EOUT(
"Start thread without EXEC???");
669 res = fExec->Execute(
"ConfirmStart", timeout_sec) ==
cmd_true;
674 LockGuard guard(ThreadMutex());
675 fState = res ? stRunning : stError;
682 DOUT3(
"Thread cancelled %s", GetName());
684 LockGuard guard(ThreadMutex());
692 bool needstop =
false;
695 LockGuard guard(ThreadMutex());
700 needstop = fRealThrd && !IsItself();
705 EOUT(
"Stop from error state, do nothing");
708 EOUT(
"State is changing from other thread. Not supported");
711 EOUT(
"Forgot something???");
718 _Fire(EventId(evntStopThrd), priorityHighest);
723 DOUT3(
"Start doing thread %s stop with timeout %f s", GetName(),timeout_sec);
727 fThrdWorking =
false;
729 LockGuard guard(ThreadMutex());
733 if (timeout_sec<=0.) timeout_sec = 1.;
740 double spent_time(0.);
741 bool did_cancel(
false);
746 if ((spent_time > timeout_sec * 0.7) && !did_cancel) {
747 DOUT1(
"Cancel thread %s", GetName());
753 LockGuard guard(ThreadMutex());
754 if (fState == stStopped) { res =
true;
break; }
756 }
while ( (spent_time = tm1.SpentTillNow()) < timeout_sec);
758 if (!res)
EOUT(
"Cannot wait for join while stop was not succeeded");
762 LockGuard guard(ThreadMutex());
764 if (fState!=stStopped) {
766 fThrdWorking =
false;
776 return fExec->Execute(
"ConfirmSync", timeout_sec);
782 EOUT(
"Call from other thread - absolutely wrong");
786 if (fExplicitLoop!=0)
787 EOUT(
"Explicit loop is already set");
789 fExplicitLoop = proc->fWorkerId;
791 return fExplicitLoop == proc->fWorkerId;
796 if ((fExplicitLoop==0) || (fExplicitLoop>=fWorkers.size()))
return;
798 DOUT4(
"Enter RunExplicitMainLoop");
801 if (fWorkers[fExplicitLoop]->doinghalt)
return;
803 RecursionGuard iguard(
this, fExplicitLoop);
807 fWorkers[fExplicitLoop]->work->DoWorkerMainLoop();
810 if (e.
IsStop())
DOUT2(
"Worker %u stopped via exception", fExplicitLoop);
else
811 if (e.
IsTimeout())
DOUT2(
"Worker %u stopped via timeout", fExplicitLoop);
else
812 EOUT(
"Exception %s in processor %u", e.
what(), fExplicitLoop);
814 EOUT(
"Exception UNCKNOWN in processor %u", fExplicitLoop);
818 fWorkers[fExplicitLoop]->work->DoWorkerAfterMainLoop();
820 DOUT5(
"Exit from RunExplicitMainLoop");
830 Fire(EventId(evntDoNothing), priorityLowest);
835 DOUT2(
"Thread %s Execute command %s", GetName(), cmd.GetName());
837 if (cmd.IsName(
"ConfirmStart")) {
839 DOUT3(
"THRD:%s item %s", GetName(), ItemName().c_str());
842 fExec->ActivateTimeout(0.01);
846 if (cmd.IsName(
"ConfirmSync")) {
849 if (cmd.IsName(
"AddWorker")) {
851 Reference ref = cmd.GetRef(
"Worker");
852 Worker* worker = (Worker*) ref();
854 DOUT2(
"AddWorker %p in thrd %p", worker,
this);
858 if ((worker->fThread() !=
this) ||
859 (worker->fThreadMutex != ThreadMutex())) {
860 EOUT(
"Something went wrong - CRASH");
866 LockGuard guard(ThreadMutex());
870 fWorkers.push_back(
new WorkerRec(worker, worker->fAddon()));
873 worker->fWorkerId = fWorkers.size()-1;
875 worker->fWorkerActive =
true;
877 DOUT3(
"----------------THRD %s WORKER %p %s assigned ------------------ ", GetName(), worker, worker->GetName());
883 worker->InformThreadAssigned();
891 if (cmd.IsName(
"InvokeWorkerDestroy")) {
893 DOUT3(
"THRD:%s Request to destroy worker id %u", GetName(), cmd.GetUInt(
"WorkerId"));
895 return CheckWorkerCanBeHalted(cmd.GetUInt(
"WorkerId"), actDestroy, cmd);
898 if (cmd.IsName(
"HaltWorker")) {
900 DOUT3(
"THRD:%s Request to halt worker id %u", GetName(), cmd.GetUInt(
"WorkerId"));
902 return CheckWorkerCanBeHalted(cmd.GetUInt(
"WorkerId"), actHalt, cmd);
911 #ifdef DABC_EXTRA_CHECKS
913 EOUT(
"ALARM, recursion changed not from thread itself");
917 if ((
id>=fWorkers.size()) || (fWorkers[
id]->work==0))
return;
920 fWorkers[id]->recursion++;
922 fWorkers[id]->recursion--;
924 if ((fWorkers[
id]->recursion==0) && fWorkers[
id]->doinghalt)
925 CheckWorkerCanBeHalted(
id);
931 DOUT4(
"THRD:%s CheckWorkerCanBeHalted %u", GetName(),
id);
933 if ((
id>=fWorkers.size()) || (fWorkers[
id]->work==0)) {
934 DOUT3(
"THRD:%s Worker %u no longer exists", GetName(),
id);
939 fWorkers[id]->doinghalt |= request;
941 unsigned balance = 0;
944 LockGuard guard(ThreadMutex());
947 if (fWorkers[
id]->doinghalt)
948 fWorkers[id]->work->fWorkerActive =
false;
950 balance = fWorkers[id]->work->fWorkerFiredEvents - fWorkers[id]->processed;
953 DOUT4(
"THRD:%s CheckWorkerCanBeHalted %u doinghalt = %u", GetName(),
id, fWorkers[
id]->doinghalt);
955 if (fWorkers[
id]->doinghalt==0)
return cmd_false;
957 if ((fWorkers[
id]->recursion > 0) || (balance > 0)) {
958 DOUT2(
"THRD:%s ++++++++++++++++++++++ worker %p %s %s event balance %u fired:%u processed:%u recursion %d",
959 GetName(), fWorkers[
id]->work, fWorkers[
id]->work->GetName(), fWorkers[
id]->work->ClassName(),
960 balance, fWorkers[
id]->work->fWorkerFiredEvents, fWorkers[
id]->processed,
961 fWorkers[
id]->recursion);
962 if (!cmd.null()) fWorkers[id]->cmds.Push(cmd);
969 LockGuard guard(ThreadMutex());
976 DOUT5(
"Thrd:%s Remove record %u\n", GetName(),
id);
978 fWorkers[id] =
new WorkerRec(0, 0);
981 if (rec->work) rec->work->fWorkerId = 0;
984 DOUT4(
"THRD:%s CheckWorkerCanBeHalted %u rec = %p worker = %p", GetName(),
id, rec, rec ? rec->work : 0);
994 if (rec->work && rec->work->IsLogging())
995 DOUT0(
"Trying to destroy worker %p id %u via thread %s", rec->work,
id, GetName());
998 if (rec->work) rec->work->ClearThreadRef();
1001 if (rec->doinghalt & actDestroy) {
1002 if (rec->work && rec->work->DestroyCalledFromOwnThread())
1012 LockGuard guard(ThreadMutex());
1014 DOUT2(
"THRD:%s mark thread for cleanup check", GetName());
1017 fCheckThrdCleanup =
true;
1025 DOUT3(
"Thrd: %p %s Fire event code:%u item:%u arg:%u nq:%d NumQueues:%u",
this, GetName(), arg.GetCode(), arg.GetItem(), arg.GetArg(), nq, fNumQueues);
1027 _PushEvent(arg, nq);
1028 fWorkCond._DoFire();
1030 #ifdef DABC_EXTRA_CHECKS
1033 for (
int n=0;n<fNumQueues;n++) sum+=fQueues[n].Size();
1034 if (sum!=fWorkCond._FiredCounter()) {
1036 DOUT5(
"Thrd %s Error sum1 %ld cond %ld event %s",
1037 GetName(), sum, fWorkCond._FiredCounter(),
1038 arg.asstring().c_str());
1047 LockGuard lock(ThreadMutex());
1049 if (fWorkCond._DoWait(tmout))
1050 return _GetNextEvent(evid);
1058 uint16_t itemid = evnt.GetItem();
1060 DOUT5(
"*** Thrd:%s Event:%s q0:%u q1:%u q2:%u",
1061 GetName(), evnt.asstring().c_str(),
1062 fQueues[0].Size(), fQueues[1].Size(), fQueues[2].Size());
1065 if (itemid>=fWorkers.size()) {
1066 EOUT(
"Thrd:%p %s FALSE worker id:%u size:%u evnt:%s - ignore",
this, GetName(), itemid, fWorkers.size(), evnt.asstring().c_str());
1070 Worker* worker = fWorkers[itemid]->work;
1072 DOUT3(
"*** Thrd:%p proc:%p itemid:%u event:%u doinghalt:%u",
this, worker, itemid, evnt.GetCode(), fWorkers[itemid]->doinghalt);
1074 if (worker==0)
return;
1076 fWorkers[itemid]->processed++;
1079 DOUT3(
"Process manager event %s fired:%u processed: %u", evnt.asstring().c_str(), worker->fWorkerFiredEvents, fWorkers[itemid]->processed);
1083 DOUT3(
"*** Thrd:%p proc:%s event:%u",
this, worker->GetName(), evnt.GetCode());
1085 IntGuard iguard(fWorkers[itemid]->recursion);
1089 worker->ProcessCoreEvent(evnt);
1091 if (worker->fAddon.null())
1092 EOUT(
"Get event for non-existing addon");
1094 worker->fAddon()->ProcessEvent(evnt);
1097 worker->ProcessEvent(evnt);
1101 if (fWorkers[itemid]->doinghalt)
1102 CheckWorkerCanBeHalted(itemid);
1108 if (fWorkers[itemid]->doinghalt)
1109 CheckWorkerCanBeHalted(itemid);
1113 switch (evnt.GetCode()) {
1114 case evntCheckTmoutWorker: {
1116 if (evnt.GetArg() >= fWorkers.size()) {
1117 DOUT3(
"evntCheckTmoutWorker - mismatch in processor id:%u sz:%u ", evnt.GetArg(), fWorkers.size());
1121 WorkerRec* rec = fWorkers[evnt.GetArg()];
1123 DOUT3(
"Worker no longer exists", evnt.GetArg());
1127 if (rec->tmout_worker.CheckEvent(ThreadMutex())) CheckTimeouts(
true);
1132 case evntCheckTmoutAddon: {
1134 if (evnt.GetArg() >= fWorkers.size()) {
1135 DOUT3(
"evntCheckTmoutWorker - mismatch in processor id:%u sz:%u ", evnt.GetArg(), fWorkers.size());
1139 WorkerRec* rec = fWorkers[evnt.GetArg()];
1141 DOUT3(
"Worker no longer exists", evnt.GetArg());
1145 if (rec->tmout_addon.CheckEvent(ThreadMutex())) CheckTimeouts(
true);
1151 case evntCleanupThrd: {
1153 unsigned totalsize(0);
1155 LockGuard lock(ThreadMutex());
1156 totalsize = _TotalNumberOfEvents();
1160 if ((evnt.GetArg() < 100) && !_IsNormalState())
break;
1163 if (fWorkers.size()!=2) {
1167 DOUT3(
"Thrd:%s Cleanup running when num workers %u != 2 - something strange", GetName(), fWorkers.size());
1170 DOUT3(
"THRD:%s Num workers = %u totalsize %u", GetName(), fWorkers.size(), totalsize);
1172 if ((totalsize>0) && (evnt.GetArg() % 100 < 20)) {
1173 LockGuard lock(ThreadMutex());
1174 _Fire(EventId(evntCleanupThrd, 0, evnt.GetArg()+1), priorityLowest);
1178 EOUT(
"THRD %s %u events are not processed", GetName(), totalsize);
1181 printf(
"Cannot normally destroy thread %s while manager reference is already empty\n", GetName());
1182 fThrdWorking =
false;
1184 if (!
dabc::mgr()->DestroyObject(
this)) {
1185 EOUT(
"Thread cannot be normally destroyed, just leave main loop");
1186 fThrdWorking =
false;
1188 DOUT3(
" -------- THRD %s refcnt %u DESTROYMENT GOES TO MANAGER", GetName(), fObjectRefCnt);
1198 case evntStopThrd: {
1199 DOUT3(
"Thread %s get stop event", GetName());
1200 fThrdWorking =
false;
1205 ProcessExtraThreadEvent(evnt);
1209 DOUT5(
"Thrd:%s Item:%u Event:%u arg:%u done", GetName(), itemid, evnt.GetCode(), evnt.GetArg());
1215 Command cmd(
"AddWorker");
1216 cmd.SetRef(
"Worker", ref);
1217 return sync ? fExec->Execute(cmd) : fExec->Submit(cmd);
1224 return fExec->Execute(cmd, tmout);
1229 if (work==0)
return false;
1232 return CheckWorkerCanBeHalted(work->fWorkerId, actHalt) ==
cmd_true;
1234 Command cmd(
"HaltWorker");
1235 cmd.
SetUInt(
"WorkerId", work->fWorkerId);
1237 return fExec->Execute(cmd) ==
cmd_true;
1243 if (work==0)
return;
1246 EOUT(
"Not allowed from other thread");
1250 if (work->WorkerId() >= fWorkers.size()) {
1251 EOUT(
"Mismatch of workers IDs");
1255 WorkerRec* rec = fWorkers[work->WorkerId()];
1257 if (rec->work != work) {
1258 EOUT(
"%s Mismatch of worker id %u rec: %p worker: %p ", GetName(), work->WorkerId(), rec->work, work);
1262 rec->addon = assign ? work->fAddon() : 0;
1264 WorkersSetChanged();
1274 if (work==0)
return false;
1276 Command cmd(
"InvokeWorkerDestroy");
1277 cmd.
SetUInt(
"WorkerId", work->fWorkerId);
1279 DOUT4(
"Exec %p Invoke to destroy worker %p %s", fExec, work, work->GetName());
1281 return fExec->Submit(cmd);
1286 if (fProcessingTimeouts>0)
return -1.;
1288 IntGuard guard(fProcessingTimeouts);
1292 if (!forcerecheck) {
1293 if (fNextTimeout.null())
return -1.;
1295 double dist = fNextTimeout - now;
1296 if (dist>0.)
return dist;
1300 double min_tmout(-1.), last_diff(0.);
1302 for (
unsigned n=1;n<fWorkers.size();n++) {
1303 WorkerRec* rec = fWorkers[n];
1304 if ((rec==0) || (rec->work==0))
continue;
1306 if (rec->tmout_worker.CheckNextProcess(now, min_tmout, last_diff)) {
1308 double dist = rec->work->ProcessTimeout(last_diff);
1310 rec->tmout_worker.SetNextFire(now, dist, min_tmout);
1313 if (rec->tmout_addon.CheckNextProcess(now, min_tmout, last_diff)) {
1315 double dist = rec->work->ProcessAddonTimeout(last_diff);
1317 rec->tmout_addon.SetNextFire(now, dist, min_tmout);
1322 fNextTimeout = now + min_tmout;
1324 fNextTimeout.Reset();
1331 DOUT3(
"---- THRD %s ObjectCleanup refcnt %u", GetName(), fObjectRefCnt);
1337 RemoveChild(fExec,
false);
1338 if (fExec->GetParent()!=0)
EOUT(
"PARENT IS STILL THERE");
1341 LockGuard lock(ObjectMutex());
1345 fDidDecRefCnt =
true;
1348 DOUT3(
"---- THRD %s ObjectCleanup in the middle", GetName());
1352 DOUT3(
"---- THRD %s ObjectCleanup done refcnt = %u workerssize = %u", GetName(), NumReferences(), fWorkers.size());
1361 if (!IsItself() || !fThrdWorking || !fRealThrd)
return false;
1363 DOUT2(
"!!!!!!!!!!!! THRD %s DO DELETE ITSELF !!!!!!!!!!!!!!!", GetName());
1365 _Fire(EventId(evntCleanupThrd, 0, 100), priorityNormal);
1375 LockGuard guard(ThreadMutex());
1379 for (
unsigned n=1;n<fWorkers.size();n++) {
1380 Worker* work = fWorkers[n]->work;
1382 if (work==0)
continue;
1383 dabc::lgr()->
Debug(lvl,
"file", 1,
"func",
dabc::format(
" Worker: %u is %p %s %s", n, work, work->GetName(), work->ClassName()).c_str());
1390 for (
unsigned n=1;n<fWorkers.size();n++)
1391 if (fWorkers[n]->work != 0) cnt++;
1400 if (GetObject()==0)
return false;
1402 return GetObject()->Execute(cmd, tmout);
1408 if (tmout<0) tmout = 0.;
1413 GetObject()->RunEventLoop(tmout);
1418 if (GetObject()==0)
return false;
1420 if (workerid >= GetObject()->fWorkers.size())
return false;
1422 Thread::WorkerRec* rec = GetObject()->fWorkers[workerid];
1427 if (rec->tmout_worker.Activate(tmout))
1435 if (GetObject()==0)
return false;
1437 if (workerid >= GetObject()->fWorkers.size())
return false;
1439 Thread::WorkerRec* rec = GetObject()->fWorkers[workerid];
1444 if (rec->tmout_addon.Activate(tmout))
1453 if (
null())
return false;
1454 Worker* worker =
new Worker(0, name.empty() ?
"dummy" : name.c_str());
1455 worker->AssignAddon(addon);
1456 return worker->AssignToThread(*
this);
Represents command with its arguments.
bool SetUInt(const std::string &name, unsigned v)
void SetPriority(int prio)
Set command priority, which defines how fast the command should be treated. In special cases priority allows to ...
virtual const char * what() const
Represents objects hierarchy of remote (or local) DABC process.
void MarkChangedItems(uint64_t tm=0)
If any field was modified, item will be marked with new version.
Hierarchy CreateHChild(const std::string &name, bool allowslahes=false, bool sortorder=false)
Create child item in hierarchy with specified name. If allowslahes is enabled, instead of subfolders item...
Hierarchy GetHChild(const std::string &name, bool allowslahes=false, bool force=false, bool sortorder=false)
Return child, creating it with the full subfolder path if necessary. If force is specified, missing children and folders...
void Create(const std::string &name, bool withmutex=false)
Create top-level object with specified name.
void EnableHistory(unsigned length=100, bool withchilds=false)
Activate history production for selected element and its children.
Lock guard for posix mutex.
static void Debug(int level, const char *filename, unsigned linenumber, const char *funcname, const char *message)
void SetFlag(unsigned fl, bool on=true)
Change value of selected flag, not thread safe
virtual void Print(int lvl=0)
Print object content on debug output.
const char * GetName() const
Returns name of the object, thread safe
void SetAutoDestroy(bool on=true)
Set autodestroy flag for the object. Once enabled, object will be destroyed when last reference will b...
Mutex * ObjectMutex() const
Returns mutex, used for protection of Object data members.
int fObjectRefCnt
accounts how many references existing on the object, thread safe
virtual void ObjectCleanup()
User method to clean up object content before it will be destroyed. Main motivation is to release any r...
@ flTopXmlLevel
object (or folder) can be found on top xml level in the Context
static void Destroy(Object *obj)
User method for object destruction.
void Start(Runnable *run)
Start thread with provided runnable.
void Join()
Join thread - method waits until thread execution is completed.
void Kill(int sig=9)
Kill thread with specified signal.
void Cancel()
Try to cancel thread execution.
void SetThreadName(const char *thrdname)
Set thread name, which can be seen from htop.
T & Item(unsigned indx) const
bool AsBool(bool dflt=false) const
bool SetField(const std::string &name, const RecordField &v)
void Release()
Releases reference on the object.
const char * GetName() const
Return name of referenced object, if object not assigned, returns "---".
bool null() const
Returns true if reference contains nullptr.
std::string ItemName(bool compact=true) const
Produce string, which can be used as name argument in dabc::mgr.FindItem(name) call.
bool Execute(Command cmd, double tmout=-1.)
bool MakeWorkerFor(WorkerAddon *addon, const std::string &name="")
Make dummy worker to run addon inside the thread.
bool _ActivateWorkerTimeout(unsigned workerid, int priority, double tmout)
bool _ActivateAddonTimeout(unsigned workerid, int priority, double tmout)
void RunEventLoop(double tmout=1.)
Runs thread event loop for specified time.
helper class to use methods, available in dabc::Worker in thread itself
int fCnt
! if true, different thread parameters will be published
virtual double ProcessTimeout(double last_diff)
virtual void BeforeHierarchyScan(Hierarchy &h)
Method called before publisher makes next snapshot of hierarchy.
virtual bool Find(ConfigIO &cfg)
Method to locate object in xml file.
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
virtual const char * ClassName() const
Returns class name of the object instance.
ExecWorker(Thread *parent, Command cmd)
bool DirtyWorkaround()
Just workaround to check if execution is still performed.
bool _GetNextEvent(EventId &)
void IncWorkerFiredEvents(Worker *work)
virtual bool CompatibleClass(const std::string &clname) const
bool IsTemporaryThread() const
Returns true if this is a temporary thread for command execution.
virtual void Print(int lvl=0)
Print thread content on debug output.
bool Execute(Command cmd, double tmout=-1)
WorkersVector fWorkers
vector of all processors
EThreadState fState
actual thread state
void WorkerAddonChanged(Worker *work, bool assign=true)
Called when worker addon changed on the fly.
void FireDoNothingEvent()
virtual bool _DoDeleteItself()
This method is called at the moment when DecReference thinks that object can be destroyed and wants to...
int CheckWorkerCanBeHalted(unsigned id, unsigned request=0, Command cmd=nullptr)
Internal DABC method, used to verify if worker can be halted now while recursion is over. Request indi...
bool HaltWorker(Worker *proc)
Halt worker - stops any execution, break recursion.
int fNumQueues
number of queues
bool Stop(double timeout_sec=10)
void RunEventLoop(double tmout=1.)
Runs thread event loop for specified time.
double GetStopTimeout() const
virtual void RunnableCancelled()
Mutex * ThreadMutex() const
bool InvokeWorkerDestroy(Worker *work)
Cleanup object asynchronously.
virtual bool WaitEvent(EventId &, double tmout)
bool SetExplicitLoop(Worker *work)
EventsQueue * fQueues
queues for threads events
virtual int ExecuteThreadCommand(Command cmd)
unsigned TotalNumberOfEvents()
Return total number of all events in the queues.
bool SingleLoop(unsigned workerid=0, double tmout=-1)
Processes single event from the thread queue.
bool fDidDecRefCnt
indicates if object cleanup was called - need in destructor
int RunCommandInTheThread(Worker *caller, Worker *dest, Command cmd)
virtual void * MainLoop()
unsigned NumWorkers()
Returns actual number of workers.
unsigned _TotalNumberOfEvents()
bool Start(double timeout_sec=-1., bool real_thread=true)
virtual void ObjectCleanup()
Clean up the thread so that the manager is allowed to delete it.
virtual void _Fire(const EventId &arg, int nq)
static unsigned fThreadInstances
ExecWorker * fExec
processor to execute commands in the thread
friend class RecursionGuard
bool AddWorker(Reference ref, bool sync=true)
Internal DABC method. Add worker to thread; reference-safe. Reference-safe means it is safe to call ...
@ evntCheckTmoutWorker
event used to process timeout for specific worker, used by ActivateTimeout
@ evntCheckTmoutAddon
event used to process timeout for addon, used by ActivateTimeout
bool fProfiling
if true, different statistic will be accumulated about thread
double CheckTimeouts(bool forcerecheck=false)
bool Sync(double timeout_sec=-1)
void ChangeRecursion(unsigned id, bool inc)
Method which allows to control recursion of each worker.
void ProcessEvent(const EventId &)
Active object, which is working inside dabc::Thread.
uint32_t fWorkerId
worker id in thread list, used for events submit
Worker(const ConstructorPair &pair)
Special constructor, designed for inherited classes.
void ClearThreadRef()
Method to 'forget' thread reference.
Hierarchy fWorkerHierarchy
place for publishing of worker parameters
bool fWorkerActive
indicates if worker can submit events to the thread
int fWorkerCommandsLevel
Number of process commands recursion.
RecordField Cfg(const std::string &name, Command cmd=nullptr) const
Returns configuration field of specified name. Configuration value of specified name is searched in follo...
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
ThreadRef fThread
reference on the thread, once assigned remain whole time
void SetWorkerPriority(int nq)
Reference GetPublisher()
Return reference on publisher.
virtual bool Publish(const Hierarchy &h, const std::string &path)
Mutex * fThreadMutex
pointer on main thread mutex
int usage(const char *errstr=nullptr)
const char * xmlThrdStopTime
void SetDebugLevel(int level=0)
std::string format(const char *fmt,...)
const char * xmlThreadNode
Event structure, exchanged between DABC threads.
std::string asstring() const