00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #include "TGo4BufferQueue.h"
00017
00018 #include "Riostream.h"
00019
00020 #include "TROOT.h"
00021 #include "TMutex.h"
00022 #include "TFile.h"
00023 #include "TGo4Buffer.h"
00024 #include "RVersion.h"
00025
00026 #include "TGo4Socket.h"
00027 #include "TGo4RuntimeException.h"
00028 #include "TGo4Log.h"
00029 #include "TGo4LockGuard.h"
00030
00031 #if ROOT_VERSION_CODE > ROOT_VERSION(4,3,2)
00032
00033 const Int_t TGo4BufferQueue::fgiISOWNER=TBuffer::kIsOwner;
00034 #else
00035 const Int_t TGo4BufferQueue::fgiISOWNER=BIT(14);
00036
00037 #endif
00038
00039
00040
00041 TGo4BufferQueue::TGo4BufferQueue()
00042 : TGo4Queue("Default buffer queue"),fiOverflowcount(0), fiMaxBuffers(10)
00043 {
00044 TRACE((14,"TGo4BufferQueue::TGo4BufferQueue()", __LINE__, __FILE__));
00045
00046 InitBuffers();
00047 }
00048
00049 TGo4BufferQueue::TGo4BufferQueue(const char* name)
00050 : TGo4Queue(name), fiOverflowcount(0), fiMaxBuffers(10)
00051
00052 {
00053 TRACE((14,"TGo4BufferQueue::TGo4BufferQueue(Text_t*)", __LINE__, __FILE__));
00054 InitBuffers();
00055 }
00056
00057 void TGo4BufferQueue::InitBuffers()
00058
00059 {
00060 TRACE((14,"TGo4BufferQueue::InitBuffers()", __LINE__, __FILE__));
00061 TGo4LockGuard mainguard;
00062 fxBufferList=new TList;
00063 fxFreeList=new TList;
00064 fxBufferIterator=fxFreeList->MakeIterator();
00065 fxBufferMutex=new TMutex;
00066 for (Int_t i=0; i< fiMaxBuffers; ++i)
00067 {
00068 TBuffer* buf=NewEntry();
00069 fxBufferList->Add(buf);
00070 fxFreeList->Add(buf);
00071 }
00072 }
00073
00074 TGo4BufferQueue::~TGo4BufferQueue()
00075 {
00076 TRACE((14,"TGo4BufferQueue::~TTGo4BufferQueue()", __LINE__, __FILE__));
00077
00078 delete fxBufferIterator;
00079 fxBufferList->Delete();
00080 TCollection::EmptyGarbageCollection();
00081 delete fxBufferList;
00082 delete fxBufferMutex;
00083 }
00084
00085 TBuffer * TGo4BufferQueue::WaitBuffer()
00086 {
00087 TRACE((19,"TGo4BufferQueue::WaitBuffer()", __LINE__, __FILE__));
00088 TObject* ob = Wait();
00089 TBuffer* rev= dynamic_cast<TBuffer*> ( ob );
00090 return rev;
00091 }
00092
00093
00094 TObject * TGo4BufferQueue::WaitObjectFromBuffer()
00095 {
00096 TRACE((19,"TGo4BufferQueue::WaitObjectFromBuffer()", __LINE__, __FILE__));
00097 TObject* obj=0;
00098 TBuffer* buffer=WaitBuffer();
00099 if(buffer)
00100 {
00101 {
00102 TGo4LockGuard mainguard;
00103
00104 TDirectory* savdir=gDirectory;
00105 gROOT->cd();
00106 buffer->SetReadMode();
00107
00108 buffer->Reset();
00109 buffer->InitMap();
00110
00111
00112
00113 TClass* cl = buffer->ReadClass();
00114
00115 if(cl == (TClass*) -1)
00116 {
00117
00118 cout << " Could not receive object of unknown class on buffer queue "<<GetName() <<" !!!" <<endl;
00119 obj=0;
00120 }
00121 else
00122 {
00123
00124
00125
00126 buffer->Reset();
00127 obj=buffer->ReadObject(cl);
00128 }
00129 if(savdir) savdir->cd();
00130 }
00131 FreeBuffer(buffer);
00132 }
00133 else
00134 {
00135
00136 }
00137 return obj;
00138 }
00139
00140 void TGo4BufferQueue::AddBuffer(TBuffer * buffer, Bool_t clone)
00141 {
00142 TRACE((19,"TGo4BufferQueue::AddBuffer(TBuffer*, Bool_t)", __LINE__, __FILE__));
00143
00144 TBuffer* entry=0;
00145 Bool_t entryisnew=kFALSE;
00146 if(clone)
00147 {
00148 TGo4LockGuard qguard(fxBufferMutex);
00149 entry= dynamic_cast<TBuffer*>(fxFreeList->Remove(fxFreeList->First()));
00150
00151 if(entry==0)
00152 {
00153
00154 TGo4Log::Debug(" Buffer Queue adding new internal buffer... ");
00155
00156
00157 entry=NewEntry();
00158 fxBufferList->Add(entry);
00159
00160 entryisnew=kTRUE;
00161 fiMaxBuffers++;
00162
00163 }
00164 else
00165 {
00166
00167 }
00168
00169 Int_t srcsize=buffer->BufferSize();
00170 Int_t destsize=entry->BufferSize();
00171 if (srcsize>destsize)
00172 {
00173 Realloc(entry,destsize,srcsize);
00174 destsize=srcsize;
00175 }
00176 else
00177 {
00178
00179 }
00180 char* source= buffer->Buffer();
00181 char* destination= entry->Buffer();
00182 memcpy(destination,source,destsize);
00183
00184 Int_t messlen = buffer->Length();
00185 entry->SetBufferOffset(messlen);
00186 entry->SetByteCount(0);
00187 }
00188 else
00189 {
00190 entry=buffer;
00191 }
00192
00193
00194 try
00195 {
00196 Add(entry);
00197 }
00198 catch(TGo4RuntimeException ex)
00199 {
00200 cout << "Buffer queue "<< GetName()<<" is full, dropping new entry "<< entry <<" !!!" << endl;
00201 if(entryisnew)
00202 {
00203 TGo4LockGuard qguard(fxBufferMutex);
00204 fxBufferList->Remove(entry);
00205 delete entry;
00206 }
00207 }
00208 }
00209
00210 void TGo4BufferQueue::AddBufferFromObject(TObject * object)
00211 {
00212 TRACE((12,"TGo4BufferQueue::AddBufferFromObject(TObject*)", __LINE__, __FILE__));
00213 if(object==0) return;
00214 TGo4LockGuard mainguard;
00215 TBuffer* entry = 0;
00216 entry = new TGo4Buffer(TBuffer::kWrite);
00217
00218 TFile *filsav = gFile;
00219 gFile = 0;
00220 entry->WriteObject(object);
00221 gFile = filsav;
00223
00224
00225
00226
00227
00228
00229
00230
00231
00232
00233
00234
00235
00236
00237
00238
00239
00240
00242
00243
00244
00245 try
00246 {
00247 Add(entry);
00248 }
00249 catch(TGo4RuntimeException& ex)
00250 {
00251 cout << "Buffer queue "<< GetName()<<" is full, dropping new entry "<< entry <<" !!!" << endl;
00252 delete entry;
00253 }
00254
00255 }
00256
00257 void TGo4BufferQueue::FreeBuffer(TBuffer * buffer)
00258 {
00259 TRACE((19,"TGo4BufferQueue::FreeBuffer(TBuffer*, Bool_t)", __LINE__, __FILE__));
00260
00261 TGo4LockGuard qguard(fxBufferMutex);
00262
00263
00264 if (fxBufferList->FindObject(buffer)!=0)
00265 {
00266
00267
00268
00269 Int_t memsize=buffer->BufferSize();
00270 if (memsize>TGo4Socket::fgiBUFINITSIZE)
00271 {
00272 Realloc(buffer,memsize, TGo4Socket::fgiBUFINITSIZE);
00273 }
00274 fxFreeList->AddLast(buffer);
00275
00276 }
00277 else
00278 {
00279
00280 delete buffer;
00281
00282
00283
00284
00285 }
00286
00287
00288
00289
00290 }
00291
00292 void TGo4BufferQueue::Clear(Option_t* opt)
00293 {
00294 TObject* ob=0;
00295 while((ob=Next()) !=0)
00296 {
00297
00298 FreeBuffer(dynamic_cast<TBuffer*> (ob) );
00299 }
00300
00301 }
00302
00303
00304
00305
00306 void TGo4BufferQueue::Realloc(TBuffer* buffer, Int_t oldsize, Int_t newsize)
00307 {
00308 if(buffer==0) return;
00309 TGo4LockGuard mainguard;
00310 char* memfield=buffer->Buffer();
00311
00312
00313 Int_t extraspace=TGo4Socket::fgiBUFEXTRASPACE;
00314
00315
00316
00317
00318 memfield = TStorage::ReAllocChar(memfield,
00319 (newsize+extraspace),
00320 (oldsize+extraspace));
00321
00322 buffer->ResetBit(fgiISOWNER);
00323 buffer->SetBuffer(memfield, newsize);
00324 buffer->SetBit(fgiISOWNER);
00325
00326
00327
00328
00329
00330 buffer->SetBufferOffset(newsize);
00331 }
00332
00333 TBuffer* TGo4BufferQueue::NewEntry()
00334 {
00335 TGo4LockGuard mainguard;
00336 TBuffer* buf = new TGo4Buffer(TBuffer::kWrite, TGo4Socket::fgiBUFINITSIZE);
00337
00338 TNamed* dummy= new TNamed("This is a default buffer filler","GO4 is fun!");
00339 TFile *filsav = gFile;
00340 gFile = 0;
00341 buf->WriteObject(dummy);
00342 gFile = filsav;
00343 delete dummy;
00344 return buf;
00345 }
00346
00347 TBuffer* TGo4BufferQueue::CreateValueBuffer(UInt_t val)
00348 {
00349 TGo4Buffer* buf= new TGo4Buffer(TBuffer::kWrite);
00350 char* field= buf->Buffer() + sizeof(UInt_t);
00351 tobuf(field ,val);
00352 buf->SetBufferOffset( sizeof(UInt_t)+ sizeof(UInt_t) );
00353 buf->SetByteCount(0);
00354 return buf;
00355 }
00356
00357
00358 Int_t TGo4BufferQueue::DecodeValueBuffer(TBuffer* buf)
00359 {
00360 if(buf==0) return -1;
00361 UInt_t len= buf->Length();
00362 if(len==sizeof(UInt_t)+sizeof(UInt_t))
00363
00364 {
00365 char* field=buf->Buffer();
00366 char* temp= field + sizeof(UInt_t);
00367 UInt_t val=0;
00368 frombuf(temp, &val);
00369
00370 return (Int_t) val;
00371 }
00372 else
00373 {
00374 return -2;
00375 }
00376 }
00377
00378