stream  0.10.0
stream analysis framework
base::StreamProc Class Reference

Abstract processor of data streams.

#include <base/StreamProc.h>

Inheritance diagram for base::StreamProc:
Base class: base::Processor
Derived classes: THookProc, base::OpticSplitter, base::SysCoreProc, hadaq::HldProcessor, hadaq::SpillProcessor, hadaq::StartProcessor, hadaq::SubProcessor, hadaq::TrbProcessor, mbs::Processor

Public Types

enum  SyncKind { sync_None , sync_Inter , sync_Left , sync_Right }
 kind of synchronization
 

Public Member Functions

virtual ~StreamProc ()
 destructor
 
void SetTimeSorting (bool on)
 Enable/disable time sorting of data in output event.
 
bool IsTimeSorting () const
 Is time sorting enabled.
 
void SetTriggerMargin (double margin=0.)
 Set minimal distance between two triggers.
 
void CreateTriggerHist (unsigned multipl=40, unsigned nbins=2500, double left=-1e-6, double right=4e-6)
 create histograms for triggers
 
virtual void SetTriggerWindow (double left, double right)
 Set window relative to some reference signal, which will be used as region-of-interest interval to select messages from the stream.
 
void SetRawScanOnly ()
 Sets raw-scan-only mode for the processor. The processor will not be used for any data selection.
 
bool IsRawScanOnly () const
 Returns true if only the raw scan will be performed.
 
bool IsRawAnalysis () const
 Is raw analysis only.
 
bool IsTriggeredAnalysis () const
 Is triggered events analysis.
 
bool IsStreamAnalysis () const
 Is full stream analysis.
 
bool IsSynchronisationRequired () const
 Indicates whether any kind of time-synchronization technique should be applied for the processor.
 
unsigned minNumSyncRequired () const
 Returns minimal number of syncs required for time synchronisation.
 
virtual bool AddNextBuffer (const Buffer &buf)
 Provide next portion of data to the processor.
 
virtual bool ScanNewBuffers ()
 Scanning all new buffers in the queue.
 
virtual bool ScanNewBuffersTm ()
 With the new calibration, set (where possible) the time of buffers.
 
virtual void SkipAllData ()
 Method to remove all buffers, all triggers and so on.
 
virtual bool SkipBuffers (unsigned cnt)
 Force processor to skip buffers from input.
 
unsigned numSyncs () const
 Returns total number of sync markers.
 
unsigned numReadySyncs () const
 Returns number of ready sync markers.
 
SyncMarker & getSync (unsigned n)
 Returns sync marker.
 
unsigned findSyncWithId (unsigned syncid) const
 find sync marker
 
virtual bool CollectTriggers (GlobalMarksQueue &queue)
 Method to deliver detected triggers from processor to central manager.
 
virtual bool DistributeTriggers (const GlobalMarksQueue &queue)
 Method to get back identified triggers from the central manager.
 
virtual bool ScanDataForNewTriggers ()
 Here each processor should scan data again for new triggers. The method is virtual because some subprocessors will do it in connection with others.
 
unsigned NumReadySubevents () const
 Returns number of already built events.
 
virtual bool AppendSubevent (base::Event *evt)
 Append data for first trigger to the main event.
 
virtual bool FirstBufferScan (const base::Buffer &)
 Central method to scan new data in the queue.
 
virtual bool SecondBufferScan (const base::Buffer &)
 Second generic scan of buffer. Here selection of data for the region-of-interest should be performed.
 
virtual void Store (Event *)
 Generic method to store processor data. In case of ROOT, one should copy event data into temporary structures, which are mapped to the branch.
 
virtual void ResetStore ()
 Generic method to store processor data. In case of ROOT, one should copy event data into temporary structures, which are mapped to the branch.
 
Public Member Functions inherited from base::Processor
virtual ~Processor ()
 destructor
 
ProcMgr * mgr () const
 Return manager instance.
 
const char * GetName () const
 Get processor name.
 
unsigned GetID () const
 Get processor ID.
 
void SetHistFilling (int lvl=99)
 Set histogram filling level.
 
bool IsHistFilling () const
 Is histogram filling enabled.
 
int HistFillLevel () const
 Get histogram filling level.
 
unsigned GetStoreKind () const
 Get store kind.
 
bool IsStoreEnabled () const
 Is store enabled.
 
virtual void SetStoreKind (unsigned kind=1)
 Set store kind.
 
void SetStoreEnabled (bool on=true)
 Enable store - set store kind 1.
 
virtual void UserPreLoop ()
 pre loop
 
virtual void UserPostLoop ()
 post loop
 

Static Public Member Functions

static void SetMarksQueueCapacity (unsigned sz)
 Set markers queue capacity.
 
static void SetBufsQueueCapacity (unsigned sz)
 Set buffers queue capacity.
 

Protected Types

typedef RecordsQueue< base::Buffer, false > BuffersQueue
 buffers queue
 
typedef RecordsQueue< base::SyncMarker, false > SyncMarksQueue
 sync markers queue
 
Protected Types inherited from base::Processor
enum  { DummyBrdId = 0xffffffff }
 

Protected Member Functions

 StreamProc (const char *name="", unsigned brdid=DummyBrdId, bool basehist=true)
 Make constructor protected - no way to create base class instance.
 
void SetSynchronisationKind (SyncKind kind=sync_Inter)
 Sets the kind of time-synchronization technique that should be applied for the processor.
 
void AddSyncMarker (SyncMarker &marker)
 add sync marker
 
bool AddTriggerMarker (LocalTimeMarker &marker, double tm_range=0.)
 Add new local trigger.
 
GlobalTime_t LocalToGlobalTime (GlobalTime_t localtm, unsigned *sync_index=0)
 Converts local time (in ns representation) to global time.
 
bool IsSyncIndexWithInterpolation (unsigned indx) const
 Returns true when sync_index means interpolation of time.
 
virtual bool doTriggerSelection () const
 Returns true when the processor is used to select the trigger signal.
 
virtual GlobalTime_t ProvidePotentialFlushTime (GlobalTime_t last_marker)
 Method should return the time that could be flushed from the processor.
 
bool VerifyFlushTime (const base::GlobalTime_t &flush_time)
 Method must ensure that the processor has scanned up to this time and can really skip this data.
 
virtual double MaximumDisorderTm () const
 Time constant, defines how far disorder of messages can go.
 
unsigned TestHitTime (const base::GlobalTime_t &hittime, bool normal_hit, bool can_close_event=true)
 Decides to which trigger window a hit belongs. normal_hit indicates that the time belongs to data which can then be assigned to output; can_close_event, when true, means the hit time can be used to decide that the event is ready.
 
template<class EventClass , class MessageClass >
void AddMessage (unsigned indx, EventClass *ev, const MessageClass &msg)
 add new message to event
 
bool eraseSyncAt (unsigned indx)
 Removes sync at specified position.
 
bool eraseFirstSyncs (unsigned sync_num)
 Remove specified number of syncs.
 
Protected Member Functions inherited from base::Processor
 Processor (const char *name="", unsigned brdid=DummyBrdId)
 Make constructor protected - no way to create base class instance.
 
void SetBoardId (unsigned id)
 Set board id.
 
void SetPathPrefix (const std::string &prefix)
 Set path prefix for histograms.
 
void SetSubPrefix (const char *subname="", int indx=-1, const char *subname2="", int indx2=-1)
 Set subprefix for histograms and conditions.
 
void SetSubPrefix2 (const char *subname="", int indx=-1, const char *subname2="", int indx2=-1)
 Set subprefix for histograms and conditions, index uses 2 symbols.
 
H1handle MakeH1 (const char *name, const char *title, int nbins, double left, double right, const char *xtitle=0)
 Adds processor prefix to histogram name and calls base::ProcMgr::MakeH1 method.
 
void FillH1 (H1handle h1, double x, double weight=1.)
 Fill 1-D histogram.
 
void FastFillH1 (H1handle h1, int x, double weight=1.)
 Fast fill 1-D histogram.
 
double GetH1Content (H1handle h1, int nbin)
 Get bin content of 1-D histogram.
 
void SetH1Content (H1handle h1, int nbin, double v=0.)
 Set bin content of 1-D histogram.
 
int GetH1NBins (H1handle h1)
 Get number of bins for 1-D histogram.
 
void ClearH1 (H1handle h1)
 Clear 1-D histogram.
 
void CopyH1 (H1handle tgt, H1handle src)
 Copy 1-D histogram from src to tgt.
 
void SetH1Title (H1handle h1, const char *title)
 Set 1-D histogram title.
 
H2handle MakeH2 (const char *name, const char *title, int nbins1, double left1, double right1, int nbins2, double left2, double right2, const char *options=0)
 Adds processor prefix to histogram name and calls base::ProcMgr::MakeH2 method.
 
void FillH2 (H1handle h2, double x, double y, double weight=1.)
 Fill 2-D histogram.
 
void FastFillH2 (H1handle h2, int x, int y)
 Fast fill 2-D histogram.
 
void SetH2Content (H2handle h2, int nbin1, int nbin2, double v=0.)
 Set bin content of 2-D histogram.
 
double GetH2Content (H2handle h2, int bin1, int bin2)
 Get bin content of 2-D histogram.
 
bool GetH2NBins (H2handle h2, int &nBins1, int &nBins2)
 Get number of bins for 2-D histogram.
 
void ClearH2 (base::H2handle h2)
 Clear 2-D histogram.
 
void SetH2Title (H2handle h2, const char *title)
 Change title of 2-D histogram.
 
C1handle MakeC1 (const char *name, double left, double right, H1handle h1=0)
 Create condition.
 
void ChangeC1 (C1handle c1, double left, double right)
 Change condition limits.
 
int TestC1 (C1handle c1, double value, double *dist=0)
 Test condition.
 
double GetC1Limit (C1handle c1, bool isleft=true)
 Get condition limit.
 
virtual void CreateBranch (TTree *)
 Create branch.
 
virtual bool RegisterObject (TObject *tobj, const char *subfolder=0)
 Register object.
 

Protected Attributes

BuffersQueue fQueue
 buffers queue
 
unsigned fQueueScanIndex
 index of next buffer which should be scanned
 
unsigned fQueueScanIndexTm
 index of buffer to scan and set correct times of the buffer head
 
AnalysisKind fAnalysisKind
 defines that processor is doing
 
SyncKind fSynchronisationKind
 kind of synchronization
 
SyncMarksQueue fSyncs
 list of sync markers
 
unsigned fSyncScanIndex
 sync scan index, indicate number of syncs which can really be used for synchronization
 
bool fSyncFlag
 boolean, used in sync adjustment procedure
 
LocalMarkersQueue fLocalMarks
 queue with local markers
 
double fTriggerAcceptMaring
 time margin (in local time) to accept new trigger
 
GlobalTime_t fLastLocalTriggerTm
 time of last local trigger
 
GlobalMarksQueue fGlobalMarks
 list of global triggers in work
 
unsigned fGlobalTrigScanIndex
 index with first trigger which is not yet ready
 
unsigned fGlobalTrigRightIndex
 temporary value, used during second buffers scan
 
bool fTimeSorting
 defines if time sorting should be used for the messages
 
base::H1handle fTriggerTm
 histogram with time relative to the trigger
 
base::H1handle fMultipl
 histogram of event multiplicity
 
base::C1handle fTriggerWindow
 window used for data selection
 
Protected Attributes inherited from base::Processor
std::string fName
 processor name, used for event naming
 
unsigned fID
 identifier, used mostly for debugging
 
ProcMgr * fMgr
 direct pointer on manager
 
std::string fPathPrefix
 histogram path prefix, used for histogram folder name
 
std::string fPrefix
 prefix, used for histogram names
 
std::string fSubPrefixD
 sub-prefix for histogram directory
 
std::string fSubPrefixN
 sub-prefix for histogram names
 
int fHistFilling
 level of histogram filling
 
unsigned fStoreKind
 if >0, store will be enabled for processor
 
bool fIntHistFormat
 if true, internal histogram format is used
 

Static Protected Attributes

static unsigned fMarksQueueCapacity = 10000
 maximum number of items in the markers queue
 
static unsigned fBufsQueueCapacity = 100
 maximum number of items in the queue
 

Friends

class ProcMgr
 

Detailed Description

Abstract processor of data streams.

Class base::StreamProc is an abstract processor of data streams from TRB3, GET4, nXYTER or any other kind of data source. The main motivation for the class is to unify the way data streams are processed and the way all kinds of time calculations are done.

Member Enumeration Documentation

◆ SyncKind

kind of synchronization

Enumerator
sync_None 

no synchronization

sync_Inter 

use time interpolation between two markers

sync_Left 

use sync marker on left side

sync_Right 

use sync marker on right side

Constructor & Destructor Documentation

◆ StreamProc()

base::StreamProc::StreamProc ( const char *  name = "",
unsigned  brdid = DummyBrdId,
bool  basehist = true 
)
protected

Make constructor protected - no way to create base class instance.

constructor

Member Function Documentation

◆ AddNextBuffer()

bool base::StreamProc::AddNextBuffer ( const Buffer & buf)
virtual

Provide next portion of data to the processor.

add next buffer

◆ AddTriggerMarker()

bool base::StreamProc::AddTriggerMarker ( LocalTimeMarker & marker,
double  tm_range = 0. 
)
protected

Add new local trigger.

add trigger marker

The method first checks that the new trigger marker stays in time order and has the minimal distance to the previous trigger.
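
The check described above can be sketched as follows. This is only an illustration under stated assumptions: TriggerFilter and its members are hypothetical names, not part of the framework, and rejecting a trigger that is closer than the margin is an assumed reading of "minimal distance".

```cpp
#include <cassert>

// Hypothetical stand-in for the state a processor keeps about local triggers.
// None of these names come from the framework.
struct TriggerFilter {
    double margin = 0.;         // minimal distance between two triggers
    double lastTriggerTm = 0.;  // time of the last accepted trigger
    bool hasLast = false;       // false until the first trigger is accepted

    // Returns true and remembers the time when the trigger is accepted.
    bool accept(double tm)
    {
        assert(margin >= 0.);  // a negative margin makes no sense here
        if (hasLast) {
            if (tm < lastTriggerTm) return false;           // breaks time order
            if (tm - lastTriggerTm < margin) return false;  // too close to previous
        }
        lastTriggerTm = tm;
        hasLast = true;
        return true;
    }
};
```

With a margin of 1, a trigger at time 10 is accepted, triggers at 10.5 and 9 are rejected, and a trigger at 12 is accepted again.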

◆ AppendSubevent()

bool base::StreamProc::AppendSubevent ( base::Event * evt)
virtual

Append data for first trigger to the main event.

append subevent

◆ CollectTriggers()

bool base::StreamProc::CollectTriggers ( GlobalMarksQueue & queue)
virtual

Method to deliver detected triggers from processor to central manager.

collect triggers

◆ DistributeTriggers()

bool base::StreamProc::DistributeTriggers ( const GlobalMarksQueue & queue)
virtual

Method to get back identified triggers from the central manager.

distribute triggers

◆ eraseFirstSyncs()

bool base::StreamProc::eraseFirstSyncs ( unsigned  sync_num)
protected

Remove specified number of syncs.

erase first sync markers

◆ eraseSyncAt()

bool base::StreamProc::eraseSyncAt ( unsigned  indx)
protected

Removes sync at specified position.

erase sync marker

◆ FirstBufferScan()

virtual bool base::StreamProc::FirstBufferScan ( const base::Buffer & )
inline virtual

Central method to scan new data in the queue. This should include:

  • data indexing;
  • raw histogram filling;
  • search for special time markers;
  • multiplicity histogramming (if necessary)

Reimplemented in mbs::Processor, hadaq::TrbProcessor, hadaq::TdcProcessor, hadaq::StartProcessor, hadaq::SpillProcessor, hadaq::HldProcessor, hadaq::AdcProcessor, and base::OpticSplitter.
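
A first scan covering the four steps listed above might look roughly like the sketch below. It does not use the framework types: RawMsg, ScanResult and firstScan are hypothetical, and a real implementation would work on base::Buffer and fill histograms through the processor methods instead.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical raw message: a time stamp and a flag for special time markers.
struct RawMsg {
    double tm;
    bool isSync;
};

// Hypothetical result of a first scan over one buffer.
struct ScanResult {
    std::vector<std::size_t> index;  // data indexing: positions of data messages
    std::vector<unsigned> rawHist;   // raw histogram: data messages per time bin
    std::vector<double> syncTimes;   // special time markers found in the buffer
    unsigned multiplicity = 0;       // number of data messages in the buffer
};

ScanResult firstScan(const std::vector<RawMsg>& buf, double binWidth, std::size_t nbins)
{
    assert(binWidth > 0.);
    ScanResult res;
    res.rawHist.assign(nbins, 0);
    for (std::size_t i = 0; i < buf.size(); ++i) {
        const RawMsg& msg = buf[i];
        if (msg.isSync) {                    // search for special time markers
            res.syncTimes.push_back(msg.tm);
            continue;
        }
        res.index.push_back(i);              // data indexing
        if (msg.tm >= 0.) {                  // raw histogram filling
            const std::size_t bin = static_cast<std::size_t>(msg.tm / binWidth);
            if (bin < nbins) ++res.rawHist[bin];
        }
        ++res.multiplicity;                  // multiplicity counting
    }
    return res;
}
```

Keeping the scan free of any selection logic matches the split between the first scan and the second scan, where the region-of-interest selection is performed.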

◆ IsSynchronisationRequired()

bool base::StreamProc::IsSynchronisationRequired ( ) const
inline

Indicates whether any kind of time-synchronization technique should be applied for the processor.

If true, sync messages must be produced by the processor and will be used. If false, local time stamps can be used immediately (with any necessary conversion).

◆ LocalToGlobalTime()

base::GlobalTime_t base::StreamProc::LocalToGlobalTime ( base::GlobalTime_t  localtm,
unsigned *  indx = 0 
)
protected

Method converts local time (in ns representation) to global time.

The method uses a helper index to locate faster the interval where interpolation can be done. For interpolation, the index value has the following meaning:

  • 0 - last value left of the first sync;
  • [1..numReadySyncs()-1] - last value between syncs indx-1 and indx;
  • numReadySyncs() - last value right of the last sync.

For left/right side synchronization, the index indicates the number of the sync used.

TODO: One could introduce more precise method, which works with stamps
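
The index convention for the interpolation case can be illustrated with a small self-contained sketch. SyncPoint and localToGlobal are hypothetical names, the syncs are assumed to be sorted by strictly increasing local time, and shifting by the nearest sync offset outside the covered range is an assumption, not a statement about the framework. Unlike the real method, the sketch only writes the index and does not use it as a search hint.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical sync marker: the same instant in local and in global time.
struct SyncPoint {
    double local;
    double global;
};

// Converts local time to global time and reports the interval index:
// 0 = left of the first sync, [1..n-1] = between syncs index-1 and index,
// n = right of the last sync.
double localToGlobal(const std::vector<SyncPoint>& syncs, double localtm,
                     unsigned* sync_index = nullptr)
{
    const std::size_t n = syncs.size();

    // Count syncs whose local time is not later than localtm.
    std::size_t i = 0;
    while (i < n && syncs[i].local <= localtm) ++i;
    if (sync_index) *sync_index = static_cast<unsigned>(i);

    if (n == 0) return localtm;  // nothing to calibrate against

    // Outside the covered range (assumption): shift by the nearest sync offset.
    if (i == 0) return localtm + (syncs[0].global - syncs[0].local);
    if (i == n) return localtm + (syncs[n - 1].global - syncs[n - 1].local);

    // Inside the range: linear interpolation between syncs i-1 and i.
    const SyncPoint& a = syncs[i - 1];
    const SyncPoint& b = syncs[i];
    assert(b.local > a.local);  // guaranteed by the sorting assumption
    const double frac = (localtm - a.local) / (b.local - a.local);
    return a.global + frac * (b.global - a.global);
}
```

For two syncs (100, 1000) and (200, 1200), a local time of 150 falls into interval 1 and is mapped halfway between the two global times.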

◆ ProvidePotentialFlushTime()

base::GlobalTime_t base::StreamProc::ProvidePotentialFlushTime ( GlobalTime_t  last_marker)
protected virtual

Method should return the time that could be flushed from the processor.

provide flush time

The approach is simple: one proposes the begin of buffer nbuf-2 as flush time. To make it a little more complex, use buffers with time stamps.

◆ ScanDataForNewTriggers()

bool base::StreamProc::ScanDataForNewTriggers ( )
virtual

Here each processor should scan data again for new triggers. The method is virtual because some subprocessors will do it in connection with others.

scan for new triggers

First of all, one should find the right boundary up to which data can be scanned, because when a buffer is scanned for the second time, it will be automatically rejected.

There is a nice rule: the last trigger and the last buffer always remain. The time of the last trigger is used to check which buffers can be scanned; the time of the last buffer is used to check which triggers can be checked.

◆ ScanNewBuffers()

bool base::StreamProc::ScanNewBuffers ( )
virtual

Scanning all new buffers in the queue.

scan new buffers

Returns
true when any new data was scanned

Reimplemented in THookProc.

◆ ScanNewBuffersTm()

bool base::StreamProc::ScanNewBuffersTm ( )
virtual

With the new calibration, set (where possible) the time of buffers.

scan new buffers times

Here the times for each buffer are recalculated. This can only be done when the appropriate syncs are produced.

◆ SetSynchronisationKind()

void base::StreamProc::SetSynchronisationKind ( SyncKind  kind = sync_Inter)
inline protected

Sets the kind of time-synchronization technique that should be applied for the processor.

The following values can be applied:

  • 0 - no sync, local time will be used (with any necessary conversion);
  • 1 - interpolation, for every hit the syncs on the left and right side should be used;
  • 2 - sync message on the left side will be used for calibration;
  • 3 - sync message on the right side will be used for calibration.
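
The numeric values correspond to the SyncKind enumerators in declaration order. The sketch below restates the enum locally to show the mapping; kindFromCode and syncRequired are hypothetical helpers, and treating every kind other than sync_None as "synchronization required" is an assumption about how IsSynchronisationRequired() behaves.

```cpp
#include <cassert>

// Local copy of the enumeration, in the order given in this reference.
enum SyncKind { sync_None, sync_Inter, sync_Left, sync_Right };

static_assert(sync_None == 0 && sync_Inter == 1 && sync_Left == 2 && sync_Right == 3,
              "numeric codes follow declaration order");

// Hypothetical helper: converts a numeric code into the enumerator.
inline SyncKind kindFromCode(int code)
{
    assert(code >= 0 && code <= 3);
    return static_cast<SyncKind>(code);
}

// Assumption: synchronization is required for every kind except sync_None.
inline bool syncRequired(SyncKind kind)
{
    return kind != sync_None;
}
```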

◆ SkipAllData()

void base::StreamProc::SkipAllData ( )
virtual

Method to remove all buffers, all triggers and so on.

skip all data

◆ SkipBuffers()

bool base::StreamProc::SkipBuffers ( unsigned  cnt)
virtual

Force processor to skip buffers from input.

skip buffers

◆ TestHitTime()

unsigned base::StreamProc::TestHitTime ( const base::GlobalTime_t &  hittime,
bool  normal_hit,
bool  can_close_event = true 
)
protected

Decides to which trigger window a hit belongs.

  • normal_hit - indicates that the time belongs to data which can then be assigned to output;
  • can_close_event - when true, the hit time can be used to decide that the event is ready.

test hit time
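
The core of such a test, matching a hit time against trigger windows, can be sketched as below. The function, the kNoTrigger value and the inclusive window limits are all assumptions made for illustration; the sketch ignores the normal_hit and can_close_event arguments and does not reproduce the return value convention of the real method.

```cpp
#include <cassert>
#include <vector>

// Hypothetical "no matching trigger" value.
constexpr unsigned kNoTrigger = 0xffffffffu;

// Returns the index of the first trigger whose window contains the hit.
// left and right are given relative to the trigger time, in the same way
// as the arguments of SetTriggerWindow(left, right).
unsigned findTriggerWindow(const std::vector<double>& triggers,
                           double left, double right, double hittime)
{
    assert(left <= right);
    for (unsigned i = 0; i < triggers.size(); ++i) {
        const double dist = hittime - triggers[i];
        if (dist >= left && dist <= right) return i;
    }
    return kNoTrigger;
}
```

With triggers at 100 and 200 and a window of [-10, 40], a hit at 95 belongs to trigger 0, a hit at 230 to trigger 1, and a hit at 160 to no trigger.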

◆ VerifyFlushTime()

bool base::StreamProc::VerifyFlushTime ( const base::GlobalTime_t &  flush_time)
protected

Method must ensure that the processor has scanned up to this time and can really skip this data.

verify flush time

Verify that the proposed flush time can be processed already now; for that, the buffer time should be assigned.


The documentation for this class was generated from the following files: