TMessage.cxx

Go to the documentation of this file.
00001 // @(#)root/net:$Id: TMessage.cxx 37986 2011-02-04 21:42:15Z pcanal $
00002 // Author: Fons Rademakers   19/12/96
00003 
00004 /*************************************************************************
00005  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers.               *
00006  * All rights reserved.                                                  *
00007  *                                                                       *
00008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
00009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
00010  *************************************************************************/
00011 
00012 //////////////////////////////////////////////////////////////////////////
00013 //                                                                      //
00014 // TMessage                                                             //
00015 //                                                                      //
00016 // Message buffer class used for serializing objects and sending them   //
00017 // over a network. This class inherits from TBuffer the basic I/O       //
00018 // serializer.                                                          //
00019 //                                                                      //
00020 //////////////////////////////////////////////////////////////////////////
00021 
00022 #include "TMessage.h"
00023 #include "TVirtualStreamerInfo.h"
00024 #include "Bytes.h"
00025 #include "TFile.h"
00026 #include "TProcessID.h"
00027 
00028 
00029 extern "C" void R__zip (Int_t cxlevel, Int_t *nin, char *bufin, Int_t *lout, char *bufout, Int_t *nout);
00030 extern "C" void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout);
00031 extern "C" int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout);
00032 const Int_t kMAXBUF = 0xffffff;
00033 
00034 Bool_t TMessage::fgEvolution = kFALSE;
00035 
00036 
00037 ClassImp(TMessage)
00038 
00039 //______________________________________________________________________________
00040 TMessage::TMessage(UInt_t what, Int_t bufsiz) :
00041    TBufferFile(TBuffer::kWrite, bufsiz + 2*sizeof(UInt_t))
00042 {
00043    // Create a TMessage object for storing objects. The "what" integer
00044    // describes the type of message. Predifined ROOT system message types
00045    // can be found in MessageTypes.h. Make sure your own message types are
00046    // unique from the ROOT defined message types (i.e. 0 - 10000 are
00047    // reserved by ROOT). In case you OR "what" with kMESS_ACK, the message
00048    // will wait for an acknowledgement from the remote side. This makes
00049    // the sending process synchronous. In case you OR "what" with kMESS_ZIP,
00050    // the message will be compressed in TSocket using the zip algorithm
00051    // (only if message is > 256 bytes).
00052 
00053    // space at the beginning of the message reserved for the message length
00054    UInt_t   reserved = 0;
00055    *this << reserved;
00056 
00057    fWhat  = what;
00058    *this << what;
00059 
00060    fClass      = 0;
00061    fCompress   = 0;
00062    fBufComp    = 0;
00063    fBufCompCur = 0;
00064    fCompPos    = 0;
00065    fInfos      = 0;
00066    fEvolution  = kFALSE;
00067 
00068    SetBit(kCannotHandleMemberWiseStreaming);
00069 }
00070 
00071 //______________________________________________________________________________
00072 TMessage::TMessage(void *buf, Int_t bufsize) : TBufferFile(TBuffer::kRead, bufsize, buf)
00073 {
00074    // Create a TMessage object for reading objects. The objects will be
00075    // read from buf. Use the What() method to get the message type.
00076 
00077    // skip space at the beginning of the message reserved for the message length
00078    fBufCur += sizeof(UInt_t);
00079 
00080    *this >> fWhat;
00081 
00082    fCompress   = 0;
00083    fBufComp    = 0;
00084    fBufCompCur = 0;
00085    fCompPos    = 0;
00086    fInfos      = 0;
00087    fEvolution  = kFALSE;
00088 
00089    if (fWhat & kMESS_ZIP) {
00090       // if buffer has kMESS_ZIP set, move it to fBufComp and uncompress
00091       fBufComp    = fBuffer;
00092       fBufCompCur = fBuffer + bufsize;
00093       fBuffer     = 0;
00094       Uncompress();
00095    }
00096 
00097    if (fWhat == kMESS_OBJECT) {
00098       InitMap();
00099       fClass = ReadClass();     // get first the class stored in message
00100       SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
00101       ResetMap();
00102    } else {
00103       fClass = 0;
00104    }
00105 }
00106 
00107 //______________________________________________________________________________
00108 TMessage::~TMessage()
00109 {
00110    // Clean up compression buffer.
00111 
00112    delete [] fBufComp;
00113    delete fInfos;
00114 }
00115 
00116 //______________________________________________________________________________
00117 void TMessage::EnableSchemaEvolutionForAll(Bool_t enable)
00118 {
00119    // Static function enabling or disabling the automatic schema evolution.
00120    // By default schema evolution support is off.
00121 
00122    fgEvolution = enable;
00123 }
00124 
00125 //______________________________________________________________________________
00126 Bool_t TMessage::UsesSchemaEvolutionForAll()
00127 {
00128    // Static function returning status of global schema evolution.
00129 
00130    return fgEvolution;
00131 }
00132 
00133 //______________________________________________________________________________
00134 void TMessage::ForceWriteInfo(TVirtualStreamerInfo *info, Bool_t /* force */)
00135 {
00136    // Force writing the TStreamerInfo to the message.
00137 
00138    if (fgEvolution || fEvolution) {
00139       if (!fInfos) fInfos = new TList();
00140       fInfos->Add(info);
00141    }
00142 }
00143 
00144 //______________________________________________________________________________
00145 void TMessage::Forward()
00146 {
00147    // Change a buffer that was received into one that can be send, i.e.
00148    // forward a just received message.
00149 
00150    if (IsReading()) {
00151       SetWriteMode();
00152       SetBufferOffset(fBufSize);
00153       SetBit(kCannotHandleMemberWiseStreaming);
00154 
00155       if (fBufComp) {
00156          fCompPos = fBufCur;
00157       }
00158    }
00159 }
00160 
00161 //______________________________________________________________________________
00162 void TMessage::TagStreamerInfo(TVirtualStreamerInfo *info)
00163 {
00164    // Remember that the StreamerInfo is being used in writing.
00165 
00166    if (fgEvolution || fEvolution) {
00167       if (!fInfos) fInfos = new TList();
00168       fInfos->Add(info);
00169    }
00170 }
00171 
00172 //______________________________________________________________________________
00173 void TMessage::Reset()
00174 {
00175    // Reset the message buffer so we can use (i.e. fill) it again.
00176 
00177    SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
00178    ResetMap();
00179 
00180    if (fBufComp) {
00181       delete [] fBufComp;
00182       fBufComp    = 0;
00183       fBufCompCur = 0;
00184       fCompPos    = 0;
00185    }
00186 }
00187 
00188 //______________________________________________________________________________
00189 void TMessage::SetLength() const
00190 {
00191    // Set the message length at the beginning of the message buffer.
00192    // This method is only called by TSocket::Send().
00193 
00194    if (IsWriting()) {
00195       char *buf = Buffer();
00196       tobuf(buf, (UInt_t)(Length() - sizeof(UInt_t)));
00197 
00198       if (fBufComp) {
00199          buf = fBufComp;
00200          tobuf(buf, (UInt_t)(CompLength() - sizeof(UInt_t)));
00201       }
00202    }
00203 }
00204 
00205 //______________________________________________________________________________
00206 void TMessage::SetWhat(UInt_t what)
00207 {
00208    // Using this method one can change the message type a-posteriory.
00209    // In case you OR "what" with kMESS_ACK, the message will wait for
00210    // an acknowledgement from the remote side. This makes the sending
00211    // process synchronous.
00212 
00213    fWhat = what;
00214 
00215    char *buf = Buffer();
00216    buf += sizeof(UInt_t);   // skip reserved length space
00217    tobuf(buf, what);
00218 
00219    if (fBufComp) {
00220       buf = fBufComp;
00221       buf += sizeof(UInt_t);   // skip reserved length space
00222       tobuf(buf, what | kMESS_ZIP);
00223    }
00224 }
00225 
00226 //______________________________________________________________________________
00227 void TMessage::SetCompressionLevel(Int_t level)
00228 {
00229    // Set the message compression level. Can be between 0 and 9 with 0
00230    // being no compression and 9 maximum compression. In general the default
00231    // level of 1 is the best compromise between achieved compression and
00232    // cpu time. Compression will only happen when the message is > 256 bytes.
00233 
00234    if (level < 0) level = 0;
00235    if (level > 9) level = 9;
00236 
00237    if (level != fCompress && fBufComp) {
00238       delete [] fBufComp;
00239       fBufComp    = 0;
00240       fBufCompCur = 0;
00241       fCompPos    = 0;
00242    }
00243    fCompress = level;
00244 }
00245 
00246 //______________________________________________________________________________
00247 Int_t TMessage::Compress()
00248 {
00249    // Compress the message. The message will only be compressed if the
00250    // compression level > 0 and the if the message is > 256 bytes.
00251    // Returns -1 in case of error (when compression fails or
00252    // when the message increases in size in some pathological cases),
00253    // otherwise returns 0.
00254 
00255    if (fCompress == 0) {
00256       // no compression specified
00257       if (fBufComp) {
00258          delete [] fBufComp;
00259          fBufComp    = 0;
00260          fBufCompCur = 0;
00261          fCompPos    = 0;
00262       }
00263       return 0;
00264    }
00265 
00266    if (fBufComp && fCompPos == fBufCur) {
00267       // the message was already compressed
00268       return 0;
00269    }
00270 
00271    // remove any existing compressed buffer before compressing modified message
00272    if (fBufComp) {
00273       delete [] fBufComp;
00274       fBufComp    = 0;
00275       fBufCompCur = 0;
00276       fCompPos    = 0;
00277    }
00278 
00279    if (Length() <= (Int_t)(256 + 2*sizeof(UInt_t))) {
00280       // this message is too small to be compressed
00281       return 0;
00282    }
00283 
00284    Int_t hdrlen   = 2*sizeof(UInt_t);
00285    Int_t messlen  = Length() - hdrlen;
00286    Int_t nbuffers = messlen / kMAXBUF;
00287    Int_t chdrlen  = 3*sizeof(UInt_t);   // compressed buffer header length
00288    Int_t buflen   = TMath::Max(512, chdrlen + messlen + 9*nbuffers);
00289    fBufComp       = new char[buflen];
00290    char *messbuf  = Buffer() + hdrlen;
00291    char *bufcur   = fBufComp + chdrlen;
00292    Int_t noutot   = 0;
00293    Int_t nzip     = 0;
00294    Int_t nout, bufmax;
00295    for (Int_t i = 0; i <= nbuffers; i++) {
00296       if (i == nbuffers)
00297          bufmax = messlen - nzip;
00298       else
00299          bufmax = kMAXBUF;
00300       R__zip(fCompress, &bufmax, messbuf, &bufmax, bufcur, &nout);
00301       if (nout == 0 || nout >= messlen) {
00302          //this happens when the buffer cannot be compressed
00303          delete [] fBufComp;
00304          fBufComp    = 0;
00305          fBufCompCur = 0;
00306          fCompPos    = 0;
00307          return -1;
00308       }
00309       bufcur  += nout;
00310       noutot  += nout;
00311       messbuf += kMAXBUF;
00312       nzip    += kMAXBUF;
00313    }
00314    fBufCompCur = bufcur;
00315    fCompPos    = fBufCur;
00316 
00317    bufcur = fBufComp;
00318    tobuf(bufcur, (UInt_t)(CompLength() - sizeof(UInt_t)));
00319    Int_t what = fWhat | kMESS_ZIP;
00320    tobuf(bufcur, what);
00321    tobuf(bufcur, Length());    // original uncompressed buffer length
00322 
00323    return 0;
00324 }
00325 
00326 //______________________________________________________________________________
00327 Int_t TMessage::Uncompress()
00328 {
00329    // Uncompress the message. The message will only be uncompressed when
00330    // kMESS_ZIP is set. Returns -1 in case of error, 0 otherwise.
00331 
00332    if (!fBufComp || !(fWhat & kMESS_ZIP))
00333       return -1;
00334 
00335    Int_t buflen;
00336    Int_t hdrlen = 2*sizeof(UInt_t);
00337    char *bufcur1 = fBufComp + hdrlen;
00338    frombuf(bufcur1, &buflen);
00339    UChar_t *bufcur = (UChar_t*)bufcur1;
00340 
00341    /* early consistency check */
00342    Int_t nin, nbuf;
00343    if(R__unzip_header(&nin, bufcur, &nbuf)!=0) {
00344       Error("Uncompress", "Inconsistency found in header (nin=%d, nbuf=%d)", nin, nbuf);
00345       return -1;
00346    }
00347 
00348    fBuffer  = new char[buflen];
00349    fBufSize = buflen;
00350    fBufCur  = fBuffer + sizeof(UInt_t) + sizeof(fWhat);
00351    fBufMax  = fBuffer + fBufSize;
00352    char *messbuf = fBuffer + hdrlen;
00353 
00354    Int_t nout;
00355    Int_t noutot = 0;
00356    while (1) {
00357       Int_t hc = R__unzip_header(&nin, bufcur, &nbuf);
00358       if (hc!=0) break;
00359       R__unzip(&nin, bufcur, &nbuf, messbuf, &nout);
00360       if (!nout) break;
00361       noutot += nout;
00362       if (noutot >= buflen - hdrlen) break;
00363       bufcur  += nin;
00364       messbuf += nout;
00365    }
00366 
00367    fWhat &= ~kMESS_ZIP;
00368    fCompress = 1;
00369 
00370    return 0;
00371 }
00372 
00373 //______________________________________________________________________________
00374 void TMessage::WriteObject(const TObject *obj)
00375 {
00376    // Write object to message buffer.
00377    // When support for schema evolution is enabled the list of TStreamerInfo
00378    // used to stream this object is kept in fInfos. This information is used
00379    // by TSocket::Send that sends this list through the socket. This list is in
00380    // turn used by TSocket::Recv to store the TStreamerInfo objects in the
00381    // relevant TClass in case the TClass does not know yet about a particular
00382    // class version. This feature is implemented to support clients and servers
00383    // with either different ROOT versions or different user classes versions.
00384 
00385    if (fgEvolution || fEvolution) {
00386       if (fInfos)
00387          fInfos->Clear();
00388       else
00389          fInfos = new TList();
00390    }
00391 
00392    fBitsPIDs.ResetAllBits();
00393    WriteObjectAny(obj, TObject::Class());
00394 }
00395 
00396 //______________________________________________________________________________
00397 UShort_t TMessage::WriteProcessID(TProcessID *pid)
00398 {
00399    // Check if the ProcessID pid is already in the message.
00400    // If not, then:
00401    //   - mark bit 0 of fBitsPIDs to indicate that a ProcessID has been found
00402    //   - mark bit uid+1 where uid id the uid of the ProcessID
00403 
00404    if (fBitsPIDs.TestBitNumber(0)) return 0;
00405    if (!pid)
00406       pid = TProcessID::GetPID();
00407    if (!pid) return 0;
00408    fBitsPIDs.SetBitNumber(0);
00409    UInt_t uid = pid->GetUniqueID();
00410    fBitsPIDs.SetBitNumber(uid+1);
00411    return 1;
00412 }

Generated on Tue Jul 5 14:46:10 2011 for ROOT_528-00b_version by  doxygen 1.5.1