DABC (Data Acquisition Backbone Core)  2.9.9
CombinerModule.cxx
Go to the documentation of this file.
1 // $Id: CombinerModule.cxx 4102 2018-09-28 10:20:13Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #include "mbs/CombinerModule.h"
17 
18 #include <map>
19 
20 #include "dabc/Manager.h"
21 
22 
23 mbs::CombinerModule::CombinerModule(const std::string &name, dabc::Command cmd) :
24  dabc::ModuleAsync(name, cmd),
25  fInp(),
26  fOut(),
27  fFlushFlag(false),
28  fBuildCompleteEvents(false),
29  fCheckSubIds(false)
30 {
31  EnsurePorts(0, 0, dabc::xmlWorkPool);
32 
33  fBuildCompleteEvents = Cfg(mbs::xmlCombineCompleteOnly,cmd).AsBool(true);
34  fCheckSubIds = Cfg(mbs::xmlCheckSubeventIds,cmd).AsBool(true);
35 
36  fEventIdMask = Cfg(mbs::xmlEvidMask,cmd).AsInt(0);
37  if (fEventIdMask == 0) fEventIdMask = 0xffffffff;
38 
39  fEventIdTolerance = Cfg(mbs::xmlEvidTolerance,cmd).AsInt(0);
40 
41  std::string ratesprefix = Cfg(mbs::xmlCombinerRatesPrefix, cmd).AsStr("Mbs");
42 
43  fSpecialTriggerLimit = Cfg(mbs::xmlSpecialTriggerLimit,cmd).AsInt(12);
44 
45  fExcludeTime = Cfg("ExcludeTime", cmd).AsDouble(5.);
46 
47  double flushtmout = Cfg(dabc::xmlFlushTimeout,cmd).AsDouble(1.);
48 
49  for (unsigned n=0;n<NumInputs();n++) {
50  DOUT0(" MBS COMBINER Port%u: Capacity %u", n, InputQueueCapacity(n));
51 
52  fInp.push_back(ReadIterator());
53  fCfg.push_back(InputCfg());
54  fInp[n].Close();
55  fCfg[n].Reset();
56  }
57 
58  fNumObligatoryInputs = NumInputs();
59 
60  if (flushtmout>0.) CreateTimer("FlushTimer", flushtmout);
61 
62  fEventRateName = ratesprefix+"Events";
63  fDataRateName = ratesprefix+"Data";
64  fInfoName = ratesprefix+"Info";
65  fFileStateName= ratesprefix + "FileOn";
66 
67  DOUT0("Create rate %s", fDataRateName.c_str());
68 
69  CreatePar(fDataRateName).SetRatemeter(false, 3.).SetUnits("MB");
70  CreatePar(fEventRateName).SetRatemeter(false, 3.).SetUnits("Ev");
71 
72  // must be configured in xml file
73  // fDataRate->SetDebugOutput(true);
74 
75  CreateCmdDef(mbs::comStartFile)
76  .AddArg(dabc::xmlFileName, "string", true)
77  .AddArg(dabc::xmlFileSizeLimit, "int", false, 1000);
78 
79  CreateCmdDef(mbs::comStopFile);
80 
81  CreateCmdDef(mbs::comStartServer)
83 
84  CreateCmdDef(mbs::comStopServer);
85 
86  CreatePar(fInfoName, "info").SetSynchron(true, 2., false);
87  CreatePar(fFileStateName).Dflt(false);
88 
89  PublishPars(dabc::format("$CONTEXT$/%sCombinerModule",ratesprefix.c_str()));
90 
91  SetInfo(dabc::format("MBS combiner module ready. Mode: full events only:%d, subids check:%d flush:%3.1f" ,fBuildCompleteEvents,fCheckSubIds,flushtmout), true);
92 }
93 
95 {
96  DOUT0("mbs::CombinerModule::DTOR - does nothing!");
97 }
98 
100 {
101  DOUT0("mbs::CombinerModule::ModuleCleanup()");
102 
103  fOut.Close().Release();
104  for (unsigned n=0;n<fInp.size();n++)
105  fInp[n].Reset();
106 }
107 
108 
109 void mbs::CombinerModule::SetInfo(const std::string &info, bool forceinfo)
110 {
111 
112  Par(fInfoName).SetValue(info);
113 
114  if (forceinfo) Par(fInfoName).FireModified();
115 
116 /*
117  dabc::Logger::Debug(lvl, __FILE__, __LINE__, __func__, info.c_str());
118 
119  dabc::TimeStamp now = dabc::Now();
120 
121  if (forceinfo || (now > fLastInfoTm + 2.)) {
122  Par("MbsCombinerInfo").SetStr(dabc::format("%s: %s", GetName(), info.c_str()));
123  fLastInfoTm = now;
124  if (!forceinfo) DOUT0("%s: %s", GetName(), info.c_str());
125  }
126 */
127 }
128 
129 
131 {
132  if (fFlushFlag) {
133  unsigned cnt = 0;
134  while (IsRunning() && (cnt<100) && BuildEvent()) ++cnt;
135  FlushBuffer();
136  }
137  fFlushFlag = true;
138 }
139 
141 {
142  if (fOut.IsEmpty() || !fOut.IsBuffer()) return false;
143 
144  if (!CanSendToAllOutputs()) return false;
145 
146  dabc::Buffer buf = fOut.Close();
147 
148  DOUT3("Send buffer of size = %d", buf.GetTotalSize());
149 
150  SendToAllOutputs(buf);
151 
152  fFlushFlag = false; // indicate that next flush timeout one not need to send buffer
153 
154  return true;
155 }
156 
158 {
159  DOUT0("mbs::CombinerModule::BeforeModuleStart name: %s is calling first build event...", GetName());
160 
161 
162  // FIXME: why event processing already done here ???
163 
164  unsigned cnt=0;
165 
166  while (cnt<100 && BuildEvent()) cnt++;
167 
168  DOUT0("mbs::CombinerModule::BeforeModuleStart name: %s is finished %u %u", GetName(), NumInputs(), NumOutputs());
169 }
170 
172 {
173  // FIXME: we should process data which is remains in the input queues
174  // probably, we could build incomplete events if they are allowed
175 }
176 
178 {
179  fCfg[ninp].curr_evnt_num = 0;
180  fCfg[ninp].curr_evnt_special = false;
181  fCfg[ninp].valid = false;
182 
183  fInp[ninp].Close();
184  SkipInputBuffers(ninp, 1);
185 
186  return true;
187 }
188 
190 {
191  // always set event number to 0
192  fCfg[ninp].curr_evnt_num = 0;
193  fCfg[ninp].curr_evnt_special = false;
194  fCfg[ninp].valid = false;
195 
196  bool foundevent(false);
197 
198  while (!foundevent) {
199 
200  if (!fInp[ninp].IsData()) {
201 
202  if (NumCanRecv(ninp)==0) return false;
203 
204  if (!fInp[ninp].Reset(RecvQueueItem(ninp, 0))) {
205 
206  // skip buffer and try again
207  fInp[ninp].Close();
208  SkipInputBuffers(ninp, 1);
209  continue;
210  }
211  }
212 
213  bool res = fInp[ninp].NextEvent();
214 
215  if (!res || (fInp[ninp].evnt()==0)) {
216  fInp[ninp].Close();
217  SkipInputBuffers(ninp, 1);
218  continue;
219  }
220 
221  if (fCfg[ninp].real_mbs && (fInp[ninp].evnt()->iTrigger>=fSpecialTriggerLimit)) {
222  // TODO: Probably, one should combine trigger events from all normal mbs channels.
223  foundevent = true;
224  fCfg[ninp].curr_evnt_special = true;
225  fCfg[ninp].curr_evnt_num = fInp[ninp].evnt()->EventNumber();
226  DOUT1("Found special event with trigger %d on input %u", fInp[ninp].evnt()->iTrigger, ninp);
227  } else
228  if (fCfg[ninp].real_evnt_num) {
229  foundevent = true;
230  fCfg[ninp].curr_evnt_num = fInp[ninp].evnt()->EventNumber() & fEventIdMask;
231 
232  // DOUT1("Find in input %u event %u", ninp, fCfg[ninp].curr_evnt_num);
233  } else
234  if (fCfg[ninp].no_evnt_num) {
235 
236  // indicate that data in optional input was found, should be append to the next event
237  foundevent = true;
238 
239  } else {
240  mbs::SubeventHeader* subevnt = fInp[ninp].evnt()->SubEvents();
241  fCfg[ninp].curr_evnt_num = 0;
242 
243  while (subevnt!=0) {
244  // DOUT1("Saw subevent fullid %u", subevnt->fFullId);
245  if (subevnt->fFullId == fCfg[ninp].evntsrc_fullid) break;
246  subevnt = fInp[ninp].evnt()->NextSubEvent(subevnt);
247  }
248 
249  if (subevnt!=0) {
250  uint32_t* data = (uint32_t*) (((uint8_t*) subevnt->RawData()) + fCfg[ninp].evntsrc_shift);
251 
252  if (fCfg[ninp].evntsrc_shift + sizeof(uint32_t) <= subevnt->RawDataSize()) {
253  foundevent = true;
254  fCfg[ninp].curr_evnt_num = *data & fEventIdMask; // take only required bits
255  //DOUT1("Find in input %u event %u (in subevent)", ninp, fCfg[ninp].curr_evnt_num);
256  } else {
257  EOUT("Subevent too small %u compare with required shift %u for id location", subevnt->RawDataSize(), fCfg[ninp].evntsrc_shift);
258  }
259  } else {
260  EOUT("Did not found subevent for id location");
261  }
262  }
263  }
264 
265 // if (ninp==2)
266 // DOUT1("Inp%u Event%d", ninp, fCfg[ninp].curr_evnt_num);
267 
268  fCfg[ninp].valid = true;
269 
270  return true;
271 }
272 
273 
274 
276 {
277  mbs::EventNumType mineventid(0), maxeventid(0), triggereventid(0);
278 
279  // indicate if some of main (non-optional) input queues are mostly full
280  // if such queue will be found, incomplete event may be build when it is allowed
281 
282 // DOUT0("BuildEvent method");
283 
284  int mostly_full(-1);
285  bool required_missing(false);
286 
287  for (unsigned ninp=0; ninp<fCfg.size(); ninp++) {
288 // DOUT0(" Port%u: pending %u capacity %u", ninp, port->InputPending(), port->InputQueueCapacity());
289 
290  if (IsOptionalInput(ninp)) continue;
291 
292  if (!IsInputConnected(ninp)) required_missing = true;
293 
294  if (InputQueueFull(ninp) && (mostly_full<0))
295  mostly_full = (int) ninp;
296  }
297 
298  if (required_missing && fBuildCompleteEvents) {
299  // if some of important input missing than we should clean our queues
300  // to let data flowing, no event will be produced to output
301 
302  for (unsigned ninp=0; ninp<fCfg.size(); ninp++) {
303  if (InputQueueFull(ninp) ||
304  (fCfg[ninp].no_evnt_num && (NumCanRecv(ninp) > InputQueueCapacity(ninp) / 2))) {
305  ShiftToNextBuffer(ninp);
306  SetInfo(dabc::format("Skip buffer on input %u while some other input is disconnected", ninp));
307  DOUT0("Skip buffer on input %u",ninp);
308  }
309  }
310 
311  // all queues now have at least one empty entry, one could wait for the next event
312  return false;
313  }
314 
315  int hasTriggerEvent = -1;
316  int num_valid = 0;
317 
318  double tm_now = dabc::TimeStamp::Now().AsDouble();
319 
320  for (unsigned ninp=0; ninp<fCfg.size(); ninp++) {
321 
322  fCfg[ninp].selected = false;
323 
324  if (fCfg[ninp].last_valid_tm <= 0) fCfg[ninp].last_valid_tm = tm_now;
325 
326  if (fInp[ninp].evnt()==0) {
327  if (!ShiftToNextEvent(ninp)) {
328  // if optional input is absent just continue
329  if (fCfg[ninp].no_evnt_num) continue;
330  // we can now exclude this input completely while some other is mostly full
331  if ((mostly_full>=0) && !fBuildCompleteEvents) continue;
332 
333  if (fCfg[ninp].optional_input && (tm_now > (fCfg[ninp].last_valid_tm + fExcludeTime))) continue;
334 
335  return false;
336  } else {
337  fCfg[ninp].last_valid_tm = tm_now;
338  }
339  }
340 
341  if (fCfg[ninp].no_evnt_num) continue;
342 
343  mbs::EventNumType evid = fCfg[ninp].curr_evnt_num;
344 
345  if (num_valid == 0) {
346  mineventid = evid;
347  maxeventid = evid;
348  } else {
349  if (evid < mineventid) mineventid = evid; else
350  if (evid > maxeventid) maxeventid = evid;
351  }
352 
353  num_valid++;
354 
355  if (fCfg[ninp].curr_evnt_special && (hasTriggerEvent<0)) {
356  hasTriggerEvent = ninp;
357  triggereventid = evid;
358  }
359 
360  } // for ninp
361 
362  if (num_valid==0) return false;
363 
364  // we always try to build event with minimum id
365  mbs::EventNumType buildevid(mineventid);
366  mbs::EventNumType diff = maxeventid - mineventid;
367 
368  // if any trigger event found, it will be send as is
369  if (hasTriggerEvent>=0) {
370  buildevid = triggereventid;
371  fCfg[hasTriggerEvent].selected = true;
372  diff = 0;
373 
374  } else {
375 
376  // but due to event counter overflow one should build event with maxid
377  if (diff > fEventIdMask/2) {
378  buildevid = maxeventid;
379  diff = fEventIdMask - diff + 1;
380  }
381 
382  if ((fEventIdTolerance > 0) && (diff > fEventIdTolerance)) {
383  SetInfo(dabc::format("Event id difference %u exceeding tolerance window %u, stopping dabc!", diff, fEventIdTolerance), true);
385  return false; // need to return immediately after stop state is set
386  }
387 
388  // select inputs which will be used for building
389  for (unsigned ninp = 0; ninp < fCfg.size(); ninp++)
390  if (fCfg[ninp].valid && ((fCfg[ninp].curr_evnt_num == buildevid) || fCfg[ninp].no_evnt_num))
391  fCfg[ninp].selected = true;
392  }
393 
394  // calculated result event size and define if mbs header is available
395  // also check here if all subids are unique
396  uint32_t subeventssize = 0;
397  // define number of input which will be used to copy mbs header
398  int copyMbsHdrId = -1;
399  std::map<uint32_t, bool> subid_map;
400  unsigned num_selected_important(0), num_selected_all(0);
401 
402  // check of unique subevent ids:
403  bool duplicatefound = false;
404 
405  // indicate if important input skipped - means input which could have important data,
406  // used to check if incomplete event can be build when if fBuildCompleteEvents = true
407  bool important_input_skipped = false;
408 
409  int firstselected = -1;
410 
411  std::string sel_str;
412 
413  for (unsigned ninp = 0; ninp < fCfg.size(); ninp++) {
414  if (!fCfg[ninp].selected) {
415  // input without number can be skipped without any problem
416  if (fCfg[ninp].no_evnt_num) continue;
417 
418  // if optional input not selected, but has valid data than it is not important for us
419  // if no new events on the optional input for a long time, one can skip it as well
420  if (fCfg[ninp].optional_input)
421  if (fCfg[ninp].valid || (tm_now > (fCfg[ninp].last_valid_tm + fExcludeTime))) continue;
422 
423  important_input_skipped = true;
424 
425  continue;
426  }
427 
428  sel_str += dabc::format(" %d", ninp);
429 
430  num_selected_all++;
431 
432  if (!IsOptionalInput(ninp)) {
433  // take into account only events with "normal" event number
434  if (firstselected<0) firstselected = ninp;
435  num_selected_important++;
436  if (fCfg[ninp].real_mbs && (copyMbsHdrId<0)) copyMbsHdrId = ninp;
437  }
438 
439  subeventssize += fInp[ninp].evnt()->SubEventsSize();
440 
441  if (fCheckSubIds)
442  while (fInp[ninp].NextSubEvent()) {
443  uint32_t fullid = fInp[ninp].subevnt()->fFullId;
444  if (subid_map.find(fullid) != subid_map.end()) {
445  EOUT("Duplicate fullid = 0x%x", fullid);
446  duplicatefound = true;
447  }
448  subid_map[fullid] = true;
449  }
450  }
451 
452  bool do_skip_data = false;
453 
454 
455 
456  if (fBuildCompleteEvents && important_input_skipped && (hasTriggerEvent<0)) {
457  SetInfo(dabc::format("Skip incomplete event %u, found inputs %u required %u selected %s", buildevid, num_selected_important, NumObligatoryInputs(), sel_str.c_str()));
458  do_skip_data = true;
459 // DOUT0("Skip incomplete event %u, found inputs %u required %u diff %u selected %s", buildevid, num_selected_important, NumObligatoryInputs(), diff, sel_str.c_str());
460  } else
461  if (duplicatefound && (hasTriggerEvent<0)) {
462  SetInfo(dabc::format("Skip event %u while duplicates subevents found", buildevid));
463  do_skip_data = true;
464 // DOUT0("Skip event %u while duplicates subevents found", buildevid);
465  } else {
466 
467  if (fBuildCompleteEvents && (num_selected_important < NumObligatoryInputs())) {
468  SetInfo( dabc::format("Build incomplete event %u, found inputs %u required %u first %d diff %u mostly_full %d", buildevid, num_selected_important, NumObligatoryInputs(), firstselected, diff, mostly_full) );
469 // DOUT0("%s skip optional input and build incomplete event %u, found inputs %u required %u first %d diff %u mostly_full %d", GetName(), buildevid, num_selected_important, NumObligatoryInputs(), firstselected, diff, mostly_full);
470  } else
471  if (important_input_skipped) {
472  SetInfo( dabc::format("Build incomplete event %u, found inputs %u required %u first %d diff %u mostly_full %d", buildevid, num_selected_important, NumObligatoryInputs(), firstselected, diff, mostly_full) );
473 // DOUT0("%s Build incomplete event %u, found inputs %u required %u first %d diff %u mostly_full %d", GetName(), buildevid, num_selected_important, NumObligatoryInputs(), firstselected, diff, mostly_full);
474  } else {
475 // JAM2016: better supress this output:
476 // SetInfo( dabc::format("Build event %u with %u inputs %s", buildevid, num_selected_all, sel_str.c_str()) );
477 // DOUT0("Build event %u with %u inputs selected %s", buildevid, num_selected_all, sel_str.c_str());
478  }
479 
480  // if there is no place for the event, flush current buffer
481  if (fOut.IsBuffer() && !fOut.IsPlaceForEvent(subeventssize))
482  if (!FlushBuffer()) return false;
483 
484  if (!fOut.IsBuffer()) {
485 
486  dabc::Buffer buf = TakeBuffer();
487  if (buf.null()) return false;
488 
489  if (!fOut.Reset(buf)) {
490  SetInfo("Cannot use buffer for output - hard error!!!!", true);
491 
492  buf.Release();
493 
495  return false;
496  }
497  }
498 
499  if (!fOut.IsPlaceForEvent(subeventssize)) {
500  EOUT("Event size %u too big for buffer, skip event %u completely", subeventssize+ sizeof(mbs::EventHeader), buildevid);
501  } else {
502 
503  if (copyMbsHdrId<0) {
504  // SetInfo("No mbs eventid found in mbs event number mode, stop dabc", true);
505  // dabc::mgr.StopApplication();
506  }
507 
508  DOUT4("Building event %u num_valid %u", buildevid, num_valid);
509  fOut.NewEvent(buildevid); // note: this header id may be overwritten due to mode
510 
511  for (unsigned ninp = 0; ninp < fCfg.size(); ninp++) {
512 
513  if (fCfg[ninp].selected) {
514 
515  // if header id still not defined, used first
516  if (copyMbsHdrId<0) copyMbsHdrId = ninp;
517 
518  if (!fInp[ninp].IsData())
519  throw dabc::Exception("Input has no buffer but used for event building");
520 
521  dabc::Pointer ptr;
522  fInp[ninp].AssignEventPointer(ptr);
523 
524  ptr.shift(sizeof(mbs::EventHeader));
525 
526  if (ptr.segmid()>100)
527  throw dabc::Exception("Bad segment id");
528 
529  fOut.AddSubevent(ptr);
530  }
531  }
532 
533  fOut.evnt()->CopyHeader(fInp[copyMbsHdrId].evnt());
534 
535  fOut.FinishEvent();
536 
537  DOUT4("Produced event %d subevents %u", buildevid, subeventssize);
538 
539  Par(fEventRateName).SetValue(1);
540  Par(fDataRateName).SetValue((subeventssize + sizeof(mbs::EventHeader))/1024./1024.);
541 
542  // if output buffer filled already, flush it immediately
543  if (!fOut.IsPlaceForEvent(0))
544  FlushBuffer();
545  }
546  } // end of incomplete event
547 
548  for (unsigned ninp = 0; ninp < fCfg.size(); ninp++)
549  if (fCfg[ninp].selected) {
550  // if just skipping data, do not remove special (EPICS) inputs
551  if (do_skip_data && fCfg[ninp].no_evnt_num) continue;
552 
553  if (ShiftToNextEvent(ninp))
554  fCfg[ninp].last_valid_tm = tm_now;
555  }
556 
557  // return true means that method can be called again immediately
558  // in all places one requires while loop
559  return true;
560 }
561 
562 
564 {
565 
566 
567 
569 // if (cmd.IsName(mbs::comStartFile)) {
570 // if (NumOutputs()<2) {
571 // EOUT("No ports was created for the file");
572 // return dabc::cmd_false;
573 // }
574 //
575 // // TODO: check if it works, probably some parameters should be taken from original command
576 // bool res = dabc::mgr.CreateTransport(OutputName(1, true));
577 // return cmd_bool(res);
578 // } else
579 // if (cmd.IsName(mbs::comStopFile)) {
580 //
581 // FindPort(OutputName(1)).Disconnect();
582 //
583 // SetInfo("Stop file", true);
584 //
585 // return dabc::cmd_true;
586 // } else
587 // if (cmd.IsName(mbs::comStartServer)) {
588 // if (NumOutputs()<1) {
589 // EOUT("No ports was created for the server");
590 // return dabc::cmd_false;
591 // }
592 //
593 // bool res = dabc::mgr.CreateTransport(OutputName(0, true));
594 //
595 // return cmd_bool(res);
596 // } else
597 // if (cmd.IsName(mbs::comStopServer)) {
598 // FindPort(OutputName()).Disconnect();
599 //
600 // SetInfo("Stop server", true);
601 // return dabc::cmd_true;
602 // }
604 
605 
607  if (cmd.IsName(mbs::comStartFile)) {
608 
609  std::string fname = cmd.GetStr(dabc::xmlFileName); //"filename")
610  int maxsize = cmd.GetInt(dabc::xml_maxsize, 30); // maxsize
611  std::string url = dabc::format("%s://%s?%s=%d", mbs::protocolLmd, fname.c_str(), dabc::xml_maxsize, maxsize);
612  EnsurePorts(0, 2);
613  bool res = dabc::mgr.CreateTransport(OutputName(1, true), url);
614  DOUT0("Started file %s res = %d", url.c_str(), res);
615  SetInfo(dabc::format("Execute StartFile for %s, result=%d",url.c_str(), res), true);
616  ChangeFileState(true);
617  return cmd_bool(res);
618  } else
619  if (cmd.IsName(mbs::comStopFile)) {
620  FindPort(OutputName(1)).Disconnect();
621  SetInfo("Stopped file", true);
622  ChangeFileState(false);
623  return dabc::cmd_true;
624  } else
625  if (cmd.IsName(mbs::comStartServer)) {
626  if (NumOutputs()<1) {
627  EOUT("No ports was created for the server");
628  return dabc::cmd_false;
629  }
630  std::string skind = cmd.GetStr(mbs::xmlServerKind);
631 
632  int port = cmd.GetInt(mbs::xmlServerPort, 6666);
633  std::string url = dabc::format("mbs://%s?%s=%d", skind.c_str(), mbs::xmlServerPort, port);
634  EnsurePorts(0, 1);
635  bool res = dabc::mgr.CreateTransport(OutputName(0, true));
636  DOUT0("Started server %s res = %d", url.c_str(), res);
637  SetInfo(dabc::format("Execute StartServer for %s, result=%d",url.c_str(), res), true);
638  return cmd_bool(res);
639  } else
640  if (cmd.IsName(mbs::comStopServer)) {
641  FindPort(OutputName(0)).Disconnect();
642  SetInfo("Stopped server", true);
643  return dabc::cmd_true;
644  }
646 
647 
648 
649  else
650  if (cmd.IsName("ConfigureInput")) {
651  unsigned ninp = cmd.GetUInt("Port", 0);
652 // DOUT0("Start input configure %u size %u", ninp, fCfg.size());
653  if (ninp<fCfg.size()) {
654 
655 // DOUT0("Do0 input configure %u size %u", ninp, fCfg.size());
656 
657  fCfg[ninp].real_mbs = cmd.GetBool("RealMbs", fCfg[ninp].real_mbs);
658  fCfg[ninp].real_evnt_num = cmd.GetBool("RealEvntNum", fCfg[ninp].real_evnt_num);
659  fCfg[ninp].no_evnt_num = cmd.GetBool("NoEvntNum", fCfg[ninp].no_evnt_num);
660  fCfg[ninp].optional_input = cmd.GetBool("Optional", fCfg[ninp].optional_input);
661 
662 // DOUT0("Do1 input configure %u size %u", ninp, fCfg.size());
663  fCfg[ninp].evntsrc_fullid = cmd.GetUInt("EvntSrcFullId", fCfg[ninp].evntsrc_fullid);
664  fCfg[ninp].evntsrc_shift = cmd.GetUInt("EvntSrcShift", fCfg[ninp].evntsrc_shift);
665 
666 // DOUT0("Do2 input configure %u size %u", ninp, fCfg.size());
667 
668  std::string ratename = cmd.GetStr("RateName", "");
669  if (!ratename.empty())
670  SetPortRatemeter(InputName(ninp), CreatePar(ratename).SetRatemeter(false,1.));
671 
672 // DOUT0("Do3 input configure %u size %u", ninp, fCfg.size());
673 
674  // optional imputs not need to be accounted for obligatory inputs
675  if (fCfg[ninp].optional_input || fCfg[ninp].no_evnt_num) {
676  if (fNumObligatoryInputs>1) fNumObligatoryInputs--;
677  }
678 
679  // events without number could not be MBS events
680  if (fCfg[ninp].no_evnt_num) {
681  fCfg[ninp].real_mbs = false;
682  }
683 
684  DOUT1("Configure input%u of module %s: RealMbs:%s RealEvntNum:%s EvntSrcFullId: 0x%x EvntSrcShift: %u",
685  ninp, GetName(),
686  DBOOL(fCfg[ninp].real_mbs), DBOOL(fCfg[ninp].real_evnt_num),
687  fCfg[ninp].evntsrc_fullid, fCfg[ninp].evntsrc_shift);
688 
689 // DOUT0("Do4 input configure %u size %u", ninp, fCfg.size());
690 
691  }
692 
693  return dabc::cmd_true;
694  }
695 
696 
697 
699 
700 }
701 
703 {
704  return 0xffffffff;
705 }
706 
707 // JAM2016 - adopted from pexorplugin readout module
709 {
710  SetParValue(fFileStateName, on);
711 }
712 
713 
714 
Reference on memory from memory pool.
Definition: Buffer.h:135
BufferSize_t GetTotalSize() const
Return total size of all buffer segments.
Definition: Buffer.cxx:91
Represents command with its arguments.
Definition: Command.h:99
unsigned GetUInt(const std::string &name, unsigned dflt=0) const
Definition: Command.h:148
std::string GetStr(const std::string &name, const std::string &dflt="") const
Definition: Command.h:136
bool GetBool(const std::string &name, bool dflt=false) const
Definition: Command.h:142
int GetInt(const std::string &name, int dflt=0) const
Definition: Command.h:139
DABC exception.
Definition: Exception.h:57
void StopApplication()
Definition: Manager.cxx:2231
bool CreateTransport(const std::string &portname, const std::string &transportkind="", const std::string &thrdname="")
Definition: Manager.cxx:2210
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
Definition: Module.h:232
Manipulator with dabc::Buffer class.
Definition: Pointer.h:34
BufferSize_t shift(BufferSize_t sz)
Definition: Pointer.h:153
unsigned segmid() const
Definition: Pointer.h:211
void Release()
Releases reference on the object.
Definition: Reference.cxx:138
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
Definition: Reference.cxx:177
bool null() const
Returns true if reference contains nullptr.
Definition: Reference.h:151
Read iterator for HADAQ events/subevents.
Definition: Iterator.h:39
void SetInfo(const std::string &info, bool forceinfo=false)
virtual unsigned int GetOverflowEventNumber() const
bool ShiftToNextEvent(unsigned ninp)
virtual int ExecuteCommand(dabc::Command cmd)
Main method where commands are executed.
CombinerModule(const std::string &name, dabc::Command cmd=nullptr)
virtual void AfterModuleStop()
virtual void ProcessTimerEvent(unsigned timer)
Method called by framework when timer event is produced.
void ChangeFileState(bool on)
change file on/off state in application
virtual void ModuleCleanup()
Method, which can be reimplemented by user and should cleanup all references on buffers and other obj...
bool ShiftToNextBuffer(unsigned ninp)
Method should be used to skip current buffer from the queue.
virtual void BeforeModuleStart()
unsigned fullid
Definition: hldprint.cxx:995
#define DOUT0(args ...)
Definition: logging.h:156
#define DOUT3(args ...)
Definition: logging.h:176
#define EOUT(args ...)
Definition: logging.h:150
#define DOUT1(args ...)
Definition: logging.h:162
#define DBOOL(arg)
Definition: logging.h:191
#define DOUT4(args ...)
Definition: logging.h:182
Event manipulation API.
Definition: api.h:23
const char * xmlWorkPool
Definition: Object.cxx:46
const char * xml_maxsize
Definition: Object.cxx:73
const char * xmlFileName
Definition: Object.cxx:70
ManagerRef mgr
Definition: Manager.cxx:42
std::string format(const char *fmt,...)
Definition: string.cxx:49
const char * xmlFlushTimeout
Definition: Object.cxx:61
const char * xmlFileSizeLimit
Definition: Object.cxx:72
@ cmd_false
Definition: Command.h:37
@ cmd_true
Definition: Command.h:38
const char * comStartServer
Definition: MbsTypeDefs.cxx:37
const char * comStartFile
Definition: MbsTypeDefs.cxx:39
const char * xmlCombineCompleteOnly
Definition: MbsTypeDefs.cxx:55
const char * comStopFile
Definition: MbsTypeDefs.cxx:40
@ StreamServer
Definition: MbsTypeDefs.h:355
const char * xmlSpecialTriggerLimit
Definition: MbsTypeDefs.cxx:59
const char * xmlEvidMask
Definition: MbsTypeDefs.cxx:57
const char * comStopServer
Definition: MbsTypeDefs.cxx:38
uint32_t EventNumType
Definition: MbsTypeDefs.h:94
const char * xmlCheckSubeventIds
Definition: MbsTypeDefs.cxx:56
const char * xmlCombinerRatesPrefix
Definition: MbsTypeDefs.cxx:60
const char * xmlServerKind
Definition: MbsTypeDefs.cxx:44
const char * xmlEvidTolerance
Definition: MbsTypeDefs.cxx:58
const char * xmlServerPort
Definition: MbsTypeDefs.cxx:45
const char * ServerKindToStr(int kind)
const char * protocolLmd
Definition: MbsTypeDefs.cxx:31
static TimeStamp Now()
Method returns TimeStamp instance with current time stamp value, measured either by fast TSC (if it i...
Definition: timing.h:176
double AsDouble() const
Return time stamp in form of double (in seconds)
Definition: timing.h:120
MBS subevent
Definition: MbsTypeDefs.h:40
void * RawData() const
Definition: MbsTypeDefs.h:75
uint32_t RawDataSize() const
Definition: MbsTypeDefs.h:78