00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #include "XrdVersion.hh"
00012
00013 #include "XrdSfs/XrdSfsInterface.hh"
00014 #include "Xrd/XrdBuffer.hh"
00015 #include "Xrd/XrdLink.hh"
00016 #include "XProtocol/XProtocol.hh"
00017 #include "XrdSys/XrdSysTimer.hh"
00018 #include "XrdXrootd/XrdXrootdAio.hh"
00019 #include "XrdXrootd/XrdXrootdFile.hh"
00020 #include "XrdXrootd/XrdXrootdFileLock.hh"
00021 #include "XrdXrootd/XrdXrootdFileLock1.hh"
00022 #include "XrdXrootd/XrdXrootdMonitor.hh"
00023 #include "XrdXrootd/XrdXrootdPio.hh"
00024 #include "XrdXrootd/XrdXrootdProtocol.hh"
00025 #include "XrdXrootd/XrdXrootdStats.hh"
00026 #include "XrdXrootd/XrdXrootdTrace.hh"
00027 #include "XrdXrootd/XrdXrootdXPath.hh"
00028
00029
00030
00031
00032
00033 XrdOucTrace *XrdXrootdTrace;
00034
00035 XrdXrootdXPath XrdXrootdProtocol::RPList;
00036 XrdXrootdXPath XrdXrootdProtocol::XPList;
00037 XrdSfsFileSystem *XrdXrootdProtocol::osFS;
00038 char *XrdXrootdProtocol::FSLib = 0;
00039 XrdXrootdFileLock *XrdXrootdProtocol::Locker;
00040 XrdSecService *XrdXrootdProtocol::CIA = 0;
00041 char *XrdXrootdProtocol::SecLib = 0;
00042 char *XrdXrootdProtocol::pidPath = strdup("/tmp");
00043 XrdScheduler *XrdXrootdProtocol::Sched;
00044 XrdBuffManager *XrdXrootdProtocol::BPool;
00045 XrdSysError XrdXrootdProtocol::eDest(0, "Xrootd");
00046 XrdXrootdStats *XrdXrootdProtocol::SI;
00047 XrdXrootdJob *XrdXrootdProtocol::JobCKS = 0;
00048 char *XrdXrootdProtocol::JobCKT = 0;
00049
00050 char *XrdXrootdProtocol::Notify = 0;
00051 int XrdXrootdProtocol::hailWait;
00052 int XrdXrootdProtocol::readWait;
00053 int XrdXrootdProtocol::Port;
00054 int XrdXrootdProtocol::Window;
00055 int XrdXrootdProtocol::WANPort;
00056 int XrdXrootdProtocol::WANWindow;
00057 char XrdXrootdProtocol::isRedir = 0;
00058 char XrdXrootdProtocol::chkfsV = 0;
00059 XrdNetSocket *XrdXrootdProtocol::AdminSock= 0;
00060
00061 int XrdXrootdProtocol::hcMax = 28657;
00062 int XrdXrootdProtocol::maxBuffsz;
00063 int XrdXrootdProtocol::maxTransz = 262144;
00064 int XrdXrootdProtocol::as_maxperlnk = 8;
00065 int XrdXrootdProtocol::as_maxperreq = 8;
00066 int XrdXrootdProtocol::as_maxpersrv = 4096;
00067 int XrdXrootdProtocol::as_segsize = 131072;
00068 int XrdXrootdProtocol::as_miniosz = 32768;
00069 #ifdef __solaris__
00070 int XrdXrootdProtocol::as_minsfsz = 1;
00071 #else
00072 int XrdXrootdProtocol::as_minsfsz = 8192;
00073 #endif
00074 int XrdXrootdProtocol::as_maxstalls = 5;
00075 int XrdXrootdProtocol::as_force = 0;
00076 int XrdXrootdProtocol::as_noaio = 0;
00077 int XrdXrootdProtocol::as_nosf = 0;
00078 int XrdXrootdProtocol::as_syncw = 0;
00079
00080 const char *XrdXrootdProtocol::myInst = 0;
00081 const char *XrdXrootdProtocol::TraceID = "Protocol";
00082 int XrdXrootdProtocol::myPID = static_cast<int>(getpid());
00083
00084 struct XrdXrootdProtocol::RD_Table XrdXrootdProtocol::Route[RD_Num] = {{0,0}};
00085
00086
00087
00088
00089
00090 XrdObjectQ<XrdXrootdProtocol>
00091 XrdXrootdProtocol::ProtStack("ProtStack",
00092 "xrootd protocol anchor");
00093
00094
00095
00096
00097
00098 #define UPSTATS(x) SI->statsMutex.Lock(); SI->x++; SI->statsMutex.UnLock()
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109 extern "C"
00110 {
00111 XrdProtocol *XrdgetProtocol(const char *pname, char *parms,
00112 XrdProtocol_Config *pi)
00113 {
00114 XrdProtocol *pp = 0;
00115 const char *txt = "completed.";
00116
00117
00118
00119 pi->eDest->Say("Copr. 2007 Stanford University, xrootd version "
00120 XROOTD_VERSION " build " XrdVERSION);
00121 pi->eDest->Say("++++++ xrootd protocol initialization started.");
00122
00123
00124
00125 if (XrdXrootdProtocol::Configure(parms, pi))
00126 pp = (XrdProtocol *)new XrdXrootdProtocol();
00127 else txt = "failed.";
00128 pi->eDest->Say("------ xrootd protocol initialization ", txt);
00129 return pp;
00130 }
00131 }
00132
00133
00134
00135
00136
00137
00138
00139
00140
00141
00142 extern "C"
00143 {
00144 int XrdgetProtocolPort(const char *pname, char *parms, XrdProtocol_Config *pi)
00145 {
00146
00147
00148
00149
00150
00151 if (pi->Port < 0) return 1094;
00152 return pi->Port;
00153 }
00154 }
00155
00156
00157
00158
00159
00160
00161
00162
00163 XrdXrootdProtocol::XrdXrootdProtocol()
00164 : XrdProtocol("xrootd protocol handler"), ProtLink(this),
00165 Entity("")
00166 {
00167 Reset();
00168 }
00169
00170
00171
00172
00173
00174 XrdXrootdProtocol XrdXrootdProtocol::operator =(const XrdXrootdProtocol &rhs)
00175 {
00176
00177
00178 Reset();
00179
00180
00181
00182 Link = rhs.Link;
00183 Link->setRef(1);
00184 Status = rhs.Status;
00185 myFile = rhs.myFile;
00186 myIOLen = rhs.myIOLen;
00187 myOffset = rhs.myOffset;
00188 Response = rhs.Response;
00189 memcpy((void *)&Request,(const void *)&rhs.Request, sizeof(Request));
00190 Client = rhs.Client;
00191 AuthProt = rhs.AuthProt;
00192 return *this;
00193 }
00194
00195
00196
00197
00198
00199 #define TRACELINK lp
00200
00201 XrdProtocol *XrdXrootdProtocol::Match(XrdLink *lp)
00202 {
00203 struct ClientInitHandShake hsdata;
00204 char *hsbuff = (char *)&hsdata;
00205
00206 static struct hs_response
00207 {kXR_unt16 streamid;
00208 kXR_unt16 status;
00209 kXR_int32 rlen;
00210 kXR_int32 pval;
00211 kXR_int32 styp;
00212 } hsresp={(isRedir == 'M' ? 0xffff : 0), 0, htonl(8),
00213 htonl(XROOTD_VERSBIN),
00214 (isRedir ? htonl(kXR_LBalServer)
00215 : htonl(kXR_DataServer))};
00216
00217 XrdXrootdProtocol *xp;
00218 int dlen;
00219
00220
00221
00222 if ((dlen = lp->Peek(hsbuff,sizeof(hsdata), hailWait)) != sizeof(hsdata))
00223 {if (dlen <= 0) lp->setEtext("handshake not received");
00224 return (XrdProtocol *)0;
00225 }
00226
00227
00228
00229
00230
00231
00232
00233 hsdata.fourth = ntohl(hsdata.fourth);
00234 hsdata.fifth = ntohl(hsdata.fifth);
00235 if (dlen != sizeof(hsdata) || hsdata.first || hsdata.second
00236 || hsdata.third || hsdata.fourth != 4 || hsdata.fifth != ROOTD_PQ) return 0;
00237
00238
00239
00240 if (!lp->Send((char *)&hsresp, sizeof(hsresp)))
00241 {lp->setEtext("handshake failed");
00242 return (XrdProtocol *)0;
00243 }
00244
00245
00246
00247 if (lp->Recv(hsbuff, sizeof(hsdata)) != sizeof(hsdata))
00248 {lp->setEtext("reread failed");
00249 return (XrdProtocol *)0;
00250 }
00251
00252
00253
00254 if (!(xp = ProtStack.Pop())) xp = new XrdXrootdProtocol();
00255
00256
00257
00258 UPSTATS(Count);
00259 xp->Link = lp;
00260 xp->Response.Set(lp);
00261 strcpy(xp->Entity.prot, "host");
00262 xp->Entity.host = (char *)lp->Host();
00263 return (XrdProtocol *)xp;
00264 }
00265
00266
00267
00268
00269
00270 #undef TRACELINK
00271 #define TRACELINK Link
00272
00273 int XrdXrootdProtocol::Process(XrdLink *lp)
00274 {
00275 int rc;
00276
00277
00278
00279 if (Resume)
00280 {if (myBlen && (rc = getData("data", myBuff, myBlen)) != 0)
00281 {if (rc < 0 && myAioReq) myAioReq->Recycle(-1);
00282 return rc;
00283 }
00284 else if ((rc = (*this.*Resume)()) != 0) return rc;
00285 else {Resume = 0; return 0;}
00286 }
00287
00288
00289
00290 if ((rc=getData("request",(char *)&Request,sizeof(Request))) != 0) return rc;
00291
00292
00293
00294 Request.header.requestid = ntohs(Request.header.requestid);
00295 Request.header.dlen = ntohl(Request.header.dlen);
00296 Response.Set(Request.header.streamid);
00297 TRACEP(REQ, "req=" <<Request.header.requestid <<" dlen=" <<Request.header.dlen);
00298
00299
00300
00301
00302 if (Request.header.dlen < 0)
00303 {Response.Send(kXR_ArgInvalid, "Invalid request data length");
00304 return Link->setEtext("protocol data length error");
00305 }
00306
00307
00308
00309
00310 if (Request.header.requestid != kXR_write && Request.header.dlen)
00311 {if (!argp || Request.header.dlen+1 > argp->bsize)
00312 {if (argp) BPool->Release(argp);
00313 if (!(argp = BPool->Obtain(Request.header.dlen+1)))
00314 {Response.Send(kXR_ArgTooLong, "Request argument is too long");
00315 return 0;
00316 }
00317 hcNow = hcPrev; halfBSize = argp->bsize >> 1;
00318 }
00319 if ((rc = getData("arg", argp->buff, Request.header.dlen)))
00320 {Resume = &XrdXrootdProtocol::Process2; return rc;}
00321 argp->buff[Request.header.dlen] = '\0';
00322 }
00323
00324
00325
00326 return Process2();
00327 }
00328
00329
00330
00331
00332
00333 int XrdXrootdProtocol::Process2()
00334 {
00335
00336
00337
00338 if (!Status)
00339 switch(Request.header.requestid)
00340 {case kXR_login: return do_Login();
00341 case kXR_protocol: return do_Protocol();
00342 case kXR_bind: return do_Bind();
00343 default: Response.Send(kXR_InvalidRequest,
00344 "Invalid request; user not logged in");
00345 return Link->setEtext("protocol sequence error 1");
00346 }
00347
00348
00349
00350
00351 switch(Request.header.requestid)
00352 {case kXR_read: return do_Read();
00353 case kXR_readv: return do_ReadV();
00354 case kXR_write: return do_Write();
00355 case kXR_sync: return do_Sync();
00356 case kXR_close: return do_Close();
00357 case kXR_truncate: if (!Request.header.dlen) return do_Truncate();
00358 break;
00359 case kXR_query: if (!Request.header.dlen) return do_Qfh();
00360 default: break;
00361 }
00362
00363
00364
00365 switch(Request.header.requestid)
00366 {case kXR_protocol: return do_Protocol();
00367 case kXR_ping: return do_Ping();
00368 default: break;
00369 }
00370
00371
00372
00373 if (Status & XRD_NEED_AUTH)
00374 {if (Request.header.requestid == kXR_auth) return do_Auth();
00375 else {Response.Send(kXR_InvalidRequest,
00376 "Invalid request; user not authenticated");
00377 return -1;
00378 }
00379 }
00380
00381
00382
00383 switch(Request.header.requestid)
00384 {case kXR_endsess: return do_Endsess();
00385 default: break;
00386 }
00387
00388
00389
00390 if (!argp || !Request.header.dlen)
00391 {Response.Send(kXR_ArgMissing, "Required argument not present");
00392 return 0;
00393 }
00394
00395
00396
00397 ReqID.setID(Request.header.streamid);
00398
00399
00400
00401 switch(Request.header.requestid)
00402 {case kXR_open: return do_Open();
00403 case kXR_getfile: return do_Getfile();
00404 case kXR_putfile: return do_Putfile();
00405 default: break;
00406 }
00407
00408
00409
00410 UPSTATS(miscCnt);
00411
00412
00413
00414 switch(Request.header.requestid)
00415 {case kXR_admin: if (Status & XRD_ADMINUSER) return do_Admin();
00416 else break;
00417 case kXR_chmod: return do_Chmod();
00418 case kXR_dirlist: return do_Dirlist();
00419 case kXR_locate: return do_Locate();
00420 case kXR_mkdir: return do_Mkdir();
00421 case kXR_mv: return do_Mv();
00422 case kXR_query: return do_Query();
00423 case kXR_prepare: return do_Prepare();
00424 case kXR_rm: return do_Rm();
00425 case kXR_rmdir: return do_Rmdir();
00426 case kXR_set: return do_Set();
00427 case kXR_stat: return do_Stat();
00428 case kXR_statx: return do_Statx();
00429 case kXR_truncate: return do_Truncate();
00430 default: break;
00431 }
00432
00433
00434
00435 Response.Send(kXR_InvalidRequest, "Invalid request code");
00436 return 0;
00437 }
00438
00439
00440
00441
00442
00443 #undef TRACELINK
00444 #define TRACELINK Link
00445
00446 void XrdXrootdProtocol::Recycle(XrdLink *lp, int csec, const char *reason)
00447 {
00448 char *sfxp, ctbuff[24], buff[128];
00449
00450
00451
00452 if (lp)
00453 {XrdSysTimer::s2hms(csec, ctbuff, sizeof(ctbuff));
00454 if (reason) {snprintf(buff, sizeof(buff), "%s (%s)", ctbuff, reason);
00455 sfxp = buff;
00456 } else sfxp = ctbuff;
00457
00458 eDest.Log(SYS_LOG_02, "Xeq", lp->ID,
00459 (Status == XRD_BOUNDPATH ? (char *)"unbind":(char *)"disc"), sfxp);
00460 }
00461
00462
00463
00464
00465
00466
00467 if (Status == XRD_BOUNDPATH && Stream[0])
00468 {Stream[0]->streamMutex.Lock();
00469 isDead = 1;
00470 if (isActive)
00471 {isActive = 0;
00472 Stream[0]->Link->setRef(-1);
00473 }
00474 Stream[0]->streamMutex.UnLock();
00475 if (lp) return;
00476 }
00477
00478
00479
00480 if (XrdXrootdMonitor::monUSER && Monitor) Monitor->Disc(monUID, csec);
00481
00482
00483
00484 Cleanup();
00485
00486
00487
00488 Reset();
00489
00490
00491
00492 ProtStack.Push(&ProtLink);
00493 }
00494
00495
00496
00497
00498
00499 int XrdXrootdProtocol::Stats(char *buff, int blen, int do_sync)
00500 {
00501
00502
00503 if (do_sync)
00504 {SI->statsMutex.Lock();
00505 SI->readCnt += numReads;
00506 cumReads += numReads; numReads = 0;
00507 SI->prerCnt += numReadP;
00508 cumReadP += numReadP; numReadP = 0;
00509 SI->writeCnt += numWrites;
00510 cumWrites+= numWrites;numWrites = 0;
00511 SI->statsMutex.UnLock();
00512 }
00513
00514
00515
00516 return SI->Stats(buff, blen, do_sync);
00517 }
00518
00519
00520
00521
00522
00523
00524
00525
00526 void XrdXrootdProtocol::Cleanup()
00527 {
00528 XrdXrootdPio *pioP;
00529 int i;
00530
00531
00532
00533 if (argp) {BPool->Release(argp); argp = 0;}
00534
00535
00536
00537 if (FTab) {delete FTab; FTab = 0;}
00538
00539
00540
00541
00542
00543
00544 if (isBound && Status != XRD_BOUNDPATH)
00545 {streamMutex.Lock();
00546 for (i = 1; i < maxStreams; i++)
00547 if (Stream[i])
00548 {Stream[i]->isBound = 0; Stream[i]->Stream[0] = 0;
00549 if (Stream[i]->isDead) Stream[i]->Recycle(0, 0, 0);
00550 else Stream[i]->Link->Close();
00551 Stream[i] = 0;
00552 }
00553 streamMutex.UnLock();
00554 }
00555
00556
00557
00558 SI->statsMutex.Lock();
00559 SI->readCnt += numReads; SI->writeCnt += numWrites;
00560 SI->statsMutex.UnLock();
00561
00562
00563
00564 if (Monitor) {Monitor->unAlloc(Monitor); Monitor = 0;}
00565
00566
00567
00568 if (AuthProt) {AuthProt->Delete(); AuthProt = 0;}
00569
00570
00571
00572 while((pioP = pioFirst)) {pioFirst = pioP->Next; pioP->Recycle();}
00573 while((pioP = pioFree )) {pioFree = pioP->Next; pioP->Recycle();}
00574 }
00575
00576
00577
00578
00579
00580 int XrdXrootdProtocol::getData(const char *dtype, char *buff, int blen)
00581 {
00582 int rlen;
00583
00584
00585
00586
00587 rlen = Link->Recv(buff, blen, readWait);
00588 if (rlen < 0)
00589 {if (rlen != -ENOMSG) return Link->setEtext("link read error");
00590 else return -1;
00591 }
00592 if (rlen < blen)
00593 {myBuff = buff+rlen; myBlen = blen-rlen;
00594 TRACEP(REQ, dtype <<" timeout; read " <<rlen <<" of " <<blen <<" bytes");
00595 return 1;
00596 }
00597 return 0;
00598 }
00599
00600
00601
00602
00603
00604 void XrdXrootdProtocol::Reset()
00605 {
00606 Status = 0;
00607 argp = 0;
00608 Link = 0;
00609 FTab = 0;
00610 Resume = 0;
00611 myBuff = (char *)&Request;
00612 myBlen = sizeof(Request);
00613 myBlast = 0;
00614 myOffset = 0;
00615 myIOLen = 0;
00616 myStalls = 0;
00617 myAioReq = 0;
00618 numReads = 0;
00619 numReadP = 0;
00620 numWrites = 0;
00621 numFiles = 0;
00622 cumReads = 0;
00623 cumReadP = 0;
00624 cumWrites = 0;
00625 totReadP = 0;
00626 hcPrev =13;
00627 hcNext =21;
00628 hcNow =13;
00629 Monitor = 0;
00630 monUID = 0;
00631 monFILE = 0;
00632 monIO = 0;
00633 Client = 0;
00634 AuthProt = 0;
00635 mySID = 0;
00636 CapVer = 0;
00637 reTry = 0;
00638 PathID = 0;
00639 pioFree = pioFirst = pioLast = 0;
00640 isActive = isDead = isNOP = isBound = 0;
00641 memset(&Entity, 0, sizeof(Entity));
00642 memset(Stream, 0, sizeof(Stream));
00643 }