00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027 #include "TPacketizerMulti.h"
00028
00029 #include "TClass.h"
00030 #include "TDSet.h"
00031 #include "TError.h"
00032 #include "TFileInfo.h"
00033 #include "TList.h"
00034 #include "TMap.h"
00035 #include "TMethodCall.h"
00036 #include "TProof.h"
00037 #include "TProofDebug.h"
00038
00039 ClassImp(TPacketizerMulti)
00040
00041
00042 TPacketizerMulti::TPacketizerMulti(TDSet *dset, TList *wrks,
00043 Long64_t first, Long64_t num,
00044 TList *input, TProofProgressStatus *st)
00045 : TVirtualPacketizer(input, st)
00046 {
00047
00048
00049 PDB(kPacketizer,1) Info("TPacketizerMulti",
00050 "enter (first %lld, num %lld)", first, num);
00051 fValid = kFALSE;
00052 fPacketizersIter = 0;
00053 fCurrent = 0;
00054 fAssignedPack = 0;
00055
00056
00057 if (!dset || !wrks || !input || !st) {
00058 Error("TPacketizerMulti", "invalid inputs: dset:%p wrks:%p input:%p st:%p",
00059 dset, wrks, input, st);
00060 return;
00061 }
00062
00063 fPacketizers = new TList;
00064
00065
00066 TNamed *progTimerFlag = new TNamed("PROOF_StartProgressTimer", "no");
00067 input->Add(progTimerFlag);
00068
00069 fTotalEntries = 0;
00070 TVirtualPacketizer *packetizer = 0;
00071
00072 if (!(dset->TestBit(TDSet::kMultiDSet))) {
00073 if ((packetizer = CreatePacketizer(dset, wrks, first, num, input, st))) {
00074 fPacketizers->Add(packetizer);
00075 fTotalEntries = packetizer->GetTotalEntries();
00076 } else {
00077 Error("TPacketizerMulti", "problems initializing packetizer for single dataset");
00078 input->Remove(progTimerFlag);
00079 delete progTimerFlag;
00080 return;
00081 }
00082 } else {
00083
00084 TIter nxds(dset->GetListOfElements());
00085 TDSet *ds = 0;
00086 while ((ds = (TDSet *)nxds())) {
00087 if ((packetizer = CreatePacketizer(ds, wrks, first, num, input, st))) {
00088 fPacketizers->Add(packetizer);
00089 fTotalEntries += packetizer->GetTotalEntries();
00090 } else {
00091 Error("TPacketizerMulti", "problems initializing packetizer for dataset '%s'", ds->GetName());
00092 }
00093 }
00094 }
00095
00096 input->Remove(progTimerFlag);
00097 delete progTimerFlag;
00098
00099
00100 if (fPacketizers->GetSize() <= 0) {
00101 Error("TPacketizerMulti", "no valid packetizer could be initialized - aborting");
00102 SafeDelete(fPacketizers);
00103 return;
00104 } else {
00105 Info("TPacketizerMulti", "%d packetizer(s) have been successfully initialized (%lld events in total)",
00106 fPacketizers->GetSize(), fTotalEntries);
00107
00108 TIter nxp(fPacketizers);
00109 while ((packetizer = (TVirtualPacketizer *) nxp()))
00110 packetizer->SetTotalEntries(fTotalEntries);
00111 }
00112
00113
00114 fPacketizersIter = new TIter(fPacketizers);
00115
00116
00117 if (!(fCurrent = (TVirtualPacketizer *) fPacketizersIter->Next())) {
00118
00119 Error("TPacketizerMulti", "could not point to the first valid packetizer");
00120 fPacketizers->SetOwner(kTRUE);
00121 SafeDelete(fPacketizers);
00122 SafeDelete(fPacketizersIter);
00123 return;
00124 }
00125
00126
00127 fAssignedPack = new TMap;
00128
00129
00130 fValid = kTRUE;
00131
00132 PDB(kPacketizer,1) Info("TPacketizerMulti", "done");
00133 }
00134
00135
00136 TPacketizerMulti::~TPacketizerMulti()
00137 {
00138
00139
00140 if (fPacketizers) {
00141 fPacketizers->SetOwner(kTRUE);
00142 SafeDelete(fPacketizers);
00143 }
00144 SafeDelete(fPacketizers);
00145 fCurrent = 0;
00146 if (fAssignedPack) {
00147 fAssignedPack->SetOwner(kFALSE);
00148 SafeDelete(fAssignedPack);
00149 }
00150 }
00151
00152
00153 TDSetElement *TPacketizerMulti::GetNextPacket(TSlave *wrk, TMessage *r)
00154 {
00155
00156
00157
00158
00159 TDSetElement *elem = 0;
00160
00161
00162 if (!fValid) return elem;
00163
00164
00165 TVirtualPacketizer *lastPacketizer = dynamic_cast<TVirtualPacketizer *>(fAssignedPack->GetValue(wrk));
00166 if (lastPacketizer && lastPacketizer != fCurrent) {
00167 PDB(kPacketizer,2)
00168 Info("GetNextPacket", "%s: asking old packetizer %p ... ", wrk->GetOrdinal(), lastPacketizer);
00169 if ((elem = lastPacketizer->GetNextPacket(wrk, r))) return elem;
00170 if (fCurrent) {
00171
00172 TVirtualSlaveStat *oldstat = dynamic_cast<TVirtualSlaveStat *>(lastPacketizer->GetSlaveStats()->GetValue(wrk));
00173 TVirtualSlaveStat *curstat = dynamic_cast<TVirtualSlaveStat *>(fCurrent->GetSlaveStats()->GetValue(wrk));
00174 if (oldstat && curstat)
00175 *(curstat->GetProgressStatus()) += *(oldstat->GetProgressStatus());
00176 }
00177 }
00178
00179
00180 if (!fCurrent) {
00181 HandleTimer(0);
00182 return elem;
00183 }
00184
00185
00186 PDB(kPacketizer,2)
00187 Info("GetNextPacket", "%s: asking current packetizer %p ... ", wrk->GetOrdinal(), fCurrent);
00188 if (!(elem = fCurrent->GetNextPacket(wrk, r))) {
00189
00190 TMap *oldStats = (lastPacketizer && lastPacketizer == fCurrent) ? lastPacketizer->GetSlaveStats() : 0;
00191
00192 fCurrent = (TVirtualPacketizer *) fPacketizersIter->Next();
00193 if (fCurrent) {
00194
00195 if (oldStats) {
00196 TVirtualSlaveStat *oldstat = dynamic_cast<TVirtualSlaveStat *>(oldStats->GetValue(wrk));
00197 TVirtualSlaveStat *curstat = dynamic_cast<TVirtualSlaveStat *>(fCurrent->GetSlaveStats()->GetValue(wrk));
00198 if (oldstat && curstat)
00199 *(curstat->GetProgressStatus()) += *(oldstat->GetProgressStatus());
00200 }
00201 PDB(kPacketizer,2)
00202 Info("GetNextPacket", "%s: asking new packetizer %p ... ", wrk->GetOrdinal(), fCurrent);
00203 elem = fCurrent->GetNextPacket(wrk, r);
00204 }
00205 }
00206 if (fCurrent) {
00207
00208 TPair *pair = dynamic_cast<TPair *>(fAssignedPack->FindObject(wrk));
00209 if (pair) {
00210 pair->SetValue(fCurrent);
00211 } else {
00212 fAssignedPack->Add(wrk, fCurrent);
00213 }
00214 PDB(kPacketizer,2)
00215 Info("GetNextPacket", "assigned packetizer %p to %s (check: %p)",
00216 fCurrent, wrk->GetOrdinal(), fAssignedPack->GetValue(wrk));
00217 }
00218
00219
00220 if (fProgressStatus->GetEntries() >= fTotalEntries) {
00221 if (fProgressStatus->GetEntries() > fTotalEntries)
00222 Error("GetNextPacket", "Processed too many entries!");
00223 HandleTimer(0);
00224 SafeDelete(fProgress);
00225 }
00226
00227
00228 return elem;
00229 }
00230
00231
00232 TVirtualPacketizer *TPacketizerMulti::CreatePacketizer(TDSet *dset, TList *wrks,
00233 Long64_t first, Long64_t num,
00234 TList *input, TProofProgressStatus *st)
00235 {
00236
00237
00238
00239 TVirtualPacketizer *packetizer = 0;
00240
00241
00242 if (!dset || !wrks || !input || !st) {
00243 Error("CreatePacketizer", "invalid inputs: dset:%p wrks:%p input:%p st:%p",
00244 dset, wrks, input, st);
00245 return packetizer;
00246 }
00247
00248
00249 if (dset->TestBit(TDSet::kEmpty)) {
00250 Error("CreatePacketizer", "dataset is empty: protocol error?");
00251 return packetizer;
00252 }
00253
00254 TString packetizername;
00255 TList *listOfMissingFiles = 0;
00256
00257 TMethodCall callEnv;
00258 TClass *cl;
00259
00260
00261
00262
00263
00264 if (!(listOfMissingFiles = (TList *) input->FindObject("MissingFiles"))) {
00265
00266 listOfMissingFiles = new TList;
00267
00268 input->Add(listOfMissingFiles);
00269 }
00270 dset->Lookup(kTRUE, &listOfMissingFiles);
00271
00272 if (!(dset->GetListOfElements()) ||
00273 !(dset->GetListOfElements()->GetSize())) {
00274 Error("CreatePacketizer", "no files from the data set were found - skipping");
00275 return packetizer;
00276 }
00277
00278 if (TProof::GetParameter(input, "PROOF_Packetizer", packetizername) != 0) {
00279
00280 packetizername = "TPacketizerAdaptive";
00281 } else {
00282 Info("CreatePacketizer", "using alternate packetizer: %s", packetizername.Data());
00283 }
00284
00285
00286 cl = TClass::GetClass(packetizername);
00287 if (cl == 0) {
00288 Error("CreatePacketizer", "class '%s' not found", packetizername.Data());
00289 return packetizer;
00290 }
00291
00292
00293 callEnv.InitWithPrototype(cl, cl->GetName(),"TDSet*,TList*,Long64_t,Long64_t,TList*,TProofProgressStatus*");
00294 if (!callEnv.IsValid()) {
00295 Error("CreatePacketizer", "cannot find correct constructor for '%s'", cl->GetName());
00296 return packetizer;
00297 }
00298 callEnv.ResetParam();
00299 callEnv.SetParam((Long_t) dset);
00300 callEnv.SetParam((Long_t) wrks);
00301 callEnv.SetParam((Long64_t) first);
00302 callEnv.SetParam((Long64_t) num);
00303 callEnv.SetParam((Long_t) input);
00304 callEnv.SetParam((Long_t) st);
00305
00306
00307 dset->SetBit(TDSet::kValidityChecked);
00308 dset->ResetBit(TDSet::kSomeInvalid);
00309
00310
00311 Long_t ret = 0;
00312 callEnv.Execute(ret);
00313 if ((packetizer = (TVirtualPacketizer *)ret) == 0) {
00314 Error("CreatePacketizer", "cannot construct '%s'", cl->GetName());
00315 return packetizer;
00316 }
00317
00318 if (!packetizer->IsValid()) {
00319 Error("CreatePacketizer",
00320 "instantiated packetizer object '%s' is invalid", cl->GetName());
00321 SafeDelete(packetizer);
00322 }
00323
00324
00325 TDSetElement *elem = 0;
00326 if (dset->TestBit(TDSet::kSomeInvalid)) {
00327 TIter nxe(dset->GetListOfElements());
00328 while ((elem = (TDSetElement *)nxe())) {
00329 if (!elem->GetValid()) {
00330 listOfMissingFiles->Add(elem->GetFileInfo(dset->GetType()));
00331 dset->Remove(elem, kFALSE);
00332 }
00333 }
00334
00335 dset->ResetBit(TDSet::kSomeInvalid);
00336 }
00337
00338
00339 return packetizer;
00340 }