Main Page | Namespace List | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | File Members

TGo4BufferQueue.cxx

Go to the documentation of this file.
00001 //-------------------------------------------------------------
00002 //        Go4 Release Package v3.04-01 (build 30401)
00003 //                      28-November-2008
00004 //---------------------------------------------------------------
00005 //   The GSI Online Offline Object Oriented (Go4) Project
00006 //   Experiment Data Processing at EE department, GSI
00007 //---------------------------------------------------------------
00008 //
00009 //Copyright (C) 2000- Gesellschaft f. Schwerionenforschung, GSI
00010 //                    Planckstr. 1, 64291 Darmstadt, Germany
00011 //Contact:            http://go4.gsi.de
00012 //----------------------------------------------------------------
00013 //This software can be used under the license agreements as stated
00014 //in Go4License.txt file which is part of the distribution.
00015 //----------------------------------------------------------------
00016 #include "TGo4BufferQueue.h"
00017 
00018 #include "Riostream.h"
00019 
00020 #include "TROOT.h"
00021 #include "TMutex.h"
00022 #include "TFile.h"
00023 #include "TGo4Buffer.h"
00024 #include "RVersion.h"
00025 
00026 #include "TGo4Socket.h"
00027 #include "TGo4RuntimeException.h"
00028 #include "TGo4Log.h"
00029 #include "TGo4LockGuard.h"
00030 
00031 #if ROOT_VERSION_CODE > ROOT_VERSION(4,3,2)
00032 //#if __GO4ROOTVERSION__ > 40302
00033    const Int_t TGo4BufferQueue::fgiISOWNER=TBuffer::kIsOwner;
00034 #else
00035    const Int_t TGo4BufferQueue::fgiISOWNER=BIT(14);
00036 // we emulate the protected owner flag of the TBuffer class, needed for reallocation!
00037 #endif
00038 
00039 
00040 
00041 TGo4BufferQueue::TGo4BufferQueue()
00042    : TGo4Queue("Default buffer queue"),fiOverflowcount(0), fiMaxBuffers(10)
00043 {
00044    TRACE((14,"TGo4BufferQueue::TGo4BufferQueue()", __LINE__, __FILE__));
00045 
00046    InitBuffers();
00047 }
00048 
00049 TGo4BufferQueue::TGo4BufferQueue(const char* name)
00050 : TGo4Queue(name), fiOverflowcount(0), fiMaxBuffers(10)
00051 
00052 {
00053    TRACE((14,"TGo4BufferQueue::TGo4BufferQueue(Text_t*)", __LINE__, __FILE__));
00054    InitBuffers();
00055 }
00056 
00057 void TGo4BufferQueue::InitBuffers()
00058 
00059 {
00060    TRACE((14,"TGo4BufferQueue::InitBuffers()", __LINE__, __FILE__));
00061    TGo4LockGuard mainguard;
00062    fxBufferList=new TList; // list owning all buffers
00063    fxFreeList=new TList;    // list indicating the free buffers
00064    fxBufferIterator=fxFreeList->MakeIterator();
00065    fxBufferMutex=new TMutex;
00066    for (Int_t i=0; i< fiMaxBuffers; ++i)
00067       {
00068           TBuffer* buf=NewEntry();
00069           fxBufferList->Add(buf);
00070           fxFreeList->Add(buf);
00071       }
00072 }
00073 
00074 TGo4BufferQueue::~TGo4BufferQueue()
00075 {
00076    TRACE((14,"TGo4BufferQueue::~TTGo4BufferQueue()", __LINE__, __FILE__));
00077 
00078    delete fxBufferIterator;
00079    fxBufferList->Delete();
00080    TCollection::EmptyGarbageCollection();
00081    delete fxBufferList;
00082    delete fxBufferMutex;
00083 }
00084 
00085 TBuffer * TGo4BufferQueue::WaitBuffer()
00086 {
00087    TRACE((19,"TGo4BufferQueue::WaitBuffer()", __LINE__, __FILE__));
00088    TObject* ob = Wait();
00089    TBuffer* rev= dynamic_cast<TBuffer*> ( ob );
00090    return rev;
00091 }
00092 
00093 
00094 TObject * TGo4BufferQueue::WaitObjectFromBuffer()
00095 {
00096 TRACE((19,"TGo4BufferQueue::WaitObjectFromBuffer()", __LINE__, __FILE__));
00097    TObject* obj=0;
00098    TBuffer* buffer=WaitBuffer();
00099    if(buffer)
00100        {
00101             {
00102             TGo4LockGuard mainguard;
00103             // lock go4 main mutex for streaming
00104             TDirectory* savdir=gDirectory;
00105             gROOT->cd(); // be sure to be in the top directory when creating histo
00106             buffer->SetReadMode();
00107             //cout << "Reading object from buffer..."<< endl;
00108             buffer->Reset();
00109             buffer->InitMap();
00110             // note: root version 3.05/02 crashes again when unknown class
00111             // shall be read; this  was working in 3.03/09
00112             // therefore, we put in our own check again from revision 1.23
00113             TClass* cl = buffer->ReadClass();
00114             //cout << "buffer queue "<< GetName() <<" :waitobject, got Class: " << cl << endl;
00115             if(cl == (TClass*) -1)
00116                {
00117                   // case of unknown class
00118                   cout << " Could not receive object of unknown class on buffer queue "<<GetName() <<" !!!" <<endl;
00119                   obj=0;
00120                }
00121             else
00122                {
00123 //                  if(cl)
00124 //                     cout << "Classname: " << cl->GetName() << endl;
00125 //                  cout << "Reading object from buffer..."<< endl;
00126                   buffer->Reset();
00127                   obj=buffer->ReadObject(cl);
00128               }
00129             if(savdir) savdir->cd();
00130             } //  TGo4LockGuard
00131          FreeBuffer(buffer); // give internal buffer back to queue
00132          } // if(buffer)
00133    else
00134      {
00135    //  cout << "WaitObjectFromBuffer got zero buffer in queue "<<GetName() << endl;
00136      }
00137    return obj;
00138 }
00139 
00140 void TGo4BufferQueue::AddBuffer(TBuffer * buffer, Bool_t clone)
00141 {
00142    TRACE((19,"TGo4BufferQueue::AddBuffer(TBuffer*, Bool_t)", __LINE__, __FILE__));
00143 
00144    TBuffer* entry=0;
00145    Bool_t entryisnew=kFALSE;
00146    if(clone)
00147       {
00148       TGo4LockGuard qguard(fxBufferMutex);
00149          entry= dynamic_cast<TBuffer*>(fxFreeList->Remove(fxFreeList->First()));
00150                 // get next free buffer
00151          if(entry==0)
00152           {
00153             // no buffer entry there, we create one
00154             TGo4Log::Debug(" Buffer Queue adding new internal buffer... ");
00155             //cout <<"Buffer Queue: creating new internal buffer... "<< GetName();
00156 
00157             entry=NewEntry();
00158             fxBufferList->Add(entry); // add to list of existing buffers
00159                 // need not add new buffer to list of free buffers
00160             entryisnew=kTRUE;
00161             fiMaxBuffers++;
00162             //cout <<"buffers:"<<fiMaxBuffers <<", "<< fiMaxBuffers*TGo4Socket::fgiBUFINITSIZE << endl;
00163           }
00164          else
00165            {
00166              // ok, we have a buffer to clone
00167            }
00168 
00169            Int_t srcsize=buffer->BufferSize();
00170            Int_t destsize=entry->BufferSize();
00171            if (srcsize>destsize)
00172                   {
00173                      Realloc(entry,destsize,srcsize);
00174                      destsize=srcsize;
00175                   }
00176                else
00177                   {
00178                      // buffer big enough, proceed
00179                   }
00180            char* source= buffer->Buffer();
00181            char* destination= entry->Buffer();
00182             memcpy(destination,source,destsize);
00183             //cout <<"))))))))))Buffer Queue: copied "<< destsize <<"bytes to buffer field" << endl;
00184             Int_t messlen = buffer->Length(); // compatible with root TMessage protocol
00185             entry->SetBufferOffset(messlen);
00186             entry->SetByteCount(0);
00187       } // if(clone)
00188    else
00189       {
00190          entry=buffer;
00191       }
00192    //cout <<"Bufferqueue AB adding buffer "<< entry << endl;
00193 
00194    try
00195       {
00196          Add(entry);
00197       }
00198    catch(TGo4RuntimeException ex)
00199       {
00200          cout << "Buffer queue "<< GetName()<<" is full, dropping new entry "<< entry <<" !!!" << endl;
00201          if(entryisnew)
00202             {
00203                TGo4LockGuard qguard(fxBufferMutex);
00204                fxBufferList->Remove(entry); // remove new entry again from pool
00205                delete entry;
00206             }
00207       }
00208 }
00209 
00210 void TGo4BufferQueue::AddBufferFromObject(TObject * object)
00211 {
00212    TRACE((12,"TGo4BufferQueue::AddBufferFromObject(TObject*)", __LINE__, __FILE__));
00213    if(object==0) return;
00214    TGo4LockGuard mainguard;
00215    TBuffer* entry = 0;
00216    entry = new TGo4Buffer(TBuffer::kWrite);
00217    //cout <<" Buffer  queue ABFO created buffer "<<entry << endl;
00218    TFile *filsav = gFile;
00219    gFile = 0;
00220    entry->WriteObject(object);
00221    gFile = filsav;
00223 //         cout << "wrote object "<< object->GetName() <<" into message" <<  endl;
00224 //
00225 //         entry->SetReadMode();
00226 //         entry->Reset();
00227 //
00228 //         entry->InitMap();
00229 //         TClass* cl= entry->ReadClass();
00230 //         entry->Reset();
00231 //         cout << "buffer contains class "<< cl << endl;
00232 //         if(cl)
00233 //            {
00234 //               cout << "classname "<< cl->GetName() << endl;
00235 //
00236 //               TObject* ob=entry->ReadObject(cl);
00237 //               cout << "read object "<< ob << endl;
00238 //               if(ob)
00239 //                  cout << "read object "<< ob->GetName() << endl;
00240 //            }
00242 
00243 // do not reset, may need object length information later!
00244 //         entry->Reset();
00245    try
00246       {
00247          Add(entry);
00248       }
00249    catch(TGo4RuntimeException& ex)
00250       {
00251          cout << "Buffer queue "<< GetName()<<" is full, dropping new entry "<< entry <<" !!!" << endl;
00252          delete entry;
00253       }
00254 
00255  }
00256 
00257 void TGo4BufferQueue::FreeBuffer(TBuffer * buffer)
00258 {
00259    TRACE((19,"TGo4BufferQueue::FreeBuffer(TBuffer*, Bool_t)", __LINE__, __FILE__));
00260    //
00261    TGo4LockGuard qguard(fxBufferMutex);
00262    //cout << "bufferlock acquired by bufferqueue: freebuffer"<< endl;
00263    // does buffer belong to our internal buffers?
00264    if (fxBufferList->FindObject(buffer)!=0)
00265             {
00266                // yes, put it back into the free list
00267                 // new: we allocate the buffersize back to the initial size to
00268                 // avoid extended memory consumption:
00269                 Int_t memsize=buffer->BufferSize();
00270                 if (memsize>TGo4Socket::fgiBUFINITSIZE)
00271                   {
00272                      Realloc(buffer,memsize, TGo4Socket::fgiBUFINITSIZE);
00273                   }
00274                 fxFreeList->AddLast(buffer);
00275                 //cout <<"freed buffer"<< buffer <<" of bufferqueue "<< GetName() << endl;
00276             }
00277          else
00278             {
00279                // no, we delete it to avoid leakage
00280                delete buffer;
00281                //cout <<" Buffer  queue FB deleted buffer "<<buffer << endl;
00282                //cout <<"deleted external buffer of bufferqueue "<< GetName() << endl;
00283                // TGo4Log::Debug(" Buffer Queue : deleted external buffer !!! ");
00284 
00285             }
00286 
00287 
00288 
00289 
00290 }
00291 
00292 void TGo4BufferQueue::Clear(Option_t* opt)
00293 {
00294    TObject* ob=0;
00295    while((ob=Next()) !=0)
00296       {
00297          //cout <<"cleared entry "<<ob<<" of queue "<<GetName() << endl;
00298          FreeBuffer(dynamic_cast<TBuffer*> (ob) );
00299       }
00300 
00301 }
00302 
00303 
00304 
00305 
00306 void TGo4BufferQueue::Realloc(TBuffer* buffer, Int_t oldsize, Int_t newsize)
00307 {
00308 if(buffer==0) return;
00309 TGo4LockGuard mainguard;
00310    char* memfield=buffer->Buffer();
00311    //buffer->Expand(newsize); // is protected! we make it by hand...
00312    //Int_t current = buffer->Length(); // cursor position
00313    Int_t extraspace=TGo4Socket::fgiBUFEXTRASPACE; // =8, constant within TBuffer
00314 //   memfield = (char *) TStorage::ReAlloc(memfield,
00315 //                                           (newsize + extraspace) * sizeof(char),
00316 //                                           (oldsize+ extraspace) * sizeof(char));
00317    // this works only for ROOT versions > 3.02/04
00318    memfield = TStorage::ReAllocChar(memfield,
00319                                            (newsize+extraspace),
00320                                            (oldsize+extraspace));
00321    //cout << "Bufferqueue reallocating char from"<<oldsize<< " to " << newsize<< endl;
00322    buffer->ResetBit(fgiISOWNER);
00323    buffer->SetBuffer(memfield, newsize);
00324    buffer->SetBit(fgiISOWNER);
00325    // <- here we avoid the ownership of TBuffer for the internal buffer
00326    // (new feature of ROOT versions > 3.02/04)
00327    // problem: SetBuffer will delete previous buffer in adopt mode (isowner=true)
00328    // which might be at the same location as the new buffer after ReAlloc
00329    // ergo SetBuffer would set a buffer which it deleted before itself!
00330    buffer->SetBufferOffset(newsize);
00331 }
00332 
00333 TBuffer* TGo4BufferQueue::NewEntry()
00334 {
00335    TGo4LockGuard mainguard;
00336    TBuffer* buf = new TGo4Buffer(TBuffer::kWrite, TGo4Socket::fgiBUFINITSIZE);
00337    //cout <<"nnnnnnnn BufferQueue "<<GetName()<<" made new entry "<<buf << endl;
00338    TNamed* dummy= new TNamed("This is a default buffer filler","GO4 is fun!");
00339    TFile *filsav = gFile;
00340    gFile = 0;
00341    buf->WriteObject(dummy);
00342    gFile = filsav;
00343    delete dummy;
00344    return buf;
00345 }
00346 
00347 TBuffer* TGo4BufferQueue::CreateValueBuffer(UInt_t val)
00348 {
00349    TGo4Buffer* buf= new TGo4Buffer(TBuffer::kWrite);
00350    char* field= buf->Buffer() + sizeof(UInt_t);
00351    tobuf(field ,val);
00352    buf->SetBufferOffset( sizeof(UInt_t)+ sizeof(UInt_t) ); // set length for receiver check
00353    buf->SetByteCount(0); // correctly set first longword
00354    return buf;
00355 }
00356 
00357 
00358 Int_t TGo4BufferQueue::DecodeValueBuffer(TBuffer* buf)
00359 {
00360 if(buf==0) return -1;
00361 UInt_t len= buf->Length();
00362 if(len==sizeof(UInt_t)+sizeof(UInt_t)) // note: first length is length of encoded type
00363                                        // second length is always UInt_t
00364    {
00365      char* field=buf->Buffer();
00366      char* temp= field + sizeof(UInt_t); // skip length header
00367      UInt_t val=0;
00368      frombuf(temp, &val);
00369      //cout <<"DDDDDD DecodeValueBuffer val="<<val << endl;
00370      return (Int_t) val;
00371    }
00372 else
00373    {
00374      return -2;
00375    }
00376 }
00377 
00378 //----------------------------END OF GO4 SOURCE FILE ---------------------

Generated on Fri Nov 28 12:59:28 2008 for Go4-v3.04-1 by  doxygen 1.4.2