TProofPlayer.cxx

Go to the documentation of this file.
00001 // @(#)root/proofplayer:$Id: TProofPlayer.cxx 37970 2011-02-03 17:40:17Z ganis $
00002 // Author: Maarten Ballintijn   07/01/02
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2001, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 
00012 //////////////////////////////////////////////////////////////////////////
00013 //                                                                      //
00014 // TProofPlayer                                                         //
00015 //                                                                      //
00016 // This internal class and its subclasses steer the processing in PROOF.//
00017 // Instances of the TProofPlayer class are created on the worker nodes  //
00018 // per session and do the processing.                                   //
00019 // Instances of its subclass - TProofPlayerRemote are created per each  //
00020 // query on the master(s) and on the client. On the master(s),          //
00021 // TProofPlayerRemote coordinate processing, check the dataset, create  //
00022 // the packetizer and take care of merging the results of the workers.  //
00023 // The instance on the client collects information on the input         //
00024 // (dataset and selector), it invokes the Begin() method and finalizes  //
00025 // the query by calling Terminate().                                    //
00026 //                                                                      //
00027 //////////////////////////////////////////////////////////////////////////
00028 
00029 #include "TProofDraw.h"
00030 #include "TProofPlayer.h"
00031 #include "THashList.h"
00032 #include "TEnv.h"
00033 #include "TEventIter.h"
00034 #include "TVirtualPacketizer.h"
00035 #include "TSelector.h"
00036 #include "TSocket.h"
00037 #include "TProofServ.h"
00038 #include "TProof.h"
00039 #include "TProofOutputFile.h"
00040 #include "TProofSuperMaster.h"
00041 #include "TSlave.h"
00042 #include "TClass.h"
00043 #include "TROOT.h"
00044 #include "TError.h"
00045 #include "TException.h"
00046 #include "MessageTypes.h"
00047 #include "TMessage.h"
00048 #include "TDSetProxy.h"
00049 #include "TString.h"
00050 #include "TSystem.h"
00051 #include "TFile.h"
00052 #include "TFileCollection.h"
00053 #include "TFileInfo.h"
00054 #include "TFileMerger.h"
00055 #include "TProofDebug.h"
00056 #include "TTimer.h"
00057 #include "TMap.h"
00058 #include "TPerfStats.h"
00059 #include "TStatus.h"
00060 #include "TEventList.h"
00061 #include "TProofLimitsFinder.h"
00062 #include "TSortedList.h"
00063 #include "TTree.h"
00064 #include "TEntryList.h"
00065 #include "TDSet.h"
00066 #include "TDrawFeedback.h"
00067 #include "TNamed.h"
00068 #include "TObjString.h"
00069 #include "TQueryResult.h"
00070 #include "TMD5.h"
00071 #include "TMethodCall.h"
00072 #include "TObjArray.h"
00073 #include "TMutex.h"
00074 #include "TH1.h"
00075 #include "TVirtualMonitoring.h"
00076 #include "TParameter.h"
00077 #include "TOutputListSelectorDataMap.h"
00078 
00079 // Timeout exception
00080 #define kPEX_STOPPED  1001
00081 #define kPEX_ABORTED  1002
00082 
00083 // To flag an abort condition: use a local static variable to avoid
00084 // warnings about problems with longjumps
00085 static Bool_t gAbort = kFALSE;
00086 
00087 class TAutoBinVal : public TNamed {
00088 private:
00089    Double_t fXmin, fXmax, fYmin, fYmax, fZmin, fZmax;
00090 
00091 public:
00092    TAutoBinVal(const char *name, Double_t xmin, Double_t xmax, Double_t ymin,
00093                Double_t ymax, Double_t zmin, Double_t zmax) : TNamed(name,"")
00094    {
00095       fXmin = xmin; fXmax = xmax;
00096       fYmin = ymin; fYmax = ymax;
00097       fZmin = zmin; fZmax = zmax;
00098    }
00099    void GetAll(Double_t& xmin, Double_t& xmax, Double_t& ymin,
00100                Double_t& ymax, Double_t& zmin, Double_t& zmax)
00101    {
00102       xmin = fXmin; xmax = fXmax;
00103       ymin = fYmin; ymax = fYmax;
00104       zmin = fZmin; zmax = fZmax;
00105    }
00106 
00107 };
00108 
00109 //
00110 // Special timer to dispatch pending events while processing
00111 //______________________________________________________________________________
00112 class TDispatchTimer : public TTimer {
00113 private:
00114    TProofPlayer    *fPlayer;
00115 
00116 public:
00117    TDispatchTimer(TProofPlayer *p) : TTimer(1000, kFALSE), fPlayer(p) { }
00118 
00119    Bool_t Notify();
00120 };
00121 //______________________________________________________________________________
00122 Bool_t TDispatchTimer::Notify()
00123 {
00124    // Handle expiration of the timer associated with dispatching pending
00125    // events while processing. We must act as fast as possible here, so
00126    // we just set a flag submitting a request for dispatching pending events
00127 
00128    if (gDebug > 0)
00129       Info ("Notify","called!");
00130 
00131    fPlayer->SetBit(TProofPlayer::kDispatchOneEvent);
00132 
00133    // Needed for the next shot
00134    Reset();
00135    return kTRUE;
00136 }
00137 
00138 //
00139 // Special timer to handle stop/abort request via exception raising
00140 //______________________________________________________________________________
00141 class TStopTimer : public TTimer {
00142 private:
00143    Bool_t           fAbort;
00144    TProofPlayer    *fPlayer;
00145 
00146 public:
00147    TStopTimer(TProofPlayer *p, Bool_t abort, Int_t to);
00148 
00149    Bool_t Notify();
00150 };
00151 
00152 //______________________________________________________________________________
00153 TStopTimer::TStopTimer(TProofPlayer *p, Bool_t abort, Int_t to)
00154            : TTimer(((to <= 0 || to > 864000) ? 10 : to * 1000), kFALSE)
00155 {
00156    // Constructor for the timer to stop/abort processing.
00157    // The 'timeout' is in seconds.
00158    // Make sure that 'to' make sense, i.e. not larger than 10 days;
00159    // the minimum value is 10 ms (0 does not seem to start the timer ...).
00160 
00161    if (gDebug > 0)
00162       Info ("TStopTimer","enter: %d, timeout: %d", abort, to);
00163 
00164    fPlayer = p;
00165    fAbort = abort;
00166 
00167    if (gDebug > 1)
00168       Info ("TStopTimer","timeout set to %s ms", fTime.AsString());
00169 }
00170 
00171 //______________________________________________________________________________
00172 Bool_t TStopTimer::Notify()
00173 {
00174    // Handle the signal coming from the expiration of the timer
00175    // associated with an abort or stop request.
00176    // We raise an exception which will be processed in the
00177    // event loop.
00178 
00179    if (gDebug > 0)
00180       Info ("Notify","called!");
00181 
00182    if (fAbort)
00183       Throw(kPEX_ABORTED);
00184    else
00185       Throw(kPEX_STOPPED);
00186 
00187    return kTRUE;
00188 }
00189 
00190 //------------------------------------------------------------------------------
00191 
00192 ClassImp(TProofPlayer)
00193 
00194 THashList *TProofPlayer::fgDrawInputPars = 0;
00195 
00196 //______________________________________________________________________________
00197 TProofPlayer::TProofPlayer(TProof *)
00198    : fAutoBins(0), fOutput(0), fSelector(0), fSelectorClass(0),
00199      fFeedbackTimer(0), fFeedbackPeriod(2000),
00200      fEvIter(0), fSelStatus(0),
00201      fTotalEvents(0), fQueryResults(0), fQuery(0), fDrawQueries(0),
00202      fMaxDrawQueries(1), fStopTimer(0), fStopTimerMtx(0), fDispatchTimer(0)
00203 {
00204    // Default ctor.
00205 
00206    fInput         = new TList;
00207    fExitStatus    = kFinished;
00208    fProgressStatus = new TProofProgressStatus();
00209    SetProcessing(kFALSE);
00210 
00211    static Bool_t initLimitsFinder = kFALSE;
00212    if (!initLimitsFinder && gProofServ && !gProofServ->IsMaster()) {
00213       THLimitsFinder::SetLimitsFinder(new TProofLimitsFinder);
00214       initLimitsFinder = kTRUE;
00215    }
00216 }
00217 
00218 //______________________________________________________________________________
00219 TProofPlayer::~TProofPlayer()
00220 {
00221    // Destructor.
00222 
00223    fInput->Clear("nodelete");
00224    SafeDelete(fInput);
00225    // The output list is owned by fSelector and destroyed in there
00226    SafeDelete(fSelector);
00227    SafeDelete(fFeedbackTimer);
00228    SafeDelete(fEvIter);
00229    SafeDelete(fQueryResults);
00230    SafeDelete(fDispatchTimer);
00231    SafeDelete(fStopTimer);
00232 }
00233 
00234 //______________________________________________________________________________
00235 void TProofPlayer::SetProcessing(Bool_t on)
00236 {
00237    // Set processing bit according to 'on'
00238 
00239    if (on)
00240       SetBit(TProofPlayer::kIsProcessing);
00241    else
00242       ResetBit(TProofPlayer::kIsProcessing);
00243 }
00244 
00245 //______________________________________________________________________________
00246 void TProofPlayer::StopProcess(Bool_t abort, Int_t timeout)
00247 {
00248    // Stop the process after this event. If timeout is positive, start
00249    // a timer firing after timeout seconds to hard-stop time-expensive
00250    // events.
00251 
00252    if (gDebug > 0)
00253       Info ("StopProcess","abort: %d, timeout: %d", abort, timeout);
00254 
00255    if (fEvIter != 0)
00256       fEvIter->StopProcess(abort);
00257    Long_t to = 1;
00258    if (abort == kTRUE) {
00259       fExitStatus = kAborted;
00260    } else {
00261       fExitStatus = kStopped;
00262       to = timeout;
00263    }
00264    // Start countdown, if needed
00265    if (to > 0)
00266       SetStopTimer(kTRUE, abort, to);
00267 }
00268 
00269 //______________________________________________________________________________
00270 void TProofPlayer::SetDispatchTimer(Bool_t on)
00271 {
00272    // Enable/disable the timer to dispatch pening events while processing.
00273 
00274    SafeDelete(fDispatchTimer);
00275    ResetBit(TProofPlayer::kDispatchOneEvent);
00276    if (on) {
00277       fDispatchTimer = new TDispatchTimer(this);
00278       fDispatchTimer->Start();
00279    }
00280 }
00281 
00282 //______________________________________________________________________________
00283 void TProofPlayer::SetStopTimer(Bool_t on, Bool_t abort, Int_t timeout)
00284 {
00285    // Enable/disable the timer to stop/abort processing.
00286    // The 'timeout' is in seconds.
00287 
00288    fStopTimerMtx = (fStopTimerMtx) ? fStopTimerMtx : new TMutex(kTRUE);
00289    R__LOCKGUARD(fStopTimerMtx);
00290 
00291    // Clean-up the timer
00292    SafeDelete(fStopTimer);
00293    if (on) {
00294       // create timer
00295       fStopTimer = new TStopTimer(this, abort, timeout);
00296       // Start the countdown
00297       fStopTimer->Start();
00298       if (gDebug > 0)
00299          Info ("SetStopTimer", "%s timer STARTED (timeout: %d)",
00300                                (abort ? "ABORT" : "STOP"), timeout);
00301    } else {
00302       if (gDebug > 0)
00303          Info ("SetStopTimer", "timer STOPPED");
00304    }
00305 }
00306 
00307 //______________________________________________________________________________
00308 void TProofPlayer::AddQueryResult(TQueryResult *q)
00309 {
00310    // Add query result to the list, making sure that there are no
00311    // duplicates.
00312 
00313    if (!q) {
00314       Warning("AddQueryResult","query undefined - do nothing");
00315       return;
00316    }
00317 
00318    // Treat differently normal and draw queries
00319    if (!(q->IsDraw())) {
00320       if (!fQueryResults) {
00321          fQueryResults = new TList;
00322          fQueryResults->Add(q);
00323       } else {
00324          TIter nxr(fQueryResults);
00325          TQueryResult *qr = 0;
00326          TQueryResult *qp = 0;
00327          while ((qr = (TQueryResult *) nxr())) {
00328             // If same query, remove old version and break
00329             if (*qr == *q) {
00330                fQueryResults->Remove(qr);
00331                delete qr;
00332                break;
00333             }
00334             // Record position according to start time
00335             if (qr->GetStartTime().Convert() <= q->GetStartTime().Convert())
00336                qp = qr;
00337          }
00338 
00339          if (!qp) {
00340             fQueryResults->AddFirst(q);
00341          } else {
00342             fQueryResults->AddAfter(qp, q);
00343          }
00344       }
00345    } else if (IsClient()) {
00346       // If max reached, eliminate first the oldest one
00347       if (fDrawQueries == fMaxDrawQueries && fMaxDrawQueries > 0) {
00348          TIter nxr(fQueryResults);
00349          TQueryResult *qr = 0;
00350          while ((qr = (TQueryResult *) nxr())) {
00351             // If same query, remove old version and break
00352             if (qr->IsDraw()) {
00353                fDrawQueries--;
00354                fQueryResults->Remove(qr);
00355                delete qr;
00356                break;
00357             }
00358          }
00359       }
00360       // Add new draw query
00361       if (fDrawQueries >= 0 && fDrawQueries < fMaxDrawQueries) {
00362          fDrawQueries++;
00363          if (!fQueryResults)
00364             fQueryResults = new TList;
00365          fQueryResults->Add(q);
00366       }
00367    }
00368 }
00369 
00370 //______________________________________________________________________________
00371 void TProofPlayer::RemoveQueryResult(const char *ref)
00372 {
00373    // Remove all query result instances referenced 'ref' from
00374    // the list of results.
00375 
00376    if (fQueryResults) {
00377       TIter nxq(fQueryResults);
00378       TQueryResult *qr = 0;
00379       while ((qr = (TQueryResult *) nxq())) {
00380          if (qr->Matches(ref)) {
00381             fQueryResults->Remove(qr);
00382             delete qr;
00383          }
00384       }
00385    }
00386 }
00387 
00388 //______________________________________________________________________________
00389 TQueryResult *TProofPlayer::GetQueryResult(const char *ref)
00390 {
00391    // Get query result instances referenced 'ref' from
00392    // the list of results.
00393 
00394    if (fQueryResults) {
00395       if (ref && strlen(ref) > 0) {
00396          TIter nxq(fQueryResults);
00397          TQueryResult *qr = 0;
00398          while ((qr = (TQueryResult *) nxq())) {
00399             if (qr->Matches(ref))
00400                return qr;
00401          }
00402       } else {
00403          // Get last
00404          return (TQueryResult *) fQueryResults->Last();
00405       }
00406    }
00407 
00408    // Nothing found
00409    return (TQueryResult *)0;
00410 }
00411 
00412 //______________________________________________________________________________
00413 void TProofPlayer::SetCurrentQuery(TQueryResult *q)
00414 {
00415    // Set current query and save previous value.
00416 
00417    fPreviousQuery = fQuery;
00418    fQuery = q;
00419 }
00420 
00421 //______________________________________________________________________________
00422 void TProofPlayer::AddInput(TObject *inp)
00423 {
00424    // Add object to input list.
00425 
00426    fInput->Add(inp);
00427 }
00428 
00429 //______________________________________________________________________________
00430 void TProofPlayer::ClearInput()
00431 {
00432    // Clear input list.
00433 
00434    fInput->Clear();
00435 }
00436 
00437 //______________________________________________________________________________
00438 TObject *TProofPlayer::GetOutput(const char *name) const
00439 {
00440    // Get output object by name.
00441 
00442    if (fOutput)
00443       return fOutput->FindObject(name);
00444    return 0;
00445 }
00446 
00447 //______________________________________________________________________________
00448 TList *TProofPlayer::GetOutputList() const
00449 {
00450    // Get output list.
00451 
00452    TList *ol = fOutput;
00453    if (!ol && fQuery)
00454       ol = fQuery->GetOutputList();
00455    return ol;
00456 }
00457 
00458 //______________________________________________________________________________
00459 Int_t TProofPlayer::ReinitSelector(TQueryResult *qr)
00460 {
00461    // Reinitialize fSelector using the selector files in the query result.
00462    // Needed when Finalize is called after a Process execution for the same
00463    // selector name.
00464 
00465    Int_t rc = 0;
00466 
00467    // Make sure we have a query
00468    if (!qr) {
00469       Info("ReinitSelector", "query undefined - do nothing");
00470       return -1;
00471    }
00472 
00473    // Selector name
00474    TString selec = qr->GetSelecImp()->GetName();
00475    if (selec.Length() <= 0) {
00476       Info("ReinitSelector", "selector name undefined - do nothing");
00477       return -1;
00478    }
00479 
00480    // Find out if this is a standard selection used for Draw actions
00481    Bool_t stdselec = TSelector::IsStandardDraw(selec);
00482 
00483    // Find out if this is a precompiled selector: in such a case we do not
00484    // have the code in TMacros, so we must rely on local libraries
00485    Bool_t compselec = (selec.Contains(".") || stdselec) ? kFALSE : kTRUE;
00486 
00487    // If not, find out if it needs to be expanded
00488    TString ipathold;
00489    if (!stdselec && !compselec) {
00490       // Check checksums for the versions of the selector files
00491       Bool_t expandselec = kTRUE;
00492       TString dir, ipath;
00493       char *selc = gSystem->Which(TROOT::GetMacroPath(), selec, kReadPermission);
00494       if (selc) {
00495          // Check checksums
00496          TMD5 *md5icur = 0, *md5iold = 0, *md5hcur = 0, *md5hold = 0;
00497          // Implementation files
00498          md5icur = TMD5::FileChecksum(selc);
00499          md5iold = qr->GetSelecImp()->Checksum();
00500          // Header files
00501          TString selh(selc);
00502          Int_t dot = selh.Last('.');
00503          if (dot != kNPOS) selh.Remove(dot);
00504          selh += ".h";
00505          if (!gSystem->AccessPathName(selh, kReadPermission))
00506             md5hcur = TMD5::FileChecksum(selh);
00507          md5hold = qr->GetSelecHdr()->Checksum();
00508 
00509          // If nothing has changed nothing to do
00510          if (md5hcur && md5hold && md5icur && md5iold)
00511             if (*md5hcur == *md5hold && *md5icur == *md5iold)
00512                expandselec = kFALSE;
00513 
00514          SafeDelete(md5icur);
00515          SafeDelete(md5hcur);
00516          SafeDelete(md5iold);
00517          SafeDelete(md5hold);
00518          if (selc) delete [] selc;
00519       }
00520 
00521       Bool_t ok = kTRUE;
00522       // Expand selector files, if needed
00523       if (expandselec) {
00524 
00525          ok = kFALSE;
00526          // Expand files in a temporary directory
00527          TUUID u;
00528          dir = Form("%s/%s",gSystem->TempDirectory(),u.AsString());
00529          if (!(gSystem->MakeDirectory(dir))) {
00530 
00531             // Export implementation file
00532             selec = Form("%s/%s",dir.Data(),selec.Data());
00533             qr->GetSelecImp()->SaveSource(selec);
00534 
00535             // Export header file
00536             TString seleh = Form("%s/%s",dir.Data(),qr->GetSelecHdr()->GetName());
00537             qr->GetSelecHdr()->SaveSource(seleh);
00538 
00539             // Adjust include path
00540             ipathold = gSystem->GetIncludePath();
00541             ipath = Form("-I%s %s", dir.Data(), gSystem->GetIncludePath());
00542             gSystem->SetIncludePath(ipath.Data());
00543 
00544             ok = kTRUE;
00545          }
00546       }
00547       TString opt(qr->GetOptions());
00548       Ssiz_t id = opt.Last('#');
00549       if (id != kNPOS && id < opt.Length() - 1)
00550          selec += opt(id + 1, opt.Length());
00551 
00552       if (!ok) {
00553          Info("ReinitSelector", "problems locating or exporting selector files");
00554          return -1;
00555       }
00556    }
00557 
00558    // Cleanup previous stuff
00559    SafeDelete(fSelector);
00560    fSelectorClass = 0;
00561 
00562    // Init the selector now
00563    Int_t iglevelsave = gErrorIgnoreLevel;
00564    if (compselec)
00565       // Silent error printout on first attempt
00566       gErrorIgnoreLevel = kBreak;
00567 
00568    if ((fSelector = TSelector::GetSelector(selec))) {
00569       if (compselec)
00570          gErrorIgnoreLevel = iglevelsave; // restore ignore level
00571       fSelectorClass = fSelector->IsA();
00572       fSelector->SetOption(qr->GetOptions());
00573 
00574    } else {
00575       if (compselec) {
00576          gErrorIgnoreLevel = iglevelsave; // restore ignore level
00577          // Retry by loading first the libraries listed in TQueryResult, if any
00578          if (strlen(qr->GetLibList()) > 0) {
00579             TString sl(qr->GetLibList());
00580             TObjArray *oa = sl.Tokenize(" ");
00581             if (oa) {
00582                Bool_t retry = kFALSE;
00583                TIter nxl(oa);
00584                TObjString *os = 0;
00585                while ((os = (TObjString *) nxl())) {
00586                   TString lib = gSystem->BaseName(os->GetName());
00587                   if (lib != "lib") {
00588                      lib.ReplaceAll("-l", "lib");
00589                      if (gSystem->Load(lib) == 0)
00590                         retry = kTRUE;
00591                   }
00592                }
00593                // Retry now, if the case
00594                if (retry)
00595                   fSelector = TSelector::GetSelector(selec);
00596             }
00597          }
00598       }
00599       if (!fSelector) {
00600          if (compselec)
00601             Info("ReinitSelector", "compiled selector re-init failed:"
00602                                    " automatic reload unsuccessful:"
00603                                    " please load manually the correct library");
00604          rc = -1;
00605       }
00606    }
00607    if (fSelector) {
00608       // Draw needs to reinit temp histos
00609       fSelector->SetInputList(qr->GetInputList());
00610       if (stdselec) {
00611          ((TProofDraw *)fSelector)->DefVar();
00612       } else {
00613          // variables may have been initialized in Begin()
00614          fSelector->Begin(0);
00615       }
00616    }
00617 
00618    // Restore original include path, if needed
00619    if (ipathold.Length() > 0)
00620       gSystem->SetIncludePath(ipathold.Data());
00621 
00622    return rc;
00623 }
00624 
00625 //______________________________________________________________________________
00626 Int_t TProofPlayer::AddOutputObject(TObject *)
00627 {
00628    // Incorporate output object (may not be used in this class).
00629 
00630    MayNotUse("AddOutputObject");
00631    return -1;
00632 }
00633 
00634 //______________________________________________________________________________
00635 void TProofPlayer::AddOutput(TList *)
00636 {
00637    // Incorporate output list (may not be used in this class).
00638 
00639    MayNotUse("AddOutput");
00640 }
00641 
00642 //______________________________________________________________________________
00643 void TProofPlayer::StoreOutput(TList *)
00644 {
00645    // Store output list (may not be used in this class).
00646 
00647    MayNotUse("StoreOutput");
00648 }
00649 
00650 //______________________________________________________________________________
00651 void TProofPlayer::StoreFeedback(TObject *, TList *)
00652 {
00653    // Store feedback list (may not be used in this class).
00654 
00655    MayNotUse("StoreFeedback");
00656 }
00657 
00658 //______________________________________________________________________________
00659 void TProofPlayer::Progress(Long64_t /*total*/, Long64_t /*processed*/)
00660 {
00661    // Report progress (may not be used in this class).
00662 
00663    MayNotUse("Progress");
00664 }
00665 
00666 //______________________________________________________________________________
00667 void TProofPlayer::Progress(Long64_t /*total*/, Long64_t /*processed*/,
00668                             Long64_t /*bytesread*/,
00669                             Float_t /*evtRate*/, Float_t /*mbRate*/,
00670                             Float_t /*evtrti*/, Float_t /*mbrti*/)
00671 {
00672    // Report progress (may not be used in this class).
00673 
00674    MayNotUse("Progress");
00675 }
00676 
00677 //______________________________________________________________________________
00678 void TProofPlayer::Progress(TProofProgressInfo * /*pi*/)
00679 {
00680    // Report progress (may not be used in this class).
00681 
00682    MayNotUse("Progress");
00683 }
00684 
00685 //______________________________________________________________________________
00686 void TProofPlayer::Feedback(TList *)
00687 {
00688    // Set feedback list (may not be used in this class).
00689 
00690    MayNotUse("Feedback");
00691 }
00692 
00693 //______________________________________________________________________________
00694 TDrawFeedback *TProofPlayer::CreateDrawFeedback(TProof *p)
00695 {
00696    // Draw feedback creation proxy. When accessed via TProof avoids
00697    // link dependency on libProofPlayer.
00698 
00699    return new TDrawFeedback(p);
00700 }
00701 
00702 //______________________________________________________________________________
00703 void TProofPlayer::SetDrawFeedbackOption(TDrawFeedback *f, Option_t *opt)
00704 {
00705    // Set draw feedback option.
00706 
00707    if (f)
00708       f->SetOption(opt);
00709 }
00710 
00711 //______________________________________________________________________________
00712 void TProofPlayer::DeleteDrawFeedback(TDrawFeedback *f)
00713 {
00714    // Delete draw feedback object.
00715 
00716    delete f;
00717 }
00718 
00719 //______________________________________________________________________________
00720 Long64_t TProofPlayer::Process(TDSet *dset, const char *selector_file,
00721                                Option_t *option, Long64_t nentries,
00722                                Long64_t first)
00723 {
00724    // Process specified TDSet on PROOF worker.
00725    // The return value is -1 in case of error and TSelector::GetStatus()
00726    // in case of success.
00727 
00728    PDB(kGlobal,1) Info("Process","Enter");
00729 
00730    fExitStatus = kFinished;
00731    fOutput = 0;
00732 
00733    TCleanup clean(this);
00734 
00735    SafeDelete(fSelector);
00736    fSelectorClass = 0;
00737    Int_t version = -1;
00738    TRY {
00739       // Get selector files from cache
00740       if (gProofServ) {
00741          gProofServ->GetCacheLock()->Lock();
00742          gProofServ->CopyFromCache(selector_file, 1);
00743       }
00744 
00745       if (!(fSelector = TSelector::GetSelector(selector_file))) {
00746          Error("Process", "cannot load: %s", selector_file );
00747          gProofServ->GetCacheLock()->Unlock();
00748          return -1;
00749       }
00750 
00751       // Save binaries to cache, if any
00752       if (gProofServ) {
00753          gProofServ->CopyToCache(selector_file, 1);
00754          gProofServ->GetCacheLock()->Unlock();
00755       }
00756 
00757       fSelectorClass = fSelector->IsA();
00758       version = fSelector->Version();
00759 
00760       fOutput = fSelector->GetOutputList();
00761 
00762       if (gProofServ)
00763          TPerfStats::Start(fInput, fOutput);
00764 
00765       fSelStatus = new TStatus;
00766       fOutput->Add(fSelStatus);
00767 
00768       fSelector->SetOption(option);
00769       fSelector->SetInputList(fInput);
00770 
00771       // If in sequential (0-PROOF) mode validate the data set to get
00772       // the number of entries
00773       fTotalEvents = nentries;
00774       if (fTotalEvents < 0 && gProofServ &&
00775          gProofServ->IsMaster() && !gProofServ->IsParallel()) {
00776          dset->Validate();
00777          dset->Reset();
00778          TDSetElement *e = 0;
00779          while ((e = dset->Next())) {
00780             fTotalEvents += e->GetNum();
00781          }
00782       }
00783 
00784       dset->Reset();
00785 
00786       // Set parameters controlling the iterator behaviour
00787       Int_t useTreeCache = 1;
00788       if (TProof::GetParameter(fInput, "PROOF_UseTreeCache", useTreeCache) == 0) {
00789          if (useTreeCache > -1 && useTreeCache < 2)
00790             gEnv->SetValue("ProofPlayer.UseTreeCache", useTreeCache);
00791       }
00792       Long64_t cacheSize = -1;
00793       if (TProof::GetParameter(fInput, "PROOF_CacheSize", cacheSize) == 0) {
00794          TString sz = TString::Format("%lld", cacheSize);
00795          gEnv->SetValue("ProofPlayer.CacheSize", sz.Data());
00796       }
00797       // Parallel unzipping
00798       Int_t useParallelUnzip = 0;
00799       if (TProof::GetParameter(fInput, "PROOF_UseParallelUnzip", useParallelUnzip) == 0) {
00800          if (useParallelUnzip > -1 && useParallelUnzip < 2)
00801             gEnv->SetValue("ProofPlayer.UseParallelUnzip", useParallelUnzip);
00802       }
00803       fEvIter = TEventIter::Create(dset, fSelector, first, nentries);
00804 
00805       if (version == 0) {
00806          PDB(kLoop,1) Info("Process","Call Begin(0)");
00807          fSelector->Begin(0);
00808       } else {
00809          if (IsClient()) {
00810             // on client (for local run)
00811             PDB(kLoop,1) Info("Process","Call Begin(0)");
00812             fSelector->Begin(0);
00813          }
00814          if (fSelStatus->IsOk()) {
00815             PDB(kLoop,1) Info("Process","Call SlaveBegin(0)");
00816             fSelector->SlaveBegin(0);  // Init is called explicitly
00817                                        // from GetNextEvent()
00818          }
00819       }
00820 
00821    } CATCH(excode) {
00822       SetProcessing(kFALSE);
00823       Error("Process","exception %d caught", excode);
00824       gProofServ->GetCacheLock()->Unlock();
00825       return -1;
00826    } ENDTRY;
00827 
00828    // Create feedback lists, if required
00829    SetupFeedback();
00830 
00831    if (gMonitoringWriter)
00832       gMonitoringWriter->SendProcessingStatus("STARTED",kTRUE);
00833 
00834    PDB(kLoop,1)
00835       Info("Process","Looping over Process()");
00836 
00837    // get the byte read counter at the beginning of processing
00838    Long64_t readbytesatstart = TFile::GetFileBytesRead();
00839    Long64_t readcallsatstart = TFile::GetFileReadCalls();
00840    // force the first monitoring info
00841    if (gMonitoringWriter)
00842       gMonitoringWriter->SendProcessingProgress(0,0,kTRUE);
00843 
00844    // Start asynchronous timer to dispatch pending events
00845    SetDispatchTimer(kTRUE);
00846 
00847    // Loop over range
00848    gAbort = kFALSE;
00849    Long64_t entry;
00850    fProgressStatus->Reset();
00851    if (gProofServ) gProofServ->ResetBit(TProofServ::kHighMemory);
00852 
00853    TRY {
00854 
00855       // Get the frequency for checking memory consumption and logging information
00856       TParameter<Long64_t> *par = (TParameter<Long64_t>*)fInput->FindObject("PROOF_MemLogFreq");
00857       Long64_t memlogfreq = (par) ? par->GetVal() : 100;
00858       Bool_t warnHWMres = kTRUE, warnHWMvir = kTRUE;
00859       TString lastMsg;
00860 
00861       TPair *currentElem = 0;
00862       // The event loop on the worker
00863       while ((entry = fEvIter->GetNextEvent()) >= 0 && fSelStatus->IsOk()) {
00864 
00865          // This is needed by the inflate infrastructure to calculate
00866          // sleeping times
00867          SetProcessing(kTRUE);
00868 
00869          // Give the possibility to the selector to access additional info in the
00870          // incoming packet
00871          lastMsg = "(unfortunately no detailed info is available about current packet)";
00872          if (dset->Current()) {
00873             if (!currentElem) {
00874                currentElem = new TPair(new TObjString("PROOF_CurrentElement"), dset->Current());
00875                fInput->Add(currentElem);
00876             } else {
00877                if (currentElem->Value() != dset->Current()) {
00878                   currentElem->SetValue(dset->Current());
00879                } else if (dset->Current()->TestBit(TDSetElement::kNewRun)) {
00880                   dset->Current()->ResetBit(TDSetElement::kNewRun);
00881                }
00882             }
00883             if (dset->TestBit(TDSet::kEmpty)) {
00884                lastMsg.Form("while processing cycle:%lld - check logs for possible stacktrace", entry);
00885             } else {
00886                TDSetElement *elem = dynamic_cast<TDSetElement *>(currentElem->Value());
00887                TString fn = (elem) ? elem->GetFileName() : "<undef>";
00888                lastMsg.Form("while processing dset:'%s', file:'%s', event:%lld"
00889                             " - check logs for possible stacktrace", dset->GetName(), fn.Data(), entry);
00890             }
00891          }
00892          // This will be sent to clients in case of exceptions ...
00893          TProofServ::SetLastMsg(lastMsg);
00894 
00895          if (version == 0) {
00896             PDB(kLoop,3)
00897                Info("Process","Call ProcessCut(%lld)", entry);
00898             if (fSelector->ProcessCut(entry)) {
00899                PDB(kLoop,3)
00900                   Info("Process","Call ProcessFill(%lld)", entry);
00901                fSelector->ProcessFill(entry);
00902             }
00903          } else {
00904             PDB(kLoop,3)
00905                Info("Process","Call Process(%lld)", entry);
00906             fSelector->Process(entry);
00907             if (fSelector->GetAbort() == TSelector::kAbortProcess) {
00908                SetProcessing(kFALSE);
00909                break;
00910             }
00911          }
00912 
00913          if (fSelStatus->IsOk()) {
00914             fProgressStatus->IncEntries();
00915             fProgressStatus->SetBytesRead(TFile::GetFileBytesRead()-readbytesatstart);
00916             fProgressStatus->SetReadCalls(TFile::GetFileReadCalls()-readcallsatstart);
00917             if (gMonitoringWriter)
00918                gMonitoringWriter->SendProcessingProgress(fProgressStatus->GetEntries(),
00919                        TFile::GetFileBytesRead()-readbytesatstart, kFALSE);
00920          }
00921          // Check the memory footprint, if required
00922          TString wmsg;
00923          if (!CheckMemUsage(memlogfreq, warnHWMres, warnHWMvir, wmsg)) {
00924             Error("Process", "%s", wmsg.Data());
00925             if (gProofServ) {
00926                wmsg.Insert(0, TString::Format("ERROR:%s, entry:%lld, ", gProofServ->GetOrdinal(), entry));
00927                gProofServ->SendAsynMessage(wmsg.Data());
00928             }
00929             fExitStatus = kStopped;
00930             SetProcessing(kFALSE);
00931             if (gProofServ) gProofServ->SetBit(TProofServ::kHighMemory);
00932             break;
00933          } else {
00934             if (!wmsg.IsNull()) {
00935                Warning("Process", "%s", wmsg.Data());
00936                if (gProofServ) {
00937                   wmsg.Insert(0, TString::Format("WARNING:%s, entry:%lld, ", gProofServ->GetOrdinal(), entry));
00938                   gProofServ->SendAsynMessage(wmsg.Data());
00939                }
00940             }
00941          }
00942 
00943          if (TestBit(TProofPlayer::kDispatchOneEvent)) {
00944             gSystem->DispatchOneEvent(kTRUE);
00945             ResetBit(TProofPlayer::kDispatchOneEvent);
00946          }
00947          SetProcessing(kFALSE);
00948          if (!fSelStatus->IsOk() || gROOT->IsInterrupted()) break;
00949       }
00950 
00951    } CATCH(excode) {
00952       if (excode == kPEX_STOPPED) {
00953          Info("Process","received stop-process signal");
00954          fExitStatus = kStopped;
00955       } else if (excode == kPEX_ABORTED) {
00956          gAbort = kTRUE;
00957          Info("Process","received abort-process signal");
00958          fExitStatus = kAborted;
00959       } else {
00960          Error("Process","exception %d caught", excode);
00961          // Perhaps we need a dedicated status code here ...
00962          gAbort = kTRUE;
00963          fExitStatus = kAborted;
00964       }
00965       SetProcessing(kFALSE);
00966    } ENDTRY;
00967 
00968    // Clean-up the envelop for the current element
00969    TPair *currentElem = 0;
00970    if ((currentElem = (TPair *) fInput->FindObject("PROOF_CurrentElement"))) {
00971       fInput->Remove(currentElem);
00972       delete currentElem->Key();
00973       delete currentElem;
00974    }
00975 
00976    PDB(kGlobal,2)
00977       Info("Process","%lld events processed", fProgressStatus->GetEntries());
00978 
00979    if (gMonitoringWriter) {
00980       gMonitoringWriter->SendProcessingProgress(fProgressStatus->GetEntries(), TFile::GetFileBytesRead()-readbytesatstart, kFALSE);
00981       gMonitoringWriter->SendProcessingStatus("DONE");
00982    }
00983 
00984    // Stop active timers
00985    SetDispatchTimer(kFALSE);
00986    if (fStopTimer != 0)
00987       SetStopTimer(kFALSE, gAbort);
00988    if (fFeedbackTimer != 0)
00989       HandleTimer(0);
00990 
00991    StopFeedback();
00992 
00993    SafeDelete(fEvIter);
00994 
00995    // Finalize
00996 
00997    if (fExitStatus != kAborted) {
00998 
00999       TIter nxo(GetOutputList());
01000       TObject *o = 0;
01001       while ((o = nxo())) {
01002          // Special treatment for files
01003          if (o->IsA() == TProofOutputFile::Class()) {
01004             ((TProofOutputFile *)o)->SetWorkerOrdinal(gProofServ->GetOrdinal());
01005             if (!strcmp(((TProofOutputFile *)o)->GetDir(),""))
01006                ((TProofOutputFile *)o)->SetDir(gProofServ->GetSessionDir());
01007          }
01008       }
01009 
01010       MapOutputListToDataMembers();
01011 
01012       if (fSelStatus->IsOk()) {
01013          if (version == 0) {
01014             PDB(kLoop,1) Info("Process","Call Terminate()");
01015             fSelector->Terminate();
01016          } else {
01017             PDB(kLoop,1) Info("Process","Call SlaveTerminate()");
01018             fSelector->SlaveTerminate();
01019             if (IsClient() && fSelStatus->IsOk()) {
01020                PDB(kLoop,1) Info("Process","Call Terminate()");
01021                fSelector->Terminate();
01022             }
01023          }
01024       }
01025       if (gProofServ && !gProofServ->IsParallel()) {  // put all the canvases onto the output list
01026          TIter nxc(gROOT->GetListOfCanvases());
01027          while (TObject *c = nxc())
01028             fOutput->Add(c);
01029       }
01030    }
01031 
01032    if (gProofServ)
01033       TPerfStats::Stop();
01034 
01035    return 0;
01036 }
01037 
01038 //______________________________________________________________________________
01039 Bool_t TProofPlayer::CheckMemUsage(Long64_t &mfreq, Bool_t &w80r,
01040                                    Bool_t &w80v, TString &wmsg)
01041 {
01042    // Check the memory usage, if requested.
01043    // Return kTRUE if OK, kFALSE if above 95% of at least one between virtual or
01044    // resident limits are depassed.
01045 
01046    if (mfreq > 0 && GetEventsProcessed()%mfreq == 0) {
01047       // Record the memory information
01048       ProcInfo_t pi;
01049       if (!gSystem->GetProcInfo(&pi)){
01050          Info("CheckMemUsage|Svc", "Memory %ld virtual %ld resident event %lld",
01051                                    pi.fMemVirtual, pi.fMemResident, GetEventsProcessed());
01052          wmsg = "";
01053          // Apply limit on virtual memory, if any: warn if above 80%, stop if above 95% of max
01054          if (TProofServ::GetVirtMemMax() > 0) {
01055             if (pi.fMemVirtual > TProofServ::GetMemStop() * TProofServ::GetVirtMemMax()) {
01056                wmsg.Form("using more than %d%% of allowed virtual memory (%ld kB)"
01057                          " - STOP processing", (Int_t) (TProofServ::GetMemStop() * 100), pi.fMemVirtual);
01058                return kFALSE;
01059             } else if (pi.fMemVirtual > TProofServ::GetMemHWM() * TProofServ::GetVirtMemMax() && w80v) {
01060                // Refine monitoring
01061                mfreq = 1;
01062                wmsg.Form("using more than %d%% of allowed virtual memory (%ld kB)",
01063                          (Int_t) (TProofServ::GetMemHWM() * 100), pi.fMemVirtual);
01064                w80v = kFALSE;
01065             }
01066          }
01067          // Apply limit on resident memory, if any: warn if above 80%, stop if above 95% of max
01068          if (TProofServ::GetResMemMax() > 0) {
01069             if (pi.fMemResident > TProofServ::GetMemStop() * TProofServ::GetResMemMax()) {
01070                wmsg.Form("using more than %d%% of allowed resident memory (%ld kB)"
01071                          " - STOP processing", (Int_t) (TProofServ::GetMemStop() * 100), pi.fMemResident);
01072                return kFALSE;
01073             } else if (pi.fMemResident > TProofServ::GetMemHWM() * TProofServ::GetResMemMax() && w80r) {
01074                // Refine monitoring
01075                mfreq = 1;
01076                if (wmsg.Length() > 0) {
01077                   wmsg.Form("using more than %d%% of allowed both virtual and resident memory ({%ld,%ld} kB)",
01078                             (Int_t) (TProofServ::GetMemHWM() * 100), pi.fMemVirtual, pi.fMemResident);
01079                } else {
01080                   wmsg.Form("using more than %d%% of allowed resident memory (%ld kB)",
01081                             (Int_t) (TProofServ::GetMemHWM() * 100), pi.fMemResident);
01082                }
01083                w80r = kFALSE;
01084             }
01085          }
01086       }
01087    }
01088    // Done
01089    return kTRUE;
01090 }
01091 
01092 //______________________________________________________________________________
01093 Long64_t TProofPlayer::Finalize(Bool_t, Bool_t)
01094 {
01095    // Finalize query (may not be used in this class).
01096 
01097    MayNotUse("Finalize");
01098    return -1;
01099 }
01100 
01101 //______________________________________________________________________________
01102 Long64_t TProofPlayer::Finalize(TQueryResult *)
01103 {
01104    // Finalize query (may not be used in this class).
01105 
01106    MayNotUse("Finalize");
01107    return -1;
01108 }
01109 //______________________________________________________________________________
01110 void TProofPlayer::MergeOutput()
01111 {
01112    // Merge output (may not be used in this class).
01113 
01114    MayNotUse("MergeOutput");
01115    return;
01116 }
01117 
01118 //______________________________________________________________________________
01119 void TProofPlayer::MapOutputListToDataMembers() const
01120 {
01121    TOutputListSelectorDataMap* olsdm = new TOutputListSelectorDataMap(fSelector);
01122    fOutput->Add(olsdm);
01123 }
01124 
01125 //______________________________________________________________________________
01126 void TProofPlayer::UpdateAutoBin(const char *name,
01127                                  Double_t& xmin, Double_t& xmax,
01128                                  Double_t& ymin, Double_t& ymax,
01129                                  Double_t& zmin, Double_t& zmax)
01130 {
01131    // Update automatic binning parameters for given object "name".
01132 
01133    if ( fAutoBins == 0 ) {
01134       fAutoBins = new THashList;
01135    }
01136 
01137    TAutoBinVal *val = (TAutoBinVal*) fAutoBins->FindObject(name);
01138 
01139    if ( val == 0 ) {
01140       //look for info in higher master
01141       if (gProofServ && !gProofServ->IsTopMaster()) {
01142          TString key = name;
01143          TProofLimitsFinder::AutoBinFunc(key,xmin,xmax,ymin,ymax,zmin,zmax);
01144       }
01145 
01146       val = new TAutoBinVal(name,xmin,xmax,ymin,ymax,zmin,zmax);
01147       fAutoBins->Add(val);
01148    } else {
01149       val->GetAll(xmin,xmax,ymin,ymax,zmin,zmax);
01150    }
01151 }
01152 
01153 //______________________________________________________________________________
01154 TDSetElement *TProofPlayer::GetNextPacket(TSlave *, TMessage *)
01155 {
01156    // Get next packet (may not be used in this class).
01157 
01158    MayNotUse("GetNextPacket");
01159    return 0;
01160 }
01161 
01162 //______________________________________________________________________________
01163 void TProofPlayer::SetupFeedback()
01164 {
01165    // Set up feedback (may not be used in this class).
01166 
01167    MayNotUse("SetupFeedback");
01168 }
01169 
01170 //______________________________________________________________________________
01171 void TProofPlayer::StopFeedback()
01172 {
01173    // Stop feedback (may not be used in this class).
01174 
01175    MayNotUse("StopFeedback");
01176 }
01177 
01178 //______________________________________________________________________________
01179 Long64_t TProofPlayer::DrawSelect(TDSet * /*set*/, const char * /*varexp*/,
01180                                   const char * /*selection*/, Option_t * /*option*/,
01181                                   Long64_t /*nentries*/, Long64_t /*firstentry*/)
01182 {
01183    // Draw (may not be used in this class).
01184 
01185    MayNotUse("DrawSelect");
01186    return -1;
01187 }
01188 
01189 //______________________________________________________________________________
01190 void TProofPlayer::HandleGetTreeHeader(TMessage *)
01191 {
01192    // Handle tree header request.
01193 
01194    MayNotUse("HandleGetTreeHeader|");
01195 }
01196 
01197 //______________________________________________________________________________
01198 void TProofPlayer::HandleRecvHisto(TMessage *mess)
01199 {
01200    // Receive histo from slave.
01201 
01202    TObject *obj = mess->ReadObject(mess->GetClass());
01203    if (obj->InheritsFrom(TH1::Class())) {
01204       TH1 *h = (TH1*)obj;
01205       h->SetDirectory(0);
01206       TH1 *horg = (TH1*)gDirectory->GetList()->FindObject(h->GetName());
01207       if (horg)
01208          horg->Add(h);
01209       else
01210          h->SetDirectory(gDirectory);
01211    }
01212 }
01213 
01214 //______________________________________________________________________________
01215 Int_t TProofPlayer::DrawCanvas(TObject *obj)
01216 {
01217    // Draw the object if it is a canvas.
01218    // Return 0 in case of success, 1 if it is not a canvas or libProofDraw
01219    // is not available.
01220 
01221    static Int_t (*gDrawCanvasHook)(TObject *) = 0;
01222 
01223    // Load the library the first time
01224    if (!gDrawCanvasHook) {
01225       // Load library needed for graphics ...
01226       TString drawlib = "libProofDraw";
01227       char *p = 0;
01228       if ((p = gSystem->DynamicPathName(drawlib, kTRUE))) {
01229          delete[] p;
01230          if (gSystem->Load(drawlib) != -1) {
01231             // Locate DrawCanvas
01232             Func_t f = 0;
01233             if ((f = gSystem->DynFindSymbol(drawlib,"DrawCanvas")))
01234                gDrawCanvasHook = (Int_t (*)(TObject *))(f);
01235             else
01236                Warning("DrawCanvas", "can't find DrawCanvas");
01237          } else
01238             Warning("DrawCanvas", "can't load %s", drawlib.Data());
01239       } else
01240          Warning("DrawCanvas", "can't locate %s", drawlib.Data());
01241    }
01242    if (gDrawCanvasHook && obj)
01243       return (*gDrawCanvasHook)(obj);
01244    // No drawing hook or object undefined
01245    return 1;
01246 }
01247 
01248 //______________________________________________________________________________
01249 Int_t TProofPlayer::GetDrawArgs(const char *var, const char *sel, Option_t *opt,
01250                                 TString &selector, TString &objname)
01251 {
01252    // Parse the arguments from var, sel and opt and fill the selector and
01253    // object name accordingly.
01254    // Return 0 in case of success, 1 if libProofDraw is not available.
01255 
01256    static Int_t (*gGetDrawArgsHook)(const char *, const char *, Option_t *,
01257                                     TString &, TString &) = 0;
01258 
01259    // Load the library the first time
01260    if (!gGetDrawArgsHook) {
01261       // Load library needed for graphics ...
01262       TString drawlib = "libProofDraw";
01263       char *p = 0;
01264       if ((p = gSystem->DynamicPathName(drawlib, kTRUE))) {
01265          delete[] p;
01266          if (gSystem->Load(drawlib) != -1) {
01267             // Locate GetDrawArgs
01268             Func_t f = 0;
01269             if ((f = gSystem->DynFindSymbol(drawlib,"GetDrawArgs")))
01270                gGetDrawArgsHook = (Int_t (*)(const char *, const char *, Option_t *,
01271                                              TString &, TString &))(f);
01272             else
01273                Warning("GetDrawArgs", "can't find GetDrawArgs");
01274          } else
01275             Warning("GetDrawArgs", "can't load %s", drawlib.Data());
01276       } else
01277          Warning("GetDrawArgs", "can't locate %s", drawlib.Data());
01278    }
01279    if (gGetDrawArgsHook)
01280       return (*gGetDrawArgsHook)(var, sel, opt, selector, objname);
01281    // No parser hook or object undefined
01282    return 1;
01283 }
01284 
01285 //______________________________________________________________________________
01286 void TProofPlayer::FeedBackCanvas(const char *name, Bool_t create)
01287 {
01288    // Create/destroy a named canvas for feedback
01289 
01290    static void (*gFeedBackCanvasHook)(const char *, Bool_t) = 0;
01291 
01292    // Load the library the first time
01293    if (!gFeedBackCanvasHook) {
01294       // Load library needed for graphics ...
01295       TString drawlib = "libProofDraw";
01296       char *p = 0;
01297       if ((p = gSystem->DynamicPathName(drawlib, kTRUE))) {
01298          delete[] p;
01299          if (gSystem->Load(drawlib) != -1) {
01300             // Locate FeedBackCanvas
01301             Func_t f = 0;
01302             if ((f = gSystem->DynFindSymbol(drawlib,"FeedBackCanvas")))
01303                gFeedBackCanvasHook = (void (*)(const char *, Bool_t))(f);
01304             else
01305                Warning("FeedBackCanvas", "can't find FeedBackCanvas");
01306          } else
01307             Warning("FeedBackCanvas", "can't load %s", drawlib.Data());
01308       } else
01309          Warning("FeedBackCanvas", "can't locate %s", drawlib.Data());
01310    }
01311    if (gFeedBackCanvasHook) (*gFeedBackCanvasHook)(name, create);
01312    // No parser hook or object undefined
01313    return;
01314 }
01315 
01316 //______________________________________________________________________________
01317 Long64_t TProofPlayer::GetCacheSize()
01318 {
01319    // Return the size in bytes of the cache
01320 
01321    if (fEvIter) return fEvIter->GetCacheSize();
01322    return -1;
01323 }
01324 
01325 //______________________________________________________________________________
01326 Int_t TProofPlayer::GetLearnEntries()
01327 {
01328    // Return the number of entries in the learning phase
01329 
01330    if (fEvIter) return fEvIter->GetLearnEntries();
01331    return -1;
01332 }
01333 
01334 //------------------------------------------------------------------------------
01335 
01336 ClassImp(TProofPlayerLocal)
01337 
01338 
01339 //------------------------------------------------------------------------------
01340 
01341 ClassImp(TProofPlayerRemote)
01342 
01343 
01344 //______________________________________________________________________________
01345 TProofPlayerRemote::~TProofPlayerRemote()
01346 {
01347    // Destructor.
01348 
01349    SafeDelete(fOutput);      // owns the output list
01350    SafeDelete(fOutputLists);
01351 
01352    // Objects stored in maps are already deleted when merging the feedback
01353    SafeDelete(fFeedbackLists);
01354    SafeDelete(fPacketizer);
01355 }
01356 
01357 //______________________________________________________________________________
01358 Int_t TProofPlayerRemote::InitPacketizer(TDSet *dset, Long64_t nentries,
01359                                          Long64_t first, const char *defpackunit,
01360                                          const char *defpackdata)
01361 {
01362    // Init the packetizer
01363    // Return 0 on success (fPacketizer is correctly initialized), -1 on failure.
01364 
01365    SafeDelete(fPacketizer);
01366    PDB(kGlobal,1) Info("Process","Enter");
01367    fDSet = dset;
01368    fExitStatus = kFinished;
01369 
01370    Bool_t noData = dset->TestBit(TDSet::kEmpty) ? kTRUE : kFALSE;
01371 
01372    TString packetizer;
01373    TList *listOfMissingFiles = 0;
01374 
01375    TMethodCall callEnv;
01376    TClass *cl;
01377    noData = dset->TestBit(TDSet::kEmpty) ? kTRUE : kFALSE;
01378 
01379    if (noData) {
01380 
01381       if (TProof::GetParameter(fInput, "PROOF_Packetizer", packetizer) != 0)
01382          packetizer = defpackunit;
01383       else
01384          Info("InitPacketizer", "using alternate packetizer: %s", packetizer.Data());
01385 
01386       // Get linked to the related class
01387       cl = TClass::GetClass(packetizer);
01388       if (cl == 0) {
01389          Error("InitPacketizer", "class '%s' not found", packetizer.Data());
01390          fExitStatus = kAborted;
01391          return -1;
01392       }
01393 
01394       // Init the constructor
01395       callEnv.InitWithPrototype(cl, cl->GetName(),"TList*,Long64_t,TList*,TProofProgressStatus*");
01396       if (!callEnv.IsValid()) {
01397          Error("InitPacketizer",
01398                "cannot find correct constructor for '%s'", cl->GetName());
01399          fExitStatus = kAborted;
01400          return -1;
01401       }
01402       callEnv.ResetParam();
01403       callEnv.SetParam((Long_t) fProof->GetListOfActiveSlaves());
01404       callEnv.SetParam((Long64_t) nentries);
01405       callEnv.SetParam((Long_t) fInput);
01406       callEnv.SetParam((Long_t) fProgressStatus);
01407 
01408    } else if (dset->TestBit(TDSet::kMultiDSet)) {
01409 
01410       // We have to process many datasets in one go, keeping them separate
01411       if (fProof->GetRunStatus() != TProof::kRunning) {
01412          // We have been asked to stop
01413          Error("InitPacketizer", "received stop/abort request");
01414          fExitStatus = kAborted;
01415          return -1;
01416       }
01417 
01418       // The multi packetizer
01419       packetizer = "TPacketizerMulti";
01420 
01421       // Get linked to the related class
01422       cl = TClass::GetClass(packetizer);
01423       if (cl == 0) {
01424          Error("InitPacketizer", "class '%s' not found", packetizer.Data());
01425          fExitStatus = kAborted;
01426          return -1;
01427       }
01428 
01429       // Init the constructor
01430       callEnv.InitWithPrototype(cl, cl->GetName(),"TDSet*,TList*,Long64_t,Long64_t,TList*,TProofProgressStatus*");
01431       if (!callEnv.IsValid()) {
01432          Error("InitPacketizer", "cannot find correct constructor for '%s'", cl->GetName());
01433          fExitStatus = kAborted;
01434          return -1;
01435       }
01436       callEnv.ResetParam();
01437       callEnv.SetParam((Long_t) dset);
01438       callEnv.SetParam((Long_t) fProof->GetListOfActiveSlaves());
01439       callEnv.SetParam((Long64_t) first);
01440       callEnv.SetParam((Long64_t) nentries);
01441       callEnv.SetParam((Long_t) fInput);
01442       callEnv.SetParam((Long_t) fProgressStatus);
01443 
01444       // We are going to test validity during the packetizer initialization
01445       dset->SetBit(TDSet::kValidityChecked);
01446       dset->ResetBit(TDSet::kSomeInvalid);
01447 
01448    } else {
01449 
01450       // Lookup - resolve the end-point urls to optmize the distribution.
01451       // The lookup was previously called in the packetizer's constructor.
01452       // A list for the missing files may already have been added to the
01453       // output list; otherwise, if needed it will be created inside
01454       if ((listOfMissingFiles = (TList *)fInput->FindObject("MissingFiles"))) {
01455          // Move it to the output list
01456          fInput->Remove(listOfMissingFiles);
01457       } else {
01458          listOfMissingFiles = new TList;
01459       }
01460       // Do the lookup; we only skip it if explicitely requested so.
01461       TString lkopt;
01462       if (TProof::GetParameter(fInput, "PROOF_LookupOpt", lkopt) != 0 || lkopt != "none")
01463          dset->Lookup(kTRUE, &listOfMissingFiles);
01464 
01465       if (fProof->GetRunStatus() != TProof::kRunning) {
01466          // We have been asked to stop
01467          Error("InitPacketizer", "received stop/abort request");
01468          fExitStatus = kAborted;
01469          return -1;
01470       }
01471 
01472       if (!(dset->GetListOfElements()) ||
01473           !(dset->GetListOfElements()->GetSize())) {
01474          if (gProofServ)
01475             gProofServ->SendAsynMessage("InitPacketizer: No files from the data set were found - Aborting");
01476          Error("InitPacketizer", "No files from the data set were found - Aborting");
01477          fExitStatus = kAborted;
01478          if (listOfMissingFiles) {
01479             listOfMissingFiles->SetOwner();
01480             fOutput->Remove(listOfMissingFiles);
01481             SafeDelete(listOfMissingFiles);
01482          }
01483          return -1;
01484       }
01485 
01486       if (TProof::GetParameter(fInput, "PROOF_Packetizer", packetizer) != 0)
01487          // Using standard packetizer TAdaptivePacketizer
01488          packetizer = defpackdata;
01489       else
01490          Info("InitPacketizer", "using alternate packetizer: %s", packetizer.Data());
01491 
01492       // Get linked to the related class
01493       cl = TClass::GetClass(packetizer);
01494       if (cl == 0) {
01495          Error("InitPacketizer", "class '%s' not found", packetizer.Data());
01496          fExitStatus = kAborted;
01497          return -1;
01498       }
01499 
01500       // Init the constructor
01501       callEnv.InitWithPrototype(cl, cl->GetName(),"TDSet*,TList*,Long64_t,Long64_t,TList*,TProofProgressStatus*");
01502       if (!callEnv.IsValid()) {
01503          Error("InitPacketizer", "cannot find correct constructor for '%s'", cl->GetName());
01504          fExitStatus = kAborted;
01505          return -1;
01506       }
01507       callEnv.ResetParam();
01508       callEnv.SetParam((Long_t) dset);
01509       callEnv.SetParam((Long_t) fProof->GetListOfActiveSlaves());
01510       callEnv.SetParam((Long64_t) first);
01511       callEnv.SetParam((Long64_t) nentries);
01512       callEnv.SetParam((Long_t) fInput);
01513       callEnv.SetParam((Long_t) fProgressStatus);
01514 
01515       // We are going to test validity during the packetizer initialization
01516       dset->SetBit(TDSet::kValidityChecked);
01517       dset->ResetBit(TDSet::kSomeInvalid);
01518    }
01519 
01520    // Get an instance of the packetizer
01521    Long_t ret = 0;
01522    callEnv.Execute(ret);
01523    if ((fPacketizer = (TVirtualPacketizer *)ret) == 0) {
01524       Error("InitPacketizer", "cannot construct '%s'", cl->GetName());
01525       fExitStatus = kAborted;
01526       return -1;
01527    }
01528 
01529    if (!fPacketizer->IsValid()) {
01530       Error("InitPacketizer",
01531             "instantiated packetizer object '%s' is invalid", cl->GetName());
01532       fExitStatus = kAborted;
01533       SafeDelete(fPacketizer);
01534       return -1;
01535    }
01536 
01537    // In multi mode retrieve the list of missing files
01538    if (!noData && dset->TestBit(TDSet::kMultiDSet)) {
01539       if ((listOfMissingFiles = (TList *) fInput->FindObject("MissingFiles"))) {
01540          // Remove it; it will be added to the output list
01541          fInput->Remove(listOfMissingFiles);
01542       }
01543    }
01544 
01545    if (!noData) {
01546       // Add invalid elements to the list of missing elements
01547       TDSetElement *elem = 0;
01548       if (dset->TestBit(TDSet::kSomeInvalid)) {
01549          TIter nxe(dset->GetListOfElements());
01550          while ((elem = (TDSetElement *)nxe())) {
01551             if (!elem->GetValid()) {
01552                if (!listOfMissingFiles)
01553                   listOfMissingFiles = new TList;
01554                listOfMissingFiles->Add(elem->GetFileInfo(dset->GetType()));
01555                dset->Remove(elem, kFALSE);
01556             }
01557          }
01558          // The invalid elements have been removed
01559          dset->ResetBit(TDSet::kSomeInvalid);
01560       }
01561 
01562       // Record the list of missing or invalid elements in the output list
01563       if (listOfMissingFiles && listOfMissingFiles->GetSize() > 0) {
01564          TIter missingFiles(listOfMissingFiles);
01565          TString msg;
01566          if (gDebug > 0) {
01567             TFileInfo *fi = 0;
01568             while ((fi = (TFileInfo *) missingFiles.Next())) {
01569                if (fi->GetCurrentUrl()) {
01570                   msg = Form("File not found: %s - skipping!",
01571                                                 fi->GetCurrentUrl()->GetUrl());
01572                } else {
01573                   msg = Form("File not found: %s - skipping!", fi->GetName());
01574                }
01575                if (gProofServ) gProofServ->SendAsynMessage(msg.Data());
01576             }
01577          }
01578          // Make sure it will be sent back
01579          if (!GetOutput("MissingFiles")) {
01580             listOfMissingFiles->SetName("MissingFiles");
01581             AddOutputObject(listOfMissingFiles);
01582          }
01583          TStatus *tmpStatus = (TStatus *)GetOutput("PROOF_Status");
01584          if (!tmpStatus) {
01585             tmpStatus = new TStatus();
01586             AddOutputObject(tmpStatus);
01587          }
01588          // Estimate how much data are missing
01589          Int_t ngood = dset->GetListOfElements()->GetSize();
01590          Int_t nbad = listOfMissingFiles->GetSize();
01591          Double_t xb = Double_t(nbad) / Double_t(ngood + nbad);
01592          msg = Form(" About %.2f %c of the requested files (%d out of %d) were missing or unusable; details in"
01593                     " the 'missingFiles' list", xb * 100., '%', nbad, nbad + ngood);
01594          tmpStatus->Add(msg.Data());
01595          msg = Form(" +++\n"
01596                     " +++ About %.2f %c of the requested files (%d out of %d) are missing or unusable; details in"
01597                     " the 'MissingFiles' list\n"
01598                     " +++", xb * 100., '%', nbad, nbad + ngood);
01599          if (gProofServ) gProofServ->SendAsynMessage(msg.Data());
01600       } else {
01601          // Cleanup
01602          SafeDelete(listOfMissingFiles);
01603       }
01604    }
01605 
01606    // Done
01607    return 0;
01608 }
01609 
01610 //______________________________________________________________________________
01611 Long64_t TProofPlayerRemote::Process(TDSet *dset, const char *selector_file,
01612                                      Option_t *option, Long64_t nentries,
01613                                      Long64_t first)
01614 {
01615    // Process specified TDSet on PROOF.
01616    // This method is called on client and on the PROOF master.
01617    // The return value is -1 in case of an error and TSelector::GetStatus() in
01618    // in case of success.
01619 
01620    PDB(kGlobal,1) Info("Process","Enter");
01621    fDSet = dset;
01622    fExitStatus = kFinished;
01623 
01624    if (!fProgressStatus) {
01625       Error("Process", "No progress status");
01626       return -1;
01627    }
01628    fProgressStatus->Reset();
01629 
01630    //   delete fOutput;
01631    if (!fOutput)
01632       fOutput = new TList;
01633    else
01634       fOutput->Clear();
01635 
01636    SafeDelete(fFeedbackLists);
01637 
01638    if (fProof->IsMaster()){
01639       TPerfStats::Start(fInput, fOutput);
01640    } else {
01641       TPerfStats::Setup(fInput);
01642    }
01643 
01644    if(!SendSelector(selector_file)) return -1;
01645 
01646    TMessage mesg(kPROOF_PROCESS);
01647    TString fn(gSystem->BaseName(selector_file));
01648 
01649    // Parse option
01650    Bool_t sync = (fProof->GetQueryMode(option) == TProof::kSync);
01651 
01652    TDSet *set = dset;
01653    if (fProof->IsMaster()) {
01654 
01655       PDB(kPacketizer,1) Info("Process","Create Proxy TDSet");
01656       set = new TDSetProxy( dset->GetType(), dset->GetObjName(),
01657                             dset->GetDirectory() );
01658       if (dset->TestBit(TDSet::kEmpty))
01659          set->SetBit(TDSet::kEmpty);
01660 
01661       const char *datapack = (fProof->IsLite()) ? "TPacketizer" : "TPacketizerAdaptive";
01662       if (InitPacketizer(dset, nentries, first, "TPacketizerUnit", datapack) != 0) {
01663          Error("Process", "cannot init the packetizer");
01664          fExitStatus = kAborted;
01665          return -1;
01666       }
01667 
01668       // Reset start, this is now managed by the packetizer
01669       first = 0;
01670       // Try to have 100 messages about memory, unless a different number is given by the user
01671       if (!fProof->GetParameter("PROOF_MemLogFreq")){
01672          Long64_t memlogfreq = fPacketizer->GetTotalEntries()/(fProof->GetParallel()*100);
01673          memlogfreq = (memlogfreq > 0) ? memlogfreq : 1;
01674          fProof->SetParameter("PROOF_MemLogFreq", memlogfreq);
01675       }
01676 
01677       // Send input data, if any
01678       TString emsg;
01679       if (TProof::SendInputData(fQuery, fProof, emsg) != 0)
01680          Warning("Process", "could not forward input data: %s", emsg.Data());
01681 
01682    } else {
01683 
01684       // Check whether we have to enforce the use of submergers
01685       if (gEnv->Lookup("Proof.UseMergers") && !fInput->FindObject("PROOF_UseMergers")) {
01686          Int_t smg = gEnv->GetValue("Proof.UseMergers",-1);
01687          if (smg >= 0) fInput->Add(new TParameter<Int_t>("PROOF_UseMergers", smg));
01688       }
01689 
01690       // For a new query clients should make sure that the temporary
01691       // output list is empty
01692       if (fOutputLists) {
01693          fOutputLists->Delete();
01694          delete fOutputLists;
01695          fOutputLists = 0;
01696       }
01697 
01698       if (!sync) {
01699          gSystem->RedirectOutput(fProof->fLogFileName);
01700          Printf(" ");
01701          Info("Process","starting new query");
01702       }
01703 
01704       SafeDelete(fSelector);
01705       fSelectorClass = 0;
01706       if (!(fSelector = TSelector::GetSelector(selector_file))) {
01707          if (!sync)
01708             gSystem->RedirectOutput(0);
01709          return -1;
01710       }
01711       fSelectorClass = fSelector->IsA();
01712       fSelector->SetInputList(fInput);
01713       fSelector->SetOption(option);
01714 
01715       PDB(kLoop,1) Info("Process","Call Begin(0)");
01716       fSelector->Begin(0);
01717 
01718       // Send large input data objects, if any
01719       fProof->SendInputDataFile();
01720 
01721       if (!sync)
01722          gSystem->RedirectOutput(0);
01723    }
01724 
01725    TCleanup clean(this);
01726    SetupFeedback();
01727 
01728    TString opt = option;
01729 
01730    // Old servers need a dedicated streamer
01731    if (fProof->fProtocol < 13)
01732       dset->SetWriteV3(kTRUE);
01733 
01734    // Workers will get the entry ranges from the packetizer
01735    Long64_t num = (gProofServ && gProofServ->IsMaster() && gProofServ->IsParallel()) ? -1 : nentries;
01736    Long64_t fst = (gProofServ && gProofServ->IsMaster() && gProofServ->IsParallel()) ? -1 : first;
01737 
01738    // Entry- or Event- list ?
01739    TEntryList *enl = (!fProof->IsMaster()) ? dynamic_cast<TEntryList *>(set->GetEntryList())
01740                                            : (TEntryList *)0;
01741    TEventList *evl = (!fProof->IsMaster() && !enl) ? dynamic_cast<TEventList *>(set->GetEntryList())
01742                                            : (TEventList *)0;
01743    if (fProof->fProtocol > 14) {
01744       mesg << set << fn << fInput << opt << num << fst << evl << sync << enl;
01745    } else {
01746       mesg << set << fn << fInput << opt << num << fst << evl << sync;
01747       if (enl)
01748          // Not supported remotely
01749          Warning("Process","entry lists not supported by the server");
01750    }
01751 
01752    // Reset the merging progress information
01753    fProof->ResetMergePrg();
01754 
01755    PDB(kGlobal,1) Info("Process","Calling Broadcast");
01756    fProof->Broadcast(mesg);
01757 
01758    // Reset streamer choice
01759    if (fProof->fProtocol < 13)
01760       dset->SetWriteV3(kFALSE);
01761 
01762    // Redirect logs from master to special log frame
01763    if (IsClient())
01764       fProof->fRedirLog = kTRUE;
01765 
01766    if (!IsClient()){
01767       // Signal the start of finalize for the memory log grepping
01768       Info("Process|Svc", "Start merging Memory information");
01769    }
01770 
01771    if (!sync) {
01772       if (IsClient()) {
01773          // Asynchronous query: just make sure that asynchronous input
01774          // is enabled and return the prompt
01775          PDB(kGlobal,1) Info("Process","Asynchronous processing:"
01776                                        " activating CollectInputFrom");
01777          fProof->Activate();
01778 
01779          // Receive the acknowledgement and query sequential number
01780          fProof->Collect();
01781 
01782          return fProof->fSeqNum;
01783 
01784       } else {
01785          PDB(kGlobal,1) Info("Process","Calling Collect");
01786          fProof->Collect();
01787 
01788          HandleTimer(0); // force an update of final result
01789          StopFeedback();
01790 
01791          return Finalize(kFALSE,sync);
01792       }
01793    } else {
01794 
01795       PDB(kGlobal,1) Info("Process","Synchronous processing: calling Collect");
01796       fProof->Collect();
01797       if (!(fProof->IsSync())) {
01798          // The server required to switch to asynchronous mode
01799          Info("Process", "switching to the asynchronous mode ...");
01800          return fProof->fSeqNum;
01801       }
01802 
01803       // Restore prompt logging, for clients (Collect leaves things as they were
01804       // at the time it was called)
01805       if (IsClient())
01806          fProof->fRedirLog = kFALSE;
01807 
01808       if (!IsClient()) {
01809          HandleTimer(0); // force an update of final result
01810          // Store process info
01811          if (fPacketizer && fQuery)
01812             fQuery->SetProcessInfo(0, 0., fPacketizer->GetBytesRead(),
01813                                           fPacketizer->GetInitTime(),
01814                                           fPacketizer->GetProcTime());
01815       }
01816       StopFeedback();
01817 
01818       if (!IsClient() || GetExitStatus() != TProofPlayer::kAborted)
01819          return Finalize(kFALSE,sync);
01820       else
01821          return -1;
01822    }
01823 }
01824 
01825 //______________________________________________________________________________
01826 Bool_t TProofPlayerRemote::MergeOutputFiles()
01827 {
01828    // Merge output in files
01829 
01830    TList *rmList = 0;
01831    if (fMergeFiles) {
01832       TIter nxo(fOutput);
01833       TObject *o = 0;
01834       TProofOutputFile *pf = 0;
01835       while ((o = nxo())) {
01836          if ((pf = dynamic_cast<TProofOutputFile*>(o))) {
01837 
01838             if (pf->IsMerge()) {
01839 
01840                // Point to the merger
01841                TFileMerger *filemerger = pf->GetFileMerger();
01842                if (!filemerger) {
01843                   Error("MergeOutputFiles", "file merger is null in TProofOutputFile! Protocol error?");
01844                   pf->Print();
01845                   continue;
01846                }
01847                // Set the output file
01848                if (!filemerger->OutputFile(pf->GetOutputFileName())) {
01849                   Error("MergeOutputFiles", "cannot open the output file");
01850                   continue;
01851                }
01852                // If only one instance the list in the merger is not yet created: do it now
01853                if (!pf->IsMerged()) {
01854                   TString fileLoc = TString::Format("%s/%s", pf->GetDir(), pf->GetFileName());
01855                   filemerger->AddFile(fileLoc);
01856                }
01857                // Merge
01858                if (!filemerger->Merge()) {
01859                   Error("MergeOutputFiles", "cannot merge the output files");
01860                   continue;
01861                }
01862                // Remove the files
01863                TList *fileList = filemerger->GetMergeList();
01864                if (fileList) {
01865                   TIter next(fileList);
01866                   TObjString *url = 0;
01867                   while((url = (TObjString*)next())) {
01868                      gSystem->Unlink(url->GetString());
01869                   }
01870                }
01871                // Reset the merger
01872                filemerger->Reset();
01873 
01874             } else {
01875 
01876                // Point to the dataset
01877                TFileCollection *fc = pf->GetFileCollection();
01878                if (!fc) {
01879                   Error("MergeOutputFiles", "file collection is null in TProofOutputFile! Protocol error?");
01880                   pf->Print();
01881                   continue;
01882                }
01883                // Add the collection to the output list for registration and/or to be returned
01884                // to the client
01885                fOutput->Add(fc);
01886                // Do not cleanup at destruction
01887                pf->ResetFileCollection();
01888                // Tell the main thread to register this dataset, if needed
01889                if (pf->IsRegister()) {
01890                   TString opt;
01891                   if ((pf->GetTypeOpt() & TProofOutputFile::kOverwrite)) opt += "O";
01892                   if ((pf->GetTypeOpt() & TProofOutputFile::kVerify)) opt += "V";
01893                   if (!fOutput->FindObject("PROOFSERV_RegisterDataSet"))
01894                      fOutput->Add(new TNamed("PROOFSERV_RegisterDataSet", ""));
01895                   TString tag = TString::Format("DATASET_%s", pf->GetTitle());
01896                   fOutput->Add(new TNamed(tag, opt));
01897                }
01898                // Remove this object from the output list and schedule it for distruction
01899                fOutput->Remove(pf);
01900                if (!rmList) rmList = new TList;
01901                rmList->Add(pf);
01902             }
01903          }
01904       }
01905    }
01906 
01907    // Remove objects scheduled for removal
01908    if (rmList && rmList->GetSize() > 0) {
01909       TIter nxo(rmList);
01910       TObject *o = 0;
01911       while((o = nxo())) {
01912          fOutput->Remove(o);
01913       }
01914       rmList->SetOwner(kTRUE);
01915       delete rmList;
01916    }
01917 
01918    // Done
01919    return kTRUE;
01920 }
01921 
01922 
01923 //______________________________________________________________________________
01924 void TProofPlayerRemote::SetSelectorDataMembersFromOutputList()
01925 {
01926    // Set the selector's data members:
01927    // find the mapping of data members to otuput list entries in the output list
01928    // and apply it.
01929    TOutputListSelectorDataMap* olsdm
01930       = TOutputListSelectorDataMap::FindInList(fOutput);
01931    if (!olsdm) {
01932       PDB(kOutput,1) Warning("SetSelectorDataMembersFromOutputList","Failed to find map object in output list!");
01933       return;
01934    }
01935 
01936    olsdm->SetDataMembers(fSelector);
01937 }
01938 
01939 //______________________________________________________________________________
01940 Long64_t TProofPlayerRemote::Finalize(Bool_t force, Bool_t sync)
01941 {
01942 
01943    // Finalize a query.
01944    // Returns -1 in case of an error, 0 otherwise.
01945 
01946    if (IsClient()) {
01947       if (fOutputLists == 0) {
01948          if (force)
01949             if (fQuery)
01950                return fProof->Finalize(Form("%s:%s", fQuery->GetTitle(),
01951                                                      fQuery->GetName()), force);
01952       } else {
01953          // Make sure the all objects are in the output list
01954          PDB(kGlobal,1) Info("Finalize","Calling Merge Output to finalize the output list");
01955          MergeOutput();
01956       }
01957    }
01958 
01959    Long64_t rv = 0;
01960    if (fProof->IsMaster()) {
01961       TPerfStats::Stop();
01962 
01963       PDB(kOutput,1) Info("Finalize","Calling Merge Output");
01964       // Some objects (e.g. histos in autobin) may not have been merged yet
01965       // do it now
01966       MergeOutput();
01967 
01968       // Merge the output files created on workers, if any
01969       MergeOutputFiles();
01970 
01971       fOutput->SetOwner();
01972 
01973       // Add the active-wrks-vs-proctime info from the packetizer
01974       if (fPacketizer) {
01975          TObject *pperf = (TObject *) fPacketizer->GetProgressPerf(kTRUE);
01976          if (pperf) fOutput->Add(pperf);
01977          TList *parms = fPacketizer->GetConfigParams(kTRUE);
01978          if (parms) {
01979             TIter nxo(parms);
01980             TObject *o = 0;
01981             while ((o = nxo())) fOutput->Add(o);
01982          }
01983          
01984          // If other invalid elements were found during processing, add them to the
01985          // list of missing elements
01986          TDSetElement *elem = 0;
01987          if (fPacketizer->GetFailedPackets()) {
01988             TString type = (fPacketizer->TestBit(TVirtualPacketizer::kIsTree)) ? "TTree" : "";
01989             TList *listOfMissingFiles = (TList *) fOutput->FindObject("MissingFiles");
01990             if (!listOfMissingFiles) {
01991                listOfMissingFiles = new TList;
01992                listOfMissingFiles->SetName("MissingFiles");
01993             }
01994             TIter nxe(fPacketizer->GetFailedPackets());
01995             while ((elem = (TDSetElement *)nxe()))
01996                listOfMissingFiles->Add(elem->GetFileInfo(type));
01997             if (!fOutput->FindObject(listOfMissingFiles)) fOutput->Add(listOfMissingFiles);
01998          }
01999       }
02000 
02001       SafeDelete(fSelector);
02002    } else {
02003       if (fExitStatus != kAborted) {
02004 
02005          if (!sync) {
02006             // Reinit selector (with multi-sessioning we must do this until
02007             // TSelector::GetSelector() is optimized to i) avoid reloading of an
02008             // unchanged selector and ii) invalidate existing instances of
02009             // reloaded selector)
02010             if (ReinitSelector(fQuery) == -1) {
02011                Info("Finalize", "problems reinitializing selector \"%s\"",
02012                     fQuery->GetSelecImp()->GetName());
02013                return -1;
02014             }
02015          }
02016 
02017          if (fPacketizer)
02018             if (TList *failedPackets = fPacketizer->GetFailedPackets()) {
02019                fPacketizer->SetFailedPackets(0);
02020                failedPackets->SetName("FailedPackets");
02021                AddOutputObject(failedPackets);
02022 
02023                TStatus *status = (TStatus *)GetOutput("PROOF_Status");
02024                if (!status) {
02025                   status = new TStatus();
02026                   AddOutputObject(status);
02027                }
02028                status->Add("Some packets were not processed! Check the the"
02029                            " 'FailedPackets' list in the output list");
02030             }
02031 
02032          // Some input parameters may be needed in Terminate
02033          fSelector->SetInputList(fInput);
02034 
02035          TIter next(fOutput);
02036          TList *output = fSelector->GetOutputList();
02037          while(TObject* obj = next()) {
02038             if (fProof->IsParallel() || DrawCanvas(obj) == 1)
02039                // Either parallel or not a canvas or not able to display it:
02040                // just add to the list
02041                output->Add(obj);
02042          }
02043 
02044          SetSelectorDataMembersFromOutputList();
02045 
02046          PDB(kLoop,1) Info("Finalize","Call Terminate()");
02047          fOutput->Clear("nodelete");
02048          fSelector->Terminate();
02049 
02050          rv = fSelector->GetStatus();
02051 
02052          // copy the output list back and clean the selector's list
02053          TIter it(output);
02054          while(TObject* o = it()) {
02055             fOutput->Add(o);
02056          }
02057 
02058          // Save the output list in the current query, if any
02059          if (fQuery) {
02060             fQuery->SetOutputList(fOutput);
02061             // Set in finalized state (cannot be done twice)
02062             fQuery->SetFinalized();
02063          } else {
02064             Warning("Finalize","current TQueryResult object is undefined!");
02065          }
02066 
02067          // We have transferred copy of the output objects in TQueryResult,
02068          // so now we can cleanup the selector, making sure that we do not
02069          // touch the output objects
02070          output->SetOwner(kFALSE);
02071          SafeDelete(fSelector);
02072 
02073          // Delete fOutput (not needed anymore, cannot be finalized twice),
02074          // making sure that the objects saved in TQueryResult are not deleted
02075          fOutput->SetOwner(kFALSE);
02076          SafeDelete(fOutput);
02077       }
02078    }
02079    PDB(kGlobal,1) Info("Process","exit");
02080 
02081    if (!IsClient()) {
02082       Info("Finalize", "finalization on %s finished", gProofServ->GetPrefix());
02083    }
02084    fProof->FinalizationDone();
02085 
02086    return rv;
02087 }
02088 
02089 //______________________________________________________________________________
02090 Long64_t TProofPlayerRemote::Finalize(TQueryResult *qr)
02091 {
02092    // Finalize the results of a query already processed.
02093 
02094    PDB(kGlobal,1) Info("Finalize(TQueryResult *)","Enter");
02095 
02096    if (!IsClient()) {
02097       Info("Finalize(TQueryResult *)",
02098            "method to be executed only on the clients");
02099       return -1;
02100    }
02101 
02102    if (!qr) {
02103       Info("Finalize(TQueryResult *)", "query undefined");
02104       return -1;
02105    }
02106 
02107    if (qr->IsFinalized()) {
02108       Info("Finalize(TQueryResult *)", "query already finalized");
02109       return -1;
02110    }
02111 
02112    // Reset the list
02113    if (!fOutput)
02114       fOutput = new TList;
02115    else
02116       fOutput->Clear();
02117 
02118    // Make sure that the temporary output list is empty
02119    if (fOutputLists) {
02120       fOutputLists->Delete();
02121       delete fOutputLists;
02122       fOutputLists = 0;
02123    }
02124 
02125    // Re-init the selector
02126    gSystem->RedirectOutput(fProof->fLogFileName);
02127 
02128    // Import the output list
02129    TList *tmp = (TList *) qr->GetOutputList();
02130    if (!tmp) {
02131       gSystem->RedirectOutput(0);
02132       Info("Finalize(TQueryResult *)", "ouputlist is empty");
02133       return -1;
02134    }
02135    TList *out = fOutput;
02136    if (fProof->fProtocol < 11)
02137       out = new TList;
02138    TIter nxo(tmp);
02139    TObject *o = 0;
02140    while ((o = nxo()))
02141       out->Add(o->Clone());
02142 
02143    // Adopts the list
02144    if (fProof->fProtocol < 11) {
02145       out->SetOwner();
02146       StoreOutput(out);
02147    }
02148    gSystem->RedirectOutput(0);
02149 
02150    SetSelectorDataMembersFromOutputList();
02151 
02152    // Finalize it
02153    SetCurrentQuery(qr);
02154    Long64_t rc = Finalize();
02155    RestorePreviousQuery();
02156 
02157    return rc;
02158 }
02159 
02160 //______________________________________________________________________________
02161 Bool_t TProofPlayerRemote::SendSelector(const char* selector_file)
02162 {
02163    // Send the selector file(s) to master or worker nodes.
02164 
02165    // Check input
02166    if (!selector_file) {
02167       Info("SendSelector", "Invalid input: selector (file) name undefined");
02168       return kFALSE;
02169    }
02170 
02171    if (!strchr(gSystem->BaseName(selector_file), '.')) {
02172       if (gDebug > 1)
02173          Info("SendSelector", "selector name '%s' does not contain a '.':"
02174               " nothing to send, it will be loaded from a library", selector_file);
02175       return kTRUE;
02176    }
02177 
02178    // Extract the fine name first
02179    TString selec = selector_file;
02180    TString aclicMode;
02181    TString arguments;
02182    TString io;
02183    selec = gSystem->SplitAclicMode(selec, aclicMode, arguments, io);
02184 
02185    // Expand possible envs or '~'
02186    gSystem->ExpandPathName(selec);
02187 
02188    // Update the macro path
02189    TString mp(TROOT::GetMacroPath());
02190    TString np(gSystem->DirName(selec));
02191    if (!np.IsNull()) {
02192       np += ":";
02193       if (!mp.BeginsWith(np) && !mp.Contains(":"+np)) {
02194          Int_t ip = (mp.BeginsWith(".:")) ? 2 : 0;
02195          mp.Insert(ip, np);
02196          TROOT::SetMacroPath(mp);
02197          if (gDebug > 0)
02198             Info("SendSelector", "macro path set to '%s'", TROOT::GetMacroPath());
02199       }
02200    }
02201 
02202    // Header file
02203    TString header = selec;
02204    header.Remove(header.Last('.'));
02205    header += ".h";
02206    if (gSystem->AccessPathName(header, kReadPermission)) {
02207       TString h = header;
02208       header.Remove(header.Last('.'));
02209       header += ".hh";
02210       if (gSystem->AccessPathName(header, kReadPermission)) {
02211          Info("SendSelector",
02212               "header file not found: tried: %s %s", h.Data(), header.Data());
02213          return kFALSE;
02214       }
02215    }
02216 
02217    // Send files now
02218    if (fProof->SendFile(selec, (TProof::kBinary | TProof::kForward | TProof::kCp | TProof::kCpBin)) == -1) {
02219       Info("SendSelector", "problems sending implementation file %s", selec.Data());
02220       return kFALSE;
02221    }
02222    if (fProof->SendFile(header, (TProof::kBinary | TProof::kForward | TProof::kCp)) == -1) {
02223       Info("SendSelector", "problems sending header file %s", header.Data());
02224       return kFALSE;
02225    }
02226 
02227    return kTRUE;
02228 }
02229 
02230 //______________________________________________________________________________
02231 void TProofPlayerRemote::MergeOutput()
02232 {
02233    // Merge objects in output the lists.
02234 
02235    PDB(kOutput,1) Info("MergeOutput","Enter");
02236 
02237    if (fOutputLists == 0) {
02238       PDB(kOutput,1) Info("MergeOutput","Leave (no output)");
02239       return;
02240    }
02241 
02242    TIter next(fOutputLists);
02243 
02244    TList *list;
02245    while ( (list = (TList *) next()) ) {
02246 
02247       TObject *obj = fOutput->FindObject(list->GetName());
02248 
02249       if (obj == 0) {
02250          obj = list->First();
02251          list->Remove(obj);
02252          fOutput->Add(obj);
02253       }
02254 
02255       if ( list->IsEmpty() ) continue;
02256 
02257       TMethodCall callEnv;
02258       if (obj->IsA())
02259          callEnv.InitWithPrototype(obj->IsA(), "Merge", "TCollection*");
02260       if (callEnv.IsValid()) {
02261          callEnv.SetParam((Long_t) list);
02262          callEnv.Execute(obj);
02263       } else {
02264          // No Merge interface, return individual objects
02265          while ( (obj = list->First()) ) {
02266             fOutput->Add(obj);
02267             list->Remove(obj);
02268          }
02269       }
02270    }
02271 
02272    SafeDelete(fOutputLists);
02273 
02274    PDB(kOutput,1) Info("MergeOutput","Leave (%d object(s))", fOutput->GetSize());
02275 }
02276 
02277 //______________________________________________________________________________
02278 void TProofPlayerRemote::Progress(Long64_t total, Long64_t processed)
02279 {
02280    // Progress signal.
02281 
02282    if (IsClient()) {
02283       fProof->Progress(total, processed);
02284    } else {
02285       // Send to the previous tier
02286       TMessage m(kPROOF_PROGRESS);
02287       m << total << processed;
02288       gProofServ->GetSocket()->Send(m);
02289    }
02290 }
02291 
02292 //______________________________________________________________________________
02293 void TProofPlayerRemote::Progress(Long64_t total, Long64_t processed,
02294                                   Long64_t bytesread,
02295                                   Float_t initTime, Float_t procTime,
02296                                   Float_t evtrti, Float_t mbrti)
02297 {
02298    // Progress signal.
02299 
02300    PDB(kGlobal,1)
02301       Info("Progress","%lld %lld %lld %f %f %f %f", total, processed, bytesread,
02302                                              initTime, procTime, evtrti, mbrti);
02303 
02304    if (IsClient()) {
02305       fProof->Progress(total, processed, bytesread, initTime, procTime, evtrti, mbrti);
02306    } else {
02307       // Send to the previous tier
02308       TMessage m(kPROOF_PROGRESS);
02309       m << total << processed << bytesread << initTime << procTime << evtrti << mbrti;
02310       gProofServ->GetSocket()->Send(m);
02311    }
02312 }
02313 
02314 //______________________________________________________________________________
02315 void TProofPlayerRemote::Progress(TProofProgressInfo *pi)
02316 {
02317    // Progress signal.
02318 
02319    if (pi) {
02320       PDB(kGlobal,1)
02321          Info("Progress","%lld %lld %lld %f %f %f %f %d %f", pi->fTotal, pi->fProcessed, pi->fBytesRead,
02322                            pi->fInitTime, pi->fProcTime, pi->fEvtRateI, pi->fMBRateI,
02323                            pi->fActWorkers, pi->fEffSessions);
02324 
02325       if (IsClient()) {
02326          fProof->Progress(pi->fTotal, pi->fProcessed, pi->fBytesRead,
02327                            pi->fInitTime, pi->fProcTime,
02328                            pi->fEvtRateI, pi->fMBRateI,
02329                            pi->fActWorkers, pi->fTotSessions, pi->fEffSessions);
02330       } else {
02331          // Send to the previous tier
02332          TMessage m(kPROOF_PROGRESS);
02333          m << pi;
02334          gProofServ->GetSocket()->Send(m);
02335       }
02336    } else {
02337       Warning("Progress","TProofProgressInfo object undefined!");
02338    }
02339 }
02340 
02341 
02342 //______________________________________________________________________________
02343 void TProofPlayerRemote::Feedback(TList *objs)
02344 {
02345    // Feedback signal.
02346 
02347    fProof->Feedback(objs);
02348 }
02349 
02350 //______________________________________________________________________________
02351 void TProofPlayerRemote::StopProcess(Bool_t abort, Int_t)
02352 {
02353    // Stop process after this event.
02354 
02355    if (fPacketizer != 0)
02356       fPacketizer->StopProcess(abort);
02357    if (abort == kTRUE)
02358       fExitStatus = kAborted;
02359    else
02360       fExitStatus = kStopped;
02361 }
02362 
02363 //______________________________________________________________________________
02364 Int_t TProofPlayerRemote::AddOutputObject(TObject *obj)
02365 {
02366    // Incorporate the received object 'obj' into the output list fOutput.
02367    // The latter is created if not existing.
02368    // This method short cuts 'StoreOutput + MergeOutput' optimizing the memory
02369    // consumption.
02370    // Returns -1 in case of error, 1 if the object has been merged into another
02371    // one (so that its ownership has not been taken and can be deleted), and 0
02372    // otherwise.
02373 
02374    PDB(kOutput,1)
02375       Info("AddOutputObject","Enter: %p (%s)", obj, obj ? obj->ClassName() : "undef");
02376 
02377    // We must something to process
02378    if (!obj) {
02379       PDB(kOutput,1) Info("AddOutputObject","Invalid input (obj == 0x0)");
02380       return -1;
02381    }
02382 
02383    // Create the output list, if not yet done
02384    if (!fOutput)
02385       fOutput = new TList;
02386 
02387    // Flag about merging
02388    Bool_t merged = kTRUE;
02389 
02390    // Process event lists first
02391    TList *elists = dynamic_cast<TList *> (obj);
02392    if (elists && !strcmp(elists->GetName(), "PROOF_EventListsList")) {
02393 
02394       // Create a global event list, result of merging the event lists
02395       // coresponding to the various data set elements
02396       TEventList *evlist = new TEventList("PROOF_EventList");
02397 
02398       // Iterate the list of event list segments
02399       TIter nxevl(elists);
02400       TEventList *evl = 0;
02401       while ((evl = dynamic_cast<TEventList *> (nxevl()))) {
02402 
02403          // Find the file offset (fDSet is the current TDSet instance)
02404          // locating the element by name
02405          TIter nxelem(fDSet->GetListOfElements());
02406          TDSetElement *elem = 0;
02407          while ((elem = dynamic_cast<TDSetElement *> (nxelem()))) {
02408             if (!strcmp(elem->GetFileName(), evl->GetName()))
02409                break;
02410          }
02411          if (!elem) {
02412             Error("AddOutputObject", "Found an event list for %s, but no object with"
02413                                      " the same name in the TDSet", evl->GetName());
02414             continue;
02415          }
02416          Long64_t offset = elem->GetTDSetOffset();
02417 
02418          // Shift the list by the number of first event in that file
02419          Long64_t *arr = evl->GetList();
02420          Int_t num = evl->GetN();
02421          if (arr && offset > 0)
02422             for (Int_t i = 0; i < num; i++)
02423                arr[i] += offset;
02424 
02425          // Add to the global event list
02426          evlist->Add(evl);
02427       }
02428 
02429       // Incorporate the resulting global list in fOutput
02430       SetLastMergingMsg(evlist);
02431       Incorporate(evlist, fOutput, merged);
02432       NotifyMemory(evlist);
02433 
02434       // Delete the global list if merged
02435       if (merged)
02436          SafeDelete(evlist);
02437 
02438       // The original object has been transformed in something else; we do
02439       // not have ownership on it
02440       return 1;
02441    }
02442 
02443    // Check if we need to merge files
02444    TProofOutputFile *pf = dynamic_cast<TProofOutputFile*>(obj);
02445    if (pf) {
02446       fMergeFiles = kTRUE;
02447       if (!IsClient()) {
02448          if (pf->IsMerge()) {
02449             // Fill the output file name, if not done by the client
02450             if (strlen(pf->GetOutputFileName()) <= 0) {
02451                TString of(Form("root://%s", gSystem->HostName()));
02452                if (gSystem->Getenv("XRDPORT")) {
02453                   TString sp(gSystem->Getenv("XRDPORT"));
02454                   if (sp.IsDigit())
02455                      of += Form(":%s", sp.Data());
02456                }
02457                TString sessionPath(gProofServ->GetSessionDir());
02458                // Take into account a prefix, if any
02459                TString pfx  = gEnv->GetValue("Path.Localroot","");
02460                if (!pfx.IsNull())
02461                   sessionPath.Remove(0, pfx.Length());
02462                of += Form("/%s/%s", sessionPath.Data(), pf->GetFileName());
02463                pf->SetOutputFileName(of);
02464             }
02465             // Notify, if required
02466             if (gDebug > 0)
02467                pf->Print();
02468          }
02469       } else {
02470          // On clients notify the output path
02471          Printf("Output file: %s", pf->GetOutputFileName());
02472       }
02473    }
02474 
02475    // For other objects we just run the incorporation procedure
02476    SetLastMergingMsg(obj);
02477    Incorporate(obj, fOutput, merged);
02478    NotifyMemory(obj);
02479 
02480    // We are done
02481    return (merged ? 1 : 0);
02482 }
02483 
02484 //______________________________________________________________________________
02485 void TProofPlayerRemote::RedirectOutput(Bool_t on)
02486 {
02487    // Control output redirection to TProof::fLogFileW
02488 
02489    if (on && fProof && fProof->fLogFileW) {
02490       TProofServ::SetErrorHandlerFile(fProof->fLogFileW);
02491       fErrorHandler = SetErrorHandler(TProofServ::ErrorHandler);
02492    } else if (!on) {
02493       if (fErrorHandler) {
02494          TProofServ::SetErrorHandlerFile(0);
02495          SetErrorHandler(fErrorHandler);
02496       }
02497    }
02498 }
02499 
02500 //______________________________________________________________________________
02501 void TProofPlayerRemote::AddOutput(TList *out)
02502 {
02503    // Incorporate the content of the received output list 'out' into the final
02504    // output list fOutput. The latter is created if not existing.
02505    // This method short cuts 'StoreOutput + MergeOutput' limiting the memory
02506    // consumption.
02507 
02508    PDB(kOutput,1) Info("AddOutput","Enter");
02509 
02510    // We must something to process
02511    if (!out) {
02512       PDB(kOutput,1) Info("AddOutput","Invalid input (out == 0x0)");
02513       return;
02514    }
02515 
02516    // Create the output list, if not yet done
02517    if (!fOutput)
02518       fOutput = new TList;
02519 
02520    // Process event lists first
02521    Bool_t merged = kTRUE;
02522    TList *elists = dynamic_cast<TList *> (out->FindObject("PROOF_EventListsList"));
02523    if (elists) {
02524 
02525       // Create a global event list, result of merging the event lists
02526       // corresponding to the various data set elements
02527       TEventList *evlist = new TEventList("PROOF_EventList");
02528 
02529       // Iterate the list of event list segments
02530       TIter nxevl(elists);
02531       TEventList *evl = 0;
02532       while ((evl = dynamic_cast<TEventList *> (nxevl()))) {
02533 
02534          // Find the file offset (fDSet is the current TDSet instance)
02535          // locating the element by name
02536          TIter nxelem(fDSet->GetListOfElements());
02537          TDSetElement *elem = 0;
02538          while ((elem = dynamic_cast<TDSetElement *> (nxelem()))) {
02539             if (!strcmp(elem->GetFileName(), evl->GetName()))
02540                break;
02541          }
02542          if (!elem) {
02543             Error("AddOutput", "Found an event list for %s, but no object with"
02544                                " the same name in the TDSet", evl->GetName());
02545             continue;
02546          }
02547          Long64_t offset = elem->GetTDSetOffset();
02548 
02549          // Shift the list by the number of first event in that file
02550          Long64_t *arr = evl->GetList();
02551          Int_t num = evl->GetN();
02552          if (arr && offset > 0)
02553             for (Int_t i = 0; i < num; i++)
02554                arr[i] += offset;
02555 
02556          // Add to the global event list
02557          evlist->Add(evl);
02558       }
02559 
02560       // Remove and delete the events lists object to avoid spoiling iteration
02561       // during next steps
02562       out->Remove(elists);
02563       delete elists;
02564 
02565       // Incorporate the resulting global list in fOutput
02566       SetLastMergingMsg(evlist);
02567       Incorporate(evlist, fOutput, merged);
02568       NotifyMemory(evlist);
02569    }
02570 
02571    // Iterate on the remaining objects in the received list
02572    TIter nxo(out);
02573    TObject *obj = 0;
02574    while ((obj = nxo())) {
02575       SetLastMergingMsg(obj);
02576       Incorporate(obj, fOutput, merged);
02577       // If not merged, drop from the temporary list, as the ownership
02578       // passes to fOutput
02579       if (!merged)
02580          out->Remove(obj);
02581       NotifyMemory(obj);
02582    }
02583 
02584    // Done
02585    return;
02586 }
02587 
02588 //______________________________________________________________________________
02589 void TProofPlayerRemote::NotifyMemory(TObject *obj)
02590 {
02591    // Printout the memory record after merging object 'obj'
02592    // This record is used by the memory monitor
02593 
02594    if (fProof && (!IsClient() || fProof->IsLite())){
02595       ProcInfo_t pi;
02596       if (!gSystem->GetProcInfo(&pi)){
02597          // For PROOF-Lite we redirect this output to a the open log file so that the
02598          // memory monitor can pick these messages up
02599          RedirectOutput(fProof->IsLite());
02600          Info("NotifyMemory|Svc", "Memory %ld virtual %ld resident after merging object %s",
02601                                   pi.fMemVirtual, pi.fMemResident, obj->GetName());
02602          RedirectOutput(0);
02603       }
02604    }
02605 }
02606 
02607 //______________________________________________________________________________
02608 void TProofPlayerRemote::SetLastMergingMsg(TObject *obj)
02609 {
02610    // Set the message to be notified in case of exception
02611 
02612    TString lastMsg = TString::Format("while merging object '%s'", obj->GetName());
02613    TProofServ::SetLastMsg(lastMsg);
02614 }
02615 
02616 //______________________________________________________________________________
02617 Int_t TProofPlayerRemote::Incorporate(TObject *newobj, TList *outlist, Bool_t &merged)
02618 {
02619    // Incorporate object 'newobj' in the list 'outlist'.
02620    // The object is merged with an object of the same name already existing in
02621    // the list, or just added.
02622    // The boolean merged is set to kFALSE when the object is just added to 'outlist';
02623    // this happens if the Merge() method does not exist or if a object named as 'obj'
02624    // is not already in the list. If the obj is not 'merged' than it should not be
02625    // deleted, unless outlist is not owner of its objects.
02626    // Return 0 on success, -1 on error.
02627 
02628    merged = kTRUE;
02629 
02630    PDB(kOutput,1)
02631       Info("Incorporate", "enter: obj: %p (%s), list: %p",
02632                           newobj, newobj ? newobj->ClassName() : "undef", outlist);
02633 
02634    // The object and list must exist
02635    if (!newobj || !outlist) {
02636       Error("Incorporate","Invalid inputs: obj: %p, list: %p", newobj, outlist);
02637       return -1;
02638    }
02639 
02640    // Special treatment for histograms in autobin mode
02641    Bool_t specialH =
02642       (!fProof || !fProof->TestBit(TProof::kIsClient) || fProof->IsLite()) ? kTRUE : kFALSE;
02643    if (specialH && newobj->InheritsFrom(TH1::Class())) {
02644       if (!HandleHistogram(newobj)) {
02645          PDB(kOutput,1) Info("Incorporate", "histogram object '%s' added to the"
02646                              " appropriate list for delayed merging", newobj->GetName());
02647          merged = kFALSE;
02648          return 0;
02649       }
02650    }
02651 
02652    // Check if an object with the same name exists already
02653    TObject *obj = outlist->FindObject(newobj->GetName());
02654 
02655    // If no, add the new object and return
02656    if (!obj) {
02657       outlist->Add(newobj);
02658       merged = kFALSE;
02659       // Done
02660       return 0;
02661    }
02662 
02663    // Locate the Merge(TCollection *) method
02664    TMethodCall callEnv;
02665    if (obj->IsA())
02666       callEnv.InitWithPrototype(obj->IsA(), "Merge", "TCollection*");
02667    if (callEnv.IsValid()) {
02668       // Found: put the object in a one-element list
02669       static TList *xlist = new TList;
02670       xlist->Add(newobj);
02671       // Call the method
02672       callEnv.SetParam((Long_t) xlist);
02673       callEnv.Execute(obj);
02674       // Ready for next call
02675       xlist->Clear();
02676    } else {
02677       // Not found: return individual objects
02678       outlist->Add(newobj);
02679       merged = kFALSE;
02680    }
02681 
02682    // Done
02683    return 0;
02684 }
02685 
02686 //______________________________________________________________________________
02687 TObject *TProofPlayerRemote::HandleHistogram(TObject *obj)
02688 {
02689    // Low statistic histograms need a special treatment when using autobin
02690 
02691    TH1 *h = dynamic_cast<TH1 *>(obj);
02692    if (!h) {
02693       // Not an histo
02694       return obj;
02695    }
02696 
02697    // Does is still needs binning ?
02698    Bool_t tobebinned = (h->GetBuffer()) ? kTRUE : kFALSE;
02699 
02700    // Number of entries
02701    Int_t nent = h->GetBufferLength();
02702    PDB(kOutput,2) Info("HandleHistogram", "h:%s ent:%d, buffer size: %d",
02703                        h->GetName(), nent, h->GetBufferSize());
02704 
02705    // Attach to the list in the outputlists, if any
02706    TList *list = 0;
02707    if (!fOutputLists) {
02708       PDB(kOutput,2) Info("HandleHistogram", "create fOutputLists");
02709       fOutputLists = new TList;
02710       fOutputLists->SetOwner();
02711    }
02712    list = (TList *) fOutputLists->FindObject(h->GetName());
02713 
02714    TH1 *href = 0;
02715    if (tobebinned) {
02716 
02717       // The histogram needs to be projected in a reasonable range: we
02718       // do this at the end with all the histos, so we need to create
02719       // a list here
02720       if (!list) {
02721          // Create the list
02722          list = new TList;
02723          list->SetName(h->GetName());
02724          list->SetOwner();
02725          fOutputLists->Add(list);
02726          // Move in it any previously merged object from the output list
02727          if (fOutput && (href = (TH1 *) fOutput->FindObject(h->GetName()))) {
02728             fOutput->Remove(href);
02729             list->Add(href);
02730          }
02731       }
02732       TIter nxh(list);
02733       while ((href = (TH1 *) nxh())) {
02734          if (href->GetBuffer() && href->GetBufferLength() < nent) break;
02735       }
02736       if (href) {
02737          list->AddBefore(href, h);
02738       } else {
02739          list->Add(h);
02740       }
02741       // Done
02742       return (TObject *)0;
02743 
02744    } else {
02745 
02746       if (list) {
02747          TIter nxh(list);
02748          while ((href = (TH1 *) nxh())) {
02749             if (href->GetBuffer() || href->GetEntries() < nent) break;
02750          }
02751          if (href) {
02752             list->AddBefore(href, h);
02753          } else {
02754             list->Add(h);
02755          }
02756          // Done
02757          return (TObject *)0;
02758 
02759       } else {
02760          // Histogram has already been projected
02761          Int_t hsz = h->GetNbinsX() * h->GetNbinsY() * h->GetNbinsZ();
02762          if (gProofServ && hsz > gProofServ->GetMsgSizeHWM()) {
02763             // Large histo: merge one-by-one
02764             return obj;
02765          } else {
02766             // Create the list to merge in one-go at the end (more efficient
02767             // than merging one by one)
02768             list = new TList;
02769             list->SetName(h->GetName());
02770             list->SetOwner();
02771             fOutputLists->Add(list);
02772             list->Add(h);
02773             // Done
02774             return (TObject *)0;
02775          }
02776       }
02777    }
02778    PDB(kOutput,1) Info("HandleHistogram", "leaving");
02779 }
02780 
02781 //______________________________________________________________________________
02782 void TProofPlayerRemote::StoreOutput(TList *out)
02783 {
02784    // Store received output list.
02785 
02786    PDB(kOutput,1) Info("StoreOutput","Enter");
02787 
02788    if ( out == 0 ) {
02789       PDB(kOutput,1) Info("StoreOutput","Leave (empty)");
02790       return;
02791    }
02792 
02793    TIter next(out);
02794    out->SetOwner(kFALSE);  // take ownership of the contents
02795 
02796    if (fOutputLists == 0) {
02797       PDB(kOutput,2) Info("StoreOutput","Create fOutputLists");
02798       fOutputLists = new TList;
02799       fOutputLists->SetOwner();
02800    }
02801    // process eventlists first
02802    TList* lists = dynamic_cast<TList*> (out->FindObject("PROOF_EventListsList"));
02803    if (lists) {
02804       out->Remove(lists);
02805       TEventList *mainList = new TEventList("PROOF_EventList");
02806       out->Add(mainList);
02807       TIter it(lists);
02808       TEventList *aList;
02809       while ( (aList = dynamic_cast<TEventList*> (it())) ) {
02810          // find file offset
02811          TIter nxe(fDSet->GetListOfElements());
02812          TDSetElement *elem;
02813          while ( (elem = dynamic_cast<TDSetElement*> (nxe())) ) {
02814             if (strcmp(elem->GetFileName(), aList->GetName()) == 0)
02815                break;
02816          }
02817          if (!elem) {
02818             Error("StoreOutput", "found the EventList for %s, but no object with that name "
02819                                  "in the TDSet", aList->GetName());
02820             continue;
02821          }
02822          Long64_t offset = elem->GetTDSetOffset();
02823 
02824          // shift the list by the number of first event in that file
02825          Long64_t *arr = aList->GetList();
02826          Int_t num = aList->GetN();
02827          if (arr && offset)
02828             for (int i = 0; i < num; i++)
02829                arr[i] += offset;
02830 
02831          mainList->Add(aList);           // add to the main list
02832       }
02833       delete lists;
02834    }
02835 
02836    TObject *obj;
02837    while( (obj = next()) ) {
02838       PDB(kOutput,2) Info("StoreOutput","find list for '%s'", obj->GetName() );
02839 
02840       TList *list = (TList *) fOutputLists->FindObject( obj->GetName() );
02841       if ( list == 0 ) {
02842          PDB(kOutput,2) Info("StoreOutput", "list for '%s' not found (creating)", obj->GetName());
02843          list = new TList;
02844          list->SetName( obj->GetName() );
02845          list->SetOwner();
02846          fOutputLists->Add( list );
02847       }
02848       list->Add( obj );
02849    }
02850 
02851    delete out;
02852    PDB(kOutput,1) Info("StoreOutput", "leave");
02853 }
02854 
02855 //______________________________________________________________________________
02856 TList *TProofPlayerRemote::MergeFeedback()
02857 {
02858    // Merge feedback lists.
02859 
02860    PDB(kFeedback,1)
02861       Info("MergeFeedback","Enter");
02862 
02863    if ( fFeedbackLists == 0 ) {
02864       PDB(kFeedback,1)
02865          Info("MergeFeedback","Leave (no output)");
02866       return 0;
02867    }
02868 
02869    TList *fb = new TList;   // collection of feedback objects
02870    fb->SetOwner();
02871 
02872    TIter next(fFeedbackLists);
02873 
02874    TMap *map;
02875    while ( (map = (TMap*) next()) ) {
02876 
02877       PDB(kFeedback,2)
02878          Info("MergeFeedback", "map %s size: %d", map->GetName(), map->GetSize());
02879 
02880       // turn map into list ...
02881 
02882       TList *list = new TList;
02883       TIter keys(map);
02884 
02885 #ifndef R__TH1MERGEFIXED
02886       Int_t nbmx = -1;
02887       TObject *oref = 0;
02888 #endif
02889       while ( TObject *key = keys() ) {
02890          TObject *o = map->GetValue(key);
02891          TH1 *h = dynamic_cast<TH1 *>(o);
02892 #ifndef R__TH1MERGEFIXED
02893          // Temporary fix for to cope with the problem in TH1::Merge.
02894          // We need to use a reference histo the one with the largest number
02895          // of bins so that the histos from all submasters can be correctly
02896          // fit in
02897          if (h && !strncmp(o->GetName(),"PROOF_",6)) {
02898             if (h->GetNbinsX() > nbmx) {
02899                nbmx=  h->GetNbinsX();
02900                oref = o;
02901             }
02902          }
02903 #endif
02904          if (h) {
02905             TIter nxh(list);
02906             TH1 *href= 0;
02907             while ((href = (TH1 *)nxh())) {
02908                if (h->GetBuffer()) {
02909                   if (href->GetBuffer() && href->GetBufferLength() < h->GetBufferLength()) break;
02910                } else {
02911                   if (href->GetBuffer() || href->GetEntries() < h->GetEntries()) break;
02912                }
02913             }
02914             if (href) {
02915                list->AddBefore(href, h);
02916             } else {
02917                list->Add(h);
02918             }
02919          } else {
02920             list->Add(o);
02921          }
02922       }
02923 
02924       // clone first object, remove from list
02925 #ifdef R__TH1MERGEFIXED
02926       TObject *obj = list->First();
02927 #else
02928       TObject *obj = (oref) ? oref : list->First();
02929 #endif
02930       list->Remove(obj);
02931       obj = obj->Clone();
02932       fb->Add(obj);
02933 
02934       if ( list->IsEmpty() ) {
02935          delete list;
02936          continue;
02937       }
02938 
02939       // merge list with clone
02940       TMethodCall callEnv;
02941       if (obj->IsA())
02942          callEnv.InitWithPrototype(obj->IsA(), "Merge", "TCollection*");
02943       if (callEnv.IsValid()) {
02944          callEnv.SetParam((Long_t) list);
02945          callEnv.Execute(obj);
02946       } else {
02947          // No Merge interface, return copy of individual objects
02948          while ( (obj = list->First()) ) {
02949             fb->Add(obj->Clone());
02950             list->Remove(obj);
02951          }
02952       }
02953 
02954       delete list;
02955    }
02956 
02957    PDB(kFeedback,1)
02958       Info("MergeFeedback","Leave (%d object(s))", fb->GetSize());
02959 
02960    return fb;
02961 }
02962 
02963 //______________________________________________________________________________
02964 void TProofPlayerRemote::StoreFeedback(TObject *slave, TList *out)
02965 {
02966    // Store feedback results from the specified slave.
02967 
02968    PDB(kFeedback,1)
02969       Info("StoreFeedback","Enter");
02970 
02971    if ( out == 0 ) {
02972       PDB(kFeedback,1)
02973          Info("StoreFeedback","Leave (empty)");
02974       return;
02975    }
02976 
02977    if ( IsClient() ) {
02978       // in client
02979       Feedback(out);
02980       delete out;
02981       return;
02982    }
02983 
02984    if (fFeedbackLists == 0) {
02985       PDB(kFeedback,2) Info("StoreFeedback","Create fFeedbackLists");
02986       fFeedbackLists = new TList;
02987       fFeedbackLists->SetOwner();
02988    }
02989 
02990    TIter next(out);
02991    out->SetOwner(kFALSE);  // take ownership of the contents
02992 
02993    const char *ord = ((TSlave*) slave)->GetOrdinal();
02994 
02995    TObject *obj;
02996    while( (obj = next()) ) {
02997       PDB(kFeedback,2)
02998          Info("StoreFeedback","%s: Find '%s'", ord, obj->GetName() );
02999       TMap *map = (TMap*) fFeedbackLists->FindObject(obj->GetName());
03000       if ( map == 0 ) {
03001          PDB(kFeedback,2)
03002             Info("StoreFeedback", "%s: map for '%s' not found (creating)", ord, obj->GetName());
03003          // Map must not be owner (ownership is with regards to the keys (only))
03004          map = new TMap;
03005          map->SetName(obj->GetName());
03006          fFeedbackLists->Add(map);
03007       } else {
03008          PDB(kFeedback,2)
03009             Info("StoreFeedback","%s: removing previous value", ord);
03010          if (map->GetValue(slave))
03011             delete map->GetValue(slave);
03012          map->Remove(slave);
03013       }
03014       map->Add(slave, obj);
03015       PDB(kFeedback,2)
03016          Info("StoreFeedback","%s: %s, size: %d", ord, obj->GetName(), map->GetSize());
03017    }
03018 
03019    delete out;
03020    PDB(kFeedback,1)
03021       Info("StoreFeedback","Leave");
03022 }
03023 
03024 //______________________________________________________________________________
03025 void TProofPlayerRemote::SetupFeedback()
03026 {
03027    // Setup reporting of feedback objects.
03028 
03029    if (IsClient()) return; // Client does not need timer
03030 
03031    fFeedback = (TList*) fInput->FindObject("FeedbackList");
03032 
03033    PDB(kFeedback,1) Info("SetupFeedback","\"FeedbackList\" %sfound",
03034       fFeedback == 0 ? "NOT ":"");
03035 
03036    if (fFeedback == 0 || fFeedback->GetSize() == 0) return;
03037 
03038    // OK, feedback was requested, setup the timer
03039    SafeDelete(fFeedbackTimer);
03040    fFeedbackPeriod = 2000;
03041    TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);
03042    fFeedbackTimer = new TTimer;
03043    fFeedbackTimer->SetObject(this);
03044    fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
03045 }
03046 
03047 //______________________________________________________________________________
03048 void TProofPlayerRemote::StopFeedback()
03049 {
03050    // Stop reporting of feedback objects.
03051 
03052    if (fFeedbackTimer == 0) return;
03053 
03054    PDB(kFeedback,1) Info("StopFeedback","Stop Timer");
03055 
03056    SafeDelete(fFeedbackTimer);
03057 }
03058 
03059 //______________________________________________________________________________
03060 Bool_t TProofPlayerRemote::HandleTimer(TTimer *)
03061 {
03062    // Send feedback objects to client.
03063 
03064    PDB(kFeedback,2) Info("HandleTimer","Entry");
03065 
03066    if (fFeedbackTimer == 0) return kFALSE; // timer already switched off
03067 
03068    // process local feedback objects
03069 
03070    TList *fb = new TList;
03071    fb->SetOwner();
03072 
03073    TIter next(fFeedback);
03074    while( TObjString *name = (TObjString*) next() ) {
03075       TObject *o = fOutput->FindObject(name->GetName());
03076       if (o != 0) {
03077          fb->Add(o->Clone());
03078          // remove the corresponding entry from the feedback list
03079          TMap *m = 0;
03080          if (fFeedbackLists &&
03081             (m = (TMap *) fFeedbackLists->FindObject(name->GetName()))) {
03082             fFeedbackLists->Remove(m);
03083             m->DeleteValues();
03084             delete m;
03085          }
03086       }
03087    }
03088 
03089    if (fb->GetSize() > 0) {
03090       StoreFeedback(this, fb); // adopts fb
03091    } else {
03092       delete fb;
03093    }
03094 
03095    if (fFeedbackLists == 0) {
03096       fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);   // maybe next time
03097       return kFALSE;
03098    }
03099 
03100    fb = MergeFeedback();
03101 
03102    PDB(kFeedback,2) Info("HandleTimer","Sending %d objects", fb->GetSize());
03103 
03104    TMessage m(kPROOF_FEEDBACK);
03105    m << fb;
03106 
03107    // send message to client;
03108    gProofServ->GetSocket()->Send(m);
03109 
03110    delete fb;
03111 
03112    fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
03113 
03114    return kFALSE; // ignored?
03115 }
03116 
03117 //______________________________________________________________________________
03118 TDSetElement *TProofPlayerRemote::GetNextPacket(TSlave *slave, TMessage *r)
03119 {
03120    // Get next packet for specified slave.
03121 
03122    // The first call to this determines the end of initialization
03123    SetInitTime();
03124 
03125    TDSetElement *e = fPacketizer->GetNextPacket( slave, r );
03126 
03127    if (e == 0) {
03128       PDB(kPacketizer,2) Info("GetNextPacket","%s: done!", slave->GetOrdinal());
03129    } else if (e == (TDSetElement*) -1) {
03130       PDB(kPacketizer,2) Info("GetNextPacket","%s: waiting ...", slave->GetOrdinal());
03131    } else {
03132       PDB(kPacketizer,2)
03133          Info("GetNextPacket","%s (%s): '%s' '%s' '%s' %lld %lld",
03134               slave->GetOrdinal(), slave->GetName(), e->GetFileName(),
03135               e->GetDirectory(), e->GetObjName(), e->GetFirst(), e->GetNum());
03136    }
03137 
03138    return e;
03139 }
03140 
03141 //______________________________________________________________________________
03142 Bool_t TProofPlayerRemote::IsClient() const
03143 {
03144    // Is the player running on the client?
03145 
03146    return fProof ? fProof->TestBit(TProof::kIsClient) : kFALSE;
03147 }
03148 
03149 //______________________________________________________________________________
03150 Long64_t TProofPlayerRemote::DrawSelect(TDSet *set, const char *varexp,
03151                                         const char *selection, Option_t *option,
03152                                         Long64_t nentries, Long64_t firstentry)
03153 {
03154    // Draw (support for TChain::Draw()).
03155    // Returns -1 in case of error or number of selected events in case of success.
03156 
03157    if (!fgDrawInputPars) {
03158       fgDrawInputPars = new THashList;
03159       fgDrawInputPars->Add(new TObjString("FeedbackList"));
03160       fgDrawInputPars->Add(new TObjString("PROOF_LineColor"));
03161       fgDrawInputPars->Add(new TObjString("PROOF_LineStyle"));
03162       fgDrawInputPars->Add(new TObjString("PROOF_LineWidth"));
03163       fgDrawInputPars->Add(new TObjString("PROOF_MarkerColor"));
03164       fgDrawInputPars->Add(new TObjString("PROOF_MarkerStyle"));
03165       fgDrawInputPars->Add(new TObjString("PROOF_MarkerSize"));
03166       fgDrawInputPars->Add(new TObjString("PROOF_FillColor"));
03167       fgDrawInputPars->Add(new TObjString("PROOF_FillStyle"));
03168    }
03169 
03170    TString selector, objname;
03171    if (GetDrawArgs(varexp, selection, option, selector, objname) != 0) {
03172       Error("DrawSelect", "parsing arguments");
03173       return -1;
03174    }
03175 
03176    TNamed *varexpobj = new TNamed("varexp", varexp);
03177    TNamed *selectionobj = new TNamed("selection", selection);
03178 
03179    // Save the current input list
03180    TObject *o = 0;
03181    TList *savedInput = new TList;
03182    TIter nxi(fInput);
03183    while ((o = nxi())) {
03184       savedInput->Add(o);
03185       TString n(o->GetName());
03186       if (fgDrawInputPars && !fgDrawInputPars->FindObject(o->GetName())) fInput->Remove(o);
03187    }
03188 
03189    fInput->Add(varexpobj);
03190    fInput->Add(selectionobj);
03191 
03192    // Make sure we have an object name
03193    if (objname == "") objname = "htemp";
03194 
03195    fProof->AddFeedback(objname);
03196    Long64_t r = Process(set, selector, option, nentries, firstentry);
03197    fProof->RemoveFeedback(objname);
03198 
03199    fInput->Remove(varexpobj);
03200    fInput->Remove(selectionobj);
03201    if (TNamed *opt = dynamic_cast<TNamed*> (fInput->FindObject("PROOF_OPTIONS"))) {
03202       fInput->Remove(opt);
03203       delete opt;
03204    }
03205 
03206    delete varexpobj;
03207    delete selectionobj;
03208 
03209    // Restore the input list
03210    fInput->Clear();
03211    TIter nxsi(savedInput);
03212    while ((o = nxsi()))
03213       fInput->Add(o);
03214    savedInput->SetOwner(kFALSE);
03215    delete savedInput;
03216 
03217    return r;
03218 }
03219 
03220 //______________________________________________________________________________
03221 void TProofPlayerRemote::SetInitTime()
03222 {
03223    // Set init time
03224 
03225    if (fPacketizer)
03226       fPacketizer->SetInitTime();
03227 }
03228 
03229 //------------------------------------------------------------------------------
03230 
03231 
03232 ClassImp(TProofPlayerSlave)
03233 
03234 //______________________________________________________________________________
03235 void TProofPlayerSlave::SetupFeedback()
03236 {
03237    // Setup feedback.
03238 
03239    TList *fb = (TList*) fInput->FindObject("FeedbackList");
03240    if (fb) {
03241       PDB(kFeedback,1)
03242          Info("SetupFeedback","\"FeedbackList\" found: %d objects", fb->GetSize());
03243    } else {
03244       PDB(kFeedback,1)
03245          Info("SetupFeedback","\"FeedbackList\" NOT found");
03246    }
03247 
03248    if (fb == 0 || fb->GetSize() == 0) return;
03249 
03250    // OK, feedback was requested, setup the timer
03251 
03252    SafeDelete(fFeedbackTimer);
03253    fFeedbackPeriod = 2000;
03254    TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);
03255    fFeedbackTimer = new TTimer;
03256    fFeedbackTimer->SetObject(this);
03257    fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
03258 
03259    fFeedback = fb;
03260 }
03261 
03262 //______________________________________________________________________________
03263 void TProofPlayerSlave::StopFeedback()
03264 {
03265    // Stop feedback.
03266 
03267    if (fFeedbackTimer == 0) return;
03268 
03269    PDB(kFeedback,1) Info("StopFeedback","Stop Timer");
03270 
03271    SafeDelete(fFeedbackTimer);
03272 }
03273 
03274 //______________________________________________________________________________
03275 Bool_t TProofPlayerSlave::HandleTimer(TTimer *)
03276 {
03277    // Handle timer event.
03278 
03279    PDB(kFeedback,2) Info("HandleTimer","Entry");
03280 
03281    // If in sequential (0-slave-PROOF) mode we do not have a packetizer
03282    // so we also send the info to update the progress bar.
03283    if (gProofServ) {
03284       Bool_t sendm = kFALSE;
03285       TMessage m(kPROOF_PROGRESS);
03286       if (gProofServ->IsMaster() && !gProofServ->IsParallel()) {
03287          sendm = kTRUE;
03288          if (gProofServ->GetProtocol() > 25) {
03289             m << GetProgressStatus();
03290          } else if (gProofServ->GetProtocol() > 11) {
03291             TProofProgressStatus *ps = GetProgressStatus();
03292             m << fTotalEvents << ps->GetEntries() << ps->GetBytesRead()
03293               << (Float_t) -1. << (Float_t) ps->GetProcTime()
03294               << (Float_t) ps->GetRate() << (Float_t) -1.;
03295          } else {
03296             m << fTotalEvents << GetEventsProcessed();
03297          }
03298       }
03299       if (sendm) gProofServ->GetSocket()->Send(m);
03300    }
03301 
03302    if (fFeedback == 0) return kFALSE;
03303 
03304    TList *fb = new TList;
03305    fb->SetOwner(kFALSE);
03306 
03307    if (fOutput == 0) {
03308       fOutput = fSelector->GetOutputList();
03309    }
03310 
03311    if (fOutput) {
03312       TIter next(fFeedback);
03313       while( TObjString *name = (TObjString*) next() ) {
03314          // TODO: find object in memory ... maybe allow only in fOutput ?
03315          TObject *o = fOutput->FindObject(name->GetName());
03316          if (o != 0) fb->Add(o);
03317       }
03318    }
03319 
03320    PDB(kFeedback,2) Info("HandleTimer","Sending %d objects", fb->GetSize());
03321 
03322    TMessage m(kPROOF_FEEDBACK);
03323    m << fb;
03324 
03325    // send message to client;
03326    gProofServ->GetSocket()->Send(m);
03327 
03328    delete fb;
03329 
03330    fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
03331 
03332    return kFALSE; // ignored?
03333 }
03334 
03335 //______________________________________________________________________________
03336 void TProofPlayerSlave::HandleGetTreeHeader(TMessage *mess)
03337 {
03338    // Handle tree header request.
03339 
03340    TMessage answ(kPROOF_GETTREEHEADER);
03341 
03342    TDSet *dset;
03343    (*mess) >> dset;
03344    dset->Reset();
03345    TDSetElement *e = dset->Next();
03346    Long64_t entries = 0;
03347    TFile *f = 0;
03348    TTree *t = 0;
03349    if (!e) {
03350       PDB(kGlobal, 1) Info("HandleGetTreeHeader", "empty TDSet");
03351    } else {
03352       f = TFile::Open(e->GetFileName());
03353       t = 0;
03354       if (f) {
03355          t = (TTree*) f->Get(e->GetObjName());
03356          if (t) {
03357             t->SetMaxVirtualSize(0);
03358             t->DropBaskets();
03359             entries = t->GetEntries();
03360 
03361             // compute #entries in all the files
03362             while ((e = dset->Next()) != 0) {
03363                TFile *f1 = TFile::Open(e->GetFileName());
03364                if (f1) {
03365                   TTree *t1 = (TTree*) f1->Get(e->GetObjName());
03366                   if (t1) {
03367                      entries += t1->GetEntries();
03368                      delete t1;
03369                   }
03370                   delete f1;
03371                }
03372             }
03373             t->SetMaxEntryLoop(entries);   // this field will hold the total number of entries ;)
03374          }
03375       }
03376    }
03377    if (t)
03378       answ << TString("Success") << t;
03379    else
03380       answ << TString("Failed") << t;
03381 
03382    fSocket->Send(answ);
03383 
03384    SafeDelete(t);
03385    SafeDelete(f);
03386 }
03387 
03388 
03389 //------------------------------------------------------------------------------
03390 
03391 ClassImp(TProofPlayerSuperMaster)
03392 
03393 //______________________________________________________________________________
03394 Long64_t TProofPlayerSuperMaster::Process(TDSet *dset, const char *selector_file,
03395                                           Option_t *option, Long64_t nentries,
03396                                           Long64_t first)
03397 {
03398    // Process specified TDSet on PROOF. Runs on super master.
03399    // The return value is -1 in case of error and TSelector::GetStatus() in
03400    // in case of success.
03401 
03402    fProgressStatus->Reset();
03403    PDB(kGlobal,1) Info("Process","Enter");
03404 
03405    TProofSuperMaster *proof = dynamic_cast<TProofSuperMaster*>(GetProof());
03406    if (!proof) return -1;
03407 
03408    delete fOutput;
03409    fOutput = new TList;
03410 
03411    TPerfStats::Start(fInput, fOutput);
03412 
03413    if (!SendSelector(selector_file)) {
03414       Error("Process", "sending selector %s", selector_file);
03415       return -1;
03416    }
03417 
03418    TCleanup clean(this);
03419    SetupFeedback();
03420 
03421    if (proof->IsMaster()) {
03422 
03423       // make sure the DSet is valid
03424       if (!dset->ElementsValid()) {
03425          proof->ValidateDSet(dset);
03426          if (!dset->ElementsValid()) {
03427             Error("Process", "could not validate TDSet");
03428             return -1;
03429          }
03430       }
03431 
03432       TList msds;
03433       msds.SetOwner(); // This will delete TPairs
03434 
03435       TList keyholder; // List to clean up key part of the pairs
03436       keyholder.SetOwner();
03437       TList valueholder; // List to clean up value part of the pairs
03438       valueholder.SetOwner();
03439 
03440       // Construct msd list using the slaves
03441       TIter nextslave(proof->GetListOfActiveSlaves());
03442       while (TSlave *sl = dynamic_cast<TSlave*>(nextslave())) {
03443          TList *submasters = 0;
03444          TPair *msd = dynamic_cast<TPair*>(msds.FindObject(sl->GetMsd()));
03445          if (!msd) {
03446             submasters = new TList;
03447             submasters->SetName(sl->GetMsd());
03448             keyholder.Add(submasters);
03449             TList *setelements = new TSortedList(kSortDescending);
03450             setelements->SetName(TString(sl->GetMsd())+"_Elements");
03451             valueholder.Add(setelements);
03452             msds.Add(new TPair(submasters, setelements));
03453          } else {
03454             submasters = dynamic_cast<TList*>(msd->Key());
03455          }
03456          if (submasters) submasters->Add(sl);
03457       }
03458 
03459       // Add TDSetElements to msd list
03460       Long64_t cur = 0; //start of next element
03461       TIter nextelement(dset->GetListOfElements());
03462       while (TDSetElement *elem = dynamic_cast<TDSetElement*>(nextelement())) {
03463 
03464          if (elem->GetNum()<1) continue; // get rid of empty elements
03465 
03466          if (nentries !=-1 && cur>=first+nentries) {
03467             // we are done
03468             break;
03469          }
03470 
03471          if (cur+elem->GetNum()-1<first) {
03472             //element is before first requested entry
03473             cur+=elem->GetNum();
03474             continue;
03475          }
03476 
03477          if (cur<first) {
03478             //modify element to get proper start
03479             elem->SetNum(elem->GetNum()-(first-cur));
03480             elem->SetFirst(elem->GetFirst()+first-cur);
03481             cur=first;
03482          }
03483 
03484          if (nentries==-1 || cur+elem->GetNum()<=first+nentries) {
03485             cur+=elem->GetNum();
03486          } else {
03487             //modify element to get proper end
03488             elem->SetNum(first+nentries-cur);
03489             cur=first+nentries;
03490          }
03491 
03492          TPair *msd = dynamic_cast<TPair*>(msds.FindObject(elem->GetMsd()));
03493          if (!msd) {
03494             Error("Process", "data requires mass storage domain '%s'"
03495                   " which is not accessible in this proof session",
03496                   elem->GetMsd());
03497             return -1;
03498          } else {
03499             TList *elements = dynamic_cast<TList*>(msd->Value());
03500             if (elements) elements->Add(elem);
03501          }
03502       }
03503 
03504       TList usedmasters;
03505       TIter nextmsd(msds.MakeIterator());
03506       while (TPair *msd = dynamic_cast<TPair*>(nextmsd())) {
03507          TList *submasters = dynamic_cast<TList*>(msd->Key());
03508          TList *setelements = dynamic_cast<TList*>(msd->Value());
03509 
03510          // distribute elements over the masters
03511          Int_t nmasters = submasters ? submasters->GetSize() : -1;
03512          Int_t nelements = setelements ? setelements->GetSize() : -1;
03513          for (Int_t i=0; i<nmasters; i++) {
03514 
03515             Long64_t nent = 0;
03516             TDSet set(dset->GetType(), dset->GetObjName(),
03517                       dset->GetDirectory());
03518             for (Int_t j = (i*nelements)/nmasters;
03519                        j < ((i+1)*nelements)/nmasters;
03520                        j++) {
03521                TDSetElement *elem = setelements ?
03522                   dynamic_cast<TDSetElement*>(setelements->At(j)) : (TDSetElement *)0;
03523                if (elem) {
03524                   set.Add(elem->GetFileName(), elem->GetObjName(),
03525                         elem->GetDirectory(), elem->GetFirst(),
03526                         elem->GetNum(), elem->GetMsd());
03527                   nent += elem->GetNum();
03528                } else {
03529                   Warning("Process", "not a TDSetElement object");
03530                }
03531             }
03532 
03533             if (set.GetListOfElements()->GetSize()>0) {
03534                TMessage mesg(kPROOF_PROCESS);
03535                TString fn(gSystem->BaseName(selector_file));
03536                TString opt = option;
03537                mesg << &set << fn << fInput << opt << Long64_t(-1) << Long64_t(0);
03538 
03539                TSlave *sl = dynamic_cast<TSlave*>(submasters->At(i));
03540                if (sl) {
03541                   PDB(kGlobal,1) Info("Process",
03542                                     "Sending TDSet with %d elements to submaster %s",
03543                                     set.GetListOfElements()->GetSize(),
03544                                     sl->GetOrdinal());
03545                   sl->GetSocket()->Send(mesg);
03546                   usedmasters.Add(sl);
03547 
03548                   // setup progress info
03549                   fSlaves.AddLast(sl);
03550                   fSlaveProgress.Set(fSlaveProgress.GetSize()+1);
03551                   fSlaveProgress[fSlaveProgress.GetSize()-1] = 0;
03552                   fSlaveTotals.Set(fSlaveTotals.GetSize()+1);
03553                   fSlaveTotals[fSlaveTotals.GetSize()-1] = nent;
03554                   fSlaveBytesRead.Set(fSlaveBytesRead.GetSize()+1);
03555                   fSlaveBytesRead[fSlaveBytesRead.GetSize()-1] = 0;
03556                   fSlaveInitTime.Set(fSlaveInitTime.GetSize()+1);
03557                   fSlaveInitTime[fSlaveInitTime.GetSize()-1] = -1.;
03558                   fSlaveProcTime.Set(fSlaveProcTime.GetSize()+1);
03559                   fSlaveProcTime[fSlaveProcTime.GetSize()-1] = -1.;
03560                   fSlaveEvtRti.Set(fSlaveEvtRti.GetSize()+1);
03561                   fSlaveEvtRti[fSlaveEvtRti.GetSize()-1] = -1.;
03562                   fSlaveMBRti.Set(fSlaveMBRti.GetSize()+1);
03563                   fSlaveMBRti[fSlaveMBRti.GetSize()-1] = -1.;
03564                   fSlaveActW.Set(fSlaveActW.GetSize()+1);
03565                   fSlaveActW[fSlaveActW.GetSize()-1] = 0;
03566                   fSlaveTotS.Set(fSlaveTotS.GetSize()+1);
03567                   fSlaveTotS[fSlaveTotS.GetSize()-1] = 0;
03568                   fSlaveEffS.Set(fSlaveEffS.GetSize()+1);
03569                   fSlaveEffS[fSlaveEffS.GetSize()-1] = 0.;
03570                } else {
03571                   Warning("Process", "not a TSlave object");
03572                }
03573             }
03574          }
03575       }
03576 
03577       if ( !IsClient() ) HandleTimer(0);
03578       PDB(kGlobal,1) Info("Process","Calling Collect");
03579       proof->Collect(&usedmasters);
03580       HandleTimer(0);
03581 
03582    }
03583 
03584    StopFeedback();
03585 
03586    PDB(kGlobal,1) Info("Process","Calling Merge Output");
03587    MergeOutput();
03588 
03589    TPerfStats::Stop();
03590 
03591    return 0;
03592 }
03593 
03594 //______________________________________________________________________________
03595 void TProofPlayerSuperMaster::Progress(TSlave *sl, Long64_t total, Long64_t processed)
03596 {
03597    // Report progress.
03598 
03599    Int_t idx = fSlaves.IndexOf(sl);
03600    fSlaveProgress[idx] = processed;
03601    if (fSlaveTotals[idx] != total)
03602       Warning("Progress", "total events has changed for slave %s", sl->GetName());
03603    fSlaveTotals[idx] = total;
03604 
03605    Long64_t tot = 0;
03606    Int_t i;
03607    for (i = 0; i < fSlaveTotals.GetSize(); i++) tot += fSlaveTotals[i];
03608    Long64_t proc = 0;
03609    for (i = 0; i < fSlaveProgress.GetSize(); i++) proc += fSlaveProgress[i];
03610 
03611    Progress(tot, proc);
03612 }
03613 
03614 //______________________________________________________________________________
03615 void TProofPlayerSuperMaster::Progress(TSlave *sl, Long64_t total,
03616                                        Long64_t processed, Long64_t bytesread,
03617                                        Float_t initTime, Float_t procTime,
03618                                        Float_t evtrti, Float_t mbrti)
03619 {
03620    // Report progress.
03621 
03622    PDB(kGlobal,2)
03623       Info("Progress","%s: %lld %lld %f %f %f %f", sl->GetName(),
03624                       processed, bytesread, initTime, procTime, evtrti, mbrti);
03625 
03626    Int_t idx = fSlaves.IndexOf(sl);
03627    if (fSlaveTotals[idx] != total)
03628       Warning("Progress", "total events has changed for slave %s", sl->GetName());
03629    fSlaveTotals[idx] = total;
03630    fSlaveProgress[idx] = processed;
03631    fSlaveBytesRead[idx] = bytesread;
03632    fSlaveInitTime[idx] = (initTime > -1.) ? initTime : fSlaveInitTime[idx];
03633    fSlaveProcTime[idx] = (procTime > -1.) ? procTime : fSlaveProcTime[idx];
03634    fSlaveEvtRti[idx] = (evtrti > -1.) ? evtrti : fSlaveEvtRti[idx];
03635    fSlaveMBRti[idx] = (mbrti > -1.) ? mbrti : fSlaveMBRti[idx];
03636 
03637    Int_t i;
03638    Long64_t tot = 0;
03639    Long64_t proc = 0;
03640    Long64_t bytes = 0;
03641    Float_t init = -1.;
03642    Float_t ptime = -1.;
03643    Float_t erti = 0.;
03644    Float_t srti = 0.;
03645    Int_t nerti = 0;
03646    Int_t nsrti = 0;
03647    for (i = 0; i < fSlaveTotals.GetSize(); i++) {
03648       tot += fSlaveTotals[i];
03649       if (i < fSlaveProgress.GetSize())
03650          proc += fSlaveProgress[i];
03651       if (i < fSlaveBytesRead.GetSize())
03652          bytes += fSlaveBytesRead[i];
03653       if (i < fSlaveInitTime.GetSize())
03654          if (fSlaveInitTime[i] > -1. && (init < 0. || fSlaveInitTime[i] < init))
03655             init = fSlaveInitTime[i];
03656       if (i < fSlaveProcTime.GetSize())
03657          if (fSlaveProcTime[i] > -1. && (ptime < 0. || fSlaveProcTime[i] > ptime))
03658             ptime = fSlaveProcTime[i];
03659       if (i < fSlaveEvtRti.GetSize())
03660          if (fSlaveEvtRti[i] > -1.) {
03661             erti += fSlaveEvtRti[i];
03662             nerti++;
03663          }
03664       if (i < fSlaveMBRti.GetSize())
03665          if (fSlaveMBRti[i] > -1.) {
03666             srti += fSlaveMBRti[i];
03667             nsrti++;
03668          }
03669    }
03670    srti = (nsrti > 0) ? srti / nerti : 0.;
03671 
03672    Progress(tot, proc, bytes, init, ptime, erti, srti);
03673 }
03674 
03675 //______________________________________________________________________________
03676 void TProofPlayerSuperMaster::Progress(TSlave *wrk, TProofProgressInfo *pi)
03677 {
03678    // Progress signal.
03679 
03680    if (pi) {
03681       PDB(kGlobal,2)
03682          Info("Progress","%s: %lld %lld %lld %f %f %f %f %d %f", wrk->GetOrdinal(),
03683                          pi->fTotal, pi->fProcessed, pi->fBytesRead,
03684                          pi->fInitTime, pi->fProcTime, pi->fEvtRateI, pi->fMBRateI,
03685                          pi->fActWorkers, pi->fEffSessions);
03686 
03687       Int_t idx = fSlaves.IndexOf(wrk);
03688       if (fSlaveTotals[idx] != pi->fTotal)
03689          Warning("Progress", "total events has changed for worker %s", wrk->GetName());
03690       fSlaveTotals[idx] = pi->fTotal;
03691       fSlaveProgress[idx] = pi->fProcessed;
03692       fSlaveBytesRead[idx] = pi->fBytesRead;
03693       fSlaveInitTime[idx] = (pi->fInitTime > -1.) ? pi->fInitTime : fSlaveInitTime[idx];
03694       fSlaveProcTime[idx] = (pi->fProcTime > -1.) ? pi->fProcTime : fSlaveProcTime[idx];
03695       fSlaveEvtRti[idx] = (pi->fEvtRateI > -1.) ? pi->fEvtRateI : fSlaveEvtRti[idx];
03696       fSlaveMBRti[idx] = (pi->fMBRateI > -1.) ? pi->fMBRateI : fSlaveMBRti[idx];
03697       fSlaveActW[idx] = (pi->fActWorkers > -1) ? pi->fActWorkers : fSlaveActW[idx];
03698       fSlaveTotS[idx] = (pi->fTotSessions > -1) ? pi->fTotSessions : fSlaveTotS[idx];
03699       fSlaveEffS[idx] = (pi->fEffSessions > -1.) ? pi->fEffSessions : fSlaveEffS[idx];
03700 
03701       Int_t i;
03702       Int_t nerti = 0;
03703       Int_t nsrti = 0;
03704       TProofProgressInfo pisum(0, 0, 0, -1., -1., 0., 0., 0, 0, 0.);
03705       for (i = 0; i < fSlaveTotals.GetSize(); i++) {
03706          pisum.fTotal += fSlaveTotals[i];
03707          if (i < fSlaveProgress.GetSize())
03708             pisum.fProcessed += fSlaveProgress[i];
03709          if (i < fSlaveBytesRead.GetSize())
03710             pisum.fBytesRead += fSlaveBytesRead[i];
03711          if (i < fSlaveInitTime.GetSize())
03712             if (fSlaveInitTime[i] > -1. && (pisum.fInitTime < 0. || fSlaveInitTime[i] < pisum.fInitTime))
03713                pisum.fInitTime = fSlaveInitTime[i];
03714          if (i < fSlaveProcTime.GetSize())
03715             if (fSlaveProcTime[i] > -1. && (pisum.fProcTime < 0. || fSlaveProcTime[i] > pisum.fProcTime))
03716                pisum.fProcTime = fSlaveProcTime[i];
03717          if (i < fSlaveEvtRti.GetSize())
03718             if (fSlaveEvtRti[i] > -1.) {
03719                pisum.fEvtRateI += fSlaveEvtRti[i];
03720                nerti++;
03721             }
03722          if (i < fSlaveMBRti.GetSize())
03723             if (fSlaveMBRti[i] > -1.) {
03724                pisum.fMBRateI += fSlaveMBRti[i];
03725                nsrti++;
03726             }
03727          if (i < fSlaveActW.GetSize())
03728             pisum.fActWorkers += fSlaveActW[i];
03729          if (i < fSlaveTotS.GetSize())
03730             if (fSlaveTotS[i] > -1 && (pisum.fTotSessions < 0. || fSlaveTotS[i] > pisum.fTotSessions))
03731                pisum.fTotSessions = fSlaveTotS[i];
03732          if (i < fSlaveEffS.GetSize())
03733             if (fSlaveEffS[i] > -1. && (pisum.fEffSessions < 0. || fSlaveEffS[i] > pisum.fEffSessions))
03734                pisum.fEffSessions = fSlaveEffS[i];
03735       }
03736       pisum.fMBRateI = (nsrti > 0) ? pisum.fMBRateI / nerti : 0.;
03737 
03738       Progress(&pisum);
03739    }
03740 }
03741 
03742 //______________________________________________________________________________
03743 Bool_t TProofPlayerSuperMaster::HandleTimer(TTimer *)
03744 {
03745    // Send progress and feedback to client.
03746 
03747    if (fFeedbackTimer == 0) return kFALSE; // timer stopped already
03748 
03749    Int_t i;
03750    Long64_t tot = 0;
03751    Long64_t proc = 0;
03752    Long64_t bytes = 0;
03753    Float_t init = -1.;
03754    Float_t ptime = -1.;
03755    Float_t erti = 0.;
03756    Float_t srti = 0.;
03757    Int_t nerti = 0;
03758    Int_t nsrti = 0;
03759    for (i = 0; i < fSlaveTotals.GetSize(); i++) {
03760       tot += fSlaveTotals[i];
03761       if (i < fSlaveProgress.GetSize())
03762          proc += fSlaveProgress[i];
03763       if (i < fSlaveBytesRead.GetSize())
03764          bytes += fSlaveBytesRead[i];
03765       if (i < fSlaveInitTime.GetSize())
03766          if (fSlaveInitTime[i] > -1. && (init < 0. || fSlaveInitTime[i] < init))
03767             init = fSlaveInitTime[i];
03768       if (i < fSlaveProcTime.GetSize())
03769          if (fSlaveProcTime[i] > -1. && (ptime < 0. || fSlaveProcTime[i] > ptime))
03770             ptime = fSlaveProcTime[i];
03771       if (i < fSlaveEvtRti.GetSize())
03772          if (fSlaveEvtRti[i] > -1.) {
03773             erti += fSlaveEvtRti[i];
03774             nerti++;
03775          }
03776       if (i < fSlaveMBRti.GetSize())
03777          if (fSlaveMBRti[i] > -1.) {
03778             srti += fSlaveMBRti[i];
03779             nsrti++;
03780          }
03781    }
03782    erti = (nerti > 0) ? erti / nerti : 0.;
03783    srti = (nsrti > 0) ? srti / nerti : 0.;
03784 
03785    TMessage m(kPROOF_PROGRESS);
03786    if (gProofServ->GetProtocol() > 25) {
03787       // Fill the message now
03788       TProofProgressInfo pi(tot, proc, bytes, init, ptime,
03789                             erti, srti, -1,
03790                             gProofServ->GetTotSessions(), gProofServ->GetEffSessions());
03791       m << &pi;
03792    } else {
03793 
03794       m << tot << proc << bytes << init << ptime << erti << srti;
03795    }
03796 
03797    // send message to client;
03798    gProofServ->GetSocket()->Send(m);
03799 
03800    if (fReturnFeedback)
03801       return TProofPlayerRemote::HandleTimer(0);
03802    else
03803       return kFALSE;
03804 }
03805 
03806 //______________________________________________________________________________
03807 void TProofPlayerSuperMaster::SetupFeedback()
03808 {
03809    // Setup reporting of feedback objects and progress messages.
03810 
03811    if (IsClient()) return; // Client does not need timer
03812 
03813    TProofPlayerRemote::SetupFeedback();
03814 
03815    if (fFeedbackTimer) {
03816       fReturnFeedback = kTRUE;
03817       return;
03818    } else {
03819       fReturnFeedback = kFALSE;
03820    }
03821 
03822    // setup the timer for progress message
03823    SafeDelete(fFeedbackTimer);
03824    fFeedbackPeriod = 2000;
03825    TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);
03826    fFeedbackTimer = new TTimer;
03827    fFeedbackTimer->SetObject(this);
03828    fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
03829 }

Generated on Tue Jul 5 14:52:20 2011 for ROOT_528-00b_version by  doxygen 1.5.1