XrdFrmXfrDaemon.cc

Go to the documentation of this file.
00001 /******************************************************************************/
00002 /*                                                                            */
00003 /*                    X r d F r m X f r D a e m o n . c c                     */
00004 /*                                                                            */
00005 /* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University  */
00006 /*                            All Rights Reserved                             */
00007 /*   Produced by Andrew Hanushevsky for Stanford University under contract    */
00008 /*              DE-AC02-76-SFO0515 with the Department of Energy              */
00009 /******************************************************************************/
00010 
00011 //          $Id: XrdFrmXfrDaemon.cc 35287 2010-09-14 21:19:35Z ganis $
00012 
00013 const char *XrdFrmXfrDaemonCVSID = "$Id: XrdFrmXfrDaemon.cc 35287 2010-09-14 21:19:35Z ganis $";
00014 
00015 #include <stdio.h>
00016 #include <stdlib.h>
00017 #include <string.h>
00018 #include <strings.h>
00019 #include <unistd.h>
00020 #include <fcntl.h>
00021 #include <sys/types.h>
00022 #include <sys/stat.h>
00023 
00024 #include "XrdFrm/XrdFrmConfig.hh"
00025 #include "XrdFrm/XrdFrmMigrate.hh"
00026 #include "XrdFrm/XrdFrmRequest.hh"
00027 #include "XrdFrm/XrdFrmTrace.hh"
00028 #include "XrdFrm/XrdFrmTransfer.hh"
00029 #include "XrdFrm/XrdFrmUtils.hh"
00030 #include "XrdFrm/XrdFrmXfrAgent.hh"
00031 #include "XrdFrm/XrdFrmXfrDaemon.hh"
00032 #include "XrdNet/XrdNetOpts.hh"
00033 #include "XrdNet/XrdNetSocket.hh"
00034 #include "XrdOuc/XrdOucStream.hh"
00035 #include "XrdSys/XrdSysPthread.hh"
00036 #include "XrdSys/XrdSysTimer.hh"
00037 
00038 using namespace XrdFrm;
00039 
00040 /******************************************************************************/
00041 /*                      S t a t i c   V a r i a b l e s                       */
00042 /******************************************************************************/
00043 
00044 XrdFrmReqBoss XrdFrmXfrDaemon::GetBoss("getf", XrdFrmRequest::getQ);
00045 
00046 XrdFrmReqBoss XrdFrmXfrDaemon::MigBoss("migr", XrdFrmRequest::migQ);
00047 
00048 XrdFrmReqBoss XrdFrmXfrDaemon::StgBoss("pstg", XrdFrmRequest::stgQ);
00049 
00050 XrdFrmReqBoss XrdFrmXfrDaemon::PutBoss("putf", XrdFrmRequest::putQ);
00051 
00052 /******************************************************************************/
00053 /* Private:                         B o s s                                   */
00054 /******************************************************************************/
00055 
00056 XrdFrmReqBoss *XrdFrmXfrDaemon::Boss(char bType)
00057 {
00058 
00059 // Return the boss corresponding to the type
00060 //
00061    switch(bType)
00062          {case 0  :
00063           case '+': return &StgBoss;
00064           case '^':
00065           case '&': return &MigBoss;
00066           case '<': return &GetBoss;
00067           case '=':
00068           case '>': return &PutBoss;
00069           default:  break;
00070          }
00071    return 0;
00072 }
00073 
00074 /******************************************************************************/
00075 /* Public:                          I n i t                                   */
00076 /******************************************************************************/
00077 
00078 int XrdFrmXfrDaemon::Init()
00079 {
00080    char buff[80];
00081 
00082 // Make sure we are the only daemon running
00083 //
00084    sprintf(buff, "%s/frm_xfrd.lock", Config.QPath);
00085    if (!XrdFrmUtils::Unique(buff, Config.myProg)) return 0;
00086 
00087 // Initiliaze the transfer processor (it need to be active now)
00088 //
00089    if (!XrdFrmTransfer::Init()) return 0;
00090 
00091 // Fix up some values that might not make sense
00092 //
00093    if (Config.WaitMigr < Config.IdleHold) Config.WaitMigr = Config.IdleHold;
00094 
00095 // Check if it makes any sense to migrate and, if so, initialize migration
00096 //
00097    if (Config.pathList)
00098       {if (!Config.xfrOUT)
00099           Say.Emsg("Config","Output copy command not specified; "
00100                             "auto-migration disabled!");
00101           else XrdFrmMigrate::Migrate();
00102       } else Say.Emsg("Config","No migratable paths; "
00103                                "auto-migration disabled!");
00104 
00105 // Start the external interfaces
00106 //
00107    if (!StgBoss.Start(Config.QPath, Config.AdminMode)
00108    ||  !MigBoss.Start(Config.QPath, Config.AdminMode)
00109    ||  !GetBoss.Start(Config.QPath, Config.AdminMode)
00110    ||  !PutBoss.Start(Config.QPath, Config.AdminMode)) return 0;
00111 
00112 // All done
00113 //
00114    return 1;
00115 }
00116   
00117 /******************************************************************************/
00118 /* Public:                          P o n g                                   */
00119 /******************************************************************************/
00120   
00121 void *XrdFrmXfrDaemonPong(void *parg)
00122 {
00123     XrdFrmXfrDaemon::Pong();
00124     return (void *)0;
00125 }
00126   
00127 void XrdFrmXfrDaemon::Pong()
00128 {
00129    EPNAME("Pong");
00130    static int udpFD = -1;
00131    XrdOucStream Request(&Say);
00132    XrdFrmReqBoss *bossP;
00133    char *tp;
00134 
00135 // Get a UDP socket for the server if we haven't already done so and start
00136 // a thread to re-enter this code and wait for messages from an agent.
00137 //
00138    if (udpFD < 0)
00139       {XrdNetSocket *udpSock;
00140        pthread_t tid;
00141        int retc;
00142        if ((udpSock = XrdNetSocket::Create(&Say, Config.QPath,
00143                    "xfrd.udp", Config.AdminMode, XRDNET_UDPSOCKET)))
00144           {udpFD = udpSock->Detach(); delete udpSock;
00145            if ((retc = XrdSysThread::Run(&tid, XrdFrmXfrDaemonPong, (void *)0,
00146                                          XRDSYSTHREAD_BIND, "Pong")))
00147               Say.Emsg("main", retc, "create udp listner");
00148           }
00149        return;
00150       }
00151 
00152 // Hookup to the udp socket as a stream
00153 //
00154    Request.Attach(udpFD, 64*1024);
00155 
00156 // Now simply get requests (see XrdFrmXfrDaemon for details). Here we screen
00157 // out ping and list requests.
00158 //
00159    while((tp = Request.GetLine()))
00160         {DEBUG(": '" <<tp <<"'");
00161          switch(*tp)
00162                {case '?': break;
00163                 case '!': if ((tp = Request.GetToken()))
00164                              while(*tp++)
00165                                   {if ((bossP = Boss(*tp))) bossP->Wakeup(1);}
00166                           break;
00167                 default:  XrdFrmXfrAgent::Process(Request);
00168                }
00169         }
00170 
00171 // We should never get here (but....)
00172 //
00173    Say.Emsg("Server", "Lost udp connection!");
00174 }
00175 
00176 /******************************************************************************/
00177 /* Public:                         S t a r t                                  */
00178 /******************************************************************************/
00179   
00180 int XrdFrmXfrDaemon::Start()
00181 {
00182 
00183 // Start the ponger
00184 //
00185    Pong();
00186 
00187 // Now start nudging
00188 //
00189    while(1)
00190         {StgBoss.Wakeup(); GetBoss.Wakeup();
00191          MigBoss.Wakeup(); PutBoss.Wakeup();
00192          XrdSysTimer::Snooze(Config.WaitQChk);
00193         }
00194 
00195 // We should never get here
00196 //
00197    return 0;
00198 }

Generated on Tue Jul 5 14:46:36 2011 for ROOT_528-00b_version by  doxygen 1.5.1