00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014 #include "TGo4BufferQueue.h"
00015
00016 #include "TROOT.h"
00017 #include "TClass.h"
00018 #include "TMutex.h"
00019 #include "TFile.h"
00020 #include "TGo4Buffer.h"
00021 #include "RVersion.h"
00022 #include "Riostream.h"
00023
00024
00025 #include "TGo4Socket.h"
00026 #include "TGo4RuntimeException.h"
00027 #include "TGo4Log.h"
00028 #include "TGo4LockGuard.h"
00029
00030 #if ROOT_VERSION_CODE > ROOT_VERSION(4,3,2)
00031
00032 const Int_t TGo4BufferQueue::fgiISOWNER=TBuffer::kIsOwner;
00033 #else
00034 const Int_t TGo4BufferQueue::fgiISOWNER=BIT(14);
00035
00036 #endif
00037
00038
00039
00040 TGo4BufferQueue::TGo4BufferQueue() :
00041 TGo4Queue("Default buffer queue"),
00042 fxBufferList(0),
00043 fxFreeList(0),
00044 fxBufferMutex(0),
00045 fiOverflowcount(0),
00046 fiMaxBuffers(10)
00047 {
00048 GO4TRACE((14,"TGo4BufferQueue::TGo4BufferQueue()", __LINE__, __FILE__));
00049
00050 InitBuffers();
00051 }
00052
00053 TGo4BufferQueue::TGo4BufferQueue(const char* name) :
00054 TGo4Queue(name),
00055 fxBufferList(0),
00056 fxFreeList(0),
00057 fxBufferMutex(0),
00058 fiOverflowcount(0),
00059 fiMaxBuffers(10)
00060 {
00061 GO4TRACE((14,"TGo4BufferQueue::TGo4BufferQueue(const char*)", __LINE__, __FILE__));
00062
00063 InitBuffers();
00064 }
00065
00066 void TGo4BufferQueue::InitBuffers()
00067
00068 {
00069 GO4TRACE((14,"TGo4BufferQueue::InitBuffers()", __LINE__, __FILE__));
00070 TGo4LockGuard mainguard;
00071 fxBufferList = new TList;
00072 fxFreeList = new TList;
00073 fxBufferMutex = new TMutex;
00074 for (Int_t i=0; i< fiMaxBuffers; ++i) {
00075 TBuffer* buf = NewEntry();
00076 fxBufferList->Add(buf);
00077 fxFreeList->Add(buf);
00078 }
00079 }
00080
00081 TGo4BufferQueue::~TGo4BufferQueue()
00082 {
00083 GO4TRACE((14,"TGo4BufferQueue::~TTGo4BufferQueue()", __LINE__, __FILE__));
00084
00085 fxBufferList->Delete();
00086 TCollection::EmptyGarbageCollection();
00087
00088 delete fxFreeList; fxFreeList = 0;
00089 delete fxBufferList; fxBufferList = 0;
00090 delete fxBufferMutex; fxBufferMutex = 0;
00091 }
00092
00093 TBuffer * TGo4BufferQueue::WaitBuffer()
00094 {
00095 GO4TRACE((19,"TGo4BufferQueue::WaitBuffer()", __LINE__, __FILE__));
00096 TObject* ob = Wait();
00097 return dynamic_cast<TBuffer*> ( ob );
00098 }
00099
00100 TObject * TGo4BufferQueue::WaitObjectFromBuffer()
00101 {
00102 GO4TRACE((19,"TGo4BufferQueue::WaitObjectFromBuffer()", __LINE__, __FILE__));
00103 TObject* obj=0;
00104 TBuffer* buffer = WaitBuffer();
00105 if(buffer) {
00106 {
00107 TGo4LockGuard mainguard;
00108
00109 TDirectory* savdir=gDirectory;
00110 gROOT->cd();
00111 buffer->SetReadMode();
00112
00113 buffer->Reset();
00114 buffer->InitMap();
00115
00116
00117
00118 TClass* cl = buffer->ReadClass();
00119
00120 if(cl == (TClass*) -1)
00121 {
00122
00123 std::cout << " Could not receive object of unknown class on buffer queue "<<GetName() <<" !!!" <<std::endl;
00124 obj=0;
00125 } else {
00126
00127
00128
00129 buffer->Reset();
00130 obj = buffer->ReadObject(cl);
00131 }
00132 if(savdir) savdir->cd();
00133 }
00134 FreeBuffer(buffer);
00135 }
00136
00137 return obj;
00138 }
00139
00140 void TGo4BufferQueue::AddBuffer(TBuffer * buffer, Bool_t clone)
00141 {
00142 GO4TRACE((19,"TGo4BufferQueue::AddBuffer(TBuffer*, Bool_t)", __LINE__, __FILE__));
00143
00144 TBuffer* entry=0;
00145 Bool_t entryisnew=kFALSE;
00146 if(clone)
00147 {
00148 TGo4LockGuard qguard(fxBufferMutex);
00149 entry= dynamic_cast<TBuffer*>(fxFreeList->Remove(fxFreeList->First()));
00150
00151 if(entry==0)
00152 {
00153
00154 TGo4Log::Debug(" Buffer Queue adding new internal buffer... ");
00155
00156
00157 entry=NewEntry();
00158 fxBufferList->Add(entry);
00159
00160 entryisnew=kTRUE;
00161 fiMaxBuffers++;
00162
00163 }
00164 else
00165 {
00166
00167 }
00168
00169 Int_t srcsize=buffer->BufferSize();
00170 Int_t destsize=entry->BufferSize();
00171 if (srcsize>destsize)
00172 {
00173 Realloc(entry,destsize,srcsize);
00174 destsize=srcsize;
00175 }
00176 else
00177 {
00178
00179 }
00180 char* source= buffer->Buffer();
00181 char* destination= entry->Buffer();
00182 memcpy(destination,source,destsize);
00183
00184 Int_t messlen = buffer->Length();
00185 entry->SetBufferOffset(messlen);
00186 entry->SetByteCount(0);
00187 }
00188 else
00189 {
00190 entry=buffer;
00191 }
00192
00193
00194 try
00195 {
00196 Add(entry);
00197 }
00198 catch(TGo4RuntimeException ex)
00199 {
00200 std::cout << "Buffer queue "<< GetName()<<" is full, dropping new entry "<< entry <<" !!!" << std::endl;
00201 if(entryisnew)
00202 {
00203 TGo4LockGuard qguard(fxBufferMutex);
00204 fxBufferList->Remove(entry);
00205 delete entry;
00206 }
00207 }
00208 }
00209
00210 void TGo4BufferQueue::AddBufferFromObject(TObject * object)
00211 {
00212 GO4TRACE((12,"TGo4BufferQueue::AddBufferFromObject(TObject*)", __LINE__, __FILE__));
00213 if(object==0) return;
00214 TGo4LockGuard mainguard;
00215 TBuffer* entry = 0;
00216 entry = new TGo4Buffer(TBuffer::kWrite);
00217
00218 TFile *filsav = gFile;
00219 gFile = 0;
00220 entry->WriteObject(object);
00221 gFile = filsav;
00223
00224
00225
00226
00227
00228
00229
00230
00231
00232
00233
00234
00235
00236
00237
00238
00239
00240
00242
00243
00244
00245 try
00246 {
00247 Add(entry);
00248 }
00249 catch(TGo4RuntimeException& ex)
00250 {
00251 std::cout << "Buffer queue "<< GetName()<<" is full, dropping new entry "<< entry <<" !!!" << std::endl;
00252 delete entry;
00253 }
00254
00255 }
00256
00257 void TGo4BufferQueue::FreeBuffer(TBuffer * buffer)
00258 {
00259 GO4TRACE((19,"TGo4BufferQueue::FreeBuffer(TBuffer*, Bool_t)", __LINE__, __FILE__));
00260
00261 TGo4LockGuard qguard(fxBufferMutex);
00262
00263
00264 if (fxBufferList->FindObject(buffer)!=0)
00265 {
00266
00267
00268
00269 Int_t memsize=buffer->BufferSize();
00270 if (memsize>TGo4Socket::fgiBUFINITSIZE)
00271 {
00272 Realloc(buffer,memsize, TGo4Socket::fgiBUFINITSIZE);
00273 }
00274 fxFreeList->AddLast(buffer);
00275
00276 }
00277 else
00278 {
00279
00280 delete buffer;
00281
00282
00283
00284
00285 }
00286
00287
00288
00289
00290 }
00291
00292
00293 void TGo4BufferQueue::Clear(Option_t* opt)
00294 {
00295 TObject* ob=0;
00296 while((ob=Next()) !=0) {
00297
00298 FreeBuffer(dynamic_cast<TBuffer*> (ob) );
00299 }
00300 }
00301
00302
00303 void TGo4BufferQueue::Realloc(TBuffer* buffer, Int_t oldsize, Int_t newsize)
00304 {
00305 if(buffer==0) return;
00306 TGo4LockGuard mainguard;
00307 char* memfield = buffer->Buffer();
00308
00309
00310 Int_t extraspace=TGo4Socket::fgiBUFEXTRASPACE;
00311
00312
00313
00314
00315 memfield = TStorage::ReAllocChar(memfield,
00316 (newsize+extraspace),
00317 (oldsize+extraspace));
00318
00319 buffer->ResetBit(fgiISOWNER);
00320
00321
00322
00323 buffer->SetBuffer(memfield, newsize);
00324
00325 buffer->SetBit(fgiISOWNER);
00326
00327
00328
00329
00330
00331 buffer->SetBufferOffset(newsize);
00332 }
00333
00334 TBuffer* TGo4BufferQueue::NewEntry()
00335 {
00336 TGo4LockGuard mainguard;
00337 TBuffer* buf = new TGo4Buffer(TBuffer::kWrite, TGo4Socket::fgiBUFINITSIZE);
00338
00339 TNamed* dummy= new TNamed("This is a default buffer filler","GO4 is fun!");
00340 TFile *filsav = gFile;
00341 gFile = 0;
00342 buf->WriteObject(dummy);
00343 gFile = filsav;
00344 delete dummy;
00345 return buf;
00346 }
00347
00348 TBuffer* TGo4BufferQueue::CreateValueBuffer(UInt_t val)
00349 {
00350 TGo4Buffer* buf= new TGo4Buffer(TBuffer::kWrite);
00351 char* field= buf->Buffer() + sizeof(UInt_t);
00352 tobuf(field ,val);
00353 buf->SetBufferOffset( sizeof(UInt_t)+ sizeof(UInt_t) );
00354 buf->SetByteCount(0);
00355 return buf;
00356 }
00357
00358
00359 Int_t TGo4BufferQueue::DecodeValueBuffer(TBuffer* buf)
00360 {
00361 if(buf==0) return -1;
00362 UInt_t len= buf->Length();
00363 if(len != (sizeof(UInt_t)+sizeof(UInt_t))) return -2;
00364
00365
00366 char* field=buf->Buffer();
00367 char* temp= field + sizeof(UInt_t);
00368 UInt_t val=0;
00369 frombuf(temp, &val);
00370
00371 return (Int_t) val;
00372 }