DABC (Data Acquisition Backbone Core)  2.9.9
Thread.cxx
Go to the documentation of this file.
1 // $Id: Thread.cxx 4476 2020-04-15 14:12:38Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #include "dabc/Thread.h"
17 
18 #include <cstdlib>
19 #include <cstdio>
20 #include <cmath>
21 #include <unistd.h>
22 #include <sys/types.h>
23 #include <sys/syscall.h>
24 #include <sys/resource.h>
25 //#include <linux/version.h>
26 
27 #include "dabc/Manager.h"
28 #include "dabc/Configuration.h"
29 
30 
31 #ifdef DABC_EXTRA_CHECKS
32 unsigned dabc::Thread::maxlimit = 1000;
33 #endif
34 
35 
36 
37 std::string dabc::EventId::asstring() const
38 {
39  return dabc::format("Code:%x Item:%x Arg:%x", GetCode(), GetItem(), GetArg());
40 }
41 
42 
51  protected:
52 
53  friend class Thread;
54 
55  bool fPublish;
56 
57  int fCnt;
58 
59  public:
60  ExecWorker(Thread* parent, Command cmd) :
61  dabc::Worker(parent, "Exec"),
62  fPublish(false),
63  fCnt(0)
64  {
66  // special case - thread keep only pointer
67  SetAutoDestroy(false);
68 
69  // all threads configuration should be found on the top-level
70  SetFlag(flTopXmlLevel, true);
71 
72  fThread = ThreadRef(parent);
73  fThreadMutex = parent->ThreadMutex();
74  fWorkerId = parent->fWorkers.size();
75  fWorkerActive = true;
76 
77  if (!fThread()->IsTemporaryThread()) {
78  fThread()->fProfiling = Cfg("prof", cmd).AsBool(false);
79  fPublish = fThread()->fProfiling || Cfg("publ", cmd).AsBool(false);
80  DOUT2("Exec %s publ %s", fThread.GetName(), DBOOL(fPublish));
81  }
82  }
83 
84  virtual ~ExecWorker() {
85  DOUT3("Destroy EXEC worker %p", this);
86  }
87 
89  bool DirtyWorkaround() {
90  if (fWorkerCommandsLevel <= 0) return false;
91  fThread.Release();
92  fThreadMutex = 0;
93  return true;
94  }
95 
96 
97  virtual const char* ClassName() const { return "Thread"; }
98 
99  virtual int ExecuteCommand(Command cmd)
100  {
101  int res = cmd_ignore;
102 
103  if (!fThread.null())
104  res = fThread()->ExecuteThreadCommand(cmd);
105 
106  if (res == cmd_ignore) res = dabc::Worker::ExecuteCommand(cmd);
107 
108  return res;
109  }
110 
111  virtual bool Find(ConfigIO &cfg)
112  {
113  while (cfg.FindItem(xmlThreadNode)) {
114  // DOUT0("Worker found thread node");
115  if (cfg.CheckAttr(xmlNameAttr, fThread.GetName())) return true;
116  }
117  return false;
118  }
119 
120  virtual void BeforeHierarchyScan(Hierarchy& h)
121  {
122  }
123 
124  virtual double ProcessTimeout(double last_diff)
125  {
126  // timeout is used to update published hierarchy
127  if (!fPublish || fThread.null()) return -1;
128 
129  // we need to wait for the publisher itself (if it anytime will be created)
130  if (GetPublisher().null()) return 1.;
131 
132  if (fWorkerHierarchy.null()) {
133  fWorkerHierarchy.Create("Thread");
134  dabc::Hierarchy item = fWorkerHierarchy.CreateHChild("NumWorkers");
135  item.SetField(dabc::prop_kind, "rate");
136  item.EnableHistory(100);
137 
138  item = fWorkerHierarchy.CreateHChild("Workers");
139  item.SetField(dabc::prop_kind, "log");
140  item.EnableHistory(100);
141 
142  item = fWorkerHierarchy.CreateHChild("pid");
143  item.SetField(dabc::prop_kind, "log");
144 
145  item = fWorkerHierarchy.CreateHChild("threadid");
146  item.SetField(dabc::prop_kind, "log");
147 
148  if (fThread()->fProfiling) {
149  item = fWorkerHierarchy.CreateHChild("Load");
150  item.SetField(dabc::prop_kind, "rate");
151  item.SetField("min", 0);
152  item.SetField("max", 1);
153  item.EnableHistory(100);
154  }
155 
156  Publish(fWorkerHierarchy, std::string("$MGR$") + fThread.ItemName());
157  }
158 
159  unsigned num(0);
160  std::vector<std::string> names;
161 
162  for (unsigned n=1;n<fThread()->fWorkers.size();n++) {
163  Worker* work = fThread()->fWorkers[n]->work;
164  if (work==0) continue;
165  num++;
166  names.push_back(work->GetName());
167  }
168 
169  fWorkerHierarchy.GetHChild("NumWorkers").SetField("value", num);
170  fWorkerHierarchy.GetHChild("Workers").SetField("value", names);
171  fWorkerHierarchy.GetHChild("pid").SetField("value", getpid());
172  fWorkerHierarchy.GetHChild("threadid").SetField("value", (uint64_t) Self());
173 
174  if (fThread()->fProfiling) {
175 
176  double real_tm = fThread()->fLastProfileTime.SpentTillNow(true);
177  double run_tm = 0.;
178  // #if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,28)
179  #ifdef RUSAGE_THREAD
180  struct rusage usage;
181 
182  if (getrusage(RUSAGE_THREAD, &usage) == 0) {
183 
184  double curr =
185  usage.ru_utime.tv_sec * 1. +
186  usage.ru_utime.tv_usec * 1e-6 +
187  usage.ru_stime.tv_sec * 1. +
188  usage.ru_stime.tv_usec * 1e-6;
189 
190  run_tm = curr - fThread()->fThreadRunTime;
191  fThread()->fThreadRunTime = curr;
192  }
193  #endif
194 
195  if ((real_tm>0) && (run_tm>0)) {
196  double load = run_tm / real_tm;
197  if (load > 1) load = 1.;
198  fWorkerHierarchy.GetHChild("Load").SetField("value", load);
199  }
200  }
201 
203 
204  return 1.;
205  }
206 
207 };
208 
209 unsigned dabc::Thread::fThreadInstances = 0;
210 
211 dabc::Thread::Thread(Reference parent, const std::string &name, Command cmd) :
212  Object(parent, name),
213  PosixThread(),
214  Runnable(),
215  fState(stCreated),
216  fThrdWorking(false),
217  fRealThrd(true),
218  fWorkCond(fObjectMutex),
219  fQueues(0),
220  fNumQueues(3),
221  fNextTimeout(),
222  fProcessingTimeouts(0),
223  fWorkers(),
224  fExplicitLoop(0),
225  fExec(0),
226  fDidDecRefCnt(false),
227  fCheckThrdCleanup(false),
228  fProfiling(false),
229  fLastProfileTime(),
230  fThreadRunTime(0.),
231  fThrdStopTimeout(0.)
232 {
233  fThreadInstances++;
234 
235  // hide all possible thread childs from hierarchy scan
236  SetFlag(flChildsHidden, true);
237 
238  DOUT3("---------- CNT:%2d Thread %s %p created", fThreadInstances, GetName(), this);
239 
240  fWorkers.push_back(new WorkerRec(0,0)); // exclude id==0
241 
242  fExec = new ExecWorker(this, cmd);
243  //fExec->SetLogging(true);
244 
245  // keep numqueues 3 for the moment
246  //fNumQueues = fExec->Cfg("NumQueues", cmd).AsUInt(fNumQueues);
247 
248  if (fNumQueues>0) {
249  fQueues = new EventsQueue[fNumQueues];
250  for (int n=0;n<fNumQueues;n++) {
251  fQueues[n].Init(256);
252  fQueues[n].scaler = 8;
253  }
254  }
255 
256  std::string affinity = fExec->Cfg(xmlAffinity, cmd).AsStr();
257 
258  if (!affinity.empty()) {
259  SetAffinity(affinity.c_str());
260  char sbuf[200];
261  if (GetAffinity(false, sbuf, sizeof(sbuf)))
262  DOUT0("Thread %s specified affinity %s mask %s", GetName(), affinity.c_str(), sbuf);
263  }
264 
265  fThrdStopTimeout = fExec->Cfg(xmlThrdStopTime, cmd).AsDouble();
266  if ((fThrdStopTimeout <= 0) && !dabc::mgr.null()) fThrdStopTimeout = dabc::mgr()->cfg()->GetThrdStopTime();
267  if (fThrdStopTimeout <= 0) fThrdStopTimeout = 5.;
268 
269  fWorkers.push_back(new WorkerRec(fExec,0));
270 
271 // SetLogging(true);
272 
273  DOUT2("-------- THRD %s Constructed -------------- ", GetName());
274 }
275 
277 {
278  // !!!!!!!! Do not forgot stopping thread in destructors of inherited classes too !!!!!
279 
280  DOUT3("~~~~~~~~~~~~~~ THRD %s destructor with timeout %3.1f s", GetName(), GetStopTimeout());
281 
282  // we stop thread in destructor, in all inherited classes stop also should be called
283  // otherwise one get problem here if stop will use inherited methods which is no longer available
284 
285  //Stop(1.); JAM 6.7.2017 - try with larger timeout for ltsm
286  Stop(GetStopTimeout());
287 
288  ExecWorker* exec = 0;
289 
290  {
291  LockGuard lock(ObjectMutex());
292  // Workaround - Exec processor is keeping one reference
293  // We decrease it during cleanup, now increase it again
295  exec = fExec;
296  fExec = 0;
297  }
298 
299  exec->ClearThreadRef();
300  if (exec->DirtyWorkaround())
301  EOUT("Execution instance for the thread %s is blocked - KEEP EXEC ALIVE AND FIX LATER", GetName());
302  else
303  dabc::Object::Destroy(exec); // normally destroy should be called
304 
305  for (unsigned n=0;n<fWorkers.size();n++) {
306  if (fWorkers[n]) {
307  //EOUT("Still non-empty worker rec %u in thread %s destructor", n, GetName());
308  delete fWorkers[n];
309  fWorkers[n] = 0;
310  }
311  }
312 
313  LockGuard guard(ThreadMutex());
314  if (fState==stError) {
315  EOUT("Kill thread in error state, nothing better can be done");
316  Kill();
317  fState = stStopped;
318  }
319 
320 #ifdef DABC_EXTRA_CHECKS
321  unsigned totalsize = 0;
322  for (int n=0;n<fNumQueues;n++) {
323  totalsize+=fQueues[n].Size();
324  for (unsigned k=0; k<fQueues[n].Size(); k++) {
325  EventId evnt = fQueues[n].Item(k);
326  DOUT0("THRD %s Queue:%u Item:%u Event:%s", GetName(), n, k, evnt.asstring().c_str());
327  }
328  }
329  if (totalsize>0)
330  EOUT("THRD %s %u events are not processed", GetName(), totalsize);
331 #endif
332 
333  delete [] fQueues; fQueues = 0;
334  fNumQueues = 0;
335 
336  DOUT3("~~~~~~~~~~~~~~ THRD %s destroyed cnt:%d", GetName(), fThreadInstances);
337 
339 }
340 
341 
342 void dabc::Thread::IncWorkerFiredEvents(Worker* work)
343 {
344  work->fWorkerFiredEvents++;
345 }
346 
348 {
349  unsigned totalsize = 0;
350  for (int n=0;n<fNumQueues;n++)
351  totalsize+=fQueues[n].Size();
352  return totalsize;
353 }
354 
356 {
357  LockGuard lock(ThreadMutex());
358  return _TotalNumberOfEvents();
359 }
360 
362 {
363  LockGuard lock(ThreadMutex());
364 
365  if (!fCheckThrdCleanup) return;
366 
367  fCheckThrdCleanup = false;
368 
369  DOUT3("THREAD %s check cleanup", GetName());
370 
371  // here we doing cleanup when no any events there
372 
373  unsigned new_size(fWorkers.size());
374 
375  while ((new_size>1) && (fWorkers[new_size-1]->work==0)) {
376  new_size--;
377  delete fWorkers[new_size];
378  fWorkers[new_size] = 0;
379  }
380 
381  DOUT3("THREAD %s oldsize %u newsize %u", GetName(), fWorkers.size(), new_size);
382 
383  if (new_size==fWorkers.size()) return;
384 
385  fWorkers.resize(new_size);
386  DOUT3("Thrd:%s Shrink processors size to %u normal state %s refcnt %d", GetName(), new_size, DBOOL(_IsNormalState()), fObjectRefCnt);
387 
388  // we check that object is in normal state,
389  // otherwise it means that destroyment is already started and will be done in other means
390  if ((new_size==2) && _IsNormalState()) {
391  DOUT3("THREAD %s generate cleanup", GetName());
392  _Fire(EventId(evntCleanupThrd, 0, 0), priorityLowest);
393  }
394 }
395 
397 {
398  // return next event from the queues
399  // in general, events returned according their priority
400  // but in rare cases lower-priority events also can be taken even when high-priority events exists
401  // This will allow to react on such events.
402  //
403  // If there are no events in the queue, one checks if thread should be cleaned up and
404  // even destroyed
405 
406 
407  for(int nq=0; nq<fNumQueues; nq++)
408  if (fQueues[nq].Size()>0) {
409  if (--(fQueues[nq].scaler)>0) {
410  evnt = fQueues[nq].Pop();
411  return true;
412  }
413  fQueues[nq].scaler = 8;
414  }
415 
416  // in second loop check all queues according their priority
417 
418  for(int nq=0; nq<fNumQueues; nq++)
419  if (fQueues[nq].Size()>0) {
420  evnt = fQueues[nq].Pop();
421  return true;
422  }
423 
424  return false;
425 }
426 
427 
428 bool dabc::Thread::CompatibleClass(const std::string &clname) const
429 {
430  if (clname.empty()) return true;
431 
432  return clname == typeThread;
433 }
434 
436 {
437  EventId evid;
438  double tmout;
439 
440  DOUT3("*** Thrd:%s Starting MainLoop PROCID = %u THRDID %u", GetName(), (unsigned) getpid(), (unsigned) syscall(SYS_gettid));
441 
442  while (fThrdWorking) {
443 
444  DOUT5("*** Thrd:%s Checking timeouts", GetName());
445 
446  tmout = CheckTimeouts();
447 
448  DOUT5("*** Thrd:%s Check timeouts %5.3f", GetName(), tmout);
449 
450  if (WaitEvent(evid, tmout)) {
451 
452  DOUT5("*** Thrd:%s GetEvent %s", GetName(), evid.asstring().c_str());
453 
454  ProcessEvent(evid);
455 
456  DOUT5("*** Thrd:%s DidEvent %s", GetName(), evid.asstring().c_str());
457  } else
458  ProcessNoneEvent();
459 
460  if (fExplicitLoop!=0) RunExplicitLoop();
461  }
462 
463  DOUT3("*** Thrd:%s Leaving MainLoop", GetName());
464 
465  return 0;
466 }
467 
468 bool dabc::Thread::SingleLoop(unsigned workerid, double tmout_user)
469 {
470  DOUT5("*** Thrd:%s SingleLoop user_tmout %5.3f", GetName(), tmout_user);
471 
472  // check situation that worker is halted and should brake its execution
473  // if necessary, worker should fire exception
474  if ((workerid>0) && (workerid<fWorkers.size())) {
475  if (fWorkers[workerid]->doinghalt) return false;
476  }
477 
478  double tmout = CheckTimeouts();
479  if (tmout_user>=0)
480  if ((tmout<0) || (tmout_user<tmout)) tmout = tmout_user;
481 
482  EventId evid;
483 
484  if (WaitEvent(evid, tmout))
485  ProcessEvent(evid);
486  else
487  ProcessNoneEvent();
488 
489  return true;
490 }
491 
492 void dabc::Thread::RunEventLoop(double tm)
493 {
494  if (!IsItself()) {
495  EOUT("Cannot run thread %s event loop outer own thread", GetName());
496  return;
497  }
498 
499  if (tm<0) {
500  EOUT("negative (endless) timeout specified - set default 0 sec (single event)");
501  tm = 0;
502  }
503 
504  TimeStamp finish = dabc::Now() + tm;
505 
506  while (fThrdWorking) {
507  double remain = finish - dabc::Now();
508 
509  // we execute event loop at least once even when no time remains
510  SingleLoop(0, remain>0 ? remain : 0.);
511 
512  if (remain <= 0) break;
513  }
514 }
515 
516 
517 int dabc::Thread::RunCommandInTheThread(Worker* caller, Worker* dest, Command cmd)
518 {
519  if (cmd.null() || (dest==0)) {
520  cmd.ReplyFalse();
521  return dabc::cmd_false;
522  }
523 
524  {
525  dabc::LockGuard lock(ObjectMutex());
526 
527  // in principle, it is enough to check state of the thread
528  // if destructor is started, we can reject submission of all commands
529  if (!_IsNormalState()) return cmd_ignore;
530 
531  if (caller==0) caller = fExec;
532  }
533 
534 
535  // we must be sure that call is done from thread itself - otherwise it is wrong
536  if (!IsItself()) {
537  EOUT("Cannot execute command in wrong thread context!!!");
538  cmd.ReplyFalse();
539  return cmd_false;
540  }
541 
542  int res = cmd_false;
543 
544  { // this is begin of parenthesis for RecursionGuard
545 
546  // we indicate that processor involved in
547  Thread::RecursionGuard iguard(this, caller->fWorkerId);
548 
549  bool exe_ready = false;
550 
551  cmd.AddCaller(caller, &exe_ready);
552 
553  try {
554 
555  DOUT3("********** Calling ExecteIn in thread %s %p", GetName(), this);
556 
557  // critical point - we want to submit command to other thread
558  // if command receiver does not accept command means it either do not have thread or lost it
559  // in this case command can be executed in current thread context ???
560  // Once command is submitted it is guaranteed that it will be executed or command will be canceled
561 
562  if (dest->Submit(cmd)) {
563 
564  // we can access exe_ready directly, while this flag only access from caller thread
565  // loop should be executed at least once to process do-nothing event produced by command reply
566 
567  do {
568 
569  // account timeout, whait 0.5 second longer as specified timeout
570  double tmout = cmd.TimeTillTimeout(0.5);
571 
572  if (tmout==0.) {
573  res = cmd_timedout;
574  break;
575  }
576 
577  DOUT3("ExecuteIn - cmd:%s singleLoop proc %u time %4.1f", cmd.GetName(), caller->fWorkerId, ((tmout<=0) ? 0.1 : tmout));
578 
579  if (!SingleLoop(caller->fWorkerId, (tmout<=0) ? 0.1 : tmout)) {
580  // FIXME: one should cancel command in normal way
581  res = cmd_false;
582  break;
583  }
584  } while (!exe_ready);
585  DOUT3("------------ Proc %p Cmd %s ready = %s", caller, cmd.GetName(), DBOOL(exe_ready));
586 
587  if (exe_ready) res = cmd.GetResult();
588 
589  } else {
590 
591  // this is a case when command can be executed in current thread context
592 
593  // FIXME: should we do this - if destination does not accept command via Submit, should we execute it that way?
594 
595  DOUT0("Worker %s refuse to submit command - we do it as well", dest->GetName());
596  res = cmd_false;
597  cmd.SetResult(cmd_false);
598  }
599 
600  // in any case remove caller from the command
601  cmd.RemoveCaller(caller, &exe_ready);
602 
603  } catch (...) {
604 
605  // even in case of exception
606  cmd.RemoveCaller(caller, &exe_ready);
607  }
608 
609  } // this is end of parenthesis for RecursionGuard, should be closed before thread reference is released
610 
611  DOUT3("------------ Proc %p Cmd %s res = %d", caller, cmd.GetName(), res);
612 
613  return res;
614 }
615 
616 
617 
618 bool dabc::Thread::Start(double timeout_sec, bool real_thread)
619 {
620  // first, check if we should join thread,
621  // when it was stopped before
622  bool needkill = false;
623  {
624  LockGuard guard(ThreadMutex());
625  switch (fState) {
626  case stCreated:
627  case stStopped:
628  fRealThrd = real_thread;
629  break;
630  case stRunning:
631  return true;
632  case stError:
633  EOUT("Restart from error state, may be dangerous");
634  needkill = fRealThrd;
635  fRealThrd = real_thread;
636  break;
637  case stChanging:
638  EOUT("Status is changing from other thread. Not supported");
639  return false;
640  default:
641  EOUT("Forgot something???");
642  break;
643  }
644 
645  fState = stChanging;
646  }
647 
648  DOUT3("Thread %s starting kill:%s", GetName(), DBOOL(needkill));
649 
650  if (needkill) PosixThread::Kill();
651 
652  // from this moment on thread main loop must became functional
653 
654  fThrdWorking = true;
655  bool res = true;
656 
657  if (fRealThrd) {
658  PosixThread::Start(this);
659 
660  std::string tname = GetName();
661  if (tname.length()>15) tname.resize(15);
662 
663  PosixThread::SetThreadName(tname.c_str());
664 
665  if (!fExec) {
666  EOUT("Start thread without EXEC???");
667  exit(765);
668  }
669  res = fExec->Execute("ConfirmStart", timeout_sec) == cmd_true;
670  } else {
672  }
673 
674  LockGuard guard(ThreadMutex());
675  fState = res ? stRunning : stError;
676 
677  return res;
678 }
679 
681 {
682  DOUT3("Thread cancelled %s", GetName());
683 
684  LockGuard guard(ThreadMutex());
685 
686  fState = stStopped;
687 }
688 
689 
690 bool dabc::Thread::Stop(double timeout_sec)
691 {
692  bool needstop = false;
693 
694  {
695  LockGuard guard(ThreadMutex());
696  switch (fState) {
697  case stCreated:
698  return true;
699  case stRunning:
700  needstop = fRealThrd && !IsItself();
701  break;
702  case stStopped:
703  return true;
704  case stError:
705  EOUT("Stop from error state, do nothing");
706  return true;
707  case stChanging:
708  EOUT("State is changing from other thread. Not supported");
709  return false;
710  default:
711  EOUT("Forgot something???");
712  return false;
713  }
714 
715  fState = stChanging;
716 
717  if (needstop)
718  _Fire(EventId(evntStopThrd), priorityHighest);
719  }
720 
721  bool res(false);
722 
723  DOUT3("Start doing thread %s stop with timeout %f s", GetName(),timeout_sec);
724 
725  if (!needstop) {
726 
727  fThrdWorking = false;
728  res = true;
729  LockGuard guard(ThreadMutex());
730  fState = stStopped;
731 
732  } else {
733  if (timeout_sec<=0.) timeout_sec = 1.;
734 
735  // FIXME: one should avoid any kind of timeouts and use normal condition here
736 
737  TimeStamp tm1 = dabc::Now();
738 
739  int cnt(0);
740  double spent_time(0.);
741  bool did_cancel(false);
742 
743  do {
744  if (cnt++>1000) dabc::Sleep(0.001);
745 
746  if ((spent_time > timeout_sec * 0.7) && !did_cancel) {
747  DOUT1("Cancel thread %s", GetName());
749  did_cancel = true;
750  cnt = 0;
751  }
752 
753  LockGuard guard(ThreadMutex());
754  if (fState == stStopped) { res = true; break; }
755 
756  } while ( (spent_time = tm1.SpentTillNow()) < timeout_sec);
757 
758  if (!res) EOUT("Cannot wait for join while stop was not succeeded");
759  else PosixThread::Join();
760  }
761 
762  LockGuard guard(ThreadMutex());
763 
764  if (fState!=stStopped) {
765  // not necessary, but to be sure that everything is done to stop thread
766  fThrdWorking = false;
767  fState = stError;
768  } else
769  fState = stStopped;
770 
771  return res;
772 }
773 
774 bool dabc::Thread::Sync(double timeout_sec)
775 {
776  return fExec->Execute("ConfirmSync", timeout_sec);
777 }
778 
779 bool dabc::Thread::SetExplicitLoop(Worker* proc)
780 {
781  if (!IsItself()) {
782  EOUT("Call from other thread - absolutely wrong");
783  exit(113);
784  }
785 
786  if (fExplicitLoop!=0)
787  EOUT("Explicit loop is already set");
788  else
789  fExplicitLoop = proc->fWorkerId;
790 
791  return fExplicitLoop == proc->fWorkerId;
792 }
793 
795 {
796  if ((fExplicitLoop==0) || (fExplicitLoop>=fWorkers.size())) return;
797 
798  DOUT4("Enter RunExplicitMainLoop");
799 
800  // first check that worker want to be halted, when do not start explicit loop at all
801  if (fWorkers[fExplicitLoop]->doinghalt) return;
802 
803  RecursionGuard iguard(this, fExplicitLoop);
804 
805  try {
806 
807  fWorkers[fExplicitLoop]->work->DoWorkerMainLoop();
808 
809  } catch (dabc::Exception& e) {
810  if (e.IsStop()) DOUT2("Worker %u stopped via exception", fExplicitLoop); else
811  if (e.IsTimeout()) DOUT2("Worker %u stopped via timeout", fExplicitLoop); else
812  EOUT("Exception %s in processor %u", e.what(), fExplicitLoop);
813  } catch(...) {
814  EOUT("Exception UNCKNOWN in processor %u", fExplicitLoop);
815  }
816 
817  // we should call postloop in any case
818  fWorkers[fExplicitLoop]->work->DoWorkerAfterMainLoop();
819 
820  DOUT5("Exit from RunExplicitMainLoop");
821 
822  fExplicitLoop = 0;
823 }
824 
825 
827 {
828  // used by timeout object to activate thread and leave WaitEvent function
829 
830  Fire(EventId(evntDoNothing), priorityLowest);
831 }
832 
833 int dabc::Thread::ExecuteThreadCommand(Command cmd)
834 {
835  DOUT2("Thread %s Execute command %s", GetName(), cmd.GetName());
836 
837  if (cmd.IsName("ConfirmStart")) {
838 
839  DOUT3("THRD:%s item %s", GetName(), ItemName().c_str());
840 
841  // activate timeout at least once
842  fExec->ActivateTimeout(0.01);
843 
844  return cmd_true;
845  } else
846  if (cmd.IsName("ConfirmSync")) {
847  return cmd_true;
848  } else
849  if (cmd.IsName("AddWorker")) {
850 
851  Reference ref = cmd.GetRef("Worker");
852  Worker* worker = (Worker*) ref();
853 
854  DOUT2("AddWorker %p in thrd %p", worker, this);
855 
856  if (worker==0) return cmd_false;
857 
858  if ((worker->fThread() != this) ||
859  (worker->fThreadMutex != ThreadMutex())) {
860  EOUT("Something went wrong - CRASH");
861  ref.Destroy();
862  exit(765);
863  }
864 
865  {
866  LockGuard guard(ThreadMutex());
867 
868  // we can use workers array outside mutex (as long as we are inside thread)
869  // but we should lock mutex when we would like to change workers vector
870  fWorkers.push_back(new WorkerRec(worker, worker->fAddon()));
871 
872  // from this moment on processor is fully functional
873  worker->fWorkerId = fWorkers.size()-1;
874 
875  worker->fWorkerActive = true;
876 
877  DOUT3("----------------THRD %s WORKER %p %s assigned ------------------ ", GetName(), worker, worker->GetName());
878 
879  }
880 
881  WorkersSetChanged();
882 
883  worker->InformThreadAssigned();
884 
885  //cmd->Print(1, "DIDjob");
886 
887  return cmd_true;
888 
889  } else
890 
891  if (cmd.IsName("InvokeWorkerDestroy")) {
892 
893  DOUT3("THRD:%s Request to destroy worker id %u", GetName(), cmd.GetUInt("WorkerId"));
894 
895  return CheckWorkerCanBeHalted(cmd.GetUInt("WorkerId"), actDestroy, cmd);
896  } else
897 
898  if (cmd.IsName("HaltWorker")) {
899 
900  DOUT3("THRD:%s Request to halt worker id %u", GetName(), cmd.GetUInt("WorkerId"));
901 
902  return CheckWorkerCanBeHalted(cmd.GetUInt("WorkerId"), actHalt, cmd);
903  }
904 
905  return cmd_ignore;
906 }
907 
908 
909 void dabc::Thread::ChangeRecursion(unsigned id, bool inc)
910 {
911 #ifdef DABC_EXTRA_CHECKS
912  if (!IsItself()) {
913  EOUT("ALARM, recursion changed not from thread itself");
914  }
915 #endif
916 
917  if ((id>=fWorkers.size()) || (fWorkers[id]->work==0)) return;
918 
919  if (inc) {
920  fWorkers[id]->recursion++;
921  } else {
922  fWorkers[id]->recursion--;
923 
924  if ((fWorkers[id]->recursion==0) && fWorkers[id]->doinghalt)
925  CheckWorkerCanBeHalted(id);
926  }
927 }
928 
929 int dabc::Thread::CheckWorkerCanBeHalted(unsigned id, unsigned request, Command cmd)
930 {
931  DOUT4("THRD:%s CheckWorkerCanBeHalted %u", GetName(), id);
932 
933  if ((id>=fWorkers.size()) || (fWorkers[id]->work==0)) {
934  DOUT3("THRD:%s Worker %u no longer exists", GetName(), id);
935 
936  return cmd_false;
937  }
938 
939  fWorkers[id]->doinghalt |= request;
940 
941  unsigned balance = 0;
942 
943  {
944  LockGuard guard(ThreadMutex());
945  // we indicate that worker should not produce more normal events
946  // it will be able to supply commands with magic priority
947  if (fWorkers[id]->doinghalt)
948  fWorkers[id]->work->fWorkerActive = false;
949 
950  balance = fWorkers[id]->work->fWorkerFiredEvents - fWorkers[id]->processed;
951  }
952 
953  DOUT4("THRD:%s CheckWorkerCanBeHalted %u doinghalt = %u", GetName(), id, fWorkers[id]->doinghalt);
954 
955  if (fWorkers[id]->doinghalt==0) return cmd_false;
956 
957  if ((fWorkers[id]->recursion > 0) || (balance > 0)) {
958  DOUT2("THRD:%s ++++++++++++++++++++++ worker %p %s %s event balance %u fired:%u processed:%u recursion %d",
959  GetName(), fWorkers[id]->work, fWorkers[id]->work->GetName(), fWorkers[id]->work->ClassName(),
960  balance, fWorkers[id]->work->fWorkerFiredEvents, fWorkers[id]->processed,
961  fWorkers[id]->recursion);
962  if (!cmd.null()) fWorkers[id]->cmds.Push(cmd);
963  return cmd_postponed;
964  }
965 
966  WorkerRec* rec(0);
967 
968  {
969  LockGuard guard(ThreadMutex());
970 
971  // this excludes worker from any further event processing
972  // do it under lock
973 
974  rec = fWorkers[id];
975 
976  DOUT5("Thrd:%s Remove record %u\n", GetName(), id);
977 
978  fWorkers[id] = new WorkerRec(0, 0);
979 
980  // reset id
981  if (rec->work) rec->work->fWorkerId = 0;
982  }
983 
984  DOUT4("THRD:%s CheckWorkerCanBeHalted %u rec = %p worker = %p", GetName(), id, rec, rec ? rec->work : 0);
985 
986  // FIXME: this must be legitime method to destroy any worker
987  // one can remove it from workers vector
988 
989  // before worker will be really destroyed indicate to the world that processor is disappear
990  WorkersSetChanged();
991 
992  if (rec!=0) {
993 
994  if (rec->work && rec->work->IsLogging())
995  DOUT0("Trying to destroy worker %p id %u via thread %s", rec->work, id, GetName());
996 
997  // release thread reference from here
998  if (rec->work) rec->work->ClearThreadRef();
999 
1000  // true indicates that object should be destroyed immediately
1001  if (rec->doinghalt & actDestroy) {
1002  if (rec->work && rec->work->DestroyCalledFromOwnThread())
1003  delete rec->work;
1004  }
1005 
1006  // inform all commands that everything goes well
1007  rec->cmds.ReplyAll(cmd_true);
1008 
1009  delete rec;
1010  }
1011 
1012  LockGuard guard(ThreadMutex());
1013 
1014  DOUT2("THRD:%s mark thread for cleanup check", GetName());
1015 
1016  // indicate for thread itself that it can be optimized
1017  fCheckThrdCleanup = true;
1018 
1019  return cmd_true;
1020 }
1021 
1022 
1023 void dabc::Thread::_Fire(const EventId& arg, int nq)
1024 {
1025  DOUT3("Thrd: %p %s Fire event code:%u item:%u arg:%u nq:%d NumQueues:%u", this, GetName(), arg.GetCode(), arg.GetItem(), arg.GetArg(), nq, fNumQueues);
1026 
1027  _PushEvent(arg, nq);
1028  fWorkCond._DoFire();
1029 
1030 #ifdef DABC_EXTRA_CHECKS
1031 
1032  long sum = 0;
1033  for (int n=0;n<fNumQueues;n++) sum+=fQueues[n].Size();
1034  if (sum!=fWorkCond._FiredCounter()) {
1036  DOUT5("Thrd %s Error sum1 %ld cond %ld event %s",
1037  GetName(), sum, fWorkCond._FiredCounter(),
1038  arg.asstring().c_str());
1039  }
1040 #endif
1041 
1042 }
1043 
1044 bool dabc::Thread::WaitEvent(EventId& evid, double tmout)
1045 {
1046 
1047  LockGuard lock(ThreadMutex());
1048 
1049  if (fWorkCond._DoWait(tmout))
1050  return _GetNextEvent(evid);
1051 
1052  return false;
1053 }
1054 
1055 
1056 void dabc::Thread::ProcessEvent(const EventId& evnt)
1057 {
1058  uint16_t itemid = evnt.GetItem();
1059 
1060  DOUT5("*** Thrd:%s Event:%s q0:%u q1:%u q2:%u",
1061  GetName(), evnt.asstring().c_str(),
1062  fQueues[0].Size(), fQueues[1].Size(), fQueues[2].Size());
1063 
1064  if (itemid>0) {
1065  if (itemid>=fWorkers.size()) {
1066  EOUT("Thrd:%p %s FALSE worker id:%u size:%u evnt:%s - ignore", this, GetName(), itemid, fWorkers.size(), evnt.asstring().c_str());
1067  return;
1068  }
1069 
1070  Worker* worker = fWorkers[itemid]->work;
1071 
1072  DOUT3("*** Thrd:%p proc:%p itemid:%u event:%u doinghalt:%u", this, worker, itemid, evnt.GetCode(), fWorkers[itemid]->doinghalt);
1073 
1074  if (worker==0) return;
1075 
1076  fWorkers[itemid]->processed++;
1077 
1078  if (worker==dabc::mgr())
1079  DOUT3("Process manager event %s fired:%u processed: %u", evnt.asstring().c_str(), worker->fWorkerFiredEvents, fWorkers[itemid]->processed);
1080 
1081  try {
1082 
1083  DOUT3("*** Thrd:%p proc:%s event:%u", this, worker->GetName(), evnt.GetCode());
1084 
1085  IntGuard iguard(fWorkers[itemid]->recursion);
1086 
1087  if (evnt.GetCode() < Worker::evntFirstSystem) {
1088  if (evnt.GetCode() < Worker::evntFirstAddOn)
1089  worker->ProcessCoreEvent(evnt);
1090  else {
1091  if (worker->fAddon.null())
1092  EOUT("Get event for non-existing addon");
1093  else
1094  worker->fAddon()->ProcessEvent(evnt);
1095  }
1096  } else
1097  worker->ProcessEvent(evnt);
1098 
1099  } catch (...) {
1100 
1101  if (fWorkers[itemid]->doinghalt)
1102  CheckWorkerCanBeHalted(itemid);
1103 
1104  throw;
1105  }
1106 
1107  // this block will be executed also if exception was produced by user
1108  if (fWorkers[itemid]->doinghalt)
1109  CheckWorkerCanBeHalted(itemid);
1110 
1111  } else
1112 
1113  switch (evnt.GetCode()) {
1114  case evntCheckTmoutWorker: {
1115 
1116  if (evnt.GetArg() >= fWorkers.size()) {
1117  DOUT3("evntCheckTmoutWorker - mismatch in processor id:%u sz:%u ", evnt.GetArg(), fWorkers.size());
1118  break;
1119  }
1120 
1121  WorkerRec* rec = fWorkers[evnt.GetArg()];
1122  if (rec->work==0) {
1123  DOUT3("Worker no longer exists", evnt.GetArg());
1124  break;
1125  }
1126 
1127  if (rec->tmout_worker.CheckEvent(ThreadMutex())) CheckTimeouts(true);
1128 
1129  break;
1130  }
1131 
1132  case evntCheckTmoutAddon: {
1133 
1134  if (evnt.GetArg() >= fWorkers.size()) {
1135  DOUT3("evntCheckTmoutWorker - mismatch in processor id:%u sz:%u ", evnt.GetArg(), fWorkers.size());
1136  break;
1137  }
1138 
1139  WorkerRec* rec = fWorkers[evnt.GetArg()];
1140  if (rec->work==0) {
1141  DOUT3("Worker no longer exists", evnt.GetArg());
1142  break;
1143  }
1144 
1145  if (rec->tmout_addon.CheckEvent(ThreadMutex())) CheckTimeouts(true);
1146 
1147  break;
1148  }
1149 
1150 
1151  case evntCleanupThrd: {
1152 
1153  unsigned totalsize(0);
1154  {
1155  LockGuard lock(ThreadMutex());
1156  totalsize = _TotalNumberOfEvents();
1157 
1158  // if cleanup was started due to no workers in thread,
1159  // one should stop it while thread will be deleted by other means
1160  if ((evnt.GetArg() < 100) && !_IsNormalState()) break;
1161  }
1162 
1163  if (fWorkers.size()!=2) {
1164  // this is situation when cleanup was started by DecReference while
1165  // there is no more references on the thread and one can destroy thread
1166  // one need to ensure that no more other events existing
1167  DOUT3("Thrd:%s Cleanup running when num workers %u != 2 - something strange", GetName(), fWorkers.size());
1168  }
1169 
1170  DOUT3("THRD:%s Num workers = %u totalsize %u", GetName(), fWorkers.size(), totalsize);
1171 
1172  if ((totalsize>0) && (evnt.GetArg() % 100 < 20)) {
1173  LockGuard lock(ThreadMutex());
1174  _Fire(EventId(evntCleanupThrd, 0, evnt.GetArg()+1), priorityLowest);
1175  } else {
1176 
1177  if (totalsize>0)
1178  EOUT("THRD %s %u events are not processed", GetName(), totalsize);
1179 
1180  if (dabc::mgr.null()) {
1181  printf("Cannot normally destroy thread %s while manager reference is already empty\n", GetName());
1182  fThrdWorking = false;
1183  } else
1184  if (!dabc::mgr()->DestroyObject(this)) {
1185  EOUT("Thread cannot be normally destroyed, just leave main loop");
1186  fThrdWorking = false;
1187  } else {
1188  DOUT3(" -------- THRD %s refcnt %u DESTROYMENT GOES TO MANAGER", GetName(), fObjectRefCnt);
1189  }
1190  }
1191 
1192  break;
1193  }
1194 
1195  case evntDoNothing:
1196  break;
1197 
1198  case evntStopThrd: {
1199  DOUT3("Thread %s get stop event", GetName());
1200  fThrdWorking = false;
1201  break;
1202  }
1203 
1204  default:
1205  ProcessExtraThreadEvent(evnt);
1206  break;
1207  }
1208 
1209  DOUT5("Thrd:%s Item:%u Event:%u arg:%u done", GetName(), itemid, evnt.GetCode(), evnt.GetArg());
1210 }
1211 
1212 
1213 bool dabc::Thread::AddWorker(Reference ref, bool sync)
1214 {
1215  Command cmd("AddWorker");
1216  cmd.SetRef("Worker", ref);
1217  return sync ? fExec->Execute(cmd) : fExec->Submit(cmd);
1218 }
1219 
1220 bool dabc::Thread::Execute(dabc::Command cmd, double tmout)
1221 {
1223 
1224  return fExec->Execute(cmd, tmout);
1225 }
1226 
1227 bool dabc::Thread::HaltWorker(Worker* work)
1228 {
1229  if (work==0) return false;
1230 
1231  if (IsItself())
1232  return CheckWorkerCanBeHalted(work->fWorkerId, actHalt) == cmd_true;
1233 
1234  Command cmd("HaltWorker");
1235  cmd.SetUInt("WorkerId", work->fWorkerId);
1237  return fExec->Execute(cmd) == cmd_true;
1238 }
1239 
1240 
1241 void dabc::Thread::WorkerAddonChanged(Worker* work, bool assign)
1242 {
1243  if (work==0) return;
1244 
1245  if (!IsItself()) {
1246  EOUT("Not allowed from other thread");
1247  exit(333);
1248  }
1249 
1250  if (work->WorkerId() >= fWorkers.size()) {
1251  EOUT("Mismatch of workers IDs");
1252  exit(333);
1253  }
1254 
1255  WorkerRec* rec = fWorkers[work->WorkerId()];
1256 
1257  if (rec->work != work) {
1258  EOUT("%s Mismatch of worker id %u rec: %p worker: %p ", GetName(), work->WorkerId(), rec->work, work);
1259  exit(444);
1260  }
1261 
1262  rec->addon = assign ? work->fAddon() : 0;
1263 
1264  WorkersSetChanged();
1265 }
1266 
1267 
1268 bool dabc::Thread::InvokeWorkerDestroy(Worker* work)
1269 {
1270  // TODO: one must be sure that command is executed,
1271  // therefore state of the thread must be checked
1272  // This action can only work asynchron
1273 
1274  if (work==0) return false;
1275 
1276  Command cmd("InvokeWorkerDestroy");
1277  cmd.SetUInt("WorkerId", work->fWorkerId);
1278 
1279  DOUT4("Exec %p Invoke to destroy worker %p %s", fExec, work, work->GetName());
1280 
1281  return fExec->Submit(cmd);
1282 }
1283 
1284 double dabc::Thread::CheckTimeouts(bool forcerecheck)
1285 {
1286  if (fProcessingTimeouts>0) return -1.;
1287 
1288  IntGuard guard(fProcessingTimeouts);
1289 
1290  TimeStamp now;
1291 
1292  if (!forcerecheck) {
1293  if (fNextTimeout.null()) return -1.;
1294  now.GetNow();
1295  double dist = fNextTimeout - now;
1296  if (dist>0.) return dist;
1297  } else
1298  now.GetNow();
1299 
1300  double min_tmout(-1.), last_diff(0.);
1301 
1302  for (unsigned n=1;n<fWorkers.size();n++) {
1303  WorkerRec* rec = fWorkers[n];
1304  if ((rec==0) || (rec->work==0)) continue;
1305 
1306  if (rec->tmout_worker.CheckNextProcess(now, min_tmout, last_diff)) {
1307 
1308  double dist = rec->work->ProcessTimeout(last_diff);
1309 
1310  rec->tmout_worker.SetNextFire(now, dist, min_tmout);
1311  }
1312 
1313  if (rec->tmout_addon.CheckNextProcess(now, min_tmout, last_diff)) {
1314 
1315  double dist = rec->work->ProcessAddonTimeout(last_diff);
1316 
1317  rec->tmout_addon.SetNextFire(now, dist, min_tmout);
1318  }
1319  }
1320 
1321  if (min_tmout>=0.)
1322  fNextTimeout = now + min_tmout;
1323  else
1324  fNextTimeout.Reset();
1325 
1326  return min_tmout;
1327 }
1328 
1330 {
1331  DOUT3("---- THRD %s ObjectCleanup refcnt %u", GetName(), fObjectRefCnt);
1332 
1333  // FIXME: should we wait until all commands and all events are processed
1334  // FIXME: can we delete worker already here??
1335 
1336  // we keep exec worker for a while
1337  RemoveChild(fExec, false);
1338  if (fExec->GetParent()!=0) EOUT("PARENT IS STILL THERE");
1339 
1340  {
1341  LockGuard lock(ObjectMutex());
1342  // Workaround - Exec processor is keeping one reference
1343  // We ignore this reference and manager is allowed to destroy thread once any other references are cleared
1344  fObjectRefCnt--;
1345  fDidDecRefCnt = true;
1346  }
1347 
1348  DOUT3("---- THRD %s ObjectCleanup in the middle", GetName());
1349 
1351 
1352  DOUT3("---- THRD %s ObjectCleanup done refcnt = %u workerssize = %u", GetName(), NumReferences(), fWorkers.size());
1353 }
1354 
1356 {
1357  // we are outside own thread - let other do dirty job for other, also if thread not working one cannot delete itself
1358  // FIXME :should we check that there is no events in the queues like
1359  // if (!IsItself() && (_TotalNumberOfEvents()==0)) return false;
1360 
1361  if (!IsItself() || !fThrdWorking || !fRealThrd) return false;
1362 
1363  DOUT2("!!!!!!!!!!!! THRD %s DO DELETE ITSELF !!!!!!!!!!!!!!!", GetName());
1364 
1365  _Fire(EventId(evntCleanupThrd, 0, 100), priorityNormal);
1366 
1367  return true;
1368 }
1369 
1370 
1371 void dabc::Thread::Print(int lvl)
1372 {
1373  dabc::Object::Print(lvl);
1374 
1375  LockGuard guard(ThreadMutex());
1376 
1377  dabc::lgr()->Debug(lvl, "file", 1, "func", dabc::format(" Workers vector size: %u", fWorkers.size()).c_str());
1378 
1379  for (unsigned n=1;n<fWorkers.size();n++) {
1380  Worker* work = fWorkers[n]->work;
1381 
1382  if (work==0) continue;
1383  dabc::lgr()->Debug(lvl, "file", 1, "func", dabc::format(" Worker: %u is %p %s %s", n, work, work->GetName(), work->ClassName()).c_str());
1384  }
1385 }
1386 
1387 unsigned dabc::Thread::NumWorkers()
1388 {
1389  unsigned cnt = 0;
1390  for (unsigned n=1;n<fWorkers.size();n++)
1391  if (fWorkers[n]->work != 0) cnt++;
1392 
1393  return cnt;
1394 }
1395 
1396 // -----------------------------------------------------------------
1397 
1398 bool dabc::ThreadRef::Execute(dabc::Command cmd, double tmout)
1399 {
1400  if (GetObject()==0) return false;
1401 
1402  return GetObject()->Execute(cmd, tmout);
1403 }
1404 
1405 
1406 void dabc::ThreadRef::RunEventLoop(double tmout)
1407 {
1408  if (tmout<0) tmout = 0.;
1409 
1410  if (GetObject()==0)
1411  dabc::Sleep(tmout);
1412  else
1413  GetObject()->RunEventLoop(tmout);
1414 }
1415 
1416 bool dabc::ThreadRef::_ActivateWorkerTimeout(unsigned workerid, int priority, double tmout)
1417 {
1418  if (GetObject()==0) return false;
1419 
1420  if (workerid >= GetObject()->fWorkers.size()) return false;
1421 
1422  Thread::WorkerRec* rec = GetObject()->fWorkers[workerid];
1423 
1424  // TODO: why worker priority is important here ????
1425  // with default priority multinode applications (ib-test) not connecting correctly
1426 
1427  if (rec->tmout_worker.Activate(tmout))
1428  GetObject()->_Fire(EventId(Thread::evntCheckTmoutWorker, 0, workerid), priority);
1429 
1430  return true;
1431 }
1432 
1433 bool dabc::ThreadRef::_ActivateAddonTimeout(unsigned workerid, int priority, double tmout)
1434 {
1435  if (GetObject()==0) return false;
1436 
1437  if (workerid >= GetObject()->fWorkers.size()) return false;
1438 
1439  Thread::WorkerRec* rec = GetObject()->fWorkers[workerid];
1440 
1441  // TODO: why worker priority is important here ????
1442  // with default priority multinode applications (ib-test) not connecting correctly
1443 
1444  if (rec->tmout_addon.Activate(tmout))
1445  GetObject()->_Fire(EventId(Thread::evntCheckTmoutAddon, 0, workerid), priority);
1446 
1447  return true;
1448 }
1449 
1450 
1451 bool dabc::ThreadRef::MakeWorkerFor(WorkerAddon* addon, const std::string &name)
1452 {
1453  if (null()) return false;
1454  Worker* worker = new Worker(0, name.empty() ? "dummy" : name.c_str());
1455  worker->AssignAddon(addon);
1456  return worker->AssignToThread(*this);
1457 }
Represents command with its arguments.
Definition: Command.h:99
bool SetUInt(const std::string &name, unsigned v)
Definition: Command.h:147
void SetPriority(int prio)
Set command priority, defines how fast command should be treated In special cases priority allows to ...
Definition: Command.h:213
DABC exception.
Definition: Exception.h:57
bool IsTimeout() const
Definition: Exception.h:77
bool IsStop() const
Definition: Exception.h:76
virtual const char * what() const
Definition: Exception.cxx:53
Represents objects hierarchy of remote (or local) DABC process.
Definition: Hierarchy.h:285
void MarkChangedItems(uint64_t tm=0)
If any field was modified, item will be marked with new version.
Definition: Hierarchy.h:330
Hierarchy CreateHChild(const std::string &name, bool allowslahes=false, bool sortorder=false)
Create child item in hierarchy with specified name If allowslahes enabled, instead of subfolders item...
Definition: Hierarchy.h:392
Hierarchy GetHChild(const std::string &name, bool allowslahes=false, bool force=false, bool sortorder=false)
Return child, if necessary creates with full subfolder If force specified, missing childs and folders...
Definition: Hierarchy.cxx:944
void Create(const std::string &name, bool withmutex=false)
Create top-level object with specified name.
Definition: Hierarchy.cxx:934
void EnableHistory(unsigned length=100, bool withchilds=false)
Activate history production for selected element and its childs.
Definition: Hierarchy.cxx:837
Lock guard for posix mutex.
Definition: threads.h:127
static void Debug(int level, const char *filename, unsigned linenumber, const char *funcname, const char *message)
Definition: logging.h:106
void SetFlag(unsigned fl, bool on=true)
Change value of selected flag, not thread safe
Definition: Object.h:187
virtual void Print(int lvl=0)
Print object content on debug output.
Definition: Object.cxx:1092
const char * GetName() const
Returns name of the object, thread safe
Definition: Object.h:295
void SetAutoDestroy(bool on=true)
Set autodestroy flag for the object Once enabled, object will be destroyed when last reference will b...
Definition: Object.cxx:160
Mutex * ObjectMutex() const
Returns mutex, used for protection of Object data members.
Definition: Object.h:190
int fObjectRefCnt
accounts how many references existing on the object, thread safe
Definition: Object.h:179
virtual void ObjectCleanup()
User method to cleanup object content before it will be destroyed Main motivation is to release any r...
Definition: Object.cxx:532
@ flTopXmlLevel
object (or folder) can be found on top xml level in the Context
Definition: Object.h:172
static void Destroy(Object *obj)
User method for object destroyment.
Definition: Object.cxx:921
static Thread_t Self()
Definition: threads.h:401
void Start(Runnable *run)
Start thread with provided runnable.
Definition: threads.cxx:358
void UseCurrentAsSelf()
Definition: threads.h:347
void Join()
Join thread - method waits until thread execution is completed.
Definition: threads.cxx:389
void Kill(int sig=9)
Kill thread with specified signal.
Definition: threads.cxx:407
void Cancel()
Try to cancel thread execution.
Definition: threads.cxx:412
void SetThreadName(const char *thrdname)
Set thread name, which can be seen from htop.
Definition: threads.cxx:417
unsigned Size() const
Definition: Queue.h:321
T & Item(unsigned indx) const
Definition: Queue.h:280
bool AsBool(bool dflt=false) const
Definition: Record.cxx:477
bool SetField(const std::string &name, const RecordField &v)
Definition: Record.h:516
void Release()
Releases reference on the object.
Definition: Reference.cxx:138
const char * GetName() const
Return name of referenced object, if object not assigned, returns "---".
Definition: Reference.cxx:167
bool null() const
Returns true if reference contains nullptr.
Definition: Reference.h:151
std::string ItemName(bool compact=true) const
Produce string, which can be used as name argument in dabc::mgr.FindItem(name) call.
Definition: Reference.cxx:241
friend class Thread
Definition: threads.h:321
bool Execute(Command cmd, double tmout=-1.)
Definition: Thread.cxx:1398
bool MakeWorkerFor(WorkerAddon *addon, const std::string &name="")
Make dummy worker to run addon inside the thread.
Definition: Thread.cxx:1451
bool _ActivateWorkerTimeout(unsigned workerid, int priority, double tmout)
Definition: Thread.cxx:1416
bool _ActivateAddonTimeout(unsigned workerid, int priority, double tmout)
Definition: Thread.cxx:1433
void RunEventLoop(double tmout=1.)
Runs thread event loop for specified time.
Definition: Thread.cxx:1406
helper class to use methods, available in dabc::Worker in thread itself
Definition: Thread.cxx:50
int fCnt
! if true, different thread parameters will be published
Definition: Thread.cxx:57
virtual double ProcessTimeout(double last_diff)
Definition: Thread.cxx:124
virtual void BeforeHierarchyScan(Hierarchy &h)
Method called before publisher makes next snapshot of hierarchy.
Definition: Thread.cxx:120
virtual bool Find(ConfigIO &cfg)
Method to locate object in xml file.
Definition: Thread.cxx:111
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
Definition: Thread.cxx:99
virtual const char * ClassName() const
Returns class name of the object instance.
Definition: Thread.cxx:97
ExecWorker(Thread *parent, Command cmd)
Definition: Thread.cxx:60
friend class Thread
Definition: Thread.cxx:53
bool DirtyWorkaround()
Just workaround to check if execution is still performed.
Definition: Thread.cxx:89
bool _GetNextEvent(EventId &)
Definition: Thread.cxx:396
void IncWorkerFiredEvents(Worker *work)
Definition: Thread.cxx:342
virtual bool CompatibleClass(const std::string &clname) const
Definition: Thread.cxx:428
bool IsTemporaryThread() const
Returns true is this is temporary thread for command execution.
Definition: Thread.h:380
virtual void Print(int lvl=0)
Print thread content on debug output.
Definition: Thread.cxx:1371
bool Execute(Command cmd, double tmout=-1)
Definition: Thread.cxx:1220
WorkersVector fWorkers
vector of all processors
Definition: Thread.h:289
EThreadState fState
actual thread state
Definition: Thread.h:276
void WorkerAddonChanged(Worker *work, bool assign=true)
Called when worker addon changed on the fly.
Definition: Thread.cxx:1241
void FireDoNothingEvent()
Definition: Thread.cxx:826
virtual bool _DoDeleteItself()
This method is called at the moment when DecReference think that object can be destroyed and wants to...
Definition: Thread.cxx:1355
int CheckWorkerCanBeHalted(unsigned id, unsigned request=0, Command cmd=nullptr)
Internal DABC method, used to verify if worker can be halted now while recursion is over Request indi...
Definition: Thread.cxx:929
bool HaltWorker(Worker *proc)
Halt worker - stops any execution, break recursion.
Definition: Thread.cxx:1227
friend class ThreadRef
Definition: Thread.h:113
int fNumQueues
number of queues
Definition: Thread.h:284
bool Stop(double timeout_sec=10)
Definition: Thread.cxx:690
void RunEventLoop(double tmout=1.)
Runs thread event loop for specified time.
Definition: Thread.cxx:492
double GetStopTimeout() const
Definition: Thread.h:344
friend class ExecWorker
Definition: Thread.h:266
virtual void RunnableCancelled()
Definition: Thread.cxx:680
Mutex * ThreadMutex() const
Definition: Thread.h:377
bool InvokeWorkerDestroy(Worker *work)
Cleanup object asynchronously.
Definition: Thread.cxx:1268
virtual bool WaitEvent(EventId &, double tmout)
Definition: Thread.cxx:1044
bool SetExplicitLoop(Worker *work)
Definition: Thread.cxx:779
EventsQueue * fQueues
queues for threads events
Definition: Thread.h:283
virtual int ExecuteThreadCommand(Command cmd)
Definition: Thread.cxx:833
unsigned TotalNumberOfEvents()
Return total number of all events in the queues.
Definition: Thread.cxx:355
bool SingleLoop(unsigned workerid=0, double tmout=-1)
Processes single event from the thread queue.
Definition: Thread.cxx:468
bool fDidDecRefCnt
indicates if object cleanup was called - need in destructor
Definition: Thread.h:294
int RunCommandInTheThread(Worker *caller, Worker *dest, Command cmd)
Definition: Thread.cxx:517
virtual ~Thread()
Definition: Thread.cxx:276
virtual void * MainLoop()
Definition: Thread.cxx:435
unsigned NumWorkers()
Returns actual number of workers.
Definition: Thread.cxx:1387
unsigned _TotalNumberOfEvents()
Definition: Thread.cxx:347
void RunExplicitLoop()
Definition: Thread.cxx:794
bool Start(double timeout_sec=-1., bool real_thread=true)
Definition: Thread.cxx:618
virtual void ObjectCleanup()
Cleanup thread that manager is allowed to delete it.
Definition: Thread.cxx:1329
void ProcessNoneEvent()
Definition: Thread.cxx:361
virtual void _Fire(const EventId &arg, int nq)
Definition: Thread.cxx:1023
static unsigned fThreadInstances
Definition: Thread.h:303
ExecWorker * fExec
processor to execute commands in the thread
Definition: Thread.h:293
friend class RecursionGuard
Definition: Thread.h:270
bool AddWorker(Reference ref, bool sync=true)
Internal DABC method, Add worker to thread; reference-safe Reference safe means - it is safe to call ...
Definition: Thread.cxx:1213
@ evntCheckTmoutWorker
event used to process timeout for specific worker, used by ActivateTimeout
Definition: Thread.h:319
@ evntCheckTmoutAddon
event used to process timeout for addon, used by ActivateTimeout
Definition: Thread.h:320
bool fProfiling
if true, different statistic will be accumulated about thread
Definition: Thread.h:297
double CheckTimeouts(bool forcerecheck=false)
Definition: Thread.cxx:1284
bool Sync(double timeout_sec=-1)
Definition: Thread.cxx:774
void ChangeRecursion(unsigned id, bool inc)
Method which allows to control recursion of each worker.
Definition: Thread.cxx:909
void ProcessEvent(const EventId &)
Definition: Thread.cxx:1056
Active object, which is working inside dabc::Thread.
Definition: Worker.h:116
@ priorityMaximum
Definition: Worker.h:207
uint32_t fWorkerId
worker id in thread list, used for events submit
Definition: Worker.h:155
Worker(const ConstructorPair &pair)
Special constructor, designed for inherited classes.
Definition: Worker.cxx:109
void ClearThreadRef()
Method to 'forget' thread reference.
Definition: Worker.cxx:213
Hierarchy fWorkerHierarchy
place for publishing of worker parameters
Definition: Worker.h:168
bool fWorkerActive
indicates if worker can submit events to the thread
Definition: Worker.h:161
friend class Command
Definition: Worker.h:128
int fWorkerCommandsLevel
Number of process commands recursion.
Definition: Worker.h:166
RecordField Cfg(const std::string &name, Command cmd=nullptr) const
Returns configuration field of specified name Configuration value of specified name searched in follo...
Definition: Worker.cxx:521
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
Definition: Worker.cxx:851
@ evntFirstSystem
Definition: Worker.h:202
@ evntFirstAddOn
Definition: Worker.h:201
ThreadRef fThread
reference on the thread, once assigned remain whole time
Definition: Worker.h:151
void SetWorkerPriority(int nq)
Definition: Worker.h:259
Reference GetPublisher()
Return reference on publisher.
Definition: Worker.cxx:1068
virtual bool Publish(const Hierarchy &h, const std::string &path)
Definition: Worker.cxx:1075
Mutex * fThreadMutex
pointer on main thread mutex
Definition: Worker.h:158
int usage(const char *errstr=nullptr)
Definition: hldprint.cxx:28
#define DOUT2(args ...)
Definition: logging.h:170
#define DOUT0(args ...)
Definition: logging.h:156
#define DOUT5(args ...)
Definition: logging.h:188
#define DOUT3(args ...)
Definition: logging.h:176
#define EOUT(args ...)
Definition: logging.h:150
#define DOUT1(args ...)
Definition: logging.h:162
#define DBOOL(arg)
Definition: logging.h:191
#define DOUT4(args ...)
Definition: logging.h:182
Event manipulation API.
Definition: api.h:23
void Sleep(double tm)
Definition: timing.cxx:129
const char * xmlThrdStopTime
Definition: ConfigBase.cxx:75
TimeStamp Now()
Definition: timing.h:260
void SetDebugLevel(int level=0)
Definition: logging.cxx:468
ManagerRef mgr
Definition: Manager.cxx:42
std::string format(const char *fmt,...)
Definition: string.cxx:49
const char * prop_kind
Definition: Hierarchy.cxx:29
const char * typeThread
Definition: Object.cxx:77
const char * xmlAffinity
Definition: ConfigBase.cxx:77
const char * xmlNameAttr
Definition: ConfigBase.cxx:33
const char * xmlThreadNode
Definition: Object.cxx:31
@ cmd_postponed
Definition: Command.h:42
@ cmd_false
Definition: Command.h:37
@ cmd_timedout
Definition: Command.h:40
@ cmd_ignore
Definition: Command.h:41
@ cmd_true
Definition: Command.h:38
Logger * lgr()
Definition: logging.cxx:483
Event structure, exchanged between DABC threads.
Definition: Thread.h:70
std::string asstring() const
Definition: Thread.cxx:37
uint32_t GetArg() const
Definition: Thread.h:94
uint16_t GetItem() const
Definition: Thread.h:93
uint16_t GetCode() const
Definition: Thread.h:92