00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012 #ifndef ROOT_TProof
00013 #define ROOT_TProof
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027 #ifndef ROOT_TProofMgr
00028 #include "TProofMgr.h"
00029 #endif
00030 #ifndef ROOT_TProofDebug
00031 #include "TProofDebug.h"
00032 #endif
00033 #ifndef ROOT_TString
00034 #include "TString.h"
00035 #endif
00036 #ifndef ROOT_MessageTypes
00037 #include "MessageTypes.h"
00038 #endif
00039 #ifndef ROOT_TMD5
00040 #include "TMD5.h"
00041 #endif
00042 #ifndef ROOT_TRegexp
00043 #include "TRegexp.h"
00044 #endif
00045 #ifndef ROOT_TSysEvtHandler
00046 #include "TSysEvtHandler.h"
00047 #endif
00048 #ifndef ROOT_TThread
00049 #include "TThread.h"
00050 #endif
00051 #ifndef ROOT_TUrl
00052 #include "TUrl.h"
00053 #endif
00054
00055 #include <map>
00056
00057 #ifdef R__GLOBALSTL
00058 namespace std { using ::map; }
00059 #endif
00060
00061 #define CANNOTUSE(x) Info(x,"Not manager: cannot use this method")
00062
00063 class TChain;
00064 class TCondor;
00065 class TCondorSlave;
00066 class TDrawFeedback;
00067 class TDSet;
00068 class TEventList;
00069 class THashList;
00070 class TList;
00071 class TCollection;
00072 class TMessage;
00073 class TMonitor;
00074 class TPluginHandler;
00075 class TProof;
00076 class TProofInputHandler;
00077 class TProofInterruptHandler;
00078 class TProofLockPath;
00079 class TVirtualProofPlayer;
00080 class TProofPlayer;
00081 class TProofPlayerRemote;
00082 class TProofProgressDialog;
00083 class TProofServ;
00084 class TQueryResult;
00085 class TSignalHandler;
00086 class TSlave;
00087 class TSemaphore;
00088 class TSocket;
00089 class TTree;
00090 class TVirtualMutex;
00091 class TFileCollection;
00092 class TMap;
00093 class TDataSetManager;
00094 class TMacro;
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131 const Int_t kPROOF_Protocol = 31;
00132 const Int_t kPROOF_Port = 1093;
00133 const char* const kPROOF_ConfFile = "proof.conf";
00134 const char* const kPROOF_ConfDir = "/usr/local/root";
00135 const char* const kPROOF_WorkDir = ".proof";
00136 const char* const kPROOF_CacheDir = "cache";
00137 const char* const kPROOF_PackDir = "packages";
00138 const char* const kPROOF_PackDownloadDir = "downloaded";
00139 const char* const kPROOF_QueryDir = "queries";
00140 const char* const kPROOF_DataSetDir = "datasets";
00141 const char* const kPROOF_DataDir = "data";
00142 const char* const kPROOF_CacheLockFile = "proof-cache-lock-";
00143 const char* const kPROOF_PackageLockFile = "proof-package-lock-";
00144 const char* const kPROOF_QueryLockFile = "proof-query-lock-";
00145 const char* const kPROOF_TerminateWorker = "+++ terminating +++";
00146 const char* const kPROOF_WorkerIdleTO = "+++ idle-timeout +++";
00147 const char* const kPROOF_InputDataFile = "inputdata.root";
00148
00149 #ifndef R__WIN32
00150 const char* const kCP = "/bin/cp -fp";
00151 const char* const kRM = "/bin/rm -rf";
00152 const char* const kLS = "/bin/ls -l";
00153 const char* const kUNTAR = "%s -c %s/%s | (cd %s; tar xf -)";
00154 const char* const kUNTAR2 = "%s -c %s | (cd %s; tar xf -)";
00155 const char* const kUNTAR3 = "%s -c %s | (tar xf -)";
00156 const char* const kGUNZIP = "gunzip";
00157 #else
00158 const char* const kCP = "copy";
00159 const char* const kRM = "delete";
00160 const char* const kLS = "dir";
00161 const char* const kUNTAR = "...";
00162 const char* const kUNTAR2 = "...";
00163 const char* const kUNTAR3 = "...";
00164 const char* const kGUNZIP = "gunzip";
00165 #endif
00166
00167 R__EXTERN TVirtualMutex *gProofMutex;
00168
00169 typedef void (*PrintProgress_t)(Long64_t tot, Long64_t proc, Float_t proctime, Long64_t bytes);
00170
00171
00172 class TProofProgressInfo : public TObject {
00173 public:
00174 Long64_t fTotal;
00175 Long64_t fProcessed;
00176 Long64_t fBytesRead;
00177 Float_t fInitTime;
00178 Float_t fProcTime;
00179 Float_t fEvtRateI;
00180 Float_t fMBRateI;
00181 Int_t fActWorkers;
00182 Int_t fTotSessions;
00183 Float_t fEffSessions;
00184 TProofProgressInfo(Long64_t tot = 0, Long64_t proc = 0, Long64_t bytes = 0,
00185 Float_t initt = -1., Float_t proct = -1.,
00186 Float_t evts = -1., Float_t mbs = -1.,
00187 Int_t actw = 0, Int_t tsess = 0, Float_t esess = 0.) :
00188 fTotal(tot), fProcessed(proc), fBytesRead(bytes),
00189 fInitTime(initt), fProcTime(proct), fEvtRateI(evts), fMBRateI(mbs),
00190 fActWorkers(actw), fTotSessions(tsess), fEffSessions(esess) { }
00191 virtual ~TProofProgressInfo() { }
00192 ClassDef(TProofProgressInfo, 1);
00193 };
00194
00195
00196 class TProofInterruptHandler : public TSignalHandler {
00197 private:
00198 TProof *fProof;
00199
00200 TProofInterruptHandler(const TProofInterruptHandler&);
00201 TProofInterruptHandler& operator=(const TProofInterruptHandler&);
00202 public:
00203 TProofInterruptHandler(TProof *p)
00204 : TSignalHandler(kSigInterrupt, kFALSE), fProof(p) { }
00205 Bool_t Notify();
00206 };
00207
00208
00209 class TProofInputHandler : public TFileHandler {
00210 private:
00211 TSocket *fSocket;
00212 TProof *fProof;
00213
00214 TProofInputHandler(const TProofInputHandler&);
00215 TProofInputHandler& operator=(const TProofInputHandler&);
00216 public:
00217 TProofInputHandler(TProof *p, TSocket *s);
00218 Bool_t Notify();
00219 Bool_t ReadNotify() { return Notify(); }
00220 };
00221
00222
00223 class TSlaveInfo : public TObject {
00224 public:
00225 enum ESlaveStatus { kActive, kNotActive, kBad };
00226
00227 TString fOrdinal;
00228 TString fHostName;
00229 TString fMsd;
00230 TString fDataDir;
00231 Int_t fPerfIndex;
00232 SysInfo_t fSysInfo;
00233 ESlaveStatus fStatus;
00234
00235 TSlaveInfo(const char *ordinal = "", const char *host = "", Int_t perfidx = 0,
00236 const char *msd = "", const char *datadir = "") :
00237 fOrdinal(ordinal), fHostName(host), fMsd(msd), fDataDir(datadir),
00238 fPerfIndex(perfidx), fSysInfo(), fStatus(kNotActive) { }
00239
00240 const char *GetDataDir() const { return fDataDir; }
00241 const char *GetMsd() const { return fMsd; }
00242 const char *GetName() const { return fHostName; }
00243 const char *GetOrdinal() const { return fOrdinal; }
00244 SysInfo_t GetSysInfo() const { return fSysInfo; }
00245 void SetStatus(ESlaveStatus stat) { fStatus = stat; }
00246 void SetSysInfo(SysInfo_t si);
00247
00248 Int_t Compare(const TObject *obj) const;
00249 Bool_t IsSortable() const { return kTRUE; }
00250 void Print(Option_t *option="") const;
00251
00252 ClassDef(TSlaveInfo,4)
00253 };
00254
00255
00256 class TMergerInfo : public TObject {
00257 private:
00258
00259 TSlave *fMerger;
00260 Int_t fPort;
00261 Int_t fMergedObjects;
00262
00263 Int_t fWorkersToMerge;
00264
00265 Int_t fMergedWorkers;
00266
00267
00268 TList *fWorkers;
00269 Bool_t fIsActive;
00270
00271 TMergerInfo(const TMergerInfo&);
00272 TMergerInfo& operator=(const TMergerInfo&);
00273
00274 public:
00275 TMergerInfo(TSlave *t, Int_t port, Int_t forHowManyWorkers) :
00276 fMerger(t), fPort(port), fMergedObjects(0), fWorkersToMerge(forHowManyWorkers),
00277 fMergedWorkers(0), fWorkers(0), fIsActive(kTRUE) { }
00278 virtual ~TMergerInfo();
00279
00280 void AddWorker(TSlave *sl);
00281 TList *GetWorkers() { return fWorkers; }
00282
00283 TSlave *GetMerger() { return fMerger; }
00284 Int_t GetPort() { return fPort; }
00285
00286 Int_t GetWorkersToMerge() { return fWorkersToMerge; }
00287 Int_t GetMergedWorkers() { return fMergedWorkers; }
00288 Int_t GetMergedObjects() { return fMergedObjects; }
00289
00290 void SetMergedWorker();
00291 void AddMergedObjects(Int_t objects) { fMergedObjects += objects; }
00292
00293 Bool_t AreAllWorkersAssigned();
00294 Bool_t AreAllWorkersMerged();
00295
00296 void Deactivate() { fIsActive = kFALSE; }
00297 Bool_t IsActive() { return fIsActive; }
00298
00299 ClassDef(TMergerInfo,0)
00300 };
00301
00302
00303 class TProofMergePrg {
00304 private:
00305 TString fExp;
00306 Int_t fIdx;
00307 Int_t fNWrks;
00308 static char fgCr[4];
00309 public:
00310 TProofMergePrg() : fExp(), fIdx(-1), fNWrks(-1) { }
00311
00312 const char *Export() { fExp.Form("%c (%d workers still sending) ", fgCr[fIdx], fNWrks);
00313 return fExp.Data(); }
00314 void DecreaseNWrks() { fNWrks--; }
00315 void IncreaseNWrks() { fNWrks++; }
00316 void IncreaseIdx() { fIdx++; if (fIdx == 4) fIdx = 0; }
00317 void Reset(Int_t n = -1) { fIdx = -1; SetNWrks(n); }
00318 void SetNWrks(Int_t n) { fNWrks = n; }
00319 };
00320
00321 class TProof : public TNamed, public TQObject {
00322
00323 friend class TPacketizer;
00324 friend class TPacketizerDev;
00325 friend class TPacketizerAdaptive;
00326 friend class TProofLite;
00327 friend class TDataSetManager;
00328 friend class TProofServ;
00329 friend class TProofInputHandler;
00330 friend class TProofInterruptHandler;
00331 friend class TProofPlayer;
00332 friend class TProofPlayerLite;
00333 friend class TProofPlayerRemote;
00334 friend class TProofProgressDialog;
00335 friend class TSlave;
00336 friend class TSlaveLite;
00337 friend class TVirtualPacketizer;
00338 friend class TXSlave;
00339 friend class TXSocket;
00340 friend class TXSocketHandler;
00341 friend class TXProofMgr;
00342 friend class TXProofServ;
00343
00344 public:
00345
00346 enum EStatusBits {
00347 kUsingSessionGui = BIT(14),
00348 kNewInputData = BIT(15),
00349 kIsClient = BIT(16),
00350 kIsMaster = BIT(17),
00351 kIsTopMaster = BIT(18),
00352 kUseProgressDialog = BIT(19)
00353 };
00354 enum EQueryMode {
00355 kSync = 0,
00356 kAsync = 1
00357 };
00358 enum EUploadOpt {
00359 kAppend = 0x1,
00360 kOverwriteDataSet = 0x2,
00361 kNoOverwriteDataSet = 0x4,
00362 kOverwriteAllFiles = 0x8,
00363 kOverwriteNoFiles = 0x10,
00364 kAskUser = 0x0
00365 };
00366 enum ERegisterOpt {
00367 kFailIfExists = 0,
00368 kOverwriteIfExists = 1,
00369 kMergeIfExists = 2
00370 };
00371 enum EUploadDataSetAnswer {
00372 kError = -1,
00373 kDataSetExists = -2,
00374 kFail = -3
00375 };
00376 enum EUploadPackageOpt {
00377 kUntar = 0x0,
00378 kRemoveOld = 0x1
00379 };
00380 enum ERunStatus {
00381 kRunning = 0,
00382 kStopped = 1,
00383 kAborted = 2
00384 };
00385
00386 enum ESubMerger {
00387 kOutputSize = 1,
00388 kSendOutput = 2,
00389 kBeMerger = 3,
00390 kMergerDown = 4,
00391 kStopMerging = 5,
00392 kOutputSent = 6
00393 };
00394
00395 private:
00396 enum EUrgent {
00397 kLocalInterrupt = -1,
00398 kPing = 0,
00399 kHardInterrupt = 1,
00400 kSoftInterrupt,
00401 kShutdownInterrupt
00402 };
00403 enum EProofCacheCommands {
00404 kShowCache = 1,
00405 kClearCache = 2,
00406 kShowPackages = 3,
00407 kClearPackages = 4,
00408 kClearPackage = 5,
00409 kBuildPackage = 6,
00410 kLoadPackage = 7,
00411 kShowEnabledPackages = 8,
00412 kShowSubCache = 9,
00413 kClearSubCache = 10,
00414 kShowSubPackages = 11,
00415 kDisableSubPackages = 12,
00416 kDisableSubPackage = 13,
00417 kBuildSubPackage = 14,
00418 kUnloadPackage = 15,
00419 kDisablePackage = 16,
00420 kUnloadPackages = 17,
00421 kDisablePackages = 18,
00422 kListPackages = 19,
00423 kListEnabledPackages = 20,
00424 kLoadMacro = 21
00425 };
00426 enum EProofDataSetCommands {
00427 kUploadDataSet = 1,
00428 kCheckDataSetName = 2,
00429 kGetDataSets = 3,
00430 kRegisterDataSet = 4,
00431 kGetDataSet = 5,
00432 kVerifyDataSet = 6,
00433 kRemoveDataSet = 7,
00434 kMergeDataSet = 8,
00435 kShowDataSets = 9,
00436 kGetQuota = 10,
00437 kShowQuota = 11,
00438 kSetDefaultTreeName = 12,
00439 kCache = 13
00440 };
00441 enum ESendFileOpt {
00442 kAscii = 0x0,
00443 kBinary = 0x1,
00444 kForce = 0x2,
00445 kForward = 0x4,
00446 kCpBin = 0x8,
00447 kCp = 0x10
00448 };
00449 enum EProofWrkListAction {
00450 kActivateWorker = 1,
00451 kDeactivateWorker = 2
00452 };
00453 enum EBuildPackageOpt {
00454 kDontBuildOnClient = -2,
00455 kBuildOnSlavesNoWait = -1,
00456 kBuildAll = 0,
00457 kCollectBuildResults = 1
00458 };
00459 enum EProofShowQuotaOpt {
00460 kPerGroup = 0x1,
00461 kPerUser = 0x2
00462 };
00463 enum EProofClearData {
00464 kPurge = 0x1,
00465 kUnregistered = 0x2,
00466 kDataset = 0x4,
00467 kForceClear = 0x8
00468 };
00469
00470 Bool_t fValid;
00471 TString fMaster;
00472 TString fWorkDir;
00473 TString fGroup;
00474 Int_t fLogLevel;
00475 Int_t fStatus;
00476 Int_t fCheckFileStatus;
00477 TList *fRecvMessages;
00478 TList *fSlaveInfo;
00479 Bool_t fSendGroupView;
00480 TList *fActiveSlaves;
00481 TList *fInactiveSlaves;
00482 TList *fUniqueSlaves;
00483 TList *fAllUniqueSlaves;
00484 TList *fNonUniqueMasters;
00485 TMonitor *fActiveMonitor;
00486 TMonitor *fUniqueMonitor;
00487 TMonitor *fAllUniqueMonitor;
00488 TMonitor *fCurrentMonitor;
00489 Long64_t fBytesRead;
00490 Float_t fRealTime;
00491 Float_t fCpuTime;
00492 TSignalHandler *fIntHandler;
00493 TPluginHandler *fProgressDialog;
00494 Bool_t fProgressDialogStarted;
00495 TVirtualProofPlayer *fPlayer;
00496 TList *fFeedback;
00497 TList *fChains;
00498 struct MD5Mod_t {
00499 TMD5 fMD5;
00500 Long_t fModtime;
00501 };
00502 typedef std::map<TString, MD5Mod_t> FileMap_t;
00503 FileMap_t fFileMap;
00504 TDSet *fDSet;
00505
00506 Int_t fNotIdle;
00507 Bool_t fSync;
00508 ERunStatus fRunStatus;
00509 Bool_t fIsWaiting;
00510
00511 Bool_t fRedirLog;
00512 TString fLogFileName;
00513 FILE *fLogFileW;
00514 FILE *fLogFileR;
00515 Bool_t fLogToWindowOnly;
00516
00517 TProofMergePrg fMergePrg;
00518
00519 TList *fWaitingSlaves;
00520 TList *fQueries;
00521 Int_t fOtherQueries;
00522 Int_t fDrawQueries;
00523 Int_t fMaxDrawQueries;
00524 Int_t fSeqNum;
00525
00526 Int_t fSessionID;
00527
00528 Bool_t fEndMaster;
00529
00530 TString fPackageDir;
00531 THashList *fGlobalPackageDirList;
00532 TProofLockPath *fPackageLock;
00533 TList *fEnabledPackagesOnClient;
00534
00535 TList *fInputData;
00536 TString fInputDataFile;
00537
00538 PrintProgress_t fPrintProgress;
00539
00540 TVirtualMutex *fCloseMutex;
00541
00542 TList *fLoadedMacros;
00543 static TList *fgProofEnvList;
00544
00545
00546 Bool_t fMergersSet;
00547 Int_t fMergersCount;
00548 Int_t fWorkersToMerge;
00549 Int_t fLastAssignedMerger;
00550 TList *fMergers;
00551 Bool_t fFinalizationRunning;
00552 Int_t fRedirectNext;
00553
00554 static TPluginHandler *fgLogViewer;
00555
00556 protected:
00557 enum ESlaves { kAll, kActive, kUnique, kAllUnique };
00558
00559 Bool_t fMasterServ;
00560 TUrl fUrl;
00561 TString fConfFile;
00562 TString fConfDir;
00563 TString fImage;
00564 Int_t fProtocol;
00565 TList *fSlaves;
00566 TList *fBadSlaves;
00567 TMonitor *fAllMonitor;
00568 Bool_t fDataReady;
00569 Long64_t fBytesReady;
00570 Long64_t fTotalBytes;
00571 TList *fAvailablePackages;
00572 TList *fEnabledPackages;
00573 TList *fRunningDSets;
00574
00575 Int_t fCollectTimeout;
00576
00577 TString fDataPoolUrl;
00578 TProofMgr::EServType fServType;
00579 TProofMgr *fManager;
00580 EQueryMode fQueryMode;
00581 Bool_t fDynamicStartup;
00582
00583 static TSemaphore *fgSemaphore;
00584
00585 private:
00586 TProof(const TProof &);
00587 void operator=(const TProof &);
00588
00589 void CleanGDirectory(TList *ol);
00590
00591 Int_t Exec(const char *cmd, ESlaves list, Bool_t plusMaster);
00592 Int_t SendCommand(const char *cmd, ESlaves list = kActive);
00593 Int_t SendCurrentState(ESlaves list = kActive);
00594 Bool_t CheckFile(const char *file, TSlave *sl, Long_t modtime, Int_t cpopt = (kCp | kCpBin));
00595 Int_t SendObject(const TObject *obj, ESlaves list = kActive);
00596 Int_t SendGroupView();
00597 Int_t SendInitialState();
00598 Int_t SendPrint(Option_t *option="");
00599 Int_t Ping(ESlaves list);
00600 void Interrupt(EUrgent type, ESlaves list = kActive);
00601 void AskStatistics();
00602 void AskParallel();
00603 Int_t GoParallel(Int_t nodes, Bool_t accept = kFALSE, Bool_t random = kFALSE);
00604 Int_t SetParallelSilent(Int_t nodes, Bool_t random = kFALSE);
00605 void RecvLogFile(TSocket *s, Int_t size);
00606 void NotifyLogMsg(const char *msg, const char *sfx = "\n");
00607 Int_t BuildPackage(const char *package, EBuildPackageOpt opt = kBuildAll);
00608 Int_t BuildPackageOnClient(const char *package, Int_t opt = 0, TString *path = 0);
00609 Int_t LoadPackage(const char *package, Bool_t notOnClient = kFALSE, TList *loadopts = 0);
00610 Int_t LoadPackageOnClient(const char *package, TList *loadopts = 0);
00611 Int_t UnloadPackage(const char *package);
00612 Int_t UnloadPackageOnClient(const char *package);
00613 Int_t UnloadPackages();
00614 Int_t UploadPackageOnClient(const char *package, EUploadPackageOpt opt, TMD5 *md5);
00615 Int_t DisablePackage(const char *package);
00616 Int_t DisablePackageOnClient(const char *package);
00617 Int_t DisablePackages();
00618
00619 void Activate(TList *slaves = 0);
00620 Int_t Broadcast(const TMessage &mess, TList *slaves);
00621 Int_t Broadcast(const TMessage &mess, ESlaves list = kActive);
00622 Int_t Broadcast(const char *mess, Int_t kind, TList *slaves);
00623 Int_t Broadcast(const char *mess, Int_t kind = kMESS_STRING, ESlaves list = kActive);
00624 Int_t Broadcast(Int_t kind, TList *slaves) { return Broadcast(0, kind, slaves); }
00625 Int_t Broadcast(Int_t kind, ESlaves list = kActive) { return Broadcast(0, kind, list); }
00626 Int_t BroadcastFile(const char *file, Int_t opt, const char *rfile, TList *wrks);
00627 Int_t BroadcastFile(const char *file, Int_t opt, const char *rfile = 0, ESlaves list = kAllUnique);
00628 Int_t BroadcastGroupPriority(const char *grp, Int_t priority, ESlaves list = kAllUnique);
00629 Int_t BroadcastGroupPriority(const char *grp, Int_t priority, TList *workers);
00630 Int_t BroadcastObject(const TObject *obj, Int_t kind, TList *slaves);
00631 Int_t BroadcastObject(const TObject *obj, Int_t kind = kMESS_OBJECT, ESlaves list = kActive);
00632 Int_t BroadcastRaw(const void *buffer, Int_t length, TList *slaves);
00633 Int_t BroadcastRaw(const void *buffer, Int_t length, ESlaves list = kActive);
00634 Int_t Collect(const TSlave *sl, Long_t timeout = -1, Int_t endtype = -1, Bool_t deactonfail = kFALSE);
00635 Int_t Collect(TMonitor *mon, Long_t timeout = -1, Int_t endtype = -1, Bool_t deactonfail = kFALSE);
00636 Int_t CollectInputFrom(TSocket *s, Int_t endtype = -1, Bool_t deactonfail = kFALSE);
00637 Int_t HandleInputMessage(TSlave *wrk, TMessage *m, Bool_t deactonfail = kFALSE);
00638 void HandleSubmerger(TMessage *mess, TSlave *sl);
00639 void SetMonitor(TMonitor *mon = 0, Bool_t on = kTRUE);
00640
00641 void ReleaseMonitor(TMonitor *mon);
00642
00643 virtual void FindUniqueSlaves();
00644 TSlave *FindSlave(TSocket *s) const;
00645 TList *GetListOfSlaves() const { return fSlaves; }
00646 TList *GetListOfInactiveSlaves() const { return fInactiveSlaves; }
00647 TList *GetListOfUniqueSlaves() const { return fUniqueSlaves; }
00648 TList *GetListOfBadSlaves() const { return fBadSlaves; }
00649 Int_t GetNumberOfSlaves() const;
00650 Int_t GetNumberOfActiveSlaves() const;
00651 Int_t GetNumberOfInactiveSlaves() const;
00652 Int_t GetNumberOfUniqueSlaves() const;
00653 Int_t GetNumberOfBadSlaves() const;
00654
00655 Bool_t IsEndMaster() const { return fEndMaster; }
00656 void ModifyWorkerLists(const char *ord, Bool_t add);
00657
00658 Bool_t IsSync() const { return fSync; }
00659 void InterruptCurrentMonitor();
00660
00661 void SetRunStatus(ERunStatus rst) { fRunStatus = rst; }
00662
00663 void MarkBad(TSlave *wrk, const char *reason = 0);
00664 void MarkBad(TSocket *s, const char *reason = 0);
00665 void TerminateWorker(TSlave *wrk);
00666 void TerminateWorker(const char *ord);
00667
00668 void ActivateAsyncInput();
00669 void DeActivateAsyncInput();
00670
00671 Int_t GetQueryReference(Int_t qry, TString &ref);
00672 void PrintProgress(Long64_t total, Long64_t processed,
00673 Float_t procTime = -1., Long64_t bytesread = -1);
00674
00675
00676 Bool_t CreateMerger(TSlave *sl, Int_t port);
00677 void RedirectWorker(TSocket *s, TSlave * sl, Int_t output_size);
00678 Int_t GetActiveMergersCount();
00679 Int_t FindNextFreeMerger();
00680 void ResetMergers() { fMergersSet = kFALSE; }
00681 void AskForOutput(TSlave *sl);
00682
00683 void FinalizationDone() { fFinalizationRunning = kFALSE; }
00684
00685 void ResetMergePrg();
00686 void ParseConfigField(const char *config);
00687
00688 Bool_t Prompt(const char *p);
00689 void ClearDataProgress(Int_t r, Int_t t);
00690
00691 static TList *GetDataSetSrvMaps(const TString &srvmaps);
00692
00693 protected:
00694 TProof();
00695 void InitMembers();
00696 Int_t Init(const char *masterurl, const char *conffile,
00697 const char *confdir, Int_t loglevel,
00698 const char *alias = 0);
00699 virtual Bool_t StartSlaves(Bool_t attach = kFALSE);
00700 Int_t AddWorkers(TList *wrks);
00701 Int_t RemoveWorkers(TList *wrks);
00702
00703 void SetPlayer(TVirtualProofPlayer *player);
00704 TVirtualProofPlayer *GetPlayer() const { return fPlayer; }
00705 virtual TVirtualProofPlayer *MakePlayer(const char *player = 0, TSocket *s = 0);
00706
00707 void UpdateDialog();
00708
00709 void HandleLibIncPath(const char *what, Bool_t add, const char *dirs);
00710
00711 TList *GetListOfActiveSlaves() const { return fActiveSlaves; }
00712 TSlave *CreateSlave(const char *url, const char *ord,
00713 Int_t perf, const char *image, const char *workdir);
00714 TSlave *CreateSubmaster(const char *url, const char *ord,
00715 const char *image, const char *msd);
00716
00717 virtual void SaveWorkerInfo();
00718
00719 Int_t Collect(ESlaves list = kActive, Long_t timeout = -1, Int_t endtype = -1, Bool_t deactonfail = kFALSE);
00720 Int_t Collect(TList *slaves, Long_t timeout = -1, Int_t endtype = -1, Bool_t deactonfail = kFALSE);
00721
00722 void SetDSet(TDSet *dset) { fDSet = dset; }
00723 virtual void ValidateDSet(TDSet *dset);
00724
00725 TPluginHandler *GetProgressDialog() const { return fProgressDialog; }
00726
00727 Int_t AssertPath(const char *path, Bool_t writable);
00728
00729 void PrepareInputDataFile(TString &dataFile);
00730 virtual void SendInputDataFile();
00731 Int_t SendFile(const char *file, Int_t opt = (kBinary | kForward | kCp | kCpBin),
00732 const char *rfile = 0, TSlave *sl = 0);
00733
00734 static void *SlaveStartupThread(void *arg);
00735
00736 static Int_t AssertDataSet(TDSet *dset, TList *input,
00737 TDataSetManager *mgr, TString &emsg);
00738
00739 static Int_t GetInputData(TList *input, const char *cachedir, TString &emsg);
00740 static Int_t SaveInputData(TQueryResult *qr, const char *cachedir, TString &emsg);
00741 static Int_t SendInputData(TQueryResult *qr, TProof *p, TString &emsg);
00742
00743
00744 static Bool_t GetFileInCmd(const char *cmd, TString &fn);
00745
00746
00747 static void SystemCmd(const char *cmd, Int_t fdout);
00748
00749 public:
00750 TProof(const char *masterurl, const char *conffile = kPROOF_ConfFile,
00751 const char *confdir = kPROOF_ConfDir, Int_t loglevel = 0,
00752 const char *alias = 0, TProofMgr *mgr = 0);
00753 virtual ~TProof();
00754
00755 void cd(Int_t id = -1);
00756
00757 Int_t Ping();
00758 void Touch();
00759 Int_t Exec(const char *cmd, Bool_t plusMaster = kFALSE);
00760
00761 virtual Long64_t Process(TDSet *dset, const char *selector,
00762 Option_t *option = "", Long64_t nentries = -1,
00763 Long64_t firstentry = 0);
00764 virtual Long64_t Process(TFileCollection *fc, const char *selector,
00765 Option_t *option = "", Long64_t nentries = -1,
00766 Long64_t firstentry = 0);
00767 virtual Long64_t Process(const char *dsetname, const char *selector,
00768 Option_t *option = "", Long64_t nentries = -1,
00769 Long64_t firstentry = 0, TObject *enl = 0);
00770 virtual Long64_t Process(const char *selector, Long64_t nentries,
00771 Option_t *option = "");
00772
00773 virtual Long64_t DrawSelect(TDSet *dset, const char *varexp,
00774 const char *selection = "",
00775 Option_t *option = "", Long64_t nentries = -1,
00776 Long64_t firstentry = 0);
00777 Long64_t DrawSelect(const char *dsetname, const char *varexp,
00778 const char *selection = "",
00779 Option_t *option = "", Long64_t nentries = -1,
00780 Long64_t firstentry = 0, TObject *enl = 0);
00781 Int_t Archive(Int_t query, const char *url);
00782 Int_t Archive(const char *queryref, const char *url = 0);
00783 Int_t CleanupSession(const char *sessiontag);
00784 Long64_t Finalize(Int_t query = -1, Bool_t force = kFALSE);
00785 Long64_t Finalize(const char *queryref, Bool_t force = kFALSE);
00786 Int_t Remove(Int_t query, Bool_t all = kFALSE);
00787 Int_t Remove(const char *queryref, Bool_t all = kFALSE);
00788 Int_t Retrieve(Int_t query, const char *path = 0);
00789 Int_t Retrieve(const char *queryref, const char *path = 0);
00790
00791 void DisableGoAsyn();
00792 void GoAsynchronous();
00793 void StopProcess(Bool_t abort, Int_t timeout = -1);
00794 void Browse(TBrowser *b);
00795
00796 Int_t SetParallel(Int_t nodes = 9999, Bool_t random = kFALSE);
00797 void SetLogLevel(Int_t level, UInt_t mask = TProofDebug::kAll);
00798
00799 void Close(Option_t *option="");
00800 virtual void Print(Option_t *option="") const;
00801
00802
00803 virtual void ShowCache(Bool_t all = kFALSE);
00804 virtual void ClearCache(const char *file = 0);
00805 TList *GetListOfPackages();
00806 TList *GetListOfEnabledPackages();
00807 void ShowPackages(Bool_t all = kFALSE, Bool_t redirlog = kFALSE);
00808 void ShowEnabledPackages(Bool_t all = kFALSE);
00809 Int_t ClearPackages();
00810 Int_t ClearPackage(const char *package);
00811 Int_t DownloadPackage(const char *par, const char *dstdir = 0);
00812 Int_t EnablePackage(const char *package, Bool_t notOnClient = kFALSE);
00813 Int_t EnablePackage(const char *package, const char *loadopts,
00814 Bool_t notOnClient = kFALSE);
00815 Int_t EnablePackage(const char *package, TList *loadopts,
00816 Bool_t notOnClient = kFALSE);
00817 Int_t UploadPackage(const char *par, EUploadPackageOpt opt = kUntar);
00818 virtual Int_t Load(const char *macro, Bool_t notOnClient = kFALSE, Bool_t uniqueOnly = kTRUE,
00819 TList *wrks = 0);
00820
00821 Int_t AddDynamicPath(const char *libpath, Bool_t onClient = kFALSE, TList *wrks = 0);
00822 Int_t AddIncludePath(const char *incpath, Bool_t onClient = kFALSE, TList *wrks = 0);
00823 Int_t RemoveDynamicPath(const char *libpath, Bool_t onClient = kFALSE);
00824 Int_t RemoveIncludePath(const char *incpath, Bool_t onClient = kFALSE);
00825
00826
00827 Int_t UploadDataSet(const char *dataset,
00828 TList *files,
00829 const char *dest = 0,
00830 Int_t opt = kAskUser,
00831 TList *skippedFiles = 0);
00832 Int_t UploadDataSet(const char *dataset,
00833 const char *files,
00834 const char *dest = 0,
00835 Int_t opt = kAskUser,
00836 TList *skippedFiles = 0);
00837 Int_t UploadDataSetFromFile(const char *dataset,
00838 const char *file,
00839 const char *dest = 0,
00840 Int_t opt = kAskUser,
00841 TList *skippedFiles = 0);
00842 virtual Bool_t RegisterDataSet(const char *name,
00843 TFileCollection *dataset, const char* optStr = "");
00844 virtual TMap *GetDataSets(const char *uri = "", const char* optStr = "");
00845 virtual void ShowDataSets(const char *uri = "", const char* optStr = "");
00846
00847 TMap *GetDataSetQuota(const char* optStr = "");
00848 void ShowDataSetQuota(Option_t* opt = 0);
00849
00850 virtual Bool_t ExistsDataSet(const char *dataset);
00851 void ShowDataSet(const char *dataset = "", const char* opt = "M");
00852 virtual Int_t RemoveDataSet(const char *dataset, const char* optStr = "");
00853 virtual Int_t VerifyDataSet(const char *dataset, const char* optStr = "");
00854 virtual TFileCollection *GetDataSet(const char *dataset, const char* optStr = "");
00855 TList *FindDataSets(const char *searchString, const char* optStr = "");
00856
00857 virtual Int_t SetDataSetTreeName( const char *dataset, const char *treename);
00858
00859 virtual void ShowDataSetCache(const char *dataset = 0);
00860 virtual void ClearDataSetCache(const char *dataset = 0);
00861
00862 void ShowData();
00863 void ClearData(UInt_t what = kUnregistered, const char *dsname = 0);
00864
00865 const char *GetMaster() const { return fMaster; }
00866 const char *GetConfDir() const { return fConfDir; }
00867 const char *GetConfFile() const { return fConfFile; }
00868 const char *GetUser() const { return fUrl.GetUser(); }
00869 const char *GetGroup() const { return fGroup; }
00870 const char *GetWorkDir() const { return fWorkDir; }
00871 const char *GetSessionTag() const { return GetName(); }
00872 const char *GetImage() const { return fImage; }
00873 const char *GetUrl() { return fUrl.GetUrl(); }
00874 Int_t GetPort() const { return fUrl.GetPort(); }
00875 Int_t GetRemoteProtocol() const { return fProtocol; }
00876 Int_t GetClientProtocol() const { return kPROOF_Protocol; }
00877 Int_t GetStatus() const { return fStatus; }
00878 Int_t GetLogLevel() const { return fLogLevel; }
00879 Int_t GetParallel() const;
00880 Int_t GetSeqNum() const { return fSeqNum; }
00881 Int_t GetSessionID() const { return fSessionID; }
00882 TList *GetListOfSlaveInfos();
00883 Bool_t UseDynamicStartup() const { return fDynamicStartup; }
00884
00885 EQueryMode GetQueryMode(Option_t *mode = 0) const;
00886 void SetQueryMode(EQueryMode mode);
00887
00888 void SetRealTimeLog(Bool_t on = kTRUE);
00889
00890 void GetStatistics(Bool_t verbose = kFALSE);
00891 Long64_t GetBytesRead() const { return fBytesRead; }
00892 Float_t GetRealTime() const { return fRealTime; }
00893 Float_t GetCpuTime() const { return fCpuTime; }
00894
00895 Bool_t IsLite() const { return (fServType == TProofMgr::kProofLite); }
00896 Bool_t IsProofd() const { return (fServType == TProofMgr::kProofd); }
00897 Bool_t IsFolder() const { return kTRUE; }
00898 Bool_t IsMaster() const { return fMasterServ; }
00899 Bool_t IsValid() const { return fValid; }
00900 Bool_t IsParallel() const { return GetParallel() > 0 ? kTRUE : kFALSE; }
00901 Bool_t IsIdle() const { return (fNotIdle <= 0) ? kTRUE : kFALSE; }
00902 Bool_t IsWaiting() const { return fIsWaiting; }
00903
00904 ERunStatus GetRunStatus() const { return fRunStatus; }
00905 TList *GetLoadedMacros() const { return fLoadedMacros; }
00906
00907
00908 void SetParameter(const char *par, const char *value);
00909 void SetParameter(const char *par, Int_t value);
00910 void SetParameter(const char *par, Long_t value);
00911 void SetParameter(const char *par, Long64_t value);
00912 void SetParameter(const char *par, Double_t value);
00913 TObject *GetParameter(const char *par) const;
00914 void DeleteParameters(const char *wildcard);
00915 void ShowParameters(const char *wildcard = "PROOF_*") const;
00916
00917 void AddInput(TObject *obj);
00918 void ClearInput();
00919 TList *GetInputList();
00920 TObject *GetOutput(const char *name);
00921 TList *GetOutputList();
00922
00923 void ShowMissingFiles(TQueryResult *qr = 0);
00924 TFileCollection *GetMissingFiles(TQueryResult *qr = 0);
00925
00926 void AddInputData(TObject *obj, Bool_t push = kFALSE);
00927 void SetInputDataFile(const char *datafile);
00928 void ClearInputData(TObject *obj = 0);
00929 void ClearInputData(const char *name);
00930
00931 void AddFeedback(const char *name);
00932 void RemoveFeedback(const char *name);
00933 void ClearFeedback();
00934 void ShowFeedback() const;
00935 TList *GetFeedbackList() const;
00936
00937 virtual TList *GetListOfQueries(Option_t *opt = "");
00938 Int_t GetNumberOfQueries();
00939 Int_t GetNumberOfDrawQueries() { return fDrawQueries; }
00940 TList *GetQueryResults();
00941 TQueryResult *GetQueryResult(const char *ref = 0);
00942 void GetMaxQueries();
00943 void SetMaxDrawQueries(Int_t max);
00944 void ShowQueries(Option_t *opt = "");
00945
00946 Bool_t IsDataReady(Long64_t &totalbytes, Long64_t &bytesready);
00947
00948 void SetActive(Bool_t = kTRUE) { }
00949
00950 void LogMessage(const char *msg, Bool_t all);
00951 void Progress(Long64_t total, Long64_t processed);
00952 void Progress(Long64_t total, Long64_t processed, Long64_t bytesread,
00953 Float_t initTime, Float_t procTime,
00954 Float_t evtrti, Float_t mbrti);
00955 void Progress(Long64_t total, Long64_t processed, Long64_t bytesread,
00956 Float_t initTime, Float_t procTime,
00957 Float_t evtrti, Float_t mbrti,
00958 Int_t actw, Int_t tses, Float_t eses);
00959 void Feedback(TList *objs);
00960 void QueryResultReady(const char *ref);
00961 void CloseProgressDialog();
00962 void ResetProgressDialog(const char *sel, Int_t sz,
00963 Long64_t fst, Long64_t ent);
00964 void StartupMessage(const char *msg, Bool_t status, Int_t done,
00965 Int_t total);
00966 void DataSetStatus(const char *msg, Bool_t status,
00967 Int_t done, Int_t total);
00968
00969 void SendDataSetStatus(const char *msg, UInt_t n, UInt_t tot, Bool_t st);
00970
00971 void GetLog(Int_t start = -1, Int_t end = -1);
00972 TMacro *GetLastLog();
00973 void PutLog(TQueryResult *qr);
00974 void ShowLog(Int_t qry = -1);
00975 void ShowLog(const char *queryref);
00976 Bool_t SendingLogToWindow() const { return fLogToWindowOnly; }
00977 void SendLogToWindow(Bool_t mode) { fLogToWindowOnly = mode; }
00978
00979 void ResetProgressDialogStatus() { fProgressDialogStarted = kFALSE; }
00980
00981 virtual TTree *GetTreeHeader(TDSet *tdset);
00982 TList *GetOutputNames();
00983
00984 void AddChain(TChain *chain);
00985 void RemoveChain(TChain *chain);
00986
00987 TDrawFeedback *CreateDrawFeedback();
00988 void SetDrawFeedbackOption(TDrawFeedback *f, Option_t *opt);
00989 void DeleteDrawFeedback(TDrawFeedback *f);
00990
00991 void Detach(Option_t *opt = "");
00992
00993 virtual void SetAlias(const char *alias="");
00994
00995 TProofMgr *GetManager() { return fManager; }
00996 void SetManager(TProofMgr *mgr);
00997
00998 void ActivateWorker(const char *ord);
00999 void DeactivateWorker(const char *ord);
01000
01001 const char *GetDataPoolUrl() const { return fDataPoolUrl; }
01002 void SetDataPoolUrl(const char *url) { fDataPoolUrl = url; }
01003
01004 void SetPrintProgress(PrintProgress_t pp) { fPrintProgress = pp; }
01005
01006 void SetProgressDialog(Bool_t on = kTRUE);
01007
01008
01009 static TProof *Open(const char *url = 0, const char *conffile = 0,
01010 const char *confdir = 0, Int_t loglevel = 0);
01011 static void LogViewer(const char *url = 0, Int_t sessionidx = 0);
01012 static TProofMgr *Mgr(const char *url);
01013 static void Reset(const char *url, Bool_t hard = kFALSE);
01014
01015 static void AddEnvVar(const char *name, const char *value);
01016 static void DelEnvVar(const char *name);
01017 static const TList *GetEnvVars();
01018 static void ResetEnvVars();
01019
01020
01021 static Int_t GetParameter(TCollection *c, const char *par, TString &value);
01022 static Int_t GetParameter(TCollection *c, const char *par, Int_t &value);
01023 static Int_t GetParameter(TCollection *c, const char *par, Long_t &value);
01024 static Int_t GetParameter(TCollection *c, const char *par, Long64_t &value);
01025 static Int_t GetParameter(TCollection *c, const char *par, Double_t &value);
01026
01027 ClassDef(TProof,0)
01028 };
01029
01030
01031 R__EXTERN TProof *gProof;
01032
01033 #endif