00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 const char *XrdBwmHandleCVSID = "$Id: XrdBwmHandle.cc 32231 2010-02-05 18:24:46Z ganis $";
00014
00015 #include <stdio.h>
00016 #include <string.h>
00017
00018 #include "XrdBwm/XrdBwmHandle.hh"
00019 #include "XrdBwm/XrdBwmLogger.hh"
00020 #include "XrdBwm/XrdBwmTrace.hh"
00021 #include "XrdSfs/XrdSfsInterface.hh"
00022 #include "XrdSys/XrdSysError.hh"
00023 #include "XrdSys/XrdSysPlatform.hh"
00024
00025 #include "XProtocol/XProtocol.hh"
00026
00027
00028
00029
00030
00031 XrdBwmLogger *XrdBwmHandle::Logger = 0;
00032 XrdBwmPolicy *XrdBwmHandle::Policy = 0;
00033 XrdBwmHandle *XrdBwmHandle::Free = 0;
00034 unsigned int XrdBwmHandle::numQueued = 0;
00035
00036 extern XrdSysError BwmEroute;
00037
00038
00039
00040
00041
00042 class XrdBwmHandleCB : public XrdOucEICB, public XrdOucErrInfo
00043 {
00044 public:
00045
00046 static
00047 XrdBwmHandleCB *Alloc()
00048 {XrdBwmHandleCB *mP;
00049 xMutex.Lock();
00050 if (!(mP = Free)) mP = new XrdBwmHandleCB;
00051 else Free = mP->Next;
00052 xMutex.UnLock();
00053 return mP;
00054 }
00055
00056 void Done(int &Results, XrdOucErrInfo *eInfo)
00057 {xMutex.Lock();
00058 Next = Free;
00059 Free = this;
00060 xMutex.UnLock();
00061 }
00062
00063 int Same(unsigned long long arg1, unsigned long long arg2) {return 0;}
00064
00065 XrdBwmHandleCB() : Next(0) {}
00066 ~XrdBwmHandleCB() {}
00067
00068 private:
00069 XrdBwmHandleCB *Next;
00070 static XrdSysMutex xMutex;
00071 static XrdBwmHandleCB *Free;
00072 };
00073
00074 XrdSysMutex XrdBwmHandleCB::xMutex;
00075 XrdBwmHandleCB *XrdBwmHandleCB::Free = 0;
00076
00077
00078
00079
00080
00081 void *XrdBwmHanXeq(void *pp)
00082 {
00083 return XrdBwmHandle::Dispatch();
00084 }
00085
00086
00087
00088
00089
00090
00091
00092
00093 #define tident Parms.Tident
00094
00095 int XrdBwmHandle::Activate(XrdOucErrInfo &einfo)
00096 {
00097 EPNAME("Activate");
00098 XrdSysMutexHelper myHelper(hMutex);
00099 char *rBuff;
00100 int rSize, rc;
00101
00102
00103
00104 if (Status != Idle)
00105 {if (Status == Scheduled)
00106 einfo.setErrInfo(kXR_inProgress, "Request already scheduled.");
00107 else einfo.setErrInfo(kXR_InvalidRequest, "Visa already issued.");
00108 return SFS_ERROR;
00109 }
00110
00111
00112
00113 qTime = time(0);
00114 rBuff = einfo.getMsgBuff(rSize);
00115 if (!(rc = Policy->Schedule(rBuff, rSize, Parms))) return SFS_ERROR;
00116
00117
00118
00119 if (rc > 0)
00120 {rHandle = rc;
00121 Status = Dispatched;
00122 rTime = time(0);
00123 ZTRACE(sched,"Run " <<Parms.Lfn <<' ' <<Parms.LclNode
00124 <<(Parms.Direction==XrdBwmPolicy::Incomming?" <- ":" -> ")
00125 <<Parms.RmtNode);
00126 einfo.setErrCode(strlen(rBuff));
00127 return (*rBuff ? SFS_DATA : SFS_OK);
00128 }
00129
00130
00131
00132
00133 rHandle = -rc;
00134 ErrCB = einfo.getErrCB(ErrCBarg);
00135 einfo.setErrCB((XrdOucEICB *)&myEICB);
00136 Status = Scheduled;
00137 refHandle(rHandle, this);
00138 ZTRACE(sched, "inQ " <<Parms.Lfn <<' ' <<Parms.LclNode
00139 <<(Parms.Direction==XrdBwmPolicy::Incomming?" <- ":" -> ")
00140 <<Parms.RmtNode);
00141
00142
00143
00144 return SFS_STARTED;
00145 }
00146 #undef tident
00147
00148
00149
00150
00151
00152 XrdBwmHandle *XrdBwmHandle::Alloc(const char *theUsr, const char *thePath,
00153 const char *LclNode, const char *RmtNode,
00154 int Incomming)
00155 {
00156 XrdBwmHandle *hP = Alloc();
00157
00158
00159
00160 if (hP)
00161 {hP->Parms.Tident = theUsr;
00162 hP->Parms.Lfn = strdup(thePath);
00163 hP->Parms.LclNode = strdup(LclNode);
00164 hP->Parms.RmtNode = strdup(RmtNode);
00165 hP->Parms.Direction = (Incomming ? XrdBwmPolicy::Incomming
00166 : XrdBwmPolicy::Outgoing);
00167 hP->Status = Idle;
00168 hP->qTime = 0;
00169 hP->rTime = 0;
00170 hP->xSize = 0;
00171 hP->xTime = 0;
00172 }
00173
00174
00175
00176 return hP;
00177 }
00178
00179
00180
00181
00182
00183 XrdBwmHandle *XrdBwmHandle::Alloc(XrdBwmHandle *old_hP)
00184 {
00185 static const int minAlloc = 4096/sizeof(XrdBwmHandle);
00186 static XrdSysMutex aMutex;
00187 XrdBwmHandle *hP;
00188
00189
00190
00191
00192 aMutex.Lock();
00193 if (old_hP) {old_hP->Next = Free; Free = old_hP; hP = 0;}
00194 else {if (!Free && (hP = new XrdBwmHandle[minAlloc]))
00195 {int i = minAlloc; while(i--) {hP->Next = Free; Free = hP; hP++;}}
00196 if ((hP = Free)) Free = hP->Next;
00197 }
00198 aMutex.UnLock();
00199
00200 return hP;
00201 }
00202
00203
00204
00205
00206
00207 #define tident hP->Parms.Tident
00208
00209 void *XrdBwmHandle::Dispatch()
00210 {
00211 EPNAME("Dispatch");
00212 XrdBwmHandleCB *erP = XrdBwmHandleCB::Alloc();
00213 XrdBwmHandle *hP;
00214 char *RespBuff;
00215 int RespSize, readyH, Result, Err;
00216
00217
00218
00219 do {
00220
00221
00222
00223 RespBuff = erP->getMsgBuff(RespSize);
00224 *RespBuff = '\0';
00225 erP->setErrCode(0);
00226
00227
00228
00229 if ((Err = (readyH = Policy->Dispatch(RespBuff, RespSize)) < 0))
00230 readyH = -readyH;
00231
00232
00233
00234 if (!(hP = refHandle(readyH)))
00235 {sprintf(RespBuff, "%d", readyH);
00236 BwmEroute.Emsg("Dispatch", "Lost handle from", RespBuff);
00237 if (!Err) Policy->Done(readyH);
00238 continue;
00239 }
00240
00241
00242
00243 hP->hMutex.Lock();
00244 if (hP->Status != Scheduled)
00245 {BwmEroute.Emsg("Dispatch", "ref to unscheduled handle",
00246 hP->Parms.Tident, hP->Parms.Lfn);
00247 if (!Err) Policy->Done(readyH);
00248 } else {
00249 hP->myEICB.Wait(); hP->rTime = time(0);
00250 erP->setErrCB((XrdOucEICB *)erP, hP->ErrCBarg);
00251 if (Err) {hP->Status = Idle; Result = SFS_ERROR;}
00252 else {hP->Status = Dispatched;
00253 erP->setErrCode(strlen(RespBuff));
00254 Result = (*RespBuff ? SFS_DATA : SFS_OK);
00255 }
00256 ZTRACE(sched,(Err?"Err ":"Run ") <<hP->Parms.Lfn <<' ' <<hP->Parms.LclNode
00257 <<(hP->Parms.Direction == XrdBwmPolicy::Incomming ? " <- ":" -> ")
00258 <<hP->Parms.RmtNode);
00259 hP->ErrCB->Done(Result, (XrdOucErrInfo *)erP);
00260 erP = XrdBwmHandleCB::Alloc();
00261 }
00262 hP->hMutex.UnLock();
00263 } while(1);
00264
00265
00266
00267 return (void *)0;
00268 }
00269
00270 #undef tident
00271
00272
00273
00274
00275
00276 XrdBwmHandle *XrdBwmHandle::refHandle(int refID, XrdBwmHandle *hP)
00277 {
00278 static XrdSysMutex tMutex;
00279 static struct {XrdBwmHandle *First;
00280 XrdBwmHandle *Last;
00281 } hTab[256] = {{0,0}};
00282 XrdBwmHandle *pP = 0;
00283 int i = refID % 256;
00284
00285
00286
00287 tMutex.Lock();
00288 if (hP)
00289 {hP->Next = 0;
00290 if (hTab[i].Last) {hTab[i].Last->Next = hP; hTab[i].Last = hP;}
00291 else {hTab[i].First = hTab[i].Last = hP; hP->Next = 0;}
00292 numQueued++;
00293 } else {
00294 hP = hTab[i].First;
00295 while(hP && hP->rHandle != refID) {pP = hP; hP = hP->Next;}
00296 if (hP)
00297 {if (pP) pP->Next = hP->Next;
00298 else hTab[i].First = hP->Next;
00299 if (hTab[i].Last == hP) hTab[i].Last = pP;
00300 numQueued--;
00301 }
00302 }
00303 tMutex.UnLock();
00304
00305
00306
00307 return hP;
00308 }
00309
00310
00311
00312
00313
00314
00315
00316 void XrdBwmHandle::Retire()
00317 {
00318 XrdSysMutexHelper myHelper(hMutex);
00319
00320
00321
00322
00323 if (Status != Idle)
00324 {Policy->Done(rHandle);
00325 if (Status == Scheduled && !refHandle(rHandle, this))
00326 BwmEroute.Emsg("Retire", "Lost handle to", Parms.Tident, Parms.Lfn);
00327 Status = Idle; rHandle = 0;
00328 }
00329
00330
00331
00332 if (Logger && qTime)
00333 {XrdBwmLogger::Info myInfo;
00334 myInfo.Tident = Parms.Tident;
00335 myInfo.Lfn = Parms.Lfn;
00336 myInfo.lclNode = Parms.LclNode;
00337 myInfo.rmtNode = Parms.RmtNode;
00338 myInfo.ATime = qTime;
00339 myInfo.BTime = rTime;
00340 myInfo.CTime = time(0);
00341 myInfo.Size = xSize;
00342 myInfo.ESec = xTime;
00343 myInfo.Flow = (Parms.Direction == XrdBwmPolicy::Incomming ? 'I':'O');
00344 Policy->Status(myInfo.numqIn, myInfo.numqOut, myInfo.numqXeq);
00345 Logger->Event(myInfo);
00346 }
00347
00348
00349
00350 if (Parms.Lfn) {free(Parms.Lfn); Parms.Lfn = 0;}
00351 if (Parms.LclNode) {free(Parms.LclNode); Parms.LclNode = 0;}
00352 if (Parms.RmtNode) {free(Parms.RmtNode); Parms.RmtNode = 0;}
00353 Alloc(this);
00354 }
00355
00356
00357
00358
00359
00360 int XrdBwmHandle::setPolicy(XrdBwmPolicy *pP, XrdBwmLogger *lP)
00361 {
00362 pthread_t tid;
00363 int rc, startThread = (Policy == 0);
00364
00365
00366
00367 Policy = pP;
00368 if (startThread)
00369 if ((rc = XrdSysThread::Run(&tid, XrdBwmHanXeq, (void *)0,
00370 0, "Handle Dispatcher")))
00371 {BwmEroute.Emsg("setPolicy", rc, "create handle dispatch thread");
00372 return 1;
00373 }
00374
00375
00376
00377 Logger = lP;
00378 return 0;
00379 }