00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
// Revision-control identification string for this translation unit.
const char *XrdCmsXmiReqCVSID =
    "$Id: XrdCmsXmiReq.cc 24468 2008-06-22 16:47:03Z ganis $";
00014
00015 #include <string.h>
00016
00017 #include "XrdCms/XrdCmsTrace.hh"
00018 #include "XrdCms/XrdCmsXmiReq.hh"
00019
00020
00021
00022
00023
00024 XrdCmsXmi *XrdCmsXmiReq::XmiP;
00025 XrdSysMutex XrdCmsXmiReq::prpMutex;
00026 XrdSysSemaphore XrdCmsXmiReq::prpReady(0);
00027 XrdCmsXmiReq *XrdCmsXmiReq::prpFirst = 0;
00028 XrdCmsXmiReq *XrdCmsXmiReq::prpLast = 0;
00029 XrdSysMutex XrdCmsXmiReq::reqMutex;
00030 XrdSysSemaphore XrdCmsXmiReq::reqReady(0);
00031 XrdCmsXmiReq *XrdCmsXmiReq::reqFirst = 0;
00032 XrdCmsXmiReq *XrdCmsXmiReq::reqLast = 0;
00033 XrdSysMutex XrdCmsXmiReq::stgMutex;
00034 XrdSysSemaphore XrdCmsXmiReq::stgReady(0);
00035 XrdCmsXmiReq *XrdCmsXmiReq::stgFirst = 0;
00036 XrdCmsXmiReq *XrdCmsXmiReq::stgLast = 0;
00037
00038 using namespace XrdCms;
00039
00040
00041
00042
00043
00044 void *XrdCmsXmi_StartPrpQ(void *parg)
00045 {
00046 XrdCmsXmiReq *requestProcessor = (XrdCmsXmiReq *)parg;
00047
00048 requestProcessor->processPrpQ();
00049
00050 return (void *)0;
00051 }
00052
00053 void *XrdCmsXmi_StartReqQ(void *parg)
00054 {
00055 XrdCmsXmiReq *requestProcessor = (XrdCmsXmiReq *)parg;
00056
00057 requestProcessor->processReqQ();
00058
00059 return (void *)0;
00060 }
00061
00062 void *XrdCmsXmi_StartStgQ(void *parg)
00063 {
00064 XrdCmsXmiReq *requestProcessor = (XrdCmsXmiReq *)parg;
00065
00066 requestProcessor->processStgQ();
00067
00068 return (void *)0;
00069 }
00070
00071
00072
00073
00074
00075 XrdCmsXmiReq::XrdCmsXmiReq(XrdCmsXmi *xp)
00076 {
00077 ReqP = 0;
00078 Path = 0;
00079 Parms = 0;
00080 Next = 0;
00081 XmiP = xp;
00082 Start();
00083 }
00084
00085 XrdCmsXmiReq::XrdCmsXmiReq(XrdCmsReq *reqp, ReqType rtype, int parms,
00086 const char *path, const char *opaque,
00087 const char *path2, const char *opaque2)
00088 {
00089
00090 ReqP = reqp;
00091 Parms = parms;
00092 Path = strdup(path);
00093 Opaque = (opaque ? strdup(opaque) : 0);
00094 Path2 = (path2 ? strdup(path2) : 0);
00095 Opaque2 = (opaque2 ? strdup(opaque2) : 0);
00096 Rtype = rtype;
00097 Next = 0;
00098
00099
00100
00101 if (rtype == do_stage)
00102 {stgMutex.Lock();
00103 if (stgLast) {stgLast->Next = this; stgLast = this;}
00104 else {stgFirst = stgLast = this; stgReady.Post();}
00105 stgMutex.UnLock();
00106 }
00107 else if (rtype == do_prep)
00108 {prpMutex.Lock();
00109 if (prpLast) {prpLast->Next = this; prpLast = this;}
00110 else {prpFirst = prpLast = this; prpReady.Post();}
00111 prpMutex.UnLock();
00112 }
00113 else {reqMutex.Lock();
00114 if (reqLast) {reqLast->Next = this; reqLast = this;}
00115 else {reqFirst = reqLast = this; reqReady.Post();}
00116 reqMutex.UnLock();
00117 }
00118 }
00119
00120
00121
00122
00123
00124 XrdCmsXmiReq::~XrdCmsXmiReq()
00125 {
00126 if (Path) free(Path);
00127 if (Opaque) free(Opaque);
00128 if (Path2) free(Path2);
00129 if (Opaque2) free(Opaque2);
00130 if (ReqP) delete ReqP;
00131 }
00132
00133
00134
00135
00136
00137 void XrdCmsXmiReq::processPrpQ()
00138 {
00139 XrdCmsXmiReq *myQueue, *rp;
00140
00141
00142
00143
00144
00145
00146
00147 while(1)
00148 {prpReady.Wait();
00149 prpMutex.Lock();
00150 myQueue = prpFirst;
00151 prpFirst = prpLast = 0;
00152 prpMutex.UnLock();
00153
00154 while((rp = myQueue))
00155 {myQueue = rp->Next;
00156 XmiP->Prep(rp->Path2, rp->Parms, rp->Path, rp->Opaque);
00157 delete rp;
00158 }
00159 }
00160 }
00161
00162
00163
00164
00165
00166 void XrdCmsXmiReq::processReqQ()
00167 {
00168 XrdCmsXmiReq *myQueue, *rp;
00169 int rc;
00170
00171
00172
00173
00174 do {reqReady.Wait();
00175 reqMutex.Lock();
00176 myQueue = reqFirst;
00177 reqFirst = reqLast = 0;
00178 reqMutex.UnLock();
00179
00180 while((rp = myQueue))
00181 {myQueue = rp->Next;
00182 switch(rp->Rtype)
00183 {case do_stat:
00184 rc = XmiP->Stat(rp->ReqP, rp->Path, rp->Opaque);
00185 break;
00186 case do_mkdir:
00187 rc = XmiP->Mkdir(rp->ReqP, rp->Parms, rp->Path, rp->Opaque);
00188 break;
00189 case do_mkpath:
00190 rc = XmiP->Mkpath(rp->ReqP, rp->Parms, rp->Path, rp->Opaque);
00191 break;
00192 case do_rmdir:
00193 rc = XmiP->Remdir(rp->ReqP, rp->Path, rp->Opaque);
00194 break;
00195 case do_rm:
00196 rc = XmiP->Remove(rp->ReqP, rp->Path, rp->Opaque);
00197 break;
00198 case do_mv:
00199 rc = XmiP->Rename(rp->ReqP, rp->Path, rp->Opaque,
00200 rp->Path2, rp->Opaque2);
00201 break;
00202 case do_chmod:
00203 rc = XmiP->Chmod(rp->ReqP, rp->Parms, rp->Path, rp->Opaque);
00204 break;
00205 default: Say.Emsg("reqQ", "Invalid request code.");
00206 rp->ReqP->Reply_Error("Internal server error");
00207 rc = 1;
00208 break;
00209 }
00210 if (!rc) rp->ReqP->Reply_Error("Function failed in xmi handler");
00211 delete rp;
00212 }
00213 } while(1);
00214 }
00215
00216
00217
00218
00219
00220 void XrdCmsXmiReq::processStgQ()
00221 {
00222 XrdCmsXmiReq *myQueue, *rp;
00223
00224
00225
00226
00227
00228
00229 while(1)
00230 {stgReady.Wait();
00231 stgMutex.Lock();
00232 myQueue = stgFirst;
00233 stgFirst = stgLast = 0;
00234 stgMutex.UnLock();
00235
00236 while((rp = myQueue))
00237 {myQueue = rp->Next;
00238 if (!XmiP->Select(rp->ReqP, rp->Parms,rp->Path, rp->Opaque))
00239 rp->ReqP->Reply_Error("Select failed in xmi handler");
00240 delete rp;
00241 }
00242 }
00243 }
00244
00245
00246
00247
00248
00249 void XrdCmsXmiReq::Start()
00250 {
00251 pthread_t tid;
00252 int retc;
00253
00254
00255
00256 if ((retc = XrdSysThread::Run(&tid, XrdCmsXmi_StartPrpQ, (void *)this,
00257 XRDSYSTHREAD_BIND, "xmi prepare handler")))
00258 {Say.Emsg("XmiReq", retc, "create prepare thread"); _exit(3);}
00259
00260
00261
00262 if ((retc = XrdSysThread::Run(&tid, XrdCmsXmi_StartReqQ, (void *)this,
00263 XRDSYSTHREAD_BIND, "xmi request handler")))
00264 {Say.Emsg("XmiReq", retc, "create request thread"); _exit(3);}
00265
00266
00267
00268 if ((retc = XrdSysThread::Run(&tid, XrdCmsXmi_StartStgQ, (void *)this,
00269 XRDSYSTHREAD_BIND, "xmi staging handler")))
00270 {Say.Emsg("XmiReq", retc, "create staging thread"); _exit(3);}
00271 }
00272
00273
00274
00275
00276
00277 int XrdCmsXmiReq::Qit(XrdCmsReq *rp, ReqType rt, int parms,
00278 const char *path, const char *opaque,
00279 const char *path2, const char *opaque2)
00280 {
00281 new XrdCmsXmiReq((rp ? rp->Reply_WaitResp(5940) : 0),
00282 rt, parms, path, opaque, path2, opaque2);
00283 return 1;
00284 }