00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014 #include "TGo4BufferQueue.h"
00015
00016 #include "TROOT.h"
00017 #include "TClass.h"
00018 #include "TMutex.h"
00019 #include "TFile.h"
00020 #include "TGo4Buffer.h"
00021 #include "RVersion.h"
00022 #include "Riostream.h"
00023
00024
00025 #include "TGo4Socket.h"
00026 #include "TGo4RuntimeException.h"
00027 #include "TGo4Log.h"
00028 #include "TGo4LockGuard.h"
00029
00030 #if ROOT_VERSION_CODE > ROOT_VERSION(4,3,2)
00031
00032 const Int_t TGo4BufferQueue::fgiISOWNER=TBuffer::kIsOwner;
00033 #else
00034 const Int_t TGo4BufferQueue::fgiISOWNER=BIT(14);
00035
00036 #endif
00037
00038
00039
00040 TGo4BufferQueue::TGo4BufferQueue()
00041 : TGo4Queue("Default buffer queue"),fiOverflowcount(0), fiMaxBuffers(10)
00042 {
00043 TRACE((14,"TGo4BufferQueue::TGo4BufferQueue()", __LINE__, __FILE__));
00044
00045 InitBuffers();
00046 }
00047
00048 TGo4BufferQueue::TGo4BufferQueue(const char* name)
00049 : TGo4Queue(name), fiOverflowcount(0), fiMaxBuffers(10)
00050
00051 {
00052 TRACE((14,"TGo4BufferQueue::TGo4BufferQueue(const char*)", __LINE__, __FILE__));
00053 InitBuffers();
00054 }
00055
00056 void TGo4BufferQueue::InitBuffers()
00057
00058 {
00059 TRACE((14,"TGo4BufferQueue::InitBuffers()", __LINE__, __FILE__));
00060 TGo4LockGuard mainguard;
00061 fxBufferList=new TList;
00062 fxFreeList=new TList;
00063 fxBufferIterator=fxFreeList->MakeIterator();
00064 fxBufferMutex=new TMutex;
00065 for (Int_t i=0; i< fiMaxBuffers; ++i)
00066 {
00067 TBuffer* buf=NewEntry();
00068 fxBufferList->Add(buf);
00069 fxFreeList->Add(buf);
00070 }
00071 }
00072
00073 TGo4BufferQueue::~TGo4BufferQueue()
00074 {
00075 TRACE((14,"TGo4BufferQueue::~TTGo4BufferQueue()", __LINE__, __FILE__));
00076
00077 delete fxBufferIterator;
00078 fxBufferList->Delete();
00079 TCollection::EmptyGarbageCollection();
00080 delete fxBufferList;
00081 delete fxBufferMutex;
00082 }
00083
00084 TBuffer * TGo4BufferQueue::WaitBuffer()
00085 {
00086 TRACE((19,"TGo4BufferQueue::WaitBuffer()", __LINE__, __FILE__));
00087 TObject* ob = Wait();
00088 TBuffer* rev= dynamic_cast<TBuffer*> ( ob );
00089 return rev;
00090 }
00091
00092 TObject * TGo4BufferQueue::WaitObjectFromBuffer()
00093 {
00094 TRACE((19,"TGo4BufferQueue::WaitObjectFromBuffer()", __LINE__, __FILE__));
00095 TObject* obj=0;
00096 TBuffer* buffer=WaitBuffer();
00097 if(buffer) {
00098 {
00099 TGo4LockGuard mainguard;
00100
00101 TDirectory* savdir=gDirectory;
00102 gROOT->cd();
00103 buffer->SetReadMode();
00104
00105 buffer->Reset();
00106 buffer->InitMap();
00107
00108
00109
00110 TClass* cl = buffer->ReadClass();
00111
00112 if(cl == (TClass*) -1)
00113 {
00114
00115 cout << " Could not receive object of unknown class on buffer queue "<<GetName() <<" !!!" <<endl;
00116 obj=0;
00117 } else {
00118
00119
00120
00121 buffer->Reset();
00122 obj = buffer->ReadObject(cl);
00123 }
00124 if(savdir) savdir->cd();
00125 }
00126 FreeBuffer(buffer);
00127 }
00128 else
00129 {
00130
00131 }
00132 return obj;
00133 }
00134
00135 void TGo4BufferQueue::AddBuffer(TBuffer * buffer, Bool_t clone)
00136 {
00137 TRACE((19,"TGo4BufferQueue::AddBuffer(TBuffer*, Bool_t)", __LINE__, __FILE__));
00138
00139 TBuffer* entry=0;
00140 Bool_t entryisnew=kFALSE;
00141 if(clone)
00142 {
00143 TGo4LockGuard qguard(fxBufferMutex);
00144 entry= dynamic_cast<TBuffer*>(fxFreeList->Remove(fxFreeList->First()));
00145
00146 if(entry==0)
00147 {
00148
00149 TGo4Log::Debug(" Buffer Queue adding new internal buffer... ");
00150
00151
00152 entry=NewEntry();
00153 fxBufferList->Add(entry);
00154
00155 entryisnew=kTRUE;
00156 fiMaxBuffers++;
00157
00158 }
00159 else
00160 {
00161
00162 }
00163
00164 Int_t srcsize=buffer->BufferSize();
00165 Int_t destsize=entry->BufferSize();
00166 if (srcsize>destsize)
00167 {
00168 Realloc(entry,destsize,srcsize);
00169 destsize=srcsize;
00170 }
00171 else
00172 {
00173
00174 }
00175 char* source= buffer->Buffer();
00176 char* destination= entry->Buffer();
00177 memcpy(destination,source,destsize);
00178
00179 Int_t messlen = buffer->Length();
00180 entry->SetBufferOffset(messlen);
00181 entry->SetByteCount(0);
00182 }
00183 else
00184 {
00185 entry=buffer;
00186 }
00187
00188
00189 try
00190 {
00191 Add(entry);
00192 }
00193 catch(TGo4RuntimeException ex)
00194 {
00195 cout << "Buffer queue "<< GetName()<<" is full, dropping new entry "<< entry <<" !!!" << endl;
00196 if(entryisnew)
00197 {
00198 TGo4LockGuard qguard(fxBufferMutex);
00199 fxBufferList->Remove(entry);
00200 delete entry;
00201 }
00202 }
00203 }
00204
00205 void TGo4BufferQueue::AddBufferFromObject(TObject * object)
00206 {
00207 TRACE((12,"TGo4BufferQueue::AddBufferFromObject(TObject*)", __LINE__, __FILE__));
00208 if(object==0) return;
00209 TGo4LockGuard mainguard;
00210 TBuffer* entry = 0;
00211 entry = new TGo4Buffer(TBuffer::kWrite);
00212
00213 TFile *filsav = gFile;
00214 gFile = 0;
00215 entry->WriteObject(object);
00216 gFile = filsav;
00218
00219
00220
00221
00222
00223
00224
00225
00226
00227
00228
00229
00230
00231
00232
00233
00234
00235
00237
00238
00239
00240 try
00241 {
00242 Add(entry);
00243 }
00244 catch(TGo4RuntimeException& ex)
00245 {
00246 cout << "Buffer queue "<< GetName()<<" is full, dropping new entry "<< entry <<" !!!" << endl;
00247 delete entry;
00248 }
00249
00250 }
00251
00252 void TGo4BufferQueue::FreeBuffer(TBuffer * buffer)
00253 {
00254 TRACE((19,"TGo4BufferQueue::FreeBuffer(TBuffer*, Bool_t)", __LINE__, __FILE__));
00255
00256 TGo4LockGuard qguard(fxBufferMutex);
00257
00258
00259 if (fxBufferList->FindObject(buffer)!=0)
00260 {
00261
00262
00263
00264 Int_t memsize=buffer->BufferSize();
00265 if (memsize>TGo4Socket::fgiBUFINITSIZE)
00266 {
00267 Realloc(buffer,memsize, TGo4Socket::fgiBUFINITSIZE);
00268 }
00269 fxFreeList->AddLast(buffer);
00270
00271 }
00272 else
00273 {
00274
00275 delete buffer;
00276
00277
00278
00279
00280 }
00281
00282
00283
00284
00285 }
00286
00287 void TGo4BufferQueue::Clear(Option_t* opt)
00288 {
00289 TObject* ob=0;
00290 while((ob=Next()) !=0)
00291 {
00292
00293 FreeBuffer(dynamic_cast<TBuffer*> (ob) );
00294 }
00295
00296 }
00297
00298
00299
00300
00301 void TGo4BufferQueue::Realloc(TBuffer* buffer, Int_t oldsize, Int_t newsize)
00302 {
00303 if(buffer==0) return;
00304 TGo4LockGuard mainguard;
00305 char* memfield=buffer->Buffer();
00306
00307
00308 Int_t extraspace=TGo4Socket::fgiBUFEXTRASPACE;
00309
00310
00311
00312
00313 memfield = TStorage::ReAllocChar(memfield,
00314 (newsize+extraspace),
00315 (oldsize+extraspace));
00316
00317 buffer->ResetBit(fgiISOWNER);
00318
00319
00320
00321 buffer->SetBuffer(memfield, newsize);
00322
00323 buffer->SetBit(fgiISOWNER);
00324
00325
00326
00327
00328
00329 buffer->SetBufferOffset(newsize);
00330 }
00331
00332 TBuffer* TGo4BufferQueue::NewEntry()
00333 {
00334 TGo4LockGuard mainguard;
00335 TBuffer* buf = new TGo4Buffer(TBuffer::kWrite, TGo4Socket::fgiBUFINITSIZE);
00336
00337 TNamed* dummy= new TNamed("This is a default buffer filler","GO4 is fun!");
00338 TFile *filsav = gFile;
00339 gFile = 0;
00340 buf->WriteObject(dummy);
00341 gFile = filsav;
00342 delete dummy;
00343 return buf;
00344 }
00345
00346 TBuffer* TGo4BufferQueue::CreateValueBuffer(UInt_t val)
00347 {
00348 TGo4Buffer* buf= new TGo4Buffer(TBuffer::kWrite);
00349 char* field= buf->Buffer() + sizeof(UInt_t);
00350 tobuf(field ,val);
00351 buf->SetBufferOffset( sizeof(UInt_t)+ sizeof(UInt_t) );
00352 buf->SetByteCount(0);
00353 return buf;
00354 }
00355
00356
00357 Int_t TGo4BufferQueue::DecodeValueBuffer(TBuffer* buf)
00358 {
00359 if(buf==0) return -1;
00360 UInt_t len= buf->Length();
00361 if(len==sizeof(UInt_t)+sizeof(UInt_t))
00362
00363 {
00364 char* field=buf->Buffer();
00365 char* temp= field + sizeof(UInt_t);
00366 UInt_t val=0;
00367 frombuf(temp, &val);
00368
00369 return (Int_t) val;
00370 }
00371 else
00372 {
00373 return -2;
00374 }
00375 }