00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
// Source-control identification string for this translation unit.
const char *XrdCmsRRQCVSID =
      "$Id: XrdCmsRRQ.cc 25932 2008-10-23 10:58:11Z ganis $";
00016
00017 #include <sys/types.h>
00018 #include <netinet/in.h>
00019 #include <inttypes.h>
00020
00021 #include "XrdCms/XrdCmsCluster.hh"
00022 #include "XrdCms/XrdCmsNode.hh"
00023 #include "XrdCms/XrdCmsRRQ.hh"
00024 #include "XrdCms/XrdCmsRTable.hh"
00025 #include "XrdCms/XrdCmsTrace.hh"
00026 #include "XrdSys/XrdSysError.hh"
00027 #include "XrdSys/XrdSysTimer.hh"
00028 #include <stdio.h>
00029
00030 using namespace XrdCms;
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040 XrdCmsRRQ XrdCms::RRQ;
00041
00042 XrdSysMutex XrdCmsRRQSlot::myMutex;
00043 XrdCmsRRQSlot *XrdCmsRRQSlot::freeSlot = 0;
00044 short XrdCmsRRQSlot::initSlot = 0;
00045
00046
00047
00048
00049
00050 void *XrdCmsRRQ_StartTimeOut(void *parg) {return RRQ.TimeOut();}
00051
00052 void *XrdCmsRRQ_StartRespond(void *parg) {return RRQ.Respond();}
00053
00054
00055
00056
00057
00058
00059
00060
00061 short XrdCmsRRQ::Add(short Snum, XrdCmsRRQInfo *Info)
00062 {
00063
00064 XrdCmsRRQSlot *sp;
00065
00066
00067
00068 if (!(sp = XrdCmsRRQSlot::Alloc(Info))) return 0;
00069
00070
00071
00072
00073
00074 myMutex.Lock();
00075 if (Snum && Slot[Snum].Info.Key == Info->Key && Slot[Snum].Expire)
00076 {if (Info->isLU)
00077 {sp->LkUp = Slot[Snum].LkUp;
00078 Slot[Snum].LkUp = sp;
00079 } else {
00080 sp->Cont = Slot[Snum].Cont;
00081 Slot[Snum].Cont = sp;
00082 }
00083 myMutex.UnLock();
00084 return Snum;
00085 }
00086
00087
00088
00089 sp->Expire = myClock+1;
00090 if (waitQ.Singleton()) isWaiting.Post();
00091 waitQ.Prev()->Insert(&sp->Link);
00092 myMutex.UnLock();
00093 return sp->slotNum;
00094 }
00095
00096
00097
00098
00099
00100 void XrdCmsRRQ::Del(short Snum, const void *Key)
00101 {
00102 Ready(Snum, Key, 0, 0);
00103 }
00104
00105
00106
00107
00108
00109 int XrdCmsRRQ::Init(int Tint, int Tdly)
00110 {
00111 int rc;
00112 pthread_t tid;
00113
00114
00115
00116 if (Tint) Tslice = Tint;
00117 if (Tdly) Tdelay = Tdly;
00118
00119
00120
00121 dataResp.Hdr.streamid = 0;
00122 dataResp.Hdr.rrCode = kYR_data;
00123 dataResp.Hdr.modifier = 0;
00124 dataResp.Hdr.datalen = 0;
00125 dataResp.Val = 0;
00126
00127
00128
00129 data_iov[0].iov_base = (char *)&dataResp;
00130 data_iov[0].iov_len = sizeof(dataResp);
00131 data_iov[1].iov_base = databuff;;
00132
00133
00134
00135 redrResp.Hdr.streamid = 0;
00136 redrResp.Hdr.rrCode = kYR_redirect;
00137 redrResp.Hdr.modifier = 0;
00138 redrResp.Hdr.datalen = 0;
00139 redrResp.Val = 0;
00140
00141
00142
00143 redr_iov[0].iov_base = (char *)&redrResp;
00144 redr_iov[0].iov_len = sizeof(redrResp);
00145 redr_iov[1].iov_base = hostbuff;;
00146
00147
00148
00149 waitResp.Hdr.streamid = 0;
00150 waitResp.Hdr.rrCode = kYR_wait;
00151 waitResp.Hdr.modifier = 0;
00152 waitResp.Hdr.datalen = htons(static_cast<unsigned short>(sizeof(waitResp.Val)));
00153 waitResp.Val = htonl(Tdelay);
00154
00155
00156
00157 if ((rc = XrdSysThread::Run(&tid, XrdCmsRRQ_StartRespond, (void *)0,
00158 0, "Request Responder")))
00159 {Say.Emsg("Config", rc, "create request responder thread");
00160 return 1;
00161 }
00162
00163
00164
00165 if ((rc = XrdSysThread::Run(&tid, XrdCmsRRQ_StartTimeOut, (void *)0,
00166 0, "Request Timeout")))
00167 {Say.Emsg("Config", rc, "create request timeout thread");
00168 return 1;
00169 }
00170
00171
00172
00173 return 0;
00174 }
00175
00176
00177
00178
00179
00180 void XrdCmsRRQ::Ready(int Snum, const void *Key, SMask_t mask1, SMask_t mask2)
00181 {
00182
00183 XrdCmsRRQSlot *sp;
00184
00185
00186
00187 myMutex.Lock();
00188 sp = &Slot[Snum];
00189 if (sp->Info.Key != Key || !sp->Expire)
00190 {myMutex.UnLock();
00191
00192 return;
00193 }
00194
00195
00196
00197 sp->Link.Remove();
00198 if (readyQ.Singleton()) isReady.Post();
00199 sp->Arg1 = mask1; sp->Arg2 = mask2;
00200 readyQ.Prev()->Insert(&sp->Link);
00201 myMutex.UnLock();
00202
00203 }
00204
00205
00206
00207
00208
00209 void *XrdCmsRRQ::Respond()
00210 {
00211
00212 static const int ovhd = sizeof(kXR_unt32);
00213 XrdCmsRRQSlot *lupQ, *sp, *cp;
00214 int doredir, port, hlen;
00215
00216
00217
00218 do {isReady.Wait();
00219 do {myMutex.Lock();
00220 lupQ = 0;
00221 if (readyQ.Singleton()) {myMutex.UnLock(); break;}
00222 sp = readyQ.Next()->Item(); sp->Link.Remove(); sp->Expire = 0;
00223 myMutex.UnLock();
00224 if (sp->Info.isLU) {lupQ = sp;
00225 if (!(sp = sp->Cont)) break;
00226 sp->Arg1 = lupQ->Arg1; sp->Arg2 = lupQ->Arg2;
00227 } else lupQ = sp->LkUp;
00228 if ((doredir = (sp->Arg1 && Cluster.Select(sp->Info.isRW, sp->Arg1,
00229 port, hostbuff, hlen))))
00230 {redrResp.Val = htonl(port);
00231 redrResp.Hdr.datalen = htons(static_cast<unsigned short>(hlen+ovhd));
00232 redr_iov[1].iov_len = hlen;
00233 hlen += ovhd + sizeof(redrResp.Hdr);
00234 }
00235 sendResponse(&sp->Info, doredir, hlen);
00236 cp = sp->Cont;
00237 while(cp) {sendResponse(&cp->Info, doredir, hlen); cp = cp->Cont;}
00238 sp->Recycle();
00239 } while(1);
00240 if (lupQ) {lupQ->Cont = lupQ->LkUp;
00241 sendLocResp(lupQ);
00242 lupQ->Recycle();
00243 }
00244 } while(1);
00245
00246
00247
00248 return (void *)0;
00249 }
00250
00251
00252
00253
00254
00255 void XrdCmsRRQ::sendLocResp(XrdCmsRRQSlot *lP)
00256 {
00257 static const int ovhd = sizeof(kXR_unt32);
00258 XrdCmsSelected *sP;
00259 XrdCmsNode *nP;
00260 int bytes;
00261
00262
00263
00264 if (!(lP->Arg1))
00265 {do {sendResponse(&lP->Info, 0); lP = lP->Cont;} while(lP);
00266 return;
00267 }
00268
00269
00270
00271
00272 if (!(sP = Cluster.List(lP->Arg1, XrdCmsCluster::LS_IPV6))
00273 || (!(bytes = XrdCmsNode::do_LocFmt(databuff,sP,lP->Arg2,lP->Info.rwVec))))
00274 {while(lP) {sendResponse(&lP->Info, 0); lP = lP->Cont;}
00275 return;
00276 }
00277
00278
00279
00280 bytes++;
00281 data_iov[1].iov_len = bytes;
00282 bytes += ovhd;
00283 dataResp.Hdr.datalen = htons(static_cast<unsigned short>(bytes));
00284 bytes += sizeof(dataResp.Hdr);
00285
00286
00287
00288 while(lP)
00289 {RTable.Lock();
00290 if ((nP = RTable.Find(lP->Info.Rnum, lP->Info.Rinst)))
00291 {dataResp.Hdr.streamid = lP->Info.ID;
00292 nP->Send(data_iov, iov_cnt, bytes);
00293 }
00294 RTable.UnLock();
00295 lP = lP->Cont;
00296 }
00297 }
00298
00299
00300
00301
00302
00303 void XrdCmsRRQ::sendResponse(XrdCmsRRQInfo *Info, int doredir, int totlen)
00304 {
00305
00306 XrdCmsNode *nP;
00307
00308
00309
00310 RTable.Lock();
00311 if ((nP = RTable.Find(Info->Rnum, Info->Rinst)))
00312 {if (doredir){redrResp.Hdr.streamid = Info->ID;
00313 nP->Send(redr_iov, iov_cnt, totlen);
00314
00315 }
00316 else {waitResp.Hdr.streamid = Info->ID;
00317 nP->Send((char *)&waitResp, sizeof(waitResp));
00318
00319 }
00320 }
00321
00322 RTable.UnLock();
00323 }
00324
00325
00326
00327
00328
00329 void *XrdCmsRRQ::TimeOut()
00330 {
00331
00332 XrdCmsRRQSlot *sp;
00333
00334
00335
00336 while(1)
00337 {isWaiting.Wait();
00338 myMutex.Lock();
00339 while(1)
00340 {myClock++;
00341 myMutex.UnLock();
00342 XrdSysTimer::Wait(Tslice);
00343 myMutex.Lock();
00344 while((sp=waitQ.Next()->Item()) && sp->Expire < myClock)
00345 {sp->Link.Remove();
00346 if (readyQ.Singleton()) isReady.Post();
00347 sp->Arg1 = 0; sp->Arg2 = 0;
00348
00349 readyQ.Prev()->Insert(&sp->Link);
00350 }
00351 if (waitQ.Singleton()) break;
00352 }
00353 myMutex.UnLock();
00354 }
00355
00356
00357
00358 return (void *)0;
00359 }
00360
00361
00362
00363
00364
00365
00366
00367
00368 XrdCmsRRQSlot::XrdCmsRRQSlot() : Link(this)
00369 {
00370
00371 slotNum = initSlot++;
00372 if (slotNum)
00373 {Cont = freeSlot;
00374 freeSlot = this;
00375 } else Cont = 0;
00376 Arg1 = Arg2 = 0;
00377 Info.Key = 0;
00378 }
00379
00380
00381
00382
00383
00384 XrdCmsRRQSlot *XrdCmsRRQSlot::Alloc(XrdCmsRRQInfo *theInfo)
00385 {
00386 XrdCmsRRQSlot *sp;
00387
00388 myMutex.Lock();
00389 if ((sp = freeSlot))
00390 {sp->Info = *theInfo;
00391 freeSlot = sp->Cont;
00392 sp->Cont = 0;
00393 sp->LkUp = 0;
00394 sp->Arg1 = 0;
00395 sp->Arg2 = 0;
00396 }
00397 myMutex.UnLock();
00398 return sp;
00399 }
00400
00401
00402
00403
00404
00405 void XrdCmsRRQSlot::Recycle()
00406 {
00407 XrdCmsRRQSlot *sp, *np = Cont;
00408
00409 myMutex.Lock();
00410 if (!Link.Singleton()) Link.Remove();
00411 while((sp = np))
00412 {np = sp->Cont;
00413 sp->Cont = freeSlot;
00414 freeSlot = sp;
00415 sp->Info.Key = 0;
00416 }
00417 Info.Key = 0;
00418 Cont = freeSlot;
00419 freeSlot = this;
00420 myMutex.UnLock();
00421 }