23 #include "TBufferFile.h"
31 #include "dabc/version.h"
32 #include "dabc/Hierarchy.h"
33 #include "dabc/Manager.h"
34 #include "dabc/Publisher.h"
35 #include "dabc/Configuration.h"
// C-linkage declaration of the zip-header parser used when unpacking compressed raw data.
// In this file it is called with:
//   nin   - pointer to the size of the input buffer (total size of the received raw data),
//   bufin - pointer to the start of the received raw data segment,
//   lout  - receives the size that is subsequently used to allocate the output buffer.
// A non-zero return value is treated by the caller as "Fail to decode zip header".
// NOTE(review): presumably implemented by ROOT's zip library - confirm against the ROOT headers.
37 extern "C" int R__unzip_header(
int *nin,
unsigned char *bufin,
int *lout);
// C-linkage declaration of the decompression routine used for compressed raw data.
// In this file it is called after R__unzip_header() with:
//   nin    - pointer to the input size,
//   bufin  - pointer to the compressed raw data segment,
//   lout   - pointer to the output buffer size obtained from the header,
//   bufout - output buffer allocated with malloc(*lout),
//   nout   - result counter written by the routine
//            (presumably the number of bytes produced, 0 on failure - TODO confirm).
38 extern "C" void R__unzip(
int *nin,
unsigned char* bufin,
int *lout,
unsigned char *bufout,
int *nout);
42 if ((item.GetField(
"dabc:kind").AsStr() ==
"rate") ||
43 (item.GetField(
"dabc:history").AsInt() > 0))
return kTRUE;
49 std::string kind = item.GetField(
"dabc:kind").AsStr();
50 if (kind.find(
"ROOT.") == 0) {
64 TFakeFile(TList* sinfo) : TMemFile(
"dummy.file",
"RECREATE"), mylist(sinfo)
66 gROOT->GetListOfFiles()->Remove(
this);
108 fObjName =
"StreamerInfo";
109 fItemName = sinfoname;
110 fRootClassName =
"TList";
131 fObjName = item.GetName();
132 fItemName = item.ItemName();
133 std::string kind = item.GetField(dabc::prop_kind).AsStr();
134 fHistoryLength = item.GetField(dabc::prop_history).AsInt();
136 fMasterName = item.GetField(dabc::prop_masteritem).AsStr();
137 fMasterItemName = item.FindMaster().ItemName();
139 if (kind.find(
"ROOT.") == 0) {
141 fRootClassName = kind;
145 if (fHistoryLength > 0) fRootClassName =
"TGraph";
// Direct object retrieval is not supported by this access class:
// always returns kFALSE and leaves both obj and owner untouched.
// NOTE(review): objects appear to be delivered through AssignObjectTo() instead - confirm.
160 virtual Bool_t
GetObject(TObject* &obj, Bool_t &owner)
const {
return kFALSE; }
164 if (fRootClassName.length() > 0)
171 return fObjName.c_str();
176 if (fRootClassName.length()>0)
return fRootClassName.c_str();
178 return "dabc::Hierarchy";
184 if (rcv==0)
return 0;
186 dabc::WorkerRef wrk = dabc::mgr.FindItem(
"/Go4ReplWrk");
187 if (wrk.null())
return 0;
189 if (fIsRate && (fHistoryLength>0)) {
190 dabc::CmdGetBinary cmd2;
191 cmd2.SetStr(
"Item", fItemName);
192 cmd2.SetStr(
"Kind",
"hierarchy");
193 cmd2.SetStr(
"Query", Form(
"history=%d",fHistoryLength));
194 cmd2.SetTimeout(10.);
195 cmd2.SetReceiver(fNodeName + dabc::Publisher::DfltName());
197 cmd2.SetPtr(
"#DabcAccess",
this);
202 if (dabc::mgr.GetCommandChannel().Submit(cmd2))
return 2;
205 if (!fRootClassName.empty()) {
207 TClass *cl = TClass::GetClass(fRootClassName.c_str());
209 TGo4Log::Error(
"GetObject Unknown class %s", fRootClassName.c_str());
213 if (!cl->InheritsFrom(TObject::Class())) {
215 TGo4Log::Error(
"GetObject Class %s not derived from TObject", fRootClassName.c_str());
219 dabc::CmdGetBinary cmd(fItemName,
"root.bin", fCompression ?
"zipped" :
"");
223 cmd.SetReceiver(fNodeName + dabc::Publisher::DfltName());
225 cmd.SetPtr(
"#DabcAccess",
this);
232 if (dabc::mgr.GetCommandChannel().Submit(cmd))
return 2;
243 if (cmd.GetResult() != dabc::cmd_true) {
244 TGo4Log::Error(
"dabc::Command %s execution failed", cmd.GetName());
249 fRawData = cmd.GetRawData();
251 if (fRawData.null() || (fRawData.NumSegments()==0)) {
252 TGo4Log::Error(
"Did not get raw data from dabc::Command %s", cmd.GetName());
256 if (fIsRate && (fHistoryLength>0)) {
259 res.SetVersion(cmd.GetUInt(
"version"));
260 res.ReadFromBuffer(fRawData);
264 dabc::HistoryIter iter = res.MakeHistoryIter();
266 while (iter.next()) cnt++;
270 if (cnt==0)
return kFALSE;
272 TGraph* gr =
new TGraph(cnt);
273 gr->SetName(fObjName.c_str());
274 gr->SetTitle(Form(
"%s ratemeter", fItemName.c_str()));
276 while (iter.next()) {
278 double v = iter.GetField(
"value").AsDouble();
279 uint64_t tm = iter.GetField(
"time").AsUInt();
284 gr->SetPoint(cnt-i, tm / 1000, v);
287 gr->GetXaxis()->SetTimeDisplay(1);
288 gr->GetXaxis()->SetTimeFormat(
"%H:%M:%S%F1970-01-01 00:00:00");
294 if (!fRootClassName.empty()) {
295 TClass *cl = TClass::GetClass(fRootClassName.c_str());
303 if (!fMasterName.empty() && tgtslot && tgtslot->GetParent())
309 Int_t local_master_version = 0;
311 if (!masterslot->
GetIntPar(
"dabc_version", local_master_version))
312 local_master_version = 0;
314 if (local_master_version >= cmd.GetInt(
"MVersion")) {
317 if (masterslot->
GetPar(
"dabc_loading")!=0) {
326 TGo4Log::Error(
"Fail to request MASTER item %s from DABC node %s", fMasterItemName.c_str(), fNodeName.c_str());
330 masterslot->
SetPar(
"dabc_loading",
"true");
335 TGo4Log::Debug(
"Item %s raw_data size %u master_version %u", fItemName.c_str(), fRawData.GetTotalSize(), cmd.GetInt(
"MVersion"));
337 char *pobj = (
char*) cl->New();
340 TGo4Log::Error(
"ReadObj - Cannot create new object of class %s", cl->GetName());
344 Int_t baseOffset = cl->GetBaseClassOffset(TObject::Class());
345 if (baseOffset==-1) {
349 TGo4Log::Error(
"ReadObj Incorrect detection of the inheritance from TObject for class %s.",
354 TObject *tobj = (TObject*)(pobj+baseOffset);
362 int sizeinp(fRawData.GetTotalSize()), sizeout(0), irep(0);
364 if (
R__unzip_header(&sizeinp, (
unsigned char*) fRawData.SegmentPtr(), &sizeout)) {
365 printf(
"Fail to decode zip header\n");
367 rawbuf = (
char*) malloc(sizeout);
369 R__unzip(&sizeinp, (
unsigned char*) fRawData.SegmentPtr(), &sizeout, (
unsigned char*) rawbuf, &irep);
374 rawbuf = (
char*) fRawData.SegmentPtr();
375 rawbuflen = fRawData.GetTotalSize();
379 TBufferFile buf(TBuffer::kRead, rawbuflen, rawbuf, kFALSE);
380 buf.MapObject(pobj,cl);
383 cl->Destructor(pobj);
388 if (fCompression) free(rawbuf);
390 if ((fObjName ==
"StreamerInfo") &&
391 (fRootClassName ==
"TList") &&
392 tobj && tobj->InheritsFrom(TList::Class()) &&
393 fMasterName.empty()) {
396 fff.ReadStreamerInfo();
403 tgtslot->RemovePar(
"dabc_loading");
404 tgtslot->SetIntPar(
"dabc_version", cmd.GetInt(
"BVersion"));
460 Start((Long_t)res_tm*1000, kTRUE);
470 if ((cmd.GetPtr(
"#DabcAccess") != 0) || (cmd.GetPtr(
"#DabcProxy") != 0)) {
476 return dabc::Worker::ReplyCommand(cmd);
481 dabc::Worker(MakePair(name))
511 if (fParent.NumChilds()==0)
return kFALSE;
515 fChild = fParent.GetChild(fCnt);
520 if (fCnt>=fParent.NumChilds())
return kFALSE;
521 fChild = fParent.GetChild(fCnt);
525 return !fChild.null();
// Reports whether the current child item (fChild) has at least one child of its own.
528 virtual Bool_t
isfolder() {
return fChild.NumChilds()>0; }
532 if (strcmp(flagname,
"IsRemote")==0)
return 1;
// Returns the name of the current child item, as reported by fChild.GetName().
543 virtual const char*
name() {
return fChild.GetName(); }
// Returns a fixed description string; the same text is used for every item.
544 virtual const char*
info() {
return "item from dabc"; }
565 if (fClNameBuf.Length()>0)
return fClNameBuf.Data();
567 return "dabc::Hierarchy";
598 if (!dabc::CreateManager(
"cmd", 0))
return kFALSE;
600 std::string node = dabc::MakeNodeName(nodename);
602 if (!dabc::ConnectDabcNode(node)) {
607 dabc::WorkerRef wrk = dabc::mgr.FindItem(
"/Go4ReplWrk");
611 wrk()->AssignToThread(dabc::mgr.thread());
623 dabc::Command cmd = *((dabc::Command*)_cmd);
625 if (cmd.IsName(dabc::CmdGetNamesList::CmdName())) {
627 dabc::Buffer buf = cmd.GetRawData();
629 if (buf.null())
return kFALSE;
635 dabc::Hierarchy& hierarchy = *((dabc::Hierarchy*)
fxHierarchy);
639 if (!hierarchy.ReadFromBuffer(buf))
return kFALSE;
652 if (
fNodeName.Length() == 0)
return kFALSE;
654 dabc::CmdGetNamesList cmd2;
655 cmd2.SetReceiver(std::string(
fNodeName.Data()) + dabc::Publisher::DfltName());
656 cmd2.SetTimeout(10.);
658 dabc::WorkerRef wrk = dabc::mgr.FindItem(
"/Go4ReplWrk");
660 if (!sync && !wrk.null()) {
661 cmd2.SetPtr(
"#DabcProxy",
this);
664 dabc::mgr.GetCommandChannel().Submit(cmd2);
668 if (dabc::mgr.GetCommandChannel().Execute(cmd2)!=dabc::cmd_true) {
689 dabc::Hierarchy& hierarchy = *((dabc::Hierarchy*)
fxHierarchy);
691 return hierarchy.NumChilds() > 0;
700 dabc::Hierarchy& hierarchy = *((dabc::Hierarchy*)
fxHierarchy);
702 if (hierarchy.null())
return 0;
704 if ((name==0) || (strlen(name)==0)) {
709 dabc::Hierarchy child = hierarchy.FindChild(name);
720 dabc::Hierarchy& hierarchy = *((dabc::Hierarchy*)
fxHierarchy);
738 printf(
"GO4 WANTS update DABC hierarchy - do it\n");
Bool_t Connect(const char *nodename)
void R__unzip(int *nin, unsigned char *bufin, int *lout, unsigned char *bufout, int *nout)
virtual const char * GetObjectName() const
Bool_t GetIntPar(const char *name, Int_t &value)
static TClass * GetClass(const char *classname, Bool_t load=kFALSE)
Bool_t UpdateHierarchy(Bool_t sync=kTRUE)
TGo4DabcAccess(const std::string &node, const dabc::Hierarchy &item)
TGo4DabcAccess(const std::string &node, const std::string &sinfoname)
request compression from server
virtual ~TGo4DabcLevelIter()
ReplyWorker(const std::string &name)
TGo4Slot * FindSlot(const char *fullpath, const char **subname=0)
std::string fMasterItemName
unsigned fCnt
buffer used for class name
virtual void Update(TGo4Slot *slot, Bool_t strong)
virtual TGo4LevelIter * subiterator()
void DoObjectAssignement(TGo4ObjectManager *rcv, const char *path, TObject *obj, Bool_t owner)
virtual Int_t AssignObjectTo(TGo4ObjectManager *rcv, const char *path)
const char * GetPar(const char *name) const
Bool_t fCompression
raw data obtained from the command
virtual const char * info()
int R__unzip_header(int *nin, unsigned char *bufin, int *lout)
virtual ~TGo4DabcAccess()
virtual Bool_t RefreshNamesList()
TGo4DabcLevelIter(const dabc::Hierarchy &item)
virtual TList * GetStreamerInfoList()
virtual void Initialize(TGo4Slot *slot)
virtual void Finalize(TGo4Slot *slot)
TString GetRootClassName(const dabc::Hierarchy &item)
virtual Bool_t isfolder()
TGo4Slot * GetParent() const
std::string fRootClassName
TGo4Slot * GetSlot(const char *name, Bool_t force=kFALSE)
virtual bool ReplyCommand(dabc::Command cmd)
TReplyTimer(dabc::Command cmd)
virtual const char * GetObjectClassName() const
virtual const char * name()
double ProcessCommandReply(dabc::Command cmd)
virtual Int_t getflag(const char *flagname)
virtual TGo4LevelIter * MakeIter()
virtual void WriteData(TGo4Slot *slot, TDirectory *dir, Bool_t onlyobjs)
virtual Bool_t CanGetObject() const
TGo4ObjectManager * fxReceiver
void ForwardEvent(TGo4Slot *source, Int_t id, void *param=0)
virtual const char * GetClassName()
Int_t AssignObjectToSlot(TGo4Slot *slot)
Bool_t ReplyCommand(void *cmd)
virtual TClass * GetObjectClass() const
virtual TGo4Slot * getslot()
bool IsRateHistory(const dabc::Hierarchy &item)
static const char * GetDabcVersion()
virtual Bool_t GetObject(TObject *&obj, Bool_t &owner) const
virtual Bool_t HasSublevels() const
virtual TGo4Access * ProvideAccess(const char *name)
TGo4Slot * fxParentSlot
pointer to dabc::Hierarchy class
void SetPar(const char *name, const char *value)
virtual void ReadData(TGo4Slot *slot, TDirectory *dir)
static void Error(const char *text,...)
static void Debug(const char *text,...)
virtual Bool_t IsRemote() const