24 #include <machine/endian.h>
35 dabc::FileOutput(url,
".hld"),
66 EOUT(
"Cannot create RFIO object, check if libDabcRfio.so loaded");
74 EOUT(
"Cannot create LTSM object, check if libDabcLtsm.so loaded");
81 DOUT3(
" hadaq::HldOutput::DTOR");
92 ShowInfo(0,
dabc::format(
"%s slave mode is enabled, waiting for runid", (fRunSlave ?
"RUN" :
"EPICS")));
96 return StartNewFile();
104 if (fRunNumber == 0) {
106 EOUT(
"Cannot start new file without valid RUNID");
112 ShowInfo(0,
dabc::format(
"HldOutput Generates New Runid %d (0x%x)", fRunNumber, fRunNumber));
117 if(!fDiskNumberPar.
null()) {
119 size_t prepos = fFileName.rfind(
"/");
120 if (prepos == std::string::npos)
123 prefix = fFileName.substr(prepos+1);
125 unsigned disknumber = fDiskNumberPar.
Value().
AsUInt();
126 fFileName =
dabc::format(
"/data%02d/data/%s",disknumber,prefix.c_str());
127 DOUT1(
"Set filename from daq_disks to %s, disknumber was %d, prefix=%s",
128 fFileName.c_str(), disknumber, prefix.c_str());
133 EOUT(
"Could not find daq_disk parameter although disk demon mode is on!");
138 std::string fname = fFileName;
140 if (!fLastPrefix.empty() && fRunSlave) {
142 size_t slash = fname.rfind(
"/");
143 if (slash == std::string::npos)
146 fname.erase(slash+1);
147 fname.append(fLastPrefix);
150 size_t pos = fname.rfind(
".hld");
151 if (pos == std::string::npos)
152 pos = fname.rfind(
".HLD");
154 if ((pos != std::string::npos) && (pos == fname.length()-4)) {
155 if (!fPlainName) fname.insert(pos, extens);
157 if (!fPlainName) fname += extens;
160 fCurrentFileName = fname;
162 if (fRunSlave && fRfio)
163 DOUT1(
"Before open file %s for writing", CurrentFileName().c_str());
165 if (!fFile.OpenWrite(CurrentFileName().c_str(), fRunNumber, fUrlOptions.c_str())) {
166 ShowInfo(-1,
dabc::format(
"%s cannot open file for writing", CurrentFileName().c_str()));
173 if(fFile.GetStrPar(
"RealFileName", tmp, 1024))
175 std::string previous=CurrentFileName();
176 fCurrentFileName=tmp;
177 DOUT0(
"Note: Original file name %s was changed by implementation to %s", previous.c_str(), CurrentFileName().c_str());
180 if (fRunSlave && fRfio)
181 DOUT1(
"File %s is open for writing", CurrentFileName().c_str());
183 ShowInfo(0,
dabc::format(
"%s open for writing runid %d", CurrentFileName().c_str(), fRunNumber));
184 DOUT0(
"%s open for writing runid %d", CurrentFileName().c_str(), fRunNumber);
186 fLastRunNumber = fRunNumber;
196 if (!fRunSlave) fRunNumber = 0;
202 if (fFile.isOpened()) {
203 ShowInfo(0,
"HLD file is CLOSED");
206 fCurrentFileSize = 0;
207 fCurrentFileName =
"";
213 if (cmd.
GetBool(
"only_prefix")) {
215 std::string prefix = cmd.
GetStr(
"prefix");
216 if (!prefix.empty()) fLastPrefix = prefix;
217 }
else if (fFile.isWriting()) {
221 cmd.
SetStr(
"FileName", fCurrentFileName);
230 cmd.
SetUInt(
"RunId", fRunNumber);
231 cmd.
SetUInt(
"RunSize", fCurrentFileSize);
232 cmd.
SetStr(
"RunName", fCurrentFileName);
233 cmd.
SetStr(
"RunPrefix", fLastPrefix);
254 if (!is_events && !is_eol && !is_subev) {
262 bool startnewfile =
false;
267 DOUT2(
"HLD output process EOL");
268 }
else if (fRunSlave) {
272 unsigned numevents(0), payload(0);
276 if (nextrunid == fRunNumber) {
284 DOUT1(
"HldOutput Finds New Runid %d or 0x%x from EPICS in event header (previous: %d or 0x%x)",
285 nextrunid, nextrunid, fRunNumber, fRunNumber);
286 fRunNumber = nextrunid;
293 if (!startnewfile && (fRunNumber == 0))
return dabc::do_Ok;
300 if (fFile.isWriting())
303 if (payload==0)
break;
306 if (write_size > payload) write_size = payload;
309 DOUT1(
"HldOutput write %u bytes from buffer with old runid", write_size);
313 DOUT1(
"HldOutput did flushes %d bytes (%d events) of old runid in buffer segment %d to file",
314 write_size, numevents, n);
316 payload -= write_size;
329 if (fRunSlave && (fRunNumber == 0)) {
332 DOUT0(
"CLOSE FILE WRITING in slave mode");
336 if ((fLastRunNumber != 0) && (fLastRunNumber == fRunNumber)) {
337 DOUT0(
"Saw same runid %d 0x%u as previous - skip buffer", fLastRunNumber, fLastRunNumber);
341 if (!StartNewFile()) {
342 EOUT(
"Cannot start new file for writing");
345 ShowInfo(0,
dabc::format(
"%s open for writing runid %d", CurrentFileName().c_str(), fRunNumber));
350 unsigned total_write_size = 0, num_events = 0;
363 char* write_ptr = (
char*) iter.
subevnt();
366 evnt.
Init(fEventNumber++, fRunNumber);
372 if (!fFile.WriteBuffer(write_ptr, write_size))
379 }
else if (is_events) {
385 if (cursor >= write_size) {
387 cursor -= write_size;
394 DOUT2(
"Wrote to %s at segment %d, cursor %d, size %d", CurrentFileName().c_str(), n, cursor, write_size-cursor);
398 write_size -= cursor;
402 if (fRunSlave && fRfio && startnewfile)
403 DOUT1(
"HldOutput write %u bytes after new file was started", write_size);
405 if (!fFile.WriteBuffer(write_ptr, write_size))
408 if (fRunSlave && fRfio && startnewfile)
409 DOUT1(
"HldOutput did write %u bytes after new file was started", write_size);
411 total_write_size += write_size;
418 AccountBuffer(total_write_size, num_events);
420 if (fRunSlave && fRfio && startnewfile)
421 DOUT1(
"HldOutput write complete first buffer after new file was started");
void SetIO(FileInterface *_io, bool _ioowner=false)
Reference on memory from memory pool.
unsigned NumSegments() const
Returns number of segment in buffer.
unsigned SegmentSize(unsigned n=0) const
Returns size on the segment, no any boundary checks.
unsigned GetTypeId() const
BufferSize_t GetTotalSize() const
Return total size of all buffer segments.
void * SegmentPtr(unsigned n=0) const
Returns pointer on the segment, no any boundary checks.
Represents command with its arguments.
bool SetStr(const std::string &name, const char *value)
std::string GetStr(const std::string &name, const std::string &dflt="") const
bool SetUInt(const std::string &name, unsigned v)
bool GetBool(const std::string &name, bool dflt=false) const
Defines and implements basic POSIX file interface.
int GetSizeLimitMB() const
virtual bool Write_Init()
This is generic virtual method to initialize output before real work is started.
virtual bool Write_Stat(dabc::Command cmd)
Fill different statistic parameters into provided command.
Parameter FindPar(const std::string &parname)
ModuleRef FindModule(const std::string &name)
RecordField Value() const
Returns parameter value.
uint64_t AsUInt(uint64_t dflt=0) const
bool null() const
Returns true if reference contains nullptr.
Uniform Resource Locator interpreter.
bool HasOption(const std::string &optname) const
int GetOptionInt(const std::string &optname, int dflt=0) const
std::string GetOptions() const
bool fRfio
true if we write to rfio
bool fRunSlave
true if run id is controlled by combiner
bool fLtsm
true if we write to ltsm
virtual bool Write_Stat(dabc::Command cmd)
Fill different statistic parameters into provided command.
uint32_t fRunNumber
id number of current run (can be 0 when data are ignored)
virtual bool Write_Retry()
Returns true if output object can be reinitialized for recover error.
virtual unsigned Write_Buffer(dabc::Buffer &buf)
Start writing of buffer to output.
virtual bool Write_Init()
This is generic virtual method to initialize output before real work is started.
uint16_t fEBNumber
id of parent event builder process
bool fPlainName
if true no any runid extensions appended to file name
bool fUseDaqDisk
true if /data number is taken from daq_disk (HADES setup)
std::string fUrlOptions
remember URL options, may be used for RFIO file open
virtual bool Write_Restart(dabc::Command cmd)
Method used to restart output - like recreate new output file.
HldOutput(const dabc::Url &url)
Read iterator for HADAQ events/subevents.
static unsigned NumEvents(const dabc::Buffer &buf)
bool NextEvent()
Used for ready HLD events.
hadaq::RawSubevent * subevnt() const
bool NextSubEvent()
Used for sub-events iteration inside current block.
hadaq::RawEvent * evnt() const
bool NextSubeventsBlock()
Depending from buffer type calls NextHadTu() or NextEvent()
std::string format(const char *fmt,...)
std::string FormatFilename(uint32_t runid, uint16_t ebid=0)
void SetSize(uint32_t bytes)
uint32_t GetPaddedSize() const
void Init(uint32_t evnt, uint32_t run=0, uint32_t id=EvtId_DABC)