DABC (Data Acquisition Backbone Core)  2.9.9
DataIO.cxx
Go to the documentation of this file.
1 // $Id: DataIO.cxx 4476 2020-04-15 14:12:38Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #include "dabc/DataIO.h"
17 
18 #include "dabc/Manager.h"
19 #include "dabc/BinaryFile.h"
20 
22 {
23  unsigned sz = Read_Size();
24 
25  dabc::Buffer buf;
26 
27  if (sz == di_DfltBufSize) sz = 0x10000;
28 
29  if (sz>di_ValidSize) return buf;
30 
32 
33  if (buf.null()) return buf;
34 
35  if (Read_Start(buf) != di_Ok) {
36  buf.Release();
37  return buf;
38  }
39 
40  if (Read_Complete(buf) != di_Ok) {
41  buf.Release();
42  return buf;
43  }
44 
45  return buf;
46 }
47 
48 // ======================================================
49 
51  fInfoName()
52 {
53 }
54 
55 void dabc::DataOutput::SetInfoParName(const std::string &name)
56 {
57  fInfoName = name;
58 }
59 
60 
61 void dabc::DataOutput::ShowInfo(int lvl, const std::string &info)
62 {
64  if (!fInfoName.empty())
65  par = dabc::mgr.FindPar(fInfoName);
66 
67  if (par.null()) {
68  if (lvl<0)
69  EOUT(info.c_str());
70  else
71  DOUT1(info.c_str());
72  } else {
73  par.SetValue(info);
74  par.FireModified();
75  }
76 }
77 
78 
80 {
81  if (Write_Check() != do_Ok) return false;
82 
83  if (Write_Buffer(buf) != do_Ok) return false;
84 
85  return Write_Complete() == do_Ok;
86 }
87 
88 
89 // ========================================================
90 
91 
93  DataInput(),
94  fFileName(url.GetFullName()),
95  fFilesList(),
96  fIO(0),
97  fCurrentName(),
98  fLoop(url.HasOption("loop")),
99  fReduce(url.GetOptionDouble("reduce",1.))
100 {
101  if (fReduce>1.) fReduce = 1; else
102  if (fReduce<0.01) fReduce = 0.01;
103 }
104 
106 {
107  DOUT3("Destroy file input %p", this);
108 
109  if (fIO!=0) {
110  delete fIO;
111  fIO = 0;
112  }
113 }
114 
116 {
117  if (fIO!=0) {
118  EOUT("File interface object already assigned");
119  delete io;
120  } else {
121  fIO = io;
122  }
123 }
124 
126 {
127  if (fFileName.find_first_of("*?") != std::string::npos) {
128  fFilesList = fIO->fmatch(fFileName.c_str());
129  } else {
130  fFilesList = new dabc::Object(0, "FilesList");
131  new dabc::Object(fFilesList(), fFileName);
132  }
133 
134  fFilesList.SetAutoDestroy(true);
135 
136  return fFilesList.NumChilds() > 0;
137 }
138 
139 bool dabc::FileInput::Read_Init(const WorkerRef& wrk, const Command& cmd)
140 {
141  if (!dabc::DataInput::Read_Init(wrk,cmd)) return false;
142 
143  if (fFileName.empty()) return false;
144 
145  if (!fFilesList.null()) {
146  EOUT("Files list already exists");
147  return false;
148  }
149 
150  if (fIO==0) fIO = new dabc::FileInterface;
151 
152  return InitFilesList();
153 }
154 
156 {
157  fCurrentName.clear();
158  if (fFilesList.NumChilds() == 0) {
159  if (!fLoop || !InitFilesList()) return false;
160  }
161  const char* nextname = fFilesList.GetChild(0).GetName();
162  if (nextname!=0) fCurrentName = nextname;
163  fFilesList.GetChild(0).Destroy();
164  return !fCurrentName.empty();
165 }
166 
168 {
169  cmd.SetStr("InputFileName", fFileName);
170  cmd.SetStr("InputCurrFileName", fCurrentName);
171  return true;
172 }
173 
174 
175 // ================================================================
176 
177 dabc::FileOutput::FileOutput(const dabc::Url& url, const std::string &ext) :
178  DataOutput(url),
179  fFileName(url.GetFullName()),
180  fSizeLimitMB(url.GetOptionInt(dabc::xml_maxsize,0)),
181  fFileExtens(ext),
182  fIO(0),
183  fCurrentFileNumber(url.GetOptionInt(dabc::xml_number,0)),
184  fCurrentFileName(),
185  fCurrentFileSize(0),
186  fTotalFileSize(0),
187  fTotalNumBufs(0),
188  fTotalNumEvents(0)
189 {
190 }
191 
193 {
194  if (fIO!=0) {
195  delete fIO;
196  fIO = 0;
197  }
198 }
199 
201 {
202  if (fIO!=0) {
203  EOUT("File interface object already assigned");
204  delete io;
205  } else {
206  fIO = io;
207  }
208 }
209 
210 
212 {
213  if (!DataOutput::Write_Init()) return false;
214 
215  if (fIO==0) fIO = new FileInterface;
216 
217  std::string mask = ProduceFileName("*");
218 
219  dabc::Reference lst = fIO->fmatch(mask.c_str());
220 
221  int maxnumber = -1;
222 
223  for (unsigned cnt=0; cnt < lst.NumChilds(); cnt++) {
224 
225  std::string fname = lst.GetChild(cnt).GetName();
226 
227 // DOUT0("Test file %s", fname.c_str());
228 
229  fname.erase(fname.length()-fFileExtens.length(), fFileExtens.length());
230  size_t pos = fname.length() - 1;
231  while ((fname[pos]>='0') && (fname[pos] <= '9')) pos--;
232  fname.erase(0, pos+1);
233 
234  while ((fname.length()>1) && fname[0]=='0') fname.erase(0, 1);
235 
236  int number(0);
237  if ((fname.length()>0) && dabc::str_to_int(fname.c_str(), &number)) {
238  if (number>maxnumber) maxnumber = number;
239  }
240  }
241 
242  lst.Destroy();
243 
244  if (fCurrentFileNumber <= maxnumber) {
245  fCurrentFileNumber = maxnumber + 1;
246  ShowInfo(0, dabc::format("start with file number %d", fCurrentFileNumber));
247  }
248 
249  return true;
250 }
251 
252 std::string dabc::FileOutput::ProduceFileName(const std::string &suffix)
253 {
254  std::string fname = fFileName;
255 
256  size_t len = fname.length();
257  size_t pos = fname.rfind(fFileExtens);
258 
259  if (pos == (len-fFileExtens.length())) {
260  fname.insert(pos, suffix);
261  } else {
262  fname += suffix;
263  fname += fFileExtens;
264  }
265 
266  return fname;
267 }
268 
269 
271 {
272  fCurrentFileName = ProduceFileName(dabc::format("_%04d", fCurrentFileNumber++));
273  fCurrentFileSize = 0;
274 }
275 
276 
278 {
279  if (fSizeLimitMB > 0)
280  return (fCurrentFileSize + sz)/1024./1024. > fSizeLimitMB;
281 
282  return false;
283 }
284 
285 
287 {
288  fCurrentFileSize += sz;
289  fTotalFileSize += sz;
290  fTotalNumBufs++;
291  fTotalNumEvents += numev;
292 }
293 
294 
296 {
297  std::string info = fCurrentFileName;
298  size_t pos = info.rfind("/");
299  if (pos!=std::string::npos) info.erase(0, pos);
300 
301  info.append(" ");
302  info.append(dabc::size_to_str(fCurrentFileSize));
303 
304  if (fSizeLimitMB>0)
305  info.append(dabc::format(" (%3.1f %s)", 100./(fSizeLimitMB*1024.*1024.)*fCurrentFileSize, "%"));
306 
307  if (fTotalNumEvents > 0) {
308  if (fTotalNumEvents<1000000)
309  info.append(dabc::format(" %ld ev", fTotalNumEvents));
310  else
311  info.append(dabc::format(" %8.3e ev", 1.*fTotalNumEvents));
312  } else {
313  if (fTotalNumBufs<1000000)
314  info.append(dabc::format(" %ld bufs", fTotalNumBufs));
315  else
316  info.append(dabc::format(" %8.3e bufs", 1.*fTotalNumBufs));
317  }
318 
319  return info;
320 }
321 
323 {
324  cmd.SetStr("OutputFileName", fFileName);
325  cmd.SetDouble("OutputFileEvents", fTotalNumEvents);
326  cmd.SetDouble("OutputFileSize", fTotalFileSize);
327 
328  cmd.SetStr("OutputCurrFileName", fCurrentFileName);
329  cmd.SetDouble("OutputCurrFileSize", fCurrentFileSize);
330 
331  return true;
332 }
Reference on memory from memory pool.
Definition: Buffer.h:135
static Buffer CreateBuffer(BufferSize_t sz)
This static method create independent buffer for any other memory pools Therefore it can be used in s...
Definition: Buffer.cxx:419
Represents command with its arguments.
Definition: Command.h:99
bool SetStr(const std::string &name, const char *value)
Definition: Command.h:134
bool SetDouble(const std::string &name, double v)
Definition: Command.h:144
Interface for implementing any kind of data input.
Definition: DataIO.h:61
virtual unsigned Read_Start(Buffer &buf)
Prepare buffer for reading (if required)
Definition: DataIO.h:103
virtual unsigned Read_Size()
Defines required buffer size for next operation.
Definition: DataIO.h:95
virtual unsigned Read_Complete(Buffer &buf)
Complete reading of the buffer from source,.
Definition: DataIO.h:114
Buffer ReadBuffer()
Reads complete buffer.
Definition: DataIO.cxx:21
virtual bool Read_Init(const WorkerRef &wrk, const Command &cmd)
Initialize data input, using port and command.
Definition: DataIO.h:82
Interface for implementing any kind of data output.
Definition: DataIO.h:158
DataOutput(const dabc::Url &url)
Definition: DataIO.cxx:50
void ShowInfo(int lvl, const std::string &info)
Definition: DataIO.cxx:61
bool WriteBuffer(Buffer &buf)
Write buffer to the output.
Definition: DataIO.cxx:79
virtual bool Write_Init()
This is generic virtual method to initialize output before real work is started.
Definition: DataIO.h:186
void SetInfoParName(const std::string &name)
Methods set parameter name, which could be used for debug output.
Definition: DataIO.cxx:55
bool TakeNextFileName()
Definition: DataIO.cxx:155
virtual ~FileInput()
Definition: DataIO.cxx:105
double fReduce
factor to reduce buffer size when reading
Definition: DataIO.h:253
virtual bool Read_Init(const WorkerRef &wrk, const Command &cmd)
Initialize data input, using port and command.
Definition: DataIO.cxx:139
void SetIO(dabc::FileInterface *io)
Definition: DataIO.cxx:115
bool InitFilesList()
Definition: DataIO.cxx:125
virtual bool Read_Stat(dabc::Command cmd)
Provide timeout value.
Definition: DataIO.cxx:167
FileInput(const dabc::Url &url)
Definition: DataIO.cxx:92
Defines and implements basic POSIX file interface.
Definition: BinaryFile.h:33
bool CheckBufferForNextFile(unsigned sz)
Return true if new file should be started.
Definition: DataIO.cxx:277
void AccountBuffer(unsigned sz, int numev=0)
Definition: DataIO.cxx:286
std::string ProduceFileName(const std::string &suffix)
Definition: DataIO.cxx:252
virtual bool Write_Init()
This is generic virtual method to initialize output before real work is started.
Definition: DataIO.cxx:211
void SetIO(dabc::FileInterface *io)
Definition: DataIO.cxx:200
virtual std::string ProvideInfo()
Method can be used to get debug info about output.
Definition: DataIO.cxx:295
void ProduceNewFileName()
Definition: DataIO.cxx:270
virtual ~FileOutput()
Definition: DataIO.cxx:192
virtual bool Write_Stat(dabc::Command cmd)
Fill different statistic parameters into provided command.
Definition: DataIO.cxx:322
FileOutput(const dabc::Url &url, const std::string &ext="")
Definition: DataIO.cxx:177
Special info parameter class.
Definition: Parameter.h:300
Parameter FindPar(const std::string &parname)
Definition: Manager.cxx:2069
Base class for most of the DABC classes.
Definition: Object.h:116
bool SetValue(const RecordField &v)
Set parameter value.
Definition: Parameter.h:205
void FireModified()
Can be called by user to signal framework that parameter was modified.
Definition: Parameter.cxx:555
Reference on the arbitrary object
Definition: Reference.h:73
void Release()
Releases reference on the object.
Definition: Reference.cxx:138
const char * GetName() const
Return name of referenced object, if object not assigned, returns "---".
Definition: Reference.cxx:167
unsigned NumChilds() const
Return number of childs in referenced object.
Definition: Reference.cxx:194
bool null() const
Returns true if reference contains nullptr.
Definition: Reference.h:151
void Destroy()
Release reference and starts destroyment of referenced object.
Definition: Reference.cxx:148
Reference GetChild(unsigned n) const
Return reference on child n.
Definition: Reference.cxx:199
Uniform Resource Locator interpreter.
Definition: Url.h:33
Reference on dabc::Worker
Definition: Worker.h:466
#define DOUT3(args ...)
Definition: logging.h:176
#define EOUT(args ...)
Definition: logging.h:150
#define DOUT1(args ...)
Definition: logging.h:162
Event manipulation API.
Definition: api.h:23
const char * xml_maxsize
Definition: Object.cxx:73
uint32_t BufferSize_t
Definition: Buffer.h:32
ManagerRef mgr
Definition: Manager.cxx:42
std::string format(const char *fmt,...)
Definition: string.cxx:49
std::string size_to_str(unsigned long sz, int prec=1, int select=0)
Convert size to string of form like 4.2 GB or 3.7 MB.
Definition: string.cxx:75
@ do_Ok
Definition: DataIO.h:142
const char * xml_number
Definition: Object.cxx:74
@ di_Ok
Definition: DataIO.h:38
@ di_ValidSize
Definition: DataIO.h:33
@ di_DfltBufSize
Definition: DataIO.h:42
bool str_to_int(const char *val, int *res)
Convert string to integer value.
Definition: string.cxx:142