XrdOfsPoscq.cc

Go to the documentation of this file.
00001 /******************************************************************************/
00002 /*                                                                            */
00003 /*                        X r d O f s P o s c q . c c                         */
00004 /*                                                                            */
00005 /* (c) 2009 by the Board of Trustees of the Leland Stanford, Jr., University  */
00006 /*                            All Rights Reserved                             */
00007 /*   Produced by Andrew Hanushevsky for Stanford University under contract    */
00008 /*              DE-AC02-76-SFO0515 with the Department of Energy              */
00009 /******************************************************************************/
00010 
00011 //          $Id: XrdOfsPoscq.cc 30949 2009-11-02 16:37:58Z ganis $
00012 
00013 const char *XrdOfsPoscqCVSID = "$Id: XrdOfsPoscq.cc 30949 2009-11-02 16:37:58Z ganis $";
00014 
00015 #include <string.h>
00016 #include <strings.h>
00017 #include <stddef.h>
00018 #include <stdio.h>
00019 #include <fcntl.h>
00020 #include <unistd.h>
00021 #include <errno.h>
00022 #include <sys/param.h>
00023 #include <sys/types.h>
00024 #include <sys/stat.h>
00025 
00026 #include "XrdOfs/XrdOfsPoscq.hh"
00027 #include "XrdOss/XrdOss.hh"
00028 #include "XrdSys/XrdSysError.hh"
00029 #include "XrdSys/XrdSysPlatform.hh"
00030 
00031 /******************************************************************************/
00032 /*                           C o n s t r u c t o r                            */
00033 /******************************************************************************/
00034 
00035 XrdOfsPoscq::XrdOfsPoscq(XrdSysError *erp, XrdOss *oss, const char *fn)
00036 {
00037    eDest = erp;
00038    ossFS = oss;
00039    pocFN = strdup(fn);
00040    pocFD = -1;
00041    pocSZ = 0;
00042    pocIQ = 0;
00043    SlotList = SlotLust = 0;
00044 }
00045   
00046 /******************************************************************************/
00047 /*                                   A d d                                    */
00048 /******************************************************************************/
00049   
00050 int XrdOfsPoscq::Add(const char *Tident, const char *Lfn)
00051 {
00052    XrdOfsPoscq::Request tmpReq;
00053    FileSlot *freeSlot;
00054    int fP;
00055 
00056 // Construct the request
00057 //
00058    tmpReq.addT = 0;
00059    strlcpy(tmpReq.LFN,  Lfn,    sizeof(tmpReq.LFN));
00060    strlcpy(tmpReq.User, Tident, sizeof(tmpReq.User));
00061    memset(tmpReq.Reserved, 0, sizeof(tmpReq.Reserved));
00062 
00063 // Obtain a free slot
00064 //
00065    myMutex.Lock();
00066    if ((freeSlot = SlotList))
00067       {fP = freeSlot->Offset;
00068        SlotList = freeSlot->Next;
00069        freeSlot->Next = SlotLust;
00070        SlotLust = freeSlot;
00071       } else {fP = pocSZ; pocSZ += ReqSize;}
00072    pocIQ++;
00073    myMutex.UnLock();
00074 
00075 // Write out the record
00076 //
00077    if (!reqWrite((void *)&tmpReq, sizeof(tmpReq), fP))
00078       {eDest->Emsg("Add", Lfn, "not added to the persist queue.");
00079        myMutex.Lock(); pocIQ--; myMutex.UnLock();
00080        return -EIO;
00081       }
00082 
00083 // Return the record offset
00084 //
00085    return fP;
00086 }
00087   
00088 /******************************************************************************/
00089 /*                                C o m m i t                                 */
00090 /******************************************************************************/
00091 
00092 int XrdOfsPoscq::Commit(const char *Lfn, int Offset)
00093 {
00094    long long addT = static_cast<long long>(time(0));
00095 
00096 // Verify the offset it must be correct
00097 //
00098    if (!VerOffset(Lfn, Offset)) return -EINVAL;
00099 
00100 // Indicate the record is free
00101 //
00102    if (reqWrite((void *)&addT, sizeof(addT), Offset)) return 0;
00103    eDest->Emsg("Commit", Lfn, "not commited to the persist queue.");
00104    return -EIO;
00105 }
00106 
00107 /******************************************************************************/
00108 /*                                   D e l                                    */
00109 /******************************************************************************/
00110 
00111 int XrdOfsPoscq::Del(const char *Lfn, int Offset, int Unlink)
00112 {
00113    static int Zero = 0;
00114    FileSlot *freeSlot;
00115    int retc;
00116 
00117 // Verify the offset it must be correct
00118 //
00119    if (!VerOffset(Lfn, Offset)) return -EINVAL;
00120 
00121 // Unlink the file if need be
00122 //
00123    if (Unlink && (retc = ossFS->Unlink(Lfn)) && retc != -ENOENT)
00124       {eDest->Emsg("Del", retc, "remove", Lfn);
00125        return (retc < 0 ? retc : -retc);
00126       }
00127 
00128 // Indicate the record is free
00129 //
00130    if (!reqWrite((void *)&Zero, sizeof(Zero), Offset+offsetof(Request,LFN)))
00131       {eDest->Emsg("Del", Lfn, "not removed from the persist queue.");
00132        return -EIO;
00133       }
00134 
00135 // Serialize and place this on the free queue
00136 //
00137    myMutex.Lock();
00138    if ((freeSlot = SlotLust)) SlotLust = freeSlot->Next;
00139       else freeSlot = new FileSlot;
00140    freeSlot->Offset = Offset;
00141    freeSlot->Next   = SlotList;
00142    SlotList         = freeSlot;
00143    if (pocIQ > 0) pocIQ--;
00144    myMutex.UnLock();
00145 
00146 // All done
00147 //
00148    return 0;
00149 }
00150   
00151 /******************************************************************************/
00152 /*                                  I n i t                                   */
00153 /******************************************************************************/
00154 
00155 XrdOfsPoscq::recEnt *XrdOfsPoscq::Init(int &Ok)
00156 {
00157    static const int Mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
00158    Request     tmpReq;
00159    struct stat buf, Stat;
00160    recEnt     *First = 0;
00161    char        Buff[80];
00162    int         rc, Offs, numreq = 0;
00163 
00164 // Assume we will fail
00165 //
00166    Ok = 0;
00167 
00168 // Open the file first in r/w mode
00169 //
00170    if ((pocFD = open(pocFN, O_RDWR|O_CREAT, Mode)) < 0)
00171       {eDest->Emsg("Init",errno,"open",pocFN);
00172        return 0;
00173       }
00174 
00175 // Get file status
00176 //
00177    if (fstat(pocFD, &buf)) {FailIni("stat"); return 0;}
00178 
00179 // Check for a new file here
00180 //
00181    if (buf.st_size < ReqSize)
00182       {pocSZ = ReqOffs;
00183        if (ftruncate(pocFD, ReqOffs)) FailIni("trunc");
00184           else Ok = 1;
00185        return 0;
00186       }
00187 
00188 // Read the full file
00189 //
00190    for (Offs = ReqOffs; Offs < buf.st_size; Offs += ReqSize)
00191        {do {rc = pread(pocFD, (void *)&tmpReq, ReqSize, Offs);}
00192            while(rc < 0 && errno == EINTR);
00193         if (rc < 0) {eDest->Emsg("Init",errno,"read",pocFN); return First;}
00194         if (*tmpReq.LFN == '\0'
00195         ||  ossFS->Stat(tmpReq.LFN, &Stat)
00196         ||  !(S_ISREG(Stat.st_mode) || !(Stat.st_mode & S_ISUID))) continue;
00197         First = new recEnt(tmpReq, Stat.st_mode & S_IAMB, First); numreq++;
00198        }
00199 
00200 // Now write out the file and return
00201 //
00202    sprintf(Buff, " %d pending create%s", numreq, (numreq != 1 ? "s" : ""));
00203    eDest->Say("Init", Buff, " recovered from ", pocFN);
00204    if (ReWrite(First)) Ok = 1;
00205    return First;
00206 }
00207   
00208 /******************************************************************************/
00209 /*                                  L i s t                                   */
00210 /******************************************************************************/
00211   
00212 XrdOfsPoscq::recEnt *XrdOfsPoscq::List(XrdSysError *Say, const char *theFN)
00213 {
00214    XrdOfsPoscq::Request tmpReq;
00215    struct stat buf;
00216    recEnt *First = 0;
00217    int    rc, theFD, Offs;
00218 
00219 // Open the file first in r/o mode
00220 //
00221    if ((theFD = open(theFN, O_RDONLY)) < 0)
00222       {Say->Emsg("Init",errno,"open",theFN);
00223        return 0;
00224       }
00225 
00226 // Get file status
00227 //
00228    if (fstat(theFD, &buf))
00229       {Say->Emsg("Init",errno,"stat",theFN);
00230        close(theFD);
00231        return 0;
00232       }
00233    if (buf.st_size < ReqSize) buf.st_size = 0;
00234 
00235 // Read the full file
00236 //
00237    for (Offs = ReqOffs; Offs < buf.st_size; Offs += ReqSize)
00238        {do {rc = pread(theFD, (void *)&tmpReq, ReqSize, Offs);}
00239            while(rc < 0 && errno == EINTR);
00240         if (rc < 0) {Say->Emsg("List",errno,"read",theFN); return First;}
00241         if (*tmpReq.LFN != '\0') First = new recEnt(tmpReq, 0, First);
00242        }
00243 
00244 // All done
00245 //
00246    close(theFD);
00247    return First;
00248 }
00249 
00250 /******************************************************************************/
00251 /*                               F a i l I n i                                */
00252 /******************************************************************************/
00253 
00254 void XrdOfsPoscq::FailIni(const char *txt)
00255 {
00256    eDest->Emsg("Init", errno, txt, pocFN);
00257 }
00258 
00259 /******************************************************************************/
00260 /*                              r e q W r i t e                               */
00261 /******************************************************************************/
00262   
00263 int XrdOfsPoscq::reqWrite(void *Buff, int Bsz, int Offs)
00264 {
00265    int rc = 0;
00266 
00267    do {rc = pwrite(pocFD, Buff, Bsz, Offs);} while(rc < 0 && errno == EINTR);
00268 
00269    if (rc >= 0 && Bsz > 8) rc = fsync(pocFD);
00270 
00271    if (rc < 0) {eDest->Emsg("reqWrite",errno,"write", pocFN); return 0;}
00272    return 1;
00273 }
00274 
00275 /******************************************************************************/
00276 /*                               R e W r i t e                                */
00277 /******************************************************************************/
00278   
00279 int XrdOfsPoscq::ReWrite(XrdOfsPoscq::recEnt *rP)
00280 {
00281    static const int Mode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
00282    char newFN[MAXPATHLEN], *oldFN;
00283    int  newFD, oldFD, Offs = ReqOffs, aOK = 1;
00284 
00285 // Construct new file and open it
00286 //
00287    strcpy(newFN, pocFN); strcat(newFN, ".new");
00288    if ((newFD = open(newFN, O_RDWR|O_CREAT|O_TRUNC, Mode)) < 0)
00289       {eDest->Emsg("ReWrite",errno,"open",newFN); return 0;}
00290 
00291 // Setup to write/swap the file
00292 //
00293    oldFD = pocFD; pocFD = newFD;
00294    oldFN = pocFN; pocFN = newFN;
00295 
00296 // Rewrite all records if we have any
00297 //
00298    while(rP)
00299         {rP->Offset = Offs;
00300          if (!reqWrite((void *)&rP->reqData, ReqSize, Offs)) {aOK = 0; break;}
00301          Offs += ReqSize;
00302          rP = rP->Next;
00303         }
00304 
00305 // If all went well, rename the file
00306 //
00307    if (aOK && rename(newFN, oldFN) < 0)
00308       {eDest->Emsg("ReWrite",errno,"rename",newFN); aOK = 0;}
00309 
00310 // Perform post processing
00311 //
00312    if (aOK)  close(oldFD);
00313       else  {close(newFD); pocFD = oldFD;}
00314    pocFN = oldFN;
00315    pocSZ = Offs;
00316    return aOK;
00317 }
00318 
00319 /******************************************************************************/
00320 /*                             V e r O f f s e t                              */
00321 /******************************************************************************/
00322   
00323 int XrdOfsPoscq::VerOffset(const char *Lfn, int Offset)
00324 {
00325 
00326 // Verify the offset
00327 //
00328    if (Offset < ReqOffs || (Offset-ReqOffs)%ReqSize)
00329       {char buff[128];
00330        sprintf(buff, "Invalid slot %d for", Offset);
00331        eDest->Emsg("VerOffset", buff, Lfn);
00332        return 0;
00333       }
00334    return 1;
00335 }

Generated on Tue Jul 5 14:46:44 2011 for ROOT_528-00b_version by  doxygen 1.5.1