XrdOfsEvr.cc

Go to the documentation of this file.
00001 /******************************************************************************/
00002 /*                                                                            */
00003 /*                          X r d O f s E v r . c c                           */
00004 /*                                                                            */
00005 /* (c) 2006 by the Board of Trustees of the Leland Stanford, Jr., University  */
00006 /*                            All Rights Reserved                             */
00007 /*   Produced by Andrew Hanushevsky for Stanford University under contract    */
00008 /*              DE-AC02-76-SFO0515 with the Department of Energy              */
00009 /******************************************************************************/
00010   
00011 //         $Id: XrdOfsEvr.cc 35287 2010-09-14 21:19:35Z ganis $
00012 
// Revision identification string for this translation unit (the text between
// the '$' signs is a version-control keyword expansion).
//
const char *XrdOfsEvrCVSID =
           "$Id: XrdOfsEvr.cc 35287 2010-09-14 21:19:35Z ganis $";
00014 
00015 #include <stdlib.h>
00016 #include <stdio.h>
00017 #include <string.h>
00018 
00019 #include "XrdCms/XrdCmsClient.hh"
00020 #include "XrdOfs/XrdOfsEvr.hh"
00021 #include "XrdOfs/XrdOfsStats.hh"
00022 #include "XrdOfs/XrdOfsTrace.hh"
00023 #include "XrdSys/XrdSysError.hh"
00024 #include "XrdSys/XrdSysTimer.hh"
00025 #include "XrdOuc/XrdOucEnv.hh"
00026 #include "XrdOuc/XrdOucTrace.hh"
00027 #include "XrdNet/XrdNetOpts.hh"
00028 #include "XrdNet/XrdNetSocket.hh"
00029 #include "XrdSys/XrdSysHeaders.hh"
00030 
00031 /******************************************************************************/
00032 /*                     E x t e r n a l   L i n k a g e s                      */
00033 /******************************************************************************/
00034 
00035 extern XrdOfsStats OfsStats;
00036 
00037 extern XrdOucTrace OfsTrace;
00038   
00039 void *XrdOfsEvRecv(void *pp)
00040 {
00041      XrdOfsEvr *evr = (XrdOfsEvr *)pp;
00042      evr->recvEvents();
00043      return (void *)0;
00044 }
00045   
00046 void *XrdOfsEvFlush(void *pp)
00047 {
00048      XrdOfsEvr *evr = (XrdOfsEvr *)pp;
00049      evr->flushEvents();
00050      return (void *)0;
00051 }
00052 
00053 int XrdOfsScrubScan(const char *key, XrdOfsEvr::theEvent *cip, void *xargp) 
00054     {return 0;}
00055   
00056 /******************************************************************************/
00057 /*                            D e s t r u c t o r                             */
00058 /******************************************************************************/
00059 
00060 XrdOfsEvr::~XrdOfsEvr()
00061 {
00062 
00063 // Close the FIFO. This will cause the reader to exit
00064 //
00065    myMutex.Lock();
00066    eventFIFO.Close();
00067    myMutex.UnLock();
00068 }
00069   
00070 /******************************************************************************/
00071 /*                           f l u s h E v e n t s                            */
00072 /******************************************************************************/
00073   
00074 void XrdOfsEvr::flushEvents()
00075 {
00076    theClient *tp, *ntp;
00077    int expWait, expClock;
00078 
00079 // Compute the hash flush interval
00080 //
00081    if ((expWait = maxLife/4) == 0) expWait = 60;
00082    expClock = expWait;
00083 
00084 // We wait for the right period of time, unless there is a defered event
00085 //
00086    do {myMutex.Lock(); 
00087        if ((ntp = deferQ)) deferQ = 0;
00088           else runQ = 0;
00089        myMutex.UnLock();
00090        while(ntp)
00091             {XrdSysTimer::Wait(1000*60);
00092              expClock -= 60;
00093              myMutex.Lock();
00094              while((tp = ntp))
00095                   {Events.Del(tp->Path);
00096                    ntp = tp->Next;
00097                    delete tp;
00098                   }
00099              if ((ntp = deferQ)) deferQ = 0;
00100                 else runQ = 0;
00101              myMutex.UnLock();
00102              if (expClock <= 0)
00103                 {myMutex.Lock(); 
00104                  Events.Apply(XrdOfsScrubScan, (void *)0);
00105                  myMutex.UnLock();
00106                  expClock = expWait;
00107                 }
00108             }
00109        mySem.Wait();
00110       } while(1);
00111 }
00112 
00113 /******************************************************************************/
00114 /*                                  I n i t                                   */
00115 /******************************************************************************/
00116   
00117 int XrdOfsEvr::Init(XrdSysError *eobj, XrdCmsClient *trgp)
00118 {
00119    XrdNetSocket *msgSock;
00120    pthread_t     tid;
00121    int n, rc;
00122    char *p, *path, pbuff[2048];
00123 
00124 // Set the error object and balancer pointers
00125 //
00126    eDest    = eobj;
00127    Balancer = trgp;
00128 
00129 // Create path to the pipe we will creat
00130 //
00131    if (!(p = getenv("XRDADMINPATH")) || !*p)
00132       {eobj->Emsg("Events", "XRDADMINPATH not defined");
00133        return 0;
00134       }
00135    path = pbuff;
00136    strcpy(path, p); n = strlen(p);
00137    if (path[n-1] != '/') {path[n] = '/'; n++;}
00138    strcpy(&path[n], "ofsEvents");
00139    XrdOucEnv::Export("XRDOFSEVENTS", pbuff);
00140 
00141 // Now create a socket to a path
00142 //
00143    if (!(msgSock = XrdNetSocket::Create(eobj,path,0,0660,XRDNET_FIFO)))
00144       return 0;
00145    msgFD = msgSock->Detach();
00146    delete msgSock;
00147 
00148 // Now start a thread to get incomming messages
00149 //
00150    if ((rc = XrdSysThread::Run(&tid, XrdOfsEvRecv, static_cast<void *>(this),
00151                           0, "Event receiver")))
00152       {eobj->Emsg("Evr", rc, "create event reader thread");
00153        return 0;
00154       }
00155 
00156 // Now start a thread to flush posted events
00157 //
00158    if ((rc = XrdSysThread::Run(&tid, XrdOfsEvFlush,static_cast<void *>(this),
00159                           0, "Event flusher")))
00160       {eobj->Emsg("Evr", rc, "create event flush thread");
00161        return 0;
00162       }
00163 
00164 // All done
00165 //
00166    return 1;
00167 }
00168 
00169 /******************************************************************************/
00170 /*                            r e c v E v e n t s                             */
00171 /******************************************************************************/
00172   
00173 void XrdOfsEvr::recvEvents()
00174 {
00175    EPNAME("recvEvent");
00176    const char *tident = 0;
00177    char *lp,*tp;
00178 
00179 // Attach the fifo FD to the stream
00180 //
00181    eventFIFO.Attach(msgFD);
00182 
00183 // Now just start reading the events until the FD is closed
00184 //
00185    while((lp = eventFIFO.GetLine()))
00186         {DEBUG("-->" <<lp);
00187          if ((tp = eventFIFO.GetToken()) && *tp)
00188             {if (!strcmp(tp, "stage")) eventStage();
00189                 else eDest->Emsg("Evr", "Unknown event name -", tp);
00190             }
00191         }
00192 }
00193  
00194 /******************************************************************************/
00195 /*                            W a i t 4 E v e n t                             */
00196 /******************************************************************************/
00197   
00198 void XrdOfsEvr::Wait4Event(const char *path, XrdOucErrInfo *einfo)
00199 {
00200 
00201 // Replace original callback with our callback so we can queue this event
00202 // after the wait request has been sent to the client. This avoids a race
00203 // where the client might get the resume signal before the wait request.
00204 //
00205    einfo->setErrCB((XrdOucEICB *)new theClient(this, einfo, path));
00206 }
00207  
00208 /******************************************************************************/
00209 /*                            W o r k 4 E v e n t                             */
00210 /******************************************************************************/
00211   
00212 void XrdOfsEvr::Work4Event(theClient *Client)
00213 {
00214    struct theEvent *anEvent;
00215    theClient *aClient = 0;
00216 
00217 // First ste is to see if this event was posted
00218 //
00219    myMutex.Lock();
00220    if (!(anEvent = Events.Find(Client->Path)))
00221       Events.Add(Client->Path, new theEvent(0, 0, Client), maxLife);
00222       else {aClient = anEvent->aClient;
00223             while(aClient)
00224                  {if (aClient->evtCB->Same(Client->evtCBarg,aClient->evtCBarg))
00225                      {aClient->evtCBarg = Client->evtCBarg;
00226                       break;
00227                      }
00228                   aClient = aClient->Next;
00229                  }
00230             if (!aClient) {Client->Next = anEvent->aClient;
00231                            anEvent->aClient = Client;
00232                           }
00233             if (anEvent->Happened) sendEvent(anEvent);
00234            }
00235    myMutex.UnLock();
00236 
00237 // Delete the Client object if we really don't need it
00238 //
00239    if (aClient) delete Client;
00240 }
00241 
00242 /******************************************************************************/
00243 /*                       P r i v a t e   M e t h o d s                        */
00244 /******************************************************************************/
00245 /******************************************************************************/
00246 /*                            e v e n t S t a g e                             */
00247 /******************************************************************************/
00248   
00249 // stage {OK | ENOENT | BAD} <path> [<msg>] \n
00250 
00251 void XrdOfsEvr::eventStage()
00252 {
00253    int rc;
00254    char *tp, *eMsg, *altMsg = 0;
00255    struct theEvent *anEvent;
00256 
00257 // Get the status token and decode it
00258 //
00259    if (!(tp = eventFIFO.GetToken()))
00260       {eDest->Emsg("Evr", "Missing stage event status"); return;}
00261 
00262         if (!strcmp(tp, "OK"))     {rc = 0;
00263                                     OfsStats.Add(OfsStats.Data.numSeventOK);
00264                                    }
00265    else if (!strcmp(tp, "ENOENT")) {rc = ENOENT;
00266                                     altMsg = (char *)"file does not exist.";
00267                                    }
00268    else if (!strcmp(tp, "BAD"))    {rc = -1;
00269                                     OfsStats.Add(OfsStats.Data.numSeventOK);
00270                                     altMsg = (char *)"Dynamic staging failed.";
00271                                    }
00272    else {rc = -1;
00273          eDest->Emsg("Evr", "Invalid stage event status -", tp);
00274          altMsg = (char *)"Dynamic staging malfunctioned.";
00275          OfsStats.Add(OfsStats.Data.numSeventOK);
00276         }
00277 
00278 // Get the path and optional message
00279 //
00280    if (!(tp = eventFIFO.GetToken(&eMsg)))
00281       {eDest->Emsg("Evr", "Missing stage event path"); return;}
00282    if (rc)
00283       if (eMsg) {while(*eMsg == ' ') eMsg++;
00284                  if (!*eMsg) eMsg = altMsg;
00285                 } else eMsg = altMsg;
00286       else eMsg = 0;
00287 
00288 // At this point if we have a balancer, tell it what happened
00289 //
00290    if (Balancer)
00291       {if (rc == 0) Balancer->Added(tp);
00292           else      Balancer->Removed(tp);
00293       }
00294 
00295 // Either people are waiting for this event or it is preposted event.
00296 //
00297    myMutex.Lock();
00298    if (!(anEvent = Events.Find(tp)))
00299       Events.Add(tp, new theEvent(rc, eMsg), maxLife);
00300       else {if (anEvent->finalRC == 0)
00301                {anEvent->finalRC = rc;
00302                 if (eMsg) anEvent->finalMsg = strdup(eMsg);
00303                 anEvent->Happened = 1;
00304                }
00305             if (anEvent->aClient) sendEvent(anEvent);
00306            }
00307    myMutex.UnLock();
00308 }
00309 
00310 /******************************************************************************/
00311 /*                             s e n d E v e n t                              */
00312 /******************************************************************************/
00313   
00314 void XrdOfsEvr::sendEvent(theEvent *ep)
00315 {
00316    theClient *cp;
00317    XrdOucErrInfo *einfo;
00318    int doDel = 0, Result = (ep->finalRC ? SFS_ERROR : SFS_OK);
00319 
00320 // For each client, issue a call back sending the result back
00321 // The event also goes in the defered delete queue as we need to hold on
00322 // to it just in case a client is in-transit
00323 //
00324    while((cp = ep->aClient))
00325         {einfo = new XrdOucErrInfo(cp->User, 0, cp->evtCBarg);
00326          einfo->setErrInfo(ep->finalRC, (ep->finalMsg ? ep->finalMsg : ""));
00327          cp->evtCB->Done(Result, einfo);
00328          ep->aClient = cp->Next;
00329          if (doDel) delete cp;
00330             else {cp->Next = deferQ; deferQ = cp; doDel = 1;}
00331         }
00332 
00333 // Post the defer queue handler
00334 //
00335    if (!runQ) {runQ = 1; mySem.Post();}
00336 }

Generated on Tue Jul 5 14:46:44 2011 for ROOT_528-00b_version by  doxygen 1.5.1