GSI Object Oriented Online Offline (Go4)  GO4-6.1.4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
TGo4BufferQueue.cxx
Go to the documentation of this file.
1 // $Id: TGo4BufferQueue.cxx 3294 2021-07-23 13:05:31Z adamczew $
2 //-----------------------------------------------------------------------
3 // The GSI Online Offline Object Oriented (Go4) Project
4 // Experiment Data Processing at EE department, GSI
5 //-----------------------------------------------------------------------
6 // Copyright (C) 2000- GSI Helmholtzzentrum fuer Schwerionenforschung GmbH
7 // Planckstr. 1, 64291 Darmstadt, Germany
8 // Contact: http://go4.gsi.de
9 //-----------------------------------------------------------------------
10 // This software can be used under the license agreements as stated
11 // in Go4License.txt file which is part of the distribution.
12 //-----------------------------------------------------------------------
13 
14 #include "TGo4BufferQueue.h"
15 
16 #include <iostream>
17 
18 #include "TROOT.h"
19 #include "TMutex.h"
20 #include "TFile.h"
21 #include "TGo4Buffer.h"
22 
23 #include "TGo4Socket.h"
24 #include "TGo4RuntimeException.h"
25 #include "TGo4Log.h"
26 #include "TGo4LockGuard.h"
27 
29  TGo4Queue("Default buffer queue"),
30  fxBufferList(0),
31  fxFreeList(0),
32  fxBufferMutex(0),
33  fiMaxBuffers(10)
34 {
35  GO4TRACE((14,"TGo4BufferQueue::TGo4BufferQueue()", __LINE__, __FILE__));
36 
37  InitBuffers();
38 }
39 
41  TGo4Queue(name),
42  fxBufferList(0),
43  fxFreeList(0),
44  fxBufferMutex(0),
45  fiMaxBuffers(10)
46 {
47  GO4TRACE((14,"TGo4BufferQueue::TGo4BufferQueue(const char*)", __LINE__, __FILE__));
48 
49  InitBuffers();
50 }
51 
53 
54 {
55  GO4TRACE((14,"TGo4BufferQueue::InitBuffers()", __LINE__, __FILE__));
56  TGo4LockGuard mainguard;
57  fxBufferList = new TList; // list owning all buffers
58  fxFreeList = new TList; // list indicating the free buffers
59  fxFreeList->SetOwner(kFALSE); // JAM2018 - avoid root6 problems at shutdown???
60  fxBufferMutex = new TMutex;
61  for (Int_t i=0; i< fiMaxBuffers; ++i) {
62  TBuffer* buf = NewEntry();
63  fxBufferList->Add(buf);
64  fxFreeList->Add(buf);
65  }
66 }
67 
69 {
70  GO4TRACE((14,"TGo4BufferQueue::~TTGo4BufferQueue()", __LINE__, __FILE__));
71 
72  //printf ("JAM*************** DTOR of TGo4BufferQueue %s BEGIN\n", GetName());
73  //fxBufferList->Delete();// JAM2018 - avoid root6 problems at shutdown???
74  TCollection::EmptyGarbageCollection();
75  //printf ("JAM*************** DTOR of TGo4BufferQueue %s after EmptyGarbageCollection \n", GetName());
76 
77  delete fxFreeList; fxFreeList = 0;
78  //printf ("JAM*************** DTOR of TGo4BufferQueue %s after delete fxFreeList \n", GetName());
79  delete fxBufferList; fxBufferList = 0;
80  //printf ("JAM*************** DTOR of TGo4BufferQueue %s after delete fxBufferList \n", GetName());
81  delete fxBufferMutex; fxBufferMutex = 0;
82  //printf ("JAM*************** DTOR of TGo4BufferQueue %s END\n", GetName());
83 }
84 
86 {
87  GO4TRACE((19,"TGo4BufferQueue::WaitBuffer()", __LINE__, __FILE__));
88  TObject* ob = Wait();
89  return dynamic_cast<TBuffer*> ( ob );
90 }
91 
93 {
94  GO4TRACE((19,"TGo4BufferQueue::WaitObjectFromBuffer()", __LINE__, __FILE__));
95  TObject* obj=0;
96  TBuffer* buffer = WaitBuffer();
97  if(buffer) {
98  {
99  //std::cout << "BBBBBBBBBBBBBBB TGo4BufferQueue::WaitObjectFromBuffer() before mainguard "<< std::endl;
100  TGo4LockGuard mainguard;
101  // lock go4 main mutex for streaming
102  TDirectory* savdir=gDirectory;
103  gROOT->cd(); // be sure to be in the top directory when creating histo
104  buffer->SetReadMode();
105  //std::cout << " Reading object from buffer..."<< std::endl;
106  buffer->Reset();
107  buffer->InitMap();
108  // note: root version 3.05/02 crashes again when unknown class
109  // shall be read; this was working in 3.03/09
110  // therefore, we put in our own check again from revision 1.23
111  TClass* cl = buffer->ReadClass();
112  //std::cout << "buffer queue "<< GetName() <<" :waitobject, got Class: " << cl << std::endl;
113  if(cl == (TClass*) -1)
114  {
115  // case of unknown class
116  std::cout << " Could not receive object of unknown class on buffer queue "<<GetName() <<" !!!" <<std::endl;
117  obj=0;
118  } else {
119  // if(cl)
120  // std::cout << "Classname: " << cl->GetName() << std::endl;
121  // std::cout << "Reading object from buffer..."<< std::endl;
122  buffer->Reset();
123  obj = buffer->ReadObject(cl);
124  }
125  if(savdir) savdir->cd();
126  } // TGo4LockGuard
127  FreeBuffer(buffer); // give internal buffer back to queue
128  } // if(buffer)
129 
130  return obj;
131 }
132 
133 void TGo4BufferQueue::AddBuffer(TBuffer * buffer, Bool_t clone)
134 {
135  GO4TRACE((19,"TGo4BufferQueue::AddBuffer(TBuffer*, Bool_t)", __LINE__, __FILE__));
136 
137  TBuffer* entry=0;
138  Bool_t entryisnew=kFALSE;
139  if(clone)
140  {
141  //std::cout <<"BBBBBBBBBBBBBBB TGo4BufferQueue "<< GetName()<< " before lockguard of buffer mutex "<<fxBufferMutex<<std::endl;
143  entry= dynamic_cast<TBuffer*>(fxFreeList->Remove(fxFreeList->First()));
144  // get next free buffer
145  if(entry==0)
146  {
147  // no buffer entry there, we create one
148  TGo4Log::Debug(" Buffer Queue adding new internal buffer... ");
149  //std::cout <<"Buffer Queue: creating new internal buffer... "<< GetName();
150 
151  entry=NewEntry();
152  //std::cout <<"BBBBBBBBBBBBBBB TGo4BufferQueue "<< GetName()<< " before Add to free list... "<<std::endl;
153 
154  fxBufferList->Add(entry); // add to list of existing buffers
155  // need not add new buffer to list of free buffers
156  entryisnew=kTRUE;
157  fiMaxBuffers++;
158  //std::cout <<"buffers:"<<fiMaxBuffers <<", "<< fiMaxBuffers*TGo4Socket::fgiBUFINITSIZE << std::endl;
159  }
160  else
161  {
162  // ok, we have a buffer to clone
163  }
164 
165  Int_t srcsize=buffer->BufferSize();
166  Int_t destsize=entry->BufferSize();
167  if (srcsize>destsize)
168  {
169  Realloc(entry,destsize,srcsize);
170  destsize=srcsize;
171  }
172  else
173  {
174  // buffer big enough, proceed
175  }
176  char* source= buffer->Buffer();
177  char* destination= entry->Buffer();
178  memcpy(destination,source,destsize);
179  //std::cout <<"))))))))))Buffer Queue: copied "<< destsize <<"bytes to buffer field" << std::endl;
180  Int_t messlen = buffer->Length(); // compatible with root TMessage protocol
181  entry->SetBufferOffset(messlen);
182  entry->SetByteCount(0);
183  } // if(clone)
184  else
185  {
186  entry=buffer;
187  }
188  //std::cout <<"Bufferqueue AB adding buffer "<< entry << std::endl;
189 
190  try
191  {
192  //std::cout <<"BBBBBBBBBBBBBBB TGo4BufferQueue "<< GetName()<< " before Add to queue... "<<std::endl;
193  Add(entry);
194  //std::cout <<"BBBBBBBBBBBBBBB after Add to queue. "<<std::endl;
195  }
196  catch(TGo4RuntimeException &)
197  {
198  std::cout << "Buffer queue "<< GetName()<<" is full, dropping new entry "<< entry <<" !!!" << std::endl;
199  if(entryisnew)
200  {
202  fxBufferList->Remove(entry); // remove new entry again from pool
203  delete entry;
204  }
205  }
206 }
207 
209 {
210  GO4TRACE((12,"TGo4BufferQueue::AddBufferFromObject(TObject*)", __LINE__, __FILE__));
211  if(object==0) return;
212  TGo4LockGuard mainguard;
213  TBuffer* entry = 0;
214  entry = new TGo4Buffer(TBuffer::kWrite);
215  //std::cout <<" Buffer queue ABFO created buffer "<<entry << std::endl;
216  TFile *filsav = gFile;
217  gFile = 0;
218  entry->WriteObject(object);
219  gFile = filsav;
221 // std::cout << "wrote object "<< object->GetName() <<" into message" << std::endl;
222 //
223 // entry->SetReadMode();
224 // entry->Reset();
225 //
226 // entry->InitMap();
227 // TClass* cl= entry->ReadClass();
228 // entry->Reset();
229 // std::cout << "buffer contains class "<< cl << std::endl;
230 // if(cl)
231 // {
232 // std::cout << "classname "<< cl->GetName() << std::endl;
233 //
234 // TObject* ob=entry->ReadObject(cl);
235 // std::cout << "read object "<< ob << std::endl;
236 // if(ob)
237 // std::cout << "read object "<< ob->GetName() << std::endl;
238 // }
240 
241 // do not reset, may need object length information later!
242 // entry->Reset();
243  try
244  {
245  Add(entry);
246  }
247  catch(TGo4RuntimeException &)
248  {
249  std::cout << "Buffer queue "<< GetName()<<" is full, dropping new entry "<< entry <<" !!!" << std::endl;
250  delete entry;
251  }
252 }
253 
254 void TGo4BufferQueue::FreeBuffer(TBuffer *buffer)
255 {
256  GO4TRACE((19, "TGo4BufferQueue::FreeBuffer(TBuffer*, Bool_t)", __LINE__, __FILE__));
258  // does buffer belong to our internal buffers?
259  if (fxBufferList->FindObject(buffer) != 0) {
260  // yes, put it back into the free list
261  // new: we allocate the buffersize back to the initial size to
262  // avoid extended memory consumption:
263  Int_t memsize = buffer->BufferSize();
264  if (memsize > TGo4Socket::fgiBUFINITSIZE) {
265  Realloc(buffer, memsize, TGo4Socket::fgiBUFINITSIZE);
266  }
267  fxFreeList->AddLast(buffer);
268  } else {
269  // no, we delete it to avoid leakage
270  delete buffer;
271  }
272 }
273 
274 void TGo4BufferQueue::Clear(Option_t* opt)
275 {
276  TObject* ob=0;
277  while((ob=Next()) !=0) {
278  //std::cout <<"cleared entry "<<ob<<" of queue "<<GetName() << std::endl;
279  FreeBuffer(dynamic_cast<TBuffer*> (ob) );
280  }
281 }
282 
283 
284 void TGo4BufferQueue::Realloc(TBuffer* buffer, Int_t oldsize, Int_t newsize)
285 {
286  if(buffer==0) return;
287  //std::cout << "TGo4Bufferqueue "<<GetName()<< " Realloc with Expand before mainguard"<< std::endl;
288  TGo4LockGuard mainguard;
289 #if ROOT_VERSION_CODE > ROOT_VERSION(6,0,0)
290 
291  buffer->Expand(newsize); // JAM2021- always use framework method to avoid recent check byte count problems
292 #else
293  // in history this was protected! we have implemented it by hand...
294  //Int_t current = buffer->Length(); // cursor position
295  char* memfield = buffer->Buffer();
296  Int_t extraspace=TGo4Socket::fgiBUFEXTRASPACE; // =8, constant within TBuffer
297 // memfield = (char *) TStorage::ReAlloc(memfield,
298 // (newsize + extraspace) * sizeof(char),
299 // (oldsize+ extraspace) * sizeof(char));
300  // this works only for ROOT versions > 3.02/04
301  memfield = TStorage::ReAllocChar(memfield,
302  (newsize+extraspace),
303  (oldsize+extraspace));
304  //std::cout << "Bufferqueue reallocating char from"<<oldsize<< " to " << newsize<< std::endl;
305  buffer->ResetBit(TBuffer::kIsOwner);
306 
307  buffer->SetBuffer(memfield, newsize);
308 
309  buffer->SetBit(TBuffer::kIsOwner);
310  // <- here we avoid the ownership of TBuffer for the internal buffer
311  // (new feature of ROOT versions > 3.02/04)
312  // problem: SetBuffer will delete previous buffer in adopt mode (isowner=true)
313  // which might be at the same location as the new buffer after ReAlloc
314  // ergo SetBuffer would set a buffer which it deleted before itself!
315  buffer->SetBufferOffset(newsize);
316 #endif
317 }
318 
320 {
321  //std::cout <<"nnnnnnnn BufferQueue "<<GetName()<<" new entry before mainguard"<< std::endl;
322  TGo4LockGuard mainguard;
323  TBuffer* buf = new TGo4Buffer(TBuffer::kWrite, TGo4Socket::fgiBUFINITSIZE);
324  //std::cout <<"nnnnnnnn BufferQueue "<<GetName()<<" made new entry "<<buf << std::endl;
325  TNamed* dummy= new TNamed("This is a default buffer filler","GO4 is fun!");
326  TFile *filsav = gFile;
327  gFile = 0;
328  buf->WriteObject(dummy);
329  gFile = filsav;
330  delete dummy;
331  return buf;
332 }
333 
335 {
336  TGo4Buffer* buf= new TGo4Buffer(TBuffer::kWrite);
337  char* field= buf->Buffer() + sizeof(UInt_t);
338  tobuf(field ,val);
339  buf->SetBufferOffset( sizeof(UInt_t)+ sizeof(UInt_t) ); // set length for receiver check
340  buf->SetByteCount(0); // correctly set first longword
341  return buf;
342 }
343 
344 
346 {
347  if(buf==0) return -1;
348  UInt_t len= buf->Length();
349  if(len != (sizeof(UInt_t)+sizeof(UInt_t))) return -2;
350  // note: first length is length of encoded type
351  // second length is always UInt_t
352  char* field=buf->Buffer();
353  char* temp= field + sizeof(UInt_t); // skip length header
354  UInt_t val=0;
355  frombuf(temp, &val);
356  //std::cout <<"DDDDDD DecodeValueBuffer val="<<val << std::endl;
357  return (Int_t) val;
358 }
void Add(TObject *ob)
Definition: TGo4Queue.cxx:69
TMutex * fxBufferMutex
TObject * WaitObjectFromBuffer()
virtual void Clear(Option_t *opt="")
TBuffer * WaitBuffer()
void Realloc(TBuffer *buffer, Int_t oldsize, Int_t newsize)
static const Int_t fgiBUFINITSIZE
Definition: TGo4Socket.h:70
TBuffer * NewEntry()
void AddBuffer(TBuffer *buffer, Bool_t clone=kFALSE)
static Int_t DecodeValueBuffer(TBuffer *buf)
#define TGo4Buffer
Definition: TGo4Buffer.h:27
#define GO4TRACE(X)
Definition: TGo4Log.h:26
TObject * Wait()
Definition: TGo4Queue.cxx:50
static TBuffer * CreateValueBuffer(UInt_t val)
static const Int_t fgiBUFEXTRASPACE
Definition: TGo4Socket.h:73
void FreeBuffer(TBuffer *buffer)
void AddBufferFromObject(TObject *object)
virtual ~TGo4BufferQueue()
static void Debug(const char *text,...)
Definition: TGo4Log.cxx:274
TObject * Next()
Definition: TGo4Queue.cxx:63