00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 const char *XrdFrmReqBossCVSID = "$Id: XrdFrmReqBoss.cc 35287 2010-09-14 21:19:35Z ganis $";
00014
00015 #include <stdio.h>
00016 #include <stdlib.h>
00017 #include <string.h>
00018 #include <strings.h>
00019 #include <unistd.h>
00020 #include <fcntl.h>
00021 #include <sys/types.h>
00022 #include <sys/stat.h>
00023
00024 #include "XrdFrm/XrdFrmCID.hh"
00025 #include "XrdFrm/XrdFrmReqBoss.hh"
00026 #include "XrdFrm/XrdFrmTrace.hh"
00027 #include "XrdFrm/XrdFrmUtils.hh"
00028 #include "XrdFrm/XrdFrmXfrQueue.hh"
00029 #include "XrdNet/XrdNetMsg.hh"
00030 #include "XrdOuc/XrdOucUtils.hh"
00031 #include "XrdSys/XrdSysHeaders.hh"
00032
00033 using namespace XrdFrm;
00034
00035
00036
00037
00038
00039 void *mainServerXeq(void *parg)
00040 {
00041 XrdFrmReqBoss *theBoss = (XrdFrmReqBoss *)parg;
00042 theBoss->Process();
00043 return (void *)0;
00044 }
00045
00046
00047
00048
00049
00050 void XrdFrmReqBoss::Add(XrdFrmRequest &Request)
00051 {
00052
00053
00054
00055 if (Request.Prty > XrdFrmRequest::maxPrty)
00056 Request.Prty = XrdFrmRequest::maxPrty;
00057 else if (Request.Prty < 0)Request.Prty = 0;
00058 Request.addTOD = time(0);
00059
00060
00061
00062 rQueue[static_cast<int>(Request.Prty)]->Add(&Request);
00063
00064
00065
00066 Wakeup(1);
00067 }
00068
00069
00070
00071
00072
00073 void XrdFrmReqBoss::Del(XrdFrmRequest &Request)
00074 {
00075 int i;
00076
00077
00078
00079 for (i = 0; i <= XrdFrmRequest::maxPrty; i++) rQueue[i]->Can(&Request);
00080 }
00081
00082
00083
00084
00085
00086 void XrdFrmReqBoss::Process()
00087 {
00088 EPNAME("Process");
00089 XrdFrmRequest myReq;
00090 int i, rc, numXfr, numPull;;
00091
00092
00093
00094 do{Wakeup(0);
00095 do{numXfr = 0;
00096 for (i = XrdFrmRequest::maxPrty; i >= 0; i--)
00097 {numPull = i+1;
00098 while(numPull && (rc = rQueue[i]->Get(&myReq)))
00099 {if (myReq.Options & XrdFrmRequest::Register) Register(myReq,i);
00100 else {numPull -= XrdFrmXfrQueue::Add(&myReq,rQueue[i],theQ);
00101 numXfr++;
00102 DEBUG(Persona <<" from Q " << i <<' ' <<numPull <<" left");
00103 if (rc < 0) break;
00104 }
00105 }
00106 }
00107 } while(numXfr);
00108 } while(1);
00109 }
00110
00111
00112
00113
00114
00115 void XrdFrmReqBoss::Register(XrdFrmRequest &Req, int qNum)
00116 {
00117 EPNAME("Register");
00118 char *eP;
00119 int Pid;
00120
00121
00122
00123 if (!(*Req.LFN)) return;
00124 Pid = strtol(Req.ID, &eP, 10);
00125 if (*eP || Pid == 0) return;
00126
00127
00128
00129 if (CID.Add(Req.iName, Req.LFN, static_cast<time_t>(Req.addTOD), Pid))
00130 {DEBUG("Instance=" <<Req.iName <<" cluster=" <<Req.LFN <<" pid=" <<Pid);}
00131 else rQueue[qNum]->Del(&Req);
00132 }
00133
00134
00135
00136
00137
00138 int XrdFrmReqBoss::Start(char *aPath, int aMode)
00139 {
00140 pthread_t tid;
00141 char buff[2048], *qPath;
00142 int retc, i;
00143
00144
00145
00146 if (!(qPath = XrdFrmUtils::makeQDir(aPath, aMode))) return 0;
00147
00148
00149
00150 for (i = 0; i <= XrdFrmRequest::maxPrty; i++)
00151 {sprintf(buff, "%s%sQ.%d", qPath, Persona, i);
00152 rQueue[i] = new XrdFrmReqFile(buff, 0);
00153 if (!rQueue[i]->Init()) return 0;
00154 }
00155
00156
00157
00158 if ((retc = XrdSysThread::Run(&tid, mainServerXeq, (void *)this,
00159 XRDSYSTHREAD_BIND, Persona)))
00160 {sprintf(buff, "create %s request thread", Persona);
00161 Say.Emsg("Start", retc, buff);
00162 return 0;
00163 }
00164
00165
00166
00167 return 1;
00168 }
00169
00170
00171
00172
00173
00174 void XrdFrmReqBoss::Wakeup(int PushIt)
00175 {
00176 static XrdSysMutex rqMutex;
00177
00178
00179
00180 if (PushIt) {rqMutex.Lock();
00181 if (!isPosted) {rqReady.Post(); isPosted = 1;}
00182 rqMutex.UnLock();
00183 }
00184 else {rqReady.Wait();
00185 rqMutex.Lock(); isPosted = 0; rqMutex.UnLock();
00186 }
00187 }