00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
// Revision-control identification string for this source file.
const char *XrdFrmMigrateCVSID =
      "$Id: XrdFrmMigrate.cc 34000 2010-06-21 06:49:56Z ganis $";
00014
00015 #include <stdio.h>
00016 #include <string.h>
00017 #include <strings.h>
00018 #include <utime.h>
00019 #include <sys/param.h>
00020 #include <sys/types.h>
00021
00022 #include "XrdOss/XrdOss.hh"
00023 #include "XrdOss/XrdOssPath.hh"
00024 #include "XrdOuc/XrdOucNSWalk.hh"
00025 #include "XrdOuc/XrdOucTList.hh"
00026 #include "XrdFrm/XrdFrmFiles.hh"
00027 #include "XrdFrm/XrdFrmConfig.hh"
00028 #include "XrdFrm/XrdFrmMigrate.hh"
00029 #include "XrdFrm/XrdFrmRequest.hh"
00030 #include "XrdFrm/XrdFrmTrace.hh"
00031 #include "XrdFrm/XrdFrmTransfer.hh"
00032 #include "XrdFrm/XrdFrmXfrQueue.hh"
00033 #include "XrdSys/XrdSysPthread.hh"
00034 #include "XrdSys/XrdSysPlatform.hh"
00035 #include "XrdSys/XrdSysTimer.hh"
00036
00037 using namespace XrdFrm;
00038
00039
00040
00041
00042
00043
00044 XrdOucHash<char> XrdFrmMigrate::BadFiles;
00045
00046 XrdFrmFileset *XrdFrmMigrate::fsDefer = 0;
00047
00048 int XrdFrmMigrate::numMig = 0;
00049
00050
00051
00052
00053
00054 void XrdFrmMigrate::Add(XrdFrmFileset *sP)
00055 {
00056 EPNAME("Add");
00057 const char *Why;
00058 time_t xTime;
00059
00060
00061
00062 if ((Why = Eligible(sP, xTime)))
00063 {DEBUG(sP->basePath() <<"cannot be migrated; " <<Why);
00064 delete sP;
00065 return;
00066 }
00067
00068
00069
00070 if (xTime < Config.IdleHold) Defer(sP);
00071 else Queue(sP);
00072 }
00073
00074
00075
00076
00077
00078 int XrdFrmMigrate::Advance()
00079 {
00080 XrdFrmFileset *fP;
00081 int xTime;
00082
00083
00084
00085 while(fsDefer)
00086 {xTime = static_cast<int>(time(0) - fsDefer->baseFile()->Stat.st_mtime);
00087 if (xTime < Config.IdleHold) break;
00088 fP = fsDefer; fsDefer = fsDefer->Next;
00089 if (fP->Refresh(1,0)) Add(fP);
00090 else delete fP;
00091 }
00092
00093
00094
00095 return fsDefer ? Config.IdleHold - xTime : 0;
00096 }
00097
00098
00099
00100
00101
00102 void XrdFrmMigrate::Defer(XrdFrmFileset *sP)
00103 {
00104 XrdFrmFileset *fP = fsDefer, *pfP = 0;
00105 time_t mTime = sP->baseFile()->Stat.st_mtime;
00106
00107
00108
00109 while(fP && fP->baseFile()->Stat.st_mtime < mTime)
00110 {pfP = fP; fP = fP->Next;}
00111
00112
00113
00114 sP->Next = fP;
00115 if (pfP) pfP->Next = sP;
00116 else fsDefer = sP;
00117 }
00118
00119
00120
00121
00122
00123 void XrdFrmMigrate::Display()
00124 {
00125 XrdFrmConfig::VPInfo *vP = Config.pathList;
00126 XrdOucTList *tP;
00127
00128
00129
00130 Say.Say("=====> ", "Migrate configuration:");
00131
00132
00133
00134 while(vP)
00135 {Say.Say("=====> ", "Scanning ", (vP->Val?"r/w: ":"r/o: "), vP->Name);
00136 tP = vP->Dir;
00137 while(tP) {Say.Say("=====> ", "Excluded ", tP->text); tP = tP->next;}
00138 vP = vP->Next;
00139 }
00140 }
00141
00142
00143
00144
00145
00146 const char *XrdFrmMigrate::Eligible(XrdFrmFileset *sP, time_t &xTime)
00147 {
00148 XrdOucNSWalk::NSEnt *baseFile = sP->baseFile();
00149 XrdOucNSWalk::NSEnt *lockFile = sP->lockFile();
00150 XrdOucNSWalk::NSEnt *failFile = sP->failFile();
00151 time_t mTimeBF, mTimeLK, nowTime = time(0);
00152 const char *eTxt;
00153
00154
00155
00156 mTimeLK = lockFile->Stat.st_mtime;
00157 if (!mTimeLK) return "migration defered";
00158
00159
00160
00161 mTimeBF = baseFile->Stat.st_mtime;
00162 if (mTimeLK >= mTimeBF) return "file unchanged";
00163
00164
00165
00166 if (failFile && (eTxt=XrdFrmTransfer::checkFF(sP->failPath()))) return eTxt;
00167
00168
00169
00170
00171 xTime = static_cast<int>(nowTime - mTimeBF);
00172
00173
00174
00175 return 0;
00176 }
00177
00178
00179
00180
00181
00182 void *XrdMigrateStart(void *parg)
00183 {
00184 XrdFrmMigrate::Migrate(0);
00185 return (void *)0;
00186 }
00187
00188 void XrdFrmMigrate::Migrate(int doinit)
00189 {
00190 XrdFrmFileset *fP;
00191 char buff[80];
00192 int migWait, wTime;
00193
00194
00195
00196 if (doinit)
00197 {pthread_t tid;
00198 int retc;
00199 if ((retc = XrdSysThread::Run(&tid, XrdMigrateStart, (void *)0,
00200 XRDSYSTHREAD_BIND, "migration scan")))
00201 Say.Emsg("Migrate", retc, "create migrtion thread");
00202 return;
00203 }
00204
00205
00206
00207
00208
00209 do{migWait = Config.WaitMigr; numMig = 0;
00210 Scan();
00211 while((wTime = Advance()))
00212 {if ((migWait -= wTime) <= 0) break;
00213 else XrdSysTimer::Snooze(wTime);
00214 }
00215 while(fsDefer) {fP = fsDefer; fsDefer = fsDefer->Next; delete fP;}
00216 sprintf(buff, "%d file%s selected for transfer.",numMig,(numMig==1?"":"s"));
00217 Say.Emsg("Migrate", buff);
00218 if (migWait > 0) XrdSysTimer::Snooze(migWait);
00219 } while(1);
00220 }
00221
00222
00223
00224
00225
00226 void XrdFrmMigrate::Queue(XrdFrmFileset *sP)
00227 {
00228 static int reqID = 0;
00229 XrdFrmRequest myReq;
00230
00231
00232
00233 memset(&myReq, 0, sizeof(myReq));
00234 strlcpy(myReq.User, Config.myProg, sizeof(myReq.User));
00235 sprintf(myReq.ID, "Internal%d", reqID++);
00236 myReq.Options = XrdFrmRequest::Migrate;
00237 myReq.addTOD = static_cast<long long>(time(0));
00238 if (Config.LogicalPath(sP->basePath(), myReq.LFN, sizeof(myReq.LFN)))
00239 {XrdFrmXfrQueue::Add(&myReq, 0, XrdFrmRequest::migQ); numMig++;}
00240
00241
00242
00243 delete sP;
00244 }
00245
00246
00247
00248
00249
00250 void XrdFrmMigrate::Remfix(const char *Ftype, const char *Fname)
00251 {
00252
00253
00254 if (!Config.ossFS->Unlink(Fname,XRDOSS_isPFN))
00255 Say.Emsg("Remfix", Ftype, "file orphan fixed; removed", Fname);
00256 }
00257
00258
00259
00260
00261
00262 void XrdFrmMigrate::Scan()
00263 {
00264 static const int Opts = XrdFrmFiles::Recursive | XrdFrmFiles::CompressD
00265 | XrdFrmFiles::NoAutoDel;
00266 static time_t lastHP = time(0), nowT = time(0);
00267
00268 XrdFrmConfig::VPInfo *vP = Config.pathList;
00269 XrdFrmFileset *sP;
00270 XrdFrmFiles *fP;
00271 char buff[128];
00272 int ec = 0, Bad = 0, aFiles = 0, bFiles = 0;
00273
00274
00275
00276 if (nowT - lastHP >= 86400) {BadFiles.Purge(); lastHP = nowT;}
00277
00278
00279
00280 VMSG("Scan", "Name space scan started. . .");
00281
00282
00283
00284 do {fP = new XrdFrmFiles(vP->Name, Opts, vP->Dir);
00285 while((sP = fP->Get(ec,1)))
00286 {aFiles++;
00287 if (Screen(sP)) Add(sP);
00288 else {delete sP; bFiles++;}
00289 }
00290 if (ec) Bad = 1;
00291 delete fP;
00292 } while((vP = vP->Next));
00293
00294
00295
00296 sprintf(buff, "%d file%s with %d error%s", aFiles, (aFiles != 1 ? "s":""),
00297 bFiles, (bFiles != 1 ? "s":""));
00298 VMSG("Scan", "Name space scan ended;", buff);
00299
00300
00301
00302 if (Bad) Say.Emsg("Scan", "Errors encountered while scanning for "
00303 "migratable files.");
00304 }
00305
00306
00307
00308
00309
00310 int XrdFrmMigrate::Screen(XrdFrmFileset *sP)
00311 {
00312 const char *What = 0, *badFN = 0;
00313 char dPath[MAXPATHLEN+1];
00314
00315
00316
00317 if (!(sP->baseFile()))
00318 {if (Config.Fix)
00319 {if (sP->lockFile()) Remfix("Lock", sP->lockPath());
00320 if (sP-> pinFile()) Remfix("Pin", sP-> pinPath());
00321 if (sP->failFile()) Remfix("Fail", sP->failPath());
00322 return 0;
00323 }
00324 What = "No base file for";
00325 if (sP->lockFile()) badFN = sP->lockPath();
00326 else if (sP-> pinFile()) badFN = sP-> pinPath();
00327 else if (sP->failFile()) badFN = sP->failPath();
00328 else {What = "Orhpaned files in"; badFN = dPath;
00329 sP->dirPath(dPath, sizeof(dPath));
00330 }
00331 } else if (!(sP->lockFile()))
00332 {What = "No lock file for"; badFN = sP->basePath();}
00333 else return 1;
00334
00335
00336
00337 if (!BadFiles.Add(badFN, 0, 0, Hash_data_is_key))
00338 Say.Emsg("Screen", What, badFN);
00339 return 0;
00340 }