00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #include "TGo4BufferQueue.h"
00017
00018 #include <iostream.h>
00019
00020 #include "TROOT.h"
00021 #include "TMutex.h"
00022 #include "TFile.h"
00023 #include "TMessage.h"
00024 #include "TBuffer.h"
00025
00026 #include "Go4Socket/TGo4Socket.h"
00027 #include "Go4Exceptions/TGo4RuntimeException.h"
00028 #include "Go4Log/TGo4Log.h"
00029 #include "Go4LockGuard/TGo4LockGuard.h"
00030
00031 #if __GO4ROOTVERSION__ > 40302
00032 const Int_t TGo4BufferQueue::fgiISOWNER=TBuffer::kIsOwner;
00033 #else
00034 const Int_t TGo4BufferQueue::fgiISOWNER=BIT(14);
00035
00036 #endif
00037
00038
00039
00040 TGo4BufferQueue::TGo4BufferQueue()
00041 : TGo4Queue("Default buffer queue"),fiOverflowcount(0), fiMaxBuffers(10)
00042 {
00043 TRACE((14,"TGo4BufferQueue::TGo4BufferQueue()", __LINE__, __FILE__));
00044
00045 InitBuffers();
00046 }
00047
00048 TGo4BufferQueue::TGo4BufferQueue(const char* name)
00049 : TGo4Queue(name), fiOverflowcount(0), fiMaxBuffers(10)
00050
00051 {
00052 TRACE((14,"TGo4BufferQueue::TGo4BufferQueue(Text_t*)", __LINE__, __FILE__));
00053 InitBuffers();
00054 }
00055
00056 void TGo4BufferQueue::InitBuffers()
00057
00058 {
00059 TRACE((14,"TGo4BufferQueue::InitBuffers()", __LINE__, __FILE__));
00060 TGo4LockGuard mainguard;
00061 fxBufferList=new TList;
00062 fxFreeList=new TList;
00063 fxBufferIterator=fxFreeList->MakeIterator();
00064 fxBufferMutex=new TMutex;
00065 for (Int_t i=0; i< fiMaxBuffers; ++i)
00066 {
00067 TBuffer* buf=NewEntry();
00068 fxBufferList->Add(buf);
00069 fxFreeList->Add(buf);
00070 }
00071 }
00072
00073 TGo4BufferQueue::~TGo4BufferQueue()
00074 {
00075 TRACE((14,"TGo4BufferQueue::~TTGo4BufferQueue()", __LINE__, __FILE__));
00076
00077 delete fxBufferIterator;
00078 fxBufferList->Delete();
00079 TCollection::EmptyGarbageCollection();
00080 delete fxBufferList;
00081 delete fxBufferMutex;
00082 }
00083
00084 TBuffer * TGo4BufferQueue::WaitBuffer()
00085 {
00086 TRACE((19,"TGo4BufferQueue::WaitBuffer()", __LINE__, __FILE__));
00087 TObject* ob = Wait();
00088 TBuffer* rev= dynamic_cast<TBuffer*> ( ob );
00089 return rev;
00090 }
00091
00092
00093 TObject * TGo4BufferQueue::WaitObjectFromBuffer()
00094 {
00095 TRACE((19,"TGo4BufferQueue::WaitObjectFromBuffer()", __LINE__, __FILE__));
00096 TObject* obj=0;
00097 TBuffer* buffer=WaitBuffer();
00098 if(buffer)
00099 {
00100 {
00101 TGo4LockGuard mainguard;
00102
00103 TDirectory* savdir=gDirectory;
00104 gROOT->cd();
00105 buffer->SetReadMode();
00106
00107 buffer->Reset();
00108 buffer->InitMap();
00109
00110
00111
00112 TClass* cl = buffer->ReadClass();
00113
00114 if(cl == (TClass*) -1)
00115 {
00116
00117 cout << " Could not receive object of unknown class on buffer queue "<<GetName() <<" !!!" <<endl;
00118 obj=0;
00119 }
00120 else
00121 {
00122
00123
00124
00125 buffer->Reset();
00126 obj=buffer->ReadObject(cl);
00127 }
00128 if(savdir) savdir->cd();
00129 }
00130 FreeBuffer(buffer);
00131 }
00132 else
00133 {
00134
00135 }
00136 return obj;
00137 }
00138
00139 void TGo4BufferQueue::AddBuffer(TBuffer * buffer, Bool_t clone)
00140 {
00141 TRACE((19,"TGo4BufferQueue::AddBuffer(TBuffer*, Bool_t)", __LINE__, __FILE__));
00142
00143 TBuffer* entry=0;
00144 Bool_t entryisnew=kFALSE;
00145 if(clone)
00146 {
00147 TGo4LockGuard qguard(fxBufferMutex);
00148 entry= dynamic_cast<TBuffer*>(fxFreeList->Remove(fxFreeList->First()));
00149
00150 if(entry==0)
00151 {
00152
00153 TGo4Log::Debug(" Buffer Queue adding new internal buffer... ");
00154
00155
00156 entry=NewEntry();
00157 fxBufferList->Add(entry);
00158
00159 entryisnew=kTRUE;
00160 fiMaxBuffers++;
00161
00162 }
00163 else
00164 {
00165
00166 }
00167
00168 Int_t srcsize=buffer->BufferSize();
00169 Int_t destsize=entry->BufferSize();
00170 if (srcsize>destsize)
00171 {
00172 Realloc(entry,destsize,srcsize);
00173 destsize=srcsize;
00174 }
00175 else
00176 {
00177
00178 }
00179 char* source= buffer->Buffer();
00180 char* destination= entry->Buffer();
00181 memcpy(destination,source,destsize);
00182
00183 Int_t messlen = buffer->Length();
00184 entry->SetBufferOffset(messlen);
00185 entry->SetByteCount(0);
00186 }
00187 else
00188 {
00189 entry=buffer;
00190 }
00191
00192
00193 try
00194 {
00195 Add(entry);
00196 }
00197 catch(TGo4RuntimeException ex)
00198 {
00199 cout << "Buffer queue "<< GetName()<<" is full, dropping new entry "<< entry <<" !!!" << endl;
00200 if(entryisnew)
00201 {
00202 TGo4LockGuard qguard(fxBufferMutex);
00203 fxBufferList->Remove(entry);
00204 delete entry;
00205 }
00206 }
00207 }
00208
00209 void TGo4BufferQueue::AddBufferFromObject(TObject * object)
00210 {
00211 TRACE((12,"TGo4BufferQueue::AddBufferFromObject(TObject*)", __LINE__, __FILE__));
00212 if(object==0) return;
00213 TGo4LockGuard mainguard;
00214 TBuffer* entry = 0;
00215 entry = new TBuffer(TBuffer::kWrite);
00216
00217 TFile *filsav = gFile;
00218 gFile = 0;
00219 entry->WriteObject(object);
00220 gFile = filsav;
00222
00223
00224
00225
00226
00227
00228
00229
00230
00231
00232
00233
00234
00235
00236
00237
00238
00239
00241
00242
00243
00244 try
00245 {
00246 Add(entry);
00247 }
00248 catch(TGo4RuntimeException& ex)
00249 {
00250 cout << "Buffer queue "<< GetName()<<" is full, dropping new entry "<< entry <<" !!!" << endl;
00251 delete entry;
00252 }
00253
00254 }
00255
00256 void TGo4BufferQueue::FreeBuffer(TBuffer * buffer)
00257 {
00258 TRACE((19,"TGo4BufferQueue::FreeBuffer(TBuffer*, Bool_t)", __LINE__, __FILE__));
00259
00260 TGo4LockGuard qguard(fxBufferMutex);
00261
00262
00263 if (fxBufferList->FindObject(buffer)!=0)
00264 {
00265
00266
00267
00268 Int_t memsize=buffer->BufferSize();
00269 if (memsize>TGo4Socket::fgiBUFINITSIZE)
00270 {
00271 Realloc(buffer,memsize, TGo4Socket::fgiBUFINITSIZE);
00272 }
00273 fxFreeList->AddLast(buffer);
00274
00275 }
00276 else
00277 {
00278
00279 delete buffer;
00280
00281
00282
00283
00284 }
00285
00286
00287
00288
00289 }
00290
00291 void TGo4BufferQueue::Clear(Option_t* opt)
00292 {
00293
00294 TObject* ob=0;
00295 while((ob=Next()) !=0)
00296 {
00297
00298 FreeBuffer(dynamic_cast<TBuffer*> (ob) );
00299 }
00300
00301 }
00302
00303
00304
00305
00306 void TGo4BufferQueue::Realloc(TBuffer* buffer, Int_t oldsize, Int_t newsize)
00307 {
00308 if(buffer==0) return;
00309 TGo4LockGuard mainguard;
00310 char* memfield=buffer->Buffer();
00311
00312
00313 Int_t extraspace=TGo4Socket::fgiBUFEXTRASPACE;
00314
00315
00316
00317
00318 memfield = TStorage::ReAllocChar(memfield,
00319 (newsize+extraspace),
00320 (oldsize+extraspace));
00321
00322 buffer->ResetBit(fgiISOWNER);
00323 buffer->SetBuffer(memfield, newsize);
00324 buffer->SetBit(fgiISOWNER);
00325
00326
00327
00328
00329
00330 buffer->SetBufferOffset(newsize);
00331 }
00332
00333 TBuffer* TGo4BufferQueue::NewEntry()
00334 {
00335 TGo4LockGuard mainguard;
00336 TBuffer* buf = new TBuffer(TBuffer::kWrite, TGo4Socket::fgiBUFINITSIZE);
00337
00338 TNamed* dummy= new TNamed("This is a default buffer filler","GO4 is fun!");
00339 TFile *filsav = gFile;
00340 gFile = 0;
00341 buf->WriteObject(dummy);
00342 gFile = filsav;
00343 delete dummy;
00344 return buf;
00345 }
00346
00347 TBuffer* TGo4BufferQueue::CreateValueBuffer(UInt_t val)
00348 {
00349 TBuffer* buf= new TBuffer(TBuffer::kWrite);
00350 char* field= buf->Buffer() + sizeof(UInt_t);
00351 tobuf(field ,val);
00352 buf->SetBufferOffset( sizeof(UInt_t)+ sizeof(UInt_t) );
00353 buf->SetByteCount(0);
00354 return buf;
00355 }
00356
00357
00358 Int_t TGo4BufferQueue::DecodeValueBuffer(TBuffer* buf)
00359 {
00360 if(buf==0) return -1;
00361 UInt_t len= buf->Length();
00362 if(len==sizeof(UInt_t)+sizeof(UInt_t))
00363
00364 {
00365 char* field=buf->Buffer();
00366 char* temp= field + sizeof(UInt_t);
00367 UInt_t val=0;
00368 frombuf(temp, &val);
00369
00370 return (Int_t) val;
00371 }
00372 else
00373 {
00374 return -2;
00375 }
00376 }
00377
00378