00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 const char *XrdOfsEvrCVSID = "$Id: XrdOfsEvr.cc 35287 2010-09-14 21:19:35Z ganis $";
00014
00015 #include <stdlib.h>
00016 #include <stdio.h>
00017 #include <string.h>
00018
00019 #include "XrdCms/XrdCmsClient.hh"
00020 #include "XrdOfs/XrdOfsEvr.hh"
00021 #include "XrdOfs/XrdOfsStats.hh"
00022 #include "XrdOfs/XrdOfsTrace.hh"
00023 #include "XrdSys/XrdSysError.hh"
00024 #include "XrdSys/XrdSysTimer.hh"
00025 #include "XrdOuc/XrdOucEnv.hh"
00026 #include "XrdOuc/XrdOucTrace.hh"
00027 #include "XrdNet/XrdNetOpts.hh"
00028 #include "XrdNet/XrdNetSocket.hh"
00029 #include "XrdSys/XrdSysHeaders.hh"
00030
00031
00032
00033
00034
00035 extern XrdOfsStats OfsStats;
00036
00037 extern XrdOucTrace OfsTrace;
00038
00039 void *XrdOfsEvRecv(void *pp)
00040 {
00041 XrdOfsEvr *evr = (XrdOfsEvr *)pp;
00042 evr->recvEvents();
00043 return (void *)0;
00044 }
00045
00046 void *XrdOfsEvFlush(void *pp)
00047 {
00048 XrdOfsEvr *evr = (XrdOfsEvr *)pp;
00049 evr->flushEvents();
00050 return (void *)0;
00051 }
00052
00053 int XrdOfsScrubScan(const char *key, XrdOfsEvr::theEvent *cip, void *xargp)
00054 {return 0;}
00055
00056
00057
00058
00059
00060 XrdOfsEvr::~XrdOfsEvr()
00061 {
00062
00063
00064
00065 myMutex.Lock();
00066 eventFIFO.Close();
00067 myMutex.UnLock();
00068 }
00069
00070
00071
00072
00073
00074 void XrdOfsEvr::flushEvents()
00075 {
00076 theClient *tp, *ntp;
00077 int expWait, expClock;
00078
00079
00080
00081 if ((expWait = maxLife/4) == 0) expWait = 60;
00082 expClock = expWait;
00083
00084
00085
00086 do {myMutex.Lock();
00087 if ((ntp = deferQ)) deferQ = 0;
00088 else runQ = 0;
00089 myMutex.UnLock();
00090 while(ntp)
00091 {XrdSysTimer::Wait(1000*60);
00092 expClock -= 60;
00093 myMutex.Lock();
00094 while((tp = ntp))
00095 {Events.Del(tp->Path);
00096 ntp = tp->Next;
00097 delete tp;
00098 }
00099 if ((ntp = deferQ)) deferQ = 0;
00100 else runQ = 0;
00101 myMutex.UnLock();
00102 if (expClock <= 0)
00103 {myMutex.Lock();
00104 Events.Apply(XrdOfsScrubScan, (void *)0);
00105 myMutex.UnLock();
00106 expClock = expWait;
00107 }
00108 }
00109 mySem.Wait();
00110 } while(1);
00111 }
00112
00113
00114
00115
00116
00117 int XrdOfsEvr::Init(XrdSysError *eobj, XrdCmsClient *trgp)
00118 {
00119 XrdNetSocket *msgSock;
00120 pthread_t tid;
00121 int n, rc;
00122 char *p, *path, pbuff[2048];
00123
00124
00125
00126 eDest = eobj;
00127 Balancer = trgp;
00128
00129
00130
00131 if (!(p = getenv("XRDADMINPATH")) || !*p)
00132 {eobj->Emsg("Events", "XRDADMINPATH not defined");
00133 return 0;
00134 }
00135 path = pbuff;
00136 strcpy(path, p); n = strlen(p);
00137 if (path[n-1] != '/') {path[n] = '/'; n++;}
00138 strcpy(&path[n], "ofsEvents");
00139 XrdOucEnv::Export("XRDOFSEVENTS", pbuff);
00140
00141
00142
00143 if (!(msgSock = XrdNetSocket::Create(eobj,path,0,0660,XRDNET_FIFO)))
00144 return 0;
00145 msgFD = msgSock->Detach();
00146 delete msgSock;
00147
00148
00149
00150 if ((rc = XrdSysThread::Run(&tid, XrdOfsEvRecv, static_cast<void *>(this),
00151 0, "Event receiver")))
00152 {eobj->Emsg("Evr", rc, "create event reader thread");
00153 return 0;
00154 }
00155
00156
00157
00158 if ((rc = XrdSysThread::Run(&tid, XrdOfsEvFlush,static_cast<void *>(this),
00159 0, "Event flusher")))
00160 {eobj->Emsg("Evr", rc, "create event flush thread");
00161 return 0;
00162 }
00163
00164
00165
00166 return 1;
00167 }
00168
00169
00170
00171
00172
00173 void XrdOfsEvr::recvEvents()
00174 {
00175 EPNAME("recvEvent");
00176 const char *tident = 0;
00177 char *lp,*tp;
00178
00179
00180
00181 eventFIFO.Attach(msgFD);
00182
00183
00184
00185 while((lp = eventFIFO.GetLine()))
00186 {DEBUG("-->" <<lp);
00187 if ((tp = eventFIFO.GetToken()) && *tp)
00188 {if (!strcmp(tp, "stage")) eventStage();
00189 else eDest->Emsg("Evr", "Unknown event name -", tp);
00190 }
00191 }
00192 }
00193
00194
00195
00196
00197
00198 void XrdOfsEvr::Wait4Event(const char *path, XrdOucErrInfo *einfo)
00199 {
00200
00201
00202
00203
00204
00205 einfo->setErrCB((XrdOucEICB *)new theClient(this, einfo, path));
00206 }
00207
00208
00209
00210
00211
00212 void XrdOfsEvr::Work4Event(theClient *Client)
00213 {
00214 struct theEvent *anEvent;
00215 theClient *aClient = 0;
00216
00217
00218
00219 myMutex.Lock();
00220 if (!(anEvent = Events.Find(Client->Path)))
00221 Events.Add(Client->Path, new theEvent(0, 0, Client), maxLife);
00222 else {aClient = anEvent->aClient;
00223 while(aClient)
00224 {if (aClient->evtCB->Same(Client->evtCBarg,aClient->evtCBarg))
00225 {aClient->evtCBarg = Client->evtCBarg;
00226 break;
00227 }
00228 aClient = aClient->Next;
00229 }
00230 if (!aClient) {Client->Next = anEvent->aClient;
00231 anEvent->aClient = Client;
00232 }
00233 if (anEvent->Happened) sendEvent(anEvent);
00234 }
00235 myMutex.UnLock();
00236
00237
00238
00239 if (aClient) delete Client;
00240 }
00241
00242
00243
00244
00245
00246
00247
00248
00249
00250
00251 void XrdOfsEvr::eventStage()
00252 {
00253 int rc;
00254 char *tp, *eMsg, *altMsg = 0;
00255 struct theEvent *anEvent;
00256
00257
00258
00259 if (!(tp = eventFIFO.GetToken()))
00260 {eDest->Emsg("Evr", "Missing stage event status"); return;}
00261
00262 if (!strcmp(tp, "OK")) {rc = 0;
00263 OfsStats.Add(OfsStats.Data.numSeventOK);
00264 }
00265 else if (!strcmp(tp, "ENOENT")) {rc = ENOENT;
00266 altMsg = (char *)"file does not exist.";
00267 }
00268 else if (!strcmp(tp, "BAD")) {rc = -1;
00269 OfsStats.Add(OfsStats.Data.numSeventOK);
00270 altMsg = (char *)"Dynamic staging failed.";
00271 }
00272 else {rc = -1;
00273 eDest->Emsg("Evr", "Invalid stage event status -", tp);
00274 altMsg = (char *)"Dynamic staging malfunctioned.";
00275 OfsStats.Add(OfsStats.Data.numSeventOK);
00276 }
00277
00278
00279
00280 if (!(tp = eventFIFO.GetToken(&eMsg)))
00281 {eDest->Emsg("Evr", "Missing stage event path"); return;}
00282 if (rc)
00283 if (eMsg) {while(*eMsg == ' ') eMsg++;
00284 if (!*eMsg) eMsg = altMsg;
00285 } else eMsg = altMsg;
00286 else eMsg = 0;
00287
00288
00289
00290 if (Balancer)
00291 {if (rc == 0) Balancer->Added(tp);
00292 else Balancer->Removed(tp);
00293 }
00294
00295
00296
00297 myMutex.Lock();
00298 if (!(anEvent = Events.Find(tp)))
00299 Events.Add(tp, new theEvent(rc, eMsg), maxLife);
00300 else {if (anEvent->finalRC == 0)
00301 {anEvent->finalRC = rc;
00302 if (eMsg) anEvent->finalMsg = strdup(eMsg);
00303 anEvent->Happened = 1;
00304 }
00305 if (anEvent->aClient) sendEvent(anEvent);
00306 }
00307 myMutex.UnLock();
00308 }
00309
00310
00311
00312
00313
00314 void XrdOfsEvr::sendEvent(theEvent *ep)
00315 {
00316 theClient *cp;
00317 XrdOucErrInfo *einfo;
00318 int doDel = 0, Result = (ep->finalRC ? SFS_ERROR : SFS_OK);
00319
00320
00321
00322
00323
00324 while((cp = ep->aClient))
00325 {einfo = new XrdOucErrInfo(cp->User, 0, cp->evtCBarg);
00326 einfo->setErrInfo(ep->finalRC, (ep->finalMsg ? ep->finalMsg : ""));
00327 cp->evtCB->Done(Result, einfo);
00328 ep->aClient = cp->Next;
00329 if (doDel) delete cp;
00330 else {cp->Next = deferQ; deferQ = cp; doDel = 1;}
00331 }
00332
00333
00334
00335 if (!runQ) {runQ = 1; mySem.Post();}
00336 }