TProofServ.h

Go to the documentation of this file.
00001 // @(#)root/proof:$Id: TProofServ.h 36592 2010-11-11 10:43:17Z ganis $
00002 // Author: Fons Rademakers   16/02/97
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 
00012 
00013 #ifndef ROOT_TProofServ
00014 #define ROOT_TProofServ
00015 
00016 //////////////////////////////////////////////////////////////////////////
00017 //                                                                      //
00018 // TProofServ                                                           //
00019 //                                                                      //
00020 // TProofServ is the PROOF server. It can act either as the master      //
00021 // server or as a slave server, depending on its startup arguments. It  //
00022 // receives and handles messages coming from the client or from the     //
00023 // master server.                                                       //
00024 //                                                                      //
00025 //////////////////////////////////////////////////////////////////////////
00026 
00027 #ifndef ROOT_TApplication
00028 #include "TApplication.h"
00029 #endif
00030 #ifndef ROOT_TString
00031 #include "TString.h"
00032 #endif
00033 #ifndef ROOT_TSysEvtHandler
00034 #include "TSysEvtHandler.h"
00035 #endif
00036 #ifndef ROOT_TStopwatch
00037 #include "TStopwatch.h"
00038 #endif
00039 #ifndef ROOT_TTimer
00040 #include "TTimer.h"
00041 #endif
00042 #ifndef ROOT_TProofQueryResult
00043 #include "TProofQueryResult.h"
00044 #endif
00045 // Forward declarations; some of these classes are defined further down in this header.
00046 class TDSet;
00047 class TProof;
00048 class TVirtualProofPlayer;
00049 class TProofLockPath;
00050 class TQueryResultManager;
00051 class TSocket;
00052 class THashList;
00053 class TList;
00054 class TDSetElement;
00055 class TMessage;
00056 class TShutdownTimer;
00057 class TReaperTimer;
00058 class TIdleTOTimer;
00059 class TMutex;
00060 class TFileCollection;
00061 class TDataSetManager;
00062 class TFileHandler;
00063 class TMonitor;
00064 class TServerSocket;
00065 
00066 // Pointer type for a hook to an external function that sets up
00067 // authentication-related state for old versions; kept for backward compatibility.
00068 // The parameters are unnamed here: their meaning must be taken from the implementation.
00069 typedef Int_t (*OldProofServAuthSetup_t)(TSocket *, Bool_t, Int_t,
00070                                          TString &, TString &, TString &);
00071 
00072 
00073 class TProofServ : public TApplication {
00074 // PROOF server application; acts as master or slave server depending on its startup arguments.
00075 friend class TProofServLite;
00076 friend class TXProofServ;
00077 
00078 public:
00079    enum EStatusBits { kHighMemory = BIT(16) };
00080    enum EQueryAction { kQueryOK, kQueryModify, kQueryStop, kQueryEnqueued };
00081 
00082 private:
00083    TString       fService;          //service we are running, either "proofserv" or "proofslave"
00084    TString       fUser;             //user as which we run
00085    TString       fGroup;            //group the user belongs to
00086    TString       fConfDir;          //directory containing cluster config information
00087    TString       fConfFile;         //file containing config information
00088    TString       fWorkDir;          //directory containing all proof related info
00089    TString       fImage;            //image name of the session
00090    TString       fSessionTag;       //tag for the server session
00091    TString       fTopSessionTag;    //tag for the global session
00092    TString       fSessionDir;       //directory containing session dependent files
00093    TString       fPackageDir;       //directory containing packages and user libs
00094    THashList    *fGlobalPackageDirList;  //list of directories containing global packages libs
00095    TString       fCacheDir;         //directory containing cache of user files
00096    TString       fQueryDir;         //directory containing query results and status
00097    TString       fDataSetDir;       //directory containing info about known data sets
00098    TString       fDataDir;          //directory containing data files produced during queries
00099    TString       fAdminPath;        //admin path for this session
00100    TProofLockPath *fPackageLock;    //package dir locker
00101    TProofLockPath *fCacheLock;      //cache dir locker
00102    TProofLockPath *fQueryLock;      //query dir locker
00103    TString       fArchivePath;      //default archive path
00104    TSocket      *fSocket;           //socket connection to client
00105    TProof       *fProof;            //PROOF talking to slave servers
00106    TVirtualProofPlayer *fPlayer;    //actual player
00107    FILE         *fLogFile;          //log file
00108    Int_t         fLogFileDes;       //log file descriptor
00109    TList        *fEnabledPackages;  //list of enabled packages
00110    Int_t         fProtocol;         //protocol version number
00111    TString       fOrdinal;          //slave ordinal number
00112    Int_t         fGroupId;          //slave unique id in the active slave group
00113    Int_t         fGroupSize;        //size of the active slave group
00114    Int_t         fLogLevel;         //debug logging level
00115    Int_t         fNcmd;             //command history number
00116    Int_t         fGroupPriority;    //priority of group the user belongs to (0 - 100)
00117    Bool_t        fEndMaster;        //true for a master in direct contact only with workers
00118    Bool_t        fMasterServ;       //true if we are a master server
00119    Bool_t        fInterrupt;        //if true macro execution will be stopped
00120    Float_t       fRealTime;         //real time spent executing commands
00121    Float_t       fCpuTime;          //CPU time spent executing commands
00122    TStopwatch    fLatency;          //measures latency of packet requests
00123    TStopwatch    fCompute;          //measures time spent processing a packet
00124    Int_t         fQuerySeqNum;      //sequential number of the current or last query
00125 
00126    Int_t         fTotSessions;      //Total number of PROOF sessions on the cluster
00127    Int_t         fActSessions;      //Total number of active PROOF sessions on the cluster
00128    Float_t       fEffSessions;      //Effective Number of PROOF sessions on the assigned machines
00129 
00130    TFileHandler *fInputHandler;     //Input socket handler
00131 
00132    TQueryResultManager *fQMgr;      //Query-result manager
00133 
00134    TList        *fWaitingQueries;   //list of TProofQueryResult waiting to be processed
00135    Bool_t        fIdle;             //TRUE if idle
00136    TMutex       *fQMtx;             // To protect async msg queue
00137 
00138    TList        *fQueuedMsg;        //list of messages waiting to be processed
00139 
00140    TString       fPrefix;           //Prefix identifying the node
00141 
00142    Bool_t        fRealTimeLog;      //TRUE if log messages should be sent back in real-time
00143 
00144    TShutdownTimer *fShutdownTimer;  // Timer used to shutdown out-of-control sessions
00145    TReaperTimer   *fReaperTimer;    // Timer used to control children state
00146    TIdleTOTimer   *fIdleTOTimer;    // Timer used to terminate idle sessions
00147 
00148    Int_t         fInflateFactor;    // Factor in 1/1000 to inflate the CPU time
00149 
00150    Int_t         fCompressMsg;     // Compression level for messages
00151 
00152    TDataSetManager* fDataSetManager; // dataset manager
00153 
00154    Bool_t        fSendLogToMaster; // On workers, controls logs sending to master
00155 
00156    TServerSocket *fMergingSocket;  // Socket used for merging outputs if submerger
00157    TMonitor      *fMergingMonitor; // Monitor for merging sockets
00158    Int_t          fMergedWorkers;  // Number of workers merged
00159 
00160    // Quotas (-1 to disable)
00161    Int_t         fMaxQueries;       //Max number of queries fully kept
00162    Long64_t      fMaxBoxSize;       //Max size of the sandbox
00163    Long64_t      fHWMBoxSize;       //High-Water-Mark on the sandbox size
00164 
00165    // Memory limits (-1 to disable) set by envs ROOTPROFOASHARD, PROOF_VIRTMEMMAX, PROOF_RESMEMMAX (NOTE(review): 'ROOTPROFOASHARD' looks misspelled -- check the name read by the implementation)
00166    static Long_t fgVirtMemMax;       //Hard limit enforced by the system (in kB)
00167    static Long_t fgResMemMax;        //Hard limit on the resident memory checked
00168                                      //in TProofPlayer::Process (in kB)
00169    static Float_t fgMemHWM;          // Threshold fraction of max for warning and finer monitoring
00170    static Float_t fgMemStop;         // Fraction of max for stop processing
00171 
00172    // In bytes; default is 1MB
00173    Long64_t      fMsgSizeHWM;       //High-Water-Mark on the size of messages with results
00174 
00175    static FILE  *fgErrorHandlerFile; // File where to log
00176    static Int_t  fgRecursive;       // Keep track of recursive inputs during processing
00177 
00178    // Control sending information to syslog
00179    static Int_t  fgLogToSysLog;      // >0 sent to syslog too
00180    static TString fgSysLogService;   // name of the syslog service (eg: proofm-0, proofw-0.67)
00181    static TString fgSysLogEntity;   // logging entity (<user>:<group>)
00182 
00183    void          RedirectOutput(const char *dir = 0, const char *mode = "w");
00184    Int_t         CatMotd();
00185    Int_t         UnloadPackage(const char *package);
00186    Int_t         UnloadPackages();
00187    Int_t         OldAuthSetup(TString &wconf);
00188    Int_t         GetPriority();
00189 
00190    // Query handlers
00191    TProofQueryResult *MakeQueryResult(Long64_t nentries, const char *opt,
00192                                       TList *inl, Long64_t first, TDSet *dset,
00193                                       const char *selec, TObject *elist);
00194    void          SetQueryRunning(TProofQueryResult *pq);
00195 
00196    // Results handling
00197    Int_t         SendResults(TSocket *sock, TList *outlist = 0, TQueryResult *pq = 0);
00198    Bool_t        AcceptResults(Int_t connections, TVirtualProofPlayer *mergerPlayer);
00199    
00200    Int_t         RegisterDataSets(TList *in, TList *out);
00201 
00202    // Waiting queries handlers
00203    void          SetIdle(Bool_t st = kTRUE);
00204    Bool_t        IsWaiting();
00205    Int_t         WaitingQueries();
00206    Int_t         QueueQuery(TProofQueryResult *pq);
00207    TProofQueryResult *NextQuery();
00208    Int_t         CleanupWaitingQueries(Bool_t del = kTRUE, TList *qls = 0);
00209 
00210 protected:
00211    virtual void  HandleArchive(TMessage *mess, TString *slb = 0);
00212    virtual Int_t HandleCache(TMessage *mess, TString *slb = 0);
00213    virtual void  HandleCheckFile(TMessage *mess, TString *slb = 0);
00214    virtual Int_t HandleDataSets(TMessage *mess, TString *slb = 0);
00215    virtual void  HandleSubmerger(TMessage *mess);
00216    virtual void  HandleFork(TMessage *mess);
00217    virtual void  HandleLibIncPath(TMessage *mess);
00218    virtual void  HandleProcess(TMessage *mess, TString *slb = 0);
00219    virtual void  HandleQueryList(TMessage *mess);
00220    virtual void  HandleRemove(TMessage *mess, TString *slb = 0);
00221    virtual void  HandleRetrieve(TMessage *mess, TString *slb = 0);
00222    virtual void  HandleWorkerLists(TMessage *mess);
00223 
00224    virtual void  ProcessNext(TString *slb = 0);
00225    virtual Int_t Setup();
00226    Int_t         SetupCommon();
00227    virtual void  MakePlayer();
00228    virtual void  DeletePlayer();
00229 
00230    virtual Int_t Fork();
00231    Int_t         GetSessionStatus();
00232    Bool_t        IsIdle();
00233    Bool_t        UnlinkDataDir(const char *path);
00234 
00235    static TString fgLastMsg;    // Message about status before exception
00236 
00237 public:
00238    TProofServ(Int_t *argc, char **argv, FILE *flog = 0);
00239    virtual ~TProofServ();
00240 
00241    virtual Int_t  CreateServer();
00242 
00243    TProof        *GetProof()      const { return fProof; }
00244    const char    *GetService()    const { return fService; }
00245    const char    *GetConfDir()    const { return fConfDir; }
00246    const char    *GetConfFile()   const { return fConfFile; }
00247    const char    *GetUser()       const { return fUser; }
00248    const char    *GetGroup()      const { return fGroup; }
00249    const char    *GetWorkDir()    const { return fWorkDir; }
00250    const char    *GetImage()      const { return fImage; }
00251    const char    *GetSessionTag() const { return fTopSessionTag; } // NOTE(review): returns fTopSessionTag (global session tag), not fSessionTag -- confirm intended
00252    const char    *GetSessionDir() const { return fSessionDir; }
00253    const char    *GetPackageDir() const { return fPackageDir; }
00254    const char    *GetDataDir()    const { return fDataDir; }
00255    Int_t          GetProtocol()   const { return fProtocol; }
00256    const char    *GetOrdinal()    const { return fOrdinal; }
00257    Int_t          GetGroupId()    const { return fGroupId; }
00258    Int_t          GetGroupSize()  const { return fGroupSize; }
00259    Int_t          GetLogLevel()   const { return fLogLevel; }
00260    TSocket       *GetSocket()     const { return fSocket; }
00261    Float_t        GetRealTime()   const { return fRealTime; }
00262    Float_t        GetCpuTime()    const { return fCpuTime; }
00263    Int_t          GetQuerySeqNum() const { return fQuerySeqNum; }
00264 
00265    Int_t          GetTotSessions() const { return fTotSessions; }
00266    Int_t          GetActSessions() const { return fActSessions; }
00267    Float_t        GetEffSessions() const { return fEffSessions; }
00268 
00269    void           GetOptions(Int_t *argc, char **argv);
00270    TList         *GetEnabledPackages() const { return fEnabledPackages; }
00271 
00272    Int_t          GetInflateFactor() const { return fInflateFactor; }
00273 
00274    static Long_t  GetVirtMemMax();
00275    static Long_t  GetResMemMax();
00276    static Float_t GetMemHWM();
00277    static Float_t GetMemStop();
00278 
00279    Long64_t       GetMsgSizeHWM() const { return fMsgSizeHWM; }
00280 
00281    const char    *GetPrefix()     const { return fPrefix; }
00282 
00283    void           FlushLogFile();
00284 
00285    TProofLockPath *GetCacheLock() { return fCacheLock; }      //cache dir locker; used by TProofPlayer
00286    Int_t          CopyFromCache(const char *name, Bool_t cpbin);
00287    Int_t          CopyToCache(const char *name, Int_t opt = 0);
00288 
00289    virtual EQueryAction GetWorkers(TList *workers, Int_t &prioritychange,
00290                                    Bool_t resume = kFALSE);
00291    virtual void   HandleException(Int_t sig);
00292    virtual Int_t  HandleSocketInput(TMessage *mess, Bool_t all);
00293    virtual void   HandleSocketInput();
00294    virtual void   HandleUrgentData();
00295    virtual void   HandleSigPipe();
00296    virtual void   HandleTermination() { Terminate(0); }   // terminates with status 0
00297    void           Interrupt() { fInterrupt = kTRUE; }     // sets the flag that stops macro execution
00298    Bool_t         IsEndMaster() const { return fEndMaster; }
00299    Bool_t         IsMaster() const { return fMasterServ; }
00300    Bool_t         IsParallel() const;
00301    Bool_t         IsTopMaster() const { return fOrdinal == "0"; }   // top master <=> ordinal is "0"
00302 
00303    void           Run(Bool_t retrn = kFALSE);
00304 
00305    void           Print(Option_t *option="") const;
00306 
00307    void           RestartComputeTime();
00308 
00309    TObject       *Get(const char *namecycle);
00310    TDSetElement  *GetNextPacket(Long64_t totalEntries = -1);
00311    virtual void   ReleaseWorker(const char *) { }   // no-op in this class
00312    void           Reset(const char *dir);
00313    Int_t          ReceiveFile(const char *file, Bool_t bin, Long64_t size);
00314    virtual Int_t  SendAsynMessage(const char *msg, Bool_t lf = kTRUE);
00315    virtual void   SendLogFile(Int_t status = 0, Int_t start = -1, Int_t end = -1);
00316    void           SendStatistics();
00317    void           SendParallel(Bool_t async = kFALSE);
00318 
00319    Int_t          UpdateSessionStatus(Int_t xst = -1);
00320 
00321    // Disable / Enable read timeout (both are no-ops in this class)
00322    virtual void   DisableTimeout() { }
00323    virtual void   EnableTimeout() { }
00324 
00325    virtual void   Terminate(Int_t status);
00326 
00327    // Log control
00328    void           LogToMaster(Bool_t on = kTRUE) { fSendLogToMaster = on; }
00329 
00330    static FILE   *SetErrorHandlerFile(FILE *ferr);
00331    static void    ErrorHandler(Int_t level, Bool_t abort, const char *location,
00332                                const char *msg);
00333 
00334    static void    ResolveKeywords(TString &fname, const char *path = 0);
00335 
00336    static void    SetLastMsg(const char *lastmsg);
00337 
00338    static Bool_t      IsActive();
00339    static TProofServ *This();
00340 
00341    ClassDef(TProofServ,0)  //PROOF Server Application Interface
00342 };
00343 
00344 R__EXTERN TProofServ *gProofServ;   // global TProofServ pointer; declaration only (R__EXTERN presumably expands to extern -- TODO confirm)
00345 
00346 class TProofLockPath : public TNamed {   // named lock object built from a path
00347 private:
00348    Int_t fLockId;   // file id of the directory lock; negative while no lock is held
00349 
00350 public:
00351    TProofLockPath(const char *path) : TNamed(path, path), fLockId(-1) { }   // path is both name and title; starts unlocked
00352    ~TProofLockPath() { if (fLockId >= 0) Unlock(); }                        // drop a lock still held at destruction
00353 
00354    Int_t  Lock();
00355    Int_t  Unlock();
00356 
00357    Bool_t IsLocked() const { return fLockId >= 0; }   // locked iff a non-negative lock id is stored
00358 };
00359 
00360 class TProofLockPathGuard {   // scope guard: Lock() in the constructor, Unlock() in the destructor
00361 private:
00362    TProofLockPath *fLocker;   // locker driven by this guard; may be null
00363    // NOTE(review): implicitly copyable, and every copy would call Unlock() on the same locker
00364 public:
00365    TProofLockPathGuard(TProofLockPath *l) : fLocker(l) { if (l != 0) l->Lock(); }   // null locker => nothing to do
00366    ~TProofLockPathGuard() { if (fLocker != 0) fLocker->Unlock(); }                  // Unlock() is called whatever Lock() returned
00367 };
00368 
00369 //----- File handler for the output of commands executed externally via a pipe.
00370 //----- That output is redirected one level up (i.e., to the master or the client).
00371 //______________________________________________________________________________
00372 class TProofServLogHandler : public TFileHandler {
00373 private:
00374    TSocket     *fSocket; // Destination socket for the redirected messages
00375    FILE        *fFile;   // Stream attached to the open pipe
00376    TString      fPfx;    // Prefix put in front of each message
00377 
00378    static TString fgPfx; // Default value for the message prefix
00379    static Int_t   fgCmdRtn; // Exit code of the executed command (only meaningful
00380                             // once the pipe has been closed)
00381 public:
00382    enum EStatusBits { kFileIsPipe = BIT(23) };
00383    TProofServLogHandler(const char *cmd, TSocket *s, const char *pfx = "");
00384    TProofServLogHandler(FILE *f, TSocket *s, const char *pfx = "");
00385    virtual ~TProofServLogHandler();
00386 
00387    Bool_t IsValid() { if (!fFile || !fSocket) return kFALSE; return kTRUE; }   // valid only with both a file and a socket
00388 
00389    Bool_t Notify();
00390    Bool_t ReadNotify() { return Notify(); }   // read events are forwarded to Notify()
00391 
00392    static void SetDefaultPrefix(const char *pfx);
00393    static Int_t GetCmdRtn();
00394 };
00395 
00396 //--- Guard class: close pipe, deactivate the related descriptor -------------//
00397 //______________________________________________________________________________
00398 class TProofServLogHandlerGuard {
00399 
00400 private:
00401    TProofServLogHandler   *fExecHandler;   // log handler associated with this guard
00402 
00403 public:
00404    TProofServLogHandlerGuard(const char *cmd, TSocket *s,                    // command-based variant
00405                              const char *pfx = "", Bool_t on = kTRUE);       // meaning of 'on' is not visible in this header
00406    TProofServLogHandlerGuard(FILE *f, TSocket *s,                            // variant taking an already open FILE
00407                              const char *pfx = "", Bool_t on = kTRUE);
00408    virtual ~TProofServLogHandlerGuard();
00409 };
00410 
00411 //--- Special timer to control delayed shutdowns
00412 //______________________________________________________________________________
00413 class TShutdownTimer : public TTimer {
00414 private:
00415    TProofServ    *fProofServ;   // server instance this timer was created for
00416 
00417 public:
00418    TShutdownTimer(TProofServ *p, Int_t delay) : TTimer(delay, kFALSE), fProofServ(p) { }   // forwards (delay, kFALSE) to TTimer; kFALSE presumably selects asynchronous mode -- TODO confirm
00419 
00420    Bool_t Notify();   // defined out of line
00421 };
00422 
00423 //--- Synchronous timer used to reap state changes of child processes
00424 //______________________________________________________________________________
00425 class TReaperTimer : public TTimer {
00426 private:
00427    TList  *fChildren;   // List of children (forked) processes
00428 
00429 public:
00430    TReaperTimer(Long_t frequency = 1000) : TTimer(frequency, kTRUE), fChildren(0) { }   // forwards (frequency, kTRUE) to TTimer; starts with no children list
00431    virtual ~TReaperTimer();
00432 
00433    void AddPid(Int_t pid);   // presumably records 'pid' in fChildren -- defined out of line, TODO confirm
00434    Bool_t Notify();          // defined out of line
00435 };
00436 
00437 //--- Special timer to terminate idle sessions
00438 //______________________________________________________________________________
00439 class TIdleTOTimer : public TTimer {
00440 private:
00441    TProofServ    *fProofServ;   // server instance this timer was created for
00442 
00443 public:
00444    TIdleTOTimer(TProofServ *p, Int_t delay) : TTimer(delay, kTRUE), fProofServ(p) { }   // forwards (delay, kTRUE) to TTimer
00445 
00446    Bool_t Notify();   // defined out of line
00447 };
00448 //______________________________________________________________________________
00449 class TIdleTOTimerGuard {
00450 // Scope guard: calls Stop() on the given timer when constructed and Start(-1, kTRUE) when destroyed
00451 private:
00452    TIdleTOTimer *fIdleTOTimer;   // timer handled by this guard; may be null
00453 
00454 public:
00455    TIdleTOTimerGuard(TIdleTOTimer *t) : fIdleTOTimer(t) { if (t != 0) t->Stop(); }          // null timer => nothing to do
00456    virtual ~TIdleTOTimerGuard() { if (fIdleTOTimer != 0) fIdleTOTimer->Start(-1, kTRUE); }  // counterpart of the Stop() above
00457 };
00458 
00459 
00460 #endif

Generated on Tue Jul 5 14:28:04 2011 for ROOT_528-00b_version by  doxygen 1.5.1