XrdProofdAdmin.cxx

Go to the documentation of this file.
00001 // @(#)root/proofd:$Id: XrdProofdAdmin.cxx 32808 2010-03-29 12:12:43Z ganis $
00002 // Author: G. Ganis Feb 2008
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 
00012 //////////////////////////////////////////////////////////////////////////
00013 //                                                                      //
00014 // XrdProofdAdmin                                                       //
00015 //                                                                      //
00016 // Author: G. Ganis, CERN, 2008                                         //
00017 //                                                                      //
00018 // Envelop class for admin services.                                    //
00019 // Loaded as service by XrdProofdManager.                               //
00020 //                                                                      //
00021 //////////////////////////////////////////////////////////////////////////
00022 #include "XrdProofdPlatform.h"
00023 
00024 #ifdef OLDXRDOUC
00025 #  include "XrdOuc/XrdOucError.hh"
00026 #else
00027 #  include "XrdSys/XrdSysError.hh"
00028 #endif
00029 
00030 #include "Xrd/XrdBuffer.hh"
00031 #include "Xrd/XrdScheduler.hh"
00032 #include "XrdClient/XrdClientMessage.hh"
00033 #include "XrdOuc/XrdOucStream.hh"
00034 
00035 #include "XrdProofdAdmin.h"
00036 #include "XrdProofdClient.h"
00037 #include "XrdProofdClientMgr.h"
00038 #include "XrdProofdManager.h"
00039 #include "XrdProofdNetMgr.h"
00040 #include "XrdProofdPriorityMgr.h"
00041 #include "XrdProofdProofServMgr.h"
00042 #include "XrdProofdProtocol.h"
00043 #include "XrdProofGroup.h"
00044 #include "XrdProofSched.h"
00045 #include "XrdProofdProofServ.h"
00046 #include "XrdROOT.h"
00047 
00048 // Tracing utilities
00049 #include "XrdProofdTrace.h"
00050 
00051 //__________________________________________________________________________
00052 static int ExportCpCmd(const char *k, XpdAdminCpCmd *cc, void *s)
00053 {
00054    // Decrease active session counters on worker w
00055    XPDLOC(PMGR, "ExportCpCmd")
00056 
00057    XrdOucString *ccs = (XrdOucString *)s;
00058    if (cc && ccs) {
00059       if (ccs->length() > 0) *ccs += ",";
00060       *ccs += k;
00061       *ccs += ":";
00062       *ccs += cc->fCmd;
00063       TRACE(DBG, k <<" : "<<cc->fCmd<<" fmt: '"<<cc->fFmt<<"'");
00064       // Check next
00065       return 0;
00066    }
00067 
00068    // Not enough info: stop
00069    return 1;
00070 }
00071 
00072 //______________________________________________________________________________
00073 XrdProofdAdmin::XrdProofdAdmin(XrdProofdManager *mgr,
00074                                XrdProtocol_Config *pi, XrdSysError *e)
00075                   : XrdProofdConfig(pi->ConfigFN, e)
00076 {
00077    // Constructor
00078 
00079    fMgr = mgr;
00080    fExportPaths.clear();
00081    // Map of default copy commands supported / allowed, keyed by the protocol
00082    fAllowedCpCmds.Add("file", new XpdAdminCpCmd("cp","cp -rp %s %s",1));
00083    fAllowedCpCmds.Add("root", new XpdAdminCpCmd("xrdcp","xrdcp %s %s",1));
00084    fAllowedCpCmds.Add("xrd",  new XpdAdminCpCmd("xrdcp","xrdcp %s %s",1));
00085 #if !defined(__APPLE__)
00086    fAllowedCpCmds.Add("http", new XpdAdminCpCmd("wget","wget %s -O %s",0));
00087    fAllowedCpCmds.Add("https", new XpdAdminCpCmd("wget","wget %s -O %s",0));
00088 #else
00089    fAllowedCpCmds.Add("http", new XpdAdminCpCmd("curl","curl %s -o %s",0));
00090    fAllowedCpCmds.Add("https", new XpdAdminCpCmd("curl","curl %s -o %s",0));
00091 #endif
00092    fCpCmds = "";
00093    fAllowedCpCmds.Apply(ExportCpCmd, (void *)&fCpCmds);
00094 
00095    // Configuration directives
00096    RegisterDirectives();
00097 }
00098 
00099 //__________________________________________________________________________
00100 void XrdProofdAdmin::RegisterDirectives()
00101 {
00102    // Register directives for configuration
00103 
00104    Register("exportpath", new XrdProofdDirective("exportpath", this, &DoDirectiveClass));
00105    Register("cpcmd", new XrdProofdDirective("cpcmd", this, &DoDirectiveClass));
00106 }
00107 
00108 //______________________________________________________________________________
00109 int XrdProofdAdmin::Process(XrdProofdProtocol *p, int type)
00110 {
00111    // Process admin request
00112    XPDLOC(ALL, "Admin::Process")
00113 
00114    int rc = 0;
00115    XPD_SETRESP(p, "Process");
00116 
00117    TRACEP(p, REQ, "req id: " << type << " ("<<
00118                   XrdProofdAux::AdminMsgType(type) << ")");
00119 
00120    XrdOucString emsg;
00121    switch (type) {
00122       case kQuerySessions:
00123          return QuerySessions(p);
00124       case kQueryLogPaths:
00125          return QueryLogPaths(p);
00126       case kCleanupSessions:
00127          return CleanupSessions(p);
00128       case kSendMsgToUser:
00129          return SendMsgToUser(p);
00130       case kGroupProperties:
00131          return SetGroupProperties(p);
00132       case kGetWorkers:
00133          return GetWorkers(p);
00134       case kQueryWorkers:
00135          return QueryWorkers(p);
00136       case kQueryROOTVersions:
00137          return QueryROOTVersions(p);
00138       case kROOTVersion:
00139          return SetROOTVersion(p);
00140       case kSessionAlias:
00141          return SetSessionAlias(p);
00142       case kSessionTag:
00143          return SetSessionTag(p);
00144       case kReleaseWorker:
00145          return ReleaseWorker(p);
00146       case kExec:
00147          return Exec(p);
00148       case kGetFile:
00149          return GetFile(p);
00150       case kPutFile:
00151          return PutFile(p);
00152       case kCpFile:
00153          return CpFile(p);
00154       default:
00155          emsg += "Invalid type: ";
00156          emsg += type;
00157          break;
00158    }
00159 
00160    // Notify invalid request
00161    response->Send(kXR_InvalidRequest, emsg.c_str());
00162 
00163    // Done
00164    return 0;
00165 }
00166 
00167 //__________________________________________________________________________
00168 int XrdProofdAdmin::Config(bool rcf)
00169 {
00170    // Run configuration and parse the entered config directives.
00171    // Return 0 on success, -1 on error
00172    XPDLOC(ALL, "Admin::Config")
00173 
00174    // Run first the configurator
00175    if (XrdProofdConfig::Config(rcf) != 0) {
00176       XPDERR("problems parsing file ");
00177       return -1;
00178    }
00179 
00180    XrdOucString msg;
00181    msg = (rcf) ? "re-configuring" : "configuring";
00182    TRACE(ALL, msg.c_str());
00183 
00184    // Exported paths
00185    if (fExportPaths.size() > 0) {
00186       TRACE(ALL, "additional paths which can be browsed by all users: ");
00187       std::list<XrdOucString>::iterator is = fExportPaths.begin();
00188       while (is != fExportPaths.end()) { TRACE(ALL, "   "<<*is); is++; }
00189    }
00190    // Allowed / supported copy commands
00191    TRACE(ALL, "allowed/supported copy commands: "<<fCpCmds);
00192 
00193    // Done
00194    return 0;
00195 }
00196 
00197 //______________________________________________________________________________
00198 int XrdProofdAdmin::DoDirective(XrdProofdDirective *d,
00199                                     char *val, XrdOucStream *cfg, bool rcf)
00200 {
00201    // Update the priorities of the active sessions.
00202    XPDLOC(SMGR, "Admin::DoDirective")
00203 
00204    if (!d)
00205       // undefined inputs
00206       return -1;
00207 
00208    if (d->fName == "exportpath") {
00209       return DoDirectiveExportPath(val, cfg, rcf);
00210    } else if (d->fName == "cpcmd") {
00211       return DoDirectiveCpCmd(val, cfg, rcf);
00212    }
00213    TRACE(XERR,"unknown directive: "<<d->fName);
00214    return -1;
00215 }
00216 
00217 //______________________________________________________________________________
00218 int XrdProofdAdmin::DoDirectiveExportPath(char *val, XrdOucStream *cfg, bool)
00219 {
00220    // Process 'exportpath' directives
00221    // eg: xpd.exportpath /tmp/data /data2/data
00222 
00223    XPDLOC(SMGR, "Admin::DoDirectiveExportPath")
00224 
00225    if (!val || !cfg)
00226       // undefined inputs
00227       return -1;
00228 
00229    TRACE(ALL,"val: "<<val);
00230 
00231    while (val) {
00232       XrdOucString tkns(val), tkn;
00233       int from = 0;
00234       while ((from = tkns.tokenize(tkn, from, ' ')) != STR_NPOS) {
00235          fExportPaths.push_back(tkn);
00236       }
00237       // Get next
00238       val = cfg->GetWord();
00239    }
00240 
00241    return 0;
00242 }
00243 
00244 //______________________________________________________________________________
00245 int XrdProofdAdmin::DoDirectiveCpCmd(char *val, XrdOucStream *cfg, bool)
00246 {
00247    // Process 'cpcmd' directives
00248    // eg: xpd.cpcmd alien aliencp  fmt:"%s %s" put:0
00249 
00250    XPDLOC(SMGR, "Admin::DoDirectiveCpCmd")
00251 
00252    if (!val || !cfg)
00253       // undefined inputs
00254       return -1;
00255 
00256    XrdOucString proto, cpcmd, fmt;
00257    bool canput = 0, isfmt = 0, rm = 0;
00258 
00259    while (val) {
00260       XrdOucString tkn(val);
00261       if (proto.length() <= 0) {
00262          proto = tkn;
00263          if (proto.beginswith('-')) {
00264             rm = 1;
00265             proto.erase(0, 1);
00266             break;
00267          }
00268       } else if (cpcmd.length() <= 0) {
00269          cpcmd = tkn;
00270       } else if (tkn.beginswith("put:")) {
00271          isfmt = 0;
00272          if (tkn == "put:1") canput = 1;
00273       } else if (tkn.beginswith("fmt:")) {
00274          fmt.assign(tkn, 4, -1);
00275          isfmt = 1;
00276       } else {
00277          if (isfmt) {
00278             fmt += " ";
00279             fmt += tkn;
00280          }
00281       }
00282       // Get next
00283       val = cfg->GetWord();
00284    }
00285 
00286    if (rm) {
00287       // Remove the related entry
00288       fAllowedCpCmds.Del(proto.c_str());
00289    } else if (cpcmd.length() > 0 && fmt.length() > 0) {
00290       // Add or replace
00291       fmt.insert(" ", 0);
00292       fmt.insert(cpcmd, 0);
00293       fAllowedCpCmds.Rep(proto.c_str(), new XpdAdminCpCmd(cpcmd.c_str(),fmt.c_str(),canput));
00294    } else {
00295       TRACE(ALL, "incomplete information: ignoring!");
00296    }
00297 
00298    // Fill again the export string
00299    fCpCmds = "";
00300    fAllowedCpCmds.Apply(ExportCpCmd, (void *)&fCpCmds);
00301 
00302    return 0;
00303 }
00304 
00305 //______________________________________________________________________________
00306 int XrdProofdAdmin::QueryROOTVersions(XrdProofdProtocol *p)
00307 {
00308    // Handle request for list of ROOT versions
00309    XPDLOC(ALL, "Admin::QueryROOTVersions")
00310 
00311    int rc = 0;
00312    XPD_SETRESP(p, "QueryROOTVersions");
00313 
00314    XrdOucString msg = fMgr->ROOTMgr()->ExportVersions(p->Client()->ROOT());
00315 
00316    TRACEP(p, DBG, "sending: "<<msg);
00317 
00318    // Send back to user
00319    response->Send((void *)msg.c_str(), msg.length()+1);
00320 
00321    // Over
00322    return 0;
00323 }
00324 
00325 //______________________________________________________________________________
00326 int XrdProofdAdmin::SetROOTVersion(XrdProofdProtocol *p)
00327 {
00328    // Handle request for changing the default ROOT version
00329    XPDLOC(ALL, "Admin::SetROOTVersion")
00330 
00331    int rc = 0;
00332    XPD_SETRESP(p, "SetROOTVersion");
00333 
00334    // Change default ROOT version
00335    const char *t = p->Argp() ? (const char *) p->Argp()->buff : "default";
00336    int len = p->Argp() ? p->Request()->header.dlen : strlen("default");
00337    XrdOucString tag(t,len);
00338 
00339    // If a user name is given separate it out and check if
00340    // we can do the operation
00341    XrdOucString usr;
00342    if (tag.beginswith("u:")) {
00343       usr = tag;
00344       usr.erase(usr.rfind(' '));
00345       usr.replace("u:","");
00346       // Isolate the tag
00347       tag.erase(0,tag.find(' ') + 1);
00348    }
00349    TRACEP(p, REQ, "usr: "<<usr<<", version tag: "<< tag);
00350 
00351    // If the action is requested for a user different from us we
00352    // must be 'superuser'
00353    XrdProofdClient *c = p->Client();
00354    XrdOucString grp;
00355    if (usr.length() > 0) {
00356       // Separate group info, if any
00357       if (usr.find(':') != STR_NPOS) {
00358          grp = usr;
00359          grp.erase(grp.rfind(':'));
00360          usr.erase(0,usr.find(':') + 1);
00361       } else {
00362          XrdProofGroup *g =
00363             (fMgr->GroupsMgr()) ? fMgr->GroupsMgr()->GetUserGroup(usr.c_str()) : 0;
00364          grp = g ? g->Name() : "default";
00365       }
00366       if (usr != p->Client()->User()) {
00367          if (!p->SuperUser()) {
00368             usr.insert("not allowed to change settings for usr '", 0);
00369             usr += "'";
00370             TRACEP(p, XERR, usr.c_str());
00371             response->Send(kXR_InvalidRequest, usr.c_str());
00372             return 0;
00373          }
00374          // Lookup the list
00375          if (!(c = fMgr->ClientMgr()->GetClient(usr.c_str(), grp.c_str()))) {
00376             // No: fail
00377             XrdOucString emsg("user not found or not allowed: ");
00378             emsg += usr;
00379             TRACEP(p, XERR, emsg.c_str());
00380             response->Send(kXR_InvalidRequest, emsg.c_str());
00381             return 0;
00382          }
00383       }
00384    }
00385 
00386    // Search in the list
00387    XrdROOT *r = fMgr->ROOTMgr()->GetVersion(tag.c_str());
00388    bool ok = r ? 1 : 0;
00389    if (!r && tag == "default") {
00390       // If not found we may have been requested to set the default version
00391       r = fMgr->ROOTMgr()->DefaultVersion();
00392       ok = r ? 1 : 0;
00393    }
00394 
00395    if (ok) {
00396       // Save the version in the client instance
00397       c->SetROOT(r);
00398       // Notify
00399       TRACEP(p, DBG, "default changed to "<<c->ROOT()->Tag()<<
00400                    " for {client, group} = {"<<usr<<", "<<grp<<"} ("<<c<<")");
00401       // Forward down the tree, if not leaf
00402       if (fMgr->SrvType() != kXPD_Worker) {
00403          XrdOucString buf("u:");
00404          buf += c->UI().fUser;
00405          buf += " ";
00406          buf += tag;
00407          int type = ntohl(p->Request()->proof.int1);
00408          fMgr->NetMgr()->Broadcast(type, buf.c_str(), p->Client()->User(), response);
00409       }
00410       // Acknowledge user
00411       response->Send();
00412    } else {
00413       tag.insert("tag '", 0);
00414       tag += "' not found in the list of available ROOT versions";
00415       TRACEP(p, XERR, tag.c_str());
00416       response->Send(kXR_InvalidRequest, tag.c_str());
00417    }
00418 
00419    // Over
00420    return 0;
00421 }
00422 
00423 //______________________________________________________________________________
00424 int XrdProofdAdmin::QueryWorkers(XrdProofdProtocol *p)
00425 {
00426    // Handle request for getting the list of potential workers
00427    XPDLOC(ALL, "Admin::QueryWorkers")
00428 
00429    int rc = 0;
00430    XPD_SETRESP(p, "QueryWorkers");
00431 
00432    // Send back a list of potentially available workers
00433    XrdOucString sbuf(1024);
00434    fMgr->ProofSched()->ExportInfo(sbuf);
00435 
00436    // Send buffer
00437    char *buf = (char *) sbuf.c_str();
00438    int len = sbuf.length() + 1;
00439    TRACEP(p, DBG, "sending: "<<buf);
00440 
00441    // Send back to user
00442    response->Send(buf, len);
00443 
00444    // Over
00445    return 0;
00446 }
00447 
00448 //______________________________________________________________________________
00449 int XrdProofdAdmin::GetWorkers(XrdProofdProtocol *p)
00450 {
00451    // Handle request for getting the best set of workers
00452    XPDLOC(ALL, "Admin::GetWorkers")
00453 
00454    int rc = 0;
00455    XPD_SETRESP(p, "GetWorkers");
00456 
00457    // Unmarshall the data
00458    int psid = ntohl(p->Request()->proof.sid);
00459 
00460    // Find server session
00461    XrdProofdProofServ *xps = 0;
00462    if (!p->Client() || !(xps = p->Client()->GetServer(psid))) {
00463       TRACEP(p, XERR, "session ID not found: "<<psid);
00464       response->Send(kXR_InvalidRequest,"session ID not found");
00465       return 0;
00466    }
00467    int pid = xps->SrvPID();
00468    TRACEP(p, REQ, "request from session "<<pid);
00469 
00470    // We should query the chosen resource provider
00471    XrdOucString wrks("");
00472 
00473    // Read the message associated with the request; needs to do like this because
00474    // of a bug in the XrdOucString constructor when length is 0
00475    XrdOucString msg;
00476    if (p->Request()->header.dlen > 0)
00477       msg.assign((const char *) p->Argp()->buff, 0, p->Request()->header.dlen);
00478    if (fMgr->GetWorkers(wrks, xps, msg.c_str()) < 0 ) {
00479       // Something wrong
00480       response->Send(kXR_InvalidRequest, "GetWorkers failed");
00481       return 0;
00482    }
00483 
00484    // Send buffer
00485    // In case the session was enqueued, pass an empty list.
00486    char *buf = (char *) wrks.c_str();
00487    int len = wrks.length() + 1;
00488    TRACEP(p, DBG, "sending: "<<buf);
00489 
00490    // Send back to user
00491    if (buf) {
00492       response->Send(buf, len);
00493    } else {
00494       // Something wrong
00495       response->Send(kXR_InvalidRequest, "GetWorkers failed");
00496       return 0;
00497    }
00498 
00499    // Over
00500    return 0;
00501 }
00502 
00503 //______________________________________________________________________________
00504 int XrdProofdAdmin::SetGroupProperties(XrdProofdProtocol *p)
00505 {
00506    // Handle request for setting group properties
00507    XPDLOC(ALL, "Admin::SetGroupProperties")
00508 
00509    int rc = 1;
00510    XPD_SETRESP(p, "SetGroupProperties");
00511 
00512    // User's group
00513    int   len = p->Request()->header.dlen;
00514    char *grp = new char[len+1];
00515    memcpy(grp, p->Argp()->buff, len);
00516    grp[len] = 0;
00517    TRACEP(p, DBG, "request to change priority for group '"<< grp<<"'");
00518 
00519    // Make sure is the current one of the user
00520    if (strcmp(grp, p->Client()->UI().fGroup.c_str())) {
00521       TRACEP(p, XERR, "received group does not match the user's one");
00522       response->Send(kXR_InvalidRequest,
00523                      "SetGroupProperties: received group does not match the user's one");
00524       return 0;
00525    }
00526 
00527    // The priority value
00528    int priority = ntohl(p->Request()->proof.int2);
00529 
00530    // Tell the priority manager
00531    if (fMgr && fMgr->PriorityMgr()) {
00532       XrdOucString buf;
00533       XPDFORM(buf, "%s %d", grp, priority);
00534       if (fMgr->PriorityMgr()->Pipe()->Post(XrdProofdPriorityMgr::kSetGroupPriority,
00535                                              buf.c_str()) != 0) {
00536          TRACEP(p, XERR, "problem sending message on the pipe");
00537          response->Send(kXR_ServerError,
00538                              "SetGroupProperties: problem sending message on the pipe");
00539          return 0;
00540       }
00541    }
00542 
00543    // Notify
00544    TRACEP(p, REQ, "priority for group '"<< grp<<"' has been set to "<<priority);
00545 
00546    // Acknowledge user
00547    response->Send();
00548 
00549    // Over
00550    return 0;
00551 }
00552 
00553 //______________________________________________________________________________
00554 int XrdProofdAdmin::SendMsgToUser(XrdProofdProtocol *p)
00555 {
00556    // Handle request for sending a message to a user
00557    XPDLOC(ALL, "Admin::SendMsgToUser")
00558 
00559    int rc = 0;
00560    XPD_SETRESP(p, "SendMsgToUser");
00561 
00562    // Target client (default us)
00563    XrdProofdClient *tgtclnt = p->Client();
00564    XrdProofdClient *c = 0;
00565    std::list<XrdProofdClient *>::iterator i;
00566 
00567    // Extract the user name, if any
00568    int len = p->Request()->header.dlen;
00569    if (len <= 0) {
00570       // No message: protocol error?
00571       TRACEP(p, XERR, "no message");
00572       response->Send(kXR_InvalidRequest,"SendMsgToUser: no message");
00573       return 0;
00574    }
00575 
00576    XrdOucString cmsg((const char *)p->Argp()->buff, len);
00577    XrdOucString usr;
00578    if (cmsg.beginswith("u:")) {
00579       // Extract user
00580       int isp = cmsg.find(' ');
00581       if (isp != STR_NPOS) {
00582          usr.assign(cmsg, 2, isp-1);
00583          cmsg.erase(0, isp+1);
00584       }
00585       if (usr.length() > 0) {
00586          TRACEP(p, REQ, "request for user: '"<<usr<<"'");
00587          // Find the client instance
00588          bool clntfound = 0;
00589          if ((c = fMgr->ClientMgr()->GetClient(usr.c_str(), 0))) {
00590             tgtclnt = c;
00591             clntfound = 1;
00592          }
00593          if (!clntfound) {
00594             // No user: protocol error?
00595             TRACEP(p, XERR, "target client not found");
00596             response->Send(kXR_InvalidRequest,
00597                            "SendMsgToUser: target client not found");
00598             return 0;
00599          }
00600       }
00601    }
00602    // Recheck message length
00603    if (cmsg.length() <= 0) {
00604       // No message: protocol error?
00605       TRACEP(p, XERR, "no message after user specification");
00606       response->Send(kXR_InvalidRequest,
00607                           "SendMsgToUser: no message after user specification");
00608       return 0;
00609    }
00610 
00611    // Check if allowed
00612    if (!p->SuperUser()) {
00613       if (usr.length() > 0) {
00614          if (tgtclnt != p->Client()) {
00615             TRACEP(p, XERR, "not allowed to send messages to usr '"<<usr<<"'");
00616             response->Send(kXR_InvalidRequest,
00617                                 "SendMsgToUser: not allowed to send messages to specified usr");
00618             return 0;
00619          }
00620       } else {
00621          TRACEP(p, XERR, "not allowed to send messages to connected users");
00622          response->Send(kXR_InvalidRequest,
00623                              "SendMsgToUser: not allowed to send messages to connected users");
00624          return 0;
00625       }
00626    } else {
00627       if (usr.length() <= 0) tgtclnt = 0;
00628    }
00629 
00630    // The clients to notified
00631    fMgr->ClientMgr()->Broadcast(tgtclnt, cmsg.c_str());
00632 
00633    // Acknowledge user
00634    response->Send();
00635 
00636    // Over
00637    return 0;
00638 }
00639 
00640 //______________________________________________________________________________
00641 int XrdProofdAdmin::QuerySessions(XrdProofdProtocol *p)
00642 {
00643    // Handle request for list of sessions
00644    XPDLOC(ALL, "Admin::QuerySessions")
00645 
00646    int rc = 0;
00647    XPD_SETRESP(p, "QuerySessions");
00648 
00649    XrdOucString notmsg, msg;
00650    {  // This is needed to block the session checks
00651       XpdSrvMgrCreateCnt cnt(fMgr->SessionMgr(), XrdProofdProofServMgr::kProcessCnt);
00652       msg = p->Client()->ExportSessions(notmsg, response);
00653    }
00654 
00655    if (notmsg.length() > 0) {
00656       // Some sessions seem non-responding: notify the client
00657       response->Send(kXR_attn, kXPD_srvmsg, 0, (char *) notmsg.c_str(), notmsg.length());
00658    }
00659 
00660    TRACEP(p, DBG, "sending: "<<msg);
00661 
00662    // Send back to user
00663    response->Send((void *)msg.c_str(), msg.length()+1);
00664 
00665    // Over
00666    return 0;
00667 }
00668 
00669 //______________________________________________________________________________
00670 int XrdProofdAdmin::QueryLogPaths(XrdProofdProtocol *p)
00671 {
00672    // Handle request for log paths 
00673    XPDLOC(ALL, "Admin::QueryLogPaths")
00674 
00675    int rc = 0;
00676    XPD_SETRESP(p, "QueryLogPaths");
00677 
00678    int ridx = ntohl(p->Request()->proof.int2);
00679 
00680    // Find out for which session is this request
00681    XrdOucString stag, master, user, buf;
00682    int len = p->Request()->header.dlen;
00683    if (len > 0) {
00684       buf.assign(p->Argp()->buff,0,len-1);
00685       int im = buf.find("|master:");
00686       int iu = buf.find("|user:");
00687       stag = buf;
00688       stag.erase(stag.find("|"));
00689       if (im != STR_NPOS) {
00690          master.assign(buf, im + strlen("|master:"));
00691          master.erase(master.find("|"));
00692       }
00693       if (iu != STR_NPOS) {
00694          user.assign(buf, iu + strlen("|user:"));
00695          user.erase(user.find("|"));
00696          TRACEP(p, DBG, "user: "<<user);
00697       }
00698       if (stag.beginswith('*'))
00699          stag = "";
00700    }
00701    TRACEP(p, DBG, "master: "<<master<<", user: "<<user<<", stag: "<<stag);
00702 
00703    XrdProofdClient *client = (user.length() > 0) ? 0 : p->Client();
00704    if (!client)
00705       // Find the client instance
00706       client = fMgr->ClientMgr()->GetClient(user.c_str(), 0);
00707    if (!client) {
00708       TRACEP(p, XERR, "query sess logs: client for '"<<user<<"' not found");
00709       response->Send(kXR_InvalidRequest,"QueryLogPaths: query log: client not found");
00710       return 0;
00711    }
00712 
00713    XrdOucString tag = (stag == "" && ridx >= 0) ? "last" : stag;
00714    if (stag == "" && client->Sandbox()->GuessTag(tag, ridx) != 0) {
00715       TRACEP(p, XERR, "query sess logs: session tag not found");
00716       response->Send(kXR_InvalidRequest,"QueryLogPaths: query log: session tag not found");
00717       return 0;
00718    }
00719 
00720    // Return message
00721    XrdOucString rmsg;
00722 
00723    if (master.length() <= 0) {
00724       // The session tag first
00725       rmsg += tag; rmsg += "|";
00726       // The pool URL second
00727       rmsg += fMgr->PoolURL(); rmsg += "|";
00728    }
00729 
00730    // Locate the local log file
00731    XrdOucString sdir(client->Sandbox()->Dir());
00732    sdir += "/session-";
00733    sdir += tag;
00734 
00735    // Open dir
00736    DIR *dir = opendir(sdir.c_str());
00737    if (!dir) {
00738       XrdOucString msg("cannot open dir ");
00739       msg += sdir; msg += " (errno: "; msg += errno; msg += ")";
00740       TRACEP(p, XERR, msg.c_str());
00741       response->Send(kXR_InvalidRequest, msg.c_str());
00742       return 0;
00743    }
00744    // Scan the directory to add the top master (only if top master)
00745    if (master.length() <= 0) {
00746       bool found = 0;
00747       struct dirent *ent = 0;
00748       while ((ent = (struct dirent *)readdir(dir))) {
00749          if (!strncmp(ent->d_name, "master-", 7) &&
00750             strstr(ent->d_name, ".log")) {
00751             rmsg += "|0 proof://"; rmsg += fMgr->Host(); rmsg += ':';
00752             rmsg += fMgr->Port(); rmsg += '/';
00753             rmsg += sdir; rmsg += '/'; rmsg += ent->d_name;
00754             found = 1;
00755          }
00756       }
00757    }
00758    // Close dir
00759    closedir(dir);
00760 
00761    // Now open the workers file
00762    XrdOucString wfile(sdir);
00763    wfile += "/.workers";
00764    FILE *f = fopen(wfile.c_str(), "r");
00765    if (f) {
00766       char ln[2048];
00767       while (fgets(ln, sizeof(ln), f)) {
00768          if (ln[strlen(ln)-1] == '\n')
00769             ln[strlen(ln)-1] = 0;
00770          // Locate status and url
00771          char *ps = strchr(ln, ' ');
00772          if (ps) {
00773             *ps = 0;
00774             ps++;
00775             // Locate ordinal
00776             char *po = strchr(ps, ' ');
00777             if (po) {
00778                po++;
00779                // Locate path
00780                char *pp = strchr(po, ' ');
00781                if (pp) {
00782                   *pp = 0;
00783                   pp++;
00784                   // Record now
00785                   rmsg += "|"; rmsg += po; rmsg += " ";
00786                   if (master.length() > 0) {
00787                      rmsg += master;
00788                      rmsg += ",";
00789                   }
00790                   rmsg += ln; rmsg += '/';
00791                   rmsg += pp;
00792                   // Reposition on the file name
00793                   char *ppl = strrchr(pp, '/');
00794                   pp = (ppl) ? ppl : pp;
00795                   // If the line is for a submaster, we have to get the info
00796                   // about its workers
00797                   bool ismst = (strstr(pp, "master-")) ? 1 : 0;
00798                   if (ismst) {
00799                      XrdClientUrlInfo u((const char *)&ln[0]);
00800                      XrdOucString msg(stag);
00801                      msg += "|master:";
00802                      msg += ln;
00803                      msg += "|user:";
00804                      msg += u.User;
00805                      u.User = p->Client()->User() ? p->Client()->User() : fMgr->EffectiveUser();
00806                      char *bmst = fMgr->NetMgr()->ReadLogPaths(u.GetUrl().c_str(), msg.c_str(), ridx);
00807                      if (bmst) {
00808                         rmsg += bmst;
00809                         free(bmst);
00810                      }
00811                   }
00812                }
00813             }
00814          }
00815       }
00816       fclose(f);
00817    }
00818 
00819    // Send back to user
00820    response->Send((void *) rmsg.c_str(), rmsg.length()+1);
00821 
00822    // Over
00823    return 0;
00824 }
00825 
00826 //______________________________________________________________________________
00827 int XrdProofdAdmin::CleanupSessions(XrdProofdProtocol *p)
00828 {
00829    // Handle request of
00830    XPDLOC(ALL, "Admin::CleanupSessions")
00831 
00832    int rc = 0;
00833    XPD_SETRESP(p, "CleanupSessions");
00834 
00835    XrdOucString cmsg;
00836 
00837    // Target client (default us)
00838    XrdProofdClient *tgtclnt = p->Client();
00839 
00840    // If super user we may be requested to cleanup everything
00841    bool all = 0;
00842    char *usr = 0;
00843    bool clntfound = 1;
00844    if (p->SuperUser()) {
00845       int what = ntohl(p->Request()->proof.int2);
00846       all = (what == 1) ? 1 : 0;
00847 
00848       if (!all) {
00849          // Get a user name, if any.
00850          // A super user can ask cleaning for clients different from itself
00851          char *buf = 0;
00852          int len = p->Request()->header.dlen;
00853          if (len > 0) {
00854             clntfound = 0;
00855             buf = p->Argp()->buff;
00856             len = (len < 9) ? len : 8;
00857          } else {
00858             buf = (char *) p->Client()->User();
00859             len = strlen(p->Client()->User());
00860          }
00861          if (len > 0) {
00862             usr = new char[len+1];
00863             memcpy(usr, buf, len);
00864             usr[len] = '\0';
00865             // Group info, if any
00866             char *grp = strstr(usr, ":");
00867             if (grp)
00868                *grp++ = 0;
00869             // Find the client instance
00870             XrdProofdClient *c = fMgr->ClientMgr()->GetClient(usr, grp);
00871             if (c) {
00872                tgtclnt = c;
00873                clntfound = 1;
00874             }
00875             TRACEP(p, REQ, "superuser, cleaning usr: "<< usr);
00876          }
00877       } else {
00878          tgtclnt = 0;
00879          TRACEP(p, REQ, "superuser, all sessions cleaned");
00880       }
00881    } else {
00882       // Define the user name for later transactions (their executed under
00883       // the admin name)
00884       int len = strlen(tgtclnt->User()) + 1;
00885       usr = new char[len+1];
00886       memcpy(usr, tgtclnt->User(), len);
00887       usr[len] = '\0';
00888    }
00889 
00890    // We cannot continue if we do not have anything to clean
00891    if (!clntfound) {
00892       TRACEP(p, DBG, "client '"<<usr<<"' has no sessions - do nothing");
00893    }
00894 
00895    // hard or soft (always hard for old clients)
00896    bool hard = (ntohl(p->Request()->proof.int3) == 1 || p->ProofProtocol() < 18) ? 1 : 0;
00897    const char *lab = hard ? "hard-reset" : "soft-reset";
00898 
00899    // Asynchronous notification to requester
00900    if (fMgr->SrvType() != kXPD_Worker) {
00901       XPDFORM(cmsg, "CleanupSessions: %s: signalling active sessions for termination", lab);
00902       response->Send(kXR_attn, kXPD_srvmsg, (char *) cmsg.c_str(), cmsg.length());
00903    }
00904 
00905    // Send a termination request to client sessions
00906    XPDFORM(cmsg, "CleanupSessions: %s: cleaning up client: requested by: %s", lab, p->Link()->ID);
00907    int srvtype = ntohl(p->Request()->proof.int2);
00908    fMgr->ClientMgr()->TerminateSessions(tgtclnt, cmsg.c_str(), srvtype);
00909 
00910    // Forward down the tree only if not leaf
00911    if (hard && fMgr->SrvType() != kXPD_Worker) {
00912 
00913       // Asynchronous notification to requester
00914       XPDFORM(cmsg, "CleanupSessions: %s: forwarding the reset request to next tier(s) ", lab);
00915       response->Send(kXR_attn, kXPD_srvmsg, 0, (char *) cmsg.c_str(), cmsg.length());
00916 
00917       int type = ntohl(p->Request()->proof.int1);
00918       fMgr->NetMgr()->Broadcast(type, usr, p->Client()->User(), response, 1);
00919    }
00920 
00921    // Wait just a bit before testing the activity of the session manager
00922    sleep(1);
00923 
00924    // Additional waiting (max 10 secs) depends on the activity of the session manager
00925    int twait = 10;
00926    while (twait-- > 0 &&
00927           fMgr->SessionMgr()->CheckCounter(XrdProofdProofServMgr::kCleanSessionsCnt) > 0) {
00928       if (twait < 7) {
00929          XPDFORM(cmsg, "CleanupSessions: %s: wait %d more seconds for completion ...", lab, twait);
00930          response->Send(kXR_attn, kXPD_srvmsg, 0, (char *) cmsg.c_str(), cmsg.length());
00931       }
00932       sleep(1);
00933    }
00934 
00935    // Cleanup usr
00936    SafeDelArray(usr);
00937 
00938    // Acknowledge user
00939    response->Send();
00940 
00941    // Over
00942    return 0;
00943 }
00944 
00945 //______________________________________________________________________________
00946 int XrdProofdAdmin::SetSessionAlias(XrdProofdProtocol *p)
00947 {
00948    // Handle request for setting the session alias
00949    XPDLOC(ALL, "Admin::SetSessionAlias")
00950 
00951    int rc = 0;
00952    XPD_SETRESP(p, "SetSessionAlias");
00953 
00954    //
00955    // Specific info about a session
00956    int psid = ntohl(p->Request()->proof.sid);
00957    XrdProofdProofServ *xps = 0;
00958    if (!p->Client() || !(xps = p->Client()->GetServer(psid))) {
00959       TRACEP(p, XERR, "session ID not found: "<<psid);
00960       response->Send(kXR_InvalidRequest,"SetSessionAlias: session ID not found");
00961       return 0;
00962    }
00963 
00964    // Set session alias
00965    const char *msg = (const char *) p->Argp()->buff;
00966    int   len = p->Request()->header.dlen;
00967    if (len > kXPROOFSRVALIASMAX - 1)
00968       len = kXPROOFSRVALIASMAX - 1;
00969 
00970    // Save tag
00971    if (len > 0 && msg) {
00972       xps->SetAlias(msg);
00973       if (TRACING(DBG)) {
00974          XrdOucString alias(xps->Alias());
00975          TRACEP(p, DBG, "session alias set to: "<<alias);
00976       }
00977    }
00978 
00979    // Acknowledge user
00980    response->Send();
00981 
00982    // Over
00983    return 0;
00984 }
00985 
00986 //______________________________________________________________________________
00987 int XrdProofdAdmin::SetSessionTag(XrdProofdProtocol *p)
00988 {
00989    // Handle request for setting the session tag
00990    XPDLOC(ALL, "Admin::SetSessionTag")
00991 
00992    int rc = 0;
00993    XPD_SETRESP(p, "SetSessionTag");
00994    //
00995    // Specific info about a session
00996    int psid = ntohl(p->Request()->proof.sid);
00997    XrdProofdProofServ *xps = 0;
00998    if (!p->Client() || !(xps = p->Client()->GetServer(psid))) {
00999       TRACEP(p, XERR, "session ID not found: "<<psid);
01000       response->Send(kXR_InvalidRequest,"SetSessionTag: session ID not found");
01001       return 0;
01002    }
01003 
01004    // Set session tag
01005    const char *msg = (const char *) p->Argp()->buff;
01006    int   len = p->Request()->header.dlen;
01007    if (len > kXPROOFSRVTAGMAX - 1)
01008       len = kXPROOFSRVTAGMAX - 1;
01009 
01010    // Save tag
01011    if (len > 0 && msg) {
01012       xps->SetTag(msg);
01013       if (TRACING(DBG)) {
01014          XrdOucString tag(xps->Tag());
01015          TRACEP(p, DBG, "session tag set to: "<<tag);
01016       }
01017    }
01018 
01019    // Acknowledge user
01020    response->Send();
01021 
01022    // Over
01023    return 0;
01024 }
01025 
01026 //______________________________________________________________________________
01027 int XrdProofdAdmin::ReleaseWorker(XrdProofdProtocol *p)
01028 {
01029    // Handle request for releasing a worker
01030    XPDLOC(ALL, "Admin::ReleaseWorker")
01031 
01032    int rc = 0;
01033    XPD_SETRESP(p, "ReleaseWorker");
01034    //
01035    // Specific info about a session
01036    int psid = ntohl(p->Request()->proof.sid);
01037    XrdProofdProofServ *xps = 0;
01038    if (!p->Client() || !(xps = p->Client()->GetServer(psid))) {
01039       TRACEP(p, XERR, "session ID not found: "<<psid);
01040       response->Send(kXR_InvalidRequest,"ReleaseWorker: session ID not found");
01041       return 0;
01042    }
01043 
01044    // Set session tag
01045    const char *msg = (const char *) p->Argp()->buff;
01046    int   len = p->Request()->header.dlen;
01047    if (len > kXPROOFSRVTAGMAX - 1)
01048       len = kXPROOFSRVTAGMAX - 1;
01049 
01050    // Save tag
01051    if (len > 0 && msg) {
01052       xps->RemoveWorker(msg);
01053       TRACEP(p, DBG, "worker \""<<msg<<"\" released");
01054       if (TRACING(HDBG)) fMgr->NetMgr()->Dump();
01055    }
01056 
01057    // Acknowledge user
01058    response->Send();
01059 
01060    // Over
01061    return 0;
01062 }
01063 
01064 //______________________________________________________________________________
01065 int XrdProofdAdmin::CheckForbiddenChars(const char *s)
01066 {
01067    // Check is 's' contains any of the forbidden chars '(){};'
01068    // Return 0 if OK (no forbidden chars), -1 in not OK
01069 
01070    int len = 0;
01071    if (!s || (len = strlen(s)) <= 0) return 0;
01072 
01073    int j = len;
01074    while (j--) {
01075       char c = s[j];
01076       if (c == '(' || c == ')' || c == '{' || c == '}' || c == ';') {
01077          return -1;
01078       }
01079    }
01080    // Done
01081    return 0;
01082 }
01083 
01084 //______________________________________________________________________________
01085 int XrdProofdAdmin::Exec(XrdProofdProtocol *p)
01086 {
01087    // Handle request of cleaning parts of the sandbox
01088 
01089    XPDLOC(ALL, "Admin::Exec")
01090 
01091    // Commands; must be synchronized with EAdminExecType in XProofProtocol.h
01092 #if !defined(__APPLE__)
01093    const char *cmds[] = { "rm", "ls", "more", "grep", "tail", "md5sum", "stat", "find" };
01094 #else
01095    const char *cmds[] = { "rm", "ls", "more", "grep", "tail", "md5", "stat", "find" };
01096 #endif
01097    const char *actcmds[] = { "remove", "access", "open", "open", "open", "open", "stat", "find"};
01098 
01099    int rc = 0;
01100    XPD_SETRESP(p, "Exec");
01101 
01102    XrdOucString emsg;
01103 
01104    // Target client (default us)
01105    XrdProofdClient *tgtclnt = p->Client();
01106    if (!tgtclnt) {
01107       emsg = "client instance not found";
01108       TRACEP(p, XERR, emsg);
01109       response->Send(kXR_InvalidRequest, emsg.c_str());
01110       return 0;
01111    }
01112 
01113    // Action type
01114    int action = ntohl(p->Request()->proof.int2);
01115    if (action < kRm || action > kFind) {
01116       emsg = "unknown action type: ";
01117       emsg += action;
01118       TRACEP(p, XERR, emsg);
01119       response->Send(kXR_InvalidRequest, emsg.c_str());
01120       return 0;
01121    }
01122 
01123    // Parse the string
01124    int dlen = p->Request()->header.dlen;
01125    XrdOucString msg, node, path, opt;
01126    if (dlen > 0 && p->Argp()->buff) {
01127       msg.assign((const char *)p->Argp()->buff, 0, dlen);
01128       // Parse
01129       emsg = "";
01130       int from = 0;
01131       if ((from = msg.tokenize(node, from, '|')) != -1) {
01132          if ((from = msg.tokenize(path, from, '|')) != -1) {
01133             from = msg.tokenize(opt, from, '|');
01134          } else {
01135             emsg = "'path' not found in message";
01136          }
01137       } else {
01138          emsg = "'node' not found in message";
01139       }
01140       if (emsg.length() > 0) {
01141          TRACEP(p, XERR, emsg);
01142          response->Send(kXR_InvalidRequest, emsg.c_str());
01143          return 0;
01144       }
01145    }
01146 
01147    // Path and opt cannot contain multiple commands (e.g. file; rm *)
01148    if (CheckForbiddenChars(path.c_str()) != 0) {
01149       emsg = "none of the characters '(){};' are allowed in path string ("; emsg += path; emsg += ")";
01150       TRACEP(p, XERR, emsg);
01151       response->Send(kXR_InvalidRequest, emsg.c_str());
01152       return 0;
01153    }
01154    if (CheckForbiddenChars(opt.c_str()) != 0) {
01155       emsg = "none of the characters '(){};' are allowed in opt string ("; emsg += opt; emsg += ")";
01156       TRACEP(p, XERR, emsg);
01157       response->Send(kXR_InvalidRequest, emsg.c_str());
01158       return 0;
01159    }
01160 
01161    // Check if we have to forward this request
01162    XrdOucString result;
01163    bool islocal = fMgr->NetMgr()->IsLocal(node.c_str(), 1);
01164    if (fMgr->SrvType() != kXPD_Worker) {
01165       int type = ntohl(p->Request()->proof.int1);
01166       if (node == "all") {
01167          if (action == kStat || action == kMd5sum) {
01168             emsg = "action cannot be run in mode 'all' - running on master only";
01169             response->Send(kXR_attn, kXPD_srvmsg, 2, (char *)emsg.c_str(), emsg.length());
01170          } else {
01171             fMgr->NetMgr()->Broadcast(type, msg.c_str(), p->Client()->User(), response, 0, action);
01172          }
01173       } else if (!islocal) {
01174          // Create 'url'
01175          XrdOucString u = (p->Client()->User()) ? p->Client()->User() : fMgr->EffectiveUser();
01176          u += '@';
01177          u += node;
01178          TRACEP(p, HDBG, "sending request to "<<u);
01179          // Send request
01180          XrdClientMessage *xrsp;
01181          if (!(xrsp = fMgr->NetMgr()->Send(u.c_str(), type, msg.c_str(), 0, response, 0, action))) {
01182             TRACEP(p, XERR, "problems sending request to "<<u);
01183          } else {
01184             if (action == kStat || action == kMd5sum) {
01185                // Extract the result
01186                result.assign((const char *) xrsp->GetData(), 0, xrsp->DataLen());
01187             } else if (action == kRm) {
01188                // Send 'OK'
01189                result = "OK";
01190             }
01191          }
01192          // Cleanup answer
01193          SafeDelete(xrsp);
01194       }
01195    }
01196 
01197    // We may not have been requested to execute the command
01198    if (node != "all" && !islocal) {
01199       // We are done: acknowledge user ...
01200       if (result.length() > 0) {
01201          response->Send(result.c_str());
01202       } else {
01203          response->Send();
01204       }
01205       // ... and go
01206       return 0;
01207    }
01208 
01209    // Here we execute the request
01210    XrdOucString cmd, pfx(fMgr->Host());
01211    pfx += ":"; pfx += fMgr->Port();
01212 
01213    // Notify the client
01214    if (node != "all") {
01215       if (action != kStat && action != kMd5sum && action != kRm) {
01216          emsg = "Node: "; emsg += pfx;
01217          emsg += "\n-----";
01218          response->Send(kXR_attn, kXPD_srvmsg, 2, (char *)emsg.c_str(), emsg.length());
01219       }
01220       pfx = "";
01221    } else {
01222       pfx += "| ";
01223    }
01224 
01225    // Get the full path, check if in sandbox and if the user is allowed
01226    // to access it
01227    XrdOucString fullpath(path);
01228    bool sandbox = 0;
01229    bool haswild = (fullpath.find('*') != STR_NPOS) ? 1 : 0;
01230    int check = (action == kMore || action == kTail ||
01231                 action == kGrep || action == kMd5sum) ? 2 : 1;
01232    if ((action == kRm || action == kLs) && haswild) check = 0;
01233    int rccp = 0;
01234    struct stat st;
01235    if ((rccp = CheckPath(p->SuperUser(), tgtclnt->Sandbox()->Dir(),
01236                          fullpath, check, sandbox, &st, emsg)) != 0) {
01237       if (rccp == -2) {
01238          emsg = cmds[action];
01239          emsg += ": cannot ";
01240          emsg += actcmds[action];
01241          emsg += " `";
01242          emsg += fullpath;
01243          emsg += "': No such file or directory";
01244       } else if (rccp == -3) {
01245          emsg = cmds[action];
01246          emsg += ": cannot stat ";
01247          emsg += fullpath;
01248          emsg += ": errno: ";
01249          emsg += (int) errno;
01250       } else if (rccp == -4) {
01251          emsg = cmds[action];
01252          emsg += ": ";
01253          emsg += fullpath;
01254          emsg += ": Is not a regular file";
01255       }
01256       TRACEP(p, XERR, emsg);
01257       response->Send(kXR_InvalidRequest, emsg.c_str());
01258       return 0;
01259    }
01260 
01261    // Additional checks for remove requests
01262    if (action == kRm) {
01263       // Ownership required and no support for wild cards for absolute paths
01264       if (!sandbox) {
01265          if (haswild) {
01266             emsg = "not allowed to rm with wild cards on path: ";
01267             emsg += fullpath;
01268             TRACEP(p, XERR, emsg);
01269             response->Send(kXR_InvalidRequest, emsg.c_str());
01270             return 0;
01271          }
01272          if ((int) st.st_uid != tgtclnt->UI().fUid || (int) st.st_gid != tgtclnt->UI().fGid) {
01273             emsg = "rm on path: ";
01274             emsg += fullpath;
01275             emsg += " requires ownership; path owned by: (";
01276             emsg += (int) st.st_uid; emsg += ",";
01277             emsg += (int) st.st_gid; emsg += ")";
01278             TRACEP(p, XERR, emsg);
01279             response->Send(kXR_InvalidRequest, emsg.c_str());
01280             return 0;
01281          }
01282       } else {
01283          // Will not allow to remove basic sandbox sub-dirs
01284          const char *sbdir[5] = {"queries", "packages", "cache", "datasets", "data"};
01285          while (fullpath.endswith('/'))
01286             fullpath.erasefromend(1);
01287          XrdOucString sball(tgtclnt->Sandbox()->Dir()), sball1 = sball;
01288          sball += "/*"; sball1 += "/*/";
01289          if (fullpath == sball || fullpath == sball1) {
01290             emsg = "removing all sandbox directory is not allowed: ";
01291             emsg += fullpath;
01292             TRACEP(p, XERR, emsg);
01293             response->Send(kXR_InvalidRequest, emsg.c_str());
01294             return 0;
01295          }
01296          int kk = 5;
01297          while (kk--) {
01298             if (fullpath.endswith(sbdir[kk])) {
01299                emsg = "removing a basic sandbox directory is not allowed: ";
01300                emsg += fullpath;
01301                TRACEP(p, XERR, emsg);
01302                response->Send(kXR_InvalidRequest, emsg.c_str());
01303                return 0;
01304             }
01305          }
01306       }
01307 
01308       // Prepare the command
01309       cmd = cmds[action];
01310       if (opt.length() <= 0) opt = "-f";
01311       cmd += " "; cmd += opt;
01312       cmd += " "; cmd += fullpath;
01313       cmd += " 2>&1";
01314 
01315    } else {
01316 
01317       XrdOucString rederr;
01318       cmd = cmds[action];
01319       switch (action) {
01320          case kLs:
01321             if (opt.length() <= 0) opt = "-C";
01322             rederr = " 2>&1";
01323             break;
01324          case kMore:
01325          case kGrep:
01326          case kTail:
01327          case kFind:
01328             rederr = " 2>&1";
01329             break;
01330          case kStat:
01331             cmd = "";
01332             opt = "";
01333             break;
01334          case kMd5sum:
01335             opt = "";
01336             rederr = " 2>&1";
01337             break;
01338          default:
01339             emsg = "undefined action: ";
01340             emsg = action;
01341             emsg = " - protocol error!";
01342             TRACEP(p, XERR, emsg);
01343             response->Send(kXR_ServerError, emsg.c_str());
01344             break;
01345       }
01346       if (action != kFind) {
01347          if (cmd.length() > 0) cmd += " ";
01348          if (opt.length() > 0) { cmd += opt; cmd += " ";}
01349          cmd += fullpath;
01350       } else {
01351          cmd += " "; cmd += fullpath;
01352          if (opt.length() > 0) { cmd += " "; cmd += opt; }
01353       }
01354       if (rederr.length() > 0) cmd += rederr;
01355    }
01356 
01357    // Run the command now
01358    emsg = pfx;
01359    if (ExecCmd(p, response, action, cmd.c_str(), emsg) != 0) {
01360       TRACEP(p, XERR, emsg);
01361       response->Send(kXR_ServerError, emsg.c_str());
01362    } else {
01363       // Done
01364       switch (action) {
01365          case kStat:
01366          case kMd5sum:
01367             response->Send(emsg.c_str());
01368             break;
01369          case kRm:
01370             response->Send("OK");
01371             break;
01372          default:
01373             response->Send();
01374             break;
01375       }
01376    }
01377 
01378    // Over
01379    return 0;
01380 }
01381 
01382 //______________________________________________________________________________
01383 int XrdProofdAdmin::ExecCmd(XrdProofdProtocol *p, XrdProofdResponse *r,
01384                          int action, const char *cmd, XrdOucString &emsg)
01385 {
01386    // Low-level execution handler. The commands must be executed in user space.
01387    // We do that by forking and logging as user in the forked instance. The
01388    // parent will just send over te messages received from the user-child via
01389    // the pipe.
01390    // Return 0 on success, -1 on error
01391 
01392    XPDLOC(ALL, "Admin::ExecCmd")
01393 
01394    int rc = 0;
01395    XrdOucString pfx = emsg;
01396    emsg = "";
01397 
01398    // We do it via the shell
01399    if (!cmd || strlen(cmd) <= 0) {
01400       emsg = "undefined command!";
01401       return -1;
01402    }
01403 
01404    // Pipe for child-to-parent communications
01405    XrdProofdPipe pp;
01406    if (!pp.IsValid()) {
01407       emsg = "cannot create the pipe";
01408       return -1;
01409    }
01410 
01411    // Fork a test agent process to handle this session
01412    TRACEP(p, DBG, "forking to execute in the private sandbox");
01413    int pid = -1;
01414    if (!(pid = fMgr->Sched()->Fork("adminexeccmd"))) {
01415       // Child process
01416       // We set to the user environment as we must to run the command
01417       // in the user space
01418       if (fMgr->SessionMgr()->SetUserEnvironment(p) != 0) {
01419          emsg = "SetUserEnvironment did not return OK";
01420          rc = 1;
01421       } else {
01422          // Execute the command
01423          if (action == kStat) {
01424             struct stat st;
01425             if ((stat(cmd, &st)) != 0) {
01426                if (errno == ENOENT) {
01427                   emsg += "stat: cannot stat `";
01428                   emsg += cmd;
01429                   emsg += "': No such file or directory";
01430                } else {
01431                   emsg += "stat: cannot stat ";
01432                   emsg += cmd;
01433                   emsg += ": errno: ";
01434                   emsg += (int) errno;
01435                }
01436             } else {
01437                // Fill the buffer and go
01438                char msg[256];
01439                int  islink = S_ISLNK(st.st_mode);
01440                sprintf(msg, "%ld %ld %d %d %d %lld %ld %d", (long)st.st_dev,
01441                         (long)st.st_ino, st.st_mode, (int)(st.st_uid),
01442                         (int)(st.st_gid), (kXR_int64)st.st_size, st.st_mtime, islink);
01443                emsg = msg;
01444             }
01445          } else {
01446             // Execute the command in a pipe
01447             FILE *fp = popen(cmd, "r");
01448             if (!fp) {
01449                emsg = "could not run '"; emsg += cmd; emsg += "'";
01450                rc = 1;
01451             } else {
01452                // Read line by line
01453                int pfxlen = pfx.length();
01454                int len = 0;
01455                char line[2048];
01456                char buf[1024];
01457                int bufsiz = 1024, left = bufsiz - 1, lines = 0;
01458                while (fgets(line, sizeof(line), fp)) {
01459                   // Parse the line
01460                   int llen = strlen(line);
01461                   lines++;
01462                   // If md5sum, we need to parse only the first line
01463                   if (lines == 1 && action == kMd5sum) {
01464                      if (line[llen-1] == '\n') {
01465                         line[llen-1] = '\0';
01466                         llen--;
01467                      }
01468 #if !defined(__APPLE__)
01469                      // The first token
01470                      XrdOucString sl(line);
01471                      sl.tokenize(emsg, 0, ' ');
01472 #else
01473                      // The last token
01474                      XrdOucString sl(line), tkn;
01475                      int from = 0;
01476                      while ((from = sl.tokenize(tkn, from, ' ')) != STR_NPOS) {
01477                         emsg = tkn;
01478                      }
01479 #endif
01480                      break;
01481                   }
01482                   // Send over this part, if no more space
01483                   if ((llen + pfxlen) > left) {
01484                      buf[len] = '\0';
01485                      if (buf[len-1] == '\n') buf[len-1] = '\0';
01486                      if (r->Send(kXR_attn, kXPD_srvmsg, 2, (char *) &buf[0], len) != 0) {
01487                         emsg = "error sending message to requester";
01488                         rc = 1;
01489                         break;
01490                      }
01491                      buf[0] = 0;
01492                      len = 0;
01493                      left = bufsiz -1;
01494                   }
01495                   // Add prefix to the buffer, if any
01496                   if (pfxlen > 0) {
01497                      memcpy(buf+len, pfx.c_str(), pfxlen);
01498                      len += pfxlen;
01499                      left -= pfxlen;
01500                   }
01501                   // Add line to the buffer
01502                   memcpy(buf+len, line, llen);
01503                   len += llen;
01504                   left -= llen;
01505                   // Check if we have been interrupted
01506                   if (lines > 0 && !(lines % 10)) {
01507                      char b[1];
01508                      if (p->Link()->Peek(&b[0], 1, 0) == 1) {
01509                         p->Process(p->Link());
01510                         if (p->IsCtrlC()) break;
01511                      }
01512                   }
01513                }
01514                // Send the last bunch
01515                if (len > 0) {
01516                   buf[len] = '\0';
01517                   if (buf[len-1] == '\n') buf[len-1] = '\0';
01518                   if (r->Send(kXR_attn, kXPD_srvmsg, 2, (char *) &buf[0], len) != 0) {
01519                      emsg = "error sending message to requester";
01520                      rc = 1;
01521                   }
01522                }
01523                // Close the pipe
01524                int rcpc = 0;
01525                if (rc == 0 && (rcpc = pclose(fp)) == -1) {
01526                   emsg = "could not close the command pipe";
01527                   rc = 1;
01528                }
01529                if (WEXITSTATUS(rcpc) != 0) {
01530                   emsg = "failure: return code: ";
01531                   emsg += (int) WEXITSTATUS(rcpc);
01532                   rc = 1;
01533                }
01534             }
01535          }
01536       }
01537       // Send error, if any
01538       if (rc == 1) {
01539          // Post Error
01540          if (pp.Post(-1, emsg.c_str()) != 0) rc = 1;
01541       }
01542 
01543       // End-Of-Transmission
01544       if (pp.Post(0, emsg.c_str()) != 0) rc = 1;
01545 
01546       // Done
01547       exit(rc);
01548    }
01549 
01550    // Parent process
01551    if (pid < 0) {
01552       emsg = "forking failed - errno: "; emsg += (int) errno;
01553       return -1;
01554    }
01555 
01556    // now we wait for the callback to be (successfully) established
01557    TRACEP(p, DBG, "forking OK: wait for information");
01558 
01559    // Read status-of-setup from pipe
01560    int prc = 0, rst = -1;
01561    // We wait for 60 secs max among transfers
01562    while (rst < 0 && rc >= 0) {
01563       while ((prc = pp.Poll(60)) > 0) {
01564          XpdMsg msg;
01565          if (pp.Recv(msg) != 0) {
01566             emsg = "error receiving message from pipe";
01567             return -1;
01568          }
01569          // Status is the message type
01570          rst = msg.Type();
01571          // Read string, if any
01572          XrdOucString buf;
01573          if (rst < 0) {
01574             buf = msg.Buf();
01575             if (buf.length() <= 0) {
01576                emsg = "error reading string from received message";
01577                return -1;
01578             }
01579             // Store error message
01580             emsg = buf;
01581          } else {
01582             if (action == kMd5sum || action == kStat) {
01583                buf = msg.Buf();
01584                if (buf.length() <= 0) {
01585                   emsg = "error reading string from received message";
01586                  return -1;
01587                }
01588                // Store md5sum
01589                emsg = buf;
01590             }
01591             // Done
01592             break;
01593          }
01594       }
01595       if (prc == 0) {
01596          emsg = "timeout from poll";
01597          return -1;
01598       } else if (prc < 0) {
01599          emsg = "error from poll - errno: "; emsg += -prc;
01600          return -1;
01601       }
01602    }
01603 
01604    // Done
01605    return rc;
01606 }
01607 
01608 //______________________________________________________________________________
01609 int XrdProofdAdmin::CheckPath(bool superuser, const char *sbdir,
01610                               XrdOucString &fullpath, int check, bool &sandbox,
01611                               struct stat *st, XrdOucString &emsg)
01612 {
01613    // Handle request for sending a file
01614 
01615    if (!sbdir || strlen(sbdir) <= 0) {
01616       emsg = "CheckPath: sandbox dir undefined!";
01617       return -1;
01618    }
01619 
01620    // Get the full path and check if in sandbox
01621    XrdOucString path(fullpath);
01622    sandbox = 0;
01623    if (path.beginswith('/')) {
01624       fullpath = path;
01625       if (fullpath.beginswith(sbdir)) sandbox = 1;
01626    } else {
01627       if (path.beginswith("../")) path.erase(0,2);
01628       if (path.beginswith("./") || path.beginswith("~/")) path.erase(0,1);
01629       if (!path.beginswith("/")) path.insert('/',0);
01630       fullpath = sbdir;
01631       fullpath += path;
01632       sandbox = 1;
01633    }
01634    fullpath.replace("//","/");
01635 
01636    // If the path is absolute, we must check a normal user is allowed to browse
01637    if (!sandbox && !superuser) {
01638       bool notfound = 1;
01639       std::list<XrdOucString>::iterator si = fExportPaths.begin();
01640       while (si != fExportPaths.end()) {
01641          if (path.beginswith((*si).c_str())) {
01642             notfound = 0;
01643             break;
01644          }
01645          si++;
01646       }
01647       if (notfound) {
01648          emsg = "CheckPath: not allowed to run the requested action on ";
01649          emsg += path;
01650          return -1;
01651       }
01652    }
01653 
01654    if (check > 0 && st) {
01655       // Check if the file exists
01656       if (stat(fullpath.c_str(), st) != 0) {
01657          if (errno == ENOENT) {
01658             return -2;
01659          } else {
01660             return -3;
01661          }
01662       }
01663 
01664       // Certain actions require a regular file
01665       if ((check == 2) && !S_ISREG(st->st_mode)) return -4;
01666    }
01667 
01668    // Done
01669    return 0;
01670 }
01671 
01672 //______________________________________________________________________________
01673 int XrdProofdAdmin::GetFile(XrdProofdProtocol *p)
01674 {
01675    // Handle request for sending a file
01676 
01677    XPDLOC(ALL, "Admin::GetFile")
01678 
01679    int rc = 0;
01680    XPD_SETRESP(p, "GetFile");
01681 
01682    XrdOucString emsg;
01683 
01684    // Target client (default us)
01685    XrdProofdClient *tgtclnt = p->Client();
01686    if (!tgtclnt) {
01687       emsg = "client instance not found";
01688       TRACEP(p, XERR, emsg);
01689       response->Send(kXR_InvalidRequest, emsg.c_str());
01690       return 0;
01691    }
01692 
01693    // Parse the string
01694    int dlen = p->Request()->header.dlen;
01695    XrdOucString path;
01696    if (dlen > 0 && p->Argp()->buff) {
01697       path.assign((const char *)p->Argp()->buff, 0, dlen);
01698       if (path.length() <= 0) {
01699          TRACEP(p, XERR, "path missing!");
01700          response->Send(kXR_InvalidRequest, "path missing!");
01701          return 0;
01702       }
01703    }
01704 
01705    // Get the full path, check if in sandbox and if the user is allowed
01706    // to access it
01707    XrdOucString fullpath(path);
01708    bool sandbox = 0, check = 2;
01709    int rccp = 0;
01710    struct stat st;
01711    if ((rccp = CheckPath(p->SuperUser(), tgtclnt->Sandbox()->Dir(),
01712                          fullpath, check, sandbox, &st, emsg)) != 0) {
01713       if (rccp == -2) {
01714          emsg = "Cannot open `";
01715          emsg += fullpath;
01716          emsg += "': No such file or directory";
01717       } else if (rccp == -3) {
01718          emsg = "Cannot stat `";
01719          emsg += fullpath;
01720          emsg += "': errno: ";
01721          emsg += (int) errno;
01722       } else if (rccp == -4) {
01723          emsg = fullpath;
01724          emsg += " is not a regular file";
01725       }
01726       TRACEP(p, XERR, emsg);
01727       response->Send(kXR_InvalidRequest, emsg.c_str());
01728       return 0;
01729    }
01730 
01731    // Pipe for child-to-parent communications
01732    XrdProofdPipe pp;
01733    if (!pp.IsValid()) {
01734       emsg = "cannot create the pipe for internal communications";
01735       TRACEP(p, XERR, emsg);
01736       response->Send(kXR_InvalidRequest, emsg.c_str());
01737    }
01738 
01739    // Fork a test agent process to handle this session
01740    TRACEP(p, DBG, "forking to execute in the private sandbox");
01741    int pid = -1;
01742    if (!(pid = fMgr->Sched()->Fork("admingetfile"))) {
01743 
01744       // Child process
01745       // We set to the user environment as we must to run the command
01746       // in the user space
01747       if (fMgr->SessionMgr()->SetUserEnvironment(p) != 0) {
01748          emsg = "SetUserEnvironment did not return OK";
01749          rc = 1;
01750       } else {
01751 
01752          // Open the file
01753          int fd = open(fullpath.c_str(), O_RDONLY);
01754          if (fd < 0) {
01755             emsg = "cannot open file: ";
01756             emsg += fullpath;
01757             emsg += " - errno:";
01758             emsg += (int) errno;
01759             TRACEP(p, XERR, emsg);
01760             response->Send(kXR_ServerError, emsg.c_str());
01761             rc = 1;
01762 
01763          } else {
01764             // Send the size as OK message
01765             char sizmsg[64];
01766             sprintf(sizmsg, "%lld", (kXR_int64) st.st_size);
01767             response->Send((const char *) &sizmsg[0]);
01768             TRACEP(p, XERR, "size is "<<sizmsg<<" bytes");
01769          }
01770          // Now we send the content
01771          const int kMAXBUF = 16384;
01772          char buf[kMAXBUF];
01773          off_t pos = 0;
01774          lseek(fd, pos, SEEK_SET);
01775 
01776          while (rc == 0 && pos < st.st_size) {
01777             off_t left = st.st_size - pos;
01778             if (left > kMAXBUF) left = kMAXBUF;
01779 
01780             int siz;
01781             while ((siz = read(fd, &buf[0], left)) < 0 && errno == EINTR)
01782                errno = 0;
01783             if (siz < 0 || siz != left) {
01784                emsg = "error reading from file: errno: ";
01785                emsg += (int) errno;
01786                rc = 1;
01787                break;
01788             }
01789 
01790             int src = 0;
01791             if ((src = response->Send(kXR_attn, kXPD_msg, (void *)&buf[0], left)) != 0) {
01792                emsg = "error reading from file: errno: ";
01793                emsg += src;
01794                rc = 1;
01795                break;
01796             }
01797             // Re-position
01798             pos += left;
01799             // Reset the timeout
01800             if (pp.Post(0, "") != 0) {
01801                rc = 1;
01802                break;
01803             }
01804          }
01805          // Close the file
01806          close(fd);
01807          // Send error, if any
01808          if (rc != 0) {
01809             TRACEP(p, XERR, emsg);
01810             response->Send(kXR_attn, kXPD_srvmsg, 0, (char *) emsg.c_str(), emsg.length());
01811          }
01812       }
01813 
01814       // Send error, if any
01815       if (rc == 1) {
01816          // Post Error
01817          if (pp.Post(-1, emsg.c_str()) != 0) rc = 1;
01818       } else {
01819          // End-Of-Transmission
01820          if (pp.Post(1, "") != 0) rc = 1;
01821       }
01822 
01823       // Done
01824       exit(rc);
01825    }
01826 
01827    // Parent process
01828    if (pid < 0) {
01829       emsg = "forking failed - errno: "; emsg += (int) errno;
01830       TRACEP(p, XERR, emsg);
01831       response->Send(kXR_ServerError, emsg.c_str());
01832       return 0;
01833    }
01834 
01835    // The parent is done: wait for the child
01836    TRACEP(p, DBG, "forking OK: execution will continue in the child process");
01837 
01838    // Wait for end-of-operations from pipe
01839    int prc = 0, rst = 0;
01840    // We wait for 60 secs max among transfers
01841    while (rst == 0 && rc >= 0) {
01842       while ((prc = pp.Poll(60)) > 0) {
01843          XpdMsg msg;
01844          if (pp.Recv(msg) != 0) {
01845             emsg = "error receiving message from pipe";
01846             return -1;
01847          }
01848          // Status is the message type
01849          rst = msg.Type();
01850          // Read string, if any
01851          if (rst < 0) {
01852             // Error
01853             rc = -1;
01854             // Store error message
01855             emsg = msg.Buf();
01856             if (emsg.length() <= 0) {
01857                emsg = "error reading string from received message";
01858             }
01859             // We stop here
01860             break;
01861          } else if (rst > 0) {
01862             // We are done
01863             break;
01864          }
01865       }
01866       if (prc == 0) {
01867          emsg = "timeout from poll";
01868          rc = -1;
01869       } else if (prc < 0) {
01870          emsg = "error from poll - errno: "; emsg += -prc;
01871          rc = -1;
01872       }
01873    }
01874 
01875    // The parent is done
01876    TRACEP(p, DBG, "execution over: "<< ((rc == 0) ? "ok" : "failed"));
01877 
01878    // Done
01879    return 0;
01880 }
01881 
01882 //______________________________________________________________________________
01883 int XrdProofdAdmin::PutFile(XrdProofdProtocol *p)
01884 {
01885    // Handle request for recieving a file
01886 
01887    XPDLOC(ALL, "Admin::PutFile")
01888 
01889    int rc = 0;
01890    XPD_SETRESP(p, "PutFile");
01891 
01892    XrdOucString emsg;
01893 
01894    // Target client (default us)
01895    XrdProofdClient *tgtclnt = p->Client();
01896    if (!tgtclnt) {
01897       emsg = "client instance not found";
01898       TRACEP(p, XERR, emsg);
01899       response->Send(kXR_InvalidRequest, emsg.c_str());
01900       return 0;
01901    }
01902 
01903    // Parse the string
01904    kXR_int64 size = -1;
01905    int dlen = p->Request()->header.dlen;
01906    XrdOucString cmd, path, ssiz, opt;
01907    if (dlen > 0 && p->Argp()->buff) {
01908       cmd.assign((const char *)p->Argp()->buff, 0, dlen);
01909       if (cmd.length() <= 0) {
01910          TRACEP(p, XERR, "input buffer missing!");
01911          response->Send(kXR_InvalidRequest, "input buffer missing!");
01912          return 0;
01913       }
01914       int from = 0;
01915       if ((from = cmd.tokenize(path, from, ' ')) < 0) {
01916          TRACEP(p, XERR, "cannot resolve path!");
01917          response->Send(kXR_InvalidRequest, "cannot resolve path!");
01918          return 0;
01919       }
01920       if ((from = cmd.tokenize(ssiz, from, ' ')) < 0) {
01921          TRACEP(p, XERR, "cannot resolve word with size!");
01922          response->Send(kXR_InvalidRequest, "cannot resolve word with size!");
01923          return 0;
01924       }
01925       // Extract size
01926       sscanf(ssiz.c_str(), "%lld", &size);
01927       if (size < 0) {
01928          TRACEP(p, XERR, "cannot resolve size!");
01929          response->Send(kXR_InvalidRequest, "cannot resolve size!");
01930          return 0;
01931       }
01932       // Any option?
01933       cmd.tokenize(opt, from, ' ');
01934    }
01935    TRACEP(p, DBG, "path: '"<<path<<"'; size: "<<size<<" bytes; opt: '"<<opt<<"'");
01936 
01937    // Default open and mode flags
01938    kXR_unt32 openflags = O_WRONLY | O_TRUNC | O_CREAT;
01939    kXR_unt32 modeflags = 0600;
01940 
01941    // Get the full path and check if in sandbox and if the user is allowed
01942    // to create/access it
01943    XrdOucString fullpath(path);
01944    bool sandbox = 0, check = 1;
01945    struct stat st;
01946    int rccp = 0;
01947    if ((rccp = CheckPath(p->SuperUser(), tgtclnt->Sandbox()->Dir(),
01948                          fullpath, check, sandbox, &st, emsg)) != 0) {
01949       if (rccp == -3) {
01950          emsg = "File `";
01951          emsg += fullpath;
01952          emsg += "' exists but cannot be stat: errno: ";
01953          emsg += (int) errno;
01954       }
01955       if (rccp != -2) {
01956          TRACEP(p, XERR, emsg);
01957          response->Send(kXR_InvalidRequest, emsg.c_str());
01958          return 0;
01959       }
01960    } else {
01961       // File exists: either force deletion or fail
01962       if (opt == "force") {
01963          openflags = O_WRONLY | O_TRUNC;
01964       } else {
01965          emsg = "file'";
01966          emsg += fullpath;
01967          emsg += "' exists; user option 'force' to override it";
01968          TRACEP(p, XERR, emsg);
01969          response->Send(kXR_InvalidRequest, emsg.c_str());
01970          return 0;
01971       }
01972    }
01973 
01974    // Pipe for child-to-parent communications
01975    XrdProofdPipe pp;
01976    if (!pp.IsValid()) {
01977       emsg = "cannot create the pipe for internal communications";
01978       TRACEP(p, XERR, emsg);
01979       response->Send(kXR_InvalidRequest, emsg.c_str());
01980    }
01981 
01982    // Fork a test agent process to handle this session
01983    TRACEP(p, DBG, "forking to execute in the private sandbox");
01984    int pid = -1;
01985    if (!(pid = fMgr->Sched()->Fork("adminputfile"))) {
01986       // Child process
01987       // We set to the user environment as we must to run the command
01988       // in the user space
01989       if (fMgr->SessionMgr()->SetUserEnvironment(p) != 0) {
01990          emsg = "SetUserEnvironment did not return OK";
01991          rc = 1;
01992       } else {
01993          // Open the file
01994          int fd = open(fullpath.c_str(), openflags, modeflags);
01995          if (fd < 0) {
01996             emsg = "cannot open file: ";
01997             emsg += fullpath;
01998             emsg += " - errno: ";
01999             emsg += (int) errno;
02000             TRACEP(p, XERR, emsg);
02001             response->Send(kXR_ServerError, emsg.c_str());
02002             rc = 1;
02003          } else {
02004             // We read in the content sent by the client
02005             rc = 0;
02006             response->Send("OK");
02007             // Receive the file
02008             const int kMAXBUF = XrdProofdProtocol::MaxBuffsz();
02009             // Get a buffer
02010             XrdBuffer *argp = XrdProofdProtocol::GetBuff(kMAXBUF);
02011             if (!argp) {
02012                emsg = "cannot get buffer to read data out";
02013                rc = 1;
02014             }
02015             int r;
02016             kXR_int64 filesize = 0, left = 0;
02017             while (rc == 0 && filesize < size) {
02018                left = size - filesize;
02019                if (left > kMAXBUF) left = kMAXBUF;
02020                // Read a bunch of data
02021                TRACEP(p, ALL, "receiving "<<left<<" ...");
02022                if ((rc = p->GetData("data", argp->buff, left))) {
02023                   XrdProofdProtocol::ReleaseBuff(argp);
02024                   emsg = "cannot read data out";
02025                   rc = 1;
02026                   break;
02027                }
02028                // Update counters
02029                filesize += left;
02030                // Write to local file
02031                char *b = argp->buff;
02032                r = left;
02033                while (r) {
02034                   int w = 0;
02035                   while ((w = write(fd, b, r)) < 0 && errno == EINTR)
02036                      errno = 0;
02037                   if (w < 0) {
02038                      emsg = "error writing to unit: ";
02039                      emsg += fd;
02040                      rc = 1;
02041                      break;
02042                   }
02043                   r -= w;
02044                   b += w;
02045                }
02046                // Reset the timeout
02047                if (pp.Post(0, "") != 0) {
02048                   rc = 1;
02049                   break;
02050                }
02051             }
02052             // Close the file
02053             close(fd);
02054             // Release the buffer
02055             XrdProofdProtocol::ReleaseBuff(argp);
02056             // Send error, if any
02057             if (rc != 0) {
02058                TRACEP(p, XERR, emsg);
02059                response->Send(kXR_attn, kXPD_srvmsg, 0, (char *) emsg.c_str(), emsg.length());
02060             }
02061          }
02062       }
02063       // Send error, if any
02064       if (rc == 1) {
02065          // Post Error
02066          if (pp.Post(-1, emsg.c_str()) != 0) rc = 1;
02067       } else {
02068          // End-Of-Transmission
02069          if (pp.Post(1, "") != 0) rc = 1;
02070       }
02071       // Done
02072       exit(rc);
02073    }
02074 
02075    // Parent process
02076    if (pid < 0) {
02077       emsg = "forking failed - errno: "; emsg += (int) errno;
02078       TRACEP(p, XERR, emsg);
02079       response->Send(kXR_ServerError, emsg.c_str());
02080       return 0;
02081    }
02082 
02083    // The parent is done: wait for the child
02084    TRACEP(p, DBG, "forking OK: execution will continue in the child process");
02085 
02086    // Wait for end-of-operations from pipe
02087    int prc = 0, rst = 0;
02088    // We wait for 60 secs max among transfers
02089    while (rst == 0 && rc >= 0) {
02090       while ((prc = pp.Poll(60)) > 0) {
02091          XpdMsg msg;
02092          if (pp.Recv(msg) != 0) {
02093             emsg = "error receiving message from pipe";
02094             return -1;
02095          }
02096          // Status is the message type
02097          rst = msg.Type();
02098          // Read string, if any
02099          if (rst < 0) {
02100             // Error
02101             rc = -1;
02102             // Store error message
02103             emsg = msg.Buf();
02104             if (emsg.length() <= 0) {
02105                emsg = "error reading string from received message";
02106             }
02107             // We stop here
02108             break;
02109          } else if (rst > 0) {
02110             // We are done
02111             break;
02112          }
02113       }
02114       if (prc == 0) {
02115          emsg = "timeout from poll";
02116          rc = -1;
02117       } else if (prc < 0) {
02118          emsg = "error from poll - errno: "; emsg += -prc;
02119          rc = -1;
02120       }
02121    }
02122 
02123    // The parent is done
02124    TRACEP(p, DBG, "execution over: "<< ((rc == 0) ? "ok" : "failed"));
02125 
02126    // Done
02127    return 0;
02128 }
02129 
02130 //______________________________________________________________________________
02131 int XrdProofdAdmin::CpFile(XrdProofdProtocol *p)
02132 {
02133    // Handle request for copy files from / to the sandbox
02134 
02135    XPDLOC(ALL, "Admin::CpFile")
02136 
02137    int rc = 0;
02138    XPD_SETRESP(p, "CpFile");
02139 
02140    XrdOucString emsg;
02141 
02142    // Target client (default us)
02143    XrdProofdClient *tgtclnt = p->Client();
02144    if (!tgtclnt) {
02145       emsg = "client instance not found";
02146       TRACEP(p, XERR, emsg);
02147       response->Send(kXR_InvalidRequest, emsg.c_str());
02148       return 0;
02149    }
02150 
02151    // Parse the string
02152    int dlen = p->Request()->header.dlen;
02153    XrdOucString buf, src, dst, fmt;
02154    if (dlen > 0 && p->Argp()->buff) {
02155       buf.assign((const char *)p->Argp()->buff, 0, dlen);
02156       if (buf.length() <= 0) {
02157          TRACEP(p, XERR, "input buffer missing!");
02158          response->Send(kXR_InvalidRequest, "input buffer missing!");
02159          return 0;
02160       }
02161       int from = 0;
02162       if ((from = buf.tokenize(src, from, ' ')) < 0) {
02163          TRACEP(p, XERR, "cannot resolve src path!");
02164          response->Send(kXR_InvalidRequest, "cannot resolve src path!");
02165          return 0;
02166       }
02167       if ((from = buf.tokenize(dst, from, ' ')) < 0) {
02168          TRACEP(p, XERR, "cannot resolve dst path!");
02169          response->Send(kXR_InvalidRequest, "cannot resolve dst path!");
02170          return 0;
02171       }
02172       // The rest, if any, is the format string (including options)
02173       fmt.assign(buf, from);
02174    }
02175    TRACEP(p, DBG, "src: '"<<src<<"'; dst: '"<<dst<<"'; fmt: '"<<fmt<<"'");
02176 
02177    // Check paths
02178    bool locsrc = 1;
02179    XrdClientUrlInfo usrc(src.c_str());
02180    if (usrc.Proto.length() > 0 && usrc.Proto != "file") {
02181       locsrc = 0;
02182       if (!fAllowedCpCmds.Find(usrc.Proto.c_str())) {
02183          TRACEP(p, XERR, "protocol for source file not supported");
02184          response->Send(kXR_InvalidRequest, "protocol for source file not supported");
02185          return 0;
02186       }
02187    }
02188    if (usrc.Proto == "file") src = usrc.File;
02189    bool locdst = 1;
02190    XrdClientUrlInfo udst(dst.c_str());
02191    if (udst.Proto.length() > 0 && udst.Proto != "file") {
02192       locdst = 0;
02193       if (!fAllowedCpCmds.Find(udst.Proto.c_str())) {
02194          TRACEP(p, XERR, "protocol for destination file not supported");
02195          response->Send(kXR_InvalidRequest, "protocol for destination file not supported");
02196          return 0;
02197       }
02198    }
02199    if (udst.Proto == "file") dst = udst.File;
02200 
02201    // Locate the remote protocol, if any
02202    bool loc2loc = 1;
02203    bool loc2rem = 0;
02204    bool rem2loc = 0;
02205    XpdAdminCpCmd *xc = 0;
02206    if (!locsrc && !locdst) {
02207       // Files cannot be both remote
02208       TRACEP(p, XERR, "At least destination or source must be local");
02209       response->Send(kXR_InvalidRequest, "At least destination or source must be local");
02210       return 0;
02211    } else if (!locdst) {
02212       // Find the requested protocol and check if we can put
02213       xc = fAllowedCpCmds.Find(udst.Proto.c_str());
02214       if (!xc->fCanPut) {
02215          TRACEP(p, XERR, "not allowed to create destination file with the chosen protocol");
02216          response->Send(kXR_InvalidRequest, "not allowed to create destination file with the chosen protocol");
02217          return 0;
02218       }
02219       loc2loc = 0;
02220       loc2rem = 1;
02221    } else if (!locsrc) {
02222       // Find the requested protocol
02223       xc = fAllowedCpCmds.Find(usrc.Proto.c_str());
02224       loc2loc = 0;
02225       rem2loc = 1;
02226    } else {
02227       // Default local protocol
02228       xc = fAllowedCpCmds.Find("file");
02229    }
02230 
02231    // Check the local paths
02232    XrdOucString srcpath(src), dstpath(dst);
02233    bool sbsrc = 0, sbdst = 0;
02234    struct stat stsrc, stdst;
02235    int rccpsrc = 0, rccpdst = 0;
02236    if (loc2loc || loc2rem) {
02237       if ((rccpsrc = CheckPath(p->SuperUser(), tgtclnt->Sandbox()->Dir(),
02238                                srcpath, 2, sbsrc, &stsrc, emsg)) != 0) {
02239          if (rccpsrc == -2) {
02240             emsg = xc->fCmd;
02241             emsg += ": cannot open `";
02242             emsg += srcpath;
02243             emsg += "': No such file or directory";
02244          } else if (rccpsrc == -3) {
02245             emsg = xc->fCmd;
02246             emsg += ": cannot stat ";
02247             emsg += srcpath;
02248             emsg += ": errno: ";
02249             emsg += (int) errno;
02250          } else if (rccpsrc == -4) {
02251             emsg = xc->fCmd;
02252             emsg += ": ";
02253             emsg += srcpath;
02254             emsg += ": Is not a regular file";
02255          }
02256          TRACEP(p, XERR, emsg);
02257          response->Send(kXR_InvalidRequest, emsg.c_str());
02258          return 0;
02259       }
02260    }
02261    if (loc2loc || rem2loc) {
02262       if ((rccpdst = CheckPath(p->SuperUser(), tgtclnt->Sandbox()->Dir(),
02263                                dstpath, 0, sbdst, &stdst, emsg)) != 0) {
02264          if (rccpdst == -2) {
02265             emsg = xc->fCmd;
02266             emsg += ": cannot open `";
02267             emsg += dstpath;
02268             emsg += "': No such file or directory";
02269          } else if (rccpdst == -3) {
02270             emsg = xc->fCmd;
02271             emsg += ": cannot stat ";
02272             emsg += dstpath;
02273             emsg += ": errno: ";
02274             emsg += (int) errno;
02275          } else if (rccpdst == -4) {
02276             emsg = xc->fCmd;
02277             emsg += ": ";
02278             emsg += dstpath;
02279             emsg += ": Is not a regular file";
02280          }
02281          TRACEP(p, XERR, emsg);
02282          response->Send(kXR_InvalidRequest, emsg.c_str());
02283          return 0;
02284       }
02285    }
02286 
02287    // Check the format string
02288    if (fmt.length() <= 0) {
02289       fmt = xc->fFmt;
02290    } else {
02291       if (!fmt.beginswith(xc->fCmd)) {
02292          fmt.insert(" ", 0);
02293          fmt.insert(xc->fCmd, 0);
02294       }
02295       if (fmt.find("%s") == STR_NPOS) {
02296          fmt.insert(" %s %s", -1);
02297       }
02298    }
02299 
02300    // Create the command now
02301    XrdOucString cmd;
02302    XrdProofdAux::Form(cmd, fmt.c_str(), srcpath.c_str(), dstpath.c_str());
02303    cmd += " 2>&1";
02304    TRACEP(p, DBG, "Executing command: " << cmd);
02305 
02306    // Pipe for child-to-parent communications
02307    XrdProofdPipe pp;
02308    if (!pp.IsValid()) {
02309       emsg = "cannot create the pipe";
02310       TRACEP(p, XERR, emsg);
02311       response->Send(kXR_ServerError, emsg.c_str());
02312       return 0;
02313    }
02314 
02315    // Fork a test agent process to handle this session
02316    TRACEP(p, DBG, "forking to execute in the private sandbox");
02317    int pid = -1;
02318    if (!(pid = fMgr->Sched()->Fork("admincpfile"))) {
02319       // Child process
02320       // We set to the user environment as we must to run the command
02321       // in the user space
02322       if (fMgr->SessionMgr()->SetUserEnvironment(p) != 0) {
02323          emsg = "SetUserEnvironment did not return OK";
02324          rc = 1;
02325       } else {
02326          // Execute the command in a pipe
02327          FILE *fp = popen(cmd.c_str(), "r");
02328          if (!fp) {
02329             emsg = "could not run '"; emsg += cmd; emsg += "'";
02330             rc = 1;
02331          } else {
02332             // Read line by line
02333             char line[2048];
02334             while (fgets(line, sizeof(line), fp)) {
02335                // Parse the line
02336                int llen = strlen(line);
02337                if (llen > 0 && line[llen-1] == '\n') {
02338                   line[llen-1] = '\0';
02339                   llen--;
02340                }
02341                // Real-time sending (line-by-line)
02342                if (llen > 0 &&
02343                    response->Send(kXR_attn, kXPD_srvmsg, 4, (char *) &line[0], llen) != 0) {
02344                   emsg = "error sending message to requester";
02345                   rc = 1;
02346                   break;
02347                }
02348                // Check if we have been interrupted
02349                char b[1];
02350                if (p->Link()->Peek(&b[0], 1, 0) == 1) {
02351                   p->Process(p->Link());
02352                   if (p->IsCtrlC()) break;
02353                }
02354                // Reset timeout
02355                if (pp.Post(0, "") != 0) {
02356                   rc = 1;
02357                   break;
02358                }
02359             }
02360             // Close the pipe if not in error state (otherwise we may block here)
02361             int rcpc = 0;
02362             if (rc == 0 && (rcpc = pclose(fp)) == -1) {
02363                emsg = "error while trying to close the command pipe";
02364                rc = 1;
02365             }
02366             if (WEXITSTATUS(rcpc) != 0) {
02367                emsg = "return code: ";
02368                emsg += (int) WEXITSTATUS(rcpc);
02369                rc = 1;
02370             }
02371             // Close the notification messages
02372             char cp[1] = {'\n'};
02373             if (response->Send(kXR_attn, kXPD_srvmsg, 3, (char *) &cp[0], 1) != 0) {
02374                emsg = "error sending progress notification to requester";
02375                rc = 1;
02376             }
02377          }
02378       }
02379       // Send error, if any
02380       if (rc == 1) {
02381          // Post Error
02382          if (pp.Post(-1, emsg.c_str()) != 0) rc = 1;
02383       }
02384 
02385       // End-Of-Transmission
02386       if (pp.Post(1, "") != 0) rc = 1;
02387 
02388       // Done
02389       exit(rc);
02390    }
02391 
02392    // Parent process
02393    if (pid < 0) {
02394       emsg = "forking failed - errno: "; emsg += (int) errno;
02395       return -1;
02396    }
02397 
02398    // now we wait for the callback to be (successfully) established
02399    TRACEP(p, DBG, "forking OK: wait for execution");
02400 
02401    // Read status-of-setup from pipe
02402    int prc = 0, rst = 0;
02403    // We wait for 60 secs max among transfers
02404    while (rst == 0 && rc >= 0) {
02405       while ((prc = pp.Poll(60)) > 0) {
02406          XpdMsg msg;
02407          if (pp.Recv(msg) != 0) {
02408             emsg = "error receiving message from pipe";;
02409             rc = -1;
02410          }
02411          // Status is the message type
02412          rst = msg.Type();
02413          // Read string, if any
02414          if (rst < 0) {
02415             // Error
02416             rc = -1;
02417             // Store error message
02418             emsg = msg.Buf();
02419             if (emsg.length() <= 0)
02420                emsg = "error reading string from received message";
02421          } else if (rst == 1) {
02422             // Done
02423             break;
02424          }
02425       }
02426       if (prc == 0) {
02427          emsg = "timeout from poll";
02428          rc = -1;
02429       } else if (prc < 0) {
02430          emsg = "error from poll - errno: "; emsg += -prc;
02431          rc = -1;
02432       }
02433    }
02434 
02435    // The parent is done
02436    TRACEP(p, DBG, "execution over: "<< ((rc == 0) ? "ok" : "failed"));
02437 
02438    if (rc != 0) {
02439       emsg.insert("failure: ", 0);
02440       TRACEP(p, XERR, emsg);
02441       response->Send(kXR_ServerError, emsg.c_str());
02442    } else {
02443       response->Send("OK");
02444    }
02445 
02446    // Done
02447    return 0;
02448 }

Generated on Tue Jul 5 14:51:50 2011 for ROOT_528-00b_version by  doxygen 1.5.1