GSI Object Oriented Online Offline (Go4) GO4-6.4.0
Loading...
Searching...
No Matches
TGo4BufferQueue.cxx
Go to the documentation of this file.
1// $Id$
2//-----------------------------------------------------------------------
3// The GSI Online Offline Object Oriented (Go4) Project
4// Experiment Data Processing at EE department, GSI
5//-----------------------------------------------------------------------
6// Copyright (C) 2000- GSI Helmholtzzentrum fuer Schwerionenforschung GmbH
7// Planckstr. 1, 64291 Darmstadt, Germany
8// Contact: http://go4.gsi.de
9//-----------------------------------------------------------------------
10// This software can be used under the license agreements as stated
11// in Go4License.txt file which is part of the distribution.
12//-----------------------------------------------------------------------
13
14#include "TGo4BufferQueue.h"
15
16#include <iostream>
17
18#include "TROOT.h"
19#include "TMutex.h"
20#include "TFile.h"
21#include "TBufferFile.h"
22
23#include "TGo4Socket.h"
25#include "TGo4Log.h"
26#include "TGo4LockGuard.h"
27
// Default constructor (see the trace string below): the base queue is named
// "Default buffer queue", the buffer list, free list and mutex pointers start
// out as nullptr, and the pool size fiMaxBuffers is preset to 10.
29 TGo4Queue("Default buffer queue"),
30 fxBufferList(nullptr),
31 fxFreeList(nullptr),
32 fxBufferMutex(nullptr),
33 fiMaxBuffers(10)
34{
35 GO4TRACE((14,"TGo4BufferQueue::TGo4BufferQueue()", __LINE__, __FILE__));
36
// NOTE(review): this listing skips source line 37. The members initialized to
// nullptr above are only allocated in InitBuffers(), so presumably that call
// sits here - confirm against the original file.
38}
39
// Constructor taking a queue name (see the trace string below): `name` is
// forwarded to the TGo4Queue base, the buffer list, free list and mutex
// pointers start out as nullptr, and the pool size fiMaxBuffers is preset to 10.
41 TGo4Queue(name),
42 fxBufferList(nullptr),
43 fxFreeList(nullptr),
44 fxBufferMutex(nullptr),
45 fiMaxBuffers(10)
46{
47 GO4TRACE((14,"TGo4BufferQueue::TGo4BufferQueue(const char *)", __LINE__, __FILE__));
48
// NOTE(review): this listing skips source line 49. The members initialized to
// nullptr above are only allocated in InitBuffers(), so presumably that call
// sits here - confirm against the original file.
50}
51
53
54{
55 GO4TRACE((14,"TGo4BufferQueue::InitBuffers()", __LINE__, __FILE__));
56 TGo4LockGuard mainguard;
57 fxBufferList = new TList; // list owning all buffers
58 fxFreeList = new TList; // list indicating the free buffers
59 fxFreeList->SetOwner(kFALSE); // JAM2018 - avoid root6 problems at shutdown???
60 fxBufferMutex = new TMutex;
61 for (Int_t i = 0; i < fiMaxBuffers; ++i) {
62 TBuffer *buf = NewEntry();
63 fxBufferList->Add(buf);
64 fxFreeList->Add(buf);
65 }
66}
67
69{
70 GO4TRACE((14,"TGo4BufferQueue::~TTGo4BufferQueue()", __LINE__, __FILE__));
71
72 //printf ("JAM*************** DTOR of TGo4BufferQueue %s BEGIN\n", GetName());
73 //fxBufferList->Delete();// JAM2018 - avoid root6 problems at shutdown???
74 TCollection::EmptyGarbageCollection();
75 //printf ("JAM*************** DTOR of TGo4BufferQueue %s after EmptyGarbageCollection \n", GetName());
76
77 delete fxFreeList; fxFreeList = nullptr;
78 //printf ("JAM*************** DTOR of TGo4BufferQueue %s after delete fxFreeList \n", GetName());
79 delete fxBufferList; fxBufferList = nullptr;
80 //printf ("JAM*************** DTOR of TGo4BufferQueue %s after delete fxBufferList \n", GetName());
81 delete fxBufferMutex; fxBufferMutex = nullptr;
82 //printf ("JAM*************** DTOR of TGo4BufferQueue %s END\n", GetName());
83}
84
86{
87 GO4TRACE((19,"TGo4BufferQueue::WaitBuffer()", __LINE__, __FILE__));
88 TObject *ob = Wait();
89 return dynamic_cast<TBuffer *>(ob);
90}
91
93{
94 GO4TRACE((19,"TGo4BufferQueue::WaitObjectFromBuffer()", __LINE__, __FILE__));
95 TObject *obj = nullptr;
96 TBuffer *buffer = WaitBuffer();
97 if(buffer) {
98 {
99 TGo4LockGuard mainguard;
100 // lock go4 main mutex for streaming
101 TDirectory *savdir = gDirectory;
102 gROOT->cd(); // be sure to be in the top directory when creating histo
103 buffer->SetReadMode();
104 buffer->Reset();
105 buffer->InitMap();
106 // note: root version 3.05/02 crashes again when unknown class
107 // shall be read; this was working in 3.03/09
108 // therefore, we put in our own check again from revision 1.23
109 TClass *cl = buffer->ReadClass();
110 if(cl == (TClass *) -1)
111 {
112 // case of unknown class
113 std::cout << " Could not receive object of unknown class on buffer queue "<<GetName() <<" !!!" <<std::endl;
114 obj = nullptr;
115 } else {
116 buffer->Reset();
117 obj = buffer->ReadObject(cl);
118 }
119 if(savdir) savdir->cd();
120 } // TGo4LockGuard
121 FreeBuffer(buffer); // give internal buffer back to queue
122 } // if(buffer)
123
124 return obj;
125}
126
// Add a buffer to the queue.
//
// buffer: the TBuffer to enqueue.
// clone:  when true, the content of `buffer` is copied into a buffer taken
//         from the internal pool and that copy is enqueued; when false, the
//         `buffer` pointer itself is enqueued.
//
// If Add() throws TGo4RuntimeException, the entry is dropped with a message on
// std::cout; a pool buffer that was newly created by this call is then removed
// from fxBufferList again and deleted.
//
// NOTE(review): in the clone branch `buffer` is dereferenced without a null
// check.
// NOTE(review): when Add() throws and the entry was taken from fxFreeList (not
// newly created), the visible code does not put it back into fxFreeList, and
// fiMaxBuffers is not decremented when a new entry is discarded - confirm
// whether this is intended.
127void TGo4BufferQueue::AddBuffer(TBuffer *buffer, Bool_t clone)
128{
129 GO4TRACE((19,"TGo4BufferQueue::AddBuffer(TBuffer *, Bool_t)", __LINE__, __FILE__));
130
131 TBuffer *entry = nullptr;
132 Bool_t entryisnew = kFALSE;
133 if(clone)
134 {
// NOTE(review): this listing skips source line 135; presumably a lock guard on
// fxBufferMutex protecting the pool lists - confirm against the original file.
136 entry = dynamic_cast<TBuffer *>(fxFreeList->Remove(fxFreeList->First()));
137 // get next free buffer
138 if(!entry)
139 {
140 // no buffer entry there, we create one
141 TGo4Log::Debug(" Buffer Queue adding new internal buffer... ");
142
143 entry = NewEntry();
144
145 fxBufferList->Add(entry); // add to list of existing buffers
146 // need not add new buffer to list of free buffers
147 entryisnew=kTRUE;
148 fiMaxBuffers++;
149 }
150 else
151 {
152 // ok, we have a buffer to clone
153 }
154
// grow the pool buffer first if the source buffer is larger
155 Int_t srcsize=buffer->BufferSize();
156 Int_t destsize=entry->BufferSize();
157 if (srcsize>destsize)
158 {
159 Realloc(entry,destsize,srcsize);
160 destsize=srcsize;
161 }
162 else
163 {
164 // buffer big enough, proceed
165 }
// copy the complete source buffer (srcsize bytes, not only the used length)
166 char *source = buffer->Buffer();
167 char *destination = entry->Buffer();
168 memcpy(destination,source,srcsize);
// the copy gets the same write position as the source buffer
169 Int_t messlen = buffer->Length(); // compatible with root TMessage protocol
170 entry->SetBufferOffset(messlen);
171 entry->SetByteCount(0);
172 } // if(clone)
173 else
174 {
// no cloning: the caller's buffer itself becomes the queue entry
175 entry = buffer;
176 }
177
178 try
179 {
180 Add(entry);
181 }
182 catch(TGo4RuntimeException &)
183 {
184 std::cout << "Buffer queue "<< GetName()<<" is full, dropping new entry "<< entry <<" !!!" << std::endl;
185 if(entryisnew)
186 {
// NOTE(review): this listing skips source line 187; presumably a lock guard on
// fxBufferMutex - confirm against the original file.
188 fxBufferList->Remove(entry); // remove new entry again from pool
189 delete entry;
190 }
191 }
192}
193
// Body of AddBufferFromObject(TObject *) (see the trace string below): writes
// `object` into a newly created TBufferFile and adds that buffer to the queue.
// Does nothing when `object` is null. If Add() throws TGo4RuntimeException the
// drop is reported on std::cout and the new buffer is deleted.
195{
196 GO4TRACE((12,"TGo4BufferQueue::AddBufferFromObject(TObject *)", __LINE__, __FILE__));
197 if(!object) return;
// go4 main lock guard is held for the rest of the function
198 TGo4LockGuard mainguard;
199 TBuffer *entry = new TBufferFile(TBuffer::kWrite);
// gFile is set to nullptr while the object is written and restored afterwards
200 TFile *filsav = gFile;
201 gFile = nullptr;
202 entry->WriteObject(object);
203 gFile = filsav;
// NOTE(review): this listing skips source lines 204 and 223, which enclose the
// disabled debug read-back below - check the original file for their content.
205// std::cout << "wrote object "<< object->GetName() <<" into message" << std::endl;
206//
207// entry->SetReadMode();
208// entry->Reset();
209//
210// entry->InitMap();
211// TClass *cl= entry->ReadClass();
212// entry->Reset();
213// std::cout << "buffer contains class "<< cl << std::endl;
214// if(cl)
215// {
216// std::cout << "classname "<< cl->GetName() << std::endl;
217//
218// TObject *ob=entry->ReadObject(cl);
219// std::cout << "read object "<< ob << std::endl;
220// if(ob)
221// std::cout << "read object "<< ob->GetName() << std::endl;
222// }
224
225// do not reset, may need object length information later!
226// entry->Reset();
227 try
228 {
229 Add(entry);
230 }
231 catch(TGo4RuntimeException &)
232 {
233 std::cout << "Buffer queue "<< GetName()<<" is full, dropping new entry "<< entry <<" !!!" << std::endl;
// the buffer was created in this function, so it is released here
234 delete entry;
235 }
236}
237
// Release a buffer after use.
//
// buffer: the TBuffer to release. If it is found in fxBufferList (i.e. it is
//         one of the pool buffers) it is appended to fxFreeList, after being
//         resized to TGo4Socket::fgiBUFINITSIZE when it is currently larger.
//         Any other buffer is deleted.
//
// NOTE(review): the trace string below mentions a Bool_t parameter that this
// function does not have.
238void TGo4BufferQueue::FreeBuffer(TBuffer *buffer)
239{
240 GO4TRACE((19, "TGo4BufferQueue::FreeBuffer(TBuffer *, Bool_t)", __LINE__, __FILE__));
// NOTE(review): this listing skips source line 241; presumably a lock guard on
// fxBufferMutex protecting the pool lists - confirm against the original file.
242 // does buffer belong to our internal buffers?
243 if (fxBufferList->FindObject(buffer)) {
244 // yes, put it back into the free list
245 // new: we allocate the buffersize back to the initial size to
246 // avoid extended memory consumption:
247 Int_t memsize = buffer->BufferSize();
248 if (memsize > TGo4Socket::fgiBUFINITSIZE) {
249 Realloc(buffer, memsize, TGo4Socket::fgiBUFINITSIZE);
250 }
251 fxFreeList->AddLast(buffer);
252 } else {
253 // no, we delete it to avoid leakage
254 delete buffer;
255 }
256}
257
259{
260 while(auto ob = Next())
261 FreeBuffer(dynamic_cast<TBuffer *>(ob));
262}
263
264void TGo4BufferQueue::Realloc(TBuffer *buffer, Int_t oldsize, Int_t newsize)
265{
266 if(!buffer) return;
267 TGo4LockGuard mainguard;
268
269 buffer->Expand(newsize); // JAM2021- always use framework method to avoid recent check byte count problems
270}
271
273{
274 TGo4LockGuard mainguard;
275 TBuffer *buf = new TBufferFile(TBuffer::kWrite, TGo4Socket::fgiBUFINITSIZE);
276 TNamed *dummy = new TNamed("This is a default buffer filler","GO4 is fun!");
277 TFile *filsav = gFile;
278 gFile = nullptr;
279 buf->WriteObject(dummy);
280 gFile = filsav;
281 delete dummy;
282 return buf;
283}
284
286{
287 TBuffer *buf = new TBufferFile(TBuffer::kWrite);
288 char *field= buf->Buffer() + sizeof(UInt_t);
289 tobuf(field ,val);
290 buf->SetBufferOffset( sizeof(UInt_t)+ sizeof(UInt_t) ); // set length for receiver check
291 buf->SetByteCount(0); // correctly set first longword
292 return buf;
293}
294
295
297{
298 if(!buf) return -1;
299 UInt_t len = buf->Length();
300 if(len != (sizeof(UInt_t)+sizeof(UInt_t))) return -2;
301 // note: first length is length of encoded type
302 // second length is always UInt_t
303 char *field = buf->Buffer();
304 char *temp = field + sizeof(UInt_t); // skip length header
305 UInt_t val = 0;
306 frombuf(temp, &val);
307 return (Int_t) val;
308}
#define TGo4LockGuard
#define GO4TRACE(X)
Definition TGo4Log.h:25
static Int_t DecodeValueBuffer(TBuffer *buf)
Extract value from buffer that was created by CreateValueBuffer method.
TList * fxBufferList
List of preallocated buffer TBuffers which are used when TBuffer added to queue should be "cloned".
static TBuffer * CreateValueBuffer(UInt_t val)
Create a root buffer that contains a single value val.
TBuffer * NewEntry()
Create dummy buffer for queue.
void Clear(Option_t *opt="") override
Empty the queue and give free buffers back.
TBuffer * WaitBuffer()
Wait for buffer object from queue.
void InitBuffers()
Initialization of internal queue buffer.
Int_t fiMaxBuffers
Number of preallocated buffer TBuffers (maximum entries in fxBufferList).
void AddBuffer(TBuffer *buffer, Bool_t clone=kFALSE)
Add buffer pointer to queue.
void AddBufferFromObject(TObject *object)
Stream the given TObject into a newly created TBuffer and add that buffer to the queue.
void Realloc(TBuffer *buffer, Int_t oldsize, Int_t newsize)
Reallocate buffer of TBuffer to newsize.
virtual ~TGo4BufferQueue()
TList * fxFreeList
List of buffers which are free for the next add.
TObject * WaitObjectFromBuffer()
Wait for buffer object from queue and reconstruct the TObject that was streamed into it.
void FreeBuffer(TBuffer *buffer)
Free internal buffer to be re-used by the AddBuffer as clone method.
static void Debug(const char *text,...) GO4_PRINTF_ARGS
User shortcut for message with prio 0.
Definition TGo4Log.cxx:281
TObject * Next()
Definition TGo4Queue.cxx:61
void Add(TObject *ob)
Definition TGo4Queue.cxx:67
TGo4Queue(const char *name=nullptr)
Definition TGo4Queue.cxx:23
TObject * Wait()
Definition TGo4Queue.cxx:49
static const Int_t fgiBUFINITSIZE
Initial size for object receive buffer (TBuffer)
Definition TGo4Socket.h:71