DABC (Data Acquisition Backbone Core)  2.9.9
Thread.h
Go to the documentation of this file.
1 // $Id: Thread.h 3862 2018-05-11 10:06:18Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #ifndef DABC_Thread
17 #define DABC_Thread
18 
19 #ifndef DABC_Object
20 #include "dabc/Object.h"
21 #endif
22 
23 #ifndef DABC_defines
24 #include "dabc/defines.h"
25 #endif
26 
27 #ifndef DABC_threads
28 #include "dabc/threads.h"
29 #endif
30 
31 #ifndef DABC_Queue
32 #include "dabc/Queue.h"
33 #endif
34 
35 #ifndef DABC_Command
36 #include "dabc/Command.h"
37 #endif
38 
39 #ifndef DABC_CommandsQueue
40 #include "dabc/CommandsQueue.h"
41 #endif
42 
43 #ifndef DABC_timing
44 #include "dabc/timing.h"
45 #endif
46 
47 #ifndef DABC_Exception
48 #include "dabc/Exception.h"
49 #endif
50 
51 #ifndef DABC_logging
52 #include "dabc/logging.h"
53 #endif
54 
55 #include <vector>
56 
57 namespace dabc {
58 
59  class Worker;
60  class WorkerAddon;
61  class ThreadRef;
62 
63 
// Event structure, exchanged between DABC threads.
// All parts of the event are packed into one 64-bit word:
//   bits  0..15  - event code   (see GetCode)
//   bits 16..47  - 32-bit arg   (see GetArg)
//   bits 48..63  - item id      (see GetItem)
// Value 0 is treated as the null event (see isnull / notnull / Null).
70  struct EventId {
71  uint64_t value;  // packed code / arg / item, layout described above
72 
// Creates null event (value is 0)
73  inline EventId() : value(0) {}
74 
// Copies packed value from other event
75  inline EventId(const EventId& src) : value(src.value) {}
76 
// Creates event with code only, item and arg bits stay 0.
// Constructor is not explicit, therefore 16-bit code converts implicitly to EventId
77  inline EventId(uint16_t code) : value(code) {}
78 
// Creates event with code and item, arg bits stay 0.
// Cast is applied to item before the shift, so item is widened to 64 bit first
79  inline EventId(uint16_t code, uint16_t item) : value((uint64_t) item << 48 | code) {}
80 
// Creates event with all three parts: code, item and 32-bit arg
81  inline EventId(uint16_t code, uint16_t item, uint32_t arg) :
82  value((((uint64_t) item) << 48) | (((uint64_t) arg) << 16) | code) {}
83 
// Copy assignment, takes over packed value of src
84  inline EventId& operator=(const EventId& src) { value = src.value; return *this; }
85 
// Assigns complete packed 64-bit value (code, arg and item at once)
86  inline EventId& operator=(uint64_t _value) { value = _value; return *this; }
87 
// Returns true when packed value is 0
88  inline bool isnull() const { return value==0; }
89 
// Returns true when packed value is not 0
90  inline bool notnull() const { return value!=0; }
91 
// Accessors for the packed parts: lowest 16 bits, highest 16 bits and 32 bits in between
92  inline uint16_t GetCode() const { return value &0xffffLU; }
93  inline uint16_t GetItem() const { return value >> 48; }
94  inline uint32_t GetArg() const { return (value >> 16) & 0xffffffffLU; }
95 
// Returns string representation of the event, implemented outside of this header
96  std::string asstring() const;
97 
// Returns default-constructed (null) event
98  inline static EventId Null() { return EventId(); }
99  };
100 
107  class Thread : public Object,
108  protected PosixThread,
109  protected Runnable {
110  protected:
111 
112  friend class Worker;
113  friend class ThreadRef;
114 
116  private:
118  unsigned workerid;
119  public:
// Constructor stores thread pointer and id of the worker which recursion is guarded
// and, when thread pointer is not null, calls Thread::ChangeRecursion(workerid, true).
// NOTE(review): matching ChangeRecursion(workerid, false) call is visible in the body
// which follows - presumably the destructor, its declaration line is not part of this listing.
120  RecursionGuard(Thread* t, unsigned id) :
121  thrd(t),
122  workerid(id)
123  {
124  if (thrd) thrd->ChangeRecursion(workerid, true);
125  }
126 
128  {
129  if (thrd) thrd->ChangeRecursion(workerid, false);
130  }
131 
132  };
133 
134  struct TimeoutRec {
136  double tmout_interv;
138 
141 
143 
144  TimeoutRec(const TimeoutRec& src) :
145  tmout_mark(src.tmout_mark),
148  prev_fire(src.prev_fire),
149  next_fire(src.next_fire) {}
150 
// Activating timeout: remembers current time (dabc::Now()) as mark, stores requested
// interval and sets active flag.
// Returns true only when timeout was not yet active before this call.
// NOTE(review): no locking is done here, while CheckEvent() accesses same members under
// thread mutex - presumably caller already holds that mutex; confirm against callers.
152  bool Activate(double tmout)
153  {
154  bool dofire = !tmout_active;  // remember state before it is overwritten below
155  tmout_mark = dabc::Now();
156  tmout_interv = tmout;
157  tmout_active = true;
158  return dofire;
159  }
160 
// Method called to check event, submitted when timeout was requested.
// Takes over activation request (mark and interval) under thread_mutex and clears active flag.
// Returns false when no activation is pending or when requested interval is negative
// (in that case next_fire and prev_fire are reset).
// Returns true when next_fire was set to mark + interval.
163  bool CheckEvent(Mutex* thread_mutex)
164  {
165  TimeStamp mark;
166  double interv(0.);
167 
168  {
// only copy and clear of activation request is done under lock, evaluation is done afterwards
169  LockGuard lock(thread_mutex);
170  if (!tmout_active) return false;
171  mark = tmout_mark;
172  interv = tmout_interv;
173  tmout_active = false;
174  }
175 
176  if (interv<0) {
// negative interval: both fire time stamps are reset
177  next_fire.Reset();
178  prev_fire.Reset();
179  } else {
180  // if timeout is activated with a positive interval, emulate
181  // that there already was a previous call of ProcessTimeout
182  if (prev_fire.null() && (interv>0))
183  prev_fire = mark;
184 
185  mark+=interv;
186 
187  // set activation time only in the case if no other active timeout was used
188  // TODO: why was such a condition here??
189  // every new activate call should set a new marker for timeout processing
190 
191 // if (next_fire.null() || (mark < next_fire)) {
192  next_fire = mark;
193  return true;
194 // }
195  }
196 
197  return false;
198  }
199 
// Checks if fire time of the timeout is already reached at the time 'now'.
// Returns false when next_fire is not set or is not yet in the past; in the second case
// min_tmout is lowered to the remaining time, if min_tmout was negative or larger.
// Returns true when next_fire is before 'now'; then last_diff is set to the time
// passed since prev_fire, or to 0 when prev_fire is not set.
200  bool CheckNextProcess(const TimeStamp& now, double& min_tmout, double& last_diff)
201  {
202  if (next_fire.null()) return false;
203 
204  double dist = next_fire - now;  // remaining time until fire, negative when overdue
205 
206  if (dist>=0.) {
207  if ((min_tmout<0.) || (dist<min_tmout)) min_tmout = dist;
208  return false;
209  }
210 
211  last_diff = prev_fire.null() ? 0. : now - prev_fire;
212 
213  return true;
214  }
215 
// Sets time of the next fire relative to 'now'.
// When dist is not negative, prev_fire is set to now, next_fire to now + dist and
// min_tmout is lowered to dist, if min_tmout was negative or larger.
// When dist is negative, both prev_fire and next_fire are reset.
216  void SetNextFire(const TimeStamp& now, double dist, double& min_tmout)
217  {
218  if (dist>=0.) {
219  prev_fire = now;
220  next_fire = now + dist;
221  if ((min_tmout<0.) || (dist<min_tmout)) min_tmout = dist;
222  } else {
223  prev_fire.Reset();
224  next_fire.Reset();
225  }
226  }
227  };
228 
229 
230 
231  struct WorkerRec {
234  unsigned doinghalt;
235  int recursion;
236  unsigned processed;
238 
241 
243  work(w),
244  addon(a),
245  doinghalt(0),
246  recursion(0),
247  processed(0),
248  cmds(CommandsQueue::kindSubmit),
249  tmout_worker(),
250  tmout_addon() {}
251  };
252 
253  typedef std::vector<WorkerRec*> WorkersVector;
254 
255  class EventsQueue : public Queue<EventId, true> {
256  public:
257  int scaler;
258 
260  Queue<EventId, true>(),
261  scaler(8)
262  {
263  }
264  };
265 
266  class ExecWorker;
267 
268  friend class ExecWorker;
269  friend class Object;
270  friend class RecursionGuard;
271 
272  enum EHaltActions { actDestroy = 1, actHalt = 2 };
273 
275 
277 
279  bool fRealThrd;
280 
282 
285 
288 
290 
291  unsigned fExplicitLoop;
292 
296 
297  bool fProfiling;
299  double fThreadRunTime;
300 
302 
303  static unsigned fThreadInstances;
304 
305 
310  int CheckWorkerCanBeHalted(unsigned id, unsigned request = 0, Command cmd = nullptr);
311 
312  void IncWorkerFiredEvents(Worker* work);
313 
314  int RunCommandInTheThread(Worker* caller, Worker* dest, Command cmd);
315 
316 
317  public:
318 
325  evntUser = 10000 };
326 
330 
331  static unsigned NumThreadInstances() { return fThreadInstances; }
332 
333  Thread(Reference parent, const std::string &name, Command cmd = nullptr);
334 
335  virtual ~Thread();
336 
337  virtual void* MainLoop();
338 
339  inline bool IsItself() const { return PosixThread::IsItself(); }
340  void SetPriority(int prio = 0) { PosixThread::SetPriority(prio); }
341 
342  // set stop timeout if required in the thread destructor
343  void SetStopTimeout(double timeout_sec) { fThrdStopTimeout = timeout_sec; }
344  double GetStopTimeout() const { return fThrdStopTimeout; }
345 
346  // thread manipulation, thread safe routines
347  bool Start(double timeout_sec = -1., bool real_thread = true);
348  bool Stop(double timeout_sec = 10);
349  bool Sync(double timeout_sec = -1);
350 
351  virtual const char* ClassName() const { return typeThread; }
352 
353  virtual bool CompatibleClass(const std::string &clname) const;
354 
355  void FireDoNothingEvent();
356 
357  bool Execute(Command cmd, double tmout = -1);
358 
362  bool SingleLoop(unsigned workerid = 0, double tmout = -1);
363 
367  void RunEventLoop(double tmout = 1.);
368 
370  virtual void Print(int lvl = 0);
371 
373  unsigned TotalNumberOfEvents();
374 
375  protected:
376 
377  inline Mutex* ThreadMutex() const { return ObjectMutex(); }
378 
380  bool IsTemporaryThread() const { return GetParent() == 0; }
381 
382  virtual int ExecuteThreadCommand(Command cmd);
383 
384  virtual bool WaitEvent(EventId&, double tmout);
385 
386  void ProcessEvent(const EventId&);
387 
390  virtual void ProcessExtraThreadEvent(const EventId&) {}
391 
392  void ProcessNoneEvent();
393 
394  bool _GetNextEvent(EventId&);
395 
396  virtual void RunnableCancelled();
397 
398  #ifdef DABC_EXTRA_CHECKS
399  static unsigned maxlimit;
400  #endif
401 
402  unsigned _TotalNumberOfEvents();
403 
// Pushes event into the events queue with index nq.
// Negative nq selects the last queue (index fNumQueues - 1).
// Arguments are verified only when DABC_EXTRA_CHECKS is defined, otherwise nq is used as is.
// NOTE(review): method does no locking itself - presumably thread mutex must be locked
// by the caller, as Fire() does before calling _Fire(); confirm against callers.
404  inline void _PushEvent(const EventId& arg, int nq)
405  {
406  #ifdef DABC_EXTRA_CHECKS
// reject the call when there are no queues or queue index is too large
407  if ((fNumQueues==0) || (fQueues==0) || (nq>=fNumQueues)) {
408  EOUT("False arguments fNumQueues:%d nq:%d", fNumQueues, nq);
409  return;
410  }
411  #endif
412 
413  fQueues[nq<0 ? fNumQueues - 1 : nq].Push(arg);
414 
415  #ifdef DABC_EXTRA_CHECKS
// report when queue size exceeds current limit; limit is doubled after each report
416  if (nq<0) nq = fNumQueues - 1;
417  if (fQueues[nq].Size()>maxlimit) {
418  EOUT("Thrd:%s Queue %d Event code:%u item:%u arg:%u exceed limit: %u", GetName(), nq,
419  arg.GetCode(), arg.GetItem(), arg.GetArg(), maxlimit);
420  maxlimit *= 2;
421  }
422  #endif
423  }
424 
425  virtual void _Fire(const EventId& arg, int nq);
426 
// Locks thread mutex (ThreadMutex()) for the duration of the call and
// forwards event and queue index to virtual _Fire()
427  inline void Fire(const EventId& arg, int nq)
428  {
429  LockGuard lock(ThreadMutex());
430  _Fire(arg, nq);
431  }
432 
433  double CheckTimeouts(bool forcerecheck = false);
434 
438  bool AddWorker(Reference ref, bool sync = true);
439 
441  bool HaltWorker(Worker* proc);
442 
444  void WorkerAddonChanged(Worker* work, bool assign = true);
445 
446  bool SetExplicitLoop(Worker* work);
447 
448  void RunExplicitLoop();
449 
452  virtual void WorkersSetChanged() {}
453 
458  bool InvokeWorkerDestroy(Worker* work);
459 
463  void ChangeRecursion(unsigned id, bool inc);
464 
466  virtual void ObjectCleanup();
467 
469  virtual bool _DoDeleteItself();
470 
472  unsigned NumWorkers();
473  };
474 
475  // __________________________________________________________________________
476 
482  class ThreadRef : public Reference {
483 
484  friend class Worker;
485  friend class WorkerAddon;
486 
488 
489  protected:
490  bool _ActivateWorkerTimeout(unsigned workerid, int priority, double tmout);
491  bool _ActivateAddonTimeout(unsigned workerid, int priority, double tmout);
492 
494 
495  public:
496  bool IsItself() const { return GetObject() ? GetObject()->IsItself() : false; }
497 
498  bool Execute(Command cmd, double tmout = -1.);
499 
503  void RunEventLoop(double tmout = 1.);
504 
505  inline bool IsRealThrd() const { return GetObject() ? GetObject()->fRealThrd : false; }
506 
508  bool MakeWorkerFor(WorkerAddon* addon, const std::string &name = "");
509  };
510 
511 }
512 
513 #endif
#define DABC_REFERENCE(RefClass, ParentClass, T)
Definition: Reference.h:222
Represents command with its arguments.
Definition: Command.h:99
Queue of commands
Definition: CommandsQueue.h:39
posix pthread condition
Definition: threads.h:261
Lock guard for posix mutex.
Definition: threads.h:127
posix pthread mutex
Definition: threads.h:61
Base class for most of the DABC classes.
Definition: Object.h:116
Object * GetParent() const
Returns pointer on parent object, thread safe
Definition: Object.h:286
const char * GetName() const
Returns name of the object, thread safe
Definition: Object.h:295
Mutex * ObjectMutex() const
Returns mutex, used for protection of Object data members.
Definition: Object.h:190
class represents posix pthread functionality
Definition: threads.h:340
bool IsItself() const
Returns true if called from thread context.
Definition: threads.h:404
void SetPriority(int prio)
Change thread priority.
Definition: threads.cxx:395
Template of circular queue.
Definition: Queue.h:41
void Push(T val)
Definition: Queue.h:215
Reference on the arbitrary object
Definition: Reference.h:73
Object * GetObject() const
Return pointer on the object.
Definition: Reference.h:129
bool AcquireRefWithoutMutex(Reference &ref)
Special method, which allows to generate new reference when object mutex is locked.
Definition: Reference.cxx:123
Object which could be run inside the dabc::PosixThread
Definition: threads.h:320
friend class Thread
Definition: threads.h:321
Reference on the dabc::Thread class
Definition: Thread.h:482
bool Execute(Command cmd, double tmout=-1.)
Definition: Thread.cxx:1398
bool MakeWorkerFor(WorkerAddon *addon, const std::string &name="")
Make dummy worker to run addon inside the thread.
Definition: Thread.cxx:1451
bool _ActivateWorkerTimeout(unsigned workerid, int priority, double tmout)
Definition: Thread.cxx:1416
bool _ActivateAddonTimeout(unsigned workerid, int priority, double tmout)
Definition: Thread.cxx:1433
void RunEventLoop(double tmout=1.)
Runs thread event loop for specified time.
Definition: Thread.cxx:1406
bool IsRealThrd() const
Definition: Thread.h:505
bool IsItself() const
Definition: Thread.h:496
bool _AcquireThreadRef(ThreadRef &ref)
Definition: Thread.h:493
helper class to use methods, available in dabc::Worker in thread itself
Definition: Thread.cxx:50
RecursionGuard(Thread *t, unsigned id)
Definition: Thread.h:120
unsigned workerid
worker id which recursion is guarded
Definition: Thread.h:118
Thread * thrd
we can use direct pointer, reference will be preserved by other means
Definition: Thread.h:117
Represent thread functionality.
Definition: Thread.h:109
bool _GetNextEvent(EventId &)
Definition: Thread.cxx:396
bool fThrdWorking
flag indicates if mainloop of the thread should continue to work
Definition: Thread.h:278
void IncWorkerFiredEvents(Worker *work)
Definition: Thread.cxx:342
virtual bool CompatibleClass(const std::string &clname) const
Definition: Thread.cxx:428
bool IsTemporaryThread() const
Returns true if this is a temporary thread for command execution.
Definition: Thread.h:380
void SetStopTimeout(double timeout_sec)
Definition: Thread.h:343
virtual void Print(int lvl=0)
Print thread content on debug output.
Definition: Thread.cxx:1371
bool Execute(Command cmd, double tmout=-1)
Definition: Thread.cxx:1220
WorkersVector fWorkers
vector of all processors
Definition: Thread.h:289
EThreadState fState
actual thread state
Definition: Thread.h:276
void WorkerAddonChanged(Worker *work, bool assign=true)
Called when worker addon changed on the fly.
Definition: Thread.cxx:1241
void _PushEvent(const EventId &arg, int nq)
Definition: Thread.h:404
void FireDoNothingEvent()
Definition: Thread.cxx:826
@ actDestroy
Definition: Thread.h:272
double fThreadRunTime
total run time (user and sys), measured by getrusage
Definition: Thread.h:299
virtual bool _DoDeleteItself()
This method is called at the moment when DecReference think that object can be destroyed and wants to...
Definition: Thread.cxx:1355
Condition fWorkCond
condition, which is used in default MainLoop implementation
Definition: Thread.h:281
int CheckWorkerCanBeHalted(unsigned id, unsigned request=0, Command cmd=nullptr)
Internal DABC method, used to verify if worker can be halted now while recursion is over Request indi...
Definition: Thread.cxx:929
bool HaltWorker(Worker *proc)
Halt worker - stops any execution, break recursion.
Definition: Thread.cxx:1227
int fNumQueues
number of queues
Definition: Thread.h:284
bool Stop(double timeout_sec=10)
Definition: Thread.cxx:690
unsigned fExplicitLoop
id of the worker, selected to run own explicit loop
Definition: Thread.h:291
void RunEventLoop(double tmout=1.)
Runs thread event loop for specified time.
Definition: Thread.cxx:492
double GetStopTimeout() const
Definition: Thread.h:344
double fThrdStopTimeout
time in seconds set as timeout when stopping the thread
Definition: Thread.h:301
friend class ExecWorker
Definition: Thread.h:266
int fProcessingTimeouts
indicate recursion in timeouts processing
Definition: Thread.h:287
virtual void RunnableCancelled()
Definition: Thread.cxx:680
Mutex * ThreadMutex() const
Definition: Thread.h:377
bool fCheckThrdCleanup
indicates if thread should be checked for clean up
Definition: Thread.h:295
bool InvokeWorkerDestroy(Worker *work)
Cleanup object asynchronously.
Definition: Thread.cxx:1268
virtual bool WaitEvent(EventId &, double tmout)
Definition: Thread.cxx:1044
bool SetExplicitLoop(Worker *work)
Definition: Thread.cxx:779
EventsQueue * fQueues
queues for threads events
Definition: Thread.h:283
TimeStamp fNextTimeout
indicate when we expects next timeout
Definition: Thread.h:286
std::vector< WorkerRec * > WorkersVector
Definition: Thread.h:253
virtual int ExecuteThreadCommand(Command cmd)
Definition: Thread.cxx:833
unsigned TotalNumberOfEvents()
Return total number of all events in the queues.
Definition: Thread.cxx:355
bool SingleLoop(unsigned workerid=0, double tmout=-1)
Processes single event from the thread queue.
Definition: Thread.cxx:468
bool IsItself() const
Definition: Thread.h:339
@ stChanging
Definition: Thread.h:274
bool fDidDecRefCnt
indicates if object cleanup was called - need in destructor
Definition: Thread.h:294
int RunCommandInTheThread(Worker *caller, Worker *dest, Command cmd)
Definition: Thread.cxx:517
void Fire(const EventId &arg, int nq)
Definition: Thread.h:427
virtual ~Thread()
Definition: Thread.cxx:276
virtual void * MainLoop()
Definition: Thread.cxx:435
static unsigned NumThreadInstances()
Definition: Thread.h:331
unsigned NumWorkers()
Returns actual number of workers.
Definition: Thread.cxx:1387
unsigned _TotalNumberOfEvents()
Definition: Thread.cxx:347
void RunExplicitLoop()
Definition: Thread.cxx:794
bool Start(double timeout_sec=-1., bool real_thread=true)
Definition: Thread.cxx:618
virtual void ObjectCleanup()
Clean up the thread so that the manager is allowed to delete it.
Definition: Thread.cxx:1329
void SetPriority(int prio=0)
Definition: Thread.h:340
@ priorityHighest
Definition: Thread.h:327
@ priorityNormal
Definition: Thread.h:328
@ priorityLowest
Definition: Thread.h:329
void ProcessNoneEvent()
Definition: Thread.cxx:361
virtual void _Fire(const EventId &arg, int nq)
Definition: Thread.cxx:1023
virtual void ProcessExtraThreadEvent(const EventId &)
Method to process events which are not processed by Thread class itself Should be used in derived cla...
Definition: Thread.h:390
static unsigned fThreadInstances
Definition: Thread.h:303
ExecWorker * fExec
processor to execute commands in the thread
Definition: Thread.h:293
TimeStamp fLastProfileTime
when doing profiling, last time when profiling was done
Definition: Thread.h:298
bool AddWorker(Reference ref, bool sync=true)
Internal DABC method, Add worker to thread; reference-safe Reference safe means - it is safe to call ...
Definition: Thread.cxx:1213
virtual const char * ClassName() const
Returns class name of the object instance.
Definition: Thread.h:351
@ evntDoNothing
event fired to wake-up thread and let thread or processor to perform regular checks
Definition: Thread.h:322
@ evntStopThrd
event should stop thread
Definition: Thread.h:323
@ evntLastThrd
Definition: Thread.h:324
@ evntCheckTmoutWorker
event used to process timeout for specific worker, used by ActivateTimeout
Definition: Thread.h:319
@ evntCleanupThrd
event will be generated when thread can be destroyed
Definition: Thread.h:321
@ evntCheckTmoutAddon
event used to process timeout for addon, used by ActivateTimeout
Definition: Thread.h:320
bool fProfiling
if true, various statistics about the thread will be accumulated
Definition: Thread.h:297
virtual void WorkersSetChanged()
Virtual method, called from thread context to inform that number of workers are changed.
Definition: Thread.h:452
double CheckTimeouts(bool forcerecheck=false)
Definition: Thread.cxx:1284
bool fRealThrd
indicate if we create real thread and not running mainloop from top process
Definition: Thread.h:279
bool Sync(double timeout_sec=-1)
Definition: Thread.cxx:774
void ChangeRecursion(unsigned id, bool inc)
Method which allows to control recursion of each worker.
Definition: Thread.cxx:909
void ProcessEvent(const EventId &)
Definition: Thread.cxx:1056
Generic addon for dabc::Worker.
Definition: Worker.h:49
Active object, which is working inside dabc::Thread.
Definition: Worker.h:116
#define EOUT(args ...)
Definition: logging.h:150
Event manipulation API.
Definition: api.h:23
TimeStamp Now()
Definition: timing.h:260
const char * typeThread
Definition: Object.cxx:77
Event structure, exchanged between DABC threads.
Definition: Thread.h:70
EventId(const EventId &src)
Definition: Thread.h:75
EventId & operator=(uint64_t _value)
Definition: Thread.h:86
bool notnull() const
Definition: Thread.h:90
EventId(uint16_t code, uint16_t item, uint32_t arg)
Definition: Thread.h:81
static EventId Null()
Definition: Thread.h:98
std::string asstring() const
Definition: Thread.cxx:37
uint32_t GetArg() const
Definition: Thread.h:94
uint64_t value
Definition: Thread.h:71
EventId(uint16_t code, uint16_t item)
Definition: Thread.h:79
bool isnull() const
Definition: Thread.h:88
uint16_t GetItem() const
Definition: Thread.h:93
uint16_t GetCode() const
Definition: Thread.h:92
EventId(uint16_t code)
Definition: Thread.h:77
EventId & operator=(const EventId &src)
Definition: Thread.h:84
TimeoutRec(const TimeoutRec &src)
Definition: Thread.h:144
void SetNextFire(const TimeStamp &now, double dist, double &min_tmout)
Definition: Thread.h:216
TimeStamp tmout_mark
time mark when timeout should happen
Definition: Thread.h:135
bool CheckEvent(Mutex *thread_mutex)
Method called to check event, submitted when timeout was requested Returns true when check should be ...
Definition: Thread.h:163
bool Activate(double tmout)
Activating timeout.
Definition: Thread.h:152
bool tmout_active
true when timeout active
Definition: Thread.h:137
TimeStamp next_fire
when next timeout event will be called
Definition: Thread.h:140
bool CheckNextProcess(const TimeStamp &now, double &min_tmout, double &last_diff)
Definition: Thread.h:200
TimeStamp prev_fire
when previous timeout event was called
Definition: Thread.h:139
double tmout_interv
time interval was specified by timeout active
Definition: Thread.h:136
WorkerAddon * addon
addon for the worker, maybe thread-specific
Definition: Thread.h:233
Worker * work
pointer on the worker, should we use reference?
Definition: Thread.h:232
int recursion
recursion calls of the worker
Definition: Thread.h:235
TimeoutRec tmout_worker
timeout handling for worker
Definition: Thread.h:239
unsigned processed
current number of processed events, when balance between processed and fired is 0,...
Definition: Thread.h:236
WorkerRec(Worker *w, WorkerAddon *a)
Definition: Thread.h:242
TimeoutRec tmout_addon
timeout handling for addon
Definition: Thread.h:240
CommandsQueue cmds
postponed commands, which are waiting until object is destroyed or halted
Definition: Thread.h:237
unsigned doinghalt
indicates that events will not be longer accepted by the worker, all submitted commands still should ...
Definition: Thread.h:234
Class for acquiring and holding timestamps.
Definition: timing.h:40
bool null() const
Returns true if time stamp is not initialized or its value less than 0.
Definition: timing.h:131
void Reset()
Set time stamp value to null.
Definition: timing.h:134