XrdFrmReqAgent.cc

Go to the documentation of this file.
00001 /******************************************************************************/
00002 /*                                                                            */
00003 /*                     X r d F r m R e q A g e n t . c c                      */
00004 /*                                                                            */
00005 /* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University  */
00006 /*                            All Rights Reserved                             */
00007 /*   Produced by Andrew Hanushevsky for Stanford University under contract    */
00008 /*              DE-AC02-76-SFO0515 with the Department of Energy              */
00009 /******************************************************************************/
00010 
00011 //          $Id: XrdFrmReqAgent.cc 35287 2010-09-14 21:19:35Z ganis $
00012 
// Revision identification string for this source file.
//
const char *XrdFrmReqAgentCVSID =
   "$Id: XrdFrmReqAgent.cc 35287 2010-09-14 21:19:35Z ganis $";
00014 
00015 #include <stdio.h>
00016 #include <stdlib.h>
00017 #include <string.h>
00018 #include <strings.h>
00019 #include <unistd.h>
00020 #include <fcntl.h>
00021 #include <sys/types.h>
00022 #include <sys/stat.h>
00023 
00024 #include "XrdFrm/XrdFrmReqAgent.hh"
00025 #include "XrdFrm/XrdFrmTrace.hh"
00026 #include "XrdFrm/XrdFrmUtils.hh"
00027 #include "XrdNet/XrdNetMsg.hh"
00028 #include "XrdOuc/XrdOucUtils.hh"
00029 #include "XrdSys/XrdSysHeaders.hh"
00030 #include "XrdSys/XrdSysPlatform.hh"
00031 
00032 using namespace XrdFrm;
00033 
00034 /******************************************************************************/
00035 /*                      S t a t i c   V a r i a b l e s                       */
00036 /******************************************************************************/
00037   
00038 char *XrdFrmReqAgent::c2sFN = 0;
00039 
00040 /******************************************************************************/
00041 /*                           C o n s t r u c t o r                            */
00042 /******************************************************************************/
00043   
00044 XrdFrmReqAgent::XrdFrmReqAgent(const char *Me, int qVal)
00045               : Persona(Me),theQ(qVal)
00046 {
00047 // Set default ping message
00048 //
00049    switch(qVal)
00050          {case XrdFrmRequest::getQ: pingMsg = "!<\n"; break;
00051           case XrdFrmRequest::migQ: pingMsg = "!&\n"; break;
00052           case XrdFrmRequest::stgQ: pingMsg = "!+\n"; break;
00053           case XrdFrmRequest::putQ: pingMsg = "!>\n"; break;
00054           default:                  pingMsg = "!\n" ; break;
00055          }
00056 }
00057 
00058 /******************************************************************************/
00059 /* Public:                           A d d                                    */
00060 /******************************************************************************/
00061   
00062 void XrdFrmReqAgent::Add(XrdFrmRequest &Request)
00063 {
00064 
00065 // Complete the request including verifying the priority
00066 //
00067    if (Request.Prty > XrdFrmRequest::maxPrty)
00068       Request.Prty = XrdFrmRequest::maxPrty;
00069       else if (Request.Prty < 0)Request.Prty = 0;
00070 
00071 // Add time and instance name
00072 //
00073    Request.addTOD = time(0);
00074    if (myName) strlcpy(Request.iName, myName, sizeof(Request.iName));
00075 
00076 // Now add it to the queue
00077 //
00078    rQueue[static_cast<int>(Request.Prty)]->Add(&Request);
00079 
00080 // Now wake the boss
00081 //
00082    Ping();
00083 }
00084 
00085 /******************************************************************************/
00086 /* Public:                           D e l                                    */
00087 /******************************************************************************/
00088   
00089 void XrdFrmReqAgent::Del(XrdFrmRequest &Request)
00090 {
00091    int i;
00092   
00093 // Remove all pending requests for this id
00094 //
00095    for (i = 0; i <= XrdFrmRequest::maxPrty; i++) rQueue[i]->Can(&Request);
00096 }
00097 
00098 /******************************************************************************/
00099 /* Public:                          L i s t                                   */
00100 /******************************************************************************/
00101   
00102 int XrdFrmReqAgent::List(XrdFrmRequest::Item *Items, int Num)
00103 {
00104    char myLfn[8192];
00105    int i, Offs, n = 0;
00106 
00107 // List entries in each priority queue
00108 //
00109    for (i = 0; i <= XrdFrmRequest::maxPrty; i++)
00110        {Offs = 0;
00111         while(rQueue[i]->List(myLfn, sizeof(myLfn), Offs, Items, Num))
00112              {cout <<myLfn <<endl; n++;}
00113        }
00114 // All done
00115 //
00116    return n;
00117 }
00118 
00119 /******************************************************************************/
00120   
00121 int XrdFrmReqAgent::List(XrdFrmRequest::Item *Items, int Num, int Prty)
00122 {
00123    char myLfn[8192];
00124    int Offs, n = 0;
00125 
00126 // List entries in each priority queue
00127 //
00128    if (Prty <= XrdFrmRequest::maxPrty)
00129        {Offs = 0;
00130         while(rQueue[Prty]->List(myLfn, sizeof(myLfn), Offs, Items, Num))
00131              {cout <<myLfn <<endl; n++;}
00132        }
00133 
00134 // All done
00135 //
00136    return n;
00137 }
00138   
00139 /******************************************************************************/
00140 /* Public:                       N e x t L F N                                */
00141 /******************************************************************************/
00142   
00143 int XrdFrmReqAgent::NextLFN(char *Buff, int Bsz, int Prty, int &Offs)
00144 {
00145    static XrdFrmRequest::Item Items[1] = {XrdFrmRequest::getLFN};
00146 
00147 // Return entry, if it exists
00148 //
00149    return rQueue[Prty]->List(Buff, Bsz, Offs, Items, 1) != 0;
00150 }
00151 
00152 /******************************************************************************/
00153 /*                                  P i n g                                   */
00154 /******************************************************************************/
00155 
00156 void XrdFrmReqAgent::Ping(const char *Msg)
00157 {
00158    static XrdNetMsg udpMsg(&Say, c2sFN);
00159    static int udpOK = 0;
00160    struct stat buf;
00161 
00162 // Send given message or default message based on our persona
00163 //
00164    if (udpOK || !stat(c2sFN, &buf))
00165       {udpMsg.Send(Msg ? Msg : pingMsg); udpOK = 1;}
00166 }
00167 
00168 /******************************************************************************/
00169 /*                                 S t a r t                                  */
00170 /******************************************************************************/
00171   
00172 int XrdFrmReqAgent::Start(char *aPath, int aMode)
00173 {
00174    XrdFrmRequest Request;
00175    const char *myClid;
00176    char buff[2048], *qPath;
00177    int i;
00178 
00179 // Initialize the udp path for pings, if we have not done so
00180 //
00181    if (!c2sFN)
00182       {sprintf(buff, "%sxfrd.udp", aPath);
00183        c2sFN = strdup(buff);
00184       }
00185 
00186 // Get the instance name
00187 //
00188    myName = XrdOucUtils::InstName(1);
00189 
00190 // Generate the queue directory path
00191 //
00192    if (!(qPath = XrdFrmUtils::makeQDir(aPath, aMode))) return 0;
00193 
00194 // Initialize the registration entry and register ourselves
00195 //
00196    if ((myClid = getenv("XRDCMSCLUSTERID")))
00197       {int Uid = static_cast<int>(geteuid());
00198        int Gid = static_cast<int>(getegid());
00199        memset(&Request, 0, sizeof(Request));
00200        strlcpy(Request.LFN, myClid, sizeof(Request.LFN));
00201        sprintf(Request.User,"%d %d", Uid, Gid);
00202        sprintf(Request.ID, "%d", static_cast<int>(getpid()));
00203        strlcpy(Request.iName, myName, sizeof(Request.iName));
00204        Request.addTOD = time(0);
00205        Request.Options = XrdFrmRequest::Register;
00206        Request.OPc = '@';
00207       }
00208 
00209 // Initialize the request queues if all went well
00210 //
00211    for (i = 0; i <= XrdFrmRequest::maxPrty; i++)
00212        {sprintf(buff, "%s%sQ.%d", qPath, Persona, i);
00213         rQueue[i] = new XrdFrmReqFile(buff, 1);
00214         if (!rQueue[i]->Init()) return 0;
00215         if (myClid) rQueue[i]->Add(&Request);
00216        }
00217 
00218 // All done
00219 //
00220    if (myClid) Ping();
00221    return 1;
00222 }

Generated on Tue Jul 5 14:46:36 2011 for ROOT_528-00b_version by  doxygen 1.5.1