GSI Object Oriented Online Offline (Go4)  GO4-5.3.2
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
TGo4BufferQueue.cxx
Go to the documentation of this file.
1 // $Id: TGo4BufferQueue.cxx 2162 2018-10-10 14:16:24Z adamczew $
2 //-----------------------------------------------------------------------
3 // The GSI Online Offline Object Oriented (Go4) Project
4 // Experiment Data Processing at EE department, GSI
5 //-----------------------------------------------------------------------
6 // Copyright (C) 2000- GSI Helmholtzzentrum f�r Schwerionenforschung GmbH
7 // Planckstr. 1, 64291 Darmstadt, Germany
8 // Contact: http://go4.gsi.de
9 //-----------------------------------------------------------------------
10 // This software can be used under the license agreements as stated
11 // in Go4License.txt file which is part of the distribution.
12 //-----------------------------------------------------------------------
13 
14 #include "TGo4BufferQueue.h"
15 
16 #include "TROOT.h"
17 #include "TClass.h"
18 #include "TMutex.h"
19 #include "TFile.h"
20 #include "TGo4Buffer.h"
21 #include "RVersion.h"
22 #include "Riostream.h"
23 
24 
25 #include "TGo4Socket.h"
26 #include "TGo4RuntimeException.h"
27 #include "TGo4Log.h"
28 #include "TGo4LockGuard.h"
29 
30 #if ROOT_VERSION_CODE > ROOT_VERSION(4,3,2)
31  const Int_t TGo4BufferQueue::fgiISOWNER = TBuffer::kIsOwner;
32 #else
33  const Int_t TGo4BufferQueue::fgiISOWNER = BIT(14);
34 // we emulate the protected owner flag of the TBuffer class, needed for reallocation!
35 #endif
36 
37 
39  TGo4Queue("Default buffer queue"),
40  fxBufferList(0),
41  fxFreeList(0),
42  fxBufferMutex(0),
43  fiOverflowcount(0),
44  fiMaxBuffers(10)
45 {
46  GO4TRACE((14,"TGo4BufferQueue::TGo4BufferQueue()", __LINE__, __FILE__));
47 
48  InitBuffers();
49 }
50 
52  TGo4Queue(name),
53  fxBufferList(0),
54  fxFreeList(0),
55  fxBufferMutex(0),
56  fiOverflowcount(0),
57  fiMaxBuffers(10)
58 {
59  GO4TRACE((14,"TGo4BufferQueue::TGo4BufferQueue(const char*)", __LINE__, __FILE__));
60 
61  InitBuffers();
62 }
63 
65 
66 {
67  GO4TRACE((14,"TGo4BufferQueue::InitBuffers()", __LINE__, __FILE__));
68  TGo4LockGuard mainguard;
69  fxBufferList = new TList; // list owning all buffers
70  fxFreeList = new TList; // list indicating the free buffers
71  fxFreeList->SetOwner(kFALSE); // JAM2018 - avoid root6 problems at shutdown???
72  fxBufferMutex = new TMutex;
73  for (Int_t i=0; i< fiMaxBuffers; ++i) {
74  TBuffer* buf = NewEntry();
75  fxBufferList->Add(buf);
76  fxFreeList->Add(buf);
77  }
78 }
79 
81 {
82  GO4TRACE((14,"TGo4BufferQueue::~TTGo4BufferQueue()", __LINE__, __FILE__));
83 
84  //printf ("JAM*************** DTOR of TGo4BufferQueue %s BEGIN\n", GetName());
85  //fxBufferList->Delete();// JAM2018 - avoid root6 problems at shutdown???
86  TCollection::EmptyGarbageCollection();
87  //printf ("JAM*************** DTOR of TGo4BufferQueue %s after EmptyGarbageCollection \n", GetName());
88 
89  delete fxFreeList; fxFreeList = 0;
90  //printf ("JAM*************** DTOR of TGo4BufferQueue %s after delete fxFreeList \n", GetName());
91  delete fxBufferList; fxBufferList = 0;
92  //printf ("JAM*************** DTOR of TGo4BufferQueue %s after delete fxBufferList \n", GetName());
93  delete fxBufferMutex; fxBufferMutex = 0;
94  //printf ("JAM*************** DTOR of TGo4BufferQueue %s END\n", GetName());
95 }
96 
98 {
99  GO4TRACE((19,"TGo4BufferQueue::WaitBuffer()", __LINE__, __FILE__));
100  TObject* ob = Wait();
101  return dynamic_cast<TBuffer*> ( ob );
102 }
103 
105 {
106  GO4TRACE((19,"TGo4BufferQueue::WaitObjectFromBuffer()", __LINE__, __FILE__));
107  TObject* obj=0;
108  TBuffer* buffer = WaitBuffer();
109  if(buffer) {
110  {
111  //std::cout << "BBBBBBBBBBBBBBB TGo4BufferQueue::WaitObjectFromBuffer() before mainguard "<< std::endl;
112  TGo4LockGuard mainguard;
113  // lock go4 main mutex for streaming
114  TDirectory* savdir=gDirectory;
115  gROOT->cd(); // be sure to be in the top directory when creating histo
116  buffer->SetReadMode();
117  //std::cout << " Reading object from buffer..."<< std::endl;
118  buffer->Reset();
119  buffer->InitMap();
120  // note: root version 3.05/02 crashes again when unknown class
121  // shall be read; this was working in 3.03/09
122  // therefore, we put in our own check again from revision 1.23
123  TClass* cl = buffer->ReadClass();
124  //std::cout << "buffer queue "<< GetName() <<" :waitobject, got Class: " << cl << std::endl;
125  if(cl == (TClass*) -1)
126  {
127  // case of unknown class
128  std::cout << " Could not receive object of unknown class on buffer queue "<<GetName() <<" !!!" <<std::endl;
129  obj=0;
130  } else {
131  // if(cl)
132  // std::cout << "Classname: " << cl->GetName() << std::endl;
133  // std::cout << "Reading object from buffer..."<< std::endl;
134  buffer->Reset();
135  obj = buffer->ReadObject(cl);
136  }
137  if(savdir) savdir->cd();
138  } // TGo4LockGuard
139  FreeBuffer(buffer); // give internal buffer back to queue
140  } // if(buffer)
141 
142  return obj;
143 }
144 
145 void TGo4BufferQueue::AddBuffer(TBuffer * buffer, Bool_t clone)
146 {
147  GO4TRACE((19,"TGo4BufferQueue::AddBuffer(TBuffer*, Bool_t)", __LINE__, __FILE__));
148 
149  TBuffer* entry=0;
150  Bool_t entryisnew=kFALSE;
151  if(clone)
152  {
153  //std::cout <<"BBBBBBBBBBBBBBB TGo4BufferQueue "<< GetName()<< " before lockguard of buffer mutex "<<fxBufferMutex<<std::endl;
155  entry= dynamic_cast<TBuffer*>(fxFreeList->Remove(fxFreeList->First()));
156  // get next free buffer
157  if(entry==0)
158  {
159  // no buffer entry there, we create one
160  TGo4Log::Debug(" Buffer Queue adding new internal buffer... ");
161  //std::cout <<"Buffer Queue: creating new internal buffer... "<< GetName();
162 
163  entry=NewEntry();
164  //std::cout <<"BBBBBBBBBBBBBBB TGo4BufferQueue "<< GetName()<< " before Add to free list... "<<std::endl;
165 
166  fxBufferList->Add(entry); // add to list of existing buffers
167  // need not add new buffer to list of free buffers
168  entryisnew=kTRUE;
169  fiMaxBuffers++;
170  //std::cout <<"buffers:"<<fiMaxBuffers <<", "<< fiMaxBuffers*TGo4Socket::fgiBUFINITSIZE << std::endl;
171  }
172  else
173  {
174  // ok, we have a buffer to clone
175  }
176 
177  Int_t srcsize=buffer->BufferSize();
178  Int_t destsize=entry->BufferSize();
179  if (srcsize>destsize)
180  {
181  Realloc(entry,destsize,srcsize);
182  destsize=srcsize;
183  }
184  else
185  {
186  // buffer big enough, proceed
187  }
188  char* source= buffer->Buffer();
189  char* destination= entry->Buffer();
190  memcpy(destination,source,destsize);
191  //std::cout <<"))))))))))Buffer Queue: copied "<< destsize <<"bytes to buffer field" << std::endl;
192  Int_t messlen = buffer->Length(); // compatible with root TMessage protocol
193  entry->SetBufferOffset(messlen);
194  entry->SetByteCount(0);
195  } // if(clone)
196  else
197  {
198  entry=buffer;
199  }
200  //std::cout <<"Bufferqueue AB adding buffer "<< entry << std::endl;
201 
202  try
203  {
204  //std::cout <<"BBBBBBBBBBBBBBB TGo4BufferQueue "<< GetName()<< " before Add to queue... "<<std::endl;
205  Add(entry);
206  //std::cout <<"BBBBBBBBBBBBBBB after Add to queue. "<<std::endl;
207  }
208  catch(TGo4RuntimeException ex)
209  {
210  std::cout << "Buffer queue "<< GetName()<<" is full, dropping new entry "<< entry <<" !!!" << std::endl;
211  if(entryisnew)
212  {
214  fxBufferList->Remove(entry); // remove new entry again from pool
215  delete entry;
216  }
217  }
218 }
219 
221 {
222  GO4TRACE((12,"TGo4BufferQueue::AddBufferFromObject(TObject*)", __LINE__, __FILE__));
223  if(object==0) return;
224  TGo4LockGuard mainguard;
225  TBuffer* entry = 0;
226  entry = new TGo4Buffer(TBuffer::kWrite);
227  //std::cout <<" Buffer queue ABFO created buffer "<<entry << std::endl;
228  TFile *filsav = gFile;
229  gFile = 0;
230  entry->WriteObject(object);
231  gFile = filsav;
233 // std::cout << "wrote object "<< object->GetName() <<" into message" << std::endl;
234 //
235 // entry->SetReadMode();
236 // entry->Reset();
237 //
238 // entry->InitMap();
239 // TClass* cl= entry->ReadClass();
240 // entry->Reset();
241 // std::cout << "buffer contains class "<< cl << std::endl;
242 // if(cl)
243 // {
244 // std::cout << "classname "<< cl->GetName() << std::endl;
245 //
246 // TObject* ob=entry->ReadObject(cl);
247 // std::cout << "read object "<< ob << std::endl;
248 // if(ob)
249 // std::cout << "read object "<< ob->GetName() << std::endl;
250 // }
252 
253 // do not reset, may need object length information later!
254 // entry->Reset();
255  try
256  {
257  Add(entry);
258  }
259  catch(TGo4RuntimeException& ex)
260  {
261  std::cout << "Buffer queue "<< GetName()<<" is full, dropping new entry "<< entry <<" !!!" << std::endl;
262  delete entry;
263  }
264 
265  }
266 
267 void TGo4BufferQueue::FreeBuffer(TBuffer * buffer)
268 {
269  GO4TRACE((19,"TGo4BufferQueue::FreeBuffer(TBuffer*, Bool_t)", __LINE__, __FILE__));
270  //std::cout << "BBBBBBBBBBBBBBB TGo4BufferQueue::FreeBuffer before taking buffer mutex "<< fxBufferMutex<< std::endl;
272  //std::cout << " bufferlock acquired by bufferqueue: freebuffer"<< std::endl;
273  // does buffer belong to our internal buffers?
274  if (fxBufferList->FindObject(buffer)!=0)
275  {
276  // yes, put it back into the free list
277  // new: we allocate the buffersize back to the initial size to
278  // avoid extended memory consumption:
279  Int_t memsize=buffer->BufferSize();
280  if (memsize>TGo4Socket::fgiBUFINITSIZE)
281  {
282  Realloc(buffer,memsize, TGo4Socket::fgiBUFINITSIZE);
283  }
284  fxFreeList->AddLast(buffer);
285  //std::cout <<"freed buffer"<< buffer <<" of bufferqueue "<< GetName() << std::endl;
286  }
287  else
288  {
289  // no, we delete it to avoid leakage
290  delete buffer;
291  //std::cout <<" Buffer queue FB deleted buffer "<<buffer << std::endl;
292  //std::cout <<"deleted external buffer of bufferqueue "<< GetName() << std::endl;
293  // TGo4Log::Debug(" Buffer Queue : deleted external buffer !!! ");
294 
295  }
296 
297 
298 
299 
300 }
301 
302 
303 void TGo4BufferQueue::Clear(Option_t* opt)
304 {
305  TObject* ob=0;
306  while((ob=Next()) !=0) {
307  //std::cout <<"cleared entry "<<ob<<" of queue "<<GetName() << std::endl;
308  FreeBuffer(dynamic_cast<TBuffer*> (ob) );
309  }
310 }
311 
312 
313 void TGo4BufferQueue::Realloc(TBuffer* buffer, Int_t oldsize, Int_t newsize)
314 {
315  if(buffer==0) return;
316  //std::cout << "TGo4Bufferqueue "<<GetName()<< " Realloc before mainguard"<< std::endl;
317  TGo4LockGuard mainguard;
318  char* memfield = buffer->Buffer();
319  //buffer->Expand(newsize); // is protected! we make it by hand...
320  //Int_t current = buffer->Length(); // cursor position
321  Int_t extraspace=TGo4Socket::fgiBUFEXTRASPACE; // =8, constant within TBuffer
322 // memfield = (char *) TStorage::ReAlloc(memfield,
323 // (newsize + extraspace) * sizeof(char),
324 // (oldsize+ extraspace) * sizeof(char));
325  // this works only for ROOT versions > 3.02/04
326  memfield = TStorage::ReAllocChar(memfield,
327  (newsize+extraspace),
328  (oldsize+extraspace));
329  //std::cout << "Bufferqueue reallocating char from"<<oldsize<< " to " << newsize<< std::endl;
330  buffer->ResetBit(fgiISOWNER);
331 //#if ROOT_VERSION_CODE > ROOT_VERSION(5,23,2)
332 // buffer->SetBuffer(memfield, newsize + extraspace);
333 //#else
334  buffer->SetBuffer(memfield, newsize);
335 //#endif
336  buffer->SetBit(fgiISOWNER);
337  // <- here we avoid the ownership of TBuffer for the internal buffer
338  // (new feature of ROOT versions > 3.02/04)
339  // problem: SetBuffer will delete previous buffer in adopt mode (isowner=true)
340  // which might be at the same location as the new buffer after ReAlloc
341  // ergo SetBuffer would set a buffer which it deleted before itself!
342  buffer->SetBufferOffset(newsize);
343 }
344 
346 {
347  //std::cout <<"nnnnnnnn BufferQueue "<<GetName()<<" new entry before mainguard"<< std::endl;
348  TGo4LockGuard mainguard;
349  TBuffer* buf = new TGo4Buffer(TBuffer::kWrite, TGo4Socket::fgiBUFINITSIZE);
350  //std::cout <<"nnnnnnnn BufferQueue "<<GetName()<<" made new entry "<<buf << std::endl;
351  TNamed* dummy= new TNamed("This is a default buffer filler","GO4 is fun!");
352  TFile *filsav = gFile;
353  gFile = 0;
354  buf->WriteObject(dummy);
355  gFile = filsav;
356  delete dummy;
357  return buf;
358 }
359 
361 {
362  TGo4Buffer* buf= new TGo4Buffer(TBuffer::kWrite);
363  char* field= buf->Buffer() + sizeof(UInt_t);
364  tobuf(field ,val);
365  buf->SetBufferOffset( sizeof(UInt_t)+ sizeof(UInt_t) ); // set length for receiver check
366  buf->SetByteCount(0); // correctly set first longword
367  return buf;
368 }
369 
370 
372 {
373  if(buf==0) return -1;
374  UInt_t len= buf->Length();
375  if(len != (sizeof(UInt_t)+sizeof(UInt_t))) return -2;
376  // note: first length is length of encoded type
377  // second length is always UInt_t
378  char* field=buf->Buffer();
379  char* temp= field + sizeof(UInt_t); // skip length header
380  UInt_t val=0;
381  frombuf(temp, &val);
382  //std::cout <<"DDDDDD DecodeValueBuffer val="<<val << std::endl;
383  return (Int_t) val;
384 }
void Add(TObject *ob)
Definition: TGo4Queue.cxx:70
TMutex * fxBufferMutex
TObject * WaitObjectFromBuffer()
virtual void Clear(Option_t *opt="")
TBuffer * WaitBuffer()
void Realloc(TBuffer *buffer, Int_t oldsize, Int_t newsize)
static const Int_t fgiBUFINITSIZE
Definition: TGo4Socket.h:70
TBuffer * NewEntry()
void AddBuffer(TBuffer *buffer, Bool_t clone=kFALSE)
static Int_t DecodeValueBuffer(TBuffer *buf)
#define TGo4Buffer
Definition: TGo4Buffer.h:27
#define GO4TRACE(X)
Definition: TGo4Log.h:26
TObject * Wait()
Definition: TGo4Queue.cxx:51
static const Int_t fgiISOWNER
static TBuffer * CreateValueBuffer(UInt_t val)
static const Int_t fgiBUFEXTRASPACE
Definition: TGo4Socket.h:73
void FreeBuffer(TBuffer *buffer)
void AddBufferFromObject(TObject *object)
virtual ~TGo4BufferQueue()
static void Debug(const char *text,...)
Definition: TGo4Log.cxx:270
TObject * Next()
Definition: TGo4Queue.cxx:64