TMonaLisaWriter.cxx

Go to the documentation of this file.
00001 // @(#)root/monalisa:$Id: TMonaLisaWriter.cxx 23209 2008-04-14 13:25:09Z rdm $
00002 // Author: Andreas Peters   5/10/2005
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2006, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 
00012 //////////////////////////////////////////////////////////////////////////
00013 //                                                                      //
00014 // TMonaLisaWriter                                                      //
00015 //                                                                      //
00016 // Class defining interface to MonaLisa Monitoring Services in ROOT.    //
00017 // The TMonaLisaWriter object is used to send monitoring information to //
00018 // a MonaLisa server using the ML ApMon package (libapmoncpp.so/UDP     //
00019 // packets). The MonaLisa ApMon library for C++ can be downloaded at    //
00020 // http://monalisa.cacr.caltech.edu/monalisa__Download__ApMon.html,     //
00021 // current version:                                                     //
00022 // http://monalisa.cacr.caltech.edu/download/apmon/ApMon_c-2.2.0.tar.gz //
00023 //                                                                      //
00024 // The ROOT implementation is primary optimized for process/job         //
00025 // monitoring, although all other generic MonaLisa ApMon functionality  //
00026 // can be exploited through the ApMon class directly via                //
00027 // dynamic_cast<TMonaLisaWriter*>(gMonitoringWriter)->GetApMon().       //
00028 //                                                                      //
00029 // Additions/modifications by Fabrizio Furano 10/04/2008                //
00030 // - The implementation of TFile throughput and info sending was        //
00031 // just sending 'regular' samples about the activity of the single TFile//
00032 // instance that happened to trigger an activity in the right moment.   //
00033 // - Now TMonaLisaWriter keeps internally track of every activity       //
00034 // and regularly sends summaries valid for all the files which had      //
00035 // activity in the last time interval.                                  //
00036 // - Additionally, it's now finalized the infrastructure able to measure//
00037 // and keep track of the file Open latency. A packet is sent for each   //
00038 // successful Open, sending the measures of the latencies for the       //
00039 // various phases of the open. Currently exploited fully by TAlienFile  //
00040 // and TXNetFile. Easy to report from other TFiles too.                 //
00041 // - Now, the hook for the Close() func triggers sending of a packet    //
00042 // containing various information about the performance related to that //
00043 // file only.                                                           //
00044 // - Added support also for performance monitoring when writing         //
00045 //////////////////////////////////////////////////////////////////////////
00046 
00047 #include "TMonaLisaWriter.h"
00048 #include "TSystem.h"
00049 #include "TGrid.h"
00050 #include "TFile.h"
00051 #include "TUrl.h"
00052 #include "TStopwatch.h"
00053 #include "Riostream.h"
00054 #include "TParameter.h"
00055 #include "THashList.h"
00056 #include "TMath.h"
00057 
00058 ClassImp(TMonaLisaWriter)
00059 
00060 
00061 // Information which is kept about an alive instance of TFile
00062 class MonitoredTFileInfo: public TObject {
00063 private:
00064    TFile       *fileinst;
00065 public:
00066    MonitoredTFileInfo(TFile *file, Double_t timenow): TObject(), fileinst(file) {
00067       if (file->InheritsFrom("TXNetFile"))
00068          fFileClassName = "TXNetFile";
00069       else
00070          fFileClassName = file->ClassName();
00071 
00072       fLastBytesRead = 0;
00073       fLastBytesWritten = 0;
00074 
00075       fTempReadBytes = 0;
00076       fTempWrittenBytes = 0;
00077 
00078       fLastResetTime = timenow;
00079       fCreationTime = timenow;
00080 
00081       fKillme = kFALSE;
00082    }
00083 
00084    Double_t    fCreationTime;
00085 
00086    TString     fFileClassName;
00087 
00088    Long64_t    fLastBytesRead;
00089    Long64_t    fTempReadBytes;
00090    Long64_t    fLastBytesWritten;
00091    Long64_t    fTempWrittenBytes;
00092 
00093    Double_t    fLastResetTime;
00094 
00095    Bool_t      fKillme;   // tells to remove the instance after the next computation step
00096 
00097    void        GetThroughputs(Long64_t &readthr, Long64_t &writethr, Double_t timenow, Double_t prectime) {
00098       readthr = -1;
00099       writethr = -1;
00100       Double_t t = TMath::Min(prectime, fLastResetTime);
00101 
00102       Int_t mselapsed = TMath::FloorNint((timenow - t) * 1000);
00103       mselapsed = TMath::Max(mselapsed, 1);
00104 
00105       readthr = fTempReadBytes / mselapsed * 1000;
00106       writethr = fTempWrittenBytes / mselapsed * 1000;
00107    }
00108 
00109    void        UpdateFileStatus(TFile *file) {
00110       fTempReadBytes = file->GetBytesRead() - fLastBytesRead;
00111       fTempWrittenBytes = file->GetBytesWritten() - fLastBytesWritten;
00112    }
00113 
00114    void        ResetFileStatus(TFile *file, Double_t timenow) {
00115       if (fKillme) return;
00116       fLastBytesRead = file->GetBytesRead();
00117       fLastBytesWritten = file->GetBytesWritten();
00118       fTempReadBytes = 0;
00119       fTempWrittenBytes = 0;
00120       fLastResetTime = timenow;
00121    }
00122 
00123    void        ResetFileStatus(Double_t timenow) {
00124       if (fKillme) return;
00125       ResetFileStatus(fileinst, timenow);
00126    }
00127 
00128 };
00129 
00130 
00131 
00132 
00133 // Helper used to build up ongoing throughput summaries
00134 class MonitoredTFileSummary: public TNamed {
00135 public:
00136    MonitoredTFileSummary(TString &fileclassname): TNamed(fileclassname, fileclassname) {
00137       fBytesRead = 0;
00138       fBytesWritten = 0;
00139       fReadThroughput = 0;
00140       fWriteThroughput = 0;
00141    }
00142 
00143    Long64_t    fBytesRead;
00144    Long64_t    fBytesWritten;
00145    Long64_t    fReadThroughput;
00146    Long64_t    fWriteThroughput;
00147 
00148    void        Update(MonitoredTFileInfo *mi, Double_t timenow, Double_t prectime) {
00149       Long64_t rth, wth;
00150       mi->GetThroughputs(rth, wth, timenow, prectime);
00151 
00152       fBytesRead += mi->fTempReadBytes;
00153       fBytesWritten += mi->fTempWrittenBytes;
00154 
00155       if (rth > 0) fReadThroughput += rth;
00156       if (wth > 0) fWriteThroughput += wth;
00157    }
00158 
00159 };
00160 
00161 
00162 
00163 //______________________________________________________________________________
00164 TMonaLisaWriter::TMonaLisaWriter(const char *monserver, const char *montag,
00165                                  const char *monid, const char *monsubid,
00166                                  const char *option)
00167 {
00168    // Create MonaLisa write object.
00169 
00170    fMonInfoRepo = new std::map<UInt_t, MonitoredTFileInfo *>;
00171 
00172    Init(monserver, montag, monid, monsubid, option);
00173 }
00174 
00175 //______________________________________________________________________________
00176 void TMonaLisaWriter::Init(const char *monserver, const char *montag, const char *monid,
00177                            const char *monsubid, const char *option)
00178 {
00179    // Creates a TMonaLisaWriter object to send monitoring information to a
00180    // MonaLisa server using the MonaLisa ApMon package (libapmoncpp.so/UDP
00181    // packets). The MonaLisa ApMon library for C++ can be downloaded at
00182    // http://monalisa.cacr.caltech.edu/monalisa__Download__ApMon.html,
00183    // current version:
00184    // http://monalisa.cacr.caltech.edu/download/apmon/ApMon_cpp-2.0.6.tar.gz
00185    //
00186    // The ROOT implementation is primary optimized for process/job monitoring,
00187    // although all other generic MonaLisa ApMon functionality can be exploited
00188    // through the ApMon class directly (gMonitoringWriter->GetApMon()).
00189    //
00190    // Monitoring information in MonaLisa is structured in the following tree
00191    // structure:
00192    // <farmname>
00193    //    |
00194    //    ---> <nodename1>
00195    //              |
00196    //              ---> <key1> - <value1>
00197    //              ---> <key2> - <value2>
00198    //    ---> <nodename2>
00199    //              |
00200    //              ---> <key3> - <value3>
00201    //              ---> <key4> - <value4>
00202    //
00203    // The parameter monid is equivalent to the MonaLisa node name, for the
00204    // case of process monitoring it can be just an identifier to classify
00205    // the type of jobs e.g. "PROOF_PROCESSING".
00206    // If monid is not specified, TMonaLisaWriter tries to set it in this order
00207    // from environement variables:
00208    // - PROOF_JOB_ID
00209    // - GRID_JOB_ID
00210    // - LCG_JOB_ID
00211    // - ALIEN_MASTERJOB_ID
00212    // - ALIEN_PROC_ID
00213    //
00214    // The parameter montag is equivalent to the MonaLisa farm name, for the
00215    // case of process monitoring it can be a process identifier e.g. a PROOF
00216    // session ID.
00217    //
00218    // The parameter monserver specifies the server to whom to send the
00219    // monitoring UDP packets. If not specified, the hostname (the port is
00220    // a default one) is specified in the environment variable APMON_CONFIG.
00221    //
00222    // To use TMonaLisaWriter, libMonaLisa.so has to be loaded.
00223    //
00224    // According to the fact, that the deepness of the MonaLisa naming scheme
00225    // is only 3 (<farm><node><value>), a special naming scheme is used for
00226    // process monitoring. There is a high-level method to send progress
00227    // information of Tree analysis (# of events, datasize).
00228    // To distinguish individual nodes running the processing, part of the
00229    // information is kept in the <value> parameter of ML.
00230    // <value> is named as:
00231    //    <site-name>:<host-name>:<pid>:<valuetag>
00232    // <site-name> is taken from an environment variable in the following order:
00233    // - PROOF_SITE
00234    // - GRID_SITE
00235    // - ALIEN_SITE
00236    // - default 'none'
00237    // <host-name> is taken from gSystem->Hostname()
00238    // <pid> is the process ID of the ROOT process
00239    //
00240    // Example of use for Process Monitoring:
00241    //   new TMonaLisaWriter("BATCH_ANALYSIS","AnalysisLoop-00001","lxplus050.cern.ch");
00242    // Once when you create an analysis task, execute
00243    //   gMonitoringWriter->SendInfoUser("myname");
00244    //   gMonitoringWriter->SendInfoDescription("My first Higgs analysis");
00245    //   gMonitoringWriter->SendInfoTime();
00246    //   gMonitoringWriter->SendInfoStatus("Submitted");
00247    //
00248    // On each node executing a subtask, you can set the status of this subtask:
00249    //   gMonitoringWriter->SendProcessingStatus("Started");
00250    // During the processing of your analysis you can send progress updates:
00251    //   gMonitoringWriter->SendProcessProgress(100,1000000); <= 100 events, 1MB processed
00252    //   ....
00253    //   gMonitoringWriter-SendProcessingStatus("Finished");
00254    //   delete gMonitoringWriter; gMonitoringWriter=0;
00255    //
00256    // Example of use for any Generic Monitoring information:
00257    //   TList *valuelist = new TList();
00258    //   valuelist->SetOwner(kTRUE);
00259    //   // append a text object
00260    //   TMonaLisaText *valtext = new TMonaLisaText("decaychannel","K->eeg");
00261    //   valuelist->Add(valtext);
00262    //   // append a double value
00263    //   TMonaLisaValue* valdouble = new TMonaLisaValue("n-gamma",5);
00264    //   valuelist->Add(valdouble);
00265    //   Bool_t success = SendParameters(valuelist);
00266    //   delete valuelist;
00267    //
00268    // option:
00269    // "global": gMonitoringWriter is initialized with this instance
00270 
00271    SetName(montag);
00272    SetTitle(montag);
00273 
00274    fVerbose = kFALSE;           // no verbosity as default
00275 
00276    fFileStopwatch.Start(kTRUE);
00277    fLastRWSendTime = fFileStopwatch.RealTime();
00278    fLastFCloseSendTime = fFileStopwatch.RealTime();
00279    fLastProgressTime = time(0);
00280    fFileStopwatch.Continue();
00281 
00282    fReportInterval = 120; // default interval is 120, to prevent flooding
00283    if (gSystem->Getenv("APMON_INTERVAL")) {
00284       fReportInterval = atoi(gSystem->Getenv("APMON_INTERVAL"));
00285       if (fReportInterval < 1)
00286          fReportInterval =1;
00287       Info("TMonaLisaWriter","Setting APMON Report Interval to %d seconds",fReportInterval);
00288    }
00289 
00290    char *apmon_config[1] =
00291       { ((monserver == 0) ? (char *) gSystem->Getenv("APMON_CONFIG") : (char *) monserver) };
00292    if (apmon_config[0] == 0) {
00293       Error("TMonaLisaWriter",
00294             "Disabling apmon monitoring since env variable APMON_CONFIG was not found and the monitoring server is not specified in the constructor!");
00295       fInitialized = kFALSE;
00296       return;
00297    }
00298 
00299    try {
00300       fApmon = new ApMon(1, apmon_config);
00301       fApmon->setConfRecheck(false);
00302       fApmon->setJobMonitoring(false);
00303       //((ApMon*)fApmon)->setSysMonitoring(false);
00304       //((ApMon*)fApmon)->setGenMonitoring(false);
00305    } catch (runtime_error &e) {
00306       Error("TMonaLisaWriter", "Error initializing ApMon: %s", e.what());
00307       Error("TMonaLisaWriter", "Disabling apmon.");
00308       fInitialized = kFALSE;
00309       return;
00310    }
00311 
00312    TString clustername="ROOT_";
00313 
00314    if (montag == 0) {
00315      if (gSystem->Getenv("PROOF_SITE")) {
00316        clustername+=(gSystem->Getenv("PROOF_SITE"));
00317      } else if (gSystem->Getenv("GRID_SITE")) {
00318        clustername+=(gSystem->Getenv("GRID_SITE"));
00319      } else if (gSystem->Getenv("LCG_SITE")) {
00320        clustername+=(gSystem->Getenv("LCG_SITE"));
00321      } else if (gSystem->Getenv("ALIEN_SITE")) {
00322        clustername+=(gSystem->Getenv("ALIEN_SITE"));
00323      } else {
00324        clustername += TString("none");
00325      }
00326      SetName(clustername);
00327      SetTitle(clustername);
00328    } else {
00329        SetName(clustername+TString(montag));
00330        SetTitle(clustername+TString(montag));
00331    }
00332 
00333    fHostname = gSystem->HostName();
00334    fPid = gSystem->GetPid();
00335 
00336    if (monid == 0) {
00337       if (gSystem->Getenv("PROOF_QUERY_ID"))
00338          fJobId = gSystem->Getenv("PROOF_QUERY_ID");
00339       else if (gSystem->Getenv("GRID_JOB_ID"))
00340          fJobId = gSystem->Getenv("GRID_JOB_ID");
00341       else if (gSystem->Getenv("LCG_JOB_ID"))
00342          fJobId = gSystem->Getenv("LCG_JOB_ID");
00343       else if (gSystem->Getenv("ALIEN_MASTERJOBID"))
00344          fJobId = gSystem->Getenv("ALIEN_MASTERJOBID");
00345       else if (gSystem->Getenv("ALIEN_PROC_ID"))
00346          fJobId = gSystem->Getenv("ALIEN_PROC_ID");
00347       else
00348          fJobId = "-no-job-id";
00349    } else {
00350       fJobId = monid;
00351    }
00352 
00353    if (monsubid == 0) {
00354      if (gSystem->Getenv("PROOF_PROC_ID")) {
00355        fSubJobId = gSystem->Getenv("PROOF_PROC_ID");
00356      } else if (gSystem->Getenv("ALIEN_PROC_ID")) {
00357        fSubJobId = gSystem->Getenv("ALIEN_PROC_ID");
00358      } else {
00359        fSubJobId = fJobId;
00360      }
00361    } else {
00362      fSubJobId = monsubid;
00363    }
00364 
00365 
00366    if (fVerbose)
00367       Info("Initialized for ML Server <%s> - Setting ClusterID <%s> JobID <%s> SubID <%s>\n",
00368            apmon_config[0], fName.Data() ,fJobId.Data(),fSubJobId.Data());
00369 
00370    fInitialized = kTRUE;
00371 
00372    TString optionStr(option);
00373    if (optionStr.Contains("global"))
00374       gMonitoringWriter = this;
00375 }
00376 
00377 //______________________________________________________________________________
00378 TMonaLisaWriter::~TMonaLisaWriter()
00379 {
00380    // Cleanup.
00381    if (fMonInfoRepo) {
00382 
00383       std::map<UInt_t, MonitoredTFileInfo *>::iterator iter = fMonInfoRepo->begin();
00384       while (iter != fMonInfoRepo->end()) {
00385          delete iter->second;
00386          iter++;
00387       }
00388 
00389       fMonInfoRepo->clear();
00390       delete fMonInfoRepo;
00391       fMonInfoRepo = 0;
00392    }
00393 
00394    if (gMonitoringWriter == this)
00395       gMonitoringWriter = 0;
00396 }
00397 
00398 //______________________________________________________________________________
00399 Bool_t TMonaLisaWriter::SendInfoStatus(const char *status)
00400 {
00401    // Sends a <status> text to MonaLisa following the process scheme:
00402    //    <site> --> <jobid> --> 'status' = <status>
00403    // Used to set a global status for a groupjob, e.g.
00404    // a master-job or the general status of PROOF processing.
00405 
00406    if (!fInitialized) {
00407       Error("SendInfoStatus", "Monitoring is not properly initialized!");
00408       return kFALSE;
00409    }
00410 
00411    Bool_t success = kFALSE;
00412 
00413    TList *valuelist = new TList();
00414    valuelist->SetOwner(kTRUE);
00415 
00416    // create a monitor text object
00417    TMonaLisaText *valtext = new TMonaLisaText("status", status);
00418    valuelist->Add(valtext);
00419 
00420    // send it to monalisa
00421    success = SendParameters(valuelist);
00422 
00423    delete valuelist;
00424    return success;
00425 }
00426 
00427 //______________________________________________________________________________
00428 Bool_t TMonaLisaWriter::SendInfoUser(const char *user)
00429 {
00430    // Sends the <user> text to MonaLisa following the process scheme:
00431    //    <site> --> <jobid> --> 'user' = <user>
00432 
00433    if (!fInitialized) {
00434       Error("TMonaLisaWriter",
00435             "Monitoring initialization has failed - you can't send to MonaLisa!");
00436       return kFALSE;
00437    }
00438 
00439    Bool_t success = kFALSE;
00440 
00441    TList *valuelist = new TList();
00442    valuelist->SetOwner(kTRUE);
00443 
00444    const char *localuser;
00445    if (user) {
00446       localuser = user;
00447    } else {
00448       if (gGrid) {
00449          localuser = gGrid->GetUser();
00450       } else {
00451          localuser = "unknown";
00452       }
00453    }
00454 
00455    // create a monitor text object
00456    TMonaLisaText *valtext = new TMonaLisaText("user", localuser);
00457    valuelist->Add(valtext);
00458 
00459    // send it to monalisa
00460    success = SendParameters(valuelist);
00461 
00462    delete valuelist;
00463    return success;
00464 }
00465 
00466 //______________________________________________________________________________
00467 Bool_t TMonaLisaWriter::SendInfoDescription(const char *jobtag)
00468 {
00469    // Sends the description <jobtag> following the processing scheme:
00470    //    <site> --> <jobid> --> 'jobname' = <jobtag>
00471 
00472    if (!fInitialized) {
00473       Error("SendInfoDescription",
00474             "Monitoring is not properly initialized!");
00475       return kFALSE;
00476    }
00477 
00478    Bool_t success = kFALSE;
00479 
00480    TList *valuelist = new TList();
00481    valuelist->SetOwner(kTRUE);
00482 
00483    // create a monitor text object
00484    TMonaLisaText *valtext = new TMonaLisaText("jobname", jobtag);
00485    valuelist->Add(valtext);
00486 
00487    // send it to monalisag
00488    success = SendParameters(valuelist);
00489 
00490    delete valuelist;
00491    return success;
00492 }
00493 
00494 //______________________________________________________________________________
00495 Bool_t TMonaLisaWriter::SendInfoTime()
00496 {
00497    // Sends the current time to MonaLisa following the processing scheme
00498    //    <site> --> <jobid> --> 'time' = >unixtimestamp<
00499 
00500    if (!fInitialized) {
00501       Error("SendInfoTime", "Monitoring is not properly initialized!");
00502       return kFALSE;
00503    }
00504 
00505    Bool_t success = kFALSE;
00506 
00507    TList *valuelist = new TList();
00508    valuelist->SetOwner(kTRUE);
00509 
00510    TString valtime = (Int_t) time(0);
00511 
00512    // create a monitor text object
00513    TMonaLisaText *valtext = new TMonaLisaText("time", valtime);
00514    valuelist->Add(valtext);
00515 
00516    // send it to monalisa
00517    success = SendParameters(valuelist);
00518 
00519    delete valuelist;
00520    return success;
00521 }
00522 
00523 //______________________________________________________________________________
00524 Bool_t TMonaLisaWriter::SendProcessingStatus(const char *status, Bool_t restarttimer)
00525 {
00526    // Send the procesing status 'status' to MonaLisa following the
00527    // processing scheme:
00528    //    <site> --> <jobid> --> 'status' = <status>
00529    // Used, to set the processing status of individual subtaks e.g. the
00530    // status of a batch (sub-)job or the status of a PROOF slave
00531    // participating in query <jobid>
00532 
00533    if (restarttimer) {
00534       fStopwatch.Start(kTRUE);
00535    }
00536 
00537    if (!fInitialized) {
00538       Error("TMonaLisaWriter",
00539             "Monitoring initialization has failed - you can't send to MonaLisa!");
00540       return kFALSE;
00541    }
00542 
00543    Bool_t success = kFALSE;
00544 
00545    TList *valuelist = new TList();
00546    valuelist->SetOwner(kTRUE);
00547 
00548    // create a monitor text object
00549    TMonaLisaText *valtext = new TMonaLisaText("status", status);
00550    valuelist->Add(valtext);
00551 
00552    TMonaLisaText *valhost = new TMonaLisaText("hostname",fHostname);
00553    valuelist->Add(valhost);
00554 
00555    TMonaLisaText *valsid = new TMonaLisaText("subid", fSubJobId.Data());
00556    valuelist->Add(valsid);
00557 
00558    // send it to monalisa
00559    success = SendParameters(valuelist);
00560 
00561    delete valuelist;
00562    return success;
00563 }
00564 
00565 //______________________________________________________________________________
00566 Bool_t TMonaLisaWriter::SendProcessingProgress(Double_t nevent, Double_t nbytes, Bool_t force)
00567 {
00568    // Send the procesing progress to MonaLisa.
00569 
00570    if (!force && (time(0)-fLastProgressTime) < fReportInterval) {
00571      // if the progress is not forced, we send maximum < fReportInterval per second!
00572      return kFALSE;
00573    }
00574 
00575    if (!fInitialized) {
00576       Error("SendProcessingProgress",
00577             "Monitoring is not properly initialized!");
00578       return kFALSE;
00579    }
00580 
00581    Bool_t success = kFALSE;
00582 
00583    TList *valuelist = new TList();
00584    valuelist->SetOwner(kTRUE);
00585 
00586    // create a monitor text object
00587    TMonaLisaValue *valevent =    new TMonaLisaValue("events", nevent);
00588    TMonaLisaValue *valbyte =     new TMonaLisaValue("processedbytes", nbytes);
00589    TMonaLisaValue *valrealtime = new TMonaLisaValue("realtime",fStopwatch.RealTime());
00590    TMonaLisaValue *valcputime =  new TMonaLisaValue("cputime",fStopwatch.CpuTime());
00591 
00592    ProcInfo_t pinfo;
00593    gSystem->GetProcInfo(&pinfo);
00594    Double_t totmem = (Double_t)(pinfo.fMemVirtual) * 1024.;
00595    Double_t rssmem = (Double_t)(pinfo.fMemResident) * 1024.;
00596    Double_t shdmem = 0.;
00597 
00598    TMonaLisaValue *valtotmem = new TMonaLisaValue("totmem",totmem);
00599    TMonaLisaValue *valrssmem = new TMonaLisaValue("rssmem",rssmem);
00600    TMonaLisaValue *valshdmem = new TMonaLisaValue("shdmem",shdmem);
00601 
00602    TMonaLisaText *valsid = new TMonaLisaText("subid", fSubJobId.Data());
00603    valuelist->Add(valsid);
00604    valuelist->Add(valevent);
00605    valuelist->Add(valbyte);
00606    valuelist->Add(valrealtime);
00607    valuelist->Add(valcputime);
00608    valuelist->Add(valtotmem);
00609    valuelist->Add(valrssmem);
00610    valuelist->Add(valshdmem);
00611 
00612    TString      strevents="";
00613    strevents += nevent;
00614    TString      strbytes="";
00615    strbytes  += nbytes;
00616    TString      strcpu="";
00617    strcpu    += fStopwatch.CpuTime();
00618    TString      strreal="";
00619    strreal   += fStopwatch.RealTime();
00620    TString      strtotmem="";
00621    strtotmem += totmem;
00622    TString      strrssmem="";
00623    strrssmem += rssmem;
00624    TString      strshdmem="";
00625    strshdmem += shdmem;
00626 
00627    fStopwatch.Continue();
00628 
00629    TMonaLisaText *textevent   = new TMonaLisaText("events_str", strevents.Data());
00630    TMonaLisaText *textbyte    = new TMonaLisaText("processedbytes_str", strbytes.Data());
00631    TMonaLisaText *textreal    = new TMonaLisaText("realtime_str", strreal.Data());
00632    TMonaLisaText *textcpu     = new TMonaLisaText("cputime_str", strcpu.Data());
00633    TMonaLisaText *texttotmem  = new TMonaLisaText("totmem_str", strtotmem.Data());
00634    TMonaLisaText *textrssmem  = new TMonaLisaText("rssmem_str", strrssmem.Data());
00635    TMonaLisaText *textshdmem  = new TMonaLisaText("shdmem_str", strshdmem.Data());
00636    valuelist->Add(textevent);
00637    valuelist->Add(textbyte);
00638    valuelist->Add(textcpu);
00639    valuelist->Add(textreal);
00640    valuelist->Add(texttotmem);
00641    valuelist->Add(textrssmem);
00642    valuelist->Add(textshdmem);
00643 
00644    TMonaLisaText *valhost = new TMonaLisaText("hostname",fHostname);
00645    valuelist->Add(valhost);
00646 
00647    // send it to monalisa
00648    success = SendParameters(valuelist);
00649    fLastProgressTime = time(0);
00650    delete valuelist;
00651    return success;
00652 }
00653 
00654 //______________________________________________________________________________
00655 Bool_t TMonaLisaWriter::SendFileOpenProgress(TFile *file, TList *openphases,
00656                                              const char *openphasename,
00657                                              Bool_t forcesend)
00658 {
00659    // Send the fileopen progress to MonaLisa.
00660    // If openphases=0 it means that the information is to be stored
00661    // in a temp space, since there is not yet an object where to attach it to.
00662    // This is typical in the static Open calls.
00663    // The temp openphases are put into a list as soon as one is specified.
00664    //
00665    // If thisopenphasename=0 it means that the stored phases (temp and object)
00666    // have to be cleared.
00667 
00668    if (!fInitialized) {
00669       Error("SendFileOpenProgress",
00670             "Monitoring is not properly initialized!");
00671       return kFALSE;
00672    }
00673 
00674    // Create the list, if not yet done
00675    if (!fTmpOpenPhases && !openphases) {
00676       fTmpOpenPhases = new TList;
00677       fTmpOpenPhases->SetOwner();
00678    }
00679 
00680    if (!openphasename) {
00681       // This means "reset my phases"
00682       fTmpOpenPhases->Clear();
00683       return kTRUE;
00684    }
00685 
00686    // Take a measurement
00687    TParameter<Double_t> *nfo = new TParameter<Double_t>(openphasename, fFileStopwatch.RealTime());
00688    fFileStopwatch.Continue();
00689 
00690    if (!openphases) {
00691       fTmpOpenPhases->Add(nfo);
00692    } else {
00693       // Move info temporarly saved to object list
00694       TIter nxt(fTmpOpenPhases);
00695       TParameter<Double_t> *nf = 0;
00696       while ((nf = (TParameter<Double_t> *)nxt()))
00697          openphases->Add(nf);
00698       // Add this measurement
00699       openphases->Add(nfo);
00700       // Reset the temporary list
00701       if (fTmpOpenPhases) {
00702          fTmpOpenPhases->SetOwner(0);
00703          fTmpOpenPhases->Clear();
00704       }
00705 
00706    }
00707 
00708    if (!forcesend) return kTRUE;
00709    if (!file) return kTRUE;
00710 
00711    TList *op = openphases ? openphases : fTmpOpenPhases;
00712 
00713    Bool_t success = kFALSE;
00714 
00715 
00716    TList *valuelist = new TList();
00717    valuelist->SetOwner(kTRUE);
00718 
00719    // create a monitor text object
00720 
00721    TMonaLisaText *valhost = new TMonaLisaText("hostname",fHostname);
00722    valuelist->Add(valhost);
00723    TMonaLisaText *valsid = new TMonaLisaText("subid", fSubJobId.Data());
00724    valuelist->Add(valsid);
00725    TMonaLisaText *valdest = new TMonaLisaText("destname", file->GetEndpointUrl()->GetHost());
00726    valuelist->Add(valdest);
00727 
00728    TMonaLisaValue *valfid = new TMonaLisaValue("fileid", file->GetFileCounter());
00729    valuelist->Add(valfid);
00730    TString strfid = Form("%lld", file->GetFileCounter());
00731    TMonaLisaText *valstrfid = new TMonaLisaText("fileid_str", strfid.Data());
00732    valuelist->Add(valstrfid);
00733 
00734    Int_t kk = 1;
00735    TIter nxt(op);
00736    TParameter<Double_t> *nf1 = 0;
00737    TParameter<Double_t> *nf0 = (TParameter<Double_t> *)nxt();
00738    while ((nf1 = (TParameter<Double_t> *)nxt())) {
00739       TString s = Form("openphase%d_%s", kk, nf0->GetName());
00740       TMonaLisaValue *v = new TMonaLisaValue(s.Data(), nf1->GetVal() - nf0->GetVal());
00741       valuelist->Add(v);
00742       // Go to next
00743       nf0 = nf1;
00744       kk++;
00745    }
00746 
00747    // Now send how much time was elapsed in total
00748    nf0 = (TParameter<Double_t> *)op->First();
00749    nf1 = (TParameter<Double_t> *)op->Last();
00750    TMonaLisaValue *valtottime =
00751       new TMonaLisaValue("total_open_time", nf1->GetVal() - nf0->GetVal());
00752    valuelist->Add(valtottime);
00753 
00754    // send it to monalisa
00755    success = SendParameters(valuelist);
00756    delete valuelist;
00757    return success;
00758 }
00759 
00760 //______________________________________________________________________________
00761 Bool_t TMonaLisaWriter::SendFileCloseEvent(TFile *file) {
00762    if (!fInitialized) {
00763       Error("SendFileCloseEvent",
00764             "Monitoring is not properly initialized!");
00765       return kFALSE;
00766    }
00767 
00768    Bool_t success = kFALSE;
00769    Double_t timenow = fFileStopwatch.RealTime();
00770    fFileStopwatch.Continue();
00771 
00772    MonitoredTFileInfo *mi = 0;
00773    std::map<UInt_t, MonitoredTFileInfo *>::iterator iter = fMonInfoRepo->find(file->GetUniqueID());
00774    if (iter != fMonInfoRepo->end()) mi = iter->second;
00775 
00776    Double_t timelapsed = 0.0;
00777    if (mi) timelapsed = timenow - mi->fCreationTime;
00778 
00779    TList *valuelist = new TList();
00780    valuelist->SetOwner(kTRUE);
00781 
00782    TString valname;
00783    TString pfx = file->ClassName();
00784    if (file->InheritsFrom("TXNetFile"))
00785       pfx = "TXNetFile";
00786 
00787    pfx += "_";
00788 
00789    // The info to be sent is the one relative to the specific file
00790 
00791    TMonaLisaText *valdest = new TMonaLisaText("destname",file->GetEndpointUrl()->GetHost());
00792    valuelist->Add(valdest);
00793    TMonaLisaValue *valfid = new TMonaLisaValue("fileid",file->GetFileCounter());
00794    valuelist->Add(valfid);
00795    TString strfid="";
00796    strfid+=file->GetFileCounter();
00797    TMonaLisaText *valstrfid = new TMonaLisaText("fileid_str",strfid.Data());
00798    valuelist->Add(valstrfid);
00799 
00800    valname = pfx;
00801    valname += "readbytes";
00802    TMonaLisaValue *valread = new TMonaLisaValue(valname, file->GetBytesRead());
00803    valuelist->Add(valread);
00804 
00805 //    TString strbytes_r="";
00806 //    strbytes_r += file->GetBytesRead();
00807 //    TMonaLisaText *valstrread = new TMonaLisaText("readbytes_str", strbytes_r.Data());
00808 //    valuelist->Add(valstrread);
00809 
00810    valname = pfx;
00811    valname += "writtenbytes";
00812    TMonaLisaValue *valwrite = new TMonaLisaValue(valname, file->GetBytesWritten());
00813    valuelist->Add(valwrite);
00814 
00815 //    TString strbytes_w="";
00816 //    strbytes_w += file->GetBytesWritten();
00817 //    TMonaLisaText *valstrwrite = new TMonaLisaText("writtenbytes_str", strbytes_w.Data());
00818 //    valuelist->Add(valstrwrite);
00819 
00820    int thput;
00821    if (timelapsed > 0.001) {
00822       Int_t selapsed = TMath::FloorNint(timelapsed * 1000);
00823 
00824       thput = file->GetBytesRead() / selapsed * 1000;
00825       valname = pfx;
00826       valname += "filethrpt_rd";
00827       TMonaLisaValue *valreadthavg = new TMonaLisaValue(valname, thput);
00828       valuelist->Add(valreadthavg);
00829 
00830       thput = file->GetBytesWritten() / selapsed * 1000;
00831       valname = pfx;
00832       valname += "filethrpt_wr";
00833       TMonaLisaValue *valwritethavg = new TMonaLisaValue(valname, thput);
00834       valuelist->Add(valwritethavg);
00835    }
00836 
00837    // And the specific file summary has to be removed from the repo
00838    if (mi) {
00839       mi->UpdateFileStatus(file);
00840       mi->fKillme = kTRUE;
00841    }
00842 
00843    // send it to monalisa
00844    success = SendParameters(valuelist);
00845 
00846 
00847    delete valuelist;
00848    return success;
00849 }
00850 //______________________________________________________________________________
00851 Bool_t TMonaLisaWriter::SendFileReadProgress(TFile *file) {
00852    return SendFileCheckpoint(file);
00853 }
00854 //______________________________________________________________________________
00855 Bool_t TMonaLisaWriter::SendFileWriteProgress(TFile *file) {
00856    return SendFileCheckpoint(file);
00857 }
00858 //______________________________________________________________________________
00859 Bool_t TMonaLisaWriter::SendFileCheckpoint(TFile *file)
00860 {
00861    if (!fInitialized) {
00862       Error("SendFileCheckpoint",
00863             "Monitoring is not properly initialized!");
00864       return kFALSE;
00865    }
00866 
00867    if (!file->IsOpen()) return kTRUE;
00868 
00869    // We cannot handle this kind of ongoing averaged monitoring for a file which has not an unique id. Sorry.
00870    // This seems to affect raw files, for which only the Close() info is available
00871    // Removing this check causes a mess for non-raw and raw TFiles, because
00872    // the UUID in the Init phase has not yet been set, and the traffic
00873    // reported during that phase is reported wrongly, causing various leaks and troubles
00874    // TFiles without an unique id can be monitored only in their Open/Close event
00875    if (!file->TestBit(kHasUUID)) return kTRUE;
00876 
00877    Double_t timenow = fFileStopwatch.RealTime();
00878    fFileStopwatch.Continue();
00879 
00880    // This info has to be gathered in any case.
00881    
00882    // Check if an MonitoredTFileInfo instance is already available
00883    // If not, create one
00884    MonitoredTFileInfo *mi = 0;
00885    std::map<UInt_t, MonitoredTFileInfo *>::iterator iter = fMonInfoRepo->find(file->GetUniqueID());
00886    if (iter != fMonInfoRepo->end()) mi = iter->second;
00887    if (!mi) {
00888 
00889       mi = new MonitoredTFileInfo(file, timenow);
00890       if (mi) fMonInfoRepo->insert( make_pair( file->GetUniqueID(), mi ) ); 
00891    }
00892 
00893    // And now we get those partial values
00894    if (mi) mi->UpdateFileStatus(file);
00895 
00896    // Send the fileread progress to MonaLisa only if required or convenient
00897    if ( timenow - fLastRWSendTime < fReportInterval) {
00898       // if the progress is not forced, we send maximum 1 every fReportInterval seconds!
00899       return kFALSE;
00900    }
00901 
00902    Bool_t success = kFALSE;
00903 
00904    TList *valuelist = new TList();
00905    valuelist->SetOwner(kTRUE);
00906 
00907    TString valname;
00908    
00909    // We send only a little throughput summary info
00910    // Instead we send the info for the actual file passed
00911 
00912    TMonaLisaText *valhost = new TMonaLisaText("hostname",fHostname);
00913    valuelist->Add(valhost);
00914    TMonaLisaText *valsid = new TMonaLisaText("subid", fSubJobId.Data());
00915    valuelist->Add(valsid);
00916 
00917    // First of all, we create an internal summary, sorted by kind of TFile used
00918    THashList summary;
00919    summary.SetOwner(kTRUE);
00920 
00921    iter = fMonInfoRepo->begin();
00922    if (iter != fMonInfoRepo->end()) mi = iter->second;
00923    else mi = 0;
00924 
00925    while (mi) {
00926       MonitoredTFileSummary *sum = static_cast<MonitoredTFileSummary *>(summary.FindObject(mi->fFileClassName));
00927       if (!sum) {
00928          sum = new MonitoredTFileSummary(mi->fFileClassName);
00929          if (sum) summary.AddLast(sum);
00930       }
00931 
00932       if (sum) {
00933          sum->Update(mi, timenow, fLastRWSendTime);
00934          mi->ResetFileStatus(timenow);
00935       }
00936 
00937       // This could be an info about an already closed file
00938       if (mi->fKillme) {
00939          std::map<UInt_t, MonitoredTFileInfo *>::iterator tmpiter = iter;
00940 
00941          iter->second = 0;
00942       }
00943 
00944       iter++;
00945       if (iter != fMonInfoRepo->end()) mi = iter->second;
00946       else mi = 0;
00947       
00948    }
00949 
00950    for (iter = fMonInfoRepo->begin(); iter != fMonInfoRepo->end(); iter++)
00951       if (!iter->second) fMonInfoRepo->erase(iter);
00952 
00953    // This info is a summary valid for all the monitored files at once.
00954    // It makes no sense at all to send data relative to a specific file here
00955    // Cycle through the summary...
00956    TIter nxt2(&summary);
00957    MonitoredTFileSummary *sum;
00958    while ((sum = (MonitoredTFileSummary *)nxt2())) {
00959 
00960       if (sum->fReadThroughput >= 0) {
00961          valname = sum->GetName();
00962          valname += "_avgthrpt_rd";
00963          TMonaLisaValue *valreadthr = new TMonaLisaValue(valname, sum->fReadThroughput);
00964          valuelist->Add(valreadthr);
00965       }
00966 
00967       if ( sum->fWriteThroughput >= 0 ) {
00968          valname = sum->GetName();
00969          valname += "_avgthrpt_wr";
00970          TMonaLisaValue *valwritethr = new TMonaLisaValue(valname, sum->fWriteThroughput);
00971          valuelist->Add(valwritethr);
00972       }
00973 
00974    }
00975 
00976 
00977    // send it to monalisa
00978    success = SendParameters(valuelist);
00979 
00980    fLastRWSendTime = timenow;
00981 
00982    delete valuelist;
00983    return success;
00984 }
00985 
00986 //______________________________________________________________________________
00987 Bool_t TMonaLisaWriter::SendParameters(TList *valuelist, const char *identifier)
00988 {
00989    // Send the parameters to MonaLisa.
00990 
00991    if (!fInitialized) {
00992       Error("SendParameters", "Monitoring is not properly initialized!");
00993       return kFALSE;
00994    }
00995 
00996    if (!valuelist) {
00997       Error("SendParameters", "No values in the value list!");
00998       return kFALSE;
00999    }
01000 
01001    if (identifier == 0)
01002       identifier = fJobId;
01003 
01004    TIter nextvalue(valuelist);
01005 
01006    TMonaLisaValue *objval;
01007    TMonaLisaText *objtext;
01008    TObject *monobj;
01009 
01010    Int_t apmon_nparams = valuelist->GetSize();
01011    char **apmon_params = 0;
01012    Int_t *apmon_types = 0;
01013    char **apmon_values = 0;
01014    Double_t *bufDouble = 0; // buffer for int, long, etc. that is to be sent as double
01015 
01016    if (apmon_nparams) {
01017 
01018       apmon_params = (char **) malloc(apmon_nparams * sizeof(char *));
01019       apmon_values = (char **) malloc(apmon_nparams * sizeof(char *));
01020       apmon_types = (int *) malloc(apmon_nparams * sizeof(int));
01021       bufDouble = new Double_t[apmon_nparams];
01022 
01023       Int_t looper = 0;
01024       while ((monobj = nextvalue())) {
01025          if (!strcmp(monobj->ClassName(), "TMonaLisaValue")) {
01026             objval = (TMonaLisaValue *) monobj;
01027 
01028             if (fVerbose)
01029                Info("SendParameters", "adding tag %s with val %f",
01030                     objval->GetName(), objval->GetValue());
01031 
01032             apmon_params[looper] = (char *) objval->GetName();
01033             apmon_types[looper] = XDR_REAL64;
01034             apmon_values[looper] = (char *) (objval->GetValuePtr());
01035             looper++;
01036          }
01037          if (!strcmp(monobj->ClassName(), "TMonaLisaText")) {
01038             objtext = (TMonaLisaText *) monobj;
01039 
01040             if (fVerbose)
01041                Info("SendParameters", "adding tag %s with text %s",
01042                     objtext->GetName(), objtext->GetText());
01043 
01044             apmon_params[looper] = (char *) objtext->GetName();
01045             apmon_types[looper] = XDR_STRING;
01046             apmon_values[looper] = (char *) (objtext->GetText());
01047             looper++;
01048          }
01049          if (!strcmp(monobj->ClassName(), "TNamed")) {
01050             TNamed* objNamed = (TNamed *) monobj;
01051 
01052             if (fVerbose)
01053               Info("SendParameters", "adding tag %s with text %s",
01054                    objNamed->GetName(), objNamed->GetTitle());
01055 
01056             apmon_params[looper] = (char *) objNamed->GetName();
01057             apmon_types[looper] = XDR_STRING;
01058             apmon_values[looper] = (char *) (objNamed->GetTitle());
01059             looper++;
01060          }
01061          // unfortunately ClassName() converts Double_t to double, etc.
01062          if (!strcmp(monobj->ClassName(), "TParameter<double>")) {
01063             TParameter<double>* objParam = (TParameter<double> *) monobj;
01064 
01065             if (fVerbose)
01066                Info("SendParameters", "adding tag %s with val %f",
01067                     objParam->GetName(), objParam->GetVal());
01068 
01069             apmon_params[looper] = (char *) objParam->GetName();
01070             apmon_types[looper] = XDR_REAL64;
01071             apmon_values[looper] = (char *) &(objParam->GetVal());
01072             looper++;
01073          }
01074          if (!strcmp(monobj->ClassName(), "TParameter<Long64_t>")) {
01075             TParameter<Long64_t>* objParam = (TParameter<Long64_t> *) monobj;
01076 
01077             if (fVerbose)
01078                Info("SendParameters", "adding tag %s with val %lld",
01079                     objParam->GetName(), objParam->GetVal());
01080 
01081             apmon_params[looper] = (char *) objParam->GetName();
01082             apmon_types[looper] = XDR_REAL64;
01083             bufDouble[looper] = objParam->GetVal();
01084             apmon_values[looper] = (char *) (bufDouble + looper);
01085             looper++;
01086          }
01087          if (!strcmp(monobj->ClassName(), "TParameter<long>")) {
01088             TParameter<long>* objParam = (TParameter<long> *) monobj;
01089 
01090             if (fVerbose)
01091                Info("SendParameters", "adding tag %s with val %ld",
01092                     objParam->GetName(), objParam->GetVal());
01093 
01094             apmon_params[looper] = (char *) objParam->GetName();
01095             apmon_types[looper] = XDR_REAL64;
01096             bufDouble[looper] = objParam->GetVal();
01097             apmon_values[looper] = (char *) (bufDouble + looper);
01098             looper++;
01099          }
01100          if (!strcmp(monobj->ClassName(), "TParameter<float>")) {
01101             TParameter<float>* objParam = (TParameter<float> *) monobj;
01102 
01103             if (fVerbose)
01104                Info("SendParameters", "adding tag %s with val %f",
01105                     objParam->GetName(), objParam->GetVal());
01106 
01107             apmon_params[looper] = (char *) objParam->GetName();
01108             apmon_types[looper] = XDR_REAL64;
01109             bufDouble[looper] = objParam->GetVal();
01110             apmon_values[looper] = (char *) (bufDouble + looper);
01111             looper++;
01112          }
01113          if (!strcmp(monobj->ClassName(), "TParameter<int>")) {
01114             TParameter<int>* objParam = (TParameter<int> *) monobj;
01115 
01116             if (fVerbose)
01117                Info("SendParameters", "adding tag %s with val %d",
01118                     objParam->GetName(), objParam->GetVal());
01119 
01120             apmon_params[looper] = (char *) objParam->GetName();
01121             apmon_types[looper] = XDR_REAL64;
01122             bufDouble[looper] = objParam->GetVal();
01123             apmon_values[looper] = (char *) (bufDouble + looper);
01124             looper++;
01125          }
01126       }
01127 
01128       // change number of parameters to the actual found value
01129       apmon_nparams = looper;
01130 
01131       if (fVerbose)
01132          Info("SendParameters", "n: %d name: %s identifier %s ...,",
01133               apmon_nparams, GetName(), identifier);
01134 
01135       ((ApMon *) fApmon)->sendParameters((char *) GetName(), (char*)identifier,
01136                                          apmon_nparams, apmon_params,
01137                                          apmon_types, apmon_values);
01138 
01139       free(apmon_params);
01140       free(apmon_values);
01141       free(apmon_types);
01142       delete[] bufDouble;
01143    }
01144    return kTRUE;
01145 }
01146 
01147 //______________________________________________________________________________
01148 void TMonaLisaWriter::SetLogLevel(const char *loglevel)
01149 {
01150    // Set MonaLisa log level.
01151 
01152    ((ApMon *) fApmon)->setLogLevel((char *) loglevel);
01153 }
01154 
01155 //______________________________________________________________________________
01156 void TMonaLisaWriter::Print(Option_t *) const
01157 {
01158    // Print info about MonaLisa object.
01159 
01160    cout << "Site     (Farm) : " << fName << endl;
01161    cout << "JobId    (Node) : " << fJobId << endl;
01162    cout << "SubJobId (Node) : " << fSubJobId << endl;
01163    cout << "HostName        : " << fHostname << endl;
01164    cout << "Pid             : " << fPid << endl;
01165    cout << "Inititialized   : " << fInitialized << endl;
01166    cout << "Verbose         : " << fVerbose << endl;
01167 
01168 }

Generated on Tue Jul 5 14:45:42 2011 for ROOT_528-00b_version by  doxygen 1.5.1