00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 const char *XrdXrootdCallBackCVSID = "$Id: XrdXrootdCallBack.cc 35287 2010-09-14 21:19:35Z ganis $";
00014
00015 #include <errno.h>
00016 #include <stdio.h>
00017 #include <string.h>
00018 #include <netinet/in.h>
00019 #include <sys/uio.h>
00020
00021 #include "Xrd/XrdScheduler.hh"
00022 #include "XProtocol/XProtocol.hh"
00023 #include "XProtocol/XPtypes.hh"
00024 #include "XrdSys/XrdSysError.hh"
00025 #include "XrdSfs/XrdSfsInterface.hh"
00026 #include "XrdXrootd/XrdXrootdCallBack.hh"
00027 #include "XrdXrootd/XrdXrootdProtocol.hh"
00028 #include "XrdXrootd/XrdXrootdStats.hh"
00029 #include "XrdXrootd/XrdXrootdReqID.hh"
00030 #include "XrdXrootd/XrdXrootdTrace.hh"
00031
00032
00033
00034
00035
00036 class XrdXrootdCBJob : XrdJob
00037 {
00038 public:
00039
00040 static XrdXrootdCBJob *Alloc(XrdXrootdCallBack *cbF, XrdOucErrInfo *erp, int rval);
00041
00042 void DoIt();
00043
00044 inline void Recycle(){myMutex.Lock();
00045 Next = FreeJob;
00046 FreeJob = this;
00047 myMutex.UnLock();
00048 }
00049
00050 XrdXrootdCBJob(XrdXrootdCallBack *cbp,
00051 XrdOucErrInfo *erp,
00052 int rval)
00053 : XrdJob("async response"),
00054 cbFunc(cbp), eInfo(erp), Result(rval) {}
00055 ~XrdXrootdCBJob() {}
00056
00057 private:
00058 void DoStatx(XrdOucErrInfo *eInfo);
00059 static XrdSysMutex myMutex;
00060 static XrdXrootdCBJob *FreeJob;
00061
00062 XrdXrootdCBJob *Next;
00063 XrdXrootdCallBack *cbFunc;
00064 XrdOucErrInfo *eInfo;
00065 int Result;
00066 };
00067
00068
00069
00070
00071
00072 extern XrdOucTrace *XrdXrootdTrace;
00073
00074 XrdSysError *XrdXrootdCallBack::eDest;
00075 XrdXrootdStats *XrdXrootdCallBack::SI;
00076 XrdScheduler *XrdXrootdCallBack::Sched;
00077 int XrdXrootdCallBack::Port;
00078
00079 XrdSysMutex XrdXrootdCBJob::myMutex;
00080 XrdXrootdCBJob *XrdXrootdCBJob::FreeJob;
00081
00082
00083
00084
00085
00086
00087
00088
00089 XrdXrootdCBJob *XrdXrootdCBJob::Alloc(XrdXrootdCallBack *cbF,
00090 XrdOucErrInfo *erp,
00091 int rval)
00092 {
00093 XrdXrootdCBJob *cbj;
00094
00095
00096
00097 myMutex.Lock();
00098 if (!(cbj = FreeJob)) cbj = new XrdXrootdCBJob(cbF, erp, rval);
00099 else {cbj->cbFunc = cbF, cbj->eInfo = erp;
00100 cbj->Result = rval;FreeJob = cbj->Next;
00101 }
00102 myMutex.UnLock();
00103
00104
00105
00106 return cbj;
00107 }
00108
00109
00110
00111
00112
00113 void XrdXrootdCBJob::DoIt()
00114 {
00115
00116
00117
00118
00119
00120
00121 if (SFS_OK == Result)
00122 {if (*(cbFunc->Func()) == 'o') cbFunc->sendResp(eInfo, kXR_wait, 0);
00123 else {if (*(cbFunc->Func()) == 'x') DoStatx(eInfo);
00124 cbFunc->sendResp(eInfo, kXR_ok, 0, eInfo->getErrText());
00125 }
00126 } else cbFunc->sendError(Result, eInfo);
00127
00128
00129
00130 if (eInfo->getErrCB()) eInfo->getErrCB()->Done(Result, eInfo);
00131 else delete eInfo;
00132 eInfo = 0;
00133 Recycle();
00134 }
00135
00136
00137
00138
00139
00140 void XrdXrootdCBJob::DoStatx(XrdOucErrInfo *einfo)
00141 {
00142 const char *tp = einfo->getErrText();
00143 char cflags[2];
00144 int flags;
00145
00146
00147
00148 while(*tp && *tp == ' ') tp++;
00149 while(*tp && *tp != ' ') tp++;
00150 while(*tp && *tp == ' ') tp++;
00151 while(*tp && *tp != ' ') tp++;
00152
00153
00154
00155 flags = atoi(tp);
00156
00157
00158
00159 if (flags & kXR_offline) cflags[0] = (char)kXR_offline;
00160 else if (flags & kXR_isDir) cflags[0] = (char)kXR_isDir;
00161 else cflags[0] = (char)kXR_file;
00162
00163
00164
00165 cflags[1] = '\0';
00166 einfo->setErrInfo(0, cflags);
00167 }
00168
00169
00170
00171
00172
00173
00174
00175
00176 void XrdXrootdCallBack::Done(int &Result,
00177 XrdOucErrInfo *eInfo)
00178 {
00179 XrdXrootdCBJob *cbj;
00180
00181
00182
00183
00184 if (!(cbj = XrdXrootdCBJob::Alloc(this, eInfo, Result)))
00185 {eDest->Emsg("Done",ENOMEM,"get call back job; user",eInfo->getErrUser());
00186 if (eInfo->getErrCB()) eInfo->getErrCB()->Done(Result, eInfo);
00187 else delete eInfo;
00188 } else Sched->Schedule((XrdJob *)cbj);
00189 }
00190
00191
00192
00193
00194
00195 int XrdXrootdCallBack::Same(unsigned long long arg1, unsigned long long arg2)
00196 {
00197 XrdXrootdReqID ReqID1(arg1), ReqID2(arg2);
00198 unsigned char sid1[2], sid2[2];
00199 unsigned int inst1, inst2;
00200 int lid1, lid2;
00201
00202 ReqID1.getID(sid1, lid1, inst1);
00203 ReqID2.getID(sid2, lid2, inst2);
00204 return lid1 == lid2;
00205 }
00206
00207
00208
00209
00210
00211 void XrdXrootdCallBack::sendError(int rc,
00212 XrdOucErrInfo *eInfo)
00213 {
00214 const char *TraceID = "fsError";
00215 static int Xserr = kXR_ServerError;
00216 int ecode;
00217 const char *eMsg = eInfo->getErrText(ecode);
00218 const char *User = eInfo->getErrUser();
00219
00220
00221
00222 if (eMsg && !*eMsg) eMsg = 0;
00223
00224
00225
00226 if (rc == SFS_ERROR)
00227 {SI->errorCnt++;
00228 rc = XrdXrootdProtocol::mapError(ecode);
00229 sendResp(eInfo, kXR_error, &rc, eMsg, 1);
00230 return;
00231 }
00232
00233
00234
00235 if (rc == SFS_REDIRECT)
00236 {SI->redirCnt++;
00237 if (ecode <= 0) ecode = (ecode ? -ecode : Port);
00238 TRACE(REDIR, User <<" async redir to " << eMsg <<':' <<ecode);
00239 sendResp(eInfo, kXR_redirect, &ecode, eMsg);
00240 return;
00241 }
00242
00243
00244
00245 if (rc >= SFS_STALL)
00246 {SI->stallCnt++;
00247 TRACE(STALL, "Stalling " <<User <<" for " <<rc <<" sec");
00248 sendResp(eInfo, kXR_wait, &rc, eMsg, 1);
00249 return;
00250 }
00251
00252
00253
00254 if (rc == SFS_DATA)
00255 {if (ecode) sendResp(eInfo, kXR_ok, 0, eMsg, ecode);
00256 else sendResp(eInfo, kXR_ok, 0);
00257 return;
00258 }
00259
00260
00261
00262 {char buff[32];
00263 SI->errorCnt++;
00264 sprintf(buff, "%d", rc);
00265 eDest->Emsg("sendError", "Unknown error code", buff, eMsg);
00266 sendResp(eInfo, kXR_error, &Xserr, eMsg, 1);
00267 return;
00268 }
00269 }
00270
00271
00272
00273
00274
00275 void XrdXrootdCallBack::sendResp(XrdOucErrInfo *eInfo,
00276 XResponseType Status,
00277 int *Data,
00278 const char *Msg,
00279 int ovhd)
00280 {
00281 const char *TraceID = "sendResp";
00282 struct iovec rspVec[4];
00283 XrdXrootdReqID ReqID;
00284 int dlen = 0, n = 1;
00285 kXR_int32 xbuf;
00286
00287 if (Data)
00288 {xbuf = static_cast<kXR_int32>(htonl(*Data));
00289 rspVec[n].iov_base = (caddr_t)(&xbuf);
00290 dlen = rspVec[n].iov_len = sizeof(xbuf); n++;
00291 }
00292 if (Msg && *Msg)
00293 { rspVec[n].iov_base = (caddr_t)Msg;
00294 dlen += rspVec[n].iov_len = strlen(Msg)+ovhd; n++;
00295 }
00296
00297
00298
00299 ReqID.setID(eInfo->getErrArg());
00300
00301
00302
00303 if (XrdXrootdResponse::Send(ReqID, Status, rspVec, n, dlen) < 0)
00304 eDest->Emsg("sendResp", eInfo->getErrUser(), Opname,
00305 "async resp aborted; user gone.");
00306 else if (TRACING(TRACE_RSP))
00307 {XrdXrootdResponse theResp;
00308 theResp.Set(ReqID.Stream());
00309 TRACE(RSP, eInfo->getErrUser() <<" async " <<theResp.ID()
00310 <<' ' <<Opname <<" status " <<Status);
00311 }
00312 }