00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 const char *XrdCmsRespCVSID = "$Id: XrdCmsResp.cc 32231 2010-02-05 18:24:46Z ganis $";
00016
00017 #include <stdio.h>
00018 #include <stdlib.h>
00019 #include <string.h>
00020
00021 #include "XrdCms/XrdCmsClientMsg.hh"
00022 #include "XrdCms/XrdCmsParser.hh"
00023 #include "XrdCms/XrdCmsResp.hh"
00024 #include "XrdCms/XrdCmsTrace.hh"
00025
00026 #include "XrdOuc/XrdOucErrInfo.hh"
00027 #include "XrdNet/XrdNetBuffer.hh"
00028 #include "XrdSfs/XrdSfsInterface.hh"
00029 #include "XrdSys/XrdSysError.hh"
00030
00031 using namespace XrdCms;
00032
00033
00034
00035
00036
00037 XrdSysSemaphore XrdCmsResp::isReady(0);
00038 XrdSysMutex XrdCmsResp::rdyMutex;
00039 XrdCmsResp *XrdCmsResp::First = 0;
00040 XrdCmsResp *XrdCmsResp::Last = 0;
00041 XrdSysMutex XrdCmsResp::myMutex;
00042 XrdCmsResp *XrdCmsResp::nextFree = 0;
00043 int XrdCmsResp::numFree = 0;
00044 int XrdCmsResp::RepDelay = 5;
00045
00046
00047
00048
00049
00050 XrdCmsResp *XrdCmsResp::Alloc(XrdOucErrInfo *erp, int msgid)
00051 {
00052 XrdCmsResp *rp;
00053
00054
00055
00056
00057
00058
00059 myMutex.Lock();
00060 if (nextFree)
00061 {rp = nextFree;
00062 nextFree = rp->next;
00063 numFree--;
00064 rp->SyncCB.Init();
00065 }
00066 else if (!(rp = new XrdCmsResp()))
00067 {myMutex.UnLock();
00068 return (XrdCmsResp *)0;
00069 }
00070 myMutex.UnLock();
00071
00072
00073
00074
00075
00076 strlcpy(rp->UserID, erp->getErrUser(), sizeof(rp->UserID));
00077 rp->setErrInfo(0, erp->getErrText());
00078 rp->ErrCB = erp->getErrCB(rp->ErrCBarg);
00079 erp->setErrCB((XrdOucEICB *)&rp->SyncCB);
00080 rp->myID = msgid;
00081 rp->next = 0;
00082
00083
00084
00085 return rp;
00086 }
00087
00088
00089
00090
00091
00092 void XrdCmsResp::Recycle()
00093 {
00094
00095
00096
00097 if (myBuff) {myBuff->Recycle(); myBuff = 0;}
00098
00099
00100
00101
00102 if (XrdCmsResp::numFree >= XrdCmsResp::maxFree) delete this;
00103 else {myMutex.Lock();
00104 next = nextFree;
00105 nextFree = this;
00106 numFree++;
00107 myMutex.UnLock();
00108 }
00109 }
00110
00111
00112
00113
00114
00115
00116
00117 void XrdCmsResp::Reply(const char *manp, CmsRRHdr &rrhdr, XrdNetBuffer *netbuff)
00118 {
00119
00120
00121
00122 myRRHdr = rrhdr;
00123 myBuff = netbuff;
00124 next = 0;
00125 strlcpy(theMan, manp, sizeof(theMan));
00126
00127
00128
00129 rdyMutex.Lock();
00130 if (Last) {Last->next = this; Last = this;}
00131 else Last=First = this;
00132 rdyMutex.UnLock();
00133
00134
00135
00136 isReady.Post();
00137 }
00138
00139
00140
00141
00142
00143 void XrdCmsResp::Reply()
00144 {
00145 XrdCmsResp *rp;
00146
00147
00148
00149 while(1)
00150 {isReady.Wait();
00151 rdyMutex.Lock();
00152 if ((rp = First))
00153 {if (!(First = rp->next)) Last = 0;
00154 rdyMutex.UnLock();
00155 rp->ReplyXeq();
00156 } else rdyMutex.UnLock();
00157 }
00158 }
00159
00160
00161
00162
00163
00164 void XrdCmsResp::ReplyXeq()
00165 {
00166 EPNAME("Reply")
00167 XrdOucEICB *theCB;
00168 int Result;
00169
00170
00171
00172
00173 if (!ErrCB)
00174 {DEBUG("No callback object for user " <<UserID <<" msgid="
00175 <<myRRHdr.streamid <<' ' <<theMan);
00176 Recycle();
00177 return;
00178 }
00179
00180
00181
00182 Result = XrdCmsParser::Decode(theMan, myRRHdr, myBuff->data, myBuff->dlen,
00183 (XrdOucErrInfo *)this);
00184
00185
00186
00187
00188 if (Result == -EREMOTE) Result = SFS_REDIRECT;
00189 else if (Result == -EAGAIN) Result = SFS_STALL;
00190 else if (Result == -EALREADY) Result = SFS_DATA;
00191 else {if (Result != -EINVAL)
00192 {char buff[16];
00193 sprintf(buff, "%d", Result);
00194 Say.Emsg("Reply", "Invalid call back result code", buff);
00195 }
00196 Result = SFS_ERROR;
00197 }
00198
00199
00200
00201
00202
00203 SyncCB.Wait();
00204
00205
00206
00207
00208 theCB = ErrCB;
00209 ErrCB = (XrdOucEICB *)this;
00210
00211
00212
00213 theCB->Done(Result, (XrdOucErrInfo *)this);
00214 }
00215
00216
00217
00218
00219
00220
00221
00222
00223 XrdCmsRespQ::XrdCmsRespQ()
00224 {
00225 memset(mqTab, 0, sizeof(mqTab));
00226 }
00227
00228
00229
00230
00231
00232 void XrdCmsRespQ::Add(XrdCmsResp *rp)
00233 {
00234 int i;
00235
00236
00237
00238 i = rp->myID % mqSize;
00239 myMutex.Lock();
00240 rp->next = (mqTab[i] ? mqTab[i] : 0);
00241 mqTab[i] = rp;
00242 myMutex.UnLock();
00243 }
00244
00245
00246
00247
00248
00249 void XrdCmsRespQ::Purge()
00250 {
00251 XrdCmsResp *rp;
00252 int i;
00253
00254 myMutex.Lock();
00255 for (i = 0; i < mqSize; i++)
00256 {while ((rp = mqTab[i])) {mqTab[i] = rp->next; delete rp;}}
00257 myMutex.UnLock();
00258 }
00259
00260
00261
00262
00263
00264 XrdCmsResp *XrdCmsRespQ::Rem(int msgid)
00265 {
00266 int i;
00267 XrdCmsResp *rp, *pp = 0;
00268
00269
00270
00271 i = msgid % mqSize;
00272 myMutex.Lock();
00273 rp = mqTab[i];
00274 while(rp && rp->myID != msgid) {pp = rp; rp = rp->next;}
00275
00276
00277
00278 if (rp) {if (pp) pp->next = rp->next;
00279 else mqTab[i] = rp->next;
00280 }
00281
00282
00283
00284 myMutex.UnLock();
00285 return rp;
00286 }