00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 const char *XrdFrmXfrDaemonCVSID = "$Id: XrdFrmXfrDaemon.cc 35287 2010-09-14 21:19:35Z ganis $";
00014
00015 #include <stdio.h>
00016 #include <stdlib.h>
00017 #include <string.h>
00018 #include <strings.h>
00019 #include <unistd.h>
00020 #include <fcntl.h>
00021 #include <sys/types.h>
00022 #include <sys/stat.h>
00023
00024 #include "XrdFrm/XrdFrmConfig.hh"
00025 #include "XrdFrm/XrdFrmMigrate.hh"
00026 #include "XrdFrm/XrdFrmRequest.hh"
00027 #include "XrdFrm/XrdFrmTrace.hh"
00028 #include "XrdFrm/XrdFrmTransfer.hh"
00029 #include "XrdFrm/XrdFrmUtils.hh"
00030 #include "XrdFrm/XrdFrmXfrAgent.hh"
00031 #include "XrdFrm/XrdFrmXfrDaemon.hh"
00032 #include "XrdNet/XrdNetOpts.hh"
00033 #include "XrdNet/XrdNetSocket.hh"
00034 #include "XrdOuc/XrdOucStream.hh"
00035 #include "XrdSys/XrdSysPthread.hh"
00036 #include "XrdSys/XrdSysTimer.hh"
00037
00038 using namespace XrdFrm;
00039
00040
00041
00042
00043
00044 XrdFrmReqBoss XrdFrmXfrDaemon::GetBoss("getf", XrdFrmRequest::getQ);
00045
00046 XrdFrmReqBoss XrdFrmXfrDaemon::MigBoss("migr", XrdFrmRequest::migQ);
00047
00048 XrdFrmReqBoss XrdFrmXfrDaemon::StgBoss("pstg", XrdFrmRequest::stgQ);
00049
00050 XrdFrmReqBoss XrdFrmXfrDaemon::PutBoss("putf", XrdFrmRequest::putQ);
00051
00052
00053
00054
00055
00056 XrdFrmReqBoss *XrdFrmXfrDaemon::Boss(char bType)
00057 {
00058
00059
00060
00061 switch(bType)
00062 {case 0 :
00063 case '+': return &StgBoss;
00064 case '^':
00065 case '&': return &MigBoss;
00066 case '<': return &GetBoss;
00067 case '=':
00068 case '>': return &PutBoss;
00069 default: break;
00070 }
00071 return 0;
00072 }
00073
00074
00075
00076
00077
00078 int XrdFrmXfrDaemon::Init()
00079 {
00080 char buff[80];
00081
00082
00083
00084 sprintf(buff, "%s/frm_xfrd.lock", Config.QPath);
00085 if (!XrdFrmUtils::Unique(buff, Config.myProg)) return 0;
00086
00087
00088
00089 if (!XrdFrmTransfer::Init()) return 0;
00090
00091
00092
00093 if (Config.WaitMigr < Config.IdleHold) Config.WaitMigr = Config.IdleHold;
00094
00095
00096
00097 if (Config.pathList)
00098 {if (!Config.xfrOUT)
00099 Say.Emsg("Config","Output copy command not specified; "
00100 "auto-migration disabled!");
00101 else XrdFrmMigrate::Migrate();
00102 } else Say.Emsg("Config","No migratable paths; "
00103 "auto-migration disabled!");
00104
00105
00106
00107 if (!StgBoss.Start(Config.QPath, Config.AdminMode)
00108 || !MigBoss.Start(Config.QPath, Config.AdminMode)
00109 || !GetBoss.Start(Config.QPath, Config.AdminMode)
00110 || !PutBoss.Start(Config.QPath, Config.AdminMode)) return 0;
00111
00112
00113
00114 return 1;
00115 }
00116
00117
00118
00119
00120
00121 void *XrdFrmXfrDaemonPong(void *parg)
00122 {
00123 XrdFrmXfrDaemon::Pong();
00124 return (void *)0;
00125 }
00126
00127 void XrdFrmXfrDaemon::Pong()
00128 {
00129 EPNAME("Pong");
00130 static int udpFD = -1;
00131 XrdOucStream Request(&Say);
00132 XrdFrmReqBoss *bossP;
00133 char *tp;
00134
00135
00136
00137
00138 if (udpFD < 0)
00139 {XrdNetSocket *udpSock;
00140 pthread_t tid;
00141 int retc;
00142 if ((udpSock = XrdNetSocket::Create(&Say, Config.QPath,
00143 "xfrd.udp", Config.AdminMode, XRDNET_UDPSOCKET)))
00144 {udpFD = udpSock->Detach(); delete udpSock;
00145 if ((retc = XrdSysThread::Run(&tid, XrdFrmXfrDaemonPong, (void *)0,
00146 XRDSYSTHREAD_BIND, "Pong")))
00147 Say.Emsg("main", retc, "create udp listner");
00148 }
00149 return;
00150 }
00151
00152
00153
00154 Request.Attach(udpFD, 64*1024);
00155
00156
00157
00158
00159 while((tp = Request.GetLine()))
00160 {DEBUG(": '" <<tp <<"'");
00161 switch(*tp)
00162 {case '?': break;
00163 case '!': if ((tp = Request.GetToken()))
00164 while(*tp++)
00165 {if ((bossP = Boss(*tp))) bossP->Wakeup(1);}
00166 break;
00167 default: XrdFrmXfrAgent::Process(Request);
00168 }
00169 }
00170
00171
00172
00173 Say.Emsg("Server", "Lost udp connection!");
00174 }
00175
00176
00177
00178
00179
00180 int XrdFrmXfrDaemon::Start()
00181 {
00182
00183
00184
00185 Pong();
00186
00187
00188
00189 while(1)
00190 {StgBoss.Wakeup(); GetBoss.Wakeup();
00191 MigBoss.Wakeup(); PutBoss.Wakeup();
00192 XrdSysTimer::Snooze(Config.WaitQChk);
00193 }
00194
00195
00196
00197 return 0;
00198 }