XrdFrmXfrAgent.cc

Go to the documentation of this file.
00001 /******************************************************************************/
00002 /*                                                                            */
00003 /*                     X r d F r m X f r A g e n t . c c                      */
00004 /*                                                                            */
00005 /* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University  */
00006 /*                            All Rights Reserved                             */
00007 /*   Produced by Andrew Hanushevsky for Stanford University under contract    */
00008 /*              DE-AC02-76-SFO0515 with the Department of Energy              */
00009 /******************************************************************************/
00010 
00011 //          $Id: XrdFrmXfrAgent.cc 35287 2010-09-14 21:19:35Z ganis $
00012 
00013 const char *XrdFrmXfrAgentCVSID = "$Id: XrdFrmXfrAgent.cc 35287 2010-09-14 21:19:35Z ganis $";
00014 
00015 #include <stdlib.h>
00016 #include <string.h>
00017 #include <strings.h>
00018 #include <unistd.h>
00019 #include <fcntl.h>
00020 #include <sys/types.h>
00021 #include <sys/stat.h>
00022 
00023 #include "XrdFrm/XrdFrmConfig.hh"
00024 #include "XrdFrm/XrdFrmRequest.hh"
00025 #include "XrdFrm/XrdFrmTrace.hh"
00026 #include "XrdFrm/XrdFrmUtils.hh"
00027 #include "XrdFrm/XrdFrmXfrAgent.hh"
00028 #include "XrdOuc/XrdOucStream.hh"
00029 #include "XrdSys/XrdSysPlatform.hh"
00030 
00031 using namespace XrdFrm;
00032 
00033 /******************************************************************************/
00034 /*                      S t a t i c   V a r i a b l e s                       */
00035 /******************************************************************************/
00036 
00037 XrdFrmReqAgent XrdFrmXfrAgent::GetAgent("getf", XrdFrmRequest::getQ);
00038 
00039 XrdFrmReqAgent XrdFrmXfrAgent::MigAgent("migr", XrdFrmRequest::migQ);
00040 
00041 XrdFrmReqAgent XrdFrmXfrAgent::StgAgent("pstg", XrdFrmRequest::stgQ);
00042 
00043 XrdFrmReqAgent XrdFrmXfrAgent::PutAgent("putf", XrdFrmRequest::putQ);
00044   
00045 /******************************************************************************/
00046 /* Private:                          A d d                                    */
00047 /******************************************************************************/
00048   
00049 void XrdFrmXfrAgent::Add(XrdOucStream   &Request, char *Tok,
00050                          XrdFrmReqAgent &Server)
00051 {
00052    XrdFrmRequest myReq;
00053    const char *Miss = 0;
00054    char *tp, *op;
00055 
00056 // Handle: op[<traceid>] <requestid> <npath> <prty> <mode> <path> [. . .]
00057 //
00058 // op: + | & | ^ | < | = | >
00059 //
00060    memset(&myReq, 0, sizeof(myReq));
00061    myReq.OPc = *Tok;
00062    if (*Tok == '=' || *Tok == '^') myReq.Options |= XrdFrmRequest::Purge;
00063    Tok++;
00064 
00065    if (*Tok) strlcpy(myReq.User, Tok, sizeof(myReq.User));
00066       else   strlcpy(myReq.User, Config.myProg, sizeof(myReq.User));
00067 
00068    if (!(tp = Request.GetToken())) Miss = "request id";
00069       else strlcpy(myReq.ID, tp, sizeof(myReq.ID));
00070 
00071    if (!Miss)
00072       {if (!(tp = Request.GetToken())) Miss = "notify path";
00073           else strlcpy(myReq.Notify, tp, sizeof(myReq.Notify));
00074       }
00075 
00076    if (!Miss)
00077       {if (!(tp = Request.GetToken())) Miss = "priority";
00078           else {myReq.Prty = atoi(tp);
00079                 if (myReq.Prty < 0) myReq.Prty = 0;
00080                    else if (myReq.Prty > XrdFrmRequest::maxPrty)
00081                             myReq.Prty = XrdFrmRequest::maxPrty;
00082                }
00083       }
00084 
00085    if (!Miss)
00086       {if (!(tp = Request.GetToken())) Miss = "mode";
00087           else myReq.Options = XrdFrmUtils::MapM2O(myReq.Notify, tp);
00088       }
00089 
00090    if (!Miss && !(tp = Request.GetToken())) Miss = "path";
00091 
00092 // Check for any errors
00093 //
00094    if (Miss) {Say.Emsg("Agent_Add", Miss, "missing in '+' request.");
00095               return;
00096              }
00097 
00098 // Add all paths in the request
00099 //
00100    do {strlcpy(myReq.LFN, tp, sizeof(myReq.LFN));
00101        if ((op = index(tp, '?'))) {myReq.Opaque = op-tp+1; *op = '\0';}
00102           else myReq.Opaque = 0;
00103        myReq.LFO = 0;
00104        if (myReq.LFN[0] != '/' && !(myReq.LFO = XrdFrmUtils::chkURL(myReq.LFN)))
00105           Say.Emsg("Agent_Add", "Invalid url -", myReq.LFN);
00106           else Server.Add(myReq);
00107        if ((tp = Request.GetToken())) memset(myReq.LFN, 0, sizeof(myReq.LFN));
00108       } while(tp);
00109 }
00110 
00111 /******************************************************************************/
00112 /* Private:                        A g e n t                                  */
00113 /******************************************************************************/
00114 
00115 XrdFrmReqAgent *XrdFrmXfrAgent::Agent(char bType)
00116 {
00117 
00118 // Return the agent corresponding to the type
00119 //
00120    switch(bType)
00121          {case 0  : return &StgAgent;
00122           case '+': return &StgAgent;
00123           case '^':
00124           case '&': return &MigAgent;
00125           case '<': return &GetAgent;
00126           case '=':
00127           case '>': return &PutAgent;
00128           default:  break;
00129          }
00130    return 0;
00131 }
00132 
00133 /******************************************************************************/
00134 /* Private:                          D e l                                    */
00135 /******************************************************************************/
00136   
00137 void XrdFrmXfrAgent::Del(XrdOucStream  &Request, char *Tok,
00138                          XrdFrmReqAgent &Server)
00139 {
00140    XrdFrmRequest myReq;
00141 
00142 // If the requestid is adjacent to the operation, use it o/w get it
00143 //
00144    if (!(*Tok) && (!(Tok = Request.GetToken()) || !(*Tok)))
00145       {Say.Emsg("Del", "request id missing in cancel request.");
00146        return;
00147       }
00148 
00149 // Copy the request ID into the request and remove it from peer server
00150 //
00151    memset(&myReq, 0, sizeof(myReq));
00152    strlcpy(myReq.ID, Tok, sizeof(myReq.ID));
00153    Server.Del(myReq);
00154 }
00155 
00156 /******************************************************************************/
00157 /* Private:                         L i s t                                   */
00158 /******************************************************************************/
00159   
00160 void XrdFrmXfrAgent::List(XrdOucStream &Request, char *Tok)
00161 {
00162    XrdFrmRequest::Item Items[XrdFrmRequest::getLast];
00163    XrdFrmReqAgent *agentP;
00164    int n = 0;
00165    char *tp;
00166 
00167    while((tp = Request.GetToken()) && n < XrdFrmRequest::getLast)
00168         {if (XrdFrmUtils::MapV2I(tp, Items[n])) n++;}
00169 
00170 // List entries queued for specific servers
00171 //
00172    if (!(*Tok)) {StgAgent.List(Items, n); GetAgent.List(Items, n);}
00173       else do {if ((agentP = Agent(*Tok))) agentP->List(Items, n);
00174               } while(*(++Tok));
00175    cout <<endl;
00176 }
00177 
00178 /******************************************************************************/
00179 /* Public:                       P r o c e s s                                */
00180 /******************************************************************************/
00181   
00182 void XrdFrmXfrAgent::Process(XrdOucStream &Request)
00183 {
00184    char *tp;
00185 
00186 // Each frm request comes in as:
00187 //
00188 // Copy in:    <[<traceid>] <reqid> <npath> <prty> <mode> <path> [. . .]
00189 // Copy purge: =[<traceid>] <reqid> <npath> <prty> <mode> <path> [. . .]
00190 // Copy out:   >[<traceid>] <reqid> <npath> <prty> <mode> <path> [. . .]
00191 // Migrate:    &[<traceid>] <reqid> <npath> <prty> <mode> <path> [. . .]
00192 // Migr+Purge: ^[<traceid>] <reqid> <npath> <prty> <mode> <path> [. . .]
00193 // Stage:      +[<traceid>] <reqid> <npath> <prty> <mode> <path> [. . .]
00194 // Cancel in:  - <requestid>
00195 // Cancel out: ~ <requestid>
00196 // List:       ?[<][+][&][>]
00197 // Wakeup:     ![<][+][&][>]
00198 //
00199    if ((tp = Request.GetToken()))
00200       switch(*tp)
00201             {case '+':  Add(Request, tp,   StgAgent); break;
00202              case '<':  Add(Request, tp,   GetAgent); break;
00203              case '=':
00204              case '>':  Add(Request, tp,   PutAgent); break;
00205              case '&':
00206              case '^':  Add(Request, tp,   MigAgent); break;
00207              case '-':  Del(Request, tp+1, StgAgent);
00208                         Del(Request, tp+1, GetAgent);
00209                         break;
00210              case '~':  Del(Request, tp+1, MigAgent);
00211                         Del(Request, tp+1, PutAgent);
00212                         break;
00213              case '?': List(Request, tp+1);           break;
00214              case '!': GetAgent.Ping(tp);             break;
00215              default: Say.Emsg("Agent", "Invalid request, '", tp, "'.");
00216             }
00217 }
00218 
00219 /******************************************************************************/
00220 /* Public:                         S t a r t                                  */
00221 /******************************************************************************/
00222   
00223 int XrdFrmXfrAgent::Start()
00224 {
00225    EPNAME("Agent");
00226    XrdOucStream Request;
00227    char *tp;
00228 
00229 // Prepare our agents
00230 //
00231    if (!StgAgent.Start(Config.QPath, Config.AdminMode)
00232    ||  !MigAgent.Start(Config.QPath, Config.AdminMode)
00233    ||  !GetAgent.Start(Config.QPath, Config.AdminMode)
00234    ||  !PutAgent.Start(Config.QPath, Config.AdminMode)) return 2;
00235 
00236 // Attach stdin to the Request stream
00237 //
00238    Request.Attach(STDIN_FILENO, 8*1024);
00239 
00240 // Process all input
00241 //
00242    while((tp = Request.GetLine()))
00243         {DEBUG ("Request: '" <<tp <<"'");
00244          Process(Request);
00245         }
00246 
00247 // If we exit then we lost the connection
00248 //
00249    Say.Emsg("Agent", "Exiting; lost request connection!");
00250    return 8;
00251 }

Generated on Tue Jul 5 14:46:36 2011 for ROOT_528-00b_version by  doxygen 1.5.1