stream  0.10.0
stream analysis framework
StreamProc.h
1 #ifndef BASE_STREAMPROC_H
2 #define BASE_STREAMPROC_H
3 
4 #include <string>
5 
6 #include "base/Processor.h"
7 
8 namespace base {
9 
10  class Event;
11 
21  class StreamProc : public Processor {
22  friend class ProcMgr;
23 
24  public:
25 
27  enum SyncKind {
31  sync_Right
32  };
33 
34 
35  protected:
36 
39 
42 
44 
45  unsigned fQueueScanIndex;
46  unsigned fQueueScanIndexTm;
47 
48  AnalysisKind fAnalysisKind;
49 
52  unsigned fSyncScanIndex;
53  bool fSyncFlag;
54 
57  GlobalTime_t fLastLocalTriggerTm;
58 
60 
63 
64  bool fTimeSorting;
65 
66  base::H1handle fTriggerTm;
67  base::H1handle fMultipl;
68 
69  base::C1handle fTriggerWindow;
70 
71  static unsigned fMarksQueueCapacity;
72  static unsigned fBufsQueueCapacity;
73 
75  StreamProc(const char* name = "", unsigned brdid = DummyBrdId, bool basehist = true);
76 
84 
85  void AddSyncMarker(SyncMarker& marker);
86 
90  bool AddTriggerMarker(LocalTimeMarker& marker, double tm_range = 0.);
91 
92  GlobalTime_t LocalToGlobalTime(GlobalTime_t localtm, unsigned* sync_index = 0);
93 
95  bool IsSyncIndexWithInterpolation(unsigned indx) const
96  { return (indx>0) && (indx<numReadySyncs()); }
97 
99  virtual bool doTriggerSelection() const { return false; }
100 
102  virtual GlobalTime_t ProvidePotentialFlushTime(GlobalTime_t last_marker);
103 
105  bool VerifyFlushTime(const base::GlobalTime_t& flush_time);
106 
108  virtual double MaximumDisorderTm() const { return 0.; }
109 
113  unsigned TestHitTime(const base::GlobalTime_t& hittime, bool normal_hit, bool can_close_event = true);
114 
115  // TODO: this is another place for future improvement
116  // one can preallocate number of subevents with place ready for some messages
117  // than one can use these events instead of creating them on the fly
118 
120  template<class EventClass, class MessageClass>
121  void AddMessage(unsigned indx, EventClass* ev, const MessageClass& msg)
122  {
123  if (!ev) {
124  ev = new EventClass;
125  fGlobalMarks.item(indx).subev = ev;
126  }
127  ev->AddMsg(msg);
128  }
129 
131  bool eraseSyncAt(unsigned indx);
132 
134  bool eraseFirstSyncs(unsigned sync_num);
135 
136  public:
137 
138  virtual ~StreamProc();
139 
141  void SetTimeSorting(bool on) { fTimeSorting = on; }
143  bool IsTimeSorting() const { return fTimeSorting; }
144 
146  void SetTriggerMargin(double margin = 0.) { fTriggerAcceptMaring = margin; }
147 
148  void CreateTriggerHist(unsigned multipl = 40, unsigned nbins = 2500, double left = -1e-6, double right = 4e-6);
149 
152  virtual void SetTriggerWindow(double left, double right)
153  { ChangeC1(fTriggerWindow, left, right); }
154 
157  void SetRawScanOnly() { fAnalysisKind = kind_RawOnly; }
159  bool IsRawScanOnly() const { return fAnalysisKind == kind_RawOnly; }
160 
162  bool IsRawAnalysis() const { return fAnalysisKind <= kind_Raw; }
164  bool IsTriggeredAnalysis() const { return fAnalysisKind == kind_Triggered; }
166  bool IsStreamAnalysis() const { return fAnalysisKind == kind_Stream; }
167 
173 
175  unsigned minNumSyncRequired() const {
176  switch (fSynchronisationKind) {
177  case sync_None: return 0;
178  case sync_Inter: return 2;
179  case sync_Left: return 1;
180  case sync_Right: return 1;
181  }
182  return 0;
183  }
184 
186  virtual bool AddNextBuffer(const Buffer& buf);
187 
190  virtual bool ScanNewBuffers();
191 
193  virtual bool ScanNewBuffersTm();
194 
196  virtual void SkipAllData();
197 
199  virtual bool SkipBuffers(unsigned cnt);
200 
202  unsigned numSyncs() const { return fSyncs.size(); }
204  unsigned numReadySyncs() const { return fSyncScanIndex; }
206  SyncMarker& getSync(unsigned n) { return fSyncs.item(n); }
207  unsigned findSyncWithId(unsigned syncid) const;
208 
210  virtual bool CollectTriggers(GlobalMarksQueue& queue);
211 
213  virtual bool DistributeTriggers(const GlobalMarksQueue& queue);
214 
217  virtual bool ScanDataForNewTriggers();
218 
220  unsigned NumReadySubevents() const { return fGlobalTrigScanIndex; }
221 
223  virtual bool AppendSubevent(base::Event* evt);
224 
231  virtual bool FirstBufferScan(const base::Buffer&) { return false; }
232 
235  virtual bool SecondBufferScan(const base::Buffer&) { return false; }
236 
240  virtual void Store(Event*) {}
241 
245  virtual void ResetStore() {}
246 
248  static void SetMarksQueueCapacity(unsigned sz) { fMarksQueueCapacity = sz; }
250  static void SetBufsQueueCapacity(unsigned sz) { fBufsQueueCapacity = sz; }
251 
252  };
253 
254 }
255 
256 #endif
Memory management class.
Definition: Buffer.h:49
Event - collection of several subevents.
Definition: Event.h:17
Central data and process manager.
Definition: ProcMgr.h:30
Abstract processor.
Definition: base/Processor.h:62
void ChangeC1(C1handle c1, double left, double right)
Change condition limits.
Definition: base/Processor.cxx:191
unsigned size() const
size
Definition: Queue.h:375
T & item(unsigned indx) const
item reference
Definition: Queue.h:384
Abstract processor of data streams.
Definition: StreamProc.h:21
GlobalMarksQueue fGlobalMarks
list of global triggers in work
Definition: StreamProc.h:59
void SetRawScanOnly()
Method set raw-scan only mode for processor Processor will not be used for any data selection.
Definition: StreamProc.h:157
static unsigned fMarksQueueCapacity
maximum number of items in the marksers queue
Definition: StreamProc.h:71
RecordsQueue< base::Buffer, false > BuffersQueue
buffers queue
Definition: StreamProc.h:38
bool IsTriggeredAnalysis() const
Is triggered events analysis.
Definition: StreamProc.h:164
bool fSyncFlag
boolean, used in sync adjustment procedure
Definition: StreamProc.h:53
void AddSyncMarker(SyncMarker &marker)
add sync marker
Definition: StreamProc.cxx:301
AnalysisKind fAnalysisKind
defines that processor is doing
Definition: StreamProc.h:48
RecordsQueue< base::SyncMarker, false > SyncMarksQueue
sync markers queue
Definition: StreamProc.h:41
bool IsTimeSorting() const
Is time sorting enabled.
Definition: StreamProc.h:143
SyncKind fSynchronisationKind
kind of synchronization
Definition: StreamProc.h:50
SyncKind
kind of synchronization
Definition: StreamProc.h:27
@ sync_Right
use sync marker on right side
Definition: StreamProc.h:31
@ sync_Inter
use time interpolation between two markers
Definition: StreamProc.h:29
@ sync_Left
use sync marker on left side
Definition: StreamProc.h:30
@ sync_None
no synchronization
Definition: StreamProc.h:28
void SetSynchronisationKind(SyncKind kind=sync_Inter)
Method indicate if any kind of time-synchronization technique should be applied for the processor.
Definition: StreamProc.h:83
bool fTimeSorting
defines if time sorting should be used for the messages
Definition: StreamProc.h:64
static void SetMarksQueueCapacity(unsigned sz)
Set markers queue capacity.
Definition: StreamProc.h:248
void CreateTriggerHist(unsigned multipl=40, unsigned nbins=2500, double left=-1e-6, double right=4e-6)
create histograms for triggers
Definition: StreamProc.cxx:56
unsigned minNumSyncRequired() const
Returns minimal number of syncs required for time synchronisation.
Definition: StreamProc.h:175
virtual void SkipAllData()
Method to remove all buffers, all triggers and so on.
Definition: StreamProc.cxx:458
virtual bool ScanDataForNewTriggers()
Here each processor should scan data again for new triggers Method made virtual while some subprocess...
Definition: StreamProc.cxx:674
virtual ~StreamProc()
destructor
Definition: StreamProc.cxx:44
virtual void SetTriggerWindow(double left, double right)
Set window relative to some reference signal, which will be used as region-of-interest interval to se...
Definition: StreamProc.h:152
double fTriggerAcceptMaring
time margin (in local time) to accept new trigger
Definition: StreamProc.h:56
void AddMessage(unsigned indx, EventClass *ev, const MessageClass &msg)
add new message to event
Definition: StreamProc.h:121
bool IsStreamAnalysis() const
Is full stream analysis.
Definition: StreamProc.h:166
unsigned fSyncScanIndex
sync scan index, indicate number of syncs which can really be used for synchronization
Definition: StreamProc.h:52
void SetTriggerMargin(double margin=0.)
Set minimal distance between two triggers.
Definition: StreamProc.h:146
GlobalTime_t fLastLocalTriggerTm
time of last local trigger
Definition: StreamProc.h:57
static unsigned fBufsQueueCapacity
maximum number of items in the queue
Definition: StreamProc.h:72
virtual bool AddNextBuffer(const Buffer &buf)
Provide next port of data to the processor.
Definition: StreamProc.cxx:185
unsigned fQueueScanIndexTm
index of buffer to scan and set correct times of the buffer head
Definition: StreamProc.h:46
base::C1handle fTriggerWindow
window used for data selection
Definition: StreamProc.h:69
virtual bool CollectTriggers(GlobalMarksQueue &queue)
Method to deliver detected triggers from processor to central manager.
Definition: StreamProc.cxx:474
unsigned fGlobalTrigScanIndex
index with first trigger which is not yet ready
Definition: StreamProc.h:61
SyncMarker & getSync(unsigned n)
Returns sync marker.
Definition: StreamProc.h:206
virtual void Store(Event *)
Generic method to store processor data, In case of ROOT one should copy event data in temporary struc...
Definition: StreamProc.h:240
virtual bool SecondBufferScan(const base::Buffer &)
Second generic scan of buffer Here selection of data for region-of-interest should be performed.
Definition: StreamProc.h:235
LocalMarkersQueue fLocalMarks
queue with local markers
Definition: StreamProc.h:55
virtual void ResetStore()
Generic method to store processor data, In case of ROOT one should copy event data in temporary struc...
Definition: StreamProc.h:245
unsigned fQueueScanIndex
index of next buffer which should be scanned
Definition: StreamProc.h:45
unsigned TestHitTime(const base::GlobalTime_t &hittime, bool normal_hit, bool can_close_event=true)
Method decides to which trigger window belong hit normal_hit - indicates that time is belong to data,...
Definition: StreamProc.cxx:594
bool IsRawScanOnly() const
Is only raw scan will be performed.
Definition: StreamProc.h:159
unsigned numReadySyncs() const
Returns number of read sync markers.
Definition: StreamProc.h:204
virtual bool ScanNewBuffersTm()
With new calibration set (where possible) time of buffers.
Definition: StreamProc.cxx:226
bool IsSynchronisationRequired() const
Method indicate if any kind of time-synchronization technique should be applied for the processor.
Definition: StreamProc.h:172
virtual bool DistributeTriggers(const GlobalMarksQueue &queue)
This is method to get back identified triggers from central manager.
Definition: StreamProc.cxx:518
StreamProc(const char *name="", unsigned brdid=DummyBrdId, bool basehist=true)
Make constructor protected - no way to create base class instance.
Definition: StreamProc.cxx:16
SyncMarksQueue fSyncs
list of sync markers
Definition: StreamProc.h:51
static void SetBufsQueueCapacity(unsigned sz)
Set buffers queue capacity.
Definition: StreamProc.h:250
virtual GlobalTime_t ProvidePotentialFlushTime(GlobalTime_t last_marker)
Method should return time, which could be flushed from the processor.
Definition: StreamProc.cxx:266
bool VerifyFlushTime(const base::GlobalTime_t &flush_time)
Method must ensure that processor scanned such time and can really skip this data.
Definition: StreamProc.cxx:285
GlobalTime_t LocalToGlobalTime(GlobalTime_t localtm, unsigned *sync_index=0)
Method converts local time (in ns representation) to global time.
Definition: StreamProc.cxx:77
void SetTimeSorting(bool on)
Enable/disable time sorting of data in output event.
Definition: StreamProc.h:141
unsigned fGlobalTrigRightIndex
temporary value, used during second buffers scan
Definition: StreamProc.h:62
base::H1handle fMultipl
! histogram of event multiplicity
Definition: StreamProc.h:67
virtual bool ScanNewBuffers()
Scanning all new buffers in the queue.
Definition: StreamProc.cxx:199
bool IsSyncIndexWithInterpolation(unsigned indx) const
Method return true when sync_index is means interpolation of time.
Definition: StreamProc.h:95
virtual bool AppendSubevent(base::Event *evt)
Append data for first trigger to the main event.
Definition: StreamProc.cxx:551
unsigned NumReadySubevents() const
Returns number of already build events.
Definition: StreamProc.h:220
virtual bool SkipBuffers(unsigned cnt)
Force processor to skip buffers from input.
Definition: StreamProc.cxx:401
virtual bool doTriggerSelection() const
Returns true when processor used to select trigger signal.
Definition: StreamProc.h:99
bool eraseFirstSyncs(unsigned sync_num)
Remove specified number of syncs.
Definition: StreamProc.cxx:386
unsigned numSyncs() const
Returns total number of sync markers.
Definition: StreamProc.h:202
virtual double MaximumDisorderTm() const
Time constant, defines how far disorder of messages can go.
Definition: StreamProc.h:108
bool eraseSyncAt(unsigned indx)
Removes sync at specified position.
Definition: StreamProc.cxx:373
bool IsRawAnalysis() const
Is raw analysis only.
Definition: StreamProc.h:162
bool AddTriggerMarker(LocalTimeMarker &marker, double tm_range=0.)
Add new local trigger.
Definition: StreamProc.cxx:323
BuffersQueue fQueue
! buffers queue
Definition: StreamProc.h:43
virtual bool FirstBufferScan(const base::Buffer &)
Central method to scan new data in the queue This should include:
Definition: StreamProc.h:231
base::H1handle fTriggerTm
! histogram with time relative to the trigger
Definition: StreamProc.h:66
unsigned findSyncWithId(unsigned syncid) const
find sync marker
Definition: StreamProc.cxx:362
SubEvent * subev
structure with data, selected for the trigger, ownership
Definition: Markers.h:64
local time marker
Definition: Markers.h:36
sync marker
Definition: Markers.h:14