RooRealMPFE.cxx

Go to the documentation of this file.
00001 /*****************************************************************************
00002  * Project: RooFit                                                           *
00003  * Package: RooFitCore                                                       *
00004  * @(#)root/roofitcore:$Id: RooRealMPFE.cxx 37585 2010-12-14 14:19:18Z brun $
00005  * Authors:                                                                  *
00006  *   WV, Wouter Verkerke, UC Santa Barbara, verkerke@slac.stanford.edu       *
00007  *   DK, David Kirkby,    UC Irvine,         dkirkby@uci.edu                 *
00008  *                                                                           *
00009  * Copyright (c) 2000-2005, Regents of the University of California          *
00010  *                          and Stanford University. All rights reserved.    *
00011  *                                                                           *
00012  * Redistribution and use in source and binary forms,                        *
00013  * with or without modification, are permitted according to the terms        *
00014  * listed in LICENSE (http://roofit.sourceforge.net/license.txt)             *
00015  *****************************************************************************/
00016 
00017 //////////////////////////////////////////////////////////////////////////////
00018 //
00019 // BEGIN_HTML
00020 // RooRealMPFE is the multi-processor front-end for parallel calculation
00021 // of RooAbsReal objects. Each RooRealMPFE forks a process that calculates
00022 // the value of the proxied RooAbsReal object. The (re)calculation of
00023 // the proxied object is started asynchronously with the calculate() method.
00024 // A subsequent call to getVal() will return the calculated value when available.
00025 // If the calculation is still in progress when getVal() is called, it blocks
00026 // the calling process until the calculation is done. The forked calculation process 
00027 // is terminated when the front-end object is deleted.
00028 // Simple use demonstration:
00029 //
00030 // <pre>
00031 // RooAbsReal* slowFunc ;
00032 //
00033 // Double_t val = slowFunc->getVal() // Evaluate slowFunc in current process
00034 //
00035 // RooRealMPFE mpfe("mpfe","frontend to slowFunc",*slowFunc) ;
00036 // mpfe.calculate() ;           // Start calculation of slow-func in remote process
00037 //                              // .. do other stuff here ..
00038 // Double_t val = mpfe.getVal() // Wait for remote calculation to finish and retrieve value
00039 // </pre>
00040 //
00041 // END_HTML
00042 //
00043 
00044 #include "Riostream.h"
00045 #include "RooFit.h"
00046 
00047 #ifndef _WIN32
00048 #include <unistd.h>
00049 #include <sys/types.h>
00050 #include <sys/wait.h>
00051 #endif
00052 
00053 #include <errno.h>
00054 #include <sstream>
00055 #include "RooRealMPFE.h"
00056 #include "RooArgSet.h"
00057 #include "RooAbsCategory.h"
00058 #include "RooRealVar.h"
00059 #include "RooCategory.h"
00060 #include "RooMPSentinel.h"
00061 #include "RooMsgService.h"
00062 
00063 #include "TSystem.h"
00064 
00065 RooMPSentinel RooRealMPFE::_sentinel ;
00066 
00067 ClassImp(RooRealMPFE)
00068   ;
00069 
00070 
00071 //_____________________________________________________________________________
00072 RooRealMPFE::RooRealMPFE(const char *name, const char *title, RooAbsReal& arg, Bool_t calcInline) : 
00073   RooAbsReal(name,title),
00074   _state(Initialize),
00075   _arg("arg","arg",this,arg),
00076   _vars("vars","vars",this),
00077   _calcInProgress(kFALSE),
00078   _verboseClient(kFALSE),
00079   _verboseServer(kFALSE),
00080   _inlineMode(calcInline),
00081   _remoteEvalErrorLoggingState(RooAbsReal::PrintErrors),
00082   _pid(0)
00083 {  
00084   // Construct front-end object for object 'arg' whose evaluation will be calculated
00085   // asynchronously in a separate process. If calcInline is true the value of 'arg'
00086   // is calculate synchronously in the current process.
00087 #ifdef _WIN32
00088   _inlineMode = kTRUE;
00089 #endif
00090   initVars() ;
00091   _sentinel.add(*this) ;
00092 }
00093 
00094 
00095 
00096 //_____________________________________________________________________________
00097 RooRealMPFE::RooRealMPFE(const RooRealMPFE& other, const char* name) : 
00098   RooAbsReal(other, name),
00099   _state(Initialize),
00100   _arg("arg",this,other._arg),
00101   _vars("vars",this,other._vars),
00102   _calcInProgress(kFALSE),
00103   _verboseClient(other._verboseClient),
00104   _verboseServer(other._verboseServer),
00105   _inlineMode(other._inlineMode),
00106   _forceCalc(other._forceCalc),
00107   _remoteEvalErrorLoggingState(other._remoteEvalErrorLoggingState),
00108   _pid(0)
00109 {
00110   // Copy constructor. Initializes in clean state so that upon eval
00111   // this instance will create its own server processes
00112 
00113   initVars() ;
00114   _sentinel.add(*this) ;
00115 }
00116 
00117 
00118 
00119 //_____________________________________________________________________________
00120 RooRealMPFE::~RooRealMPFE() 
00121 {
00122   // Destructor
00123 
00124   if (_state==Client) {
00125     standby() ;
00126   }
00127   _sentinel.remove(*this) ;
00128 }
00129 
00130 
00131 
00132 //_____________________________________________________________________________
00133 void RooRealMPFE::initVars()
00134 {
00135   // Initialize list of variables of front-end argument 'arg'
00136 
00137   // Empty current lists
00138   _vars.removeAll() ;
00139   _saveVars.removeAll() ;
00140 
00141   // Retrieve non-constant parameters
00142   RooArgSet* vars = _arg.arg().getParameters(RooArgSet()) ;
00143   RooArgSet* ncVars = (RooArgSet*) vars->selectByAttrib("Constant",kFALSE) ;
00144   RooArgList varList(*ncVars) ;
00145 
00146   // Save in lists 
00147   _vars.add(varList) ;
00148   _saveVars.addClone(varList) ;
00149 
00150   // Force next calculation
00151   _forceCalc = kTRUE ;
00152 
00153   delete vars ;
00154   delete ncVars ;
00155 }
00156 
00157 
00158 
00159 //_____________________________________________________________________________
00160 void RooRealMPFE::initialize() 
00161 {
00162   // Initialize the remote process and message passing
00163   // pipes between current process and remote process
00164 
00165   // Trivial case: Inline mode 
00166   if (_inlineMode) {
00167     _state = Inline ;
00168     return ;
00169   }
00170 
00171 #ifndef _WIN32
00172   // Fork server process and setup IPC
00173   
00174   // Make client/server pipes
00175   UInt_t tmp1 = pipe(_pipeToClient) ;
00176   UInt_t tmp2 = pipe(_pipeToServer) ;
00177   if (tmp1 || tmp2) perror("pipe") ;
00178   
00179   // Clear eval error log prior to forking
00180   // to avoid confusions...
00181   clearEvalErrorLog() ;
00182 
00183   _pid = fork() ;
00184 
00185   if (_pid==0) {
00186 
00187     // Start server loop 
00188     _state = Server ;
00189     serverLoop() ;
00190    
00191     // Kill server at end of service
00192     coutI(Minimization) << "RooRealMPFE::initialize(" << GetName() 
00193                         << ") server process terminating" << endl ;
00194     _exit(0) ;
00195 
00196   } else if (_pid>0) {
00197  
00198     // Client process - fork successul
00199     coutI(Minimization) << "RooRealMPFE::initialize(" << GetName() 
00200                         << ") successfully forked server process " << _pid << endl ;
00201     _state = Client ;
00202     _calcInProgress = kFALSE ;
00203 
00204   } else {
00205     // Client process - fork failed    
00206     coutE(Minimization) << "RooRealMPFE::initialize(" << GetName() << ") ERROR fork() failed" << endl ; 
00207     _state = Inline ;
00208   }
00209 #endif // _WIN32
00210 }
00211 
00212 
00213 
00214 //_____________________________________________________________________________
00215 void RooRealMPFE::serverLoop() 
00216 {
00217   // Server loop of remote processes. This function will return
00218   // only when an incoming TERMINATE message is received.
00219 
00220 #ifndef _WIN32
00221   Bool_t doLoop(kTRUE) ;
00222   Message msg ;
00223 
00224   Int_t idx, index, numErrors ;
00225   Double_t value ;
00226   Bool_t isConst ;
00227   
00228   clearEvalErrorLog() ;
00229 
00230   while(doLoop) {
00231     ssize_t n = read(_pipeToServer[0],&msg,sizeof(msg)) ;
00232     if (n<0&&_verboseServer) perror("read") ;
00233 
00234     switch (msg) {
00235     case SendReal:
00236       {
00237       UInt_t tmp1 = read(_pipeToServer[0],&idx,sizeof(Int_t)) ;
00238       UInt_t tmp2 = read(_pipeToServer[0],&value,sizeof(Double_t)) ;      
00239       UInt_t tmp3 = read(_pipeToServer[0],&isConst,sizeof(Bool_t)) ;      
00240       if (tmp1+tmp2+tmp3<sizeof(Int_t)+sizeof(Double_t)+sizeof(Bool_t)) perror("read") ;
00241       if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName() 
00242                                << ") IPC fromClient> SendReal [" << idx << "]=" << value << endl ;       
00243       ((RooRealVar*)_vars.at(idx))->setVal(value) ;
00244       ((RooRealVar*)_vars.at(idx))->setConstant(isConst) ;
00245       }
00246       break ;
00247 
00248     case SendCat:
00249       {
00250       UInt_t tmp1 = read(_pipeToServer[0],&idx,sizeof(Int_t)) ;
00251       UInt_t tmp2 = read(_pipeToServer[0],&index,sizeof(Int_t)) ;      
00252       if (tmp1+tmp2<sizeof(Int_t)+sizeof(Int_t)) perror("read") ;
00253       if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName() 
00254                                << ") IPC fromClient> SendCat [" << idx << "]=" << index << endl ; 
00255       ((RooCategory*)_vars.at(idx))->setIndex(index) ;
00256       }
00257       break ;
00258 
00259     case Calculate:
00260       if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName() 
00261                                << ") IPC fromClient> Calculate" << endl ; 
00262       _value = _arg ;
00263       break ;
00264 
00265     case Retrieve:
00266       {
00267       if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName() 
00268                                << ") IPC fromClient> Retrieve" << endl ; 
00269       msg = ReturnValue ;
00270       UInt_t tmp1 = write(_pipeToClient[1],&msg,sizeof(Message)) ;
00271       UInt_t tmp2 = write(_pipeToClient[1],&_value,sizeof(Double_t)) ;
00272       numErrors = numEvalErrors() ;
00273       UInt_t tmp3 = write(_pipeToClient[1],&numErrors,sizeof(Int_t)) ;
00274       if (tmp1+tmp2+tmp3<sizeof(Message)+sizeof(Double_t)+sizeof(Int_t)) perror("write") ;
00275 
00276       if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName() 
00277                                << ") IPC toClient> ReturnValue " << _value << " NumError " << numErrors << endl ; 
00278       }
00279       break ;
00280 
00281     case ConstOpt:
00282       {
00283       ConstOpCode code ;      
00284       UInt_t tmp1 = read(_pipeToServer[0],&code,sizeof(ConstOpCode)) ;
00285       if (tmp1<sizeof(ConstOpCode)) perror("read") ;
00286       if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName() 
00287                                << ") IPC fromClient> ConstOpt " << code << endl ; 
00288       ((RooAbsReal&)_arg.arg()).constOptimizeTestStatistic(code) ;      
00289       break ;
00290       }
00291 
00292     case Verbose:
00293       {
00294       Bool_t flag ;
00295       UInt_t tmp1 = read(_pipeToServer[0],&flag,sizeof(Bool_t)) ;
00296       if (tmp1<sizeof(Bool_t)) perror("read") ;
00297       if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName() 
00298                                << ") IPC fromClient> Verbose " << (flag?1:0) << endl ; 
00299       _verboseServer = flag ;
00300       }
00301       break ;
00302 
00303     case Terminate: 
00304       if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName() 
00305                                << ") IPC fromClient> Terminate" << endl ; 
00306       doLoop = kFALSE ;
00307       break ;
00308 
00309     case LogEvalError:
00310       {
00311       RooAbsReal::ErrorLoggingMode flag2 ;
00312       UInt_t tmp1 = read(_pipeToServer[0],&flag2,sizeof(RooAbsReal::ErrorLoggingMode)) ;
00313       if (tmp1<sizeof(RooAbsReal::ErrorLoggingMode)) perror("read") ;
00314       RooAbsReal::setEvalErrorLoggingMode(flag2) ;
00315       if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName() 
00316                                << ") IPC fromClient> LogEvalError flag = " << flag2 << endl ;       
00317       }
00318       break ;
00319 
00320     case RetrieveErrors:
00321 
00322       if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName() 
00323                                << ") IPC fromClient> RetrieveErrors" << endl ; 
00324 
00325       // Loop over errors
00326       {
00327         std::map<const RooAbsArg*,pair<string,list<EvalError> > >::const_iterator iter = evalErrorIter() ;
00328         for (int i=0 ; i<numEvalErrorItems() ; i++) {
00329           list<EvalError>::const_iterator iter2 = iter->second.second.begin() ;
00330           for (;iter2!=iter->second.second.end();++iter2) {
00331             
00332             // Reply with SendError message
00333             msg = SendError ;
00334             UInt_t tmp1 = write(_pipeToClient[1],&msg,sizeof(Message)) ;
00335             UInt_t tmp2 = write(_pipeToClient[1],&iter->first,sizeof(RooAbsReal*)) ;
00336             
00337             UInt_t ntext = strlen(iter2->_msg) ;
00338             UInt_t tmp3 = write(_pipeToClient[1],&ntext,sizeof(Int_t)) ;
00339             UInt_t tmp4 = write(_pipeToClient[1],iter2->_msg,ntext+1) ;
00340             
00341             UInt_t ntext2 = strlen(iter2->_srvval) ;
00342             UInt_t tmp5 = write(_pipeToClient[1],&ntext2,sizeof(Int_t)) ;
00343             UInt_t tmp6 = write(_pipeToClient[1],iter2->_srvval,ntext2+1) ;
00344 
00345             // Format string with object identity as this cannot be evaluated on the other side
00346             ostringstream oss2 ;
00347             oss2 << "PID" << gSystem->GetPid() << "/" ;
00348             printStream(oss2,kName|kClassName|kArgs,kInline)  ;
00349             UInt_t ntext3 = strlen(oss2.str().c_str()) ;
00350             UInt_t tmp7 = write(_pipeToClient[1],&ntext3,sizeof(Int_t)) ;
00351             UInt_t tmp8 = write(_pipeToClient[1],oss2.str().c_str(),ntext3+1) ;
00352 
00353             if (tmp1+tmp2+tmp3+tmp4+tmp5+tmp6+tmp7+tmp8<
00354                 sizeof(Message)+sizeof(RooAbsReal*)+sizeof(Int_t)+ntext+1+sizeof(Int_t)+ntext2+1+sizeof(Int_t)+ntext3+1) 
00355               perror("write") ;
00356                     
00357             if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName() 
00358                                      << ") IPC toClient> SendError Arg " << iter->first << " Msg " << iter2->_msg << endl ; 
00359           }
00360         }  
00361         
00362         RooAbsReal* null(0) ;
00363         UInt_t tmp1 = write(_pipeToClient[1],&msg,sizeof(Message)) ;
00364         UInt_t tmp2 = write(_pipeToClient[1],&null,sizeof(RooAbsReal*)) ;
00365         if (tmp1+tmp2<sizeof(Message)+sizeof(RooAbsReal*)) perror("read") ;
00366       
00367       }
00368       // Clear error list on local side
00369       clearEvalErrorLog() ;           
00370       break ;
00371 
00372     default:
00373       if (_verboseServer) cout << "RooRealMPFE::serverLoop(" << GetName() 
00374                                << ") IPC fromClient> Unknown message (code = " << msg << ")" << endl ; 
00375       break ;
00376     }
00377   }
00378 #endif // _WIN32
00379 }
00380 
00381 
00382 
00383 //_____________________________________________________________________________
00384 void RooRealMPFE::calculate() const 
00385 {
00386   // Client-side function that instructs server process to start
00387   // asynchronuous (re)calculation of function value. This function
00388   // returns immediately. The calculated value can be retrieved
00389   // using getVal()
00390 
00391   // Start asynchronous calculation of arg value
00392   if (_state==Initialize) {
00393     const_cast<RooRealMPFE*>(this)->initialize() ;
00394   }
00395 
00396   // Inline mode -- Calculate value now
00397   if (_state==Inline) {
00398     //cout << "RooRealMPFE::calculate(" << GetName() << ") performing Inline calculation NOW" << endl ;
00399     _value = _arg ;
00400     clearValueDirty() ;
00401   }
00402 
00403 #ifndef _WIN32
00404   // Compare current value of variables with saved values and send changes to server
00405   if (_state==Client) {
00406     Int_t i ;
00407     for (i=0 ; i<_vars.getSize() ; i++) {
00408       RooAbsArg* var = _vars.at(i) ;
00409       RooAbsArg* saveVar = _saveVars.at(i) ;
00410       
00411       if (!(*var==*saveVar) || (var->isConstant() != saveVar->isConstant()) || _forceCalc) {
00412         if (_verboseClient) cout << "RooRealMPFE::calculate(" << GetName()
00413                                  << ") variable " << _vars.at(i)->GetName() << " changed" << endl ;
00414 
00415         ((RooRealVar*)saveVar)->setConstant(var->isConstant()) ;
00416         saveVar->copyCache(var) ;
00417         
00418         // send message to server
00419         if (dynamic_cast<RooAbsReal*>(var)) {
00420           Message msg = SendReal ;
00421           Double_t val = ((RooAbsReal*)var)->getVal() ;
00422           Bool_t isC = var->isConstant() ;
00423           UInt_t tmp1 = write(_pipeToServer[1],&msg,sizeof(msg)) ;
00424           UInt_t tmp2 = write(_pipeToServer[1],&i,sizeof(Int_t)) ;
00425           UInt_t tmp3 = write(_pipeToServer[1],&val,sizeof(Double_t)) ;
00426           UInt_t tmp4 = write(_pipeToServer[1],&isC,sizeof(Bool_t)) ;
00427           if (tmp1+tmp2+tmp3+tmp4<sizeof(Message)+sizeof(Int_t)+sizeof(Double_t)+sizeof(Bool_t)) perror("write") ;
00428 
00429           if (_verboseServer) cout << "RooRealMPFE::calculate(" << GetName() 
00430                                    << ") IPC toServer> SendReal [" << i << "]=" << val << (isC?" (Constant)":"") <<  endl ;
00431         } else if (dynamic_cast<RooAbsCategory*>(var)) {
00432           Message msg = SendCat ;
00433           UInt_t idx = ((RooAbsCategory*)var)->getIndex() ;
00434           UInt_t tmp5 = write(_pipeToServer[1],&msg,sizeof(msg)) ;
00435           UInt_t tmp6 = write(_pipeToServer[1],&i,sizeof(Int_t)) ;
00436           UInt_t tmp7 = write(_pipeToServer[1],&idx,sizeof(Int_t)) ;    
00437           if (tmp5+tmp6+tmp7<sizeof(Message)+sizeof(Int_t)+sizeof(Int_t)) perror("write") ;
00438           if (_verboseServer) cout << "RooRealMPFE::calculate(" << GetName() 
00439                                    << ") IPC toServer> SendCat [" << i << "]=" << idx << endl ;
00440         }
00441       }
00442     }
00443 
00444     Message msg = Calculate ;
00445     UInt_t tmp8 = write(_pipeToServer[1],&msg,sizeof(msg)) ;
00446     if (tmp8<sizeof(Message)) perror("write") ;
00447     if (_verboseServer) cout << "RooRealMPFE::calculate(" << GetName() 
00448                              << ") IPC toServer> Calculate " << endl ;
00449 
00450     // Clear dirty state and mark that calculation request was dispatched
00451     clearValueDirty() ;
00452     _calcInProgress = kTRUE ;
00453     _forceCalc = kFALSE ;
00454 
00455   } else if (_state!=Inline) {
00456     cout << "RooRealMPFE::calculate(" << GetName() 
00457          << ") ERROR not in Client or Inline mode" << endl ;
00458   }
00459 #endif // _WIN32
00460 }
00461 
00462 
00463 
00464 
00465 //_____________________________________________________________________________
00466 Double_t RooRealMPFE::getVal(const RooArgSet* /*nset*/) const 
00467 {
00468   // If value needs recalculation and calculation has not beed started
00469   // with a call to calculate() start it now. This function blocks
00470   // until remote process has finished calculation and returns
00471   // remote value
00472 
00473   if (isValueDirty()) {
00474     // Cache is dirty, no calculation has been started yet
00475     //cout << "RooRealMPFE::getVal(" << GetName() << ") cache is dirty, caling calculate and evaluate" << endl ;
00476     calculate() ;
00477     _value = evaluate() ;
00478   } else if (_calcInProgress) {
00479     //cout << "RooRealMPFE::getVal(" << GetName() << ") calculation in progress, calling evaluate" << endl ;
00480     // Cache is clean and calculation is in progress
00481     _value = evaluate() ;    
00482   } else {
00483     //cout << "RooRealMPFE::getVal(" << GetName() << ") cache is clean, doing nothing" << endl ;
00484     // Cache is clean and calculated value is in cache
00485   }
00486 
00487   return _value ;
00488 }
00489 
00490 
00491 
00492 //_____________________________________________________________________________
00493 Double_t RooRealMPFE::evaluate() const
00494 {
00495   // Send message to server process to retrieve output value
00496   // If error were logged use logEvalError() on remote side
00497   // transfer those errors to the local eval error queue.
00498 
00499   // Retrieve value of arg
00500   Double_t return_value = 0;
00501   if (_state==Inline) {
00502     return_value = _arg ; 
00503   } else if (_state==Client) {
00504 #ifndef _WIN32
00505     Message msg ;
00506     Double_t value ;
00507 
00508     // If current error loggin state is not the same as remote state
00509     // update the remote state
00510     if (evalErrorLoggingMode() != _remoteEvalErrorLoggingState) {
00511       msg = LogEvalError ;
00512       UInt_t tmp1 = write(_pipeToServer[1],&msg,sizeof(Message)) ;    
00513       RooAbsReal::ErrorLoggingMode flag = evalErrorLoggingMode() ;
00514       UInt_t tmp2 = write(_pipeToServer[1],&flag,sizeof(RooAbsReal::ErrorLoggingMode)) ;      
00515       _remoteEvalErrorLoggingState = evalErrorLoggingMode() ;
00516       if (tmp1+tmp2<sizeof(Message)+sizeof(RooAbsReal::ErrorLoggingMode)) perror("write") ;
00517     }
00518 
00519     msg = Retrieve ;
00520     UInt_t tmp1 = write(_pipeToServer[1],&msg,sizeof(Message)) ;    
00521     if (_verboseServer) cout << "RooRealMPFE::evaluate(" << GetName() 
00522                              << ") IPC toServer> Retrieve " << endl ;    
00523     if (tmp1<sizeof(Message)) perror("write") ;
00524 
00525     UInt_t tmp2 = read(_pipeToClient[0],&msg,sizeof(Message)) ;
00526     if (msg!=ReturnValue) {
00527       cout << "RooRealMPFE::evaluate(" << GetName() 
00528            << ") ERROR: unexpected message from server process: " << msg << endl ;
00529       return 0 ;
00530     }
00531     UInt_t tmp3 = read(_pipeToClient[0],&value,sizeof(Double_t)) ;
00532     if (_verboseServer) cout << "RooRealMPFE::evaluate(" << GetName() 
00533                              << ") IPC fromServer> ReturnValue " << value << endl ;
00534 
00535     Int_t numError ;
00536     UInt_t tmp4 = read(_pipeToClient[0],&numError,sizeof(Int_t)) ;
00537     if (_verboseServer) cout << "RooRealMPFE::evaluate(" << GetName() 
00538                              << ") IPC fromServer> NumErrors " << numError << endl ;
00539     if (tmp2+tmp3+tmp4<sizeof(Message)+sizeof(Double_t)+sizeof(Int_t)) perror("read") ;
00540 
00541 
00542     // Retrieve remote errors and feed into local error queue
00543     if (numError>0) {
00544       msg=RetrieveErrors ;
00545       UInt_t tmp5 = write(_pipeToServer[1],&msg,sizeof(Message)) ;    
00546       if (tmp5<sizeof(Message)) perror("write") ;
00547       if (_verboseServer) cout << "RooRealMPFE::evaluate(" << GetName() 
00548                              << ") IPC toServer> RetrieveErrors " << endl ;    
00549 
00550       while(true) {
00551         RooAbsReal* ptr(0) ;
00552         Int_t ntext1,ntext2,ntext3 ;
00553         char msgbuf1[1024] ;
00554         char msgbuf2[1024] ;
00555         char msgbuf3[1024] ;
00556         UInt_t tmp12= read(_pipeToClient[0],&msg,sizeof(Message)) ;
00557         UInt_t tmp13= read(_pipeToClient[0],&ptr,sizeof(RooAbsReal*)) ;
00558         if (tmp12+tmp13<sizeof(Message)+sizeof(RooAbsReal*)) perror("read") ;
00559         if (ptr==0) {
00560           break ;
00561         }
00562 
00563         UInt_t tmp6 = read(_pipeToClient[0],&ntext1,sizeof(Int_t)) ;
00564         if (ntext1>1023) ntext1=1023 ; if (ntext1<0) ntext1=0 ; 
00565         UInt_t tmp7 = read(_pipeToClient[0],msgbuf1,ntext1+1) ;
00566         UInt_t tmp8 = read(_pipeToClient[0],&ntext2,sizeof(Int_t)) ;
00567         if (ntext2>1023) ntext2=1023 ; if (ntext2<0) ntext2=0 ; 
00568         UInt_t tmp9 = read(_pipeToClient[0],msgbuf2,ntext2+1) ;
00569         UInt_t tmp10= read(_pipeToClient[0],&ntext3,sizeof(Int_t)) ;
00570         if (ntext3>1023) ntext3=1023 ; if (ntext3<0) ntext3=0 ; 
00571         UInt_t tmp11= read(_pipeToClient[0],msgbuf3,ntext3+1) ;
00572         if (tmp6+tmp7+tmp8+tmp9+tmp10+tmp11<sizeof(Int_t)+ntext1+1+sizeof(Int_t)+ntext2+1+sizeof(Int_t)+ntext3+1) perror("read") ;
00573         
00574         if (_verboseServer) cout << "RooRealMPFE::evaluate(" << GetName() 
00575                                  << ") IPC fromServer> SendError Arg " << ptr << " Msg " << msgbuf1 << endl ;    
00576         
00577         // WVE: null char is read from pipe  
00578         // coverity[STRING_NULL]
00579         logEvalError(ptr,msgbuf3,msgbuf1,msgbuf2) ;
00580       }
00581         
00582     }
00583 
00584     // Mark end of calculation in progress 
00585     _calcInProgress = kFALSE ;
00586     return_value = value ;
00587 #endif // _WIN32
00588   }
00589 
00590   return return_value;
00591 }
00592 
00593 
00594 
00595 //_____________________________________________________________________________
00596 void RooRealMPFE::standby()
00597 {
00598   // Terminate remote server process and return front-end class
00599   // to standby mode. Calls to calculate() or evaluate() after
00600   // this call will automatically recreated the server process.
00601 
00602 #ifndef _WIN32
00603   if (_state==Client) {
00604 
00605     // Terminate server process ;
00606     Message msg = Terminate ;
00607     UInt_t tmp1 = write(_pipeToServer[1],&msg,sizeof(msg)) ;
00608     if (tmp1<sizeof(Message)) perror("write") ;
00609     if (_verboseServer) cout << "RooRealMPFE::standby(" << GetName() 
00610                              << ") IPC toServer> Terminate " << endl ;  
00611 
00612     // Close pipes
00613     close(_pipeToClient[0]) ;
00614     close(_pipeToClient[1]) ;
00615     close(_pipeToServer[0]) ;
00616     close(_pipeToServer[1]) ;
00617 
00618     // Release resource of child process
00619     waitpid(_pid,0,0) ;
00620     
00621     // Revert to initialize state 
00622     _state = Initialize ;
00623   }
00624 #endif // _WIN32
00625 }
00626 
00627 
00628 
00629 //_____________________________________________________________________________
00630 void RooRealMPFE::constOptimizeTestStatistic(ConstOpCode opcode) 
00631 {
00632   // Intercept call to optimize constant term in test statistics
00633   // and forward it to object on server side.
00634 
00635 #ifndef _WIN32
00636   if (_state==Client) {
00637     Message msg = ConstOpt ;
00638     UInt_t tmp1 = write(_pipeToServer[1],&msg,sizeof(msg)) ;
00639     UInt_t tmp2 = write(_pipeToServer[1],&opcode,sizeof(ConstOpCode)) ;
00640     if (tmp1+tmp2<sizeof(Message)+sizeof(ConstOpCode)) perror("write") ;
00641     if (_verboseServer) cout << "RooRealMPFE::constOptimize(" << GetName() 
00642                              << ") IPC toServer> ConstOpt " << opcode << endl ;  
00643 
00644     initVars() ;
00645   }
00646 #endif // _WIN32
00647 
00648   if (_state==Inline) {
00649     ((RooAbsReal&)_arg.arg()).constOptimizeTestStatistic(opcode) ;
00650   }
00651 }
00652 
00653 
00654 
00655 //_____________________________________________________________________________
00656 void RooRealMPFE::setVerbose(Bool_t clientFlag, Bool_t serverFlag) 
00657 {
00658   // Control verbose messaging related to inter process communication
00659   // on both client and server side
00660 
00661 #ifndef _WIN32
00662   if (_state==Client) {
00663     Message msg = Verbose ;
00664     UInt_t tmp1 = write(_pipeToServer[1],&msg,sizeof(msg)) ;
00665     UInt_t tmp2 = write(_pipeToServer[1],&serverFlag,sizeof(Bool_t)) ;
00666     if (tmp1+tmp2<sizeof(Message)+sizeof(Bool_t)) perror("write") ;
00667     if (_verboseServer) cout << "RooRealMPFE::setVerbose(" << GetName() 
00668                              << ") IPC toServer> Verbose " << (serverFlag?1:0) << endl ;      
00669   }
00670 #endif // _WIN32
00671   _verboseClient = clientFlag ; _verboseServer = serverFlag ;
00672 }

Generated on Tue Jul 5 15:07:14 2011 for ROOT_528-00b_version by  doxygen 1.5.1