23 #include "TBufferFile.h" 32 #include "dabc/version.h" 33 #include "dabc/Hierarchy.h" 34 #include "dabc/Manager.h" 35 #include "dabc/Publisher.h" 36 #include "dabc/Configuration.h" 41 if ((item.GetField(
"dabc:kind").AsStr() ==
"rate") ||
42 (item.GetField(
"dabc:history").AsInt() > 0))
return kTRUE;
48 std::string kind = item.GetField(
"dabc:kind").AsStr();
49 if (kind.find(
"ROOT.") == 0) {
65 ROOT::Internal::RConcurrentHashColl::HashValue hash;
66 return { (TList *)
mylist->Clone(), 0, hash };
73 gROOT->GetListOfFiles()->Remove(
this);
90 int fHistoryLength{0};
94 Bool_t fCompression{
false};
113 fObjName =
"StreamerInfo";
114 fItemName = sinfoname;
115 fRootClassName =
"TList";
136 fObjName = item.GetName();
137 fItemName = item.ItemName();
138 std::string kind = item.GetField(dabc::prop_kind).AsStr();
139 fHistoryLength = item.GetField(dabc::prop_history).AsInt();
141 fMasterName = item.GetField(dabc::prop_masteritem).AsStr();
142 fMasterItemName = item.FindMaster().ItemName();
144 if (kind.find(
"ROOT.") == 0) {
146 fRootClassName = kind;
150 if (fHistoryLength > 0) fRootClassName =
"TGraph";
162 Bool_t
GetObject(TObject *&obj, Bool_t &owner)
const override {
return kFALSE; }
166 if (fRootClassName.length() > 0)
173 return fObjName.c_str();
178 if (fRootClassName.length()>0)
return fRootClassName.c_str();
180 return "dabc::Hierarchy";
187 dabc::WorkerRef wrk = dabc::mgr.FindItem(
"/Go4ReplWrk");
188 if (wrk.null())
return 0;
190 if (fIsRate && (fHistoryLength>0)) {
191 dabc::CmdGetBinary cmd2;
192 cmd2.SetStr(
"Item", fItemName);
193 cmd2.SetStr(
"Kind",
"hierarchy");
194 cmd2.SetStr(
"Query", TString::Format(
"history=%d",fHistoryLength).Data());
195 cmd2.SetTimeout(10.);
196 cmd2.SetReceiver(fNodeName + dabc::Publisher::DfltName());
198 cmd2.SetPtr(
"#DabcAccess",
this);
203 if (dabc::mgr.GetCommandChannel().Submit(cmd2))
return 2;
206 if (!fRootClassName.empty()) {
208 TClass *cl = TClass::GetClass(fRootClassName.c_str());
210 TGo4Log::Error(
"GetObject Unknown class %s", fRootClassName.c_str());
214 if (!cl->InheritsFrom(TObject::Class())) {
216 TGo4Log::Error(
"GetObject Class %s not derived from TObject", fRootClassName.c_str());
220 dabc::CmdGetBinary cmd(fItemName,
"root.bin", fCompression ?
"zipped" :
"");
224 cmd.SetReceiver(fNodeName + dabc::Publisher::DfltName());
226 cmd.SetPtr(
"#DabcAccess",
this);
233 if (dabc::mgr.GetCommandChannel().Submit(cmd))
return 2;
244 if (cmd.GetResult() != dabc::cmd_true) {
245 TGo4Log::Error(
"dabc::Command %s execution failed", cmd.GetName());
250 fRawData = cmd.GetRawData();
252 if (fRawData.null() || (fRawData.NumSegments() == 0)) {
253 TGo4Log::Error(
"Did not get raw data from dabc::Command %s", cmd.GetName());
257 if (fIsRate && (fHistoryLength>0)) {
260 res.SetVersion(cmd.GetUInt(
"version"));
261 res.ReadFromBuffer(fRawData);
265 dabc::HistoryIter iter = res.MakeHistoryIter();
267 while (iter.next()) cnt++;
271 if (cnt == 0)
return kFALSE;
273 TGraph *gr =
new TGraph(cnt);
274 gr->SetName(fObjName.c_str());
275 gr->SetTitle(TString::Format(
"%s ratemeter", fItemName.c_str()).Data());
277 while (iter.next()) {
279 double v = iter.GetField(
"value").AsDouble();
280 uint64_t tm = iter.GetField(
"time").AsUInt();
285 gr->SetPoint(cnt-i, tm / 1000, v);
288 gr->GetXaxis()->SetTimeDisplay(1);
289 gr->GetXaxis()->SetTimeFormat(
"%H:%M:%S%F1970-01-01 00:00:00");
291 DoObjectAssignement(fxReceiver, fxRecvPath.Data(), gr, kTRUE);
295 if (!fRootClassName.empty()) {
296 TClass *cl = TClass::GetClass(fRootClassName.c_str());
304 if (!fMasterName.empty() && tgtslot && tgtslot->GetParent())
310 Int_t local_master_version = 0;
312 if (!masterslot->
GetIntPar(
"dabc_version", local_master_version))
313 local_master_version = 0;
315 if (local_master_version >= cmd.GetInt(
"MVersion")) {
318 if (masterslot->
GetPar(
"dabc_loading")) {
327 TGo4Log::Error(
"Fail to request MASTER item %s from DABC node %s", fMasterItemName.c_str(), fNodeName.c_str());
331 masterslot->
SetPar(
"dabc_loading",
"true");
336 TGo4Log::Debug(
"Item %s raw_data size %u master_version %u", fItemName.c_str(), fRawData.GetTotalSize(), cmd.GetInt(
"MVersion"));
338 char *pobj = (
char *) cl->New();
341 TGo4Log::Error(
"ReadObj - Cannot create new object of class %s", cl->GetName());
345 Int_t baseOffset = cl->GetBaseClassOffset(TObject::Class());
346 if (baseOffset==-1) {
350 TGo4Log::Error(
"ReadObj Incorrect detection of the inheritance from TObject for class %s.",
355 TObject *tobj = (TObject *) (pobj+baseOffset);
358 char *rawbuf =
nullptr;
363 int sizeinp = fRawData.GetTotalSize(), sizeout = 0, irep = 0;
365 if (R__unzip_header(&sizeinp, (
unsigned char *) fRawData.SegmentPtr(), &sizeout)) {
366 printf(
"Fail to decode zip header\n");
368 rawbuf = (
char*) malloc(sizeout);
370 R__unzip(&sizeinp, (
unsigned char *) fRawData.SegmentPtr(), &sizeout, (
unsigned char *) rawbuf, &irep);
375 rawbuf = (
char *) fRawData.SegmentPtr();
376 rawbuflen = fRawData.GetTotalSize();
380 TBufferFile buf(TBuffer::kRead, rawbuflen, rawbuf, kFALSE);
381 buf.MapObject(pobj,cl);
384 cl->Destructor(pobj);
389 if (fCompression) free(rawbuf);
391 if ((fObjName ==
"StreamerInfo") &&
392 (fRootClassName ==
"TList") &&
393 tobj && tobj->InheritsFrom(TList::Class()) &&
394 fMasterName.empty()) {
397 fff.ReadStreamerInfo();
404 tgtslot->RemovePar(
"dabc_loading");
405 tgtslot->SetIntPar(
"dabc_version", cmd.GetInt(
"BVersion"));
407 DoObjectAssignement(fxReceiver, fxRecvPath.Data(), tobj, kTRUE);
461 Start((Long_t)res_tm*1000, kTRUE);
471 if (cmd.GetPtr(
"#DabcAccess") || cmd.GetPtr(
"#DabcProxy")) {
477 return dabc::Worker::ReplyCommand(cmd);
482 dabc::Worker(MakePair(name))
510 if (fParent.NumChilds() == 0)
return kFALSE;
514 fChild = fParent.GetChild(fCnt);
518 if (fCnt>=fParent.NumChilds())
return kFALSE;
519 fChild = fParent.GetChild(fCnt);
522 return !fChild.null();
525 Bool_t
isfolder()
override {
return fChild.NumChilds() > 0; }
529 if (strcmp(flagname,
"IsRemote") == 0)
return 1;
540 const char *
name()
override {
return fChild.GetName(); }
541 const char *
info()
override {
return "item from dabc"; }
563 if (fClNameBuf.Length()>0)
return fClNameBuf.Data();
565 return "dabc::Hierarchy";
576 fxHierarchy(nullptr),
577 fxParentSlot(nullptr)
596 if (!dabc::CreateManager(
"cmd", 0))
return kFALSE;
598 std::string node = dabc::MakeNodeName(nodename);
600 if (!dabc::ConnectDabcNode(node)) {
605 dabc::WorkerRef wrk = dabc::mgr.FindItem(
"/Go4ReplWrk");
609 wrk()->AssignToThread(dabc::mgr.thread());
621 dabc::Command cmd = *((dabc::Command*)_cmd);
623 if (cmd.IsName(dabc::CmdGetNamesList::CmdName())) {
625 dabc::Buffer buf = cmd.GetRawData();
627 if (buf.null())
return kFALSE;
632 dabc::Hierarchy& hierarchy = *((dabc::Hierarchy*)
fxHierarchy);
636 if (!hierarchy.ReadFromBuffer(buf))
return kFALSE;
651 dabc::CmdGetNamesList cmd2;
652 cmd2.SetReceiver(std::string(
fNodeName.Data()) + dabc::Publisher::DfltName());
653 cmd2.SetTimeout(10.);
655 dabc::WorkerRef wrk = dabc::mgr.FindItem(
"/Go4ReplWrk");
657 if (!sync && !wrk.null()) {
658 cmd2.SetPtr(
"#DabcProxy",
this);
661 dabc::mgr.GetCommandChannel().Submit(cmd2);
665 if (dabc::mgr.GetCommandChannel().Execute(cmd2)!=dabc::cmd_true) {
686 dabc::Hierarchy& hierarchy = *((dabc::Hierarchy*)
fxHierarchy);
688 return hierarchy.NumChilds() > 0;
696 dabc::Hierarchy& hierarchy = *((dabc::Hierarchy*)
fxHierarchy);
698 if (hierarchy.null())
702 return std::make_unique<TGo4DabcAccess>(
fNodeName.Data(), hierarchy);
704 dabc::Hierarchy child = hierarchy.FindChild(name);
706 return child.null() ? nullptr : std::make_unique<TGo4DabcAccess>(
fNodeName.Data(), child);
713 dabc::Hierarchy& hierarchy = *((dabc::Hierarchy*)
fxHierarchy);
729 printf(
"GO4 WANTS update DABC hierarchy - do it\n");
Bool_t Connect(const char *nodename)
void ReadData(TGo4Slot *slot, TDirectory *dir) override
Bool_t GetIntPar(const char *name, Int_t &value) const
TGo4LevelIter * MakeIter() override
static TClass * GetClass(const char *classname, Bool_t load=kFALSE)
Bool_t UpdateHierarchy(Bool_t sync=kTRUE)
TGo4DabcAccess(const std::string &node, const dabc::Hierarchy &item)
Bool_t CanGetObject() const override
TGo4DabcAccess(const std::string &node, const std::string &sinfoname)
request compression from server
virtual ~TGo4DabcLevelIter()
ReplyWorker(const std::string &name)
std::string fMasterItemName
void WriteData(TGo4Slot *slot, TDirectory *dir, Bool_t onlyobjs) override
const char * info() override
Bool_t IsRemote() const override
InfoListRet GetStreamerInfoListImpl(bool) override
TClass * GetObjectClass() const override
Bool_t isfolder() override
TGo4Slot * FindSlot(const char *fullpath, const char **subname=nullptr)
virtual ~TGo4DabcAccess()
TGo4DabcLevelIter(const dabc::Hierarchy &item)
Bool_t HasSublevels() const override
static void Debug(const char *text,...) GO4_PRINTF_ARGS
TGo4LevelIter * subiterator() override
void Finalize(TGo4Slot *slot) override
TGo4Slot * GetParent() const
TString GetRootClassName(const dabc::Hierarchy &item)
Int_t AssignObjectTo(TGo4ObjectManager *rcv, const char *path) override
std::string fRootClassName
TGo4Slot * GetSlot(const char *name, Bool_t force=kFALSE)
TReplyTimer(dabc::Command cmd)
double ProcessCommandReply(dabc::Command cmd)
const char * GetClassName() override
Bool_t GetObject(TObject *&obj, Bool_t &owner) const override
static void Error(const char *text,...) GO4_PRINTF_ARGS
const char * GetPar(const char *name) const
Bool_t RefreshNamesList() override
const char * name() override
bool ReplyCommand(dabc::Command cmd) override
void Initialize(TGo4Slot *slot) override
Int_t AssignObjectToSlot(TGo4Slot *slot)
Bool_t ReplyCommand(void *cmd)
const char * GetObjectName() const override
std::unique_ptr< TGo4Access > ProvideAccess(const char *name) override
const char * GetObjectClassName() const override
TGo4Slot * getslot() override
Int_t sizeinfo() override
void ForwardEvent(TGo4Slot *source, Int_t id, void *param=nullptr)
bool IsRateHistory(const dabc::Hierarchy &item)
static const char * GetDabcVersion()
TGo4Slot * fxParentSlot
pointer on dabc::Hierarchy class
void SetPar(const char *name, const char *value)
void Update(TGo4Slot *slot, Bool_t strong) override
Int_t getflag(const char *flagname) override