Main Page   Namespace List   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members  

/Go4Queue/TGo4BufferQueue.cxx

Go to the documentation of this file.
00001 //---------------------------------------------------------------
00002 //        Go4 Release Package v2.10-5 (build 21005) 
00003 //                      03-Nov-2005
00004 //---------------------------------------------------------------
00005 //       The GSI Online Offline Object Oriented (Go4) Project
00006 //       Experiment Data Processing at DVEE department, GSI
00007 //---------------------------------------------------------------
00008 //
00009 //Copyright (C) 2000- Gesellschaft f. Schwerionenforschung, GSI
00010 //                    Planckstr. 1, 64291 Darmstadt, Germany
00011 //Contact:            http://go4.gsi.de
00012 //----------------------------------------------------------------
00013 //This software can be used under the license agreements as stated
00014 //in Go4License.txt file which is part of the distribution.
00015 //----------------------------------------------------------------
00016 #include "TGo4BufferQueue.h"
00017 
00018 #include <iostream.h>
00019 
00020 #include "TROOT.h"
00021 #include "TMutex.h"
00022 #include "TFile.h"
00023 #include "TMessage.h"
00024 #include "TBuffer.h"
00025 
00026 #include "Go4Socket/TGo4Socket.h"
00027 #include "Go4Exceptions/TGo4RuntimeException.h"
00028 #include "Go4Log/TGo4Log.h"
00029 #include "Go4LockGuard/TGo4LockGuard.h"
00030 
00031 #if __GO4ROOTVERSION__ > 40302
00032    const Int_t TGo4BufferQueue::fgiISOWNER=TBuffer::kIsOwner;
00033 #else
00034    const Int_t TGo4BufferQueue::fgiISOWNER=BIT(14);
00035 // we emulate the protected owner flag of the TBuffer class, needed for reallocation!
00036 #endif
00037 
00038 
00039 
00040 TGo4BufferQueue::TGo4BufferQueue()
00041    : TGo4Queue("Default buffer queue"),fiOverflowcount(0), fiMaxBuffers(10)
00042 {
00043    TRACE((14,"TGo4BufferQueue::TGo4BufferQueue()", __LINE__, __FILE__));
00044 
00045    InitBuffers();
00046 }
00047 
00048 TGo4BufferQueue::TGo4BufferQueue(const char* name)
00049 : TGo4Queue(name), fiOverflowcount(0), fiMaxBuffers(10)
00050 
00051 {
00052    TRACE((14,"TGo4BufferQueue::TGo4BufferQueue(Text_t*)", __LINE__, __FILE__));
00053    InitBuffers();
00054 }
00055 
00056 void TGo4BufferQueue::InitBuffers()
00057 
00058 {
00059    TRACE((14,"TGo4BufferQueue::InitBuffers()", __LINE__, __FILE__));
00060    TGo4LockGuard mainguard;
00061    fxBufferList=new TList; // list owning all buffers
00062    fxFreeList=new TList;    // list indicating the free buffers
00063    fxBufferIterator=fxFreeList->MakeIterator();
00064    fxBufferMutex=new TMutex;
00065    for (Int_t i=0; i< fiMaxBuffers; ++i)
00066       {
00067           TBuffer* buf=NewEntry();
00068           fxBufferList->Add(buf);
00069           fxFreeList->Add(buf);
00070       }
00071 }
00072 
00073 TGo4BufferQueue::~TGo4BufferQueue()
00074 {
00075    TRACE((14,"TGo4BufferQueue::~TTGo4BufferQueue()", __LINE__, __FILE__));
00076 
00077    delete fxBufferIterator;
00078    fxBufferList->Delete();
00079    TCollection::EmptyGarbageCollection();
00080    delete fxBufferList;
00081    delete fxBufferMutex;
00082 }
00083 
00084 TBuffer * TGo4BufferQueue::WaitBuffer()
00085 {
00086    TRACE((19,"TGo4BufferQueue::WaitBuffer()", __LINE__, __FILE__));
00087    TObject* ob = Wait();
00088    TBuffer* rev= dynamic_cast<TBuffer*> ( ob );
00089    return rev;
00090 }
00091 
00092 
00093 TObject * TGo4BufferQueue::WaitObjectFromBuffer()
00094 {
00095 TRACE((19,"TGo4BufferQueue::WaitObjectFromBuffer()", __LINE__, __FILE__));
00096    TObject* obj=0;
00097    TBuffer* buffer=WaitBuffer();
00098    if(buffer)
00099        {
00100             {
00101             TGo4LockGuard mainguard;
00102             // lock go4 main mutex for streaming
00103             TDirectory* savdir=gDirectory;
00104             gROOT->cd(); // be sure to be in the top directory when creating histo
00105             buffer->SetReadMode();
00106             //cout << "Reading object from buffer..."<< endl;
00107             buffer->Reset();
00108             buffer->InitMap();
00109             // note: root version 3.05/02 crashes again when unknown class
00110             // shall be read; this  was working in 3.03/09
00111             // therefore, we put in our own check again from revision 1.23
00112             TClass* cl = buffer->ReadClass();
00113             //cout << "buffer queue "<< GetName() <<" :waitobject, got Class: " << cl << endl;
00114             if(cl == (TClass*) -1)
00115                {
00116                   // case of unknown class
00117                   cout << " Could not receive object of unknown class on buffer queue "<<GetName() <<" !!!" <<endl;
00118                   obj=0;
00119                }
00120             else
00121                {
00122 //                  if(cl)
00123 //                     cout << "Classname: " << cl->GetName() << endl;
00124 //                  cout << "Reading object from buffer..."<< endl;
00125                   buffer->Reset();
00126                   obj=buffer->ReadObject(cl);
00127               }
00128             if(savdir) savdir->cd();
00129             } //  TGo4LockGuard
00130          FreeBuffer(buffer); // give internal buffer back to queue
00131          } // if(buffer)
00132    else
00133      {
00134    //  cout << "WaitObjectFromBuffer got zero buffer in queue "<<GetName() << endl;
00135      }
00136    return obj;
00137 }
00138 
00139 void TGo4BufferQueue::AddBuffer(TBuffer * buffer, Bool_t clone)
00140 {
00141    TRACE((19,"TGo4BufferQueue::AddBuffer(TBuffer*, Bool_t)", __LINE__, __FILE__));
00142 
00143    TBuffer* entry=0;
00144    Bool_t entryisnew=kFALSE;
00145    if(clone)
00146       {
00147       TGo4LockGuard qguard(fxBufferMutex);
00148          entry= dynamic_cast<TBuffer*>(fxFreeList->Remove(fxFreeList->First()));
00149                 // get next free buffer
00150          if(entry==0)
00151           {
00152             // no buffer entry there, we create one
00153             TGo4Log::Debug(" Buffer Queue adding new internal buffer... ");
00154             //cout <<"Buffer Queue: creating new internal buffer... "<< GetName();
00155 
00156             entry=NewEntry();
00157             fxBufferList->Add(entry); // add to list of existing buffers
00158                 // need not add new buffer to list of free buffers
00159             entryisnew=kTRUE;
00160             fiMaxBuffers++;
00161             //cout <<"buffers:"<<fiMaxBuffers <<", "<< fiMaxBuffers*TGo4Socket::fgiBUFINITSIZE << endl;
00162           }
00163          else
00164            {
00165              // ok, we have a buffer to clone
00166            }
00167 
00168            Int_t srcsize=buffer->BufferSize();
00169            Int_t destsize=entry->BufferSize();
00170            if (srcsize>destsize)
00171                   {
00172                      Realloc(entry,destsize,srcsize);
00173                      destsize=srcsize;
00174                   }
00175                else
00176                   {
00177                      // buffer big enough, proceed
00178                   }
00179            char* source= buffer->Buffer();
00180            char* destination= entry->Buffer();
00181             memcpy(destination,source,destsize);
00182             //cout <<"))))))))))Buffer Queue: copied "<< destsize <<"bytes to buffer field" << endl;
00183             Int_t messlen = buffer->Length(); // compatible with root TMessage protocol
00184             entry->SetBufferOffset(messlen);
00185             entry->SetByteCount(0);
00186       } // if(clone)
00187    else
00188       {
00189          entry=buffer;
00190       }
00191    //cout <<"Bufferqueue AB adding buffer "<< entry << endl;
00192 
00193    try
00194       {
00195          Add(entry);
00196       }
00197    catch(TGo4RuntimeException ex)
00198       {
00199          cout << "Buffer queue "<< GetName()<<" is full, dropping new entry "<< entry <<" !!!" << endl;
00200          if(entryisnew)
00201             {
00202                TGo4LockGuard qguard(fxBufferMutex);
00203                fxBufferList->Remove(entry); // remove new entry again from pool
00204                delete entry;
00205             }
00206       }
00207 }
00208 
00209 void TGo4BufferQueue::AddBufferFromObject(TObject * object)
00210 {
00211    TRACE((12,"TGo4BufferQueue::AddBufferFromObject(TObject*)", __LINE__, __FILE__));
00212    if(object==0) return;
00213    TGo4LockGuard mainguard;
00214    TBuffer* entry = 0;
00215    entry = new TBuffer(TBuffer::kWrite);
00216    //cout <<" Buffer  queue ABFO created buffer "<<entry << endl;
00217    TFile *filsav = gFile;
00218    gFile = 0;
00219    entry->WriteObject(object);
00220    gFile = filsav;
00222 //         cout << "wrote object "<< object->GetName() <<" into message" <<  endl;
00223 //
00224 //         entry->SetReadMode();
00225 //         entry->Reset();
00226 //
00227 //         entry->InitMap();
00228 //         TClass* cl= entry->ReadClass();
00229 //         entry->Reset();
00230 //         cout << "buffer contains class "<< cl << endl;
00231 //         if(cl)
00232 //            {
00233 //               cout << "classname "<< cl->GetName() << endl;
00234 //
00235 //               TObject* ob=entry->ReadObject(cl);
00236 //               cout << "read object "<< ob << endl;
00237 //               if(ob)
00238 //                  cout << "read object "<< ob->GetName() << endl;
00239 //            }
00241 
00242 // do not reset, may need object length information later!
00243 //         entry->Reset();
00244    try
00245       {
00246          Add(entry);
00247       }
00248    catch(TGo4RuntimeException& ex)
00249       {
00250          cout << "Buffer queue "<< GetName()<<" is full, dropping new entry "<< entry <<" !!!" << endl;
00251          delete entry;
00252       }
00253 
00254  }
00255 
00256 void TGo4BufferQueue::FreeBuffer(TBuffer * buffer)
00257 {
00258    TRACE((19,"TGo4BufferQueue::FreeBuffer(TBuffer*, Bool_t)", __LINE__, __FILE__));
00259    //
00260    TGo4LockGuard qguard(fxBufferMutex);
00261    //cout << "bufferlock acquired by bufferqueue: freebuffer"<< endl;
00262    // does buffer belong to our internal buffers?
00263    if (fxBufferList->FindObject(buffer)!=0)
00264             {
00265                // yes, put it back into the free list
00266                 // new: we allocate the buffersize back to the initial size to
00267                 // avoid extended memory consumption:
00268                 Int_t memsize=buffer->BufferSize();
00269                 if (memsize>TGo4Socket::fgiBUFINITSIZE)
00270                   {
00271                      Realloc(buffer,memsize, TGo4Socket::fgiBUFINITSIZE);
00272                   }
00273                 fxFreeList->AddLast(buffer);
00274                 //cout <<"freed buffer"<< buffer <<" of bufferqueue "<< GetName() << endl;
00275             }
00276          else
00277             {
00278                // no, we delete it to avoid leakage
00279                delete buffer;
00280                //cout <<" Buffer  queue FB deleted buffer "<<buffer << endl;
00281                //cout <<"deleted external buffer of bufferqueue "<< GetName() << endl;
00282                // TGo4Log::Debug(" Buffer Queue : deleted external buffer !!! ");
00283 
00284             }
00285 
00286 
00287 
00288 
00289 }
00290 
00291 void TGo4BufferQueue::Clear(Option_t* opt)
00292 {
00293    //cout <<"TGo4BufferQueue::Clear on"<<GetName() << endl;
00294    TObject* ob=0;
00295    while((ob=Next()) !=0)
00296       {
00297          //cout <<"cleared entry "<<ob<<" of queue "<<GetName() << endl;
00298          FreeBuffer(dynamic_cast<TBuffer*> (ob) );           
00299       }
00300 
00301 }
00302 
00303 
00304 
00305 
00306 void TGo4BufferQueue::Realloc(TBuffer* buffer, Int_t oldsize, Int_t newsize)
00307 {
00308 if(buffer==0) return;
00309 TGo4LockGuard mainguard;
00310    char* memfield=buffer->Buffer();
00311    //buffer->Expand(newsize); // is protected! we make it by hand...
00312    //Int_t current = buffer->Length(); // cursor position
00313    Int_t extraspace=TGo4Socket::fgiBUFEXTRASPACE; // =8, constant within TBuffer
00314 //   memfield = (char *) TStorage::ReAlloc(memfield,
00315 //                                           (newsize + extraspace) * sizeof(char),
00316 //                                           (oldsize+ extraspace) * sizeof(char));
00317    // this works only for ROOT versions > 3.02/04
00318    memfield = TStorage::ReAllocChar(memfield,
00319                                            (newsize+extraspace),
00320                                            (oldsize+extraspace));
00321    //cout << "Bufferqueue reallocating char from"<<oldsize<< " to " << newsize<< endl;
00322    buffer->ResetBit(fgiISOWNER);
00323    buffer->SetBuffer(memfield, newsize);
00324    buffer->SetBit(fgiISOWNER);
00325    // <- here we avoid the ownership of TBuffer for the internal buffer
00326    // (new feature of ROOT versions > 3.02/04)
00327    // problem: SetBuffer will delete previous buffer in adopt mode (isowner=true)
00328    // which might be at the same location as the new buffer after ReAlloc
00329    // ergo SetBuffer would set a buffer which it deleted before itself!
00330    buffer->SetBufferOffset(newsize);
00331 }
00332 
00333 TBuffer* TGo4BufferQueue::NewEntry()
00334 {
00335    TGo4LockGuard mainguard;
00336    TBuffer* buf = new TBuffer(TBuffer::kWrite, TGo4Socket::fgiBUFINITSIZE);
00337    //cout <<"nnnnnnnn BufferQueue "<<GetName()<<" made new entry "<<buf << endl;
00338    TNamed* dummy= new TNamed("This is a default buffer filler","GO4 is fun!");
00339    TFile *filsav = gFile;
00340    gFile = 0;
00341    buf->WriteObject(dummy);
00342    gFile = filsav;
00343    delete dummy;
00344    return buf;
00345 }
00346 
00347 TBuffer* TGo4BufferQueue::CreateValueBuffer(UInt_t val)
00348 {
00349    TBuffer* buf= new TBuffer(TBuffer::kWrite);
00350    char* field= buf->Buffer() + sizeof(UInt_t);
00351    tobuf(field ,val);
00352    buf->SetBufferOffset( sizeof(UInt_t)+ sizeof(UInt_t) ); // set length for receiver check
00353    buf->SetByteCount(0); // correctly set first longword
00354    return buf;   
00355 }
00356 
00357 
00358 Int_t TGo4BufferQueue::DecodeValueBuffer(TBuffer* buf)
00359 {
00360 if(buf==0) return -1;
00361 UInt_t len= buf->Length();
00362 if(len==sizeof(UInt_t)+sizeof(UInt_t)) // note: first length is length of encoded type
00363                                        // second length is always UInt_t         
00364    {
00365      char* field=buf->Buffer();
00366      char* temp= field + sizeof(UInt_t); // skip length header
00367      UInt_t val=0;
00368      frombuf(temp, &val);
00369      //cout <<"DDDDDD DecodeValueBuffer val="<<val << endl;
00370      return (Int_t) val; 
00371    }
00372 else
00373    {
00374      return -2;  
00375    }           
00376 }
00377 
00378 //----------------------------END OF GO4 SOURCE FILE ---------------------

Generated on Tue Nov 8 10:56:04 2005 for Go4-v2.10-5 by doxygen1.2.15