00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 #ifndef ROOT_TProofServ
00014 #define ROOT_TProofServ
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027 #ifndef ROOT_TApplication
00028 #include "TApplication.h"
00029 #endif
00030 #ifndef ROOT_TString
00031 #include "TString.h"
00032 #endif
00033 #ifndef ROOT_TSysEvtHandler
00034 #include "TSysEvtHandler.h"
00035 #endif
00036 #ifndef ROOT_TStopwatch
00037 #include "TStopwatch.h"
00038 #endif
00039 #ifndef ROOT_TTimer
00040 #include "TTimer.h"
00041 #endif
00042 #ifndef ROOT_TProofQueryResult
00043 #include "TProofQueryResult.h"
00044 #endif
00045
// Forward declarations for types that the declarations below mention as
// pointers or references, so their headers need not be included here.
// TProofLockPath, TShutdownTimer, TReaperTimer and TIdleTOTimer are fully
// defined further down in this header.
// NOTE(review): TFileCollection is declared but not referenced anywhere else
// in this header. TFileHandler is also used as a base class below
// (TProofServLogHandler), which needs its full definition from an include --
// presumably TSysEvtHandler.h; TODO confirm.
00046 class TDSet;
00047 class TProof;
00048 class TVirtualProofPlayer;
00049 class TProofLockPath;
00050 class TQueryResultManager;
00051 class TSocket;
00052 class THashList;
00053 class TList;
00054 class TDSetElement;
00055 class TMessage;
00056 class TShutdownTimer;
00057 class TReaperTimer;
00058 class TIdleTOTimer;
00059 class TMutex;
00060 class TFileCollection;
00061 class TDataSetManager;
00062 class TFileHandler;
00063 class TMonitor;
00064 class TServerSocket;
00065
00066
00067
00068
// Pointer-to-function type taking a socket, a flag, an integer and three
// TString references, and returning Int_t.
// NOTE(review): the name suggests this is the hook used by
// TProofServ::OldAuthSetup(); the parameter meanings are not visible in this
// header -- confirm against the implementation.
00069 typedef Int_t (*OldProofServAuthSetup_t)(TSocket *, Bool_t, Int_t,
00070 TString &, TString &, TString &);
00071
00072
// TProofServ: TApplication subclass that declares the state, message handlers
// and accessors of a PROOF server. Only declarations and trivial inline
// bodies live in this header; everything else is implemented elsewhere.
// NOTE(review): unless an inline body below shows it, the role given for a
// field or method is inferred from its name only -- confirm against the
// implementation before relying on it.
00073 class TProofServ : public TApplication {
00074
00075 friend class TProofServLite; // both friends may touch the private state below
00076 friend class TXProofServ;
00077
00078 public:
00079 enum EStatusBits { kHighMemory = BIT(16) }; // status flag stored in bit 16
00080 enum EQueryAction { kQueryOK, kQueryModify, kQueryStop, kQueryEnqueued }; // result type of GetWorkers()
00081
00082 private:
// --- Identity, configuration and directory/path strings ---
00083 TString fService; // returned by GetService()
00084 TString fUser; // returned by GetUser()
00085 TString fGroup; // returned by GetGroup()
00086 TString fConfDir; // returned by GetConfDir()
00087 TString fConfFile; // returned by GetConfFile()
00088 TString fWorkDir; // returned by GetWorkDir()
00089 TString fImage; // returned by GetImage()
00090 TString fSessionTag; // NOTE(review): no inline accessor returns this field
00091 TString fTopSessionTag; // returned by GetSessionTag()
00092 TString fSessionDir; // returned by GetSessionDir()
00093 TString fPackageDir; // returned by GetPackageDir()
00094 THashList *fGlobalPackageDirList;
00095 TString fCacheDir;
00096 TString fQueryDir;
00097 TString fDataSetDir;
00098 TString fDataDir; // returned by GetDataDir()
00099 TString fAdminPath;
// --- Path locks (TProofLockPath is defined later in this header) ---
00100 TProofLockPath *fPackageLock;
00101 TProofLockPath *fCacheLock; // returned by GetCacheLock()
00102 TProofLockPath *fQueryLock;
00103 TString fArchivePath;
// --- Connection, player and logging handles ---
00104 TSocket *fSocket; // returned by GetSocket()
00105 TProof *fProof; // returned by GetProof()
00106 TVirtualProofPlayer *fPlayer;
00107 FILE *fLogFile;
00108 Int_t fLogFileDes;
00109 TList *fEnabledPackages; // returned by GetEnabledPackages()
// --- Protocol, ordinal, group and bookkeeping values ---
00110 Int_t fProtocol; // returned by GetProtocol()
00111 TString fOrdinal; // returned by GetOrdinal(); compared with "0" in IsTopMaster()
00112 Int_t fGroupId; // returned by GetGroupId()
00113 Int_t fGroupSize; // returned by GetGroupSize()
00114 Int_t fLogLevel; // returned by GetLogLevel()
00115 Int_t fNcmd;
00116 Int_t fGroupPriority;
00117 Bool_t fEndMaster; // returned by IsEndMaster()
00118 Bool_t fMasterServ; // returned by IsMaster()
00119 Bool_t fInterrupt; // set to kTRUE by Interrupt()
00120 Float_t fRealTime; // returned by GetRealTime()
00121 Float_t fCpuTime; // returned by GetCpuTime()
00122 TStopwatch fLatency;
00123 TStopwatch fCompute;
00124 Int_t fQuerySeqNum; // returned by GetQuerySeqNum()
00125
// --- Session counters, exposed through Get{Tot,Act,Eff}Sessions() ---
00126 Int_t fTotSessions;
00127 Int_t fActSessions;
00128 Float_t fEffSessions;
00129
00130 TFileHandler *fInputHandler;
00131
00132 TQueryResultManager *fQMgr;
00133
// --- Query queue state (presumably guarded by fQMtx -- TODO confirm) ---
00134 TList *fWaitingQueries;
00135 Bool_t fIdle;
00136 TMutex *fQMtx;
00137
00138 TList *fQueuedMsg;
00139
00140 TString fPrefix; // returned by GetPrefix()
00141
00142 Bool_t fRealTimeLog;
00143
// --- Timers (classes defined later in this header) ---
00144 TShutdownTimer *fShutdownTimer;
00145 TReaperTimer *fReaperTimer;
00146 TIdleTOTimer *fIdleTOTimer;
00147
00148 Int_t fInflateFactor; // returned by GetInflateFactor()
00149
00150 Int_t fCompressMsg;
00151
00152 TDataSetManager* fDataSetManager;
00153
00154 Bool_t fSendLogToMaster; // set by LogToMaster()
00155
// --- Merging state ---
00156 TServerSocket *fMergingSocket;
00157 TMonitor *fMergingMonitor;
00158 Int_t fMergedWorkers;
00159
00160
// --- Per-instance limits ---
00161 Int_t fMaxQueries;
00162 Long64_t fMaxBoxSize;
00163 Long64_t fHWMBoxSize;
00164
00165
// --- Class-wide (static) memory limits; the static Get*() functions
// --- declared below presumably return these (not inline -- TODO confirm)
00166 static Long_t fgVirtMemMax;
00167 static Long_t fgResMemMax;
00168
00169 static Float_t fgMemHWM;
00170 static Float_t fgMemStop;
00171
00172
00173 Long64_t fMsgSizeHWM; // returned by GetMsgSizeHWM()
00174
// --- Class-wide error-handler state ---
00175 static FILE *fgErrorHandlerFile;
00176 static Int_t fgRecursive;
00177
00178
// --- Class-wide syslog settings ---
00179 static Int_t fgLogToSysLog;
00180 static TString fgSysLogService;
00181 static TString fgSysLogEntity;
00182
// --- Private helpers (implemented elsewhere) ---
00183 void RedirectOutput(const char *dir = 0, const char *mode = "w"); // defaults: no dir, mode "w"
00184 Int_t CatMotd();
00185 Int_t UnloadPackage(const char *package);
00186 Int_t UnloadPackages();
00187 Int_t OldAuthSetup(TString &wconf);
00188 Int_t GetPriority();
00189
00190
// Query bookkeeping helpers.
00191 TProofQueryResult *MakeQueryResult(Long64_t nentries, const char *opt,
00192 TList *inl, Long64_t first, TDSet *dset,
00193 const char *selec, TObject *elist);
00194 void SetQueryRunning(TProofQueryResult *pq);
00195
00196
// Result transfer helpers; outlist and pq default to null.
00197 Int_t SendResults(TSocket *sock, TList *outlist = 0, TQueryResult *pq = 0);
00198 Bool_t AcceptResults(Int_t connections, TVirtualProofPlayer *mergerPlayer);
00199
00200 Int_t RegisterDataSets(TList *in, TList *out);
00201
00202
// Waiting-queue management helpers.
00203 void SetIdle(Bool_t st = kTRUE);
00204 Bool_t IsWaiting();
00205 Int_t WaitingQueries();
00206 Int_t QueueQuery(TProofQueryResult *pq);
00207 TProofQueryResult *NextQuery();
00208 Int_t CleanupWaitingQueries(Bool_t del = kTRUE, TList *qls = 0);
00209
00210 protected:
// Per-request handlers, virtual so subclasses can override them. Each takes
// the incoming TMessage; several also take an optional TString pointer `slb`
// (defaults to null) whose meaning is not visible in this header.
00211 virtual void HandleArchive(TMessage *mess, TString *slb = 0);
00212 virtual Int_t HandleCache(TMessage *mess, TString *slb = 0);
00213 virtual void HandleCheckFile(TMessage *mess, TString *slb = 0);
00214 virtual Int_t HandleDataSets(TMessage *mess, TString *slb = 0);
00215 virtual void HandleSubmerger(TMessage *mess);
00216 virtual void HandleFork(TMessage *mess);
00217 virtual void HandleLibIncPath(TMessage *mess);
00218 virtual void HandleProcess(TMessage *mess, TString *slb = 0);
00219 virtual void HandleQueryList(TMessage *mess);
00220 virtual void HandleRemove(TMessage *mess, TString *slb = 0);
00221 virtual void HandleRetrieve(TMessage *mess, TString *slb = 0);
00222 virtual void HandleWorkerLists(TMessage *mess);
00223
// Lifecycle hooks. SetupCommon() is the only non-virtual one in this group.
00224 virtual void ProcessNext(TString *slb = 0);
00225 virtual Int_t Setup();
00226 Int_t SetupCommon();
00227 virtual void MakePlayer();
00228 virtual void DeletePlayer();
00229
00230 virtual Int_t Fork();
00231 Int_t GetSessionStatus();
00232 Bool_t IsIdle();
00233 Bool_t UnlinkDataDir(const char *path);
00234
00235 static TString fgLastMsg; // presumably written by SetLastMsg() -- TODO confirm
00236
00237 public:
// flog defaults to null.
00238 TProofServ(Int_t *argc, char **argv, FILE *flog = 0);
00239 virtual ~TProofServ();
00240
00241 virtual Int_t CreateServer();
00242
// Trivial inline accessors. The const char* ones return a TString member
// directly and so rely on TString's implicit conversion to const char*.
00243 TProof *GetProof() const { return fProof; }
00244 const char *GetService() const { return fService; }
00245 const char *GetConfDir() const { return fConfDir; }
00246 const char *GetConfFile() const { return fConfFile; }
00247 const char *GetUser() const { return fUser; }
00248 const char *GetGroup() const { return fGroup; }
00249 const char *GetWorkDir() const { return fWorkDir; }
00250 const char *GetImage() const { return fImage; }
// NOTE(review): returns fTopSessionTag, not fSessionTag -- confirm this is
// the intended field.
00251 const char *GetSessionTag() const { return fTopSessionTag; }
00252 const char *GetSessionDir() const { return fSessionDir; }
00253 const char *GetPackageDir() const { return fPackageDir; }
00254 const char *GetDataDir() const { return fDataDir; }
00255 Int_t GetProtocol() const { return fProtocol; }
00256 const char *GetOrdinal() const { return fOrdinal; }
00257 Int_t GetGroupId() const { return fGroupId; }
00258 Int_t GetGroupSize() const { return fGroupSize; }
00259 Int_t GetLogLevel() const { return fLogLevel; }
00260 TSocket *GetSocket() const { return fSocket; }
00261 Float_t GetRealTime() const { return fRealTime; }
00262 Float_t GetCpuTime() const { return fCpuTime; }
00263 Int_t GetQuerySeqNum() const { return fQuerySeqNum; }
00264
00265 Int_t GetTotSessions() const { return fTotSessions; }
00266 Int_t GetActSessions() const { return fActSessions; }
00267 Float_t GetEffSessions() const { return fEffSessions; }
00268
00269 void GetOptions(Int_t *argc, char **argv);
00270 TList *GetEnabledPackages() const { return fEnabledPackages; } // returns the member list itself, not a copy
00271
00272 Int_t GetInflateFactor() const { return fInflateFactor; }
00273
// Static accessors, implemented elsewhere.
00274 static Long_t GetVirtMemMax();
00275 static Long_t GetResMemMax();
00276 static Float_t GetMemHWM();
00277 static Float_t GetMemStop();
00278
00279 Long64_t GetMsgSizeHWM() const { return fMsgSizeHWM; }
00280
00281 const char *GetPrefix() const { return fPrefix; }
00282
00283 void FlushLogFile();
00284
00285 TProofLockPath *GetCacheLock() { return fCacheLock; } // non-const accessor; hands out the lock object itself
00286 Int_t CopyFromCache(const char *name, Bool_t cpbin);
00287 Int_t CopyToCache(const char *name, Int_t opt = 0);
00288
// Virtual entry points; prioritychange is passed by reference.
00289 virtual EQueryAction GetWorkers(TList *workers, Int_t &prioritychange,
00290 Bool_t resume = kFALSE);
00291 virtual void HandleException(Int_t sig);
00292 virtual Int_t HandleSocketInput(TMessage *mess, Bool_t all);
00293 virtual void HandleSocketInput();
00294 virtual void HandleUrgentData();
00295 virtual void HandleSigPipe();
00296 virtual void HandleTermination() { Terminate(0); } // forwards to Terminate() with status 0
00297 void Interrupt() { fInterrupt = kTRUE; } // only raises the flag; nothing else happens here
00298 Bool_t IsEndMaster() const { return fEndMaster; }
00299 Bool_t IsMaster() const { return fMasterServ; }
00300 Bool_t IsParallel() const;
00301 Bool_t IsTopMaster() const { return fOrdinal == "0"; } // true exactly when the ordinal string is "0"
00302
00303 void Run(Bool_t retrn = kFALSE);
00304
00305 void Print(Option_t *option="") const;
00306
00307 void RestartComputeTime();
00308
00309 TObject *Get(const char *namecycle);
00310 TDSetElement *GetNextPacket(Long64_t totalEntries = -1);
00311 virtual void ReleaseWorker(const char *) { } // no-op here; overridable
00312 void Reset(const char *dir);
00313 Int_t ReceiveFile(const char *file, Bool_t bin, Long64_t size);
00314 virtual Int_t SendAsynMessage(const char *msg, Bool_t lf = kTRUE);
00315 virtual void SendLogFile(Int_t status = 0, Int_t start = -1, Int_t end = -1);
00316 void SendStatistics();
00317 void SendParallel(Bool_t async = kFALSE);
00318
00319 Int_t UpdateSessionStatus(Int_t xst = -1);
00320
00321
// Timeout hooks: empty in this class, virtual so subclasses can supply them.
00322 virtual void DisableTimeout() { }
00323 virtual void EnableTimeout() { }
00324
00325 virtual void Terminate(Int_t status);
00326
00327
00328 void LogToMaster(Bool_t on = kTRUE) { fSendLogToMaster = on; } // stores the flag only
00329
// Static utilities, implemented elsewhere.
00330 static FILE *SetErrorHandlerFile(FILE *ferr);
00331 static void ErrorHandler(Int_t level, Bool_t abort, const char *location,
00332 const char *msg);
00333
00334 static void ResolveKeywords(TString &fname, const char *path = 0);
00335
00336 static void SetLastMsg(const char *lastmsg);
00337
00338 static Bool_t IsActive();
00339 static TProofServ *This();
00340
// ROOT dictionary macro, class version 0.
// NOTE(review): in ROOT a version of 0 conventionally marks a class that is
// not meant to be streamed -- confirm against the ClassDef documentation.
00341 ClassDef(TProofServ,0)
00342 };
00343
// Global TProofServ pointer, declared here via R__EXTERN and defined
// elsewhere. NOTE(review): presumably points at the single server instance
// of the process; who sets and clears it is not visible in this header.
00344 R__EXTERN TProofServ *gProofServ;
00345
// TProofLockPath: TNamed whose name and title are both the path handed to
// the constructor. fLockId starts at -1, and any value above -1 counts as
// "locked". The destructor calls Unlock() if the object is still locked.
// Lock() and Unlock() are implemented elsewhere.
00346 class TProofLockPath : public TNamed {
00347 private:
00348 Int_t fLockId; // -1 while unlocked; presumably the lock file descriptor otherwise -- TODO confirm
00349
00350 public:
00351 TProofLockPath(const char *path) : TNamed(path, path), fLockId(-1) { }
00352 ~TProofLockPath() { if (!IsLocked()) return; Unlock(); }
00353
00354 Int_t Lock();
00355 Int_t Unlock();
00356
00357 Bool_t IsLocked() const { return fLockId >= 0; }
00358 };
00359
// TProofLockPathGuard: scope guard around a TProofLockPath. The constructor
// stores the pointer and calls Lock() on it; the destructor calls Unlock().
// A null pointer makes both steps no-ops. The result of Lock() is not
// checked, so Unlock() is attempted regardless of whether locking succeeded.
00360 class TProofLockPathGuard {
00361 private:
00362 TProofLockPath *fLocker; // guarded lock; not owned, may be null
00363
00364 public:
00365 TProofLockPathGuard(TProofLockPath *l) : fLocker(l) { if (fLocker != 0) fLocker->Lock(); }
00366 ~TProofLockPathGuard() { if (fLocker != 0) fLocker->Unlock(); }
00367 };
00368
00369
00370
00371
// TProofServLogHandler: TFileHandler that holds a FILE stream, a socket and
// a prefix string. It can be built either from a command string or from an
// already-open FILE. Constructors, destructor, Notify() and the static
// helpers are implemented elsewhere.
// NOTE(review): presumably forwards what it reads from the stream to the
// socket, tagging lines with the prefix -- confirm in the implementation.
00372 class TProofServLogHandler : public TFileHandler {
00373 private:
00374 TSocket *fSocket; // destination socket; must be non-null for IsValid()
00375 FILE *fFile; // source stream; must be non-null for IsValid()
00376 TString fPfx; // per-instance prefix
00377
00378 static TString fgPfx; // presumably set by SetDefaultPrefix() -- TODO confirm
00379 static Int_t fgCmdRtn; // presumably returned by GetCmdRtn() -- TODO confirm
00380
00381 public:
00382 enum EStatusBits { kFileIsPipe = BIT(23) }; // status flag stored in bit 23
00383 TProofServLogHandler(const char *cmd, TSocket *s, const char *pfx = "");
00384 TProofServLogHandler(FILE *f, TSocket *s, const char *pfx = "");
00385 virtual ~TProofServLogHandler();
00386
00387 Bool_t IsValid() { return (fFile != 0 && fSocket != 0); }
00388
00389 Bool_t Notify();
00390 Bool_t ReadNotify() { return Notify(); } // read-readiness is handled by Notify()
00391
00392 static void SetDefaultPrefix(const char *pfx);
00393 static Int_t GetCmdRtn();
00394 };
00395
00396
00397
// TProofServLogHandlerGuard: holds a TProofServLogHandler pointer. Its two
// constructors mirror those of TProofServLogHandler (command string or FILE)
// and add an `on` flag that defaults to kTRUE. All members are implemented
// elsewhere.
// NOTE(review): presumably creates the handler on construction when `on` is
// true and tears it down in the destructor -- confirm in the implementation.
00398 class TProofServLogHandlerGuard {
00399
00400 private:
00401 TProofServLogHandler *fExecHandler; // handler managed by this guard
00402
00403 public:
00404 TProofServLogHandlerGuard(const char *cmd, TSocket *s,
00405 const char *pfx = "", Bool_t on = kTRUE);
00406 TProofServLogHandlerGuard(FILE *f, TSocket *s,
00407 const char *pfx = "", Bool_t on = kTRUE);
00408 virtual ~TProofServLogHandlerGuard();
00409 };
00410
00411
00412
// TShutdownTimer: TTimer that keeps a pointer to a TProofServ. The
// constructor forwards `delay` and kFALSE to the TTimer base constructor and
// stores `p`. Notify() is implemented elsewhere.
// NOTE(review): the second TTimer argument is kFALSE here, whereas
// TReaperTimer and TIdleTOTimer pass kTRUE; see the TTimer documentation for
// its meaning.
00413 class TShutdownTimer : public TTimer {
00414 private:
00415 TProofServ *fProofServ; // server this timer refers to; not owned
00416
00417 public:
00418 TShutdownTimer(TProofServ *p, Int_t delay) : TTimer(delay, kFALSE), fProofServ(p) { }
00419
00420 Bool_t Notify();
00421 };
00422
00423
00424
// TReaperTimer: TTimer holding a list pointer that starts out null. The
// constructor forwards `frequency` (default 1000) and kTRUE to the TTimer
// base constructor. The destructor, AddPid() and Notify() are implemented
// elsewhere.
// NOTE(review): presumably tracks child process ids added via AddPid() and
// checks on them at each timeout -- confirm in the implementation.
00425 class TReaperTimer : public TTimer {
00426 private:
00427 TList *fChildren; // null until populated elsewhere
00428
00429 public:
00430 TReaperTimer(Long_t frequency = 1000) : TTimer(frequency, kTRUE), fChildren(0) { }
00431 virtual ~TReaperTimer();
00432
00433 void AddPid(Int_t pid);
00434 Bool_t Notify();
00435 };
00436
00437
00438
// TIdleTOTimer: TTimer that keeps a pointer to a TProofServ. The constructor
// forwards `delay` and kTRUE to the TTimer base constructor and stores `p`.
// Notify() is implemented elsewhere.
// NOTE(review): the name suggests an idle-timeout timer for the server --
// confirm in the implementation.
00439 class TIdleTOTimer : public TTimer {
00440 private:
00441 TProofServ *fProofServ; // server this timer refers to; not owned
00442
00443 public:
00444 TIdleTOTimer(TProofServ *p, Int_t delay) : TTimer(delay, kTRUE), fProofServ(p) { }
00445
00446 Bool_t Notify();
00447 };
00448
// TIdleTOTimerGuard: scope guard around a TIdleTOTimer. The constructor
// stores the pointer and calls Stop() on the timer; the destructor calls
// Start(-1, kTRUE) on it. A null pointer makes both steps no-ops.
// NOTE(review): Start(-1, kTRUE) presumably restarts with the timer's
// existing timeout in single-shot mode -- see the TTimer documentation.
00449 class TIdleTOTimerGuard {
00450
00451 private:
00452 TIdleTOTimer *fIdleTOTimer; // guarded timer; not owned, may be null
00453
00454 public:
00455 TIdleTOTimerGuard(TIdleTOTimer *t) : fIdleTOTimer(t) { if (t != 0) t->Stop(); }
00456 virtual ~TIdleTOTimerGuard() { if (fIdleTOTimer != 0) fIdleTOTimer->Start(-1, kTRUE); }
00457 };
00458
00459
00460 #endif