DABC (Data Acquisition Backbone Core)  2.9.9
Monitor.cxx
Go to the documentation of this file.
1 // $Id: Monitor.cxx 4475 2020-04-15 13:55:30Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #include "fesa/Monitor.h"
17 
18 #include <cstdlib>
19 #include <cmath>
20 
21 #ifdef WITH_FESA
22 #include <cmw-rda/RDAService.h>
23 #include <cmw-rda/Config.h>
24 #include <cmw-rda/DeviceHandle.h>
25 #include <cmw-rda/ReplyHandler.h>
26 
27 
28 class rdaDabcHandler : public rdaReplyHandler
29 {
30  public:
31  fesa::Monitor* fPlayer;
32  std::string fService;
33  rdaRequest* fRequest;
34  rdaData fContext;
35 
36  rdaDabcHandler(fesa::Monitor* p, const std::string &serv) :
37  rdaReplyHandler(),
38  fPlayer(p),
39  fService(serv),
40  fRequest(0),
41  fContext()
42  {
43  }
44 
45  bool subscribe(rdaDeviceHandle* device, const std::string &cycle)
46  {
47  try {
48  fRequest = device->monitorOn(fService.c_str(), cycle.c_str(), false, this, fContext);
49  return fRequest!=0;
50  } catch (const rdaException& ex) {
51  if (fPlayer) fPlayer->ReportServiceError(fService, dabc::format("%s : %s", ex.getType(),ex.getMessage()));
52  EOUT("Exception caught in subscribe: %s %s", ex.getType(), ex.getMessage());
53  } catch (...) {
54  if (fPlayer) fPlayer->ReportServiceError(fService, "subscribe - unknown error");
55  EOUT("Unknown exception caught in subscribe");
56  }
57 
58  return false;
59  }
60 
61  bool unsubscribe(rdaDeviceHandle* device)
62  {
63  try {
64  if (fRequest) device->monitorOff(fRequest);
65  fRequest = 0;
66  return true;
67  } catch (const rdaException& ex) {
68  if (fPlayer) fPlayer->ReportServiceError(fService, dabc::format("%s : %s", ex.getType(),ex.getMessage()));
69  EOUT("Exception caught in subscribe: %s %s", ex.getType(), ex.getMessage());
70  } catch (...) {
71  if (fPlayer) fPlayer->ReportServiceError(fService, "unsubscribe - unknown error");
72  EOUT("Unknown exception caught in unsubscribe");
73  }
74  return false;
75  }
76 
77 
78  virtual void handleReply(const rdaRequest& rq, const rdaData& value)
79  {
80  try
81  {
82  if (fPlayer) fPlayer->ReportServiceChanged(fService, &value);
83  }
84  catch (const rdaException& ex)
85  {
86  if (fPlayer) fPlayer->ReportServiceError(fService, dabc::format("%s : %s", ex.getType(),ex.getMessage()));
87  EOUT( "Exception caught in GSIVoltageHandler: %s %s", ex.getType(), ex.getMessage());
88  }
89  catch (...)
90  {
91  if (fPlayer) fPlayer->ReportServiceError(fService, "handleReply - unknown error");
92  EOUT("Unknown exception caught in handleReply");
93  }
94  }
95 };
96 
97 #endif
98 
99 fesa::Monitor::Monitor(const std::string &name, dabc::Command cmd) :
100  mbs::MonitorSlowControl(name, "Fesa", cmd),
101  fHierarchy(),
102  fRDAService(0),
103  fDevice(0),
104  fHandlers(0),
105  fBlockRec(false)
106 {
107  fHierarchy.Create("fesa-monitor", true);
108 
109  fServerName = Cfg("Server", cmd).AsStr();
110  fDeviceName = Cfg("Device", cmd).AsStr();
111  fCycle = Cfg("Cycle", cmd).AsStr();
112 
113  std::vector<std::string> services = Cfg("Services", cmd).AsStrVect();
114 
115  #ifdef WITH_FESA
116  if (!fServerName.empty() && !fDeviceName.empty() && (services.size()>0)) {
117  fRDAService = rdaRDAService::init();
118 
119  try {
120  fDevice = fRDAService->getDeviceHandle(fDeviceName.c_str(), fServerName.c_str());
121  } catch (...) {
122  EOUT("Device %s on server %s not found", fDeviceName.c_str(), fServerName.c_str());
123  }
124  }
125  #endif
126 
127  for (unsigned n=0;n<services.size();n++) {
128  dabc::Hierarchy item = fHierarchy.CreateHChild(services[n]);
129  item.EnableHistory(100);
130 
131  if (fDevice!=0) {
132  #ifdef WITH_FESA
133  rdaDabcHandler* handler = new rdaDabcHandler(this, services[n]);
134  if (handler->subscribe(fDevice, fCycle))
135  fHandlers.push_back(handler);
136  else
137  delete handler;
138  #endif
139  }
140  }
141 
142  Publish(fHierarchy, "FESA/Monitor");
143 }
144 
146 {
147  #ifdef WITH_FESA
148 
149  for (unsigned n=0;n<fHandlers.size();n++) {
150  fHandlers[n]->unsubscribe(fDevice);
151  delete fHandlers[n];
152  }
153  fHandlers.clear();
154 
155  #endif
156 
157 }
158 
159 void fesa::Monitor::ReportServiceError(const std::string &name, const std::string &err)
160 {
161  dabc::LockGuard lock(fHierarchy.GetHMutex());
162 
163  dabc::Hierarchy item = fHierarchy.GetHChild(name);
164  if (item.null()) return;
165 
166  item.SetField(dabc::prop_error, err);
167 
168  item.MarkChangedItems();
169 }
170 
171 
172 void fesa::Monitor::ReportServiceChanged(const std::string &name, const rdaData* value)
173 {
174  // DOUT0("REPORT FESA SERVICE %s = %5.3f", name.c_str());
175 
176  dabc::LockGuard lock(fHierarchy.GetHMutex());
177 
178  dabc::Hierarchy item = fHierarchy.GetHChild(name);
179  if (item.null()) return;
180 
181  if (item.HasField(dabc::prop_error))
183 
184 #ifdef WITH_FESA
185 
186  // TODO: mark here that we start change hierarchy item, but not immidiately change version
187 
188  unsigned n = 0;
189  // first of all, delete non-existing fields
190  while (n < item.NumFields()) {
191  std::string fname = item.FieldName(n++);
192  if ((fname.find("_") == 0) || value->contains(fname.c_str())) continue;
193  item.RemoveField(fname);
194  n = 0; // start search from beginning
195  }
196 
197  rdaDataIterator iter(*value);
198 
199  while (iter.hasNext()) {
200  rdaDataEntry* entry = iter.next();
201 
202  const char* tag = entry->tag();
203  if ((tag==0) || (*tag==0)) {
204  EOUT("There is no tag specified in field of service %s", name.c_str());
205  continue;
206  }
207 
208  // DOUT0("Service %s tag %s", name.c_str(), tag);
209 
210  switch (entry->getValueType()) {
212  case rdaDataEntry::TYPE_NULL: break;
214  case rdaDataEntry::TYPE_BOOLEAN: {
215  item.SetField(tag, entry->extractBoolean());
216  if (fDoRec && !fBlockRec) fRec.AddLong(tag, entry->extractBoolean(), true);
217  break;
218  }
220  case rdaDataEntry::TYPE_BOOLEAN_ARRAY: {
221  unsigned long size(0);
222  const bool* arr = entry->getBooleanArray(size);
223  std::vector<int64_t> vect;
224  for (unsigned n=0;n<size;n++) vect.push_back(arr[n] ? 1 : 0);
225  item.SetField(tag, vect);
226  break;
227  }
229  case rdaDataEntry::TYPE_BYTE: {
230  item.SetField(tag, (int64_t)entry->extractByte());
231  if (fDoRec && !fBlockRec) fRec.AddLong(tag, (int64_t)entry->extractByte(), true);
232  break;
233  }
235  case rdaDataEntry::TYPE_BYTE_ARRAY: {
236  unsigned long size(0);
237  const signed char* arr = entry->getByteArray(size);
238  std::vector<int64_t> vect;
239  for (unsigned n=0;n<size;n++) vect.push_back((int64_t)arr[n]);
240  item.SetField(tag, vect);
241  break;
242  }
244  case rdaDataEntry::TYPE_SHORT: {
245  item.SetField(tag, (int64_t)entry->extractShort());
246  if (fDoRec && !fBlockRec) fRec.AddLong(tag, (int64_t)entry->extractShort(), true);
247  break;
248  }
250  case rdaDataEntry::TYPE_SHORT_ARRAY: {
251  unsigned long size(0);
252  const short* arr = entry->getShortArray(size);
253  std::vector<int64_t> vect;
254  for (unsigned n=0;n<size;n++) vect.push_back((int64_t)arr[n]);
255  item.SetField(tag, vect);
256  break;
257  }
259  case rdaDataEntry::TYPE_INT: {
260  item.SetField(tag, entry->extractInt());
261  if (fDoRec && !fBlockRec) fRec.AddLong(tag, entry->extractInt(), true);
262  break;
263  }
265  case rdaDataEntry::TYPE_INT_ARRAY: {
266  unsigned long size(0);
267  const int* arr = entry->getIntArray(size);
268 
269  std::vector<int64_t> vect;
270  for (unsigned n=0;n<size;n++) vect.push_back(arr[n]);
271  item.SetField(tag, vect);
272  break;
273  }
275  case rdaDataEntry::TYPE_LONG: {
276  item.SetField(tag, (int64_t)entry->extractLong());
277  if (fDoRec && !fBlockRec) fRec.AddLong(tag, entry->extractLong(), true);
278  break;
279  }
281  case rdaDataEntry::TYPE_LONG_ARRAY: {
282  unsigned long size(0);
283  const long long* arr = entry->getLongArray(size);
284  std::vector<int64_t> vect;
285  for (unsigned n=0;n<size;n++) vect.push_back((int64_t)arr[n]);
286  item.SetField(tag, vect);
287  break;
288  }
290  case rdaDataEntry::TYPE_FLOAT: {
291  item.SetField(tag, entry->extractFloat());
292  if (fDoRec && !fBlockRec) fRec.AddDouble(tag, entry->extractFloat(), true);
293  break;
294  }
296  case rdaDataEntry::TYPE_FLOAT_ARRAY: {
297  unsigned long size(0);
298  const float* arr = entry->getFloatArray(size);
299  std::vector<double> vect;
300  for (unsigned n=0;n<size;n++) vect.push_back(arr[n]);
301  item.SetField(tag, vect);
302  break;
303  }
305  case rdaDataEntry::TYPE_DOUBLE: {
306  item.SetField(tag, entry->extractDouble());
307  if (fDoRec && !fBlockRec) fRec.AddDouble(tag, entry->extractDouble(), true);
308  break;
309  }
311  case rdaDataEntry::TYPE_DOUBLE_ARRAY: {
312  unsigned long size(0);
313  const double* arr = entry->getDoubleArray(size);
314  std::vector<double> vect;
315  for (unsigned n=0;n<size;n++) vect.push_back(arr[n]);
316  item.SetField(tag, vect);
317  break;
318  }
320  case rdaDataEntry::TYPE_STRING: {
321  item.SetField(tag, entry->getString());
322  break;
323  }
325  case rdaDataEntry::TYPE_STRING_ARRAY: {
326  unsigned long size(0);
327  const char** arr = entry->getStringArray(size);
328  std::vector<std::string> vect;
329  for (unsigned n=0;n<size;n++) vect.push_back(arr[n]);
330  item.SetField(tag, vect);
331  break;
332  }
333  }
334  }
335 
336  // TODO: here apply new version and create history entry
337 #endif
338 
339  item.MarkChangedItems();
340 }
341 
343 {
344  dabc::LockGuard lock(fHierarchy.GetHMutex());
345  // between GetRecRawSize and WriteRecRawData record will be blocked - no any updates possible
346  fBlockRec = true;
347  return fRec.GetRawSize();
348 }
349 
350 unsigned fesa::Monitor::WriteRecRawData(void* ptr, unsigned maxsize)
351 {
352  dabc::LockGuard lock(fHierarchy.GetHMutex());
353  unsigned len = fRec.Write(ptr, maxsize);
354  fRec.Clear();
355  fBlockRec = false;
356  return len;
357 }
Represents command with its arguments.
Definition: Command.h:99
Represents objects hierarchy of remote (or local) DABC process.
Definition: Hierarchy.h:285
void MarkChangedItems(uint64_t tm=0)
If any field was modified, item will be marked with new version.
Definition: Hierarchy.h:330
Hierarchy CreateHChild(const std::string &name, bool allowslahes=false, bool sortorder=false)
Create child item in hierarchy with specified name. If allowslahes enabled, instead of subfolders item...
Definition: Hierarchy.h:392
Hierarchy GetHChild(const std::string &name, bool allowslahes=false, bool force=false, bool sortorder=false)
Return child, if necessary creates it with full subfolder. If force specified, missing children and folders...
Definition: Hierarchy.cxx:944
void EnableHistory(unsigned length=100, bool withchilds=false)
Activate history production for selected element and its childs.
Definition: Hierarchy.cxx:837
Lock guard for posix mutex.
Definition: threads.h:127
unsigned NumFields() const
Definition: Record.h:504
bool HasField(const std::string &name) const
Definition: Record.h:498
bool SetField(const std::string &name, const RecordField &v)
Definition: Record.h:516
bool RemoveField(const std::string &name)
Definition: Record.h:501
std::string FieldName(unsigned cnt) const
Definition: Record.h:507
bool null() const
Returns true if reference contains nullptr.
Definition: Reference.h:151
Monitor of FESA data.
Definition: Monitor.h:38
virtual unsigned WriteRecRawData(void *ptr, unsigned maxsize)
Definition: Monitor.cxx:350
Monitor(const std::string &name, dabc::Command cmd=nullptr)
Definition: Monitor.cxx:99
virtual unsigned GetRecRawSize()
Definition: Monitor.cxx:342
std::vector< rdaDabcHandler * > fHandlers
Definition: Monitor.h:48
void ReportServiceError(const std::string &name, const std::string &err)
Definition: Monitor.cxx:159
virtual ~Monitor()
Definition: Monitor.cxx:145
rdaDeviceHandle * fDevice
Definition: Monitor.h:47
void ReportServiceChanged(const std::string &name, const rdaData *v)
Definition: Monitor.cxx:172
#define EOUT(args ...)
Definition: logging.h:150
const char * prop_error
Definition: Hierarchy.cxx:33
std::string format(const char *fmt,...)
Definition: string.cxx:49
Support for MBS - standard GSI DAQ.
Definition: api.h:36