00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "TMessage.h"
00023 #include "TVirtualStreamerInfo.h"
00024 #include "Bytes.h"
00025 #include "TFile.h"
00026 #include "TProcessID.h"
00027
00028
00029 extern "C" void R__zip (Int_t cxlevel, Int_t *nin, char *bufin, Int_t *lout, char *bufout, Int_t *nout);
00030 extern "C" void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout);
00031 extern "C" int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout);
00032 const Int_t kMAXBUF = 0xffffff;
00033
00034 Bool_t TMessage::fgEvolution = kFALSE;
00035
00036
00037 ClassImp(TMessage)
00038
00039
00040 TMessage::TMessage(UInt_t what, Int_t bufsiz) :
00041 TBufferFile(TBuffer::kWrite, bufsiz + 2*sizeof(UInt_t))
00042 {
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054 UInt_t reserved = 0;
00055 *this << reserved;
00056
00057 fWhat = what;
00058 *this << what;
00059
00060 fClass = 0;
00061 fCompress = 0;
00062 fBufComp = 0;
00063 fBufCompCur = 0;
00064 fCompPos = 0;
00065 fInfos = 0;
00066 fEvolution = kFALSE;
00067
00068 SetBit(kCannotHandleMemberWiseStreaming);
00069 }
00070
00071
00072 TMessage::TMessage(void *buf, Int_t bufsize) : TBufferFile(TBuffer::kRead, bufsize, buf)
00073 {
00074
00075
00076
00077
00078 fBufCur += sizeof(UInt_t);
00079
00080 *this >> fWhat;
00081
00082 fCompress = 0;
00083 fBufComp = 0;
00084 fBufCompCur = 0;
00085 fCompPos = 0;
00086 fInfos = 0;
00087 fEvolution = kFALSE;
00088
00089 if (fWhat & kMESS_ZIP) {
00090
00091 fBufComp = fBuffer;
00092 fBufCompCur = fBuffer + bufsize;
00093 fBuffer = 0;
00094 Uncompress();
00095 }
00096
00097 if (fWhat == kMESS_OBJECT) {
00098 InitMap();
00099 fClass = ReadClass();
00100 SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
00101 ResetMap();
00102 } else {
00103 fClass = 0;
00104 }
00105 }
00106
00107
00108 TMessage::~TMessage()
00109 {
00110
00111
00112 delete [] fBufComp;
00113 delete fInfos;
00114 }
00115
00116
00117 void TMessage::EnableSchemaEvolutionForAll(Bool_t enable)
00118 {
00119
00120
00121
00122 fgEvolution = enable;
00123 }
00124
00125
00126 Bool_t TMessage::UsesSchemaEvolutionForAll()
00127 {
00128
00129
00130 return fgEvolution;
00131 }
00132
00133
00134 void TMessage::ForceWriteInfo(TVirtualStreamerInfo *info, Bool_t )
00135 {
00136
00137
00138 if (fgEvolution || fEvolution) {
00139 if (!fInfos) fInfos = new TList();
00140 fInfos->Add(info);
00141 }
00142 }
00143
00144
00145 void TMessage::Forward()
00146 {
00147
00148
00149
00150 if (IsReading()) {
00151 SetWriteMode();
00152 SetBufferOffset(fBufSize);
00153 SetBit(kCannotHandleMemberWiseStreaming);
00154
00155 if (fBufComp) {
00156 fCompPos = fBufCur;
00157 }
00158 }
00159 }
00160
00161
00162 void TMessage::TagStreamerInfo(TVirtualStreamerInfo *info)
00163 {
00164
00165
00166 if (fgEvolution || fEvolution) {
00167 if (!fInfos) fInfos = new TList();
00168 fInfos->Add(info);
00169 }
00170 }
00171
00172
00173 void TMessage::Reset()
00174 {
00175
00176
00177 SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
00178 ResetMap();
00179
00180 if (fBufComp) {
00181 delete [] fBufComp;
00182 fBufComp = 0;
00183 fBufCompCur = 0;
00184 fCompPos = 0;
00185 }
00186 }
00187
00188
00189 void TMessage::SetLength() const
00190 {
00191
00192
00193
00194 if (IsWriting()) {
00195 char *buf = Buffer();
00196 tobuf(buf, (UInt_t)(Length() - sizeof(UInt_t)));
00197
00198 if (fBufComp) {
00199 buf = fBufComp;
00200 tobuf(buf, (UInt_t)(CompLength() - sizeof(UInt_t)));
00201 }
00202 }
00203 }
00204
00205
00206 void TMessage::SetWhat(UInt_t what)
00207 {
00208
00209
00210
00211
00212
00213 fWhat = what;
00214
00215 char *buf = Buffer();
00216 buf += sizeof(UInt_t);
00217 tobuf(buf, what);
00218
00219 if (fBufComp) {
00220 buf = fBufComp;
00221 buf += sizeof(UInt_t);
00222 tobuf(buf, what | kMESS_ZIP);
00223 }
00224 }
00225
00226
00227 void TMessage::SetCompressionLevel(Int_t level)
00228 {
00229
00230
00231
00232
00233
00234 if (level < 0) level = 0;
00235 if (level > 9) level = 9;
00236
00237 if (level != fCompress && fBufComp) {
00238 delete [] fBufComp;
00239 fBufComp = 0;
00240 fBufCompCur = 0;
00241 fCompPos = 0;
00242 }
00243 fCompress = level;
00244 }
00245
00246
00247 Int_t TMessage::Compress()
00248 {
00249
00250
00251
00252
00253
00254
00255 if (fCompress == 0) {
00256
00257 if (fBufComp) {
00258 delete [] fBufComp;
00259 fBufComp = 0;
00260 fBufCompCur = 0;
00261 fCompPos = 0;
00262 }
00263 return 0;
00264 }
00265
00266 if (fBufComp && fCompPos == fBufCur) {
00267
00268 return 0;
00269 }
00270
00271
00272 if (fBufComp) {
00273 delete [] fBufComp;
00274 fBufComp = 0;
00275 fBufCompCur = 0;
00276 fCompPos = 0;
00277 }
00278
00279 if (Length() <= (Int_t)(256 + 2*sizeof(UInt_t))) {
00280
00281 return 0;
00282 }
00283
00284 Int_t hdrlen = 2*sizeof(UInt_t);
00285 Int_t messlen = Length() - hdrlen;
00286 Int_t nbuffers = messlen / kMAXBUF;
00287 Int_t chdrlen = 3*sizeof(UInt_t);
00288 Int_t buflen = TMath::Max(512, chdrlen + messlen + 9*nbuffers);
00289 fBufComp = new char[buflen];
00290 char *messbuf = Buffer() + hdrlen;
00291 char *bufcur = fBufComp + chdrlen;
00292 Int_t noutot = 0;
00293 Int_t nzip = 0;
00294 Int_t nout, bufmax;
00295 for (Int_t i = 0; i <= nbuffers; i++) {
00296 if (i == nbuffers)
00297 bufmax = messlen - nzip;
00298 else
00299 bufmax = kMAXBUF;
00300 R__zip(fCompress, &bufmax, messbuf, &bufmax, bufcur, &nout);
00301 if (nout == 0 || nout >= messlen) {
00302
00303 delete [] fBufComp;
00304 fBufComp = 0;
00305 fBufCompCur = 0;
00306 fCompPos = 0;
00307 return -1;
00308 }
00309 bufcur += nout;
00310 noutot += nout;
00311 messbuf += kMAXBUF;
00312 nzip += kMAXBUF;
00313 }
00314 fBufCompCur = bufcur;
00315 fCompPos = fBufCur;
00316
00317 bufcur = fBufComp;
00318 tobuf(bufcur, (UInt_t)(CompLength() - sizeof(UInt_t)));
00319 Int_t what = fWhat | kMESS_ZIP;
00320 tobuf(bufcur, what);
00321 tobuf(bufcur, Length());
00322
00323 return 0;
00324 }
00325
00326
00327 Int_t TMessage::Uncompress()
00328 {
00329
00330
00331
00332 if (!fBufComp || !(fWhat & kMESS_ZIP))
00333 return -1;
00334
00335 Int_t buflen;
00336 Int_t hdrlen = 2*sizeof(UInt_t);
00337 char *bufcur1 = fBufComp + hdrlen;
00338 frombuf(bufcur1, &buflen);
00339 UChar_t *bufcur = (UChar_t*)bufcur1;
00340
00341
00342 Int_t nin, nbuf;
00343 if(R__unzip_header(&nin, bufcur, &nbuf)!=0) {
00344 Error("Uncompress", "Inconsistency found in header (nin=%d, nbuf=%d)", nin, nbuf);
00345 return -1;
00346 }
00347
00348 fBuffer = new char[buflen];
00349 fBufSize = buflen;
00350 fBufCur = fBuffer + sizeof(UInt_t) + sizeof(fWhat);
00351 fBufMax = fBuffer + fBufSize;
00352 char *messbuf = fBuffer + hdrlen;
00353
00354 Int_t nout;
00355 Int_t noutot = 0;
00356 while (1) {
00357 Int_t hc = R__unzip_header(&nin, bufcur, &nbuf);
00358 if (hc!=0) break;
00359 R__unzip(&nin, bufcur, &nbuf, messbuf, &nout);
00360 if (!nout) break;
00361 noutot += nout;
00362 if (noutot >= buflen - hdrlen) break;
00363 bufcur += nin;
00364 messbuf += nout;
00365 }
00366
00367 fWhat &= ~kMESS_ZIP;
00368 fCompress = 1;
00369
00370 return 0;
00371 }
00372
00373
00374 void TMessage::WriteObject(const TObject *obj)
00375 {
00376
00377
00378
00379
00380
00381
00382
00383
00384
00385 if (fgEvolution || fEvolution) {
00386 if (fInfos)
00387 fInfos->Clear();
00388 else
00389 fInfos = new TList();
00390 }
00391
00392 fBitsPIDs.ResetAllBits();
00393 WriteObjectAny(obj, TObject::Class());
00394 }
00395
00396
00397 UShort_t TMessage::WriteProcessID(TProcessID *pid)
00398 {
00399
00400
00401
00402
00403
00404 if (fBitsPIDs.TestBitNumber(0)) return 0;
00405 if (!pid)
00406 pid = TProcessID::GetPID();
00407 if (!pid) return 0;
00408 fBitsPIDs.SetBitNumber(0);
00409 UInt_t uid = pid->GetUniqueID();
00410 fBitsPIDs.SetBitNumber(uid+1);
00411 return 1;
00412 }