XrdProofdProofServMgr.cxx

Go to the documentation of this file.
00001 // @(#)root/proofd:$Id: XrdProofdProofServMgr.cxx 36520 2010-11-05 16:08:23Z ganis $
00002 // Author: G. Ganis Jan 2008
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 
00012 //////////////////////////////////////////////////////////////////////////
00013 //                                                                      //
00014 // XrdProofdProofServMgr                                                  //
00015 //                                                                      //
00016 // Author: G. Ganis, CERN, 2008                                         //
00017 //                                                                      //
00018 // Class managing proofserv sessions manager.                           //
00019 //                                                                      //
00020 //////////////////////////////////////////////////////////////////////////
00021 #include "XrdProofdPlatform.h"
00022 
00023 #ifdef OLDXRDOUC
00024 #  include "XrdOuc/XrdOucError.hh"
00025 #  include "XrdOuc/XrdOucLogger.hh"
00026 #else
00027 #  include "XrdSys/XrdSysError.hh"
00028 #  include "XrdSys/XrdSysLogger.hh"
00029 #endif
00030 
00031 #include "Xrd/XrdBuffer.hh"
00032 #include "Xrd/XrdPoll.hh"
00033 #include "Xrd/XrdScheduler.hh"
00034 #include "XrdNet/XrdNet.hh"
00035 #include "XrdNet/XrdNetDNS.hh"
00036 #include "XrdNet/XrdNetPeer.hh"
00037 #include "XrdOuc/XrdOucRash.hh"
00038 #include "XrdOuc/XrdOucStream.hh"
00039 #include "XrdSys/XrdSysPriv.hh"
00040 #include "XrdSut/XrdSutAux.hh"
00041 
00042 #include "XrdProofdClient.h"
00043 #include "XrdProofdClientMgr.h"
00044 #include "XrdProofdManager.h"
00045 #include "XrdProofdNetMgr.h"
00046 #include "XrdProofdPriorityMgr.h"
00047 #include "XrdProofdProofServMgr.h"
00048 #include "XrdProofdProtocol.h"
00049 #include "XrdProofGroup.h"
00050 #include "XrdProofSched.h"
00051 #include "XrdROOT.h"
00052 
00053 #include <map>
00054 
00055 // Aux structures for scan through operations
00056 typedef struct {
00057    XrdProofGroupMgr *fGroupMgr;
00058    int *fNBroadcast;
00059 } XpdBroadcastPriority_t;
00060 typedef struct {
00061    XrdProofdManager *fMgr;
00062    XrdProofdClient *fClient;
00063    FILE *fEnv;
00064 } XpdWriteEnv_t;
00065 
00066 // Tracing utilities
00067 #include "XrdProofdTrace.h"
00068 
00069 static XpdManagerCron_t fManagerCron;
00070 
00071 //--------------------------------------------------------------------------
00072 //
00073 // XrdProofdProofServCron
00074 //
00075 // Function run in separate thread watching changes in session status
00076 // frequency
00077 //
00078 //--------------------------------------------------------------------------
00079 void *XrdProofdProofServCron(void *p)
00080 {
00081    // This is an endless loop to check the system periodically or when
00082    // triggered via a message in a dedicated pipe
00083    XPDLOC(SMGR, "ProofServCron")
00084 
00085    XpdManagerCron_t *mc = (XpdManagerCron_t *)p;
00086    XrdProofdProofServMgr *mgr = mc->fSessionMgr;
00087    XrdProofSched *sched = mc->fProofSched;
00088    if (!(mgr)) {
00089       TRACE(XERR,  "undefined session manager: cannot start");
00090       return (void *)0;
00091    }
00092 
00093    // Quicj checks for client disconnections: frequency (5 secs) and
00094    // flag for disconnections effectively occuring
00095    int quickcheckfreq = 5;
00096    int clnlostscale = 0;
00097 
00098    // Time of last full sessions check
00099    int lastrun = time(0);
00100    int lastcheck = lastrun, ckfreq = mgr->CheckFrequency(), waitt = 0;
00101    int deltat = ((int)(0.1*ckfreq) >= 1) ? (int)(0.1*ckfreq) : 1;
00102    int maxdelay = 5*ckfreq; // Force check after 5 times the check frequency
00103    mgr->SetNextSessionsCheck(lastcheck + ckfreq);
00104    TRACE(ALL, "next full sessions check in "<<ckfreq<<" secs");
00105    while(1) {
00106       // We check for client disconnections every 'quickcheckfreq' secs; we do
00107       // a full check every mgr->CheckFrequency() secs; we make sure that we
00108       // do not pass a negative value (meaning no timeout)
00109       waitt =  ckfreq - (time(0) - lastcheck);
00110       if (waitt > quickcheckfreq || waitt <= 0)
00111          waitt = quickcheckfreq;
00112       int pollRet = mgr->Pipe()->Poll(waitt);
00113 
00114       if (pollRet > 0) {
00115          // Read message
00116          XpdMsg msg;
00117          int rc = 0;
00118          if ((rc = mgr->Pipe()->Recv(msg)) != 0) {
00119             TRACE(XERR, "problems receiving message; errno: "<<-rc);
00120             continue;
00121          }
00122          // Parse type
00123          if (msg.Type() == XrdProofdProofServMgr::kSessionRemoval) {
00124             // A session has just gone: read process id
00125             XrdOucString fpid;
00126             if ((rc = msg.Get(fpid)) != 0) {
00127                TRACE(XERR, "kSessionRemoval: problems receiving process ID (buf: '"<<
00128                            msg.Buf()<<"'); errno: "<<-rc);
00129                continue;
00130             }
00131             XrdSysMutexHelper mhp(mgr->Mutex());
00132             // Remove it from the hash list
00133             mgr->DeleteFromSessions(fpid.c_str());
00134             // Move the entry to the terminated sessions area
00135             mgr->MvSession(fpid.c_str());
00136             // Notify the scheduler too
00137             if (sched) {
00138                if (sched->Pipe()->Post(XrdProofSched::kReschedule, 0) != 0) {
00139                   TRACE(XERR, "kSessionRemoval: problem posting the scheduler pipe");
00140                }
00141             }
00142             // Notify action
00143             TRACE(REQ, "kSessionRemoval: session: "<<fpid<<
00144                         " has been removed from the active list");
00145          } else if (msg.Type() == XrdProofdProofServMgr::kClientDisconnect) {
00146             // A client just disconnected: we free the slots in the proofserv sesssions and
00147             // we check the sessions status to see if any of them must be terminated
00148             // read process id
00149             int pid = 0;
00150             if ((rc = msg.Get(pid)) != 0) {
00151                TRACE(XERR, "kClientDisconnect: problems receiving process ID (buf: '"<<
00152                            msg.Buf()<<"'); errno: "<<-rc);
00153                continue;
00154             }
00155             TRACE(REQ, "kClientDisconnect: a client just disconnected: "<<pid);
00156             // Free slots in the proof serv instances
00157             mgr->DisconnectFromProofServ(pid);
00158             TRACE(DBG, "quick check of active sessions");
00159             // Quick check of active sessions in case of disconnections
00160             mgr->CheckActiveSessions(0);
00161         } else if (msg.Type() == XrdProofdProofServMgr::kCleanSessions) {
00162             // Request for cleanup all sessions of a client (or all clients)
00163             XpdSrvMgrCreateCnt cnt(mgr, XrdProofdProofServMgr::kCleanSessionsCnt);
00164             XrdOucString usr;
00165             rc = msg.Get(usr);
00166             int svrtype;
00167             rc = (rc == 0) ? msg.Get(svrtype) : rc;
00168             if (rc != 0) {
00169                TRACE(XERR, "kCleanSessions: problems parsing message (buf: '"<<
00170                            msg.Buf()<<"'); errno: "<<-rc);
00171                continue;
00172             }
00173             // Notify action
00174             TRACE(REQ, "kCleanSessions: request for user: '"<<usr<<"', server type: "<<svrtype);
00175             // Clean sessions
00176             mgr->CleanClientSessions(usr.c_str(), svrtype);
00177             // Check if there is any orphalin sessions and clean them up
00178             mgr->CleanupLostProofServ();
00179          } else if (msg.Type() == XrdProofdProofServMgr::kProcessReq) {
00180             // Process request from some client: if we are here it means they can go ahead
00181             mgr->ProcessSem()->Post();
00182          } else if (msg.Type() == XrdProofdProofServMgr::kChgSessionSt) {
00183             // Propagate cluster information to active sessions after one session changed its state
00184             mgr->BroadcastClusterInfo();
00185          } else {
00186             TRACE(XERR, "unknown type: "<<msg.Type());
00187             continue;
00188          }
00189       } else {
00190 
00191          // The current time
00192          int now = time(0);
00193 
00194          // If there is any activity in mgr->Process() we postpone the checks in 5 secs
00195          int cnt = mgr->CheckCounter(XrdProofdProofServMgr::kProcessCnt);
00196          if (cnt > 0) {
00197             if ((now - lastrun) < maxdelay) {
00198                // The current time
00199                lastcheck = now + 5 - ckfreq;
00200                mgr->SetNextSessionsCheck(now + 5);
00201                // Notify
00202                TRACE(ALL, "postponing sessions check (will retry in 5 secs)");
00203                continue;
00204             } else {
00205                // Max time without checks reached: force a check
00206                TRACE(ALL, "Max time without checks reached ("<<maxdelay<<"): force a session check");
00207                // Reset the counter
00208                mgr->UpdateCounter(XrdProofdProofServMgr::kProcessCnt, -cnt);
00209             }
00210          }
00211 
00212          bool full = (now > mgr->NextSessionsCheck() - deltat) ? 1 : 0;
00213          if (full) {
00214             // Run periodical full checks
00215             mgr->CheckActiveSessions();
00216             mgr->CheckTerminatedSessions();
00217             if (clnlostscale <= 0) {
00218                mgr->CleanupLostProofServ();
00219                clnlostscale = 10;
00220             } else {
00221                clnlostscale--;
00222             }
00223             // How many active sessions do we have
00224             int cursess = mgr->CurrentSessions(1);
00225             TRACE(ALL, cursess << " sessions are currently active");
00226             // Remember when ...
00227             lastrun = now;
00228             lastcheck = now;
00229             mgr->SetNextSessionsCheck(lastcheck + mgr->CheckFrequency());
00230             // Notify
00231             TRACE(ALL, "next sessions check in "<<mgr->CheckFrequency()<<" secs");
00232          } else {
00233             TRACE(HDBG, "nothing to do; "<<mgr->NextSessionsCheck()-now<<" secs to full check");
00234          }
00235       }
00236    }
00237 
00238    // Should never come here
00239    return (void *)0;
00240 }
00241 
00242 //--------------------------------------------------------------------------
00243 //
00244 // XrdProofdProofServRecover
00245 //
00246 // Function run in a separate thread waiting for session to recover after
00247 // an abrupt shutdown
00248 //
00249 //--------------------------------------------------------------------------
00250 void *XrdProofdProofServRecover(void *p)
00251 {
00252    // Waiting for session to recover after an abrupt shutdown
00253    XPDLOC(SMGR, "ProofServRecover")
00254 
00255    XpdManagerCron_t *mc = (XpdManagerCron_t *)p;
00256    XrdProofdProofServMgr *mgr = mc->fSessionMgr;
00257    if (!(mgr)) {
00258       TRACE(XERR,  "undefined session manager: cannot start");
00259       return (void *)0;
00260    }
00261 
00262    // Recover active sessions
00263    int rc = mgr->RecoverActiveSessions();
00264 
00265    // Notify end of recovering
00266    if (rc > 0) {
00267       TRACE(ALL, "timeout recovering sessions: "<<rc<<" sessions not recovered");
00268    } else if (rc < 0) {
00269       TRACE(XERR, "some problem occured while recovering sessions");
00270    } else {
00271       TRACE(ALL, "recovering successfully terminated");
00272    }
00273 
00274    // Should never come here
00275    return (void *)0;
00276 }
00277 
00278 //______________________________________________________________________________
00279 XrdProofdProofServMgr::XrdProofdProofServMgr(XrdProofdManager *mgr,
00280                                              XrdProtocol_Config *pi, XrdSysError *e)
00281                   : XrdProofdConfig(pi->ConfigFN, e), fProcessSem(0)
00282 {
00283    // Constructor
00284    XPDLOC(SMGR, "XrdProofdProofServMgr")
00285 
00286    fMgr = mgr;
00287    fLogger = pi->eDest->logger();
00288    fInternalWait = 10;
00289    fActiveSessions.clear();
00290    fShutdownOpt = 1;
00291    fShutdownDelay = 0;
00292    fReconnectTime = -1;
00293    fReconnectTimeOut = 300;
00294    fNextSessionsCheck = -1;
00295    // Init internal counters
00296    for (int i = 0; i < PSMMAXCNTS; i++) {
00297       fCounters[i] = 0;
00298    }
00299    fCurrentSessions = 0;
00300 
00301    // Defaults can be changed via 'proofservmgr'
00302    fCheckFrequency = 30;
00303    fTerminationTimeOut = fCheckFrequency - 10;
00304    fVerifyTimeOut = 3 * fCheckFrequency;
00305    fRecoverTimeOut = 10;
00306    fCheckLost = 1;
00307    fParentExecs = "xproofd,xrootd";
00308 
00309    // Recover-related quantities
00310    fRecoverClients = 0;
00311    fRecoverDeadline = -1;
00312 
00313    // Init pipe for the poller
00314    if (!fPipe.IsValid()) {
00315       TRACE(XERR, "unable to generate pipe for the session poller");
00316       return;
00317    }
00318 
00319    // Configuration directives
00320    RegisterDirectives();
00321 }
00322 
00323 //__________________________________________________________________________
00324 int XrdProofdProofServMgr::Config(bool rcf)
00325 {
00326    // Run configuration and parse the entered config directives.
00327    // Return 0 on success, -1 on error
00328    XPDLOC(SMGR, "ProofServMgr::Config")
00329 
00330    XrdSysMutexHelper mhp(fEnvsMutex);
00331 
00332    bool notify = (rcf) ? 0 : 1;
00333    if (rcf && ReadFile(0)) {
00334       // Cleanup lists of envs and RCs
00335       fProofServRCs.clear();
00336       fProofServEnvs.clear();
00337       // Notify possible new settings
00338       notify = 1;
00339    }
00340 
00341    // Run first the configurator
00342    if (XrdProofdConfig::Config(rcf) != 0) {
00343       TRACE(XERR, "problems parsing file ");
00344       return -1;
00345    }
00346 
00347    XrdOucString msg;
00348    msg = (rcf) ? "re-configuring" : "configuring";
00349    if (notify) XPDPRT(msg);
00350 
00351    // Notify timeout on internal communications
00352    XPDFORM(msg, "setting internal timeout to %d secs", fInternalWait);
00353    if (notify) XPDPRT(msg);
00354 
00355    // Shutdown options
00356    msg = "client sessions shutdown after disconnection";
00357    if (fShutdownOpt > 0) {
00358       XPDFORM(msg, "client sessions kept %sfor %d secs after disconnection",
00359                    (fShutdownOpt == 1) ? "idle " : "", fShutdownDelay);
00360    }
00361    if (notify) XPDPRT(msg);
00362 
00363    if (!rcf) {
00364       // Admin paths
00365       fActiAdminPath = fMgr->AdminPath();
00366       fActiAdminPath += "/activesessions";
00367       fTermAdminPath = fMgr->AdminPath();
00368       fTermAdminPath += "/terminatedsessions";
00369 
00370       // Make sure they exist
00371       XrdProofUI ui;
00372       XrdProofdAux::GetUserInfo(fMgr->EffectiveUser(), ui);
00373       if (XrdProofdAux::AssertDir(fActiAdminPath.c_str(), ui, 1) != 0) {
00374          TRACE(XERR, "unable to assert the admin path: "<<fActiAdminPath);
00375          fActiAdminPath = "";
00376          return -1;
00377       }
00378       XPDPRT("active sessions admin path set to: "<<fActiAdminPath);
00379 
00380       if (XrdProofdAux::AssertDir(fTermAdminPath.c_str(), ui, 1) != 0) {
00381          TRACE(XERR, "unable to assert the admin path "<<fTermAdminPath);
00382          fTermAdminPath = "";
00383          return -1;
00384       }
00385       XPDPRT("terminated sessions admin path set to "<<fTermAdminPath);
00386    }
00387 
00388    if (notify) {
00389       XPDPRT("RC settings: "<< fProofServRCs.size());
00390       if (fProofServRCs.size() > 0) {
00391          std::list<XpdEnv>::iterator ircs = fProofServRCs.begin();
00392          for ( ; ircs != fProofServRCs.end(); ircs++) { (*ircs).Print("rc"); }
00393       }
00394       XPDPRT("ENV settings: "<< fProofServEnvs.size());
00395       if (fProofServEnvs.size() > 0) {
00396          std::list<XpdEnv>::iterator ienvs = fProofServEnvs.begin();
00397          for ( ; ienvs != fProofServEnvs.end(); ienvs++) { (*ienvs).Print("env"); }
00398       }
00399    }
00400 
00401    if (!rcf) {
00402       // Try to recover active session previously started
00403       int nr = -1;
00404       if ((nr = PrepareSessionRecovering()) < 0) {
00405          TRACE(XERR, "problems trying to recover active sessions");
00406       } else if (nr > 0) {
00407          XPDFORM(msg, "%d active sessions have been recovered", nr);
00408          XPDPRT(msg);
00409       }
00410 
00411       // Start cron thread
00412       pthread_t tid;
00413       // Fill manager pointers structure
00414       fManagerCron.fClientMgr = fMgr->ClientMgr();
00415       fManagerCron.fSessionMgr = this;
00416       if (XrdSysThread::Run(&tid, XrdProofdProofServCron,
00417                             (void *)&fManagerCron, 0, "ProofServMgr cron thread") != 0) {
00418          TRACE(XERR, "could not start cron thread");
00419          return 0;
00420       }
00421       XPDPRT("cron thread started");
00422    }
00423 
00424    // Done
00425    return 0;
00426 }
00427 
00428 //______________________________________________________________________________
00429 int XrdProofdProofServMgr::AddSession(XrdProofdProtocol *p, XrdProofdProofServ *s)
00430 {
00431    // Add new active session
00432    XPDLOC(SMGR, "ProofServMgr::AddSession")
00433 
00434    TRACE(REQ, "adding new active session ...");
00435 
00436    // Check inputs
00437    if (!s || !p || !p->Client()) {
00438       TRACE(XERR,"invalid inputs: "<<p<<", "<<s<<", "<<p->Client());
00439       return -1;
00440    }
00441    XrdProofdClient *c = p->Client();
00442 
00443    // Path
00444    XrdOucString path;
00445    XPDFORM(path, "%s/%s.%s.%d", fActiAdminPath.c_str(), c->User(), c->Group(), s->SrvPID());
00446 
00447    // Save session info to file
00448    XrdProofSessionInfo info(c, s);
00449    int rc = info.SaveToFile(path.c_str());
00450 
00451    return rc;
00452 }
00453 
00454 //______________________________________________________________________________
00455 bool XrdProofdProofServMgr::IsSessionSocket(const char *fpid)
00456 {
00457    // Checks is fpid is the path of a session UNIX socket
00458    // Returns TRUE is yes; cleans the socket if the session is gone.
00459    XPDLOC(SMGR, "ProofServMgr::IsSessionSocket")
00460 
00461    TRACE(REQ, "checking "<<fpid<<" ...");
00462 
00463    // Check inputs
00464    if (!fpid || strlen(fpid) <= 0) {
00465       TRACE(XERR, "invalid input: "<<fpid);
00466       return 0;
00467    }
00468 
00469    // Paths
00470    XrdOucString spath(fpid);
00471    if (!spath.endswith(".sock")) return 0;
00472    if (!spath.beginswith(fActiAdminPath.c_str())) {
00473       // We are given a partial path: create full paths
00474       XPDFORM(spath, "%s/%s", fActiAdminPath.c_str(), fpid);
00475    }
00476    XrdOucString apath = spath;
00477    apath.replace(".sock", "");
00478 
00479    // Check the admin path
00480    struct stat st;
00481    if (stat(apath.c_str(), &st) != 0 && (errno == ENOENT)) {
00482       // Remove the socket path if not during creation
00483       if (CheckCounter(kCreateCnt) <= 0) {
00484          unlink(spath.c_str());
00485          TRACE(REQ, "missing admin path: removing "<<spath<<" ...");
00486       }
00487    }
00488 
00489    // Done
00490    return 1;
00491 }
00492 
00493 //______________________________________________________________________________
00494 int XrdProofdProofServMgr::MvSession(const char *fpid)
00495 {
00496    // Move session file from the active to the terminated areas
00497    XPDLOC(SMGR, "ProofServMgr::MvSession")
00498 
00499    TRACE(REQ, "moving "<<fpid<<" ...");
00500 
00501    // Check inputs
00502    if (!fpid || strlen(fpid) <= 0) {
00503       TRACE(XERR, "invalid input: "<<fpid);
00504       return -1;
00505    }
00506 
00507    // Paths
00508    XrdOucString opath(fpid), npath;
00509    if (!opath.beginswith(fActiAdminPath.c_str())) {
00510       // We are given a partial path: create full paths
00511       XPDFORM(opath, "%s/%s", fActiAdminPath.c_str(), fpid);
00512       opath.replace(".status", "");
00513    } else {
00514       // Full path: just create the new path
00515       opath.replace(".status", "");
00516    }
00517    // The target path
00518    npath = opath;
00519    npath.replace(fActiAdminPath.c_str(), fTermAdminPath.c_str());
00520 
00521    // Remove the socket path
00522    XrdOucString spath = opath;
00523    spath += ".sock";
00524    if (unlink(spath.c_str()) != 0 && errno != ENOENT)
00525       TRACE(XERR, "problems removing the UNIX socket path: "<<spath<<"; errno: "<<errno);
00526    spath.replace(".sock", ".status");
00527    if (unlink(spath.c_str()) != 0 && errno != ENOENT)
00528       TRACE(XERR, "problems removing the status file: "<<spath<<"; errno: "<<errno);
00529 
00530    // Move the file
00531    errno = 0;
00532    int rc = 0;
00533    if ((rc = rename(opath.c_str(), npath.c_str())) == 0 || (errno == ENOENT)) {
00534       if (!rc)
00535          // Record the time when we did this
00536          TouchSession(fpid, npath.c_str());
00537       return 0;
00538    }
00539 
00540    TRACE(XERR, "session pid file cannot be moved: "<<opath<<
00541               "; target file: "<<npath<<"; errno: "<<errno);
00542    return -1;
00543 }
00544 
00545 //______________________________________________________________________________
00546 int XrdProofdProofServMgr::RmSession(const char *fpid)
00547 {
00548    // Remove session file from the terminated sessions area
00549    XPDLOC(SMGR, "ProofServMgr::RmSession")
00550 
00551    TRACE(REQ, "removing "<<fpid<<" ...");
00552 
00553    // Check inputs
00554    if (!fpid || strlen(fpid) <= 0) {
00555       TRACE(XERR, "invalid input: "<<fpid);
00556       return -1;
00557    }
00558 
00559    // Path
00560    XrdOucString path;
00561    XPDFORM(path, "%s/%s", fTermAdminPath.c_str(), fpid);
00562 
00563    // remove the file
00564    if (unlink(path.c_str()) == 0)
00565       return 0;
00566 
00567    TRACE(XERR, "session pid file cannot be unlinked: "<<
00568                path<<"; error: "<<errno);
00569    return -1;
00570 }
00571 
00572 //______________________________________________________________________________
00573 int XrdProofdProofServMgr::TouchSession(const char *fpid, const char *fpath)
00574 {
00575    // Update the access time for the session pid file to the current time
00576    XPDLOC(SMGR, "ProofServMgr::TouchSession")
00577 
00578    TRACE(REQ, "touching "<<fpid<<", "<<fpath<<" ...");
00579 
00580    // Check inputs
00581    if (!fpid || strlen(fpid) <= 0) {
00582       TRACE(XERR, "invalid input: "<<fpid);
00583       return -1;
00584    }
00585 
00586    // Path
00587    XrdOucString path(fpath);
00588    if (!fpath || strlen(fpath) == 0)
00589       XPDFORM(path, "%s/%s.status", fActiAdminPath.c_str(), fpid);
00590 
00591    // Update file time stamps
00592    if (utime(path.c_str(), 0) == 0)
00593       return 0;
00594 
00595    TRACE(XERR, "time stamps for session pid file cannot be updated: "<<
00596                path<<"; error: "<<errno);
00597    return -1;
00598 }
00599 
00600 //______________________________________________________________________________
00601 int XrdProofdProofServMgr::VerifySession(const char *fpid,
00602                                          int to, const char *fpath)
00603 {
00604    // Check if the session is alive, i.e. if it has recently touched its admin file.
00605    // Return 0 if alive, 1 if not-responding, -1 in case of error.
00606    // The timeout for verification is 'to' if positive, else fVerifyTimeOut;
00607    // the admin file is looked under 'fpath' if defined, else fActiAdminPath.
00608    XPDLOC(SMGR, "ProofServMgr::VerifySession")
00609 
00610    // Check inputs
00611    if (!fpid || strlen(fpid) <= 0) {
00612       TRACE(XERR, "invalid input: "<<fpid);
00613       return -1;
00614    }
00615 
00616    // Path
00617    XrdOucString path;
00618    if (fpath && strlen(fpath) > 0)
00619       XPDFORM(path, "%s/%s", fpath, fpid);
00620    else
00621       XPDFORM(path, "%s/%s", fActiAdminPath.c_str(), fpid);
00622 
00623    // Check first the new file but also the old one, for backward compatibility
00624    int deltat = -1;
00625    bool checkmore = 1;
00626    while (checkmore) {
00627       // Current settings
00628       struct stat st;
00629       if (stat(path.c_str(), &st)) {
00630          TRACE(XERR, "session status file cannot be stat'ed: "<<
00631                      path<<"; error: "<<errno);
00632          return -1;
00633       }
00634       // Check times
00635       int xto = (to > 0) ? to : fVerifyTimeOut;
00636       deltat = time(0) - st.st_mtime;
00637       if (deltat > xto) {
00638          if (path.endswith(".status")) {
00639             // Check the old one too
00640             path.erase(path.rfind(".status"));
00641          } else {
00642             // Dead
00643             TRACE(DBG, "admin path for session "<<fpid<<" hase not been touched"
00644                        " since at least "<< xto <<" secs");
00645             return 1;
00646          }
00647       } else {
00648          // We are done
00649          checkmore = 0;
00650       }
00651    }
00652 
00653    // Alive
00654    TRACE(DBG, "admin path for session "<<fpid<<" was touched " <<
00655               deltat <<" secs ago");
00656    return 0;
00657 }
00658 
00659 //______________________________________________________________________________
00660 int XrdProofdProofServMgr::DeleteFromSessions(const char *fpid)
00661 {
00662    // Delete from the hash list the session with ID pid.
00663    // Return -ENOENT if not found, or 0.
00664    XPDLOC(SMGR, "ProofServMgr::DeleteFromSessions")
00665 
00666    TRACE(REQ, "session: "<<fpid);
00667 
00668    // Check inputs
00669    if (!fpid || strlen(fpid) <= 0) {
00670       TRACE(XERR, "invalid input: "<<fpid);
00671       return -1;
00672    }
00673 
00674    XrdOucString key = fpid;
00675    key.replace(".status", "");
00676    key.erase(0, key.rfind('.') + 1);
00677    XrdProofdProofServ *xps = 0;
00678    { XrdSysMutexHelper mhp(fMutex); xps = fSessions.Find(key.c_str()); }
00679    if (xps) {
00680       // Tell other attached clients, if any, that this session is gone
00681       XrdOucString msg;
00682       XPDFORM(msg, "session: %s terminated by peer", fpid);
00683       TRACE(DBG, msg);
00684       // Reset this instance
00685       int tp = xps->Reset(msg.c_str(), kXPD_wrkmortem);
00686       // Update counters and lists
00687       XrdSysMutexHelper mhp(fMutex);
00688       if (tp == 1) fCurrentSessions--;
00689       // remove from the list of active sessions
00690       fActiveSessions.remove(xps);
00691    }
00692    int rc = -1;
00693    { XrdSysMutexHelper mhp(fMutex); rc = fSessions.Del(key.c_str()); }
00694    return rc;
00695 }
00696 
00697 //______________________________________________________________________________
00698 int XrdProofdProofServMgr::PrepareSessionRecovering()
00699 {
00700    // Go through the active sessions admin path and prepare reconnection of those
00701    // still alive.
00702    // Called at start-up.
00703    XPDLOC(SMGR, "ProofServMgr::PrepareSessionRecovering")
00704 
00705    // Open dir
00706    DIR *dir = opendir(fActiAdminPath.c_str());
00707    if (!dir) {
00708       TRACE(XERR, "cannot open dir "<<fActiAdminPath<<" ; error: "<<errno);
00709       return -1;
00710    }
00711    TRACE(REQ, "preparing recovering of active sessions ...");
00712 
00713    // Scan the active sessions admin path
00714    fRecoverClients = new std::list<XpdClientSessions *>;
00715    struct dirent *ent = 0;
00716    while ((ent = (struct dirent *)readdir(dir))) {
00717       if (!strncmp(ent->d_name, ".", 1) || !strncmp(ent->d_name, "..", 2)) continue;
00718       // Get the session instance (skip non-digital entries)
00719       XrdOucString rest, a;
00720       int pid = XrdProofdAux::ParsePidPath(ent->d_name, rest, a);
00721       if (!XPD_LONGOK(pid) || pid <= 0) continue;
00722       if (a.length() > 0) continue;
00723       bool rmsession = 1;
00724       // Check if the process is still alive
00725       if (XrdProofdAux::VerifyProcessByID(pid) != 0) {
00726          if (ResolveSession(ent->d_name) == 0) {
00727             TRACE(DBG, "found active session: "<<pid);
00728             rmsession = 0;
00729          }
00730       }
00731       // Remove the session, if needed
00732       if (rmsession)
00733          MvSession(ent->d_name);
00734    }
00735    // Close the directory
00736    closedir(dir);
00737 
00738    // Start the recovering thread, if needed
00739    int nrc = 0;
00740    { XrdSysMutexHelper mhp(fRecoverMutex); nrc = fRecoverClients->size(); }
00741    if (nrc > 0) {
00742       // Start recovering thread
00743       pthread_t tid;
00744       // Fill manager pointers structure
00745       fManagerCron.fClientMgr = fMgr->ClientMgr();
00746       fManagerCron.fSessionMgr = this;
00747       fManagerCron.fProofSched = fMgr->ProofSched();
00748       if (XrdSysThread::Run(&tid, XrdProofdProofServRecover, (void *)&fManagerCron,
00749                             0, "ProofServMgr session recover thread") != 0) {
00750          TRACE(XERR, "could not start session recover thread");
00751          return 0;
00752       }
00753       XPDPRT("session recover thread started");
00754    } else {
00755       // End reconnect state if there is nothing to reconnect
00756       if (fMgr->ClientMgr() && fMgr->ClientMgr()->GetNClients() <= 0)
00757          SetReconnectTime(0);
00758    }
00759 
00760    // Done
00761    return 0;
00762 }
00763 
00764 
00765 //______________________________________________________________________________
00766 int XrdProofdProofServMgr::RecoverActiveSessions()
00767 {
00768    // Accept connections from sessions still alive. This is run in a dedicated
00769    // thread.
00770    // Returns -1 in case of failure, 0 if all alive sessions reconnected or the
00771    // numer of sessions not reconnected if the timeout (fRecoverTimeOut per client)
00772    // expired.
00773    XPDLOC(SMGR, "ProofServMgr::RecoverActiveSessions")
00774 
00775    int rc = 0;
00776 
00777    if (!fRecoverClients) {
00778       // Invalid input
00779       TRACE(XERR, "recovering clients list undefined");
00780       return -1;
00781    }
00782 
00783    int nrc = 0;
00784    { XrdSysMutexHelper mhp(fRecoverMutex); nrc = fRecoverClients->size(); }
00785    TRACE(REQ, "start recovering of "<<nrc<<" clients");
00786 
00787    // Recovering deadline
00788    { XrdSysMutexHelper mhp(fRecoverMutex);
00789      fRecoverDeadline = time(0) + fRecoverTimeOut * nrc; }
00790 
00791    // Respect the deadline
00792    int nr = 0;
00793    XpdClientSessions *cls = 0;
00794    bool go = true;
00795    while (go) {
00796 
00797       // Pickup the first one in the list
00798       { XrdSysMutexHelper mhp(fRecoverMutex); cls = fRecoverClients->front(); }
00799       if (cls) {
00800          SetReconnectTime();
00801          nr += Recover(cls);
00802 
00803          // If all client sessions reconnected remove the client from the list
00804          {  XrdSysMutexHelper mhp(cls->fMutex);
00805             if (cls->fProofServs.size() <= 0) {
00806                XrdSysMutexHelper mhpr(fRecoverMutex);
00807                fRecoverClients->remove(cls);
00808                // We may be over
00809                if ((nrc = fRecoverClients->size()) <= 0)
00810                   break;
00811             }
00812          }
00813       }
00814       TRACE(REQ, nrc<<" clients still to recover");
00815 
00816       // Check the deadline
00817       {  XrdSysMutexHelper mhp(fRecoverMutex);
00818          go = (time(0) < fRecoverDeadline) ? true : false; }
00819    }
00820    // End reconnect state
00821    SetReconnectTime(0);
00822 
00823    // If we reached the deadline, calculate the number of sessions not reconnected
00824    rc = 0;
00825    {  XrdSysMutexHelper mhp(fRecoverMutex);
00826       if (fRecoverClients->size() > 0) {
00827          std::list<XpdClientSessions* >::iterator ii = fRecoverClients->begin();
00828          for (; ii != fRecoverClients->end(); ii++) {
00829             rc += (*ii)->fProofServs.size();
00830          }
00831       }
00832    }
00833 
00834    // Delete the recovering clients list
00835    {  XrdSysMutexHelper mhp(fRecoverMutex);
00836       fRecoverClients->clear();
00837       delete fRecoverClients;
00838       fRecoverClients = 0;
00839       fRecoverDeadline = -1;
00840    }
00841 
00842    // Done
00843    return rc;
00844 }
00845 
00846 //______________________________________________________________________________
00847 bool XrdProofdProofServMgr::IsClientRecovering(const char *usr, const char *grp,
00848                                                int &deadline)
00849 {
00850    // Returns true (an the recovering deadline) if the client has sessions in
00851    // recovering state; returns false otherwise.
00852    // Called during for attach requests.
00853    XPDLOC(SMGR, "ProofServMgr::IsClientRecovering")
00854 
00855    if (!usr || !grp) {
00856       TRACE(XERR, "invalid inputs: usr: "<<usr<<", grp:"<<grp<<" ...");
00857       return false;
00858    }
00859 
00860    deadline = -1;
00861    int rc = false;
00862    {  XrdSysMutexHelper mhp(fRecoverMutex);
00863       if (fRecoverClients && fRecoverClients->size() > 0) {
00864          std::list<XpdClientSessions *>::iterator ii = fRecoverClients->begin();
00865          for (; ii != fRecoverClients->end(); ii++) {
00866             if ((*ii)->fClient && (*ii)->fClient->Match(usr, grp)) {
00867                rc = true;
00868                deadline = fRecoverDeadline;
00869                break;
00870             }
00871          }
00872       }
00873    }
00874    TRACE(DBG, "checking usr: "<<usr<<", grp:"<<grp<<" ... recovering? "<<
00875               rc<<", until: "<<deadline);
00876 
00877    // Done
00878    return rc;
00879 }
00880 
00881 //______________________________________________________________________________
00882 int XrdProofdProofServMgr::CheckActiveSessions(bool verify)
00883 {
00884    // Go through the active sessions admin path and make sure sessions are alive.
00885    // If 'verify' is true also ask the session to proof that they are alive
00886    // via asynchronous ping (the result will be done at next check).
00887    // Move those not responding in the terminated sessions admin path.
00888    XPDLOC(SMGR, "ProofServMgr::CheckActiveSessions")
00889 
00890    TRACE(REQ, "checking active sessions ...");
00891 
00892    // Open dir
00893    DIR *dir = opendir(fActiAdminPath.c_str());
00894    if (!dir) {
00895       TRACE(XERR, "cannot open dir "<<fActiAdminPath<<" ; error: "<<errno);
00896       return -1;
00897    }
00898 
00899    // Scan the active sessions admin path
00900    struct dirent *ent = 0;
00901    while ((ent = (struct dirent *)readdir(dir))) {
00902       if (!strncmp(ent->d_name, ".", 1) || !strncmp(ent->d_name, "..", 2)) continue;
00903       // If a socket path, make sure that the associated session still exists
00904       // and go to the next
00905       if (strstr(ent->d_name, ".sock") && IsSessionSocket(ent->d_name)) continue;
00906       // Get the session instance (skip non-digital entries)
00907       XrdOucString rest, key, after;
00908       int pid = XrdProofdAux::ParsePidPath(ent->d_name, rest, after);
00909       // If not a status path, go to the next
00910       if (after != "status") continue;
00911       // If not a good pid
00912       if (!XPD_LONGOK(pid) || pid <= 0) continue;
00913       key += pid;
00914       //
00915       XrdProofdProofServ *xps = 0;
00916       {  XrdSysMutexHelper mhp(fMutex);
00917          xps = fSessions.Find(key.c_str());
00918       }
00919 
00920       bool sessionalive = (VerifySession(ent->d_name) == 0) ? 1 : 0;
00921       bool rmsession = 0;
00922       if (xps) {
00923          if (!xps->IsValid() || !sessionalive) rmsession = 1;
00924       } else {
00925          // Session not yet registered, possibly starting
00926          // Skips checks the admin file verification was OK
00927          if (sessionalive) continue;
00928          rmsession = 1;
00929       }
00930 
00931       // For backward compatibility we need to check the session version
00932       bool oldvers = (xps && xps->ROOT() && xps->ROOT()->SrvProtVers() >= 18) ? 0 : 1;
00933 
00934       // If somebody is interested in this session, we give her/him some
00935       // more time by skipping the connected clients check this time
00936       int nc = -1;
00937       if (!rmsession)
00938          rmsession = xps->CheckSession(oldvers, IsReconnecting(),
00939                                        fShutdownOpt, fShutdownDelay, fMgr->ChangeOwn(), nc);
00940 
00941       // Verify the session: this just sends a request to the session
00942       // to touch the session file; all this will be done asynchronously;
00943       // the result will be checked next time.
00944       // We do not want further propagation at this stage.
00945       if (!rmsession && verify && !oldvers) {
00946          if (xps->VerifyProofServ(0) != 0) {
00947             // This means that the connection is already gone
00948             rmsession = 1;
00949          }
00950       }
00951       TRACE(REQ, "session: "<<ent->d_name<<"; nc: "<<nc<<"; rm: "<<rmsession);
00952       // Remove the session, if needed
00953       if (rmsession)
00954          MvSession(ent->d_name);
00955    }
00956    // Close the directory
00957    closedir(dir);
00958 
00959    // Done
00960    return 0;
00961 }
00962 
00963 //______________________________________________________________________________
00964 int XrdProofdProofServMgr::CheckTerminatedSessions()
00965 {
00966    // Go through the terminated sessions admin path and make sure sessions they
00967    // are gone.
00968    // Hard-kill those still alive.
00969    XPDLOC(SMGR, "ProofServMgr::CheckTerminatedSessions")
00970 
00971    TRACE(REQ, "checking terminated sessions ...");
00972 
00973    // Open dir
00974    DIR *dir = opendir(fTermAdminPath.c_str());
00975    if (!dir) {
00976       TRACE(XERR, "cannot open dir "<<fTermAdminPath<<" ; error: "<<errno);
00977       return -1;
00978    }
00979 
00980    // Scan the terminated sessions admin path
00981    int now = -1;
00982    struct dirent *ent = 0;
00983    while ((ent = (struct dirent *)readdir(dir))) {
00984       if (!strncmp(ent->d_name, ".", 1) || !strncmp(ent->d_name, "..", 2)) continue;
00985       // Get the session instance (skip non-digital entries)
00986       XrdOucString rest, a;
00987       int pid = XrdProofdAux::ParsePidPath(ent->d_name, rest, a);
00988       if (!XPD_LONGOK(pid) || pid <= 0) continue;
00989 
00990       // Current time
00991       now = (now > 0) ? now : time(0);
00992 
00993       // Full path
00994       XrdOucString path;
00995       XPDFORM(path, "%s/%s", fTermAdminPath.c_str(), ent->d_name);
00996 
00997       // Check termination time
00998       struct stat st;
00999       int rcst = stat(path.c_str(), &st);
01000       TRACE(DBG, pid<<": rcst: "<<rcst<<", now - mtime: "<<now - st.st_mtime<<" secs")
01001       if ((now - st.st_mtime) > fTerminationTimeOut || rcst != 0) {
01002          // Check if the process is still alive
01003          if (XrdProofdAux::VerifyProcessByID(pid) != 0) {
01004             // Send again an hard-kill signal
01005             XrdProofSessionInfo info(path.c_str());
01006             XrdProofUI ui;
01007             XrdProofdAux::GetUserInfo(info.fUser.c_str(), ui);
01008             XrdProofdAux::KillProcess(pid, 1, ui, fMgr->ChangeOwn());
01009          } else {
01010             // Delete the entry
01011             RmSession(ent->d_name);
01012          }
01013       }
01014    }
01015    // Close the directory
01016    closedir(dir);
01017 
01018    // Done
01019    return 0;
01020 }
01021 
01022 //______________________________________________________________________________
01023 int XrdProofdProofServMgr::CleanClientSessions(const char *usr, int srvtype)
01024 {
01025    // Go through the sessions admin path and clean all sessions belonging to 'usr'.
01026    // Move those not responding in the terminated sessions admin path.
01027    XPDLOC(SMGR, "ProofServMgr::CleanClientSessions")
01028 
01029    TRACE(REQ, "cleaning "<<usr<<" ...");
01030 
01031    // Check which client
01032    bool all = (!usr || strlen(usr) <= 0 || !strcmp(usr, "all")) ? 1 : 0;
01033 
01034    // Get user info
01035    XrdProofUI ui;
01036    if (!all)
01037       XrdProofdAux::GetUserInfo(usr, ui);
01038    XrdOucString path, rest, key, a;
01039 
01040    // We need lock to avoid session actions request while we are doing this
01041    XrdSysRecMutex *mtx = 0;
01042    if (all) {
01043       // Lock us all
01044       mtx = &fMutex;
01045    } else {
01046       // Lock the client
01047       XrdProofdClient *c = fMgr->ClientMgr()->GetClient(usr);
01048       if (c) mtx = c->Mutex();
01049    }
01050 
01051    std::list<int> tobedel;
01052    {  XrdSysMutexHelper mtxh(mtx);
01053 
01054       // Check the terminated session dir first
01055       DIR *dir = opendir(fTermAdminPath.c_str());
01056       if (!dir) {
01057          TRACE(XERR, "cannot open dir "<<fTermAdminPath<<" ; error: "<<errno);
01058       } else {
01059          // Go trough
01060          struct dirent *ent = 0;
01061          while ((ent = (struct dirent *)readdir(dir))) {
01062             // Skip basic entries
01063             if (!strncmp(ent->d_name, ".", 1) || !strncmp(ent->d_name, "..", 2)) continue;
01064             // Get the session instance
01065             int pid = XrdProofdAux::ParsePidPath(ent->d_name, rest, a);
01066             if (!XPD_LONGOK(pid) || pid <= 0) continue;
01067             // Read info from file and check that we are interested in this session
01068             XPDFORM(path, "%s/%s", fTermAdminPath.c_str(), ent->d_name);
01069             XrdProofSessionInfo info(path.c_str());
01070             // Check user
01071             if (!all && info.fUser != usr) continue;
01072             // Check server type
01073             if (srvtype != kXPD_AnyServer && info.fSrvType != srvtype) continue;
01074             // Refresh user info, if needed
01075             if (all)
01076                XrdProofdAux::GetUserInfo(info.fUser.c_str(), ui);
01077             // Check if the process is still alive
01078             if (XrdProofdAux::VerifyProcessByID(pid) != 0) {
01079                // Send a hard-kill signal
01080                XrdProofdAux::KillProcess(pid, 1, ui, fMgr->ChangeOwn());
01081             } else {
01082                // Delete the entry
01083                RmSession(ent->d_name);
01084             }
01085          }
01086          // Close the directory
01087          closedir(dir);
01088       }
01089 
01090       // Check the active session dir now
01091       dir = opendir(fActiAdminPath.c_str());
01092       if (!dir) {
01093          TRACE(XERR, "cannot open dir "<<fActiAdminPath<<" ; error: "<<errno);
01094          return -1;
01095       }
01096 
01097       // Scan the active sessions admin path
01098       struct dirent *ent = 0;
01099       while ((ent = (struct dirent *)readdir(dir))) {
01100          // Skip basic entries
01101          if (!strncmp(ent->d_name, ".", 1) || !strncmp(ent->d_name, "..", 2)) continue;
01102          // Get the session instance
01103          int pid = XrdProofdAux::ParsePidPath(ent->d_name, rest, a);
01104          if (a == "status") continue;
01105          if (!XPD_LONGOK(pid) || pid <= 0) continue;
01106          // Read info from file and check that we are interested in this session
01107          XPDFORM(path, "%s/%s", fActiAdminPath.c_str(), ent->d_name);
01108          XrdProofSessionInfo info(path.c_str());
01109          if (!all && info.fUser != usr) continue;
01110          // Check server type
01111          if (srvtype != kXPD_AnyServer && info.fSrvType != srvtype) continue;
01112          // Refresh user info, if needed
01113          if (all)
01114             XrdProofdAux::GetUserInfo(info.fUser.c_str(), ui);
01115          // Check if the process is still alive
01116          if (XrdProofdAux::VerifyProcessByID(pid) != 0) {
01117             // We will remove this later
01118             tobedel.push_back(pid);
01119             // Send a termination signal
01120             XrdProofdAux::KillProcess(pid, 0, ui, fMgr->ChangeOwn());
01121          }
01122          // Flag as terminated
01123          MvSession(ent->d_name);
01124       }
01125       // Close the directory
01126       closedir(dir);
01127    }
01128 
01129    // Cleanup fSessions
01130    std::list<int>::iterator ii = tobedel.begin();
01131    while (ii != tobedel.end()) {
01132       XPDFORM(key, "%d", *ii);
01133       XrdSysMutexHelper mhp(fMutex);
01134       fSessions.Del(key.c_str());
01135       ii++;
01136    }
01137 
01138    // Done
01139    return 0;
01140 }
01141 
01142 //__________________________________________________________________________
01143 void XrdProofdProofServMgr::RegisterDirectives()
01144 {
01145    // Register directives for configuration
01146 
01147    // Register special config directives
01148    Register("proofservmgr", new XrdProofdDirective("proofservmgr", this, &DoDirectiveClass));
01149    Register("putenv", new XrdProofdDirective("putenv", this, &DoDirectiveClass));
01150    Register("putrc", new XrdProofdDirective("putrc", this, &DoDirectiveClass));
01151    Register("shutdown", new XrdProofdDirective("shutdown", this, &DoDirectiveClass));
01152    // Register config directives for ints
01153    Register("intwait",
01154                   new XrdProofdDirective("intwait", (void *)&fInternalWait, &DoDirectiveInt));
01155    Register("reconnto",
01156                   new XrdProofdDirective("reconnto", (void *)&fReconnectTimeOut, &DoDirectiveInt));
01157    // Register config directives for strings
01158    Register("proofplugin",
01159                   new XrdProofdDirective("proofplugin", (void *)&fProofPlugin, &DoDirectiveString));
01160    Register("proofservparents",
01161                   new XrdProofdDirective("proofservparents", (void *)&fParentExecs, &DoDirectiveString));
01162 }
01163 
01164 //______________________________________________________________________________
01165 int XrdProofdProofServMgr::DoDirective(XrdProofdDirective *d,
01166                                        char *val, XrdOucStream *cfg, bool rcf)
01167 {
01168    // Update the priorities of the active sessions.
01169    XPDLOC(SMGR, "ProofServMgr::DoDirective")
01170 
01171    if (!d)
01172       // undefined inputs
01173       return -1;
01174 
01175    if (d->fName == "proofservmgr") {
01176       return DoDirectiveProofServMgr(val, cfg, rcf);
01177    } else if (d->fName == "putenv") {
01178       return DoDirectivePutEnv(val, cfg, rcf);
01179    } else if (d->fName == "putrc") {
01180       return DoDirectivePutRc(val, cfg, rcf);
01181    } else if (d->fName == "shutdown") {
01182       return DoDirectiveShutdown(val, cfg, rcf);
01183    }
01184    TRACE(XERR,"unknown directive: "<<d->fName);
01185    return -1;
01186 }
01187 
01188 //______________________________________________________________________________
01189 int XrdProofdProofServMgr::DoDirectiveProofServMgr(char *val, XrdOucStream *cfg, bool rcf)
01190 {
01191    // Process 'proofswrvmgr' directive
01192    // eg: xpd.proofswrvmgr checkfq:120 termto:100 verifyto:5 recoverto:20
01193    XPDLOC(SMGR, "ProofServMgr::DoDirectiveProofServMgr")
01194 
01195    if (!val || !cfg)
01196       // undefined inputs
01197       return -1;
01198 
01199    if (rcf)
01200       // Do not reconfigure this (need to check what happens with the cron thread ...
01201       return 0;
01202 
01203    int checkfq = -1;
01204    int termto = -1;
01205    int verifyto = -1;
01206    int recoverto = -1;
01207    int checklost = 0;
01208 
01209    while (val) {
01210       XrdOucString tok(val);
01211       if (tok.beginswith("checkfq:")) {
01212          tok.replace("checkfq:", "");
01213          checkfq = strtol(tok.c_str(), 0, 10);
01214       } else if (tok.beginswith("termto:")) {
01215          tok.replace("termto:", "");
01216          termto = strtol(tok.c_str(), 0, 10);
01217       } else if (tok.beginswith("verifyto:")) {
01218          tok.replace("verifyto:", "");
01219          verifyto = strtol(tok.c_str(), 0, 10);
01220       } else if (tok.beginswith("recoverto:")) {
01221          tok.replace("recoverto:", "");
01222          recoverto = strtol(tok.c_str(), 0, 10);
01223       } else if (tok.beginswith("checklost:")) {
01224          tok.replace("checklost:", "");
01225          checklost = strtol(tok.c_str(), 0, 10);
01226       }
01227       // Get next
01228       val = cfg->GetWord();
01229    }
01230 
01231    // Check deprecated 'if' directive
01232    if (fMgr->Host() && cfg)
01233       if (XrdProofdAux::CheckIf(cfg, fMgr->Host()) == 0)
01234          return 0;
01235 
01236    // Set the values
01237    fCheckFrequency = (XPD_LONGOK(checkfq) && checkfq > 0) ? checkfq : fCheckFrequency;
01238    fTerminationTimeOut = (XPD_LONGOK(termto) && termto > 0) ? termto : fTerminationTimeOut;
01239    fVerifyTimeOut = (XPD_LONGOK(verifyto) && (verifyto > fCheckFrequency + 1))
01240                   ? verifyto : fVerifyTimeOut;
01241    fRecoverTimeOut = (XPD_LONGOK(recoverto) && recoverto > 0) ? recoverto : fRecoverTimeOut;
01242    if (XPD_LONGOK(checklost)) fCheckLost = (checklost != 0) ? 1 : 0;
01243 
01244    XrdOucString msg;
01245    XPDFORM(msg, "checkfq: %d s, termto: %d s, verifyto: %d s, recoverto: %d s, checklost: %d",
01246             fCheckFrequency, fTerminationTimeOut, fVerifyTimeOut, fRecoverTimeOut, fCheckLost);
01247    TRACE(ALL, msg);
01248 
01249    return 0;
01250 }
01251 
01252 //______________________________________________________________________________
01253 int XrdProofdProofServMgr::DoDirectivePutEnv(char *val, XrdOucStream *cfg, bool)
01254 {
01255    // Process 'putenv' directives
01256 
01257    if (!val)
01258       // undefined inputs
01259       return -1;
01260 
01261    // Parse env variables to be passed to 'proofserv':
01262    XrdOucString users, groups, rcval, rcnam;
01263    int smi = -1, smx = -1, vmi = -1, vmx = -1; 
01264    bool hex = 0;
01265    ExtractEnv(val, cfg, users, groups, rcval, rcnam, smi, smx, vmi, vmx, hex);
01266 
01267    // Adjust name of the variable
01268    int iequ = rcnam.find('=');
01269    if (iequ == STR_NPOS) return -1;
01270    rcnam.erase(iequ);
01271    
01272    // Fill entries
01273    FillEnvList(&fProofServEnvs, rcnam.c_str(), rcval.c_str(),
01274                                 users.c_str(), groups.c_str(), smi, smx, vmi, vmx, hex);
01275 
01276    return 0;
01277 }
01278 
01279 //______________________________________________________________________________
01280 int XrdProofdProofServMgr::DoDirectivePutRc(char *val, XrdOucStream *cfg, bool)
01281 {
01282    // Process 'putrc' directives.
01283    // Syntax:
01284    //    xpd.putrc  [u:<usr1>,<usr2>,...] [g:<grp1>,<grp2>,...] 
01285    //               [s:[svnmin][-][svnmax]] [v:[vermin][-][vermax]] RcVarName RcVarValue
01286    // NB: <usr1>,... and <grp1>,... may contain the wild card '*' 
01287 
01288    if (!val || !cfg)
01289       // undefined inputs
01290       return -1;
01291    
01292    // Parse rootrc variables to be passed to 'proofserv':
01293    XrdOucString users, groups, rcval, rcnam;
01294    int smi = -1, smx = -1, vmi = -1, vmx = -1; 
01295    bool hex = 0;
01296    ExtractEnv(val, cfg, users, groups, rcval, rcnam, smi, smx, vmi, vmx, hex);
01297    
01298    // Fill entries
01299    FillEnvList(&fProofServRCs, rcnam.c_str(), rcval.c_str(),
01300                                users.c_str(), groups.c_str(), smi, smx, vmi, vmx, hex);
01301 
01302    return 0;
01303 }
01304 
01305 //______________________________________________________________________________
01306 void XrdProofdProofServMgr::ExtractEnv(char *val, XrdOucStream *cfg,
01307                                        XrdOucString &users, XrdOucString &groups,
01308                                        XrdOucString &rcval, XrdOucString &rcnam,
01309                                        int &smi, int &smx, int &vmi, int &vmx, bool &hex)
01310 {
01311    // Extract env information from the stream 'cfg'
01312 
01313    XrdOucString ssvn, sver;
01314    int idash = -1; 
01315    while (val && val[0]) {
01316       if (!strncmp(val, "u:", 2)) {
01317          users = val;
01318          users.erase(0,2);
01319       } else if (!strncmp(val, "g:", 2)) {
01320          groups = val;
01321          groups.erase(0,2);
01322       } else if (!strncmp(val, "s:", 2)) {
01323          ssvn = val;
01324          ssvn.erase(0,2);
01325          idash = ssvn.find('-');
01326          if (idash != STR_NPOS) {
01327             if (ssvn.isdigit(0, idash-1)) smi = ssvn.atoi(0, idash-1);
01328             if (ssvn.isdigit(idash+1)) smx = ssvn.atoi(idash+1);
01329          } else {
01330             if (ssvn.isdigit()) smi = ssvn.atoi();
01331          }
01332       } else if (!strncmp(val, "v:", 2)) {
01333          sver = val;
01334          sver.erase(0,2);
01335          hex = 0;
01336          if (sver.beginswith('x')) {
01337             hex = 1;
01338             sver.erase(0,1);
01339          }
01340          idash = sver.find('-');
01341          if (idash != STR_NPOS) {
01342             if (sver.isdigit(0, idash-1)) vmi = sver.atoi(0, idash-1);
01343             if (sver.isdigit(idash+1)) vmx = sver.atoi(idash+1);
01344          } else {
01345             if (sver.isdigit()) vmi = sver.atoi();
01346          }
01347       } else {
01348         if (rcval.length() > 0) {
01349            rcval += ' ';
01350         } else {
01351            rcnam = val;
01352         }
01353         rcval += val;
01354       }
01355       val = cfg->GetWord();
01356    }
01357    // Done
01358    return;
01359 }
01360 
01361 //______________________________________________________________________________
01362 void XrdProofdProofServMgr::FillEnvList(std::list<XpdEnv> *el, const char *nam, const char *val,
01363                                         const char *usrs, const char *grps,
01364                                         int smi, int smx, int vmi, int vmx, bool hex)
01365 {
01366    // Fill env entry(ies) in the relevant list
01367    XPDLOC(SMGR, "ProofServMgr::FillEnvList")
01368 
01369    if (!el) {
01370       TRACE(ALL, "env list undefined!");
01371       return;
01372    }
01373    
01374    XrdOucString users(usrs), groups(grps);
01375    // Transform version numbers in the human unreadable format used internally (version code)
01376    if (vmi > 0) vmi = XpdEnv::ToVersCode(vmi, hex);
01377    if (vmx > 0) vmx = XpdEnv::ToVersCode(vmx, hex);
01378    // Create the entry
01379    XpdEnv xpe(nam, val, users.c_str(), groups.c_str(), smi, smx, vmi, vmx);
01380    if (users.length() > 0) {
01381       XrdOucString usr;
01382       int from = 0;
01383       while ((from = users.tokenize(usr, from, ',')) != -1) {
01384          if (usr.length() > 0) {
01385             if (groups.length() > 0) {
01386                XrdOucString grp;
01387                int fromg = 0;
01388                while ((fromg = groups.tokenize(grp, from, ',')) != -1) {
01389                   if (grp.length() > 0) {
01390                      xpe.Reset(nam, val, usr.c_str(), grp.c_str(), smi, smx, vmi, vmx);
01391                      el->push_back(xpe);
01392                   }
01393                }
01394             } else {
01395                xpe.Reset(nam, val, usr.c_str(), 0, smi, smx, vmi, vmx);
01396                el->push_back(xpe);
01397             }
01398          }
01399       }
01400    } else {
01401       if (groups.length() > 0) {
01402          XrdOucString grp;
01403          int fromg = 0;
01404          while ((fromg = groups.tokenize(grp, fromg, ',')) != -1) {
01405             if (grp.length() > 0) {
01406                xpe.Reset(nam, val, 0, grp.c_str(), smi, smx, vmi, vmx);
01407                el->push_back(xpe);
01408             }
01409          }
01410       } else {
01411          el->push_back(xpe);
01412       }
01413    }
01414    // Done
01415    return;
01416 }
01417 
01418 //______________________________________________________________________________
01419 int XrdProofdProofServMgr::DoDirectiveShutdown(char *val, XrdOucStream *cfg, bool)
01420 {
01421    // Process 'shutdown' directive
01422 
01423    if (!val || !cfg)
01424       // undefined inputs
01425       return -1;
01426 
01427    int opt = -1;
01428    int delay = -1;
01429 
01430    // Shutdown option
01431    int dp = strtol(val,0,10);
01432    if (dp >= 0 && dp <= 2)
01433       opt = dp;
01434    // Shutdown delay
01435    if ((val = cfg->GetWord())) {
01436       int l = strlen(val);
01437       int f = 1;
01438       XrdOucString tval = val;
01439       // Parse
01440       if (val[l-1] == 's') {
01441          val[l-1] = 0;
01442       } else if (val[l-1] == 'm') {
01443          f = 60;
01444          val[l-1] = 0;
01445       } else if (val[l-1] == 'h') {
01446          f = 3600;
01447          val[l-1] = 0;
01448       } else if (val[l-1] < 48 || val[l-1] > 57) {
01449          f = -1;
01450       }
01451       if (f > 0) {
01452          int de = strtol(val,0,10);
01453          if (de > 0)
01454             delay = de * f;
01455       }
01456    }
01457 
01458    // Check deprecated 'if' directive
01459    if (fMgr->Host() && cfg)
01460       if (XrdProofdAux::CheckIf(cfg, fMgr->Host()) == 0)
01461          return 0;
01462 
01463    // Set the values
01464    fShutdownOpt = (opt > -1) ? opt : fShutdownOpt;
01465    fShutdownDelay = (delay > -1) ? delay : fShutdownDelay;
01466 
01467    return 0;
01468 }
01469 
01470 //______________________________________________________________________________
01471 int XrdProofdProofServMgr::Process(XrdProofdProtocol *p)
01472 {
01473    // Process manager request
01474    XPDLOC(SMGR, "ProofServMgr::Process")
01475 
01476    int rc = 1;
01477    XPD_SETRESP(p, "Process");
01478 
01479    TRACEP(p, REQ, "enter: req id: " << p->Request()->header.requestid << " (" <<
01480                 XrdProofdAux::ProofRequestTypes(p->Request()->header.requestid) << ")");
01481 
01482    XrdSysMutexHelper mtxh(p->Client()->Mutex());
01483 
01484    // Once logged-in, the user can request the real actions
01485    XrdOucString emsg("Invalid request code: ");
01486 
01487    int twait = 20;
01488 
01489    if (Pipe()->Post(XrdProofdProofServMgr::kProcessReq, 0) != 0) {
01490       response->Send(kXR_ServerError,
01491                      "ProofServMgr::Process: error posting internal pipe for authorization to proceed");
01492       return 0;
01493    }
01494    if (fProcessSem.Wait(twait) != 0) {
01495       response->Send(kXR_ServerError,
01496                      "ProofServMgr::Process: timed-out waiting for authorization to proceed - retry later");
01497       return 0;
01498    }
01499 
01500    // This is needed to block the session checks
01501    XpdSrvMgrCreateCnt cnt(this, kProcessCnt);
01502 
01503    switch(p->Request()->header.requestid) {
01504    case kXP_create:
01505       return Create(p);
01506    case kXP_destroy:
01507       return Destroy(p);
01508    case kXP_attach:
01509       return Attach(p);
01510    case kXP_detach:
01511       return Detach(p);
01512    default:
01513       emsg += p->Request()->header.requestid;
01514       break;
01515    }
01516 
01517    // Whatever we have, it's not valid
01518    response->Send(kXR_InvalidRequest, emsg.c_str());
01519    return 0;
01520 }
01521 
01522 //______________________________________________________________________________
01523 int XrdProofdProofServMgr::Attach(XrdProofdProtocol *p)
01524 {
01525    // Handle a request to attach to an existing session
01526    XPDLOC(SMGR, "ProofServMgr::Attach")
01527 
01528    int psid = -1, rc = 0;
01529    XPD_SETRESP(p, "Attach");
01530 
01531    // Unmarshall the data
01532    psid = ntohl(p->Request()->proof.sid);
01533    TRACEP(p, REQ, "psid: "<<psid<<", CID = "<<p->CID());
01534 
01535    // The client instance must be defined
01536    XrdProofdClient *c = p->Client();
01537    if (!c) {
01538       TRACEP(p, XERR, "client instance undefined");
01539       response->Send(kXR_ServerError,"client instance undefined");
01540       return 0;
01541    }
01542 
01543    // Find server session; sessions maybe recovering, so we need to take
01544    // that into account
01545    XrdProofdProofServ *xps = 0;
01546    int now = time(0);
01547    int deadline = -1, defdeadline = now + fRecoverTimeOut;
01548    while ((deadline < 0) || (now < deadline)) {
01549       if (!(xps = c->GetServer(psid)) || !xps->IsValid()) {
01550          // If the client is recovering start regular checks
01551          if (!IsClientRecovering(c->User(), c->Group(), deadline)) {
01552             // Failure
01553             TRACEP(p, XERR, "session ID not found: "<<psid);
01554             response->Send(kXR_InvalidRequest,"session ID not found");
01555             return 0;
01556          } else {
01557             // Make dure we do not enter an infinite loop
01558             deadline = (deadline > 0) ? deadline : defdeadline;
01559             // Wait until deadline in 1 sec steps
01560             sleep(1);
01561             now++;
01562          }
01563       } else {
01564          // Found
01565          break;
01566       }
01567    }
01568    // If we deadline we should fail now
01569    if (!xps || !xps->IsValid()) {
01570       TRACEP(p, XERR, "session ID not found: "<<psid);
01571       response->Send(kXR_InvalidRequest,"session ID not found");
01572       return 0;
01573    }
01574    TRACEP(p, DBG, "xps: "<<xps<<", status: "<< xps->Status());
01575 
01576    // Stream ID
01577    unsigned short sid;
01578    memcpy((void *)&sid, (const void *)&(p->Request()->header.streamid[0]), 2);
01579 
01580    // We associate this instance to the corresponding slot in the
01581    // session vector of attached clients
01582    XrdClientID *csid = xps->GetClientID(p->CID());
01583    csid->SetP(p);
01584    csid->SetSid(sid);
01585 
01586    // Take parentship, if orphalin
01587    if (!(xps->Parent()))
01588       xps->SetParent(csid);
01589 
01590    // Notify to user
01591    int protvers = (xps && xps->ROOT()) ? xps->ROOT()->SrvProtVers() : -1;
01592    if (p->ConnType() == kXPD_ClientMaster) {
01593       // Send also back the data pool url
01594       XrdOucString dpu = fMgr->PoolURL();
01595       if (!dpu.endswith('/'))
01596          dpu += '/';
01597       dpu += fMgr->NameSpace();
01598       response->SendI(psid, protvers, (kXR_int16)XPROOFD_VERSBIN,
01599                            (void *) dpu.c_str(), dpu.length());
01600    } else
01601       response->SendI(psid, protvers, (kXR_int16)XPROOFD_VERSBIN);
01602 
01603    // Send saved start processing message, if not idle
01604    if (xps->Status() == kXPD_running && xps->StartMsg()) {
01605       TRACEP(p, XERR, "sending start process message ("<<xps->StartMsg()->fSize<<" bytes)");
01606       response->Send(kXR_attn, kXPD_msg,
01607                           xps->StartMsg()->fBuff, xps->StartMsg()->fSize);
01608    }
01609 
01610    // Over
01611    return 0;
01612 }
01613 
01614 //_________________________________________________________________________________
01615 int XrdProofdProofServMgr::Create(XrdProofdProtocol *p)
01616 {
01617    // Handle a request to create a new session
01618    XPDLOC(SMGR, "ProofServMgr::Create")
01619 
01620    int psid = -1, rc = 0;
01621    XPD_SETRESP(p, "Create");
01622 
01623    TRACEP(p, DBG, "enter");
01624    XrdOucString msg;
01625 
01626    XpdSrvMgrCreateGuard mcGuard;
01627 
01628    // Check if we are allowed to start a new session
01629    int mxsess = fMgr->ProofSched() ? fMgr->ProofSched()->MaxSessions() : -1;
01630    if (p->ConnType() == kXPD_ClientMaster && mxsess > 0) {
01631       XrdSysMutexHelper mhp(fMutex);
01632       int cursess = CurrentSessions();
01633       TRACEP(p,ALL," cursess: "<<cursess);
01634       if (mxsess <= cursess) {
01635          XPDFORM(msg, " ++++ Max number of sessions reached (%d) - please retry later ++++ \n", cursess); 
01636          response->Send(kXR_attn, kXPD_srvmsg, (char *) msg.c_str(), msg.length());
01637          response->Send(kXP_TooManySess, "cannot start a new session");
01638          return 0;
01639       }
01640       // If we fail this guarantees that the counters are decreased, if needed 
01641       mcGuard.Set(&fCurrentSessions);
01642    }
01643 
01644    // Update counter to control checks during creation
01645    XpdSrvMgrCreateCnt cnt(this, kCreateCnt);
01646    if (TRACING(DBG)) {
01647       int nc = CheckCounter(kCreateCnt);
01648       TRACEP(p, DBG, nc << " threads are creating a new session");
01649    }
01650 
01651    // Allocate next free server ID and fill in the basic stuff
01652    XrdProofdProofServ *xps = p->Client()->GetFreeServObj();
01653    xps->SetClient(p->Client()->User());
01654    xps->SetSrvType(p->ConnType());
01655    psid = xps->ID();
01656 
01657    // Prepare the stream identifier
01658    unsigned short sid;
01659    memcpy((void *)&sid, (const void *)&(p->Request()->header.streamid[0]), 2);
01660    // We associate this instance to the corresponding slot in the
01661    // session vector of attached clients
01662    XrdClientID *csid = xps->GetClientID(p->CID());
01663    csid->SetSid(sid);
01664    csid->SetP(p);
01665    // Take parentship, if orphalin
01666    xps->SetParent(csid);
01667 
01668    // Unmarshall log level
01669    int loglevel = ntohl(p->Request()->proof.int1);
01670 
01671    // Parse buffer
01672    char *buf = p->Argp()->buff;
01673    int   len = p->Request()->proof.dlen;
01674 
01675    // Extract session tag
01676    XrdOucString tag(buf,len);
01677 
01678    TRACEP(p, DBG, "received buf: "<<tag);
01679 
01680    tag.erase(tag.find('|'));
01681    xps->SetTag(tag.c_str());
01682    TRACEP(p, DBG, "tag: "<<tag);
01683 
01684    // Extract ordinal number
01685    XrdOucString ord = "0";
01686    if ((p->ConnType() == kXPD_MasterWorker) || (p->ConnType() == kXPD_MasterMaster)) {
01687       ord.assign(buf,0,len-1);
01688       int iord = ord.find("|ord:");
01689       if (iord != STR_NPOS) {
01690          ord.erase(0,iord+5);
01691          ord.erase(ord.find("|"));
01692       } else
01693          ord = "0";
01694    }
01695    xps->SetOrdinal(ord.c_str());
01696 
01697    // Extract config file, if any (for backward compatibility)
01698    XrdOucString cffile;
01699    cffile.assign(buf,0,len-1);
01700    int icf = cffile.find("|cf:");
01701    if (icf != STR_NPOS) {
01702       cffile.erase(0,icf+4);
01703       cffile.erase(cffile.find("|"));
01704    } else
01705       cffile = "";
01706 
01707    // Extract user envs, if any
01708    XrdOucString uenvs;
01709    uenvs.assign(buf,0,len-1);
01710    int ienv = uenvs.find("|envs:");
01711    if (ienv != STR_NPOS) {
01712       uenvs.erase(0,ienv+6);
01713       uenvs.erase(uenvs.find("|"));
01714       xps->SetUserEnvs(uenvs.c_str());
01715    } else
01716       uenvs = "";
01717 
01718    // Check if the user wants to wait more for the session startup
01719    int intwait = fInternalWait;
01720    if (uenvs.length() > 0) {
01721       TRACEP(p, DBG, "user envs: "<<uenvs);
01722       int iiw = STR_NPOS;
01723       if ((iiw = uenvs.find("PROOF_INTWAIT=")) !=  STR_NPOS) {
01724          XrdOucString s(uenvs, iiw + strlen("PROOF_INTWAIT="));
01725          s.erase(s.find(','));
01726          if (s.isdigit()) {
01727             intwait = s.atoi();
01728             TRACEP(p, ALL, "startup internal wait set by user to "<<intwait);
01729          }
01730       }
01731    }
01732 
01733    // The ROOT version to be used
01734    xps->SetROOT(p->Client()->ROOT());
01735    XPDFORM(msg, "using ROOT version: %s", xps->ROOT()->Export());
01736    TRACEP(p, REQ, msg);
01737    if (p->ConnType() == kXPD_ClientMaster) {
01738       // Notify the client if using a version different from the default one
01739       if (p->Client()->ROOT() != fMgr->ROOTMgr()->DefaultVersion()) {
01740          XPDFORM(msg, "++++ Using NON-default ROOT version: %s ++++\n", xps->ROOT()->Export());
01741          response->Send(kXR_attn, kXPD_srvmsg, (char *) msg.c_str(), msg.length());
01742       }
01743    }
01744 
01745    // Notify
01746    TRACEP(p, DBG, "{ord,cfg,psid,cid,log}: {"<<ord<<","<<cffile<<","<<psid
01747                                              <<","<<p->CID()<<","<<loglevel<<"}");
01748 
01749    // Here we fork: for some weird problem on SMP machines there is a
01750    // non-zero probability for a deadlock situation in system mutexes.
01751    // The semaphore seems to have solved the problem.
01752    if (fForkSem.Wait(10) != 0) {
01753       xps->Reset();
01754       // Timeout acquire fork semaphore
01755       response->Send(kXP_ServerError, "timed-out acquiring fork semaphore");
01756       return 0;
01757    }
01758 
01759    // Pipe for child-to-parent communications during setup
01760    XrdProofdPipe fpc, fcp;
01761    if (!(fpc.IsValid()) || !(fcp.IsValid())) {
01762       xps->Reset();
01763       // Failure creating pipe
01764       response->Send(kXP_ServerError,
01765                      "unable to create pipes for communication during setup");
01766       return 0;
01767    }
01768 
01769    // Fork an agent process to handle this session
01770    int pid = -1;
01771    TRACEP(p, FORK,"Forking external proofsrv");
01772    if (!(pid = fMgr->Sched()->Fork("proofsrv"))) {
01773 
01774       // Get unique tag and relevant dirs for this session
01775       ProofServEnv_t in = {xps, loglevel, cffile.c_str(), "", "", "", "", ""};
01776       GetTagDirs(p, xps, in.fSessionTag, in.fTopSessionTag, in.fSessionDir, in.fWrkDir);
01777       XPDFORM(in.fLogFile, "%s.log", in.fWrkDir.c_str());
01778       TRACE(FORK, "log file: "<<in.fLogFile);
01779 
01780       XpdMsg xmsg;
01781       XrdOucString path, sockpath, emsg;
01782 
01783       // Receive the admin path from the parent
01784       if (fpc.Poll() < 0) {
01785          TRACE(XERR, "error while polling to receive the admin path from parent - EXIT" );
01786          exit(1);
01787       }
01788       if (fpc.Recv(xmsg) != 0) {
01789          TRACE(XERR, "error reading message while waiting for the admin path from parent - EXIT" );
01790          exit(1);
01791       }
01792       if (xmsg.Type() < 0) {
01793          TRACE(XERR, "the parent failed to setup the admin path - EXIT" );
01794          exit(1);
01795       }
01796       // Set the path w/o asserting the related files
01797       path = xmsg.Buf();
01798       xps->SetAdminPath(path.c_str(), 0);
01799       TRACE(FORK, "child: admin path: "<<path);
01800 
01801       xmsg.Reset();
01802       // Receive the sock path from the parent
01803       if (fpc.Poll() < 0) {
01804          TRACE(XERR, "error while polling to receive the sock path from parent - EXIT" );
01805          exit(1);
01806       }
01807       if (fpc.Recv(xmsg) != 0) {
01808          TRACE(XERR, "error reading message while waiting for the sock path from parent - EXIT" );
01809          exit(1);
01810       }
01811       if (xmsg.Type() < 0) {
01812          TRACE(XERR, "the parent failed to setup the sock path - EXIT" );
01813          exit(1);
01814       }
01815       // Set the UNIX sock path
01816       sockpath = xmsg.Buf();
01817       xps->SetUNIXSockPath(sockpath.c_str());
01818       TRACE(FORK, "child: UNIX sock path: "<<sockpath);
01819 
01820       // Log to the session log file from now on
01821       if (fLogger) fLogger->Bind(in.fLogFile.c_str());
01822 
01823       // These files belongs to the client
01824       if (chown(in.fLogFile.c_str(), p->Client()->UI().fUid, p->Client()->UI().fGid) != 0)
01825          TRACE(XERR, "chown on '"<<in.fLogFile.c_str()<<"'; errno: "<<errno);
01826 
01827       XrdOucString pmsg = "child process ";
01828       pmsg += (int) getpid();
01829       TRACE(FORK, pmsg);
01830 
01831       // We set to the user ownerships and create relevant dirs
01832       bool asserdatadir = 1;
01833       int srvtype = xps->SrvType();
01834       TRACE(ALL,"srvtype = "<< srvtype);
01835       if (xps->SrvType() != kXPD_Worker && !strchr(fMgr->DataDirOpts(), 'M')) {
01836          asserdatadir = 0;
01837       } else if (xps->SrvType() == kXPD_Worker && !strchr(fMgr->DataDirOpts(), 'W')) {
01838          asserdatadir = 0;
01839       }
01840       const char *pord = asserdatadir ? ord.c_str() : 0;
01841       const char *ptag = asserdatadir ? in.fSessionTag.c_str() : 0;
01842       if (SetUserOwnerships(p, pord, ptag) != 0) {
01843          emsg = "SetUserOwnerships did not return OK - EXIT";
01844          TRACE(XERR, emsg);
01845          if (fcp.Post(0, emsg.c_str()) != 0)
01846             TRACE(XERR, "cannot write to internal pipe; errno: "<<errno);
01847          exit(1);
01848       }
01849 
01850       // We set to the user environment
01851       if (SetUserEnvironment(p) != 0) {
01852          emsg = "SetUserEnvironment did not return OK - EXIT";
01853          TRACE(XERR, emsg);
01854          if (fcp.Post(0, emsg.c_str()) != 0)
01855             TRACE(XERR, "cannot write to internal pipe; errno: "<<errno);
01856          exit(1);
01857       }
01858 
01859       char *argvv[6] = {0};
01860 
01861       char *sxpd = 0;
01862       if (fMgr && fMgr->AdminPath()) {
01863          // We add our admin path to be able to identify processes coming from us
01864          sxpd = new char[strlen(fMgr->AdminPath()) + strlen("xpdpath:") + 1];
01865          sprintf(sxpd, "xpdpath:%s", fMgr->AdminPath());
01866       } else {
01867          // We add our PID to be able to identify processes coming from us
01868          sxpd = new char[10];
01869          sprintf(sxpd, "%d", getppid());
01870       }
01871 
01872       // Log level
01873       char slog[10] = {0};
01874       sprintf(slog, "%d", loglevel);
01875 
01876       // start server
01877       argvv[0] = (char *) xps->ROOT()->PrgmSrv();
01878       argvv[1] = (char *)((p->ConnType() == kXPD_MasterWorker) ? "proofslave"
01879                        : "proofserv");
01880       argvv[2] = (char *)"xpd";
01881       argvv[3] = (char *)sxpd;
01882       argvv[4] = (char *)slog;
01883       argvv[5] = 0;
01884 
01885       // Set environment for proofserv
01886       if (SetProofServEnv(p, (void *)&in) != 0) {
01887          emsg = "SetProofServEnv did not return OK - EXIT";
01888          TRACE(XERR, emsg);
01889          if (fcp.Post(0, emsg.c_str()) != 0)
01890             TRACE(XERR, "cannot write to internal pipe; errno: "<<errno);
01891          exit(1);
01892       }
01893       TRACE(FORK, (int)getpid() << ": proofserv env set up");
01894 
01895       // Setup OK: now we go
01896       // Communicate the logfile path
01897       if (fcp.Post(1, xps->Fileout()) != 0) {
01898          TRACE(XERR, "cannot write log file path to internal pipe; errno: "<<errno);
01899          exit(1);
01900       }
01901       TRACE(FORK, (int)getpid()<< ": log file path communicated");
01902 
01903       // Unblock SIGUSR1 and SIGUSR2
01904       sigset_t myset;
01905       sigemptyset(&myset);
01906       sigaddset(&myset, SIGUSR1);
01907       sigaddset(&myset, SIGUSR2);
01908       pthread_sigmask(SIG_UNBLOCK, &myset, 0);
01909 
01910       // Close pipes
01911       fpc.Close();
01912       fcp.Close();
01913 
01914       TRACE(FORK, (int)getpid()<<": user: "<<p->Client()->User()<<
01915                   ", uid: "<<getuid()<<", euid:"<<geteuid());
01916       // Run the program
01917       execv(xps->ROOT()->PrgmSrv(), argvv);
01918 
01919       // We should not be here!!!
01920       TRACE(XERR, "returned from execv: bad, bad sign !!!");
01921       exit(1);
01922    }
01923 
01924    // Wakeup colleagues
01925    fForkSem.Post();
01926 
01927    // parent process
01928    if (pid < 0) {
01929       xps->Reset();
01930       // Failure in forking
01931       response->Send(kXP_ServerError, "could not fork agent");
01932       return 0;
01933    }
01934 
01935    TRACEP(p, FORK,"Parent process: child is "<<pid);
01936    XrdOucString emsg;
01937 
01938    // Cleanup current socket, if any
01939    if (xps->UNIXSock()) {
01940       TRACEP(p, FORK,"current UNIX sock: "<<xps->UNIXSock() <<", path: "<<xps->UNIXSockPath());
01941       xps->DeleteUNIXSock();
01942    }
01943 
01944    // Admin and UNIX Socket Path (set path and create the socket); we need to
01945    // set and create them in here, otherwise the cleaning may remove the socket
01946    XrdOucString path, sockpath;
01947    XPDFORM(path, "%s/%s.%s.%d", fActiAdminPath.c_str(),
01948                                 p->Client()->User(), p->Client()->Group(), pid);
01949    // Sock path under dedicated directory to avoid problems related to its length
01950    XPDFORM(sockpath, "%s/xpd.%d.%d", fMgr->SockPathDir(), fMgr->Port(), pid);
01951    if (sockpath.length() > 100) {
01952       emsg += ": socket path very long (";
01953       emsg += sockpath.length();
01954       emsg += "): this may lead to stack corruption!";
01955       emsg += " Use xpd.sockpathdir to change it";
01956    }
01957    int pathrc = 0;
01958    if (!pathrc && !(pathrc = xps->SetAdminPath(path.c_str(), 1))) {
01959       // Communicate the path to child
01960       if ((pathrc = fpc.Post(0, path.c_str())) != 0)
01961          emsg += ": failed to communicating path to child";
01962    } else {
01963       emsg += ": failed to setup child admin path";
01964       // Communicate failure to child
01965       if ((pathrc = fpc.Post(-1, path.c_str())) != 0)
01966          emsg += ": failed communicating failure to child";
01967    }
01968    // Now create the UNIX sock path
01969    if (!pathrc) {
01970       xps->SetUNIXSockPath(sockpath.c_str());
01971       if ((pathrc = xps->CreateUNIXSock(fEDest)) != 0) {
01972          // Failure
01973          emsg += ": failure creating UNIX socket on " ;
01974          emsg += sockpath;
01975       }
01976    }
01977    if (!pathrc) {
01978       TRACEP(p, FORK,"UNIX sock: "<<xps->UNIXSockPath());
01979       if ((pathrc = chown(sockpath.c_str(), p->Client()->UI().fUid, p->Client()->UI().fGid)) != 0) {
01980          emsg += ": failure changing ownership of the UNIX socket on " ;
01981          emsg += sockpath;
01982          emsg += "; errno: " ;
01983          emsg += errno;
01984       }
01985    }
01986    // Communicate sockpath or failure, if any 
01987    if (!pathrc) {
01988       // Communicate the path to child
01989       if ((pathrc = fpc.Post(0, sockpath.c_str())) != 0)
01990          emsg += ": failed to communicating path to child";
01991    } else {
01992       emsg += ": failed to setup child admin path";
01993       // Communicate failure to child
01994       if ((pathrc = fpc.Post(-1, sockpath.c_str())) != 0)
01995          emsg += ": failed communicating failure to child";
01996    }
01997    if (pathrc != 0) {
01998       // Failure
01999       xps->Reset();
02000       XrdProofdAux::KillProcess(pid, 1, p->Client()->UI(), fMgr->ChangeOwn());
02001       response->Send(kXP_ServerError, emsg.c_str());
02002       return 0;
02003    }
02004 
02005    TRACEP(p, FORK, "waiting for client setup status ...");
02006 
02007    emsg = "proofserv setup";
02008    // Wait for the setup process on the pipe, 20 secs max (10 x 2000 millisecs): this
02009    // is enough to cover possible delays due to heavy load; the client will anyhow
02010    // retry a few times
02011    int ntry = 10, prc = 0, rst = -1;
02012    while (prc == 0 && ntry--) {
02013       // Poll for 2 secs
02014       if ((prc = fcp.Poll(2)) > 0) {
02015          // Got something: read the message out
02016          XpdMsg xmsg;
02017          if (fcp.Recv(xmsg) != 0) {
02018             emsg += ": error receiving message from pipe";
02019             prc = -1;
02020             break;
02021          }
02022          // Status is the message type
02023          rst = xmsg.Type();
02024          // Read string, if any
02025          XrdOucString xbuf = xmsg.Buf();
02026          if (xbuf.length() <= 0) {
02027             emsg = "error reading buffer {logfile, error message} from message received on the pipe";
02028             prc = -1;
02029             break;
02030          }
02031          if (rst > 0) {
02032             // Set the log file
02033             xps->SetFileout(xbuf.c_str());
02034             // Set also the session tag
02035             XrdOucString stag(xbuf);
02036             stag.erase(stag.rfind('/'));
02037             stag.erase(0, stag.find("session-") + strlen("session-"));
02038             xps->SetTag(stag.c_str());
02039 
02040          } else {
02041             // Setup failed: save the error
02042             prc = -1;
02043             emsg += ": failed: ";
02044             emsg += xbuf;
02045             break;
02046          }
02047 
02048       } else if (prc < 0) {
02049          emsg += ": error receive status-of-setup from pipe";
02050          break;
02051       } else {
02052          TRACEP(p, FORK, "receiving status-of-setup from pipe: waiting 2 s ..."<<pid);
02053       }
02054    }
02055 
02056    // Close pipes
02057    fpc.Close();
02058    fcp.Close();
02059 
02060    // Notify the user
02061    if (prc <= 0) {
02062       // Timed-out or failed: we are done; if timed-out finalize the notification message
02063       if (prc == 0) emsg += ": timed-out receiving status-of-setup from pipe";
02064       emsg += ": failure setting up proofserv" ;
02065       xps->Reset();
02066       XrdProofdAux::KillProcess(pid, 1, p->Client()->UI(), fMgr->ChangeOwn());
02067       response->Send(kXP_ServerError, emsg.c_str());
02068       return 0;
02069 
02070    } else {
02071       // Setup was successful
02072       if (p->ConnType() == kXPD_ClientMaster) {
02073          // Send also back the data pool url
02074          XrdOucString dpu = fMgr->PoolURL();
02075          if (!dpu.endswith('/'))
02076             dpu += '/';
02077          dpu += fMgr->NameSpace();
02078          response->SendI(psid, xps->ROOT()->SrvProtVers(), (kXR_int16)XPROOFD_VERSBIN,
02079                               (void *) dpu.c_str(), dpu.length());
02080       } else
02081          response->SendI(psid, xps->ROOT()->SrvProtVers(), (kXR_int16)XPROOFD_VERSBIN);
02082    }
02083 
02084    // now we wait for the callback to be (successfully) established
02085    TRACEP(p, FORK, "server launched: wait for callback ");
02086 
02087    // Set ID
02088    xps->SetSrvPID(pid);
02089 
02090    // Wait for the call back
02091    if (Accept(xps, intwait, emsg) != 0) {
02092       // Failure: kill the child process
02093       if (XrdProofdAux::KillProcess(pid, 0, p->Client()->UI(), fMgr->ChangeOwn()) != 0)
02094          emsg += ": process could not be killed";
02095       else
02096          emsg += ": process killed";
02097       // Reset the instance
02098       xps->Reset();
02099       // Notify
02100       TRACEP(p, XERR, "problems accepting callback: " <<emsg);
02101       response->Send(kXR_attn, kXPD_errmsg, (char *) emsg.c_str(), emsg.length());
02102       return 0;
02103    }
02104    // Set the group, if any
02105    xps->SetGroup(p->Client()->Group());
02106 
02107    // Change child process priority, if required
02108    int dp = 0;
02109    if (fMgr->PriorityMgr()->SetProcessPriority(xps->SrvPID(),
02110                                                         p->Client()->User(), dp) != 0) {
02111       TRACEP(p, XERR, "problems changing child process priority");
02112    } else if (dp > 0) {
02113       TRACEP(p, DBG, "priority of the child process changed by " << dp << " units");
02114    }
02115 
02116    XrdClientID *cid = xps->Parent();
02117    TRACEP(p, FORK, "xps: "<<xps<<", ClientID: "<<(int *)cid<<" (sid: "<<sid<<")"<<" NClients: "<<xps->GetNClients(1));
02118 
02119    // Record this session in the client sandbox
02120    if (p->Client()->Sandbox()->AddSession(xps->Tag()) == -1)
02121       TRACEP(p, REQ, "problems recording session in sandbox");
02122 
02123    // Success; avoid that the global counter is decreased
02124    mcGuard.Set(0);
02125 
02126    // Update the global session handlers
02127    XrdOucString key; key += pid;
02128    {  XrdSysMutexHelper mh(fMutex);
02129       fSessions.Add(key.c_str(), xps, 0, Hash_keepdata);
02130       fActiveSessions.push_back(xps);
02131    }
02132    AddSession(p, xps);
02133 
02134    // Check session validity
02135    if (!xps->IsValid()) {
02136       // Notify
02137       TRACEP(p, XERR, "PROOF session is invalid: protocol error? " <<emsg);
02138    }
02139 
02140    // Over
02141    return 0;
02142 }
02143 
02144 //_________________________________________________________________________________
02145 int XrdProofdProofServMgr::ResolveSession(const char *fpid)
02146 {
02147    // Handle a request to recover a session after stop&restart
02148    XPDLOC(SMGR, "ProofServMgr::ResolveSession")
02149 
02150    TRACE(REQ,  "resolving "<< fpid<<" ...");
02151 
02152    // Check inputs
02153    if (!fpid || strlen(fpid)<= 0 || !(fMgr->ClientMgr()) || !fRecoverClients) {
02154       TRACE(XERR, "invalid inputs: "<<fpid<<", "<<fMgr->ClientMgr()<<
02155                   ", "<<fRecoverClients);
02156       return -1;
02157    }
02158 
02159    // Path to the session file
02160    XrdOucString path;
02161    XPDFORM(path, "%s/%s", fActiAdminPath.c_str(), fpid);
02162 
02163    // Read info
02164    XrdProofSessionInfo si(path.c_str());
02165 
02166    // Check if recovering is supported
02167    if (si.fSrvProtVers < 18) {
02168       TRACE(DBG, "session does not support recovering: protocol "
02169                  <<si.fSrvProtVers<<" < 18");
02170       return -1;
02171    }
02172 
02173    // Create client instance
02174    XrdProofdClient *c = fMgr->ClientMgr()->GetClient(si.fUser.c_str(), si.fGroup.c_str(),
02175                                                      si.fUnixPath.c_str());
02176    if (!c) {
02177       TRACE(DBG, "client instance not initialized");
02178       return -1;
02179    }
02180 
02181    // Allocate the server object
02182    int psid = si.fID;
02183    XrdProofdProofServ *xps = c->GetServObj(psid);
02184    if (!xps) {
02185       TRACE(DBG, "server object not initialized");
02186       return -1;
02187    }
02188 
02189    // Fill info for this session
02190    si.FillProofServ(*xps, fMgr->ROOTMgr());
02191    if (xps->CreateUNIXSock(fEDest) != 0) {
02192       // Failure
02193       TRACE(XERR,"failure creating UNIX socket on " << xps->UNIXSockPath());
02194       xps->Reset();
02195       return -1;
02196    }
02197 
02198    // Set invalid as we are not yet connected
02199    xps->SetValid(0);
02200 
02201    // Add to the lists
02202    XrdSysMutexHelper mhp(fRecoverMutex);
02203    std::list<XpdClientSessions *>::iterator ii = fRecoverClients->begin();
02204    while (ii != fRecoverClients->end()) {
02205       if ((*ii)->fClient == c)
02206          break;
02207       ii++;
02208    }
02209    if (ii != fRecoverClients->end()) {
02210       (*ii)->fProofServs.push_back(xps);
02211    } else {
02212       XpdClientSessions *cl = new XpdClientSessions(c);
02213       cl->fProofServs.push_back(xps);
02214       fRecoverClients->push_back(cl);
02215    }
02216 
02217    // Done
02218    return 0;
02219 }
02220 
02221 //_________________________________________________________________________________
02222 int XrdProofdProofServMgr::Recover(XpdClientSessions *cl)
02223 {
02224    // Handle a request to recover a session after stop&restart for a specific client
02225    XPDLOC(SMGR, "ProofServMgr::Recover")
02226 
02227    if (!cl) {
02228       TRACE(XERR, "invalid input!");
02229       return 0;
02230    }
02231 
02232    TRACE(DBG,  "client: "<< cl->fClient->User());
02233 
02234    int nr = 0;
02235    XrdOucString emsg;
02236    XrdProofdProofServ *xps = 0;
02237    int nps = 0, npsref = 0;
02238    { XrdSysMutexHelper mhp(cl->fMutex); nps = cl->fProofServs.size(), npsref = nps; }
02239    while (nps--) {
02240 
02241       { XrdSysMutexHelper mhp(cl->fMutex); xps = cl->fProofServs.front();
02242         cl->fProofServs.remove(xps); cl->fProofServs.push_back(xps); }
02243 
02244       // Short steps of 1 sec
02245       if (Accept(xps, 1, emsg) != 0) {
02246          if (emsg == "timeout") {
02247             TRACE(DBG, "timeout while accepting callback");
02248          } else {
02249             TRACE(XERR, "problems accepting callback: "<<emsg);
02250          }
02251       } else {
02252          // Update the global session handlers
02253          XrdOucString key; key += xps->SrvPID();
02254          fSessions.Add(key.c_str(), xps, 0, Hash_keepdata);
02255          fActiveSessions.push_back(xps);
02256          xps->Protocol()->SetAdminPath(xps->AdminPath());
02257          // Remove from the temp list
02258          { XrdSysMutexHelper mhp(cl->fMutex); cl->fProofServs.remove(xps); }
02259          // Count
02260          nr++;
02261          // Notify
02262          if (TRACING(REQ)) {
02263             int pid = xps->SrvPID();
02264             int left = -1;
02265             { XrdSysMutexHelper mhp(cl->fMutex); left = cl->fProofServs.size(); }
02266             XPDPRT("session for "<<cl->fClient->User()<<"."<<cl->fClient->Group()<<
02267                    " successfully recovered ("<<left<<" left); pid: "<<pid);
02268          }
02269       }
02270    }
02271 
02272    // Over
02273    return nr;
02274 }
02275 
02276 //______________________________________________________________________________
02277 int XrdProofdProofServMgr::Accept(XrdProofdProofServ *xps,
02278                                   int to, XrdOucString &msg)
02279 {
02280    // Accept a callback from a starting-up server; return a pointer to the
02281    // attached session or 0.
02282    XPDLOC(SMGR, "ProofServMgr::Accept")
02283 
02284    // We will get back a peer to initialize a link
02285    XrdNetPeer peerpsrv;
02286    XrdLink   *linkpsrv = 0;
02287    XrdProtocol *xp = 0;
02288    int lnkopts = 0;
02289    bool go = 1;
02290 
02291    // Check inputs
02292    if (!xps || !xps->UNIXSock()) {
02293       TRACE(XERR, "session pointer undefined or socket invalid: "<<xps);
02294       return -1;
02295    }
02296    TRACE(REQ, "waiting for server callback for "<<to<<" secs ... on "<<xps->UNIXSockPath());
02297 
02298    // Perform regular accept
02299    if (go && !(xps->UNIXSock()->Accept(peerpsrv, XRDNET_NODNTRIM, to))) {
02300       msg = "timeout";
02301       go = 0;
02302    }
02303    if (go) {
02304       // Make sure we have the full host name
02305       if (peerpsrv.InetName) {
02306          char *ptmp = peerpsrv.InetName;
02307          peerpsrv.InetName = XrdNetDNS::getHostName("localhost");
02308          free(ptmp);
02309       }
02310    }
02311 
02312    // Allocate a new network object
02313    if (go && !(linkpsrv = XrdLink::Alloc(peerpsrv, lnkopts))) {
02314       msg = "could not allocate network object: ";
02315       go = 0;
02316    }
02317 
02318    if (go) {
02319       // Keep buffer after object goes away
02320       peerpsrv.InetBuff = 0;
02321       TRACE(DBG, "accepted connection from " << peerpsrv.InetName);
02322       // Get a protocol object off the stack (if none, allocate a new one)
02323       XrdProofdProtocol *p = new XrdProofdProtocol();
02324       if (!(xp = p->Match(linkpsrv))) {
02325          msg = "match failed: protocol error: ";
02326          go = 0;
02327       }
02328       delete p;
02329    }
02330 
02331    if (go) {
02332       // Save path into the protocol instance: it may be needed during Process
02333       XrdOucString apath(xps->AdminPath());
02334       apath += ".status";
02335       ((XrdProofdProtocol *)xp)->SetAdminPath(apath.c_str());
02336       // Take a short-cut and process the initial request as a sticky request
02337       if (xp->Process(linkpsrv) != 0) {
02338          msg = "handshake with internal link failed: ";
02339          go = 0;
02340       }
02341    }
02342 
02343    // Attach this link to the appropriate poller and enable it.
02344    if (go && !XrdPoll::Attach(linkpsrv)) {
02345       msg = "could not attach new internal link to poller: ";
02346       go = 0;
02347    }
02348 
02349    if (!go) {
02350       // Close the link
02351       if (linkpsrv)
02352          linkpsrv->Close();
02353       return -1;
02354    }
02355 
02356    // Tight this protocol instance to the link
02357    linkpsrv->setProtocol(xp);
02358 
02359    TRACE(REQ, "Protocol "<<xp<<" attached to link "<<linkpsrv<<" ("<< peerpsrv.InetName <<")");
02360 
02361    // Schedule it
02362    fMgr->Sched()->Schedule((XrdJob *)linkpsrv);
02363 
02364    // Save the protocol in the session instance
02365    xps->SetProtocol((XrdProofdProtocol *)xp);
02366 
02367    // Done
02368    return 0;
02369 }
02370 
02371 //______________________________________________________________________________
02372 int XrdProofdProofServMgr::Detach(XrdProofdProtocol *p)
02373 {
02374    // Handle a request to detach from an existing session
02375    XPDLOC(SMGR, "ProofServMgr::Detach")
02376 
02377    int psid = -1, rc = 0;
02378    XPD_SETRESP(p, "Detach");
02379 
02380    // Unmarshall the data
02381    psid = ntohl(p->Request()->proof.sid);
02382    TRACEP(p, REQ, "psid: "<<psid);
02383 
02384    // Find server session
02385    XrdProofdProofServ *xps = 0;
02386    if (!p->Client() || !(xps = p->Client()->GetServer(psid))) {
02387       TRACEP(p, XERR, "session ID not found: "<<psid);
02388       response->Send(kXR_InvalidRequest,"session ID not found");
02389       return 0;
02390    }
02391    xps->FreeClientID(p->Pid());
02392 
02393    // Notify to user
02394    response->Send();
02395 
02396    return 0;
02397 }
02398 
02399 //______________________________________________________________________________
02400 int XrdProofdProofServMgr::Destroy(XrdProofdProtocol *p)
02401 {
02402    // Handle a request to shutdown an existing session
02403    XPDLOC(SMGR, "ProofServMgr::Destroy")
02404 
02405    int psid = -1, rc = 0;
02406    XPD_SETRESP(p, "Destroy");
02407 
02408    // Unmarshall the data
02409    psid = ntohl(p->Request()->proof.sid);
02410    TRACEP(p, REQ, "psid: "<<psid);
02411 
02412    XrdOucString msg;
02413 
02414    // Find server session
02415    XrdProofdProofServ *xpsref = 0;
02416    if (psid > -1) {
02417       // Request for a specific session
02418       if (!p->Client() || !(xpsref = p->Client()->GetServer(psid))) {
02419          TRACEP(p, XERR, "reference session ID not found");
02420          response->Send(kXR_InvalidRequest,"reference session ID not found");
02421          return 0;
02422       }
02423       XPDFORM(msg, "session %d destroyed by %s", xpsref->SrvPID(), p->Link()->ID);
02424    } else {
02425       XPDFORM(msg, "all sessions destroyed by %s", p->Link()->ID);
02426    }
02427 
02428    // Terminate the servers
02429    p->Client()->TerminateSessions(kXPD_AnyServer, xpsref,
02430                                   msg.c_str(), Pipe(), fMgr->ChangeOwn());
02431 
02432    // Notify to user
02433    response->Send();
02434 
02435    // Over
02436    return 0;
02437 }
02438 
02439 //__________________________________________________________________________
02440 static int WriteSessEnvs(const char *, XpdEnv *env, void *s)
02441 {
02442    // Run thorugh entries to broadcast the relevant priority
02443    XPDLOC(SMGR, "WriteSessEnvs")
02444 
02445    XrdOucString emsg;
02446    
02447    XpdWriteEnv_t *xwe = (XpdWriteEnv_t *)s;  
02448 
02449    if (env && xwe && xwe->fMgr && xwe->fClient &&  xwe->fEnv) {
02450       if (env->fEnv.length() > 0) {
02451          // Resolve keywords
02452          xwe->fMgr->ResolveKeywords(env->fEnv, xwe->fClient);
02453          // Set the env now
02454          char *ev = new char[env->fEnv.length()+1];
02455          strncpy(ev, env->fEnv.c_str(), env->fEnv.length());
02456          ev[env->fEnv.length()] = 0;
02457          putenv(ev);
02458          fprintf(xwe->fEnv, "%s\n", ev);
02459          TRACE(DBG, ev);
02460       }
02461       // Go to next
02462       return 0;
02463    } else {
02464       emsg = "some input undefined";
02465    }
02466 
02467    // Some problem
02468    TRACE(XERR,"protocol error: "<<emsg);
02469    return 1;
02470 }
02471 
02472 //______________________________________________________________________________
02473 int XrdProofdProofServMgr::SetProofServEnvOld(XrdProofdProtocol *p, void *input)
02474 {
02475    // Set environment for proofserv; old version preparing the environment for
02476    // proofserv protocol version <= 13. Needed for backward compatibility.
02477    XPDLOC(SMGR, "ProofServMgr::SetProofServEnvOld")
02478 
02479    char *ev = 0;
02480 
02481    // Check inputs
02482    if (!p || !p->Client() || !input) {
02483       TRACE(XERR, "at leat one input is invalid - cannot continue");
02484       return -1;
02485    }
02486 
02487    // Set basic environment for proofserv
02488    if (SetProofServEnv(fMgr, p->Client()->ROOT()) != 0) {
02489       TRACE(XERR, "problems setting basic environment - exit");
02490       return -1;
02491    }
02492 
02493    ProofServEnv_t *in = (ProofServEnv_t *)input;
02494 
02495    // Session proxy
02496    XrdProofdProofServ *xps = in->fPS;
02497    if (!xps) {
02498       TRACE(XERR, "unable to get instance of proofserv proxy");
02499       return -1;
02500    }
02501    int psid = xps->ID();
02502    TRACE(REQ,  "psid: "<<psid<<", log: "<<in->fLogLevel);
02503 
02504    // Work directory
02505    XrdOucString udir = p->Client()->Sandbox()->Dir();
02506    TRACE(DBG, "working dir for "<<p->Client()->User()<<" is: "<<udir);
02507 
02508    ev = new char[strlen("ROOTPROOFSESSDIR=") + in->fWrkDir.length() + 2];
02509    sprintf(ev, "ROOTPROOFSESSDIR=%s", in->fWrkDir.c_str());
02510    putenv(ev);
02511    TRACE(DBG, ev);
02512 
02513    // Log level
02514    ev = new char[strlen("ROOTPROOFLOGLEVEL=")+5];
02515    sprintf(ev, "ROOTPROOFLOGLEVEL=%d", in->fLogLevel);
02516    putenv(ev);
02517    TRACE(DBG, ev);
02518 
02519    // Ordinal number
02520    ev = new char[strlen("ROOTPROOFORDINAL=")+strlen(xps->Ordinal())+2];
02521    sprintf(ev, "ROOTPROOFORDINAL=%s", xps->Ordinal());
02522    putenv(ev);
02523    TRACE(DBG, ev);
02524 
02525    // ROOT Version tag if not the default one
02526    ev = new char[strlen("ROOTVERSIONTAG=")+strlen(p->Client()->ROOT()->Tag())+2];
02527    sprintf(ev, "ROOTVERSIONTAG=%s", p->Client()->ROOT()->Tag());
02528    putenv(ev);
02529    TRACE(DBG, ev);
02530 
02531    // Create the env file
02532    TRACE(DBG, "creating env file");
02533    XrdOucString envfile = in->fWrkDir;
02534    envfile += ".env";
02535    FILE *fenv = fopen(envfile.c_str(), "w");
02536    if (!fenv) {
02537       TRACE(XERR,
02538                   "unable to open env file: "<<envfile);
02539       return -1;
02540    }
02541    TRACE(DBG, "environment file: "<< envfile);
02542 
02543    // Forwarded sec credentials, if any
02544    if (p->AuthProt()) {
02545 
02546       // Additional envs possibly set by the protocol for next application
02547       XrdOucString secenvs(getenv("XrdSecENVS"));
02548       if (secenvs.length() > 0) {
02549          // Go through the list
02550          XrdOucString env;
02551          int from = 0;
02552          while ((from = secenvs.tokenize(env, from, ',')) != -1) {
02553             if (env.length() > 0) {
02554                // Set the env now
02555                ev = new char[env.length()+1];
02556                strncpy(ev, env.c_str(), env.length());
02557                ev[env.length()] = 0;
02558                putenv(ev);
02559                fprintf(fenv, "%s\n", ev);
02560                TRACE(DBG, ev);
02561             }
02562          }
02563       }
02564 
02565       // The credential buffer, if any
02566       XrdSecCredentials *creds = p->AuthProt()->getCredentials();
02567       if (creds) {
02568          int lev = strlen("XrdSecCREDS=")+creds->size;
02569          ev = new char[lev+1];
02570          strcpy(ev, "XrdSecCREDS=");
02571          memcpy(ev+strlen("XrdSecCREDS="), creds->buffer, creds->size);
02572          ev[lev] = 0;
02573          putenv(ev);
02574          TRACE(DBG, "XrdSecCREDS set");
02575 
02576          // If 'pwd', save AFS key, if any
02577          if (!strncmp(p->AuthProt()->Entity.prot, "pwd", 3)) {
02578             XrdOucString credsdir = udir;
02579             credsdir += "/.creds";
02580             // Make sure the directory exists
02581             if (!XrdProofdAux::AssertDir(credsdir.c_str(), p->Client()->UI(), fMgr->ChangeOwn())) {
02582                if (SaveAFSkey(creds, credsdir.c_str(), p->Client()->UI()) == 0) {
02583                   ev = new char[strlen("ROOTPROOFAFSCREDS=")+credsdir.length()+strlen("/.afs")+2];
02584                   sprintf(ev, "ROOTPROOFAFSCREDS=%s/.afs", credsdir.c_str());
02585                   putenv(ev);
02586                   fprintf(fenv, "ROOTPROOFAFSCREDS has been set\n");
02587                   TRACE(DBG, ev);
02588                } else {
02589                   TRACE(DBG, "problems in saving AFS key");
02590                }
02591             } else {
02592                TRACE(XERR, "unable to create creds dir: "<<credsdir);
02593                return -1;
02594             }
02595          }
02596       }
02597    }
02598 
02599    // Set ROOTSYS
02600    fprintf(fenv, "ROOTSYS=%s\n", xps->ROOT()->Dir());
02601 
02602    // Set conf dir
02603    fprintf(fenv, "ROOTCONFDIR=%s\n", xps->ROOT()->Dir());
02604 
02605    // Set TMPDIR
02606    fprintf(fenv, "ROOTTMPDIR=%s\n", fMgr->TMPdir());
02607 
02608    // Port (really needed?)
02609    fprintf(fenv, "ROOTXPDPORT=%d\n", fMgr->Port());
02610 
02611    // Work dir
02612    fprintf(fenv, "ROOTPROOFWORKDIR=%s\n", udir.c_str());
02613 
02614    // Session tag
02615    fprintf(fenv, "ROOTPROOFSESSIONTAG=%s\n", in->fSessionTag.c_str());
02616 
02617    // Whether user specific config files are enabled
02618    if (fMgr->NetMgr()->WorkerUsrCfg())
02619       fprintf(fenv, "ROOTUSEUSERCFG=1\n");
02620 
02621    // Set Open socket
02622    fprintf(fenv, "ROOTOPENSOCK=%s\n", xps->UNIXSockPath());
02623 
02624    // Entity
02625    fprintf(fenv, "ROOTENTITY=%s@%s\n", p->Client()->User(), p->Link()->Host());
02626 
02627    // Session ID
02628    fprintf(fenv, "ROOTSESSIONID=%d\n", psid);
02629 
02630    // Client ID
02631    fprintf(fenv, "ROOTCLIENTID=%d\n", p->CID());
02632 
02633    // Client Protocol
02634    fprintf(fenv, "ROOTPROOFCLNTVERS=%d\n", p->ProofProtocol());
02635 
02636    // Ordinal number
02637    fprintf(fenv, "ROOTPROOFORDINAL=%s\n", xps->Ordinal());
02638 
02639    // ROOT version tag if different from the default one
02640    if (getenv("ROOTVERSIONTAG"))
02641       fprintf(fenv, "ROOTVERSIONTAG=%s\n", getenv("ROOTVERSIONTAG"));
02642 
02643    // Config file
02644    if (in->fCfg.length() > 0)
02645       fprintf(fenv, "ROOTPROOFCFGFILE=%s\n", in->fCfg.c_str());
02646 
02647    // Log file in the log dir
02648    fprintf(fenv, "ROOTPROOFLOGFILE=%s\n", in->fLogFile.c_str());
02649    xps->SetFileout(in->fLogFile.c_str());
02650 
02651    // Additional envs (xpd.putenv directive)
02652    {  XrdSysMutexHelper mhp(fEnvsMutex);
02653       if (fProofServEnvs.size() > 0) {
02654          // Hash list of the directives applying to this {user, group, svn, version}
02655          XrdOucHash<XpdEnv> sessenvs;
02656          std::list<XpdEnv>::iterator ienvs = fProofServEnvs.begin();
02657          for ( ; ienvs != fProofServEnvs.end(); ienvs++) {
02658             int envmatch = (*ienvs).Matches(p->Client()->User(), p->Client()->Group(),
02659                                             p->Client()->ROOT()->SvnRevision(),
02660                                             p->Client()->ROOT()->VersionCode());
02661             if (envmatch >= 0) {
02662                XpdEnv *env = sessenvs.Find((*ienvs).fName.c_str());
02663                if (env) {
02664                   int envmtcex = env->Matches(p->Client()->User(), p->Client()->Group(),
02665                                               p->Client()->ROOT()->SvnRevision(),
02666                                               p->Client()->ROOT()->VersionCode());
02667                   if (envmatch > envmtcex) {
02668                      // Replace the entry
02669                      env = &(*ienvs);
02670                      sessenvs.Rep(env->fName.c_str(), env, 0, Hash_keepdata);
02671                   }
02672                } else {
02673                   // Add an entry
02674                   env = &(*ienvs);
02675                   sessenvs.Add(env->fName.c_str(), env, 0, Hash_keepdata);
02676                }
02677                TRACE(HDBG, "Adding: "<<(*ienvs).fEnv);
02678             }
02679          }
02680          XpdWriteEnv_t xpwe = {fMgr, p->Client(), fenv};
02681          sessenvs.Apply(WriteSessEnvs, (void *)&xpwe);
02682       }
02683    }
02684 
02685    // Set the user envs
02686    if (xps->UserEnvs() &&
02687        strlen(xps->UserEnvs()) && strstr(xps->UserEnvs(),"=")) {
02688       // The single components
02689       XrdOucString ue = xps->UserEnvs();
02690       XrdOucString env, namelist;
02691       int from = 0, ieq = -1;
02692       while ((from = ue.tokenize(env, from, ',')) != -1) {
02693          if (env.length() > 0 && (ieq = env.find('=')) != -1) {
02694             // Resolve keywords
02695             ResolveKeywords(env, in);
02696             ev = new char[env.length()+1];
02697             strncpy(ev, env.c_str(), env.length());
02698             ev[env.length()] = 0;
02699             putenv(ev);
02700             fprintf(fenv, "%s\n", ev);
02701             TRACE(DBG, ev);
02702             env.erase(ieq);
02703             if (namelist.length() > 0)
02704                namelist += ',';
02705             namelist += env;
02706          }
02707       }
02708       // The list of names, ','-separated
02709       ev = new char[strlen("PROOF_ALLVARS=") + namelist.length() + 2];
02710       sprintf(ev, "PROOF_ALLVARS=%s", namelist.c_str());
02711       putenv(ev);
02712       fprintf(fenv, "%s\n", ev);
02713       TRACE(DBG, ev);
02714    }
02715 
02716    // Close file
02717    fclose(fenv);
02718 
02719    // Create or Update symlink to last session
02720    TRACE(DBG, "creating symlink");
02721    XrdOucString syml = udir;
02722    if (p->ConnType() == kXPD_MasterWorker)
02723       syml += "/last-worker-session";
02724    else
02725       syml += "/last-master-session";
02726    if (XrdProofdAux::SymLink(in->fSessionDir.c_str(), syml.c_str()) != 0) {
02727       TRACE(XERR, "problems creating symlink to last session (errno: "<<errno<<")");
02728    }
02729 
02730    // We are done
02731    TRACE(DBG, "done");
02732    return 0;
02733 }
02734 
02735 //______________________________________________________________________________
02736 int XrdProofdProofServMgr::SetProofServEnv(XrdProofdManager *mgr, XrdROOT *r)
02737 {
02738    // Set basic environment accordingly to 'r'
02739    XPDLOC(SMGR, "ProofServMgr::SetProofServEnv")
02740 
02741    char *ev = 0;
02742 
02743    TRACE(REQ,  "ROOT dir: "<< (r ? r->Dir() : "*** undef ***"));
02744 
02745    if (r) {
02746       char *libdir = (char *) r->LibDir();
02747       char *ldpath = 0;
02748       if (mgr->BareLibPath() && strlen(mgr->BareLibPath()) > 0) {
02749          ldpath = new char[32 + strlen(libdir) + strlen(mgr->BareLibPath())];
02750          sprintf(ldpath, "%s=%s:%s", XPD_LIBPATH, libdir, mgr->BareLibPath());
02751       } else {
02752          ldpath = new char[32 + strlen(libdir)];
02753          sprintf(ldpath, "%s=%s", XPD_LIBPATH, libdir);
02754       }
02755       putenv(ldpath);
02756       // Set ROOTSYS
02757       char *rootsys = (char *) r->Dir();
02758       ev = new char[15 + strlen(rootsys)];
02759       sprintf(ev, "ROOTSYS=%s", rootsys);
02760       putenv(ev);
02761 
02762       // Set bin directory
02763       char *bindir = (char *) r->BinDir();
02764       ev = new char[15 + strlen(bindir)];
02765       sprintf(ev, "ROOTBINDIR=%s", bindir);
02766       putenv(ev);
02767 
02768       // Set conf dir
02769       char *confdir = (char *) r->DataDir();
02770       ev = new char[20 + strlen(confdir)];
02771       sprintf(ev, "ROOTCONFDIR=%s", confdir);
02772       putenv(ev);
02773 
02774       // Set TMPDIR
02775       ev = new char[20 + strlen(mgr->TMPdir())];
02776       sprintf(ev, "TMPDIR=%s", mgr->TMPdir());
02777       putenv(ev);
02778 
02779       // Done
02780       return 0;
02781    }
02782 
02783    // Bad input
02784    TRACE(XERR, "XrdROOT instance undefined!");
02785    return -1;
02786 }
02787 
02788 //______________________________________________________________________________
02789 void XrdProofdProofServMgr::GetTagDirs(XrdProofdProtocol *p, XrdProofdProofServ *xps,
02790                                        XrdOucString &sesstag, XrdOucString &topsesstag,
02791                                        XrdOucString &sessiondir, XrdOucString &sesswrkdir)
02792 {
02793    // Determine the unique tag and relevant dirs for this session
02794 
02795    // Client sandbox
02796    XrdOucString udir = p->Client()->Sandbox()->Dir();
02797 
02798    // Create the unique tag identify this session
02799    XrdOucString host = fMgr->Host();
02800    if (host.find(".") != STR_NPOS)
02801       host.erase(host.find("."));
02802    XPDFORM(sesstag, "%s-%d-%d", host.c_str(), (int)time(0), (int)getpid());
02803 
02804    // Session dir
02805    topsesstag = sesstag;
02806    sessiondir = udir;
02807    if (p->ConnType() == kXPD_ClientMaster) {
02808       sessiondir += "/session-";
02809       sessiondir += sesstag;
02810       xps->SetTag(sesstag.c_str());
02811    } else {
02812       sessiondir += "/";
02813       sessiondir += xps->Tag();
02814       topsesstag = xps->Tag();
02815       topsesstag.replace("session-","");
02816    }
02817 
02818    // Make sure the directory exists ...
02819    if (XrdProofdAux::AssertDir(sessiondir.c_str(), p->Client()->UI(),
02820                                fMgr->ChangeOwn()) == -1) {
02821       return;
02822    }
02823 
02824    // The session working dir depends on the role
02825    sesswrkdir = sessiondir;
02826    if (p->ConnType() == kXPD_MasterWorker) {
02827       XPDFORM(sesswrkdir, "%s/worker-%s-%s", sessiondir.c_str(), xps->Ordinal(), sesstag.c_str());
02828    } else {
02829       XPDFORM(sesswrkdir, "%s/master-%s-%s", sessiondir.c_str(), xps->Ordinal(), sesstag.c_str());
02830    }
02831 
02832    // Done
02833    return;
02834 }
02835 
02836 //__________________________________________________________________________
02837 static int WriteSessRCs(const char *, XpdEnv *erc, void *f)
02838 {
02839    // Run thorugh entries to broadcast the relevant priority
02840    XPDLOC(SMGR, "WriteSessRCs")
02841 
02842    XrdOucString emsg;
02843    FILE *frc = (FILE *)f;
02844    if (frc && erc) {
02845       XrdOucString rc = erc->fEnv;
02846       if (rc.length() > 0) {
02847          if (rc.find("Proof.DataSetManager") != STR_NPOS) {
02848             TRACE(ALL,"Proof.DataSetManager ignored: use xpd.datasetsrc to define dataset managers");
02849          } else {
02850             fprintf(frc, "%s\n", rc.c_str());
02851          }
02852       }
02853       // Go to next
02854       return 0;
02855    } else {
02856       emsg = "file or input entry undefined";
02857    }
02858 
02859    // Some problem
02860    TRACE(XERR,"protocol error: "<<emsg);
02861    return 1;
02862 }
02863 
02864 //______________________________________________________________________________
02865 int XrdProofdProofServMgr::SetProofServEnv(XrdProofdProtocol *p, void *input)
02866 {
02867    // Set environment for proofserv
02868    XPDLOC(SMGR, "ProofServMgr::SetProofServEnv")
02869 
02870    char *ev = 0;
02871 
02872    // Check inputs
02873    if (!p || !p->Client() || !input) {
02874       TRACE(XERR, "at leat one input is invalid - cannot continue");
02875       return -1;
02876    }
02877 
02878    // Old proofservs expect different settings
02879    int rootvers = p->Client()->ROOT() ? p->Client()->ROOT()->SrvProtVers() : -1;
02880    TRACE(DBG, "rootvers: "<< rootvers);
02881    if (rootvers < 14 && rootvers > -1)
02882       return SetProofServEnvOld(p, input);
02883 
02884    ProofServEnv_t *in = (ProofServEnv_t *)input;
02885 
02886    // Session proxy
02887    XrdProofdProofServ *xps = in->fPS;
02888    if (!xps) {
02889       TRACE(XERR, "unable to get instance of proofserv proxy");
02890       return -1;
02891    }
02892    int psid = xps->ID();
02893    TRACE(REQ,  "psid: "<<psid<<", log: "<<in->fLogLevel);
02894 
02895    // Client sandbox
02896    XrdOucString udir = p->Client()->Sandbox()->Dir();
02897    TRACE(DBG, "sandbox for "<<p->Client()->User()<<" is: "<<udir);
02898    TRACE(DBG, "session unique tag "<<in->fSessionTag);
02899    TRACE(DBG, "session dir " << in->fSessionDir);
02900    TRACE(DBG, "session working dir:" << in->fWrkDir);
02901 
02902    // Log into the session it
02903    if (XrdProofdAux::ChangeToDir(in->fSessionDir.c_str(), p->Client()->UI(),
02904                                  fMgr->ChangeOwn()) != 0) {
02905       TRACE(XERR, "couldn't change directory to " << in->fSessionDir);
02906       return -1;
02907    }
02908 
02909    // Set basic environment for proofserv
02910    if (SetProofServEnv(fMgr, p->Client()->ROOT()) != 0) {
02911       TRACE(XERR, "problems setting basic environment - exit");
02912       return -1;
02913    }
02914 
02915    // Create the rootrc and env files
02916    TRACE(DBG, "creating env file");
02917    XrdOucString rcfile = in->fWrkDir;
02918    rcfile += ".rootrc";
02919    FILE *frc = fopen(rcfile.c_str(), "w");
02920    if (!frc) {
02921       TRACE(XERR, "unable to open rootrc file: "<<rcfile);
02922       return -1;
02923    }
02924    // Symlink to session.rootrc
02925    if (XrdProofdAux::SymLink(rcfile.c_str(), "session.rootrc") != 0) {
02926       TRACE(XERR, "problems creating symlink to 'session.rootrc' (errno: "<<errno<<")");
02927    }
02928    TRACE(REQ, "session rootrc file: "<< rcfile);
02929 
02930    // Port
02931    fprintf(frc, "# XrdProofdProtocol listening port\n");
02932    fprintf(frc, "ProofServ.XpdPort: %d\n", fMgr->Port());
02933 
02934    // Local root prefix
02935    if (fMgr->LocalROOT() && strlen(fMgr->LocalROOT()) > 0) {
02936       fprintf(frc, "# Prefix to be prepended to local paths\n");
02937       fprintf(frc, "Path.Localroot: %s\n", fMgr->LocalROOT());
02938    }
02939 
02940    // Data pool entry-point URL
02941    if (fMgr->PoolURL() && strlen(fMgr->PoolURL()) > 0) {
02942       XrdOucString purl(fMgr->PoolURL());
02943       if (!purl.endswith("/"))
02944          purl += "/";
02945       fprintf(frc, "# URL for the data pool entry-point\n");
02946       fprintf(frc, "ProofServ.PoolUrl: %s\n", purl.c_str());
02947    }
02948 
02949    // The session working dir depends on the role
02950    fprintf(frc, "# The session working dir\n");
02951    fprintf(frc, "ProofServ.SessionDir: %s\n", in->fWrkDir.c_str());
02952 
02953    // Log / Debug level
02954    fprintf(frc, "# Proof Log/Debug level\n");
02955    fprintf(frc, "Proof.DebugLevel: %d\n", in->fLogLevel);
02956 
02957    // Ordinal number
02958    fprintf(frc, "# Ordinal number\n");
02959    fprintf(frc, "ProofServ.Ordinal: %s\n", xps->Ordinal());
02960 
02961    // ROOT Version tag
02962    if (p->Client()->ROOT()) {
02963       fprintf(frc, "# ROOT Version tag\n");
02964       fprintf(frc, "ProofServ.RootVersionTag: %s\n", p->Client()->ROOT()->Tag());
02965    }
02966    // Proof group
02967    if (p->Client()->Group()) {
02968       fprintf(frc, "# Proof group\n");
02969       fprintf(frc, "ProofServ.ProofGroup: %s\n", p->Client()->Group());
02970    }
02971 
02972    //  Path to file with group information
02973    if (fMgr->GroupsMgr() && fMgr->GroupsMgr()->GetCfgFile()) {
02974       fprintf(frc, "# File with group information\n");
02975       fprintf(frc, "Proof.GroupFile: %s\n", fMgr->GroupsMgr()->GetCfgFile());
02976    }
02977 
02978    // Work dir
02979    fprintf(frc, "# Users sandbox\n");
02980    fprintf(frc, "ProofServ.Sandbox: %s\n", udir.c_str());
02981 
02982    // Image
02983    if (fMgr->Image() && strlen(fMgr->Image()) > 0) {
02984       fprintf(frc, "# Server image\n");
02985       fprintf(frc, "ProofServ.Image: %s\n", fMgr->Image());
02986    }
02987 
02988    // Session tag
02989    fprintf(frc, "# Session tag\n");
02990    fprintf(frc, "ProofServ.SessionTag: %s\n", in->fTopSessionTag.c_str());
02991 
02992    // Session admin path
02993    fprintf(frc, "# Session admin path\n");
02994    int proofvrs = (p->Client()->ROOT()) ? p->Client()->ROOT()->SrvProtVers() : -1;
02995    if (proofvrs < 0 || proofvrs < 27) {
02996       // Use the first version of the session status file
02997       fprintf(frc, "ProofServ.AdminPath: %s\n", xps->AdminPath());
02998    } else {
02999       // New version with updated status
03000       fprintf(frc, "ProofServ.AdminPath: %s.status\n", xps->AdminPath());
03001    }
03002 
03003    // Whether user specific config files are enabled
03004    if (fMgr->NetMgr()->WorkerUsrCfg()) {
03005       fprintf(frc, "# Whether user specific config files are enabled\n");
03006       fprintf(frc, "ProofServ.UseUserCfg: 1\n");
03007    }
03008    // Set Open socket
03009    fprintf(frc, "# Open socket\n");
03010    fprintf(frc, "ProofServ.OpenSock: %s\n", xps->UNIXSockPath());
03011    // Entity
03012    fprintf(frc, "# Entity\n");
03013    if (p->Client()->UI().fGroup.length() > 0)
03014       fprintf(frc, "ProofServ.Entity: %s:%s@%s\n",
03015               p->Client()->User(), p->Client()->UI().fGroup.c_str(), p->Link()->Host());
03016    else
03017       fprintf(frc, "ProofServ.Entity: %s@%s\n", p->Client()->User(), p->Link()->Host());
03018 
03019 
03020    // Session ID
03021    fprintf(frc, "# Session ID\n");
03022    fprintf(frc, "ProofServ.SessionID: %d\n", psid);
03023 
03024    // Client ID
03025    fprintf(frc, "# Client ID\n");
03026    fprintf(frc, "ProofServ.ClientID: %d\n", p->CID());
03027 
03028    // Client Protocol
03029    fprintf(frc, "# Client Protocol\n");
03030    fprintf(frc, "ProofServ.ClientVersion: %d\n", p->ProofProtocol());
03031 
03032    // Config file
03033    if (in->fCfg.length() > 0) {
03034       if (in->fCfg == "masteronly") {
03035          fprintf(frc, "# MasterOnly option\n");
03036          // Master Only setup
03037          fprintf(frc, "Proof.MasterOnly: 1\n");
03038       } else {
03039          fprintf(frc, "# Config file\n");
03040          // User defined
03041          fprintf(frc, "ProofServ.ProofConfFile: %s\n", in->fCfg.c_str());
03042       }
03043    } else {
03044       fprintf(frc, "# Config file\n");
03045       if (fMgr->IsSuperMst()) {
03046          fprintf(frc, "# Config file\n");
03047          fprintf(frc, "ProofServ.ProofConfFile: sm:\n");
03048       } else if (fProofPlugin.length() > 0) {
03049          fprintf(frc, "# Config file\n");
03050          fprintf(frc, "ProofServ.ProofConfFile: %s\n", fProofPlugin.c_str());
03051       }
03052    }
03053 
03054    // We set this to avoid blocking to much on xrdclient actions; they can be
03055    // oevrwritten with explicit putrc directives
03056    fprintf(frc, "# Default settings for XrdClient\n");
03057    fprintf(frc, "XNet.FirstConnectMaxCnt 3\n");
03058    fprintf(frc, "XNet.ConnectTimeout     5\n");
03059 
03060    // This is a workaround for a problem fixed in 5.24/00
03061    int vrscode = (p->Client()->ROOT()) ? p->Client()->ROOT()->VersionCode() : -1;
03062    if (vrscode > 0 && vrscode < XrdROOT::GetVersionCode(5,24,0)) {
03063       fprintf(frc, "# Force remote reading also for local files to avoid a wrong TTreeCache initialization\n");
03064       fprintf(frc, "Path.ForceRemote 1\n");
03065    }
03066 
03067    // Additional rootrcs (xpd.putrc directive)
03068    {  XrdSysMutexHelper mhp(fEnvsMutex);
03069       if (fProofServRCs.size() > 0) {
03070          fprintf(frc, "# Additional rootrcs (xpd.putrc directives)\n");
03071          // Hash list of the directives applying to this {user, group, svn, version}
03072          XrdOucHash<XpdEnv> sessrcs;
03073          std::list<XpdEnv>::iterator ircs = fProofServRCs.begin();
03074          for ( ; ircs != fProofServRCs.end(); ircs++) {
03075             int rcmatch = (*ircs).Matches(p->Client()->User(), p->Client()->Group(),
03076                                           p->Client()->ROOT()->SvnRevision(),
03077                                           p->Client()->ROOT()->VersionCode());
03078             if (rcmatch >= 0) {
03079                XpdEnv *rcenv = sessrcs.Find((*ircs).fName.c_str());
03080                if (rcenv) {
03081                   int rcmtcex = rcenv->Matches(p->Client()->User(), p->Client()->Group(),
03082                                                p->Client()->ROOT()->SvnRevision(),
03083                                                p->Client()->ROOT()->VersionCode());
03084                   if (rcmatch > rcmtcex) {
03085                      // Replace the entry
03086                      rcenv = &(*ircs);
03087                      sessrcs.Rep(rcenv->fName.c_str(), rcenv, 0, Hash_keepdata);
03088                   }
03089                } else {
03090                   // Add an entry
03091                   rcenv = &(*ircs);
03092                   sessrcs.Add(rcenv->fName.c_str(), rcenv, 0, Hash_keepdata);
03093                }
03094                TRACE(HDBG, "Adding: "<<(*ircs).fEnv);
03095             }
03096          }
03097          sessrcs.Apply(WriteSessRCs, (void *)frc);
03098       }
03099    }
03100    // If applicable, add dataset managers initiators
03101    if (fMgr->DataSetSrcs()->size() > 0) {
03102       fprintf(frc, "# Dataset sources\n");
03103       XrdOucString rc("Proof.DataSetManager: ");
03104       std::list<XrdProofdDSInfo *>::iterator ii;
03105       for (ii = fMgr->DataSetSrcs()->begin(); ii != fMgr->DataSetSrcs()->end(); ii++) {
03106          if (ii != fMgr->DataSetSrcs()->begin()) rc += ", ";
03107          rc += (*ii)->fType;
03108          rc += " dir:";
03109          rc += (*ii)->fUrl;
03110          rc += " opt:";
03111          rc += (*ii)->fOpts;
03112       }
03113       fprintf(frc, "%s\n", rc.c_str());
03114    }
03115 
03116    // If applicable, add datadir location
03117    if (fMgr->DataDir() && strlen(fMgr->DataDir()) > 0) {
03118       fprintf(frc, "# Data directory\n");
03119       XrdOucString rc;
03120       XPDFORM(rc, "ProofServ.DataDir: %s/%s/%s/%s/%s", fMgr->DataDir(),
03121                   p->Client()->Group(), p->Client()->User(), xps->Ordinal(),
03122                   in->fSessionTag.c_str());
03123       fprintf(frc, "%s\n", rc.c_str());
03124    }
03125 
03126    // Done with this
03127    fclose(frc);
03128 
03129    // Now save the exported env variables, for the record
03130    XrdOucString envfile = in->fWrkDir;
03131    envfile += ".env";
03132    FILE *fenv = fopen(envfile.c_str(), "w");
03133    if (!fenv) {
03134       TRACE(XERR, "unable to open env file: "<<envfile);
03135       return -1;
03136    }
03137    TRACE(REQ, "environment file: "<< envfile);
03138 
03139    // Forwarded sec credentials, if any
03140    if (p->AuthProt()) {
03141 
03142       // Additional envs possibly set by the protocol for next application
03143       XrdOucString secenvs(getenv("XrdSecENVS"));
03144       if (secenvs.length() > 0) {
03145          // Go through the list
03146          XrdOucString env;
03147          int from = 0;
03148          while ((from = secenvs.tokenize(env, from, ',')) != -1) {
03149             if (env.length() > 0) {
03150                // Set the env now
03151                ev = new char[env.length()+1];
03152                strncpy(ev, env.c_str(), env.length());
03153                ev[env.length()] = 0;
03154                putenv(ev);
03155                fprintf(fenv, "%s\n", ev);
03156                TRACE(DBG, ev);
03157             }
03158          }
03159       }
03160 
03161       // The credential buffer, if any
03162       XrdSecCredentials *creds = p->AuthProt()->getCredentials();
03163       if (creds) {
03164          int lev = strlen("XrdSecCREDS=")+creds->size;
03165          ev = new char[lev+1];
03166          strcpy(ev, "XrdSecCREDS=");
03167          memcpy(ev+strlen("XrdSecCREDS="), creds->buffer, creds->size);
03168          ev[lev] = 0;
03169          putenv(ev);
03170          TRACE(DBG, "XrdSecCREDS set");
03171 
03172          // If 'pwd', save AFS key, if any
03173          if (!strncmp(p->AuthProt()->Entity.prot, "pwd", 3)) {
03174             XrdOucString credsdir = udir;
03175             credsdir += "/.creds";
03176             // Make sure the directory exists
03177             if (!XrdProofdAux::AssertDir(credsdir.c_str(), p->Client()->UI(), fMgr->ChangeOwn())) {
03178                if (SaveAFSkey(creds, credsdir.c_str(), p->Client()->UI()) == 0) {
03179                   ev = new char[strlen("ROOTPROOFAFSCREDS=")+credsdir.length()+strlen("/.afs")+2];
03180                   sprintf(ev, "ROOTPROOFAFSCREDS=%s/.afs", credsdir.c_str());
03181                   putenv(ev);
03182                   fprintf(fenv, "ROOTPROOFAFSCREDS has been set\n");
03183                   TRACE(DBG, ev);
03184                } else {
03185                   TRACE(DBG, "problems in saving AFS key");
03186                }
03187             } else {
03188                TRACE(XERR, "unable to create creds dir: "<<credsdir);
03189                return -1;
03190             }
03191          }
03192       }
03193    }
03194 
03195    // Library path
03196    fprintf(fenv, "%s=%s\n", XPD_LIBPATH, getenv(XPD_LIBPATH));
03197 
03198    // ROOTSYS
03199    fprintf(fenv, "ROOTSYS=%s\n", xps->ROOT()->Dir());
03200 
03201    // Conf dir
03202    fprintf(fenv, "ROOTCONFDIR=%s\n", xps->ROOT()->Dir());
03203 
03204    // TMPDIR
03205    fprintf(fenv, "TMPDIR=%s\n", fMgr->TMPdir());
03206 
03207    // RC file
03208    ev = new char[strlen("ROOTRCFILE=")+rcfile.length()+2];
03209    sprintf(ev, "ROOTRCFILE=%s", rcfile.c_str());
03210    putenv(ev);
03211    fprintf(fenv, "%s\n", ev);
03212    TRACE(DBG, ev);
03213 
03214    // ROOT version tag (needed in building packages)
03215    ev = new char[strlen("ROOTVERSIONTAG=")+strlen(p->Client()->ROOT()->Tag())+2];
03216    sprintf(ev, "ROOTVERSIONTAG=%s", p->Client()->ROOT()->Tag());
03217    putenv(ev);
03218    fprintf(fenv, "%s\n", ev);
03219    TRACE(DBG, ev);
03220 
03221    // Log file in the log dir
03222    ev = new char[strlen("ROOTPROOFLOGFILE=") + in->fLogFile.length() + 2];
03223    sprintf(ev, "ROOTPROOFLOGFILE=%s", in->fLogFile.c_str());
03224    putenv(ev);
03225    fprintf(fenv, "%s\n", ev);
03226    xps->SetFileout(in->fLogFile.c_str());
03227    TRACE(DBG, ev);
03228 
03229    // Local data server
03230    XrdOucString locdatasrv("root://");
03231    locdatasrv += fMgr->Host();
03232    ev = new char[strlen("LOCALDATASERVER=") + locdatasrv.length() + 2];
03233    sprintf(ev, "LOCALDATASERVER=%s", locdatasrv.c_str());
03234    putenv(ev);
03235    fprintf(fenv, "%s\n", ev);
03236    TRACE(DBG, ev);
03237 
03238    // Xrootd config file
03239    if (CfgFile()) {
03240       ev = new char[strlen("XRDCF=")+strlen(CfgFile())+2];
03241       sprintf(ev, "XRDCF=%s", CfgFile());
03242       putenv(ev);
03243       fprintf(fenv, "%s\n", ev);
03244       TRACE(DBG, ev);
03245    }
03246 
03247    // Additional envs (xpd.putenv directive)
03248    {  XrdSysMutexHelper mhp(fEnvsMutex);
03249       if (fProofServEnvs.size() > 0) {
03250          // Hash list of the directives applying to this {user, group, svn, version}
03251          XrdOucHash<XpdEnv> sessenvs;
03252          std::list<XpdEnv>::iterator ienvs = fProofServEnvs.begin();
03253          for ( ; ienvs != fProofServEnvs.end(); ienvs++) {
03254             int envmatch = (*ienvs).Matches(p->Client()->User(), p->Client()->Group(),
03255                                             p->Client()->ROOT()->SvnRevision(),
03256                                             p->Client()->ROOT()->VersionCode());
03257             if (envmatch >= 0) {
03258                XpdEnv *env = sessenvs.Find((*ienvs).fName.c_str());
03259                if (env) {
03260                   int envmtcex = env->Matches(p->Client()->User(), p->Client()->Group(),
03261                                               p->Client()->ROOT()->SvnRevision(),
03262                                               p->Client()->ROOT()->VersionCode());
03263                   if (envmatch > envmtcex) {
03264                      // Replace the entry
03265                      env = &(*ienvs);
03266                      sessenvs.Rep(env->fName.c_str(), env, 0, Hash_keepdata);
03267                   }
03268                } else {
03269                   // Add an entry
03270                   env = &(*ienvs);
03271                   sessenvs.Add(env->fName.c_str(), env, 0, Hash_keepdata);
03272                }
03273                TRACE(HDBG, "Adding: "<<(*ienvs).fEnv);
03274             }
03275          }
03276          XpdWriteEnv_t xpwe = {fMgr, p->Client(), fenv};
03277          sessenvs.Apply(WriteSessEnvs, (void *)&xpwe);
03278       }
03279    }
03280    // Set the user envs
03281    if (xps->UserEnvs() &&
03282        strlen(xps->UserEnvs()) && strstr(xps->UserEnvs(),"=")) {
03283       // The single components
03284       XrdOucString ue = xps->UserEnvs();
03285       XrdOucString env, namelist;
03286       int from = 0, ieq = -1;
03287       while ((from = ue.tokenize(env, from, ',')) != -1) {
03288          if (env.length() > 0 && (ieq = env.find('=')) != -1) {
03289             // Resolve keywords
03290             ResolveKeywords(env, in);
03291             ev = new char[env.length()+1];
03292             strncpy(ev, env.c_str(), env.length());
03293             ev[env.length()] = 0;
03294             putenv(ev);
03295             fprintf(fenv, "%s\n", ev);
03296             TRACE(DBG, ev);
03297             env.erase(ieq);
03298             if (namelist.length() > 0)
03299                namelist += ',';
03300             namelist += env;
03301          }
03302       }
03303       // The list of names, ','-separated
03304       ev = new char[strlen("PROOF_ALLVARS=") + namelist.length() + 2];
03305       sprintf(ev, "PROOF_ALLVARS=%s", namelist.c_str());
03306       putenv(ev);
03307       fprintf(fenv, "%s\n", ev);
03308       TRACE(DBG, ev);
03309    }
03310 
03311    // Close file
03312    fclose(fenv);
03313 
03314    // Create or Update symlink to last session
03315    TRACE(REQ, "creating symlink");
03316    XrdOucString syml = udir;
03317    if (p->ConnType() == kXPD_MasterWorker)
03318       syml += "/last-worker-session";
03319    else
03320       syml += "/last-master-session";
03321    if (XrdProofdAux::SymLink(in->fSessionDir.c_str(), syml.c_str()) != 0) {
03322       TRACE(XERR, "problems creating symlink to "
03323                   " last session (errno: "<<errno<<")");
03324    }
03325 
03326    // We are done
03327    TRACE(REQ, "done");
03328    return 0;
03329 }
03330 
03331 //______________________________________________________________________________
03332 int XrdProofdProofServMgr::CleanupLostProofServ()
03333 {
03334    // Cleanup (kill) all 'proofserv' processes which lost control from their
03335    // creator or controller daemon. We rely here on the information in the admin
03336    // path(s) (<xrd_admin>/.xproof.<port>).
03337    // This is called regurarly by the cron job to avoid having proofservs around.
03338    // Return number of process killed or -1 in case of any error.
03339    XPDLOC(SMGR, "ProofServMgr::CleanupLostProofServ")
03340 
03341    if (!fCheckLost) {
03342       TRACE(REQ,  "disabled ...");
03343       return 0;
03344    }
03345 
03346    TRACE(REQ,  "checking for orphalin proofserv processes ...");
03347    int nk = 0;
03348 
03349    // Get the list of existing proofserv processes from the process table
03350    std::map<int,XrdOucString> procs;
03351    if (XrdProofdAux::GetProcesses("proofserv", &procs) <= 0) {
03352       TRACE(DBG, " no proofservs around: nothing to do");
03353       return 0;
03354    }
03355 
03356    XrdProofUI ui;
03357    if (XrdProofdAux::GetUserInfo(fMgr->EffectiveUser(), ui) != 0) {
03358       TRACE(DBG, "problems getting info for user " << fMgr->EffectiveUser());
03359       return -1;
03360    }
03361 
03362    // Hash list of controlled and xrootd process
03363    XrdOucRash<int, int> controlled, xrdproc;
03364 
03365    // Hash list of sessions files loaded
03366    XrdOucHash<XrdOucString> sessionspaths;
03367 
03368    // For each process extract the information about the daemon supposed to be in control
03369    int pid, ia, a;
03370    XrdOucString cmd, apath, pidpath, sessiondir, emsg, rest, after;
03371    std::map<int,XrdOucString>::iterator ip;
03372    for (ip = procs.begin(); ip != procs.end(); ip++) {
03373       pid = ip->first;
03374       cmd = ip->second;
03375       if ((ia = cmd.find("xpdpath:")) != STR_NPOS) {
03376          cmd.tokenize(apath, ia, ' ');
03377          apath.replace("xpdpath:", "");
03378          if (apath.length() <= 0) {
03379             TRACE(ALL, "admin path not found; initial cmd line: "<<cmd);
03380             continue;
03381          }
03382          // Extract daemon PID and check that it is alive
03383          XPDFORM(pidpath, "%s/xrootd.pid", apath.c_str());
03384          int xpid = XrdProofdAux::GetIDFromPath(pidpath.c_str(), emsg);
03385          int *alive = xrdproc.Find(xpid);
03386          if (!alive) {
03387             a = (XrdProofdAux::VerifyProcessByID(xpid, fParentExecs.c_str())) ? 1 : 0;
03388             xrdproc.Add(xpid, a);
03389             if (!(alive = xrdproc.Find(xpid))) {
03390                TRACE(ALL, "unable to add info in the Rash table!");
03391             }
03392          } else {
03393             a = *alive;
03394          }
03395          // If the daemon is still there check that the process has its entry in the
03396          // session path(s);
03397          bool ok = 0;
03398          if (a == 1) {
03399             const char *subdir[2] = {"activesessions", "terminatedsessions"};
03400             for (int i = 0; i < 2; i++) {
03401                XPDFORM(sessiondir, "%s/%s", apath.c_str(), subdir[i]);
03402                if (!sessionspaths.Find(sessiondir.c_str())) {
03403                   DIR *sdir = opendir(sessiondir.c_str());
03404                   if (!sdir) {
03405                      XPDFORM(emsg, "cannot open '%s' - errno: %d", apath.c_str(), errno);
03406                      TRACE(XERR, emsg.c_str());
03407                      continue;
03408                   }
03409                   struct dirent *sent = 0;
03410                   while ((sent = readdir(sdir))) {
03411                      if (!strncmp(sent->d_name, ".", 1) || !strncmp(sent->d_name, "..", 2))
03412                         continue;
03413                      // Get the pid
03414                      int ppid = XrdProofdAux::ParsePidPath(sent->d_name, rest, after);
03415                      // Add to the list
03416                      controlled.Add(ppid, ppid);
03417                   }
03418                   closedir(sdir);
03419                   sessionspaths.Add(sessiondir.c_str(), 0, 0, Hash_data_is_key);
03420                }
03421                ok = (controlled.Find(pid)) ? 1 : ok;
03422                // We are done, if the process is controlled
03423                if (ok) break;
03424             }
03425          }
03426          // If the process is not controlled we have to kill it
03427          if (!ok) {
03428             TRACE(ALL,"process: "<<pid<<" lost its controller: killing");
03429             if (XrdProofdAux::KillProcess(pid, 1, ui, fMgr->ChangeOwn()) == 0)
03430                nk++;
03431          }
03432       }
03433 
03434    }
03435 
03436    // Done
03437    return nk;
03438 }
03439 
03440 //______________________________________________________________________________
03441 int XrdProofdProofServMgr::CleanupProofServ(bool all, const char *usr)
03442 {
03443    // Cleanup (kill) all 'proofserv' processes from the process table.
03444    // Only the processes associated with 'usr' are killed,
03445    // unless 'all' is TRUE, in which case all 'proofserv' instances are
03446    // terminated (this requires superuser privileges).
03447    // Super users can also terminated all processes fo another user (specified
03448    // via usr).
03449    // Return number of process notified for termination on success, -1 otherwise
03450    XPDLOC(SMGR, "ProofServMgr::CleanupProofServ")
03451 
03452    TRACE(REQ,  "all: "<<all<<", usr: " << (usr ? usr : "undef"));
03453    int nk = 0;
03454 
03455    // Name
03456    const char *pn = "proofserv";
03457 
03458    // Uid
03459    XrdProofUI ui;
03460    int refuid = -1;
03461    if (!all) {
03462       if (!usr) {
03463          TRACE(DBG, "usr must be defined for all = FALSE");
03464          return -1;
03465       }
03466       if (XrdProofdAux::GetUserInfo(usr, ui) != 0) {
03467          TRACE(DBG, "problems getting info for user " << usr);
03468          return -1;
03469       }
03470       refuid = ui.fUid;
03471    }
03472 
03473 #if defined(linux)
03474    // Loop over the "/proc" dir
03475    DIR *dir = opendir("/proc");
03476    if (!dir) {
03477       XrdOucString emsg("cannot open /proc - errno: ");
03478       emsg += errno;
03479       TRACE(DBG, emsg.c_str());
03480       return -1;
03481    }
03482 
03483    struct dirent *ent = 0;
03484    while ((ent = readdir(dir))) {
03485       if (!strncmp(ent->d_name, ".", 1) || !strncmp(ent->d_name, "..", 2)) continue;
03486       if (DIGIT(ent->d_name[0])) {
03487          XrdOucString fn("/proc/", 256);
03488          fn += ent->d_name;
03489          fn += "/status";
03490          // Open file
03491          FILE *ffn = fopen(fn.c_str(), "r");
03492          if (!ffn) {
03493             XrdOucString emsg("cannot open file ");
03494             emsg += fn; emsg += " - errno: "; emsg += errno;
03495             TRACE(HDBG, emsg);
03496             continue;
03497          }
03498          // Read info
03499          bool xname = 1, xpid = 1, xppid = 1;
03500          bool xuid = (all) ? 0 : 1;
03501          int pid = -1;
03502          int ppid = -1;
03503          char line[2048] = { 0 };
03504          while (fgets(line, sizeof(line), ffn) &&
03505                (xname || xpid || xppid || xuid)) {
03506             // Check name
03507             if (xname && strstr(line, "Name:")) {
03508                if (!strstr(line, pn))
03509                   break;
03510                xname = 0;
03511             }
03512             if (xpid && strstr(line, "Pid:")) {
03513                pid = (int) XrdProofdAux::GetLong(&line[strlen("Pid:")]);
03514                xpid = 0;
03515             }
03516             if (xppid && strstr(line, "PPid:")) {
03517                ppid = (int) XrdProofdAux::GetLong(&line[strlen("PPid:")]);
03518                // Parent process must be us or be dead
03519                if (ppid != getpid() && XrdProofdAux::VerifyProcessByID(ppid, fParentExecs.c_str()))
03520                   // Process created by another running xrootd
03521                   break;
03522                xppid = 0;
03523             }
03524             if (xuid && strstr(line, "Uid:")) {
03525                int uid = (int) XrdProofdAux::GetLong(&line[strlen("Uid:")]);
03526                if (refuid == uid)
03527                   xuid = 0;
03528             }
03529          }
03530          // Close the file
03531          fclose(ffn);
03532          // If this is a good candidate, kill it
03533          if (!xname && !xpid && !xppid && !xuid) {
03534 
03535             bool muok = 1;
03536             if (fMgr->MultiUser() && !all) {
03537                // We need to check the user name: we may be the owner of somebody
03538                // else process; if not session is attached, we kill it
03539                muok = 0;
03540                XrdProofdProofServ *srv = GetActiveSession(pid);
03541                if (!srv || (srv && !strcmp(usr, srv->Client())))
03542                   muok = 1;
03543             }
03544             if (muok)
03545                if (XrdProofdAux::KillProcess(pid, 1, ui, fMgr->ChangeOwn()) == 0)
03546                   nk++;
03547          }
03548       }
03549    }
03550    // Close the directory
03551    closedir(dir);
03552 
03553 #elif defined(__sun)
03554 
03555    // Loop over the "/proc" dir
03556    DIR *dir = opendir("/proc");
03557    if (!dir) {
03558       XrdOucString emsg("cannot open /proc - errno: ");
03559       emsg += errno;
03560       TRACE(DBG, emsg);
03561       return -1;
03562    }
03563 
03564    struct dirent *ent = 0;
03565    while ((ent = readdir(dir))) {
03566       if (!strncmp(ent->d_name, ".", 1) || !strncmp(ent->d_name, "..", 2)) continue;
03567       if (DIGIT(ent->d_name[0])) {
03568          XrdOucString fn("/proc/", 256);
03569          fn += ent->d_name;
03570          fn += "/psinfo";
03571          // Open file
03572          int ffd = open(fn.c_str(), O_RDONLY);
03573          if (ffd <= 0) {
03574             XrdOucString emsg("cannot open file ");
03575             emsg += fn; emsg += " - errno: "; emsg += errno;
03576             TRACE(HDBG, emsg);
03577             continue;
03578          }
03579          // Read info
03580          bool xname = 1;
03581          bool xuid = (all) ? 0 : 1;
03582          bool xppid = 1;
03583          // Get the information
03584          psinfo_t psi;
03585          if (read(ffd, &psi, sizeof(psinfo_t)) != sizeof(psinfo_t)) {
03586             XrdOucString emsg("cannot read ");
03587             emsg += fn; emsg += ": errno: "; emsg += errno;
03588             TRACE(XERR, emsg);
03589             close(ffd);
03590             continue;
03591          }
03592          // Close the file
03593          close(ffd);
03594 
03595          // Check name
03596          if (xname) {
03597             if (!strstr(psi.pr_fname, pn))
03598                continue;
03599             xname = 0;
03600          }
03601          // Check uid, if required
03602          if (xuid) {
03603             if (refuid == psi.pr_uid)
03604                xuid = 0;
03605          }
03606          // Parent process must be us or be dead
03607          int ppid = psi.pr_ppid;
03608          if (ppid != getpid() && XrdProofdAux::VerifyProcessByID(ppid, fParentExecs.c_str())) {
03609             // Process created by another running xrootd
03610             continue;
03611             xppid = 0;
03612          }
03613 
03614          // If this is a good candidate, kill it
03615          if (!xname && !xppid && !xuid) {
03616             bool muok = 1;
03617             if (fMgr->MultiUser() && !all) {
03618                // We need to check the user name: we may be the owner of somebody
03619                // else process; if no session is attached , we kill it
03620                muok = 0;
03621                XrdProofdProofServ *srv = GetActiveSession(psi.pr_pid);
03622                if (!srv || (srv && !strcmp(usr, srv->Client())))
03623                   muok = 1;
03624             }
03625             if (muok)
03626                if (XrdProofdAux::KillProcess(psi.pr_pid, 1, ui, fMgr->ChangeOwn()) == 0)
03627                   nk++;
03628          }
03629       }
03630    }
03631    // Close the directory
03632    closedir(dir);
03633 
03634 #elif defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__APPLE__)
03635 
03636    // Get the proclist
03637    kinfo_proc *pl = 0;
03638    int np;
03639    int ern = 0;
03640    if ((ern = XrdProofdAux::GetMacProcList(&pl, np)) != 0) {
03641       XrdOucString emsg("cannot get the process list: errno: ");
03642       emsg += ern;
03643       TRACE(XERR, emsg);
03644       return -1;
03645    }
03646 
03647    // Loop over the list
03648    int ii = np;
03649    while (ii--) {
03650       if (strstr(pl[ii].kp_proc.p_comm, pn)) {
03651          if (all || (int)(pl[ii].kp_eproc.e_ucred.cr_uid) == refuid) {
03652             // Parent process must be us or be dead
03653             int ppid = pl[ii].kp_eproc.e_ppid;
03654             bool xppid = 0;
03655             if (ppid != getpid()) {
03656                int jj = np;
03657                while (jj--) {
03658                   if (strstr(pl[jj].kp_proc.p_comm, "xrootd") &&
03659                       pl[jj].kp_proc.p_pid == ppid) {
03660                       xppid = 1;
03661                       break;
03662                   }
03663                }
03664             }
03665             if (!xppid) {
03666                bool muok = 1;
03667                if (fMgr->MultiUser() && !all) {
03668                   // We need to check the user name: we may be the owner of somebody
03669                   // else process; if no session is attached, we kill it
03670                   muok = 0;
03671                   XrdProofdProofServ *srv = GetActiveSession(pl[np].kp_proc.p_pid);
03672                   if (!srv || (srv && !strcmp(usr, srv->Client())))
03673                      muok = 1;
03674                }
03675                if (muok)
03676                   // Good candidate to be shot
03677                   if (XrdProofdAux::KillProcess(pl[np].kp_proc.p_pid, 1, ui, fMgr->ChangeOwn()))
03678                      nk++;
03679             }
03680          }
03681       }
03682    }
03683    // Cleanup
03684    free(pl);
03685 #else
03686    // For the remaining cases we use 'ps' via popen to localize the processes
03687 
03688    // Build command
03689    XrdOucString cmd = "ps ";
03690    bool busr = 0;
03691    const char *cusr = (usr && strlen(usr) && fSuperUser) ? usr : fPClient->ID();
03692    if (all) {
03693       cmd += "ax";
03694    } else {
03695       cmd += "-U ";
03696       cmd += cusr;
03697       cmd += " -u ";
03698       cmd += cusr;
03699       cmd += " -f";
03700       busr = 1;
03701    }
03702    cmd += " | grep proofserv 2>/dev/null";
03703 
03704    // Our parent ID as a string
03705    char cpid[10];
03706    sprintf(cpid, "%d", getpid());
03707 
03708    // Run it ...
03709    XrdOucString pids = ":";
03710    FILE *fp = popen(cmd.c_str(), "r");
03711    if (fp != 0) {
03712       char line[2048] = { 0 };
03713       while (fgets(line, sizeof(line), fp)) {
03714          // Parse line: make sure that we are the parent
03715          char *px = strstr(line, "xpd");
03716          if (!px)
03717             // Not xpd: old proofd ?
03718             continue;
03719          char *pi = strstr(px+3, cpid);
03720          if (!pi) {
03721             // Not started by us: check if the parent is still running
03722             pi = px + 3;
03723             int ppid = (int) XrdProofdAux::GetLong(pi);
03724             TRACE(HDBG, "found alternative parent ID: "<< ppid);
03725             // If still running then skip
03726             if (XrdProofdAux::VerifyProcessByID(ppid, fParentExecs.c_str()))
03727                continue;
03728          }
03729          // Get pid now
03730          int from = 0;
03731          if (busr)
03732             from += strlen(cusr);
03733          int pid = (int) XrdProofdAux::GetLong(&line[from]);
03734          bool muok = 1;
03735          if (fMgr->MultiUser() && !all) {
03736             // We need to check the user name: we may be the owner of somebody
03737             // else process; if no session is attached, we kill it
03738             muok = 0;
03739             XrdProofdProofServ *srv = GetActiveSession(pid);
03740             if (!srv || (srv && !strcmp(usr, srv->Client())))
03741                muok = 1;
03742          }
03743          if (muok)
03744             // Kill it
03745             if (XrdProofdAux::KillProcess(pid, 1, ui, fMgr->ChangeOwn()) == 0)
03746                nk++;
03747       }
03748       pclose(fp);
03749    } else {
03750       // Error executing the command
03751       return -1;
03752    }
03753 #endif
03754 
03755    // Done
03756    return nk;
03757 }
03758 
03759 //___________________________________________________________________________
03760 int XrdProofdProofServMgr::SetUserOwnerships(XrdProofdProtocol *p,
03761                                              const char *ord, const char *stag)
03762 {
03763    // Set user ownerships on some critical files or directories.
03764    // Return 0 on success, -1 if enything goes wrong.
03765    XPDLOC(SMGR, "ProofServMgr::SetUserOwnerships")
03766 
03767    TRACE(REQ, "enter");
03768 
03769    // If applicable, make sure that the private dataset dir for this user exists 
03770    // and has the right permissions
03771    if (fMgr->DataSetSrcs()->size() > 0) {
03772       XrdProofUI ui;
03773       XrdProofdAux::GetUserInfo(XrdProofdProtocol::EUidAtStartup(), ui);
03774       std::list<XrdProofdDSInfo *>::iterator ii;
03775       for (ii = fMgr->DataSetSrcs()->begin(); ii != fMgr->DataSetSrcs()->end(); ii++) {
03776          TRACE(ALL, "Checking dataset source: url:"<<(*ii)->fUrl<<", local:"
03777                                                    <<(*ii)->fLocal<<", rw:"<<(*ii)->fRW);
03778          if ((*ii)->fLocal && (*ii)->fRW) {
03779             XrdOucString d;
03780             XPDFORM(d, "%s/%s", ((*ii)->fUrl).c_str(), p->Client()->UI().fGroup.c_str());
03781             if (XrdProofdAux::AssertDir(d.c_str(), ui, fMgr->ChangeOwn()) == 0) {
03782                if (XrdProofdAux::ChangeMod(d.c_str(), 0777) == 0) {
03783                   XPDFORM(d, "%s/%s/%s", ((*ii)->fUrl).c_str(), p->Client()->UI().fGroup.c_str(),
03784                                                                 p->Client()->UI().fUser.c_str());
03785                   if (XrdProofdAux::AssertDir(d.c_str(), p->Client()->UI(), fMgr->ChangeOwn()) == 0) {
03786                      if (XrdProofdAux::ChangeMod(d.c_str(), 0755) != 0) {
03787                         TRACE(XERR, "problems setting permissions 0755 on: "<<d);
03788                      }
03789                   } else {
03790                      TRACE(XERR, "problems asserting: "<<d);
03791                   }
03792                } else {
03793                   TRACE(XERR, "problems setting permissions 0777 on: "<<d);
03794                }
03795             } else {
03796                TRACE(XERR, "problems asserting: "<<d);
03797             }
03798          }
03799       }
03800    }
03801 
03802    // If applicable, make sure that the private data dir for this user exists 
03803    // and has the right permissions
03804    if (fMgr->DataDir() && strlen(fMgr->DataDir()) > 0 && ord && stag) {
03805       XrdProofUI ui;
03806       XrdProofdAux::GetUserInfo(XrdProofdProtocol::EUidAtStartup(), ui);
03807       XrdOucString dgr, dus[3];
03808       XPDFORM(dgr, "%s/%s", fMgr->DataDir(), p->Client()->UI().fGroup.c_str());
03809       if (XrdProofdAux::AssertDir(dgr.c_str(), ui, fMgr->ChangeOwn()) == 0) {
03810          if (XrdProofdAux::ChangeMod(dgr.c_str(), 0777) == 0) {
03811             unsigned int mode = 0755;
03812             if (strchr(fMgr->DataDirOpts(), 'g')) mode = 0775;
03813             if (strchr(fMgr->DataDirOpts(), 'a') || strchr(fMgr->DataDirOpts(), 'o')) mode = 0777;
03814             XPDFORM(dus[0], "%s/%s", dgr.c_str(), p->Client()->UI().fUser.c_str());
03815             XPDFORM(dus[1], "%s/%s", dus[0].c_str(), ord);
03816             XPDFORM(dus[2], "%s/%s", dus[1].c_str(), stag);
03817             for (int i = 0; i < 3; i++) {
03818                if (XrdProofdAux::AssertDir(dus[i].c_str(), p->Client()->UI(), fMgr->ChangeOwn()) == 0) {
03819                   if (XrdProofdAux::ChangeMod(dus[i].c_str(), mode) != 0) {
03820                      TRACE(XERR, "problems setting permissions "<< oct << mode<<" on: "<<dus[i]);
03821                   }
03822                } else {
03823                   TRACE(XERR, "problems asserting: "<<dus[i]);
03824                   break;
03825                }
03826             }
03827          } else {
03828             TRACE(XERR, "problems setting permissions 0777 on: "<<dgr);
03829          }
03830       } else {
03831          TRACE(XERR, "problems asserting: "<<dgr);
03832       }
03833    }
03834 
03835    if (fMgr->ChangeOwn()) {
03836       // Change ownership of '.creds'
03837       XrdOucString creds(p->Client()->Sandbox()->Dir());
03838       creds += "/.creds";
03839       if (XrdProofdAux::ChangeOwn(creds.c_str(), p->Client()->UI()) != 0) {
03840          TRACE(XERR, "can't change ownership of "<<creds);
03841          return -1;
03842       }
03843    }
03844 
03845    // We are done
03846    TRACE(REQ, "done");
03847    return 0;
03848 }
03849 
03850 //___________________________________________________________________________
03851 int XrdProofdProofServMgr::SetUserEnvironment(XrdProofdProtocol *p)
03852 {
03853    // Set user environment: set effective user and group ID of the process
03854    // to the ones of the owner of this protocol instnace and change working
03855    // dir to the sandbox.
03856    // Return 0 on success, -1 if enything goes wrong.
03857    XPDLOC(SMGR, "ProofServMgr::SetUserEnvironment")
03858 
03859    TRACE(REQ, "enter");
03860 
03861    if (XrdProofdAux::ChangeToDir(p->Client()->Sandbox()->Dir(),
03862                                  p->Client()->UI(), fMgr->ChangeOwn()) != 0) {
03863       TRACE(XERR, "couldn't change directory to "<< p->Client()->Sandbox()->Dir());
03864       return -1;
03865    }
03866 
03867    // set HOME env
03868    char *h = new char[8 + strlen(p->Client()->Sandbox()->Dir())];
03869    sprintf(h, "HOME=%s", p->Client()->Sandbox()->Dir());
03870    putenv(h);
03871    TRACE(DBG, "set "<<h);
03872 
03873    // set USER env
03874    char *u = new char[8 + strlen(p->Client()->User())];
03875    sprintf(u, "USER=%s", p->Client()->User());
03876    putenv(u);
03877    TRACE(DBG, "set "<<u);
03878 
03879    // Set access control list from /etc/initgroup
03880    // (super-user privileges required)
03881    TRACE(DBG, "setting ACLs");
03882    if (fMgr->ChangeOwn() && (int) geteuid() != p->Client()->UI().fUid) {
03883 
03884       XrdSysPrivGuard pGuard((uid_t)0, (gid_t)0);
03885       if (XpdBadPGuard(pGuard, p->Client()->UI().fUid)) {
03886          TRACE(XERR, "could not get privileges");
03887          return -1;
03888       }
03889 
03890       initgroups(p->Client()->UI().fUser.c_str(), p->Client()->UI().fGid);
03891    }
03892 
03893    if (fMgr->ChangeOwn()) {
03894       // acquire permanently target user privileges
03895       TRACE(DBG, "acquiring target user identity: "<<(uid_t)p->Client()->UI().fUid<<
03896                                                ", "<<(gid_t)p->Client()->UI().fGid);
03897       if (XrdSysPriv::ChangePerm((uid_t)p->Client()->UI().fUid,
03898                                  (gid_t)p->Client()->UI().fGid) != 0) {
03899          TRACE(XERR, "can't acquire "<< p->Client()->UI().fUser <<" identity");
03900          return -1;
03901       }
03902    }
03903 
03904    // We are done
03905    TRACE(REQ, "done");
03906    return 0;
03907 }
03908 
03909 //______________________________________________________________________________
03910 int XrdProofdProofServMgr::SaveAFSkey(XrdSecCredentials *c,
03911                                       const char *dir, XrdProofUI ui)
03912 {
03913    // Save the AFS key, if any, for usage in proofserv in file 'dir'/.afs .
03914    // Return 0 on success, -1 on error.
03915    XPDLOC(SMGR, "ProofServMgr::SaveAFSkey")
03916 
03917    // Check file name
03918    if (!dir || strlen(dir) <= 0) {
03919       TRACE(XERR, "dir name undefined");
03920       return -1;
03921    }
03922 
03923    // Check credentials
03924    if (!c) {
03925       TRACE(XERR, "credentials undefined");
03926       return -1;
03927    }
03928    TRACE(REQ, "dir: "<<dir);
03929 
03930    // Decode credentials
03931    int lout = 0;
03932    char *out = new char[c->size];
03933    if (XrdSutFromHex(c->buffer, out, lout) != 0) {
03934       TRACE(XERR, "problems unparsing hex string");
03935       delete [] out;
03936       return -1;
03937    }
03938 
03939    // Locate the key
03940    char *key = out + 5;
03941    if (strncmp(key, "afs:", 4)) {
03942       TRACE(DBG, "string does not contain an AFS key");
03943       delete [] out;
03944       return 0;
03945    }
03946    key += 4;
03947 
03948    // Save to file, if not existing already
03949    XrdOucString fn = dir;
03950    fn += "/.afs";
03951 
03952    int rc = 0;
03953    struct stat st;
03954    if (stat(fn.c_str(), &st) != 0 && errno == ENOENT) {
03955 
03956       // Open the file, truncating if already existing
03957       int fd = open(fn.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0600);
03958       if (fd <= 0) {
03959          TRACE(XERR, "problems creating file - errno: " << errno);
03960          delete [] out;
03961          return -1;
03962       }
03963       // Write out the key
03964       int lkey = lout - 9;
03965       if (XrdProofdAux::Write(fd, key, lkey) != lkey) {
03966          TRACE(XERR, "problems writing to file - errno: " << errno);
03967          rc = -1;
03968       }
03969 
03970       // Cleanup
03971       delete [] out;
03972       close(fd);
03973    } else {
03974       TRACE(XERR, "cannot stat existing file "<<fn<<" - errno: " << errno);
03975       delete [] out;
03976       return -1;
03977    }
03978 
03979    // Make sure the file is owned by the user
03980    if (XrdProofdAux::ChangeOwn(fn.c_str(), ui) != 0) {
03981       TRACE(XERR, "can't change ownership of "<<fn);
03982    }
03983 
03984    return rc;
03985 }
03986 
03987 //__________________________________________________________________________
03988 XrdProofdProofServ *XrdProofdProofServMgr::GetActiveSession(int pid)
03989 {
03990    // Return active session with process ID pid, if any.
03991 
03992    XrdOucString key; key += pid;
03993    return fSessions.Find(key.c_str());
03994 }
03995 
03996 //__________________________________________________________________________
03997 static int BroadcastPriority(const char *, XrdProofdProofServ *ps, void *s)
03998 {
03999    // Run thorugh entries to broadcast the relevant priority
04000    XPDLOC(SMGR, "BroadcastPriority")
04001 
04002    XpdBroadcastPriority_t *bp = (XpdBroadcastPriority_t *)s;
04003 
04004    int nb = *(bp->fNBroadcast);
04005 
04006    XrdOucString emsg;
04007    if (ps) {
04008       if (ps->IsValid() && (ps->Status() == kXPD_running) &&
04009         !(ps->SrvType() == kXPD_Master)) {
04010          XrdProofGroup *g = (ps->Group() && bp->fGroupMgr)
04011                           ? bp->fGroupMgr->GetGroup(ps->Group()) : 0;
04012          TRACE(DBG, "group: "<<  g<<", client: "<<ps->Client());
04013          if (g && g->Active() > 0) {
04014             TRACE(DBG, "priority: "<< g->Priority()<<" active: "<<g->Active());
04015             int prio = (int) (g->Priority() * 100);
04016             ps->BroadcastPriority(prio);
04017             nb++;
04018          }
04019       }
04020       // Go to next
04021       return 0;
04022    } else {
04023       emsg = "input entry undefined";
04024    }
04025 
04026    // Some problem
04027    TRACE(XERR,"protocol error: "<<emsg);
04028    return 1;
04029 }
04030 
04031 //__________________________________________________________________________
04032 void XrdProofdProofServMgr::BroadcastClusterInfo()
04033 {
04034    // Broadcast cluster info to the active sessions
04035    XPDLOC(SMGR, "ProofServMgr::BroadcastClusterInfo")
04036 
04037    TRACE(REQ, "enter");
04038 
04039    int tot = 0, act = 0;
04040    std::list<XrdProofdProofServ *>::iterator si = fActiveSessions.begin();
04041    while (si != fActiveSessions.end()) {
04042       if ((*si)->SrvType() != kXPD_Worker) {
04043          tot++;
04044          if ((*si)->Status() == kXPD_running) act++;
04045       }
04046       si++;
04047    }
04048    XPDPRT("tot: "<<tot<<", act: "<<act);
04049    // Now propagate
04050    si = fActiveSessions.begin();
04051    while (si != fActiveSessions.end()) {
04052       if ((*si)->Status() == kXPD_running) (*si)->SendClusterInfo(tot, act);
04053       si++;
04054    }
04055 }
04056 
04057 //__________________________________________________________________________
04058 int XrdProofdProofServMgr::BroadcastPriorities()
04059 {
04060    // Broadcast priorities to the active sessions.
04061    // Returns the number of sessions contacted.
04062    XPDLOC(SMGR, "ProofServMgr::BroadcastPriorities")
04063 
04064    TRACE(REQ, "enter");
04065 
04066    int nb = 0;
04067    XpdBroadcastPriority_t bp = { (fMgr ? fMgr->GroupsMgr() : 0), &nb };
04068    fSessions.Apply(BroadcastPriority, (void *)&bp);
04069    // Done
04070    return nb;
04071 }
04072 
04073 //__________________________________________________________________________
04074 bool XrdProofdProofServMgr::IsReconnecting()
04075 {
04076    // Return true if in reconnection state, i.e. during
04077    // that period during which clients are expected to reconnect.
04078    // Return false if the session is fully effective
04079 
04080    int rect = -1;
04081    if (fReconnectTime >= 0) {
04082       rect = time(0) - fReconnectTime;
04083       if (rect < fReconnectTimeOut)
04084          return true;
04085    }
04086    // Not reconnecting
04087    return false;
04088 }
04089 
04090 //__________________________________________________________________________
04091 void XrdProofdProofServMgr::SetReconnectTime(bool on)
04092 {
04093    // Change reconnecting status
04094    //
04095 
04096    XrdSysMutexHelper mhp(fMutex);
04097 
04098    if (on) {
04099       fReconnectTime = time(0);
04100    } else {
04101       fReconnectTime = -1;
04102    }
04103 }
04104 
04105 //__________________________________________________________________________
04106 static int FreeClientID(const char *, XrdProofdProofServ *ps, void *s)
04107 {
04108    // Run through entries to reset the disconnecting client slots
04109    XPDLOC(SMGR, "FreeClientID")
04110 
04111    int pid = *((int *)s);
04112 
04113    if (ps) {
04114       ps->FreeClientID(pid);
04115       // Go to next
04116       return 0;
04117    }
04118 
04119    // Some problem
04120    TRACE(XERR, "protocol error: undefined session!");
04121    return 1;
04122 }
04123 
04124 //__________________________________________________________________________
04125 void XrdProofdProofServMgr::DisconnectFromProofServ(int pid)
04126 {
04127    // Change reconnecting status
04128    //
04129 
04130    XrdSysMutexHelper mhp(fMutex);
04131 
04132    fSessions.Apply(FreeClientID, (void *)&pid);
04133 }
04134 
04135 //__________________________________________________________________________
04136 static int CountTopMasters(const char *, XrdProofdProofServ *ps, void *s)
04137 {
04138    // Run thorugh entries to count top-masters
04139    XPDLOC(SMGR, "CountTopMasters")
04140 
04141    int *ntm = (int *)s;
04142 
04143    XrdOucString emsg;
04144    if (ps) {
04145       if (ps->SrvType() == kXPD_TopMaster) (*ntm)++;
04146       // Go to next
04147       return 0;
04148    } else {
04149       emsg = "input entry undefined";
04150    }
04151 
04152    // Some problem
04153    TRACE(XERR,"protocol error: "<<emsg);
04154    return 1;
04155 }
04156 
04157 //__________________________________________________________________________
04158 int XrdProofdProofServMgr::CurrentSessions(bool recalculate)
04159 {
04160    // Return the number of current sessions (top masters)
04161 
04162    XPDLOC(SMGR, "ProofServMgr::CurrentSessions")
04163 
04164    TRACE(REQ, "enter");
04165 
04166    XrdSysMutexHelper mhp(fMutex);
04167    if (recalculate) {
04168       fCurrentSessions = 0;
04169       fSessions.Apply(CountTopMasters, (void *)&fCurrentSessions);
04170    }
04171 
04172    // Done
04173    return fCurrentSessions;
04174 }
04175 
04176 //__________________________________________________________________________
04177 void XrdProofdProofServMgr::ResolveKeywords(XrdOucString &s, ProofServEnv_t *in)
04178 {
04179    // Resolve some keywords in 's'
04180    //    <logfileroot>, <user>, <rootsys>
04181 
04182    if (!in) return;
04183 
04184    bool isWorker = 0;
04185    if (in->fPS->SrvType() == kXPD_Worker) isWorker = 1;
04186 
04187    // Log file
04188    if (!isWorker && s.find("<logfilemst>") != STR_NPOS) {
04189       XrdOucString lfr(in->fLogFile);
04190       if (lfr.endswith(".log")) lfr.erase(lfr.rfind(".log"));
04191       s.replace("<logfilemst>", lfr);
04192    } else if (isWorker && s.find("<logfilewrk>") != STR_NPOS) {
04193       XrdOucString lfr(in->fLogFile);
04194       if (lfr.endswith(".log")) lfr.erase(lfr.rfind(".log"));
04195       s.replace("<logfilewrk>", lfr);
04196    }
04197 
04198    // user
04199    if (getenv("USER") && s.find("<user>") != STR_NPOS) {
04200       XrdOucString usr(getenv("USER"));
04201       s.replace("<user>", usr);
04202    }
04203 
04204    // rootsys
04205    if (getenv("ROOTSYS") && s.find("<rootsys>") != STR_NPOS) {
04206       XrdOucString rootsys(getenv("ROOTSYS"));
04207       s.replace("<rootsys>", rootsys);
04208    }
04209 }
04210 
04211 //
04212 // Auxilliary class to handle session pid files
04213 //
04214 
04215 //______________________________________________________________________________
04216 XrdProofSessionInfo::XrdProofSessionInfo(XrdProofdClient *c, XrdProofdProofServ *s)
04217 {
04218    // Construct from 'c' and 's'
04219 
04220    fLastAccess = 0;
04221 
04222    // Fill from the client instance
04223    fUser = c ? c->User() : "";
04224    fGroup = c ? c->Group() : "";
04225 
04226    // Fill from the server instance
04227    fPid = s ? s->SrvPID() : -1;
04228    fID = s ? s->ID() : -1;
04229    fSrvType = s ? s->SrvType() : -1;
04230    fStatus = s ? s->Status() : kXPD_unknown;
04231    fOrdinal = s ? s->Ordinal() : "";
04232    fTag = s ? s->Tag() : "";
04233    fAlias = s ? s->Alias() : "";
04234    fLogFile = s ? s->Fileout() : "";
04235    fROOTTag = (s && s->ROOT())? s->ROOT()->Tag() : "";
04236    fSrvProtVers = (s && s->ROOT()) ? s->ROOT()->SrvProtVers() : -1;
04237    fUserEnvs = s ? s->UserEnvs() : "";
04238    fAdminPath = s ? s->AdminPath() : "";
04239    fUnixPath = s ? s->UNIXSockPath() : "";
04240 }
04241 
04242 //______________________________________________________________________________
04243 void XrdProofSessionInfo::FillProofServ(XrdProofdProofServ &s, XrdROOTMgr *rmgr)
04244 {
04245    // Fill 's' fields using the stored info
04246    XPDLOC(SMGR, "SessionInfo::FillProofServ")
04247 
04248    s.SetClient(fUser.c_str());
04249    s.SetGroup(fGroup.c_str());
04250    if (fPid > 0)
04251       s.SetSrvPID(fPid);
04252    if (fID >= 0)
04253       s.SetID(fID);
04254    s.SetSrvType(fSrvType);
04255    s.SetStatus(fStatus);
04256    s.SetOrdinal(fOrdinal.c_str());
04257    s.SetTag(fTag.c_str());
04258    s.SetAlias(fAlias.c_str());
04259    s.SetFileout(fLogFile.c_str());
04260    if (rmgr) {
04261       if (rmgr->GetVersion(fROOTTag.c_str())) {
04262          s.SetROOT(rmgr->GetVersion(fROOTTag.c_str()));
04263       } else {
04264          TRACE(ALL, "ROOT version '"<< fROOTTag <<
04265                     "' not availabe anymore: setting the default");
04266          s.SetROOT(rmgr->DefaultVersion());
04267       }
04268    }
04269    s.SetUserEnvs(fUserEnvs.c_str());
04270    s.SetAdminPath(fAdminPath.c_str(), 0);
04271    s.SetUNIXSockPath(fUnixPath.c_str());
04272 }
04273 
04274 //______________________________________________________________________________
04275 int XrdProofSessionInfo::SaveToFile(const char *file)
04276 {
04277    // Save content to 'file'
04278    XPDLOC(SMGR, "SessionInfo::SaveToFile")
04279 
04280    // Check inputs
04281    if (!file || strlen(file) <= 0) {
04282       TRACE(XERR,"invalid input: "<<file);
04283       return -1;
04284    }
04285 
04286    // Create the file
04287    FILE *fpid = fopen(file, "w");
04288    if (fpid) {
04289       fprintf(fpid, "%s %s\n", fUser.c_str(), fGroup.c_str());
04290       fprintf(fpid, "%s\n", fUnixPath.c_str());
04291       fprintf(fpid, "%d %d %d\n", fPid, fID, fSrvType);
04292       fprintf(fpid, "%s %s %s\n", fOrdinal.c_str(), fTag.c_str(), fAlias.c_str());
04293       fprintf(fpid, "%s\n", fLogFile.c_str());
04294       fprintf(fpid, "%d %s\n", fSrvProtVers, fROOTTag.c_str());
04295       if (fUserEnvs.length() > 0)
04296          fprintf(fpid, "\n%s", fUserEnvs.c_str());
04297       fclose(fpid);
04298 
04299       // Make it writable by anyone (to allow the corresponding proofserv
04300       // to touch it for the asynchronous ping request)
04301       if (chmod(file, 0666) != 0) {
04302          TRACE(XERR, "could not change mode to 0666 on file "<<
04303                      file<<"; error: "<<errno);
04304       }
04305 
04306       return 0;
04307    }
04308 
04309    TRACE(XERR,"session pid file cannot be (re-)created: "<<
04310               file<<"; error: "<<errno);
04311    return -1;
04312 }
04313 
04314 //______________________________________________________________________________
04315 void XrdProofSessionInfo::Reset()
04316 {
04317    // Reset the content
04318 
04319    fLastAccess = 0;
04320    fUser = "";
04321    fGroup = "";
04322    fAdminPath = "";
04323    fUnixPath = "";
04324    fPid = -1;
04325    fStatus = kXPD_unknown;
04326    fID = -1;
04327    fSrvType = -1;
04328    fOrdinal = "";
04329    fTag = "";
04330    fAlias = "";
04331    fLogFile = "";
04332    fROOTTag = "";
04333    fSrvProtVers = -1;
04334    fUserEnvs = "";
04335 }
04336 
04337 //______________________________________________________________________________
04338 int XrdProofSessionInfo::ReadFromFile(const char *file)
04339 {
04340    // Read content from 'file'
04341    XPDLOC(SMGR, "SessionInfo::ReadFromFile")
04342 
04343    // Check inputs
04344    if (!file || strlen(file) <= 0) {
04345       TRACE(XERR,"invalid input: "<<file);
04346       return -1;
04347    }
04348 
04349    Reset();
04350 
04351    // Open the session file
04352    FILE *fpid = fopen(file,"r");
04353    if (fpid) {
04354       char line[4096];
04355       char v1[512], v2[512], v3[512];
04356       if (fgets(line, sizeof(line), fpid)) {
04357          if (sscanf(line, "%s %s", v1, v2) == 2) {
04358             fUser = v1;
04359             fGroup = v2;
04360          } else {
04361             TRACE(XERR,"warning: corrupted line? "<<line);
04362          }
04363       }
04364       if (fgets(line, sizeof(line), fpid)) {
04365          int l = strlen(line);
04366          if (line[l-1] == '\n') line[l-1] = '\0';
04367          fUnixPath = line;
04368       }
04369       if (fgets(line, sizeof(line), fpid)) {
04370          sscanf(line, "%d %d %d", &fPid, &fID, &fSrvType);
04371       }
04372       if (fgets(line, sizeof(line), fpid)) {
04373          int ns = 0;
04374          if ((ns = sscanf(line, "%s %s %s", v1, v2, v3)) >= 2) {
04375             fOrdinal = v1;
04376             fTag = v2;
04377             fAlias = (ns == 3) ? v3 : "";
04378          } else {
04379             TRACE(XERR,"warning: corrupted line? "<<line);
04380          }
04381       }
04382       if (fgets(line, sizeof(line), fpid)) {
04383          fLogFile = line;
04384       }
04385       if (fgets(line, sizeof(line), fpid)) {
04386          if (sscanf(line, "%d %s", &fSrvProtVers, v1) == 2) {
04387             fROOTTag = v1;
04388          } else {
04389             TRACE(XERR,"warning: corrupted line? "<<line);
04390          }
04391       }
04392       // All the remaining into fUserEnvs
04393       fUserEnvs = "";
04394       off_t lnow = lseek(fileno(fpid), (off_t) 0, SEEK_CUR);
04395       off_t ltot = lseek(fileno(fpid), (off_t) 0, SEEK_END);
04396       int left = (int)(ltot - lnow);
04397       int len = -1;
04398       do {
04399          int wanted = (left > 4095) ? 4095 : left;
04400          while ((len = read(fileno(fpid), line, wanted)) < 0 &&
04401                 errno == EINTR)
04402             errno = 0;
04403          if (len < wanted) {
04404             break;
04405          } else {
04406             line[len] = '\0';
04407             fUserEnvs += line;
04408          }
04409          // Update counters
04410          left -= len;
04411       } while (len > 0 && left > 0);
04412       // Done
04413       fclose(fpid);
04414       // The file name is the admin path
04415       fAdminPath = file;
04416       // Fill access time
04417       struct stat st;
04418       if (!stat(file, &st))
04419          fLastAccess = st.st_atime;
04420    } else {
04421       TRACE(XERR,"session file cannot be open: "<< file<<"; error: "<<errno);
04422       return -1;
04423    }
04424 
04425    // Read the last status now if the session is active
04426    XrdOucString fs(file);
04427    fs += ".status";
04428    fpid = fopen(fs.c_str(),"r");
04429    if (fpid) {
04430       char line[64];
04431       if (fgets(line, sizeof(line), fpid)) {
04432          sscanf(line, "%d", &fStatus);
04433       }
04434       // Done
04435       fclose(fpid);
04436    } else {
04437       TRACE(DBG,"no session status file for: "<< fs<<"; session was probably terminated");
04438    }
04439 
04440    // Done
04441    return 0;
04442 }
04443 
04444 //______________________________________________________________________________
04445 int XpdEnv::Matches(const char *usr, const char *grp, int svn, int ver)
04446 {
04447    // Check if this env applies to 'usr', 'grp, 'svn', 'ver'.
04448    // Returns -1 if it does not match, >=0 if it matches. The value is a linear
04449    // combination of matching lengths for user and group, with a weight of 1000 for
04450    // the users one, so that an exact user match will always win.
04451    XPDLOC(SMGR, "XpdEnv::Matches")
04452 
04453    int nmtc = -1;
04454    // Check the user
04455    if (fUsers.length() > 0) {
04456       XrdOucString u(usr);
04457       if ((nmtc = u.matches(fUsers.c_str())) == 0) return -1;
04458    } else {
04459       nmtc = strlen(usr);
04460    }
04461    nmtc += 1000;   // Weigth of user name match
04462    // Check the group
04463    int nmtcg = -1;
04464    if (fGroups.length() > 0) {
04465       XrdOucString g(grp);
04466       if ((nmtcg = g.matches(fGroups.c_str())) == 0) return -1;
04467    } else {
04468       nmtcg = strlen(grp);
04469    }
04470    nmtc += nmtcg;
04471 
04472    TRACE(HDBG, fEnv <<", u:"<<usr<<", g:"<<grp<<" --> nmtc: "<<nmtc);
04473 
04474    // Check the subversion number
04475    TRACE(HDBG, fEnv <<", svn:"<<svn);
04476    if (fSvnMin > 0 && svn < fSvnMin) return -1; 
04477    if (fSvnMax > 0 && svn > fSvnMax) return -1; 
04478 
04479    // Check the version code
04480    TRACE(HDBG, fEnv <<", ver:"<<ver);
04481    if (fVerMin > 0 && ver < fVerMin) return -1; 
04482    if (fVerMax > 0 && ver > fVerMax) return -1; 
04483    
04484    // If we are here then it matches
04485    return nmtc;
04486 }
04487 
04488 //______________________________________________________________________________
04489 int XpdEnv::ToVersCode(int ver, bool hex)
04490 {
04491    // Transform version number ver (format patch + 100*minor + 10000*maj, e.g. 52706)
04492    // If 'hex' is true, the components are decoded as hex numbers
04493    
04494    int maj = -1, min = -1, ptc = -1, xv = ver;
04495    if (hex) {
04496       maj = xv / 65536;
04497       xv -= maj * 65536;
04498       min = xv / 256;
04499       ptc = xv - min * 256;
04500    } else {
04501       maj = xv / 10000;
04502       xv -= maj * 10000;
04503       min = xv / 100;
04504       ptc = xv - min * 100;
04505    }
04506    // Get the version code now
04507    int vc = (maj << 16) + (min << 8) + ptc;
04508    return vc;
04509 }
04510 
04511 //______________________________________________________________________________
04512 void XpdEnv::Print(const char *what)
04513 {
04514    // Print the content of this env
04515    XPDLOC(SMGR, what)
04516    
04517    XrdOucString vmi("-1"), vmx("-1");
04518    if (fVerMin > 0) {
04519       int maj = (fVerMin >> 16);
04520       int min = ((fVerMin - maj * 65536) >> 8);
04521       int ptc = fVerMin - maj * 65536 - min * 256;
04522       XPDFORM(vmi, "%d%d%d", maj, min, ptc);
04523    }
04524    if (fVerMax > 0) {
04525       int maj = (fVerMax >> 16);
04526       int min = ((fVerMax - maj * 65536) >> 8);
04527       int ptc = fVerMax - maj * 65536 - min * 256;
04528       XPDFORM(vmx, "%d%d%d", maj, min, ptc);
04529    }
04530    XrdOucString u("allusers"), g("allgroups");
04531    if (fUsers.length() > 0) u = fUsers;
04532    if (fGroups.length() > 0) u = fGroups;
04533 
04534    TRACE(ALL, "'"<<fEnv<<"' {"<<u<<"|"<<g<<
04535          "} svn:["<<fSvnMin<<","<<fSvnMax<<"] vers:["<<vmi<<","<<vmx<<"]");
04536 }

Generated on Tue Jul 5 14:51:51 2011 for ROOT_528-00b_version by  doxygen 1.5.1