XrdFrmMigrate.cc

Go to the documentation of this file.
00001 /******************************************************************************/
00002 /*                                                                            */
00003 /*                      X r d F r m M i g r a t e . c c                       */
00004 /*                                                                            */
00005 /* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University  */
00006 /*                            All Rights Reserved                             */
00007 /*   Produced by Andrew Hanushevsky for Stanford University under contract    */
00008 /*              DE-AC02-76-SFO0515 with the Department of Energy              */
00009 /******************************************************************************/
00010   
00011 //         $Id: XrdFrmMigrate.cc 34000 2010-06-21 06:49:56Z ganis $
00012 
00013 const char *XrdFrmMigrateCVSID = "$Id: XrdFrmMigrate.cc 34000 2010-06-21 06:49:56Z ganis $";
00014 
00015 #include <stdio.h>
00016 #include <string.h>
00017 #include <strings.h>
00018 #include <utime.h>
00019 #include <sys/param.h>
00020 #include <sys/types.h>
00021 
00022 #include "XrdOss/XrdOss.hh"
00023 #include "XrdOss/XrdOssPath.hh"
00024 #include "XrdOuc/XrdOucNSWalk.hh"
00025 #include "XrdOuc/XrdOucTList.hh"
00026 #include "XrdFrm/XrdFrmFiles.hh"
00027 #include "XrdFrm/XrdFrmConfig.hh"
00028 #include "XrdFrm/XrdFrmMigrate.hh"
00029 #include "XrdFrm/XrdFrmRequest.hh"
00030 #include "XrdFrm/XrdFrmTrace.hh"
00031 #include "XrdFrm/XrdFrmTransfer.hh"
00032 #include "XrdFrm/XrdFrmXfrQueue.hh"
00033 #include "XrdSys/XrdSysPthread.hh"
00034 #include "XrdSys/XrdSysPlatform.hh"
00035 #include "XrdSys/XrdSysTimer.hh"
00036 
00037 using namespace XrdFrm;
00038 
00039 /******************************************************************************/
00040 /*                        S t a t i c   M e m b e r s                         */
00041 /******************************************************************************/
00042 
00043 
00044 XrdOucHash<char>  XrdFrmMigrate::BadFiles;
00045 
00046 XrdFrmFileset    *XrdFrmMigrate::fsDefer = 0;
00047 
00048 int               XrdFrmMigrate::numMig = 0;
00049   
00050 /******************************************************************************/
00051 /* Private:                          A d d                                    */
00052 /******************************************************************************/
00053   
00054 void XrdFrmMigrate::Add(XrdFrmFileset *sP)
00055 {
00056    EPNAME("Add");
00057    const char *Why;
00058    time_t xTime;
00059 
00060 // Check to see if the file is really eligible for purging
00061 //
00062    if ((Why = Eligible(sP, xTime)))
00063       {DEBUG(sP->basePath() <<"cannot be migrated; " <<Why);
00064        delete sP;
00065        return;
00066       }
00067 
00068 // Add the file to the migr queue or the defer queue based on mod time
00069 //
00070    if (xTime < Config.IdleHold) Defer(sP);
00071       else Queue(sP);
00072 }
00073   
00074 /******************************************************************************/
00075 /* Private:                      A d v a n c e                                */
00076 /******************************************************************************/
00077 
00078 int XrdFrmMigrate::Advance()
00079 {
00080    XrdFrmFileset *fP;
00081    int xTime;
00082 
00083 // Try to re-add everything in this queue
00084 //
00085    while(fsDefer)
00086         {xTime = static_cast<int>(time(0) - fsDefer->baseFile()->Stat.st_mtime);
00087          if (xTime < Config.IdleHold) break;
00088          fP = fsDefer; fsDefer = fsDefer->Next;
00089          if (fP->Refresh(1,0)) Add(fP);
00090             else delete fP;
00091         }
00092 
00093 // Return number of seconds to next advance event
00094 //
00095    return fsDefer ? Config.IdleHold - xTime : 0;
00096 }
00097   
00098 /******************************************************************************/
00099 /* Private:                        D e f e r                                  */
00100 /******************************************************************************/
00101 
00102 void XrdFrmMigrate::Defer(XrdFrmFileset *sP)
00103 {
00104    XrdFrmFileset *fP = fsDefer, *pfP = 0;
00105    time_t mTime = sP->baseFile()->Stat.st_mtime;
00106 
00107 // Insert this entry into the defer queue in ascending mtime order
00108 //
00109    while(fP && fP->baseFile()->Stat.st_mtime < mTime)
00110         {pfP = fP; fP = fP->Next;}
00111 
00112 // Chain in the fileset
00113 //
00114    sP->Next = fP;
00115    if (pfP) pfP->Next = sP;
00116       else  fsDefer   = sP;
00117 }
00118 
00119 /******************************************************************************/
00120 /*                               D i s p l a y                                */
00121 /******************************************************************************/
00122 
00123 void XrdFrmMigrate::Display()
00124 {
00125    XrdFrmConfig::VPInfo *vP = Config.pathList;
00126    XrdOucTList *tP;
00127 
00128 // Type header
00129 //
00130    Say.Say("=====> ", "Migrate configuration:");
00131 
00132 // Display what we will scan
00133 //
00134    while(vP)
00135         {Say.Say("=====> ", "Scanning ", (vP->Val?"r/w: ":"r/o: "), vP->Name);
00136          tP = vP->Dir;
00137          while(tP) {Say.Say("=====> ", "Excluded ", tP->text); tP = tP->next;}
00138          vP = vP->Next;
00139         }
00140 }
00141   
00142 /******************************************************************************/
00143 /* Private:                     E l i g i b l e                               */
00144 /******************************************************************************/
00145   
00146 const char *XrdFrmMigrate::Eligible(XrdFrmFileset *sP, time_t &xTime)
00147 {
00148    XrdOucNSWalk::NSEnt *baseFile = sP->baseFile();
00149    XrdOucNSWalk::NSEnt *lockFile = sP->lockFile();
00150    XrdOucNSWalk::NSEnt *failFile = sP->failFile();
00151    time_t mTimeBF, mTimeLK, nowTime = time(0);
00152    const char *eTxt;
00153 
00154 // File is inelegible if lockfile mtime is zero (i.e., an mstore placeholder)
00155 //
00156    mTimeLK = lockFile->Stat.st_mtime;
00157    if (!mTimeLK) return "migration defered";
00158 
00159 // File is ineligible if it has not changed since last migration
00160 //
00161    mTimeBF = baseFile->Stat.st_mtime;
00162    if (mTimeLK >= mTimeBF) return "file unchanged";
00163 
00164 // File is ineligible if it has a fail file that is still recent
00165 //
00166    if (failFile && (eTxt=XrdFrmTransfer::checkFF(sP->failPath()))) return eTxt;
00167 
00168 // Migration may need to be defered if the file has been modified too recently
00169 // (caller will check)
00170 //
00171    xTime = static_cast<int>(nowTime - mTimeBF);
00172 
00173 // File can be migrated
00174 //
00175    return 0;
00176 }
00177 
00178 /******************************************************************************/
00179 /*                               M i g r a t e                                */
00180 /******************************************************************************/
00181   
00182 void *XrdMigrateStart(void *parg)
00183 {
00184     XrdFrmMigrate::Migrate(0);
00185     return (void *)0;
00186 }
00187   
00188 void XrdFrmMigrate::Migrate(int doinit)
00189 {
00190    XrdFrmFileset *fP;
00191    char buff[80];
00192    int migWait, wTime;
00193 
00194 // If we have not initialized yet, start a thread to handle this
00195 //
00196    if (doinit)
00197       {pthread_t tid;
00198        int retc;
00199        if ((retc = XrdSysThread::Run(&tid, XrdMigrateStart, (void *)0,
00200                                      XRDSYSTHREAD_BIND, "migration scan")))
00201           Say.Emsg("Migrate", retc, "create migrtion thread");
00202        return;
00203       }
00204 
00205 // Start the migration sequence, first do a name space scan which will trigger
00206 // all eligible migrations and defer any that need to wait. We then drain the
00207 // defer queue and wait for the next period to start.
00208 //
00209 do{migWait = Config.WaitMigr; numMig = 0;
00210    Scan();
00211    while((wTime = Advance()))
00212         {if ((migWait -= wTime) <= 0) break;
00213             else  XrdSysTimer::Snooze(wTime);
00214         }
00215    while(fsDefer) {fP = fsDefer; fsDefer = fsDefer->Next; delete fP;}
00216    sprintf(buff, "%d file%s selected for transfer.",numMig,(numMig==1?"":"s"));
00217    Say.Emsg("Migrate", buff);
00218    if (migWait > 0) XrdSysTimer::Snooze(migWait);
00219   } while(1);
00220 }
00221 
00222 /******************************************************************************/
00223 /* Private:                        Q u e u e                                  */
00224 /******************************************************************************/
00225 
00226 void XrdFrmMigrate::Queue(XrdFrmFileset *sP)
00227 {
00228    static int reqID = 0;
00229    XrdFrmRequest myReq;
00230 
00231 // Convert the fileset to a request element
00232 //
00233    memset(&myReq, 0, sizeof(myReq));
00234    strlcpy(myReq.User, Config.myProg, sizeof(myReq.User));
00235    sprintf(myReq.ID, "Internal%d", reqID++);
00236    myReq.Options = XrdFrmRequest::Migrate;
00237    myReq.addTOD  = static_cast<long long>(time(0));
00238    if (Config.LogicalPath(sP->basePath(), myReq.LFN, sizeof(myReq.LFN)))
00239       {XrdFrmXfrQueue::Add(&myReq, 0, XrdFrmRequest::migQ); numMig++;}
00240 
00241 // All done
00242 //
00243    delete sP;
00244 }
00245   
00246 /******************************************************************************/
00247 /* Private:                       R e m f i x                                 */
00248 /******************************************************************************/
00249 
00250 void XrdFrmMigrate::Remfix(const char *Ftype, const char *Fname)
00251 {
00252 // Remove the offending file
00253 //
00254    if (!Config.ossFS->Unlink(Fname,XRDOSS_isPFN))
00255       Say.Emsg("Remfix", Ftype, "file orphan fixed; removed", Fname);
00256 }
00257   
00258 /******************************************************************************/
00259 /* Private:                         S c a n                                   */
00260 /******************************************************************************/
00261   
00262 void XrdFrmMigrate::Scan()
00263 {
00264    static const int Opts = XrdFrmFiles::Recursive | XrdFrmFiles::CompressD
00265                          | XrdFrmFiles::NoAutoDel;
00266    static time_t lastHP = time(0), nowT = time(0);
00267 
00268    XrdFrmConfig::VPInfo *vP = Config.pathList;
00269    XrdFrmFileset *sP;
00270    XrdFrmFiles   *fP;
00271    char buff[128];
00272    int ec = 0, Bad = 0, aFiles = 0, bFiles = 0;
00273 
00274 // Purge that bad file table evey 24 hours to keep complaints down
00275 //
00276    if (nowT - lastHP >= 86400) {BadFiles.Purge(); lastHP = nowT;}
00277 
00278 // Indicate scan started
00279 //
00280    VMSG("Scan", "Name space scan started. . .");
00281 
00282 // Process each directory
00283 //
00284    do {fP = new XrdFrmFiles(vP->Name, Opts, vP->Dir);
00285        while((sP = fP->Get(ec,1)))
00286             {aFiles++;
00287              if (Screen(sP)) Add(sP);
00288                 else {delete sP; bFiles++;}
00289             }
00290        if (ec) Bad = 1;
00291        delete fP;
00292       } while((vP = vP->Next));
00293 
00294 // Indicate scan ended
00295 //
00296    sprintf(buff, "%d file%s with %d error%s", aFiles, (aFiles != 1 ? "s":""),
00297                                               bFiles, (bFiles != 1 ? "s":""));
00298    VMSG("Scan", "Name space scan ended;", buff);
00299 
00300 // Issue warning if we encountered errors
00301 //
00302    if (Bad) Say.Emsg("Scan", "Errors encountered while scanning for "
00303                              "migratable files.");
00304 }
00305 
00306 /******************************************************************************/
00307 /* Private:                       S c r e e n                                 */
00308 /******************************************************************************/
00309 
00310 int XrdFrmMigrate::Screen(XrdFrmFileset *sP)
00311 {
00312    const char *What = 0, *badFN = 0;
00313    char dPath[MAXPATHLEN+1];
00314 
00315 // Verify that we have all the relevant files
00316 //
00317    if (!(sP->baseFile()))
00318       {if (Config.Fix)
00319           {if (sP->lockFile()) Remfix("Lock", sP->lockPath());
00320            if (sP-> pinFile()) Remfix("Pin",  sP-> pinPath());
00321            if (sP->failFile()) Remfix("Fail", sP->failPath());
00322            return 0;
00323           }
00324        What = "No base file for";
00325             if (sP->lockFile()) badFN = sP->lockPath();
00326        else if (sP-> pinFile()) badFN = sP-> pinPath();
00327        else if (sP->failFile()) badFN = sP->failPath();
00328        else {What = "Orhpaned files in"; badFN = dPath;
00329              sP->dirPath(dPath, sizeof(dPath));
00330             }
00331       } else if (!(sP->lockFile()))
00332                 {What = "No lock file for"; badFN = sP->basePath();}
00333                 else return 1;
00334 
00335 // Issue message if we haven't issued one before
00336 //
00337    if (!BadFiles.Add(badFN, 0, 0, Hash_data_is_key))
00338       Say.Emsg("Screen", What, badFN);
00339    return 0;
00340 }

Generated on Tue Jul 5 14:46:35 2011 for ROOT_528-00b_version by  doxygen 1.5.1