DABC (Data Acquisition Backbone Core)  2.9.9
HldOutput.cxx
Go to the documentation of this file.
1 // $Id: HldOutput.cxx 4575 2020-08-04 12:44:29Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #include "hadaq/HldOutput.h"
17 
18 #include <cstring>
19 #include <cstdlib>
20 #include <cstdio>
21 #include <unistd.h>
22 
23 #if defined(__MACH__) /* Apple OSX section */
24 #include <machine/endian.h>
25 #else
26 #include <endian.h>
27 #endif
28 
29 #include "dabc/Manager.h"
30 
31 #include "hadaq/Iterator.h"
32 
33 
35  dabc::FileOutput(url,".hld"),
36  fRunSlave(false),
37  fLastRunNumber(0),
38  fRunNumber(0),
39  fEventNumber(0),
40  fEBNumber(0),
41  fUseDaqDisk(false),
42  fRfio(false),
43  fLtsm(false),
44  fPlainName(false),
45  fUrlOptions(),
46  fLastPrefix(),
47  fFile()
48 {
49  fRunSlave = url.HasOption("slave");
50  fEBNumber = url.GetOptionInt("ebnumber",0); // default is single eventbuilder
51  fRunNumber = url.GetOptionInt("runid", 0); // if specified, use runid from url
52  fUseDaqDisk = url.GetOptionInt("diskdemon", 0); // if specified, use number of /data partition from daq_disk demon
53  fRfio = url.HasOption("rfio");
54  fLtsm = url.HasOption("ltsm");
55  fPlainName = url.HasOption("plain") && (GetSizeLimitMB() <= 0);
56  if (fRfio) {
57  dabc::FileInterface* io = (dabc::FileInterface*) dabc::mgr.CreateAny("rfio::FileInterface");
58 
59  if (io) {
60  fUrlOptions = url.GetOptions();
61  fFile.SetIO(io, true);
62  // set default protocol and node name, can only be used in GSI
63  //if (fFileName.find("rfiodaq:gstore:")== std::string::npos)
64  fFileName = std::string("rfiodaq:gstore:") + fFileName;
65  } else {
66  EOUT("Cannot create RFIO object, check if libDabcRfio.so loaded");
67  }
68  } else if(fLtsm) {
69  dabc::FileInterface* io = (dabc::FileInterface*) dabc::mgr.CreateAny("ltsm::FileInterface");
70  if (io!=0) {
71  fUrlOptions = url.GetOptions();
72  fFile.SetIO(io, true);
73  } else {
74  EOUT("Cannot create LTSM object, check if libDabcLtsm.so loaded");
75  }
76  }
77 }
78 
80 {
81  DOUT3(" hadaq::HldOutput::DTOR");
82  CloseFile();
83 }
84 
86 {
87  if (!dabc::FileOutput::Write_Init()) return false;
88 
89  if (fRunSlave) {
90  // use parameters only in slave mode
91  fRunNumber = 0;
92  ShowInfo(0, dabc::format("%s slave mode is enabled, waiting for runid", (fRunSlave ? "RUN" : "EPICS")));
93  return true;
94  }
95 
96  return StartNewFile();
97 }
98 
99 
101 {
102  CloseFile();
103 
104  if (fRunNumber == 0) {
105  if (fRunSlave) {
106  EOUT("Cannot start new file without valid RUNID");
107  return false;
108  }
109 
110  fRunNumber = hadaq::CreateRunId();
111  //std::cout <<"HldOutput Generates New Runid"<<fRunNumber << std::endl;
112  ShowInfo(0, dabc::format("HldOutput Generates New Runid %d (0x%x)", fRunNumber, fRunNumber));
113  }
114 
115  if(fUseDaqDisk) {
116  dabc::Parameter fDiskNumberPar = dabc::mgr.FindPar("Combiner/Evtbuild-diskNum");
117  if(!fDiskNumberPar.null()) {
118  std::string prefix;
119  size_t prepos = fFileName.rfind("/");
120  if (prepos == std::string::npos)
121  prefix = fFileName;
122  else
123  prefix = fFileName.substr(prepos+1);
124 
125  unsigned disknumber = fDiskNumberPar.Value().AsUInt();
126  fFileName = dabc::format("/data%02d/data/%s",disknumber,prefix.c_str());
127  DOUT1("Set filename from daq_disks to %s, disknumber was %d, prefix=%s",
128  fFileName.c_str(), disknumber, prefix.c_str());
129 
130  dabc::CmdSetParameter cmd("Evtbuild-diskNumEB", disknumber);
131  dabc::mgr.FindModule("Combiner").Submit(cmd);
132  } else {
133  EOUT("Could not find daq_disk parameter although disk demon mode is on!");
134  }
135  }
136  // change file names according hades style:
137  std::string extens = hadaq::FormatFilename(fRunNumber, fEBNumber);
138  std::string fname = fFileName;
139 
140  if (!fLastPrefix.empty() && fRunSlave) {
141  // when run in BNet mode, only file path used
142  size_t slash = fname.rfind("/");
143  if (slash == std::string::npos)
144  fname = "";
145  else
146  fname.erase(slash+1);
147  fname.append(fLastPrefix);
148  }
149 
150  size_t pos = fname.rfind(".hld");
151  if (pos == std::string::npos)
152  pos = fname.rfind(".HLD");
153 
154  if ((pos != std::string::npos) && (pos == fname.length()-4)) {
155  if (!fPlainName) fname.insert(pos, extens);
156  } else {
157  if (!fPlainName) fname += extens;
158  fname += ".hld";
159  }
160  fCurrentFileName = fname;
161 
162  if (fRunSlave && fRfio)
163  DOUT1("Before open file %s for writing", CurrentFileName().c_str());
164 
165  if (!fFile.OpenWrite(CurrentFileName().c_str(), fRunNumber, fUrlOptions.c_str())) {
166  ShowInfo(-1, dabc::format("%s cannot open file for writing", CurrentFileName().c_str()));
167  return false;
168  }
169 
170  // JAM2020: here we have to update the real filename in case that implementation changes it
171  // this can happen for ltsm io where we may add subfolders for year and day
172  char tmp[1024];
173  if(fFile.GetStrPar("RealFileName", tmp, 1024))
174  {
175  std::string previous=CurrentFileName();
176  fCurrentFileName=tmp;
177  DOUT0("Note: Original file name %s was changed by implementation to %s", previous.c_str(), CurrentFileName().c_str());
178  }
179 
180  if (fRunSlave && fRfio)
181  DOUT1("File %s is open for writing", CurrentFileName().c_str());
182 
183  ShowInfo(0, dabc::format("%s open for writing runid %d", CurrentFileName().c_str(), fRunNumber));
184  DOUT0("%s open for writing runid %d", CurrentFileName().c_str(), fRunNumber);
185 
186  fLastRunNumber = fRunNumber;
187 
188  return true;
189 }
190 
192 {
193  // HLD output supports retry option
194 
195  CloseFile();
196  if (!fRunSlave) fRunNumber = 0;
197  return true;
198 }
199 
201 {
202  if (fFile.isOpened()) {
203  ShowInfo(0, "HLD file is CLOSED");
204  fFile.Close();
205  }
206  fCurrentFileSize = 0;
207  fCurrentFileName = "";
208  return true;
209 }
210 
212 {
213  if (cmd.GetBool("only_prefix")) {
214  // command used by BNet, prefix is not directly stored by the master
215  std::string prefix = cmd.GetStr("prefix");
216  if (!prefix.empty()) fLastPrefix = prefix;
217  } else if (fFile.isWriting()) {
218  CloseFile();
219  fRunNumber = 0;
220  StartNewFile();
221  cmd.SetStr("FileName", fCurrentFileName);
222  }
223  return true;
224 }
225 
227 {
228  bool res = dabc::FileOutput::Write_Stat(cmd);
229 
230  cmd.SetUInt("RunId", fRunNumber);
231  cmd.SetUInt("RunSize", fCurrentFileSize);
232  cmd.SetStr("RunName", fCurrentFileName);
233  cmd.SetStr("RunPrefix", fLastPrefix);
234 
235  return res;
236 }
237 
238 
240 {
241  if (buf.null()) return dabc::do_Error;
242 
243  if (buf.GetTypeId() == dabc::mbt_EOF) {
244  CloseFile();
245  return dabc::do_Close;
246  }
247 
248  bool is_eol = (buf.GetTypeId() == hadaq::mbt_HadaqStopRun);
249 
250  bool is_subev = (buf.GetTypeId() == hadaq::mbt_HadaqSubevents);
251 
252  bool is_events = (buf.GetTypeId() == hadaq::mbt_HadaqEvents);
253 
254  if (!is_events && !is_eol && !is_subev) {
255  ShowInfo(-1, dabc::format("Buffer must contain hadaq event(s), but has type %u", buf.GetTypeId()));
256  EOUT("Discard buffer %u", buf.GetTypeId());
257 
258  return dabc::do_Error;
259  }
260 
261  unsigned cursor = 0;
262  bool startnewfile = false;
263  if (is_eol) {
264  // just reset number
265  fRunNumber = 0;
266  startnewfile = true;
267  DOUT2("HLD output process EOL");
268  } else if (fRunSlave) {
269 
270  // scan event headers in buffer for run id change/consistency
271  hadaq::ReadIterator bufiter(buf);
272  unsigned numevents(0), payload(0);
273 
274  while (bufiter.NextEvent()) {
275  uint32_t nextrunid = bufiter.evnt()->GetRunNr();
276  if (nextrunid == fRunNumber) {
277  numevents++;
278  payload += bufiter.evnt()->GetPaddedSize();// remember current position in buffer:
279  continue;
280  }
281 
282 // ShowInfo(0, dabc::format("HldOutput Finds New Runid %d (0x%x) from EPICS in event header (previous:%d (0x%x))",
283 // nextrunid, nextrunid, fRunNumber,fRunNumber));
284  DOUT1("HldOutput Finds New Runid %d or 0x%x from EPICS in event header (previous: %d or 0x%x)",
285  nextrunid, nextrunid, fRunNumber, fRunNumber);
286  fRunNumber = nextrunid;
287  startnewfile = true;
288  break;
289 
290  } // while bufiter
291 
292  // if current runid is still 0, just ignore buffer
293  if (!startnewfile && (fRunNumber == 0)) return dabc::do_Ok;
294 
295  if(startnewfile) {
296  // first flush rest of previous run to old file:
297  cursor = payload;
298 
299  // only if file opened for writing, write rest buffers
300  if (fFile.isWriting())
301  for (unsigned n=0;n<buf.NumSegments();n++) {
302 
303  if (payload==0) break;
304 
305  unsigned write_size = buf.SegmentSize(n);
306  if (write_size > payload) write_size = payload;
307 
308  if (fRfio)
309  DOUT1("HldOutput write %u bytes from buffer with old runid", write_size);
310 
311  if (!fFile.WriteBuffer(buf.SegmentPtr(n), write_size)) return dabc::do_Error;
312 
313  DOUT1("HldOutput did flushes %d bytes (%d events) of old runid in buffer segment %d to file",
314  write_size, numevents, n);
315 
316  payload -= write_size;
317  }// for
318  }
319 
320  } else {
321  if (CheckBufferForNextFile(buf.GetTotalSize())) {
322  fRunNumber = 0;
323  startnewfile = true;
324  }
325  } // epicsslave
326 
327  if(startnewfile) {
328 
329  if (fRunSlave && (fRunNumber == 0)) {
330  // in slave mode 0 runnumber means do nothing
331  CloseFile();
332  DOUT0("CLOSE FILE WRITING in slave mode");
333  return dabc::do_Ok;
334  }
335 
336  if ((fLastRunNumber != 0) && (fLastRunNumber == fRunNumber)) {
337  DOUT0("Saw same runid %d 0x%u as previous - skip buffer", fLastRunNumber, fLastRunNumber);
338  return dabc::do_Ok;
339  }
340 
341  if (!StartNewFile()) {
342  EOUT("Cannot start new file for writing");
343  return dabc::do_Error;
344  }
345  ShowInfo(0, dabc::format("%s open for writing runid %d", CurrentFileName().c_str(), fRunNumber));
346  }
347 
348  if (!fFile.isWriting()) return dabc::do_Error;
349 
350  unsigned total_write_size = 0, num_events = 0;
351 
352  if (is_subev) {
353  // this is list of subevents in the buffer, one need to add artificial events headers for each subevents
354 
355  hadaq::RawEvent evnt;
356 
357  hadaq::ReadIterator iter(buf);
358  while (iter.NextSubeventsBlock()) {
359 
360  if (!iter.NextSubEvent())
361  return dabc::do_Error;
362 
363  char* write_ptr = (char*) iter.subevnt();
364  unsigned write_size = iter.subevnt()->GetPaddedSize();
365 
366  evnt.Init(fEventNumber++, fRunNumber);
367  evnt.SetSize(write_size + sizeof(hadaq::RawEvent));
368 
369  if (!fFile.WriteBuffer(&evnt, sizeof(hadaq::RawEvent)))
370  return dabc::do_Error;
371 
372  if (!fFile.WriteBuffer(write_ptr, write_size))
373  return dabc::do_Error;
374 
375  total_write_size += sizeof(hadaq::RawEvent) + write_size;
376  num_events ++;
377  }
378 
379  } else if (is_events) {
380 
381  for (unsigned n=0;n<buf.NumSegments();n++) {
382 
383  unsigned write_size = buf.SegmentSize(n);
384 
385  if (cursor >= write_size) {
386  // skip segment completely
387  cursor -= write_size;
388  continue;
389  }
390 
391  char* write_ptr = (char*) buf.SegmentPtr(n);
392 
393  if(startnewfile)
394  DOUT2("Wrote to %s at segment %d, cursor %d, size %d", CurrentFileName().c_str(), n, cursor, write_size-cursor);
395 
396  if (cursor > 0) {
397  write_ptr += cursor;
398  write_size -= cursor;
399  cursor = 0;
400  }
401 
402  if (fRunSlave && fRfio && startnewfile)
403  DOUT1("HldOutput write %u bytes after new file was started", write_size);
404 
405  if (!fFile.WriteBuffer(write_ptr, write_size))
406  return dabc::do_Error;
407 
408  if (fRunSlave && fRfio && startnewfile)
409  DOUT1("HldOutput did write %u bytes after new file was started", write_size);
410 
411  total_write_size += write_size;
412  }
413 
414  num_events = hadaq::ReadIterator::NumEvents(buf);
415  }
416 
417  // TODO: in case of partial written buffer, account sizes to correct file
418  AccountBuffer(total_write_size, num_events);
419 
420  if (fRunSlave && fRfio && startnewfile)
421  DOUT1("HldOutput write complete first buffer after new file was started");
422 
423  return dabc::do_Ok;
424 }
void SetIO(FileInterface *_io, bool _ioowner=false)
Definition: BinaryFile.h:114
Reference on memory from memory pool.
Definition: Buffer.h:135
unsigned NumSegments() const
Returns number of segment in buffer.
Definition: Buffer.h:163
unsigned SegmentSize(unsigned n=0) const
Returns size on the segment, no any boundary checks.
Definition: Buffer.h:174
unsigned GetTypeId() const
Definition: Buffer.h:152
BufferSize_t GetTotalSize() const
Return total size of all buffer segments.
Definition: Buffer.cxx:91
void * SegmentPtr(unsigned n=0) const
Returns pointer on the segment, no any boundary checks.
Definition: Buffer.h:171
Represents command with its arguments.
Definition: Command.h:99
bool SetStr(const std::string &name, const char *value)
Definition: Command.h:134
std::string GetStr(const std::string &name, const std::string &dflt="") const
Definition: Command.h:136
bool SetUInt(const std::string &name, unsigned v)
Definition: Command.h:147
bool GetBool(const std::string &name, bool dflt=false) const
Definition: Command.h:142
Defines and implements basic POSIX file interface.
Definition: BinaryFile.h:33
int GetSizeLimitMB() const
Definition: DataIO.h:312
std::string fFileName
Definition: DataIO.h:286
virtual bool Write_Init()
This is generic virtual method to initialize output before real work is started.
Definition: DataIO.cxx:211
virtual bool Write_Stat(dabc::Command cmd)
Fill different statistic parameters into provided command.
Definition: DataIO.cxx:322
Parameter FindPar(const std::string &parname)
Definition: Manager.cxx:2069
ModuleRef FindModule(const std::string &name)
Definition: Manager.cxx:2018
Parameter class
Definition: Parameter.h:163
RecordField Value() const
Returns parameter value.
Definition: Parameter.h:202
uint64_t AsUInt(uint64_t dflt=0) const
Definition: Record.cxx:525
bool null() const
Returns true if reference contains nullptr.
Definition: Reference.h:151
Uniform Resource Locator interpreter.
Definition: Url.h:33
bool HasOption(const std::string &optname) const
Definition: Url.h:70
int GetOptionInt(const std::string &optname, int dflt=0) const
Definition: Url.cxx:290
std::string GetOptions() const
Definition: Url.h:63
bool Submit(Command cmd)
Definition: Worker.cxx:1139
bool fRfio
true if we write to rfio
Definition: HldOutput.h:40
bool fRunSlave
true if run id is controlled by combiner
Definition: HldOutput.h:34
bool fLtsm
true if we write to ltsm
Definition: HldOutput.h:41
virtual ~HldOutput()
Definition: HldOutput.cxx:79
virtual bool Write_Stat(dabc::Command cmd)
Fill different statistic parameters into provided command.
Definition: HldOutput.cxx:226
uint32_t fRunNumber
id number of current run (can be 0 when data are ignored)
Definition: HldOutput.h:36
virtual bool Write_Retry()
Returns true if output object can be reinitialized for recover error.
Definition: HldOutput.cxx:191
virtual unsigned Write_Buffer(dabc::Buffer &buf)
Start writing of buffer to output.
Definition: HldOutput.cxx:239
virtual bool Write_Init()
This is generic virtual method to initialize output before real work is started.
Definition: HldOutput.cxx:85
hadaq::HldFile fFile
Definition: HldOutput.h:48
uint16_t fEBNumber
id of parent event builder process
Definition: HldOutput.h:38
bool fPlainName
if true no any runid extensions appended to file name
Definition: HldOutput.h:42
bool fUseDaqDisk
true if /data number is taken from daq_disk (HADES setup)
Definition: HldOutput.h:39
std::string fUrlOptions
remember URL options, may be used for RFIO file open
Definition: HldOutput.h:43
virtual bool Write_Restart(dabc::Command cmd)
Method used to restart output - like recreate new output file.
Definition: HldOutput.cxx:211
HldOutput(const dabc::Url &url)
Definition: HldOutput.cxx:34
Read iterator for HADAQ events/subevents.
Definition: Iterator.h:39
static unsigned NumEvents(const dabc::Buffer &buf)
Definition: Iterator.cxx:259
bool NextEvent()
Used for ready HLD events.
Definition: Iterator.cxx:127
hadaq::RawSubevent * subevnt() const
Definition: Iterator.h:90
bool NextSubEvent()
Used for sub-events iteration inside current block.
Definition: Iterator.cxx:199
hadaq::RawEvent * evnt() const
Definition: Iterator.h:81
bool NextSubeventsBlock()
Depending from buffer type calls NextHadTu() or NextEvent()
Definition: Iterator.cxx:166
#define DOUT2(args ...)
Definition: logging.h:170
#define DOUT0(args ...)
Definition: logging.h:156
#define DOUT3(args ...)
Definition: logging.h:176
#define EOUT(args ...)
Definition: logging.h:150
#define DOUT1(args ...)
Definition: logging.h:162
Event manipulation API.
Definition: api.h:23
ManagerRef mgr
Definition: Manager.cxx:42
std::string format(const char *fmt,...)
Definition: string.cxx:49
@ do_Ok
Definition: DataIO.h:142
@ do_Close
Definition: DataIO.h:148
@ do_Error
Definition: DataIO.h:147
@ mbt_EOF
Definition: Buffer.h:45
@ mbt_HadaqSubevents
Definition: HadaqTypeDefs.h:26
@ mbt_HadaqEvents
Definition: HadaqTypeDefs.h:24
@ mbt_HadaqStopRun
Definition: HadaqTypeDefs.h:27
std::string FormatFilename(uint32_t runid, uint16_t ebid=0)
uint32_t CreateRunId()
void SetSize(uint32_t bytes)
Definition: defines.h:191
uint32_t GetPaddedSize() const
Definition: defines.h:184
Hadaq event structure.
Definition: defines.h:443
void Init(uint32_t evnt, uint32_t run=0, uint32_t id=EvtId_DABC)
Definition: defines.h:472
int32_t GetRunNr() const
Definition: defines.h:463