GSI Object Oriented Online Offline (Go4)  GO4-6.3.0
TGo4BufferQueue.cxx
Go to the documentation of this file.
1 // $Id$
2 //-----------------------------------------------------------------------
3 // The GSI Online Offline Object Oriented (Go4) Project
4 // Experiment Data Processing at EE department, GSI
5 //-----------------------------------------------------------------------
6 // Copyright (C) 2000- GSI Helmholtzzentrum fuer Schwerionenforschung GmbH
7 // Planckstr. 1, 64291 Darmstadt, Germany
8 // Contact: http://go4.gsi.de
9 //-----------------------------------------------------------------------
10 // This software can be used under the license agreements as stated
11 // in Go4License.txt file which is part of the distribution.
12 //-----------------------------------------------------------------------
13 
14 #include "TGo4BufferQueue.h"
15 
16 #include <iostream>
17 
18 #include "TROOT.h"
19 #include "TMutex.h"
20 #include "TFile.h"
21 #include "TBufferFile.h"
22 
23 #include "TGo4Socket.h"
24 #include "TGo4RuntimeException.h"
25 #include "TGo4Log.h"
26 #include "TGo4LockGuard.h"
27 
29  TGo4Queue("Default buffer queue"),
30  fxBufferList(nullptr),
31  fxFreeList(nullptr),
32  fxBufferMutex(nullptr),
33  fiMaxBuffers(10)
34 {
35  GO4TRACE((14,"TGo4BufferQueue::TGo4BufferQueue()", __LINE__, __FILE__));
36 
37  InitBuffers();
38 }
39 
41  TGo4Queue(name),
42  fxBufferList(nullptr),
43  fxFreeList(nullptr),
44  fxBufferMutex(nullptr),
45  fiMaxBuffers(10)
46 {
47  GO4TRACE((14,"TGo4BufferQueue::TGo4BufferQueue(const char *)", __LINE__, __FILE__));
48 
49  InitBuffers();
50 }
51 
53 
54 {
55  GO4TRACE((14,"TGo4BufferQueue::InitBuffers()", __LINE__, __FILE__));
56  TGo4LockGuard mainguard;
57  fxBufferList = new TList; // list owning all buffers
58  fxFreeList = new TList; // list indicating the free buffers
59  fxFreeList->SetOwner(kFALSE); // JAM2018 - avoid root6 problems at shutdown???
60  fxBufferMutex = new TMutex;
61  for (Int_t i = 0; i < fiMaxBuffers; ++i) {
62  TBuffer *buf = NewEntry();
63  fxBufferList->Add(buf);
64  fxFreeList->Add(buf);
65  }
66 }
67 
69 {
70  GO4TRACE((14,"TGo4BufferQueue::~TTGo4BufferQueue()", __LINE__, __FILE__));
71 
72  //printf ("JAM*************** DTOR of TGo4BufferQueue %s BEGIN\n", GetName());
73  //fxBufferList->Delete();// JAM2018 - avoid root6 problems at shutdown???
74  TCollection::EmptyGarbageCollection();
75  //printf ("JAM*************** DTOR of TGo4BufferQueue %s after EmptyGarbageCollection \n", GetName());
76 
77  delete fxFreeList; fxFreeList = nullptr;
78  //printf ("JAM*************** DTOR of TGo4BufferQueue %s after delete fxFreeList \n", GetName());
79  delete fxBufferList; fxBufferList = nullptr;
80  //printf ("JAM*************** DTOR of TGo4BufferQueue %s after delete fxBufferList \n", GetName());
81  delete fxBufferMutex; fxBufferMutex = nullptr;
82  //printf ("JAM*************** DTOR of TGo4BufferQueue %s END\n", GetName());
83 }
84 
86 {
87  GO4TRACE((19,"TGo4BufferQueue::WaitBuffer()", __LINE__, __FILE__));
88  TObject *ob = Wait();
89  return dynamic_cast<TBuffer *>(ob);
90 }
91 
93 {
94  GO4TRACE((19,"TGo4BufferQueue::WaitObjectFromBuffer()", __LINE__, __FILE__));
95  TObject *obj = nullptr;
96  TBuffer *buffer = WaitBuffer();
97  if(buffer) {
98  {
99  TGo4LockGuard mainguard;
100  // lock go4 main mutex for streaming
101  TDirectory *savdir = gDirectory;
102  gROOT->cd(); // be sure to be in the top directory when creating histo
103  buffer->SetReadMode();
104  buffer->Reset();
105  buffer->InitMap();
106  // note: root version 3.05/02 crashes again when unknown class
107  // shall be read; this was working in 3.03/09
108  // therefore, we put in our own check again from revision 1.23
109  TClass *cl = buffer->ReadClass();
110  if(cl == (TClass *) -1)
111  {
112  // case of unknown class
113  std::cout << " Could not receive object of unknown class on buffer queue "<<GetName() <<" !!!" <<std::endl;
114  obj = nullptr;
115  } else {
116  buffer->Reset();
117  obj = buffer->ReadObject(cl);
118  }
119  if(savdir) savdir->cd();
120  } // TGo4LockGuard
121  FreeBuffer(buffer); // give internal buffer back to queue
122  } // if(buffer)
123 
124  return obj;
125 }
126 
127 void TGo4BufferQueue::AddBuffer(TBuffer *buffer, Bool_t clone)
128 {
129  GO4TRACE((19,"TGo4BufferQueue::AddBuffer(TBuffer *, Bool_t)", __LINE__, __FILE__));
130 
131  TBuffer *entry = nullptr;
132  Bool_t entryisnew = kFALSE;
133  if(clone)
134  {
136  entry = dynamic_cast<TBuffer *>(fxFreeList->Remove(fxFreeList->First()));
137  // get next free buffer
138  if(!entry)
139  {
140  // no buffer entry there, we create one
141  TGo4Log::Debug(" Buffer Queue adding new internal buffer... ");
142 
143  entry = NewEntry();
144 
145  fxBufferList->Add(entry); // add to list of existing buffers
146  // need not add new buffer to list of free buffers
147  entryisnew=kTRUE;
148  fiMaxBuffers++;
149  }
150  else
151  {
152  // ok, we have a buffer to clone
153  }
154 
155  Int_t srcsize=buffer->BufferSize();
156  Int_t destsize=entry->BufferSize();
157  if (srcsize>destsize)
158  {
159  Realloc(entry,destsize,srcsize);
160  destsize=srcsize;
161  }
162  else
163  {
164  // buffer big enough, proceed
165  }
166  char *source = buffer->Buffer();
167  char *destination = entry->Buffer();
168  memcpy(destination,source,srcsize);
169  Int_t messlen = buffer->Length(); // compatible with root TMessage protocol
170  entry->SetBufferOffset(messlen);
171  entry->SetByteCount(0);
172  } // if(clone)
173  else
174  {
175  entry = buffer;
176  }
177 
178  try
179  {
180  Add(entry);
181  }
182  catch(TGo4RuntimeException &)
183  {
184  std::cout << "Buffer queue "<< GetName()<<" is full, dropping new entry "<< entry <<" !!!" << std::endl;
185  if(entryisnew)
186  {
188  fxBufferList->Remove(entry); // remove new entry again from pool
189  delete entry;
190  }
191  }
192 }
193 
195 {
196  GO4TRACE((12,"TGo4BufferQueue::AddBufferFromObject(TObject *)", __LINE__, __FILE__));
197  if(!object) return;
198  TGo4LockGuard mainguard;
199  TBuffer *entry = new TBufferFile(TBuffer::kWrite);
200  TFile *filsav = gFile;
201  gFile = nullptr;
202  entry->WriteObject(object);
203  gFile = filsav;
205 // std::cout << "wrote object "<< object->GetName() <<" into message" << std::endl;
206 //
207 // entry->SetReadMode();
208 // entry->Reset();
209 //
210 // entry->InitMap();
211 // TClass *cl= entry->ReadClass();
212 // entry->Reset();
213 // std::cout << "buffer contains class "<< cl << std::endl;
214 // if(cl)
215 // {
216 // std::cout << "classname "<< cl->GetName() << std::endl;
217 //
218 // TObject *ob=entry->ReadObject(cl);
219 // std::cout << "read object "<< ob << std::endl;
220 // if(ob)
221 // std::cout << "read object "<< ob->GetName() << std::endl;
222 // }
224 
225 // do not reset, may need object length information later!
226 // entry->Reset();
227  try
228  {
229  Add(entry);
230  }
231  catch(TGo4RuntimeException &)
232  {
233  std::cout << "Buffer queue "<< GetName()<<" is full, dropping new entry "<< entry <<" !!!" << std::endl;
234  delete entry;
235  }
236 }
237 
238 void TGo4BufferQueue::FreeBuffer(TBuffer *buffer)
239 {
240  GO4TRACE((19, "TGo4BufferQueue::FreeBuffer(TBuffer *, Bool_t)", __LINE__, __FILE__));
242  // does buffer belong to our internal buffers?
243  if (fxBufferList->FindObject(buffer)) {
244  // yes, put it back into the free list
245  // new: we allocate the buffersize back to the initial size to
246  // avoid extended memory consumption:
247  Int_t memsize = buffer->BufferSize();
248  if (memsize > TGo4Socket::fgiBUFINITSIZE) {
249  Realloc(buffer, memsize, TGo4Socket::fgiBUFINITSIZE);
250  }
251  fxFreeList->AddLast(buffer);
252  } else {
253  // no, we delete it to avoid leakage
254  delete buffer;
255  }
256 }
257 
258 void TGo4BufferQueue::Clear(Option_t *)
259 {
260  while(auto ob = Next())
261  FreeBuffer(dynamic_cast<TBuffer *>(ob));
262 }
263 
264 void TGo4BufferQueue::Realloc(TBuffer *buffer, Int_t oldsize, Int_t newsize)
265 {
266  if(!buffer) return;
267  TGo4LockGuard mainguard;
268 
269  buffer->Expand(newsize); // JAM2021- always use framework method to avoid recent check byte count problems
270 }
271 
273 {
274  TGo4LockGuard mainguard;
275  TBuffer *buf = new TBufferFile(TBuffer::kWrite, TGo4Socket::fgiBUFINITSIZE);
276  TNamed *dummy = new TNamed("This is a default buffer filler","GO4 is fun!");
277  TFile *filsav = gFile;
278  gFile = nullptr;
279  buf->WriteObject(dummy);
280  gFile = filsav;
281  delete dummy;
282  return buf;
283 }
284 
286 {
287  TBuffer *buf = new TBufferFile(TBuffer::kWrite);
288  char *field= buf->Buffer() + sizeof(UInt_t);
289  tobuf(field ,val);
290  buf->SetBufferOffset( sizeof(UInt_t)+ sizeof(UInt_t) ); // set length for receiver check
291  buf->SetByteCount(0); // correctly set first longword
292  return buf;
293 }
294 
295 
297 {
298  if(!buf) return -1;
299  UInt_t len = buf->Length();
300  if(len != (sizeof(UInt_t)+sizeof(UInt_t))) return -2;
301  // note: first length is length of encoded type
302  // second length is always UInt_t
303  char *field = buf->Buffer();
304  char *temp = field + sizeof(UInt_t); // skip length header
305  UInt_t val = 0;
306  frombuf(temp, &val);
307  return (Int_t) val;
308 }
void Add(TObject *ob)
Definition: TGo4Queue.cxx:67
TMutex * fxBufferMutex
TObject * WaitObjectFromBuffer()
void Clear(Option_t *opt="") override
TBuffer * WaitBuffer()
void Realloc(TBuffer *buffer, Int_t oldsize, Int_t newsize)
static void Debug(const char *text,...) GO4_PRINTF_ARGS
Definition: TGo4Log.cxx:281
static const Int_t fgiBUFINITSIZE
Definition: TGo4Socket.h:71
TBuffer * NewEntry()
void AddBuffer(TBuffer *buffer, Bool_t clone=kFALSE)
static Int_t DecodeValueBuffer(TBuffer *buf)
#define GO4TRACE(X)
Definition: TGo4Log.h:25
TObject * Wait()
Definition: TGo4Queue.cxx:49
static TBuffer * CreateValueBuffer(UInt_t val)
void FreeBuffer(TBuffer *buffer)
void AddBufferFromObject(TObject *object)
virtual ~TGo4BufferQueue()
TObject * Next()
Definition: TGo4Queue.cxx:61