XrdFrmProxy.cc

Go to the documentation of this file.
00001 /******************************************************************************/
00002 /*                                                                            */
00003 /*                        X r d F r m P r o x y . c c                         */
00004 /*                                                                            */
00005 /* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University  */
00006 /*                            All Rights Reserved                             */
00007 /*   Produced by Andrew Hanushevsky for Stanford University under contract    */
00008 /*              DE-AC02-76-SFO0515 with the Department of Energy              */
00009 /******************************************************************************/
00010   
00011 //          $Id: XrdFrmProxy.cc 38011 2011-02-08 18:35:57Z ganis $
00012 
// Revision identification string embedded in the object file.
//
const char *XrdFrmProxyCVSID =
   "$Id: XrdFrmProxy.cc 38011 2011-02-08 18:35:57Z ganis $";
00014 
#include "errno.h"
#include <fcntl.h>
#include "stdio.h"
#include <stdlib.h>
#include <string.h>
#include "unistd.h"
#include <sys/stat.h>
#include <sys/types.h>

#include "XrdFrm/XrdFrmReqAgent.hh"
#include "XrdFrm/XrdFrmProxy.hh"
#include "XrdFrm/XrdFrmTrace.hh"
#include "XrdFrm/XrdFrmUtils.hh"
#include "XrdOuc/XrdOucEnv.hh"
#include "XrdOuc/XrdOucStream.hh"
#include "XrdOuc/XrdOucUtils.hh"
#include "XrdSys/XrdSysError.hh"
#include "XrdSys/XrdSysLogger.hh"
#include "XrdSys/XrdSysPlatform.hh"
00032 
00033 using namespace XrdFrm;
00034 
00035 /******************************************************************************/
00036 /*                      S t a t i c   V a r i a b l e s                       */
00037 /******************************************************************************/
00038   
00039 XrdFrmProxy::o2qMap XrdFrmProxy::oqMap[] =
00040                                {{"getf", XrdFrmRequest::getQ, opGet},
00041                                 {"migr", XrdFrmRequest::migQ, opMig},
00042                                 {"pstg", XrdFrmRequest::stgQ, opStg},
00043                                 {"putf", XrdFrmRequest::putQ, opPut}};
00044 
00045 int                 XrdFrmProxy::oqNum = sizeof(oqMap)/sizeof(oqMap[0]);
00046 
00047 /******************************************************************************/
00048 /*                           C o n s t r u c t o r                            */
00049 /******************************************************************************/
00050   
00051 XrdFrmProxy::XrdFrmProxy(XrdSysLogger *lP, const char *iName, int Debug)
00052 {
00053    char buff[256];
00054 
00055 // Clear agent vector
00056 //
00057    memset(Agent, 0, sizeof(Agent));
00058 
00059 // Link the logger to our message facility
00060 //
00061    Say.logger(lP);
00062 
00063 // Set the debug flag
00064 //
00065    if (Debug) Trace.What |= TRACE_ALL;
00066 
00067 // Develop our internal name
00068 //
00069    QPath = 0;
00070    insName = XrdOucUtils::InstName(iName,0);
00071    sprintf(buff,"%s.%d",XrdOucUtils::InstName(iName),static_cast<int>(getpid()));
00072    intName = strdup(buff);
00073 }
00074 
00075 /******************************************************************************/
00076 /*                                   A d d                                    */
00077 /******************************************************************************/
00078   
00079 int XrdFrmProxy::Add(char Opc, const char *Lfn, const char *Opq,
00080                                const char *Usr, const char *Rid,
00081                                const char *Nop, const char *Pop, int Prty)
00082 {
00083    XrdFrmRequest myReq;
00084    int n, Options = 0;
00085    int qType = XrdFrmUtils::MapR2Q(Opc, &Options);
00086 
00087 // Verify that we can support this operation
00088 //
00089    if (!Agent[qType]) return -ENOTSUP;
00090 
00091 // Initialize the request element
00092 //
00093    memset(&myReq, 0, sizeof(myReq));
00094    myReq.OPc = Opc;
00095 
00096 // Insert the Lfn and Opaque information
00097 //
00098    n = strlen(Lfn);
00099    if (Opq && *Opq)
00100       {if (n + strlen(Opq) + 2 > sizeof(myReq.LFN)) return -ENAMETOOLONG;
00101        strcpy(myReq.LFN, Lfn); strcpy(myReq.LFN+n+1, Opq), myReq.Opaque = n+1;
00102       } else if (n < int(sizeof(myReq.LFN))) strcpy(myReq.LFN, Lfn);
00103                 else return -ENAMETOOLONG;
00104 
00105 // Get the LFN offset in case this is a url
00106 //
00107    if (myReq.LFN[0] != '/' && !(myReq.LFO = XrdFrmUtils::chkURL(myReq.LFN)))
00108       return -EILSEQ;
00109 
00110 // Set the user, request id, notification path, and priority
00111 //
00112    if (Usr && *Usr) strlcpy(myReq.User, Usr, sizeof(myReq.User));
00113       else strcpy(myReq.User, intName);
00114    if (Rid) strlcpy(myReq.ID, Rid, sizeof(myReq.ID));
00115       else *(myReq.ID) = '?';
00116    if (Nop && *Nop) strlcpy(myReq.Notify, Nop, sizeof(myReq.Notify));
00117       else *(myReq.Notify) = '-';
00118    myReq.Prty = Prty;
00119 
00120 // Establish processing options
00121 //
00122    myReq.Options = Options | XrdFrmUtils::MapM2O(myReq.Notify, Pop);
00123 
00124 // Add this request to the queue of requests via the agent
00125 //
00126    Agent[qType]->Add(myReq);
00127    return 0;
00128 }
00129 
00130 /******************************************************************************/
00131 /*                                   D e l                                    */
00132 /******************************************************************************/
00133   
00134 int XrdFrmProxy::Del(char Opc, const char *Rid)
00135 {
00136    XrdFrmRequest myReq;
00137    int qType = XrdFrmUtils::MapR2Q(Opc);
00138 
00139 // Verify that we can support this operation
00140 //
00141    if (!Agent[qType]) return -ENOTSUP;
00142 
00143 // Initialize the request element
00144 //
00145    memset(&myReq, 0, sizeof(myReq));
00146    strlcpy(myReq.ID, Rid, sizeof(myReq.ID));
00147 
00148 // Delete the request from the queue
00149 //
00150    Agent[qType]->Del(myReq);
00151    return 0;
00152 }
00153 
00154 /******************************************************************************/
00155 /*                                  L i s t                                   */
00156 /******************************************************************************/
00157   
00158 int XrdFrmProxy::List(XrdFrmProxy::Queues &State, char *Buff, int Bsz)
00159 {
00160    int i;
00161 
00162 // Get a queue type
00163 //
00164 do{if (!State.Active)
00165       while(State.QList & opAll)
00166            {for (i = 0; i < oqNum; i++) if (oqMap[i].oType & State.QList) break;
00167             if (i >= oqNum) return 0;
00168             State.QNow   =  oqMap[i].qType;
00169             State.QList &= ~oqMap[i].oType;
00170             if (!Agent[int(State.QNow)]) continue;
00171             State.Active = 1;
00172             break;
00173            }
00174 
00175    for (i = State.Prty; i <= XrdFrmRequest::maxPrty; i++)
00176        if (Agent[int(State.QNow)]->NextLFN(Buff,Bsz,i,State.Offset)) return 1;
00177           else State.Prty = i+1;
00178 
00179    State.Active = 0; State.Offset = 0; State.Prty = 0;
00180   } while(State.QList & opAll);
00181 
00182 // We've completed returning all info
00183 //
00184    return 0;
00185 }
00186 
00187 /******************************************************************************/
00188   
00189 int XrdFrmProxy::List(int qType, int qPrty, XrdFrmRequest::Item *Items, int Num)
00190 {
00191    int i, n, Cnt = 0;
00192 
00193 // List each queue
00194 //
00195    while(qType & opAll)
00196         {for (i = 0; i < oqNum; i++) if (oqMap[i].oType & qType) break;
00197          if (i >= oqNum) return Cnt;
00198          qType &= ~oqMap[i].oType; n = oqMap[i].qType;
00199          if (!Agent[n]) continue;
00200          if (qPrty < 0) Cnt += Agent[n]->List(Items, Num);
00201             else Cnt += Agent[n]->List(Items, Num, qPrty);
00202         }
00203 
00204 // All done
00205 //
00206    return Cnt;
00207 }
00208 
00209 /******************************************************************************/
00210 /*                                  I n i t                                   */
00211 /******************************************************************************/
00212 
00213 int XrdFrmProxy::Init(int opX, const char *aPath, int aMode, const char *qPath)
00214 {
00215    const char *configFN = getenv("XRDCONFIGFN"), *iName = 0;
00216    int i;
00217 
00218 // If a qPath was specified, and the "Queues" component will be added later.
00219 // Otherwise, we check the config file to see if there is a qpath there.
00220 // If not we use the aPath which must be unqualified with a component name
00221 // which we will add here). All paths must have the instance name if so needed.
00222 //
00223         if (qPath) QPath = strdup(qPath);
00224    else if (!configFN) iName = insName;
00225    else if (Init2(configFN)) return 0;
00226 
00227 // Create the queue path directory if it does not exists
00228 //
00229    if (!QPath && !(QPath = XrdFrmUtils::makePath(iName, aPath, aMode)))
00230       return 0;
00231 
00232 // Now create and start an agent for each wanted service
00233 //
00234    for (i = 0; i < oqNum; i++)
00235        if (opX & oqMap[i].oType)
00236           {Agent[oqMap[i].qType]
00237                 = new XrdFrmReqAgent(oqMap[i].qName, oqMap[i].qType);
00238            if (!Agent[oqMap[i].qType]->Start(QPath, aMode)) return 0;
00239           }
00240 
00241 // All done
00242 //
00243    return 1;
00244 }
00245 
00246 /******************************************************************************/
00247 /* Private:                        I n i t 2                                  */
00248 /******************************************************************************/
00249 
00250 int XrdFrmProxy::Init2(const char *ConfigFN)
00251 {
00252   char *var;
00253   int  cfgFD, retc, NoGo = 0;
00254   XrdOucEnv myEnv;
00255   XrdOucStream cfgFile(&Say, getenv("XRDINSTANCE"), &myEnv, "=====> ");
00256 
00257 // Try to open the configuration file.
00258 //
00259    if ( (cfgFD = open(ConfigFN, O_RDONLY, 0)) < 0)
00260       {Say.Emsg("Config", errno, "open config file", ConfigFN);
00261        return 1;
00262       }
00263    cfgFile.Attach(cfgFD);
00264 
00265 // Now start reading records until eof looking for our directive
00266 //
00267    while((var = cfgFile.GetMyFirstWord()))
00268         {if (!strcmp(var, "frm.xfr.qcheck") &&  qChk(cfgFile))
00269             {cfgFile.Echo(); NoGo = 1;}
00270         }
00271 
00272 // Now check if any errors occured during file i/o
00273 //
00274    if ((retc = cfgFile.LastError()))
00275       NoGo = Say.Emsg("Config", retc, "read config file", ConfigFN);
00276    cfgFile.Close();
00277 
00278 // All done
00279 //
00280    return NoGo;
00281 }
00282 
00283 /******************************************************************************/
00284 /* Private:                         q C h k                                   */
00285 /******************************************************************************/
00286   
00287 int XrdFrmProxy::qChk(XrdOucStream &cfgFile)
00288 {
00289     char *val;
00290 
00291 // Get the next token, we must have one here
00292 //
00293    if (!(val = cfgFile.GetWord()))
00294       {Say.Emsg("Config", "qcheck time not specified"); return 1;}
00295 
00296 // If not a path, then it must be a time
00297 //
00298    if (*val != '/' && !(val = cfgFile.GetWord())) return 0;
00299 
00300 // The next token has to be an absolute path if it is present at all
00301 //
00302    if (*val != '/')
00303       {Say.Emsg("Config", "qcheck path not absolute"); return 1;}
00304    if (QPath) free(QPath);
00305    QPath = strdup(val);
00306    return 0;
00307 }

Generated on Tue Jul 5 14:46:35 2011 for ROOT_528-00b_version by  doxygen 1.5.1