DABC (Data Acquisition Backbone Core)  2.9.9
HierarchyStore.cxx
Go to the documentation of this file.
1 // $Id: HierarchyStore.cxx 4476 2020-04-15 14:12:38Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #include "dabc/HierarchyStore.h"
17 
18 #include <algorithm>
19 
21  fBasePath(),
22  fIO(0),
23  fFile(),
24  fLastStoreTm(),
25  fLastFlushTm(),
26  fDoStore(false),
27  fDoFlush(false),
28  fLastVersion(0),
29  fStoreBuf(),
30  fFlushBuf()
31 {
32 }
33 
35 {
36  CloseFile();
37 
38  if (fIO!=0) {
39  delete fIO;
40  fIO = 0;
41  }
42 }
43 
44 bool dabc::HierarchyStore::SetBasePath(const std::string &path)
45 {
46  if (fFile.isOpened()) {
47  EOUT("Cannot change base path when current file is opened");
48  return false;
49  }
50 
51  fBasePath = path;
52  return true;
53 }
54 
56 {
57  if (!CloseFile()) return false;
58 
59  std::string path = fBasePath;
60  if ((path.length()>0) && (path[path.length()-1]!='/')) path.append("/");
61 
62  dabc::DateTime tm;
63  tm.GetNow();
64 
65  std::string strdate = tm.OnlyDateAsString();
66  std::string strtime = tm.OnlyTimeAsString();
67 
68  path.append(strdate);
69 
70  if (fIO==0) fIO = new FileInterface;
71 
72  if (!fIO->mkdir(path.c_str())) {
73  EOUT("Cannot create path %s for hierarchy storage", path.c_str());
74  return false;
75  }
76 
77  DOUT0("HierarchyStore:: CREATE %s", path.c_str());
78 
79  path.append("/");
80  path.append(strtime);
81  path.append(".dabc");
82 
83  fFile.SetIO(fIO);
84  if (!fFile.OpenWriting(path.c_str())) {
85  EOUT("Cannot open file %s for hierarchy storage", path.c_str());
86  return false;
87  }
88 
89  return WriteDiff(buf);
90 }
91 
93 {
94  if (!fFile.isWriting()) return false;
95 
96  BufferSize_t fullsz = buf.GetTotalSize();
97 
98  DOUT0("HierarchyStore:: WRITE %u", (unsigned) buf.GetTotalSize());
99 
100  if (!fFile.WriteBufHeader(fullsz, buf.GetTypeId())) {
101  EOUT("Cannot write buffer header");
102  return false;
103  }
104 
105  for (unsigned n=0;n<buf.NumSegments();n++)
106  if (!fFile.WriteBufPayload(buf.SegmentPtr(n), buf.SegmentSize(n))) {
107  EOUT("Cannot write buffer segment");
108  return false;
109  }
110 
111  return true;
112 }
113 
115 {
116  if (!fFile.isOpened()) return true;
117 
118  fFile.Close();
119 
120  return true;
121 }
122 
123 bool dabc::HierarchyStore::CheckForNextStore(DateTime& now, double store_period, double flush_period)
124 {
125  // if flags already set, do not touch them here
126  if (fDoStore || fDoFlush) return true;
127 
128  if (now.null()) now.GetNow();
129 
130  // DOUT0("SINCE LAST STORE = %5.1f", now - fLastStoreTm);
131 
132  // in the very first call trigger only write of base structure
133  if (fLastStoreTm.null()) {
134  fLastStoreTm = now;
135  fLastFlushTm = now;
136  fDoStore = false;
137  fDoFlush = true;
138  } else
139  if (now - fLastStoreTm > store_period) {
140  fLastStoreTm = now;
141  fDoStore = true;
142  if (now - fLastFlushTm >= flush_period) {
143  fDoFlush = true;
144  fLastFlushTm = now;
145  }
146  }
147 
148  return fDoStore || fDoFlush;
149 }
150 
152 {
153  if (fDoStore) {
154  // we record diff to previous version, including all history entries
155 
156  fStoreBuf = h.SaveToBuffer(dabc::stream_Full, fLastVersion, 1000000000);
157 
158  DOUT0("HierarchyStore:: EXTRACT prev %u now %u chlds %u size %u",
159  (unsigned) fLastVersion, (unsigned) h.GetVersion(),
160  (unsigned) h()->GetChildsVersion(), (unsigned) fStoreBuf.GetTotalSize());
161 
162  fLastVersion = h.GetVersion();
163  }
164 
165  if (fDoFlush) {
166  // we record complete hierarchy without any history entry
167  fFlushBuf = h.SaveToBuffer(dabc::stream_Full, 0, 0);
168  fLastVersion = h.GetVersion();
169  }
170 
171  if (fDoFlush || fDoStore) {
172  // this time will be used as raster when reading files
173  // it can be read very efficient - most decoding can be skipped
174  h.SetField("storetm", fLastStoreTm);
175  h.MarkChangedItems(fLastStoreTm.AsJSDate());
176  }
177 
178  return true;
179 }
180 
182 {
183  if (fDoStore) {
184  if (!fStoreBuf.null()) WriteDiff(fStoreBuf);
185  fStoreBuf.Release();
186  fDoStore = false;
187  }
188 
189  if (fDoFlush) {
190  if (!fFlushBuf.null()) StartFile(fFlushBuf);
191  fFlushBuf.Release();
192  fDoFlush = false;
193  }
194 
195  return true;
196 }
197 
198 // =================================================================================
199 
200 
201 
203  fBasePath(),
204  fIO(0)
205 {
206 }
207 
209 {
210  if (fIO!=0) {
211  delete fIO;
212  fIO = 0;
213  }
214 }
215 
216 void dabc::HierarchyReading::SetBasePath(const std::string &path)
217 {
218  fBasePath = path;
219  if ((fBasePath.length()>0) && (fBasePath[fBasePath.length()-1] != '/')) fBasePath.append("/");
220 }
221 
222 std::string dabc::HierarchyReading::MakeFileName(const std::string &fpath, const DateTime& dt)
223 {
224  std::string res = fpath;
225  if ((res.length()>0) && (res[res.length()-1] != '/')) res.append("/");
226 
227  std::string strdate = dt.OnlyDateAsString();
228  std::string strtime = dt.OnlyTimeAsString();
229 
230  res += dabc::format("%s/%s.dabc", strdate.c_str(), strtime.c_str());
231 
232  return res;
233 }
234 
235 bool dabc::HierarchyReading::ScanFiles(const std::string &dirname, const DateTime& onlydate, std::vector<uint64_t>& vect)
236 {
237  std::string mask = dirname + "*.dabc";
238 
239  Reference files = fIO->fmatch(mask.c_str(), true);
240 
241  for (unsigned n=0;n<files.NumChilds();n++) {
242  std::string fname = files.GetChild(n).GetName();
243  fname.erase(0, dirname.length());
244 
245  size_t pos = fname.find(".dabc");
246  if (pos == std::string::npos) {
247  EOUT("Wrong time file name %s", fname.c_str());
248  continue;
249  }
250  fname.erase(pos);
251 
252  DateTime dt = onlydate;
253  if (!dt.SetOnlyTime(fname.c_str())) {
254  EOUT("Wrong time file name %s", fname.c_str());
255  continue;
256  }
257 
258  vect.push_back(dt.AsJSDate());
259  }
260 
261  return true;
262 }
263 
264 
265 bool dabc::HierarchyReading::ScanTreeDir(dabc::Hierarchy& h, const std::string &dirname)
266 {
267  if (h.null() || dirname.empty()) return false;
268 
269  DOUT0("SCAN %s", dirname.c_str());
270 
271  std::string mask = dirname + "*";
272 
273  Reference dirs = fIO->fmatch(mask.c_str(), false);
274 
275  bool isanysubdir(false), isanydatedir(false);
276 
277  std::vector<uint64_t> files;
278 
279  for (unsigned n=0;n<dirs.NumChilds();n++) {
280  std::string fullsubname = dirs.GetChild(n).GetName();
281 
282  std::string itemname = fullsubname;
283  itemname.erase(0, dirname.length());
284 
285  DateTime dt;
286 
287  DOUT0("FOUND %s PART %s", fullsubname.c_str(), itemname.c_str());
288 
289  if (dt.SetOnlyDate(itemname.c_str())) {
290  isanydatedir = true;
291  if (!ScanFiles(fullsubname + "/", dt, files)) return false;
292  } else {
293  isanysubdir = true;
294  dabc::Hierarchy subh = h.CreateHChild(itemname);
295  if (!ScanTreeDir(subh, fullsubname + "/")) return false;
296  }
297  }
298 
299  dirs.Release();
300 
301  if (isanysubdir && isanydatedir) {
302  EOUT("DIR %s Cannot combine subdirs with date dirs!!!", dirname.c_str());
303  return false;
304  }
305 
306  if (isanydatedir && (files.size()>0)) {
307 
308  std::sort(files.begin(), files.end());
309 
310  dabc::DateTime mindt(files.front()), maxdt(files.back());
311 
312  DOUT0("DIR: %s mintm: %s maxtm: %s files %u", dirname.c_str(), mindt.AsJSString().c_str(), maxdt.AsJSString().c_str(), files.size());
313 
314  h.SetField("dabc:path", dirname);
315  h.SetField("dabc:mindt", mindt);
316  h.SetField("dabc:maxdt" ,maxdt);
317  h.SetField("dabc:files", files);
318  }
319 
320 
321  return true;
322 }
323 
324 
326 {
327  fTree.Release();
328  fTree.Create("TOP");
329  if (fIO==0) fIO = new FileInterface;
330  return ScanTreeDir(fTree, fBasePath);
331 }
332 
334 {
335 
336  dabc::Buffer buf;
337 
338  if (f.eof()) return buf;
339 
340  uint64_t size(0), typ(0);
341  if (!f.ReadBufHeader(&size, &typ)) return buf;
342 
343  buf = dabc::Buffer::CreateBuffer(size);
344  if (buf.null()) return buf;
345 
346  if (!f.ReadBufPayload(buf.SegmentPtr(), size)) { buf.Release(); return buf; }
347 
348  buf.SetTotalSize(size);
349  buf.SetTypeId(typ);
350 
351  return buf;
352 }
353 
354 bool dabc::HierarchyReading::ProduceStructure(Hierarchy& tree, const DateTime& from_date, const DateTime& till_date, const std::string &entry, Hierarchy& tgt)
355 {
356  if (tree.null()) return false;
357 
358  DOUT0("Produce structure for item %s", tree.ItemName().c_str());
359 
360  if (!tree.HasField("dabc:path")) {
361  for (unsigned n=0;n<tree.NumChilds();n++) {
362  Hierarchy tree_chld = tree.GetChild(n);
363 
365  if (!entry.empty())
366  if (entry.find(tree_chld.ItemName())!=0) continue;
367 
368  Hierarchy tgt_chld = tgt.CreateHChild(tree_chld.GetName());
369  if (!ProduceStructure(tree_chld, from_date, till_date, entry, tgt_chld)) return false;
370  }
371  return true;
372  }
373 
374  std::string fpath = tree.Field("dabc:path").AsStr();
375 
376  int nfiles = tree.Field("dabc:files").GetArraySize();
377  uint64_t* files = tree.Field("dabc:files").GetUIntArr();
378 
379  if ((files==0) || (nfiles<=0)) return false;
380 
381  DateTime dt;
382  dt.SetJSDate(files[0]);
383  if (!from_date.null()) {
384  for (int n=0;n<nfiles;n++)
385  if (files[n] <= from_date.AsJSDate())
386  dt.SetJSDate(files[n]);
387  }
388 
389  std::string fname = MakeFileName(fpath, dt);
390 
391  DOUT0("Opening file %s", fname.c_str());
392 
394  f.SetIO(fIO);
395  if (!f.OpenReading(fname.c_str())) return false;
396  dabc::Buffer buf = ReadBuffer(f);
397  if (buf.null()) return false;
398 
399  if (!tgt.ReadFromBuffer(buf)) return false;
400 
401  if (!entry.empty()) {
402 
403  Hierarchy sel = tgt.GetTop().FindChild(entry.c_str());
404 
405  if (sel.null()) {
406  EOUT("Cannot locate entry %s in the storage", entry.c_str());
407  return false;
408  }
409 
410  tgt.DisableReading(); // we ignore all data
411  sel.EnableReading(tgt); // beside selected element
412  tgt.EnableReading(); // we enable reading of top - it stores time stamp
413 
414  // now we need enable only that entry
415  sel.EnableHistory(100);
416 
417  while (!f.eof()) {
418  buf = ReadBuffer(f);
419  if (!buf.null())
420  if (!tgt.UpdateFromBuffer(buf, stream_NoDelete)) return false;
421  }
422 
423  }
424 
425  return true;
426 }
427 
429 {
430  if (fTree.null()) {
431  if (!ScanTree()) return false;
432  }
433 
434  if (fIO==0) return false;
435 
436  tgt.Release();
437  tgt.Create("TOP");
438 
439  return ProduceStructure(fTree, dt, 0, "", tgt);
440 }
441 
442 dabc::Hierarchy dabc::HierarchyReading::GetSerie(const std::string &entry, const DateTime& from, const DateTime& till)
443 {
444  dabc::Hierarchy h, res;
445 
446  if (fTree.null()) {
447  if (!ScanTree()) return res;
448  }
449 
450  if (fIO==0) return res;
451  h.Create("TOP");
452 
453  if (!ProduceStructure(fTree, from, till, entry, h)) return res;
454 
455  res = h.FindChild(entry.c_str());
456  if (res.null()) {
457  EOUT("Cannot locate entry %s in the storage", entry.c_str());
458  return res;
459  }
460 
461  res.DettachFromParent();
462 
463  h.Release(); // rest of hierarchy is not interesting
464 
465  return res;
466 }
467 
468 
bool eof() const
Definition: BinaryFile.h:135
void SetIO(FileInterface *_io, bool _ioowner=false)
Definition: BinaryFile.h:114
Generic file storage for DABC buffers.
Definition: BinaryFile.h:181
bool ReadBufPayload(void *ptr, uint64_t sz)
Definition: BinaryFile.h:337
bool OpenReading(const char *fname)
Definition: BinaryFile.h:197
bool ReadBufHeader(uint64_t *size, uint64_t *typ=0)
Definition: BinaryFile.h:317
Reference on memory from memory pool.
Definition: Buffer.h:135
unsigned NumSegments() const
Returns number of segment in buffer.
Definition: Buffer.h:163
unsigned SegmentSize(unsigned n=0) const
Returns size on the segment, no any boundary checks.
Definition: Buffer.h:174
void SetTotalSize(BufferSize_t len)
Set total length of the buffer to specified value Size cannot be bigger than original size of the buf...
Definition: Buffer.cxx:99
unsigned GetTypeId() const
Definition: Buffer.h:152
BufferSize_t GetTotalSize() const
Return total size of all buffer segments.
Definition: Buffer.cxx:91
static Buffer CreateBuffer(BufferSize_t sz)
This static method create independent buffer for any other memory pools Therefore it can be used in s...
Definition: Buffer.cxx:419
void SetTypeId(unsigned tid)
Definition: Buffer.h:151
void * SegmentPtr(unsigned n=0) const
Returns pointer on the segment, no any boundary checks.
Definition: Buffer.h:171
Class for holding GMT time with precision of nanoseconds.
Definition: timing.h:190
bool SetOnlyTime(const char *sbuf)
Set only time part of DateTime.
Definition: timing.cxx:308
std::string OnlyTimeAsString(const char *separ=nullptr, bool localtime=false) const
Fills only time as string.
Definition: timing.cxx:273
bool null() const
Definition: timing.h:210
std::string AsJSString(int ndecimal=3) const
convert string into sec.frac format, can be interpret directly in JavaScript ISO 8601 standard is use...
Definition: timing.cxx:212
void SetJSDate(uint64_t jsdate)
Set value in form of JS date - milliseconds since 1.1.1970.
Definition: timing.h:224
uint64_t AsJSDate() const
Return date and time in JS format - number of millisecond since 1.1.1970.
Definition: timing.cxx:158
DateTime & GetNow()
Definition: timing.cxx:147
bool SetOnlyDate(const char *sbuf)
Set only date part from the string.
Definition: timing.cxx:289
std::string OnlyDateAsString(const char *separ=nullptr, bool localtime=false) const
Fills only date as string.
Definition: timing.cxx:257
Defines and implements basic POSIX file interface.
Definition: BinaryFile.h:33
std::string MakeFileName(const std::string &fpath, const DateTime &dt)
bool ScanTreeDir(dabc::Hierarchy &h, const std::string &dirname)
! scanned files tree
void SetBasePath(const std::string &path)
Set top directory for all recorded data.
Hierarchy GetSerie(const std::string &entry, const DateTime &from, const DateTime &till)
Get entry with history for specified time interval.
bool ScanFiles(const std::string &dirname, const DateTime &onlydate, std::vector< uint64_t > &vect)
bool GetStrucutre(Hierarchy &h, const DateTime &dt=0)
Get full structure at given point of time.
bool ProduceStructure(Hierarchy &tree, const DateTime &from, const DateTime &till, const std::string &entry, Hierarchy &tgt)
bool ScanTree()
Scan only directories, do not open any files.
dabc::Buffer ReadBuffer(dabc::BinaryFile &f)
bool WriteDiff(dabc::Buffer buf)
bool CheckForNextStore(DateTime &now, double store_period, double flush_period)
Returns true if any new store action should be performed.
bool WriteExtractedData()
Write extracted data to files, performed outside hierarchy mutex.
bool SetBasePath(const std::string &path)
Set base path for data storage, can only be changed when all files are closed.
bool StartFile(dabc::Buffer buf)
bool ExtractData(dabc::Hierarchy &h)
Extract data which is can be stored, must be called under hierarchy mutex.
Represents objects hierarchy of remote (or local) DABC process.
Definition: Hierarchy.h:285
void DisableReading(bool withchlds=true)
Mark all elements that non of data will be read.
Definition: Hierarchy.cxx:1157
Hierarchy GetTop() const
Returns reference on the top element of the hierarchy.
Definition: Hierarchy.cxx:1139
void MarkChangedItems(uint64_t tm=0)
If any field was modified, item will be marked with new version.
Definition: Hierarchy.h:330
uint64_t GetVersion() const
Returns actual version of hierarchy entry.
Definition: Hierarchy.h:349
Hierarchy FindChild(const char *name)
Return child element from hierarchy.
Definition: Hierarchy.h:362
Hierarchy CreateHChild(const std::string &name, bool allowslahes=false, bool sortorder=false)
Create child item in hierarchy with specified name If allowslahes enabled, instead of subfolders item...
Definition: Hierarchy.h:392
std::string ItemName() const
Name which is used as item name in hierarchy.
Definition: Hierarchy.cxx:1133
bool DettachFromParent()
Detach from parent object.
Definition: Hierarchy.cxx:1150
const RecordField & Field(const std::string &name) const
Definition: Hierarchy.h:304
dabc::Buffer SaveToBuffer(unsigned kind=stream_Full, uint64_t version=0, unsigned hlimit=0)
Save hierarchy in binary form, relative to specified version.
Definition: Hierarchy.cxx:873
void EnableReading(const Hierarchy &upto=nullptr)
Enable element and all its parents to read data.
Definition: Hierarchy.cxx:1167
void Create(const std::string &name, bool withmutex=false)
Create top-level object with specified name.
Definition: Hierarchy.cxx:934
bool UpdateFromBuffer(const dabc::Buffer &buf, HierarchyStreamKind kind=stream_Full)
Apply modification to hierarchy, using stored binary data
Definition: Hierarchy.cxx:912
bool ReadFromBuffer(const dabc::Buffer &buf)
Read hierarchy from buffer.
Definition: Hierarchy.cxx:896
void EnableHistory(unsigned length=100, bool withchilds=false)
Activate history production for selected element and its childs.
Definition: Hierarchy.cxx:837
int64_t GetArraySize() const
Definition: Record.h:317
std::string AsStr(const std::string &dflt="") const
Definition: Record.cxx:749
uint64_t * GetUIntArr() const
Definition: Record.h:327
bool HasField(const std::string &name) const
Definition: Record.h:498
bool SetField(const std::string &name, const RecordField &v)
Definition: Record.h:516
Reference on the arbitrary object
Definition: Reference.h:73
void Release()
Releases reference on the object.
Definition: Reference.cxx:138
const char * GetName() const
Return name of referenced object, if object not assigned, returns "---".
Definition: Reference.cxx:167
unsigned NumChilds() const
Return number of childs in referenced object.
Definition: Reference.cxx:194
bool null() const
Returns true if reference contains nullptr.
Definition: Reference.h:151
Reference GetChild(unsigned n) const
Return reference on child n.
Definition: Reference.cxx:199
#define DOUT0(args ...)
Definition: logging.h:156
#define EOUT(args ...)
Definition: logging.h:150
uint32_t BufferSize_t
Definition: Buffer.h:32
std::string format(const char *fmt,...)
Definition: string.cxx:49
@ stream_NoDelete
Definition: Hierarchy.h:135
@ stream_Full
Definition: Hierarchy.h:134