00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 const char *XrdFrmReqAgentCVSID = "$Id: XrdFrmReqAgent.cc 35287 2010-09-14 21:19:35Z ganis $";
00014
00015 #include <stdio.h>
00016 #include <stdlib.h>
00017 #include <string.h>
00018 #include <strings.h>
00019 #include <unistd.h>
00020 #include <fcntl.h>
00021 #include <sys/types.h>
00022 #include <sys/stat.h>
00023
00024 #include "XrdFrm/XrdFrmReqAgent.hh"
00025 #include "XrdFrm/XrdFrmTrace.hh"
00026 #include "XrdFrm/XrdFrmUtils.hh"
00027 #include "XrdNet/XrdNetMsg.hh"
00028 #include "XrdOuc/XrdOucUtils.hh"
00029 #include "XrdSys/XrdSysHeaders.hh"
00030 #include "XrdSys/XrdSysPlatform.hh"
00031
00032 using namespace XrdFrm;
00033
00034
00035
00036
00037
00038 char *XrdFrmReqAgent::c2sFN = 0;
00039
00040
00041
00042
00043
00044 XrdFrmReqAgent::XrdFrmReqAgent(const char *Me, int qVal)
00045 : Persona(Me),theQ(qVal)
00046 {
00047
00048
00049 switch(qVal)
00050 {case XrdFrmRequest::getQ: pingMsg = "!<\n"; break;
00051 case XrdFrmRequest::migQ: pingMsg = "!&\n"; break;
00052 case XrdFrmRequest::stgQ: pingMsg = "!+\n"; break;
00053 case XrdFrmRequest::putQ: pingMsg = "!>\n"; break;
00054 default: pingMsg = "!\n" ; break;
00055 }
00056 }
00057
00058
00059
00060
00061
00062 void XrdFrmReqAgent::Add(XrdFrmRequest &Request)
00063 {
00064
00065
00066
00067 if (Request.Prty > XrdFrmRequest::maxPrty)
00068 Request.Prty = XrdFrmRequest::maxPrty;
00069 else if (Request.Prty < 0)Request.Prty = 0;
00070
00071
00072
00073 Request.addTOD = time(0);
00074 if (myName) strlcpy(Request.iName, myName, sizeof(Request.iName));
00075
00076
00077
00078 rQueue[static_cast<int>(Request.Prty)]->Add(&Request);
00079
00080
00081
00082 Ping();
00083 }
00084
00085
00086
00087
00088
00089 void XrdFrmReqAgent::Del(XrdFrmRequest &Request)
00090 {
00091 int i;
00092
00093
00094
00095 for (i = 0; i <= XrdFrmRequest::maxPrty; i++) rQueue[i]->Can(&Request);
00096 }
00097
00098
00099
00100
00101
00102 int XrdFrmReqAgent::List(XrdFrmRequest::Item *Items, int Num)
00103 {
00104 char myLfn[8192];
00105 int i, Offs, n = 0;
00106
00107
00108
00109 for (i = 0; i <= XrdFrmRequest::maxPrty; i++)
00110 {Offs = 0;
00111 while(rQueue[i]->List(myLfn, sizeof(myLfn), Offs, Items, Num))
00112 {cout <<myLfn <<endl; n++;}
00113 }
00114
00115
00116 return n;
00117 }
00118
00119
00120
00121 int XrdFrmReqAgent::List(XrdFrmRequest::Item *Items, int Num, int Prty)
00122 {
00123 char myLfn[8192];
00124 int Offs, n = 0;
00125
00126
00127
00128 if (Prty <= XrdFrmRequest::maxPrty)
00129 {Offs = 0;
00130 while(rQueue[Prty]->List(myLfn, sizeof(myLfn), Offs, Items, Num))
00131 {cout <<myLfn <<endl; n++;}
00132 }
00133
00134
00135
00136 return n;
00137 }
00138
00139
00140
00141
00142
00143 int XrdFrmReqAgent::NextLFN(char *Buff, int Bsz, int Prty, int &Offs)
00144 {
00145 static XrdFrmRequest::Item Items[1] = {XrdFrmRequest::getLFN};
00146
00147
00148
00149 return rQueue[Prty]->List(Buff, Bsz, Offs, Items, 1) != 0;
00150 }
00151
00152
00153
00154
00155
00156 void XrdFrmReqAgent::Ping(const char *Msg)
00157 {
00158 static XrdNetMsg udpMsg(&Say, c2sFN);
00159 static int udpOK = 0;
00160 struct stat buf;
00161
00162
00163
00164 if (udpOK || !stat(c2sFN, &buf))
00165 {udpMsg.Send(Msg ? Msg : pingMsg); udpOK = 1;}
00166 }
00167
00168
00169
00170
00171
00172 int XrdFrmReqAgent::Start(char *aPath, int aMode)
00173 {
00174 XrdFrmRequest Request;
00175 const char *myClid;
00176 char buff[2048], *qPath;
00177 int i;
00178
00179
00180
00181 if (!c2sFN)
00182 {sprintf(buff, "%sxfrd.udp", aPath);
00183 c2sFN = strdup(buff);
00184 }
00185
00186
00187
00188 myName = XrdOucUtils::InstName(1);
00189
00190
00191
00192 if (!(qPath = XrdFrmUtils::makeQDir(aPath, aMode))) return 0;
00193
00194
00195
00196 if ((myClid = getenv("XRDCMSCLUSTERID")))
00197 {int Uid = static_cast<int>(geteuid());
00198 int Gid = static_cast<int>(getegid());
00199 memset(&Request, 0, sizeof(Request));
00200 strlcpy(Request.LFN, myClid, sizeof(Request.LFN));
00201 sprintf(Request.User,"%d %d", Uid, Gid);
00202 sprintf(Request.ID, "%d", static_cast<int>(getpid()));
00203 strlcpy(Request.iName, myName, sizeof(Request.iName));
00204 Request.addTOD = time(0);
00205 Request.Options = XrdFrmRequest::Register;
00206 Request.OPc = '@';
00207 }
00208
00209
00210
00211 for (i = 0; i <= XrdFrmRequest::maxPrty; i++)
00212 {sprintf(buff, "%s%sQ.%d", qPath, Persona, i);
00213 rQueue[i] = new XrdFrmReqFile(buff, 1);
00214 if (!rQueue[i]->Init()) return 0;
00215 if (myClid) rQueue[i]->Add(&Request);
00216 }
00217
00218
00219
00220 if (myClid) Ping();
00221 return 1;
00222 }