Main Page | Namespace List | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | File Members

TGo4TaskHandler.cxx

Go to the documentation of this file.
00001 //-------------------------------------------------------------
00002 //        Go4 Release Package v3.04-01 (build 30401)
00003 //                      28-November-2008
00004 //---------------------------------------------------------------
00005 //   The GSI Online Offline Object Oriented (Go4) Project
00006 //   Experiment Data Processing at EE department, GSI
00007 //---------------------------------------------------------------
00008 //
00009 //Copyright (C) 2000- Gesellschaft f. Schwerionenforschung, GSI
00010 //                    Planckstr. 1, 64291 Darmstadt, Germany
00011 //Contact:            http://go4.gsi.de
00012 //----------------------------------------------------------------
00013 //This software can be used under the license agreements as stated
00014 //in Go4License.txt file which is part of the distribution.
00015 //----------------------------------------------------------------
00016 #include "TGo4TaskHandler.h"
00017 
00018 #include "Riostream.h"
00019 #include <stdlib.h>
00020 #include "TSystem.h"
00021 #include "snprintf.h"
00022 
00023 #include "TGo4Log.h"
00024 #include "TGo4Thread.h"
00025 #include "TGo4ThreadHandler.h"
00026 #include "TGo4Socket.h"
00027 #include "TGo4BufferQueue.h"
00028 #include "TGo4RuntimeException.h"
00029 #include "TGo4DataRunnable.h"
00030 #include "TGo4StatusRunnable.h"
00031 #include "TGo4CommandRunnable.h"
00032 #include "TGo4ServerTask.h"
00033 #include "TGo4TaskHandlerStatus.h"
00034 
00035 const UInt_t TGo4TaskHandler::fguCONNECTORPORT=5000; // port no of default connector 9229
00036                                                      // 5000 is the first number of ROOT portscan
00037 const UInt_t TGo4TaskHandler::fguTRANSPORTCHECKDELAY=5000; // delay in ms for transport init check
00038 
00039 const UInt_t TGo4TaskHandler::fguSTATUSQUEUESIZE=1000;
00040 const UInt_t TGo4TaskHandler::fguDATAQUEUESIZE=1000;
00041 const UInt_t TGo4TaskHandler::fguCOMMANDQUEUESIZE=1000;
00042 
00043 
00044 const Int_t TGo4TaskHandler::fgiPORTWAITCYCLES=150;//60
00045 
00046 const UInt_t TGo4TaskHandler::fguPORTWAITTIME=200;//500
00047 
00048 const Int_t TGo4TaskHandler::fgiTHREADSTOPCYCLES=6;
00049 
00050 const UInt_t TGo4TaskHandler::fguTHREADSTOPTIME=500;
00051 
00052 const Text_t TGo4TaskHandler::fgcCONNECT[]="CONNECT-VERSION-300";
00053 const Text_t TGo4TaskHandler::fgcDISCONNECT[]="DISCONNECT-VERSION-300";
00054 
00055 const char* TGo4TaskHandler::fgcOK = "OK-VERSION-300";
00056 const char* TGo4TaskHandler::fgcERROR = "ERROR-VERSION-300";
00057 
00058 const Text_t TGo4TaskHandler::fgcMASTER[]="Master-VERSION-300";
00059 const Text_t TGo4TaskHandler::fgcSLAVE[]="Slave-VERSION-300";
00060 
00061 const Text_t TGo4TaskHandler::fgcCOMMANDTHREAD[]="COMMAND-";
00062 const Text_t TGo4TaskHandler::fgcSTATUSTHREAD[]="STATUS-";
00063 const Text_t TGo4TaskHandler::fgcDATATHREAD[]="DATA-";
00064 
00065 TNamed TGo4TaskHandler::fgxOBSERVERACCOUNT("observer","go4view");
00066 TNamed TGo4TaskHandler::fgxCONTROLLERACCOUNT("controller","go4ctrl");
00067 TNamed TGo4TaskHandler::fgxADMINISTRATORACCOUNT("admin","go4super");
00068 
00069 TGo4TaskHandler::TGo4TaskHandler(const char* name, TGo4ThreadManager* threadmanager, Bool_t clientmode, Bool_t mastermode,UInt_t negotiationport)
00070    :TNamed(name,"This is a Go4 Task Handler"),
00071    fbIsAborting(kFALSE), fiComPort(0),fiStatPort(0),fiDatPort(0),fiRole(kGo4ComModeController)
00072 {
00073    fbClientMode=clientmode;
00074    fbMasterMode=mastermode;
00075    if(threadmanager==0)
00076       {
00077          // error
00078          TGo4Log::Debug(" TaskHandler -- constructor error, unspecified ThreadManager: aborting ");
00079          //throw TGo4RuntimeException();
00080       }
00081    else
00082       {
00083          // everything o.k.
00084       }
00085 
00086    // set port number for the client server negotiation channel:
00087    if(negotiationport==0)
00088       {
00089          // default: use taskhandler intrinsic port number
00090          fuNegPort=TGo4TaskHandler::fguCONNECTORPORT;
00091       }
00092    else
00093       {
00094          // use dynamic port number given by taskhandler owner
00095          fuNegPort=negotiationport;
00096       }
00097 
00098    fxThreadManager=threadmanager;
00099    fxThreadHandler=fxThreadManager->GetWorkHandler();
00100    TString namebuffer;
00101    fxInvoker=0;
00102    fxCommandQueue= new TGo4BufferQueue("Command"); // receiv commands
00103    fxStatusQueue= new TGo4BufferQueue("Status");   // send status buffer
00104    fxDataQueue= new TGo4BufferQueue("Data");         // send data
00105 
00106    fxCommandTransport=new TGo4Socket(IsClientMode());
00107    fxStatusTransport=new TGo4Socket(IsClientMode());
00108    fxDataTransport=new TGo4Socket(IsClientMode());
00109    namebuffer.Form("CommandRunnable of %s",GetName());
00110    // command runnable receivermode: receiving as slave and sending as master
00111    fxCommandRun=new TGo4CommandRunnable(namebuffer.Data(), fxThreadManager, this, !IsMasterMode());
00112 
00113    namebuffer.Form("StatusRunnable of %s",GetName());
00114    // status runnable receivermode: sending as slave and receiving as master
00115    fxStatusRun=new TGo4StatusRunnable(namebuffer.Data(), fxThreadManager, this, IsMasterMode());
00116 
00117    namebuffer.Form("DataRunnable of %s",GetName());
00118    // data runnable receivermode: sending as slave and receiving as master
00119    fxDataRun=new TGo4DataRunnable(namebuffer.Data(), fxThreadManager, this, IsMasterMode());
00120 
00121   // adding runnables to thread handler who takes over the responsibility...:
00122    namebuffer.Form("%s%s",fgcCOMMANDTHREAD,GetName());
00123    fxComName=namebuffer;
00124    fxThreadHandler->NewThread(GetComName(), fxCommandRun);
00125    namebuffer.Form("%s%s",fgcSTATUSTHREAD,GetName());
00126    fxStatName=namebuffer;
00127    fxThreadHandler->NewThread(GetStatName(),fxStatusRun);
00128    namebuffer.Form("%s%s",fgcDATATHREAD,GetName());
00129    fxDatName=namebuffer;
00130    fxThreadHandler->NewThread(GetDatName(),fxDataRun);
00131    if(IsClientMode())
00132       TGo4Log::Debug(" New TaskHandler %s in client mode ",GetName());
00133    else
00134       TGo4Log::Debug(" New TaskHandler %s in server mode ",GetName());
00135 
00136  // adjust queue size to our wishes
00137    fxCommandQueue->SetMaxEntries(TGo4TaskHandler::fguCOMMANDQUEUESIZE);
00138    fxDataQueue->SetMaxEntries(TGo4TaskHandler::fguDATAQUEUESIZE);
00139    fxStatusQueue->SetMaxEntries(TGo4TaskHandler::fguSTATUSQUEUESIZE);
00140 
00141 
00142 }
00143 
00144 TGo4TaskHandler::~TGo4TaskHandler()
00145 {
00146    fxThreadHandler->RemoveThread(GetComName());
00147    fxThreadHandler->RemoveThread(GetDatName());
00148    fxThreadHandler->RemoveThread(GetStatName());
00149    delete fxCommandTransport;
00150    delete fxStatusTransport;
00151    delete fxDataTransport;
00152    delete fxCommandQueue;
00153    delete fxStatusQueue;
00154    delete fxDataQueue;
00155 }
00156 
00157 TGo4Socket* TGo4TaskHandler::ServerRequest(const char* host)
00158    {
00159    if(fbClientMode)
00160       {
00161       // we are client and want access to the server task (connector runnable)
00162          TGo4Socket* connector=new TGo4Socket(kTRUE); // raw socket transport
00163          connector->Open(host,fuNegPort); // open connection to server's connector runnable
00164          if(ServerLogin(connector, GetRole()))
00165             {
00166                // client and server know each other- we continue
00167                TString myname=fxThreadManager->GetName();
00168                //cout <<"ServerRequest sends name "<<myname.Data() << endl;
00169                connector->Send(myname.Data()); // tell server the client name
00170                connector->Send(gSystem->HostName()); // tell server our machine hostname
00171                return connector;
00172             } //if(!strcmp(localbuffer,fgcOK))
00173          else
00174             {
00175                // error: client does not match to server-- connect failed
00176                connector->Send(Get_fgcERROR()); // send dummy strings, server will come out of receive
00177                connector->Send(Get_fgcERROR()); // might check the errortext at server later
00178 #ifdef WIN32
00179                gSystem->Sleep(1000);
00180 #endif
00181                connector->Close();
00182                delete connector;
00183                TGo4Log::Debug(" TaskHandler %s server connection ERROR ",GetName());
00184                return 0;
00185 
00186             } // else if(!strcmp(localbuffer,fgcOK))
00187          return 0;
00188 
00189       } //if(fbClientMode)
00190 
00191    else
00192       {
00193          // we _are_ a server task handler , shall not request to our own Server task
00194          return 0;
00195       }
00196    }
00197 
00198 
00199 Bool_t TGo4TaskHandler::Connect(const char* host, TGo4Socket* connector)
00200 // establish connection of all three channels
00201 {
00202    TGo4Log::Debug(" TaskHandler %s connecting to host %s ...",GetName(),host);
00203    Text_t* recvchar;
00204    if(fbClientMode)
00205       {
00207       SetAborting(kFALSE); // reset if we reconnect after exception disconnect
00208       fxHostName=host;// remember hostname for later DisConnect
00209       if(connector==0)
00210          {
00211             // normal mode for client: we establish negotiation connection first
00212             connector=ServerRequest(host); // get negotiation channel from server
00213          }
00214       if(connector)
00215          {
00216             // request was successful, we keep talking:
00217             connector->Send(fgcCONNECT); // tell server we want to connect
00218             recvchar=connector->RecvRaw("dummy");
00219             if(recvchar==0)
00220                {
00221                   TGo4Log::Debug(" TaskHandler %s; Error on server connection, abortin... ",GetName());
00222                   connector->Close();
00223                   throw TGo4RuntimeException();
00224                }
00225             if(!strcmp(recvchar,Get_fgcERROR()))
00226                {
00227                   // server refuses to connect us, we abort
00228                   TGo4Log::Debug(" TaskHandler %s; Server refuses Connection",GetName());
00229                   connector->Send(fgcOK); // tell server we are through
00230 #ifdef WIN32
00231                   gSystem->Sleep(1000);
00232 #endif
00233                   connector->Close();
00234                   throw TGo4RuntimeException();
00235                }
00236             if(!ConnectClientChannel("Command",connector,fxCommandTransport,host))
00237                {
00238                   TGo4Log::Debug(" TaskHandler %s; negotiation error, FAILED to open Command Channel",GetName());
00239                   throw TGo4RuntimeException();
00240                }
00241            if(!ConnectClientChannel("Status",connector,fxStatusTransport,host))
00242                {
00243                   TGo4Log::Debug(" TaskHandler %s; negotiation error, FAILED to open Status Channel",GetName());
00244                   throw TGo4RuntimeException();
00245                }
00246            if(!ConnectClientChannel("Data",connector,fxDataTransport,host))
00247                {
00248                   TGo4Log::Debug(" TaskHandler %s; negotiation error, FAILED to open Data Channel",GetName());
00249                   throw TGo4RuntimeException();
00250                }
00251             connector->Send(fgcOK); // tell server we finish negotiation
00252 #ifdef WIN32
00253             gSystem->Sleep(1000);
00254 #endif
00255             connector->Close();
00256             TGo4Log::Debug(" TaskHandler %s closed negotiation connection ",GetName());
00257             delete connector;
00258          }
00259       else
00260          {
00261          // something failed
00262             TGo4Log::Debug(" TaskHandler %s server connection ERROR ",GetName());
00263             return kFALSE;
00264          } // if(connector)
00265       } //if(fbClientMode)
00266    else
00267       {
00269       const Text_t* client=GetName();
00270       if(connector==0) return kFALSE;
00271       connector->Send(TGo4TaskHandler::fgcOK);
00272             // first ok to initialize client, fgcERROR would abort client
00273       if (!ConnectServerChannel("Command",connector, fxCommandTransport, host))
00274          {
00275             TGo4Log::Debug(" TaskHandler: Command channel connect ERROR for client %s ",client);
00276             return kFALSE;
00277          }
00278       if (!ConnectServerChannel("Status",connector, fxStatusTransport, host))
00279          {
00280             TGo4Log::Debug(" TaskManager: Status channel connect ERROR for client %s ",client);
00281             return kFALSE;
00282          }
00283       if (!ConnectServerChannel("Data",connector, fxDataTransport, host))
00284          {
00285             TGo4Log::Debug(" TaskManager: Data channel connect ERROR for client %s ",client);
00286             return kFALSE;
00287          }
00288       } //if(fbClientMode)
00289 
00290    fiComPort=WaitGetPort(fxCommandTransport); // set port numbers for runnables
00291    fiStatPort=WaitGetPort(fxStatusTransport);
00292    fiDatPort=WaitGetPort(fxDataTransport);
00293    StartTransportThreads();
00294 
00295    return kTRUE;
00296 }
00297 
00298 Bool_t TGo4TaskHandler::ServerLogin(TGo4Socket* connector, Go4CommandMode_t account)
00299 {
00300 if(connector==0) return kFALSE;
00301 //cout <<"ServerLogin with mode "<<account << endl;
00302 //cout <<"observer account is "<<TGo4TaskHandler::fgxOBSERVERACCOUNT.GetName()<<", "<<TGo4TaskHandler::fgxOBSERVERACCOUNT.GetTitle() << endl;
00303 //cout <<"controller account is "<<TGo4TaskHandler::fgxCONTROLLERACCOUNT.GetName()<<", "<<TGo4TaskHandler::fgxCONTROLLERACCOUNT.GetTitle() << endl;
00304 //cout <<"admin account is "<<TGo4TaskHandler::fgxADMINISTRATORACCOUNT.GetName()<<", "<<TGo4TaskHandler::fgxADMINISTRATORACCOUNT.GetTitle() << endl;
00305 
00306 connector->Send(fgcOK); // tell server that we are a valid client
00307 
00308 // tell server if we are master or slave:
00309 if(fbMasterMode)
00310    connector->Send(fgcMASTER);
00311 else
00312    connector->Send(fgcSLAVE);
00313 
00314 // now send accountname and password:
00315 switch (account)
00316 {
00317    case kGo4ComModeObserver:
00318       connector->Send(fgxOBSERVERACCOUNT.GetName());
00319       connector->Send(fgxOBSERVERACCOUNT.GetTitle());
00320       break;
00321 
00322    case kGo4ComModeController:
00323       connector->Send(fgxCONTROLLERACCOUNT.GetName());
00324       connector->Send(fgxCONTROLLERACCOUNT.GetTitle());
00325       break;
00326 
00327    case kGo4ComModeAdministrator:
00328       connector->Send(fgxADMINISTRATORACCOUNT.GetName());
00329       connector->Send(fgxADMINISTRATORACCOUNT.GetTitle());
00330       break;
00331 
00332    default:
00333       connector->Send(Get_fgcERROR());
00334       connector->Send(Get_fgcERROR());
00335       break;
00336 }
00337 
00338 char * recvchar=connector->RecvRaw("dummy");// handshake back if it is ok
00339 if(recvchar && !strcmp(recvchar,fgcOK)) return kTRUE;
00340 return kFALSE;
00341 }
00342 
00343 Bool_t TGo4TaskHandler::DisConnect(Bool_t waitforclient)
00344 {
00345    TGo4Log::Debug(" TaskHandler %s disconnecting ",GetName());
00346    //TGo4Task* task=dynamic_cast<TGo4Task*>(fxThreadManager);
00347    if(fbClientMode)
00348       {
00349       if(!IsAborting())
00350          {
00351          // normal DisConnect mode:
00352          // we are client, have to tell server to let us go...
00353          TGo4Socket* connector=ServerRequest(GetHostName()); // get negotiation channel from server
00354          if(connector)
00355                {
00356                    // request was successful, we keep talking:
00357 //                  task->StopWorkThreads();
00358 //                  if(task->IsMaster())
00359 //                       fxCommandQueue->Clear();// only clear command queue on master side,
00360 //                                              // otherwise we lose status messages from server
00361 //                  task->WakeCommandQueue(TGo4Task::Get_fgiTERMID()); // will stop local command thread, and remote
00362 //                  task->SendStopBuffers(); // only stop remote threads if login was successful!
00364                   connector->Send(fgcDISCONNECT); // tell server we want to disconnect
00365                   StopTransportThreads(kTRUE);// wait until threads are really stopped
00366                   //cout <<"TASKHANDLER DISCONNECT closing the transports now.... " <<endl;
00367                   CloseChannels();
00368                   connector->Send(fgcOK); // tell server we finished transports
00369                      // server will close its transport sockets after this
00370 
00371                   connector->Send(fgcOK); // second ok to let server shutdown connector
00372 #ifdef WIN32
00373                   gSystem->Sleep(1000);
00374 #endif
00375                   connector->Close();
00376                   delete connector;
00377                } 
00378             else
00379                {
00380                // something failed
00381                   TGo4Log::Debug(" TaskHandler %s server disconnect login ERROR - Trying Fast DisConnect... ",GetName());
00382                   StopTransportThreads(kFALSE);
00383                   CloseChannels("force");
00384                   //return kFALSE;
00385                }
00386          }
00387       else // if(!IsAborting())
00388          {
00389             // DisConnect after exception, fast Close without negotiations
00390             TGo4Log::Debug(" Client Aborting mode: Fast DisConnect... ",GetName());
00391             StopTransportThreads(kFALSE);
00392             CloseChannels("force");
00393          }
00394 
00395 
00396       } 
00397    else
00398       {
00399          StopTransportThreads(waitforclient);// wait until threads are really stopped
00400          CloseChannels();
00401       }
00402    return kTRUE;
00403 }
00404 
00405 void TGo4TaskHandler::CloseChannels(Option_t* opt)
00406 {
00407 //TString option=opt;
00408 //if(option=="force")
00409 //   {
00410 //   cout <<"sSSSSSSSSSSSS CloseChannels sending abort buffer" << endl;
00411 //   TGo4Task* task=dynamic_cast<TGo4Task*>(fxThreadManager);
00412 //   // provoke socket exception on receiver channels:
00413 //   if(fbMasterMode)
00414 //      {
00415 //         fxCommandTransport->SendBuffer(task->GetAbortBuffer());
00416 //      }
00417 //   else
00418 //      {
00419 //         fxDataTransport->SendBuffer(task->GetAbortBuffer());
00420 //         fxStatusTransport->SendBuffer(task->GetAbortBuffer());
00421 //
00422 //      }
00423 //   }
00424 fxDataTransport->Close(opt);
00425 fxCommandTransport->Close(opt);
00426 fxStatusTransport->Close(opt);
00427 ClearQueues();
00428 
00429 }
00430 
00431 void TGo4TaskHandler::ClearQueues()
00432 {
00433 fxDataQueue->Clear();
00434 fxCommandQueue->Clear();
00435 fxStatusQueue->Clear();
00436 
00437 }
00438 
00439 TGo4TaskHandlerStatus * TGo4TaskHandler::CreateStatus()
00440 {
00441    TGo4TaskHandlerStatus* state= new TGo4TaskHandlerStatus(GetName());
00442    // we are friend of our status class, may use private setters:
00443    state->SetFlags(fbIsAborting);
00444    state->SetPorts(fuNegPort, fiComPort, fiStatPort, fiDatPort);
00445    state->SetNames(GetComName(),GetStatName(),GetDatName(),GetHostName());
00446    return state;
00447 }
00448 
00449 
00450 
00451 Bool_t TGo4TaskHandler::ConnectServerChannel(const char* name, TGo4Socket* negotiator, TGo4Socket* channel, const char* host)
00452 {
00453 Text_t* revchar=0;
00454 Int_t waitresult=0;
00455 UInt_t port=0;
00456 TGo4ServerTask* server=dynamic_cast<TGo4ServerTask*>(fxThreadManager);
00457 if(server==0)
00458    {
00459      TGo4Log::Debug(" TaskHandler: Channel %s open ERROR: no server task ",name);
00460      return kFALSE;
00461    }
00462 if(negotiator==0 || !negotiator->IsOpen())
00463    {
00464      TGo4Log::Debug(" TaskHandler: Channel %s open ERROR: no negotiation channel ",name);
00465      return kFALSE;
00466    }
00467 if(channel==0)
00468    {
00469      TGo4Log::Debug(" TaskHandler: Channel %s open ERROR: no TGo4Socket instance ",name);
00470      return kFALSE;
00471    }
00472 const Text_t* client=GetName(); // taskhandler name is client name
00473 // in server mode, we connect by the connector thread:
00474 // need timer mechanism for proper registration of ROOT sockets (timer is main thread)
00475 // only root sockets connected in main application thread will be cleaned up
00476 server->SetConnect(channel, host,0,kTRUE);
00477    // tell the ServerTask timer we want to connect; portscan. we keep server socket open for windows
00478 waitresult=server->WaitForOpen(); // wait for the server Open() call by timer
00479 if(waitresult<0)
00480    {
00481       // open timeout
00482       TGo4Log::Debug(" TaskHandler: Channel %s open TIMEOUT for client %s ",name, client);
00483       return kFALSE;
00484    }
00485 else
00486    {
00487       // ok, proceed
00488    }
00489 port=WaitGetPort(channel);
00490 if (port<0)
00491    {
00492       TGo4Log::Debug(" TaskHandler: Channel %s getport TIMEOUT for client %s ",name, client);
00493       return kFALSE;
00494    }
00495 else {}
00496 negotiator->Send(TGo4TaskHandler::fgcOK); // tell client we are ready to connect
00497 TString localbuffer;
00498 localbuffer.Form("%d",port);
00499 negotiator->Send(localbuffer.Data()); // tell client the port number;
00500 //cout <<"------- ConnectServerChannel offers portnumber "<< localbuffer<< " for Channel "<< name << endl;
00501 revchar=negotiator->RecvRaw("dummy"); // wait for client connection ok
00502 if(revchar && !strcmp(revchar,TGo4TaskHandler::fgcOK))
00503    {
00504       // o.k., client tells us connection is open, continue
00505    }
00506 else
00507    {
00508       // something went wrong, no ok
00509       TGo4Log::Debug(" TaskHandler: Negotiation ERROR after Channel %s open for client %s ",
00510                name, client);
00511       return kFALSE;
00512       //throw TGo4RuntimeException();
00513    }
00514 
00515 waitresult=server->WaitForConnection(); // we also check ourselves if timer has returned from server open
00516 if(waitresult<0)
00517    {
00518       // connect timeout
00519       TGo4Log::Debug(" TaskHandler: Channel %s connect TIMEOUT for client %s ", name, client);
00520       return kFALSE;
00521    }
00522 else
00523    {
00524       // ok, proceed
00525    }
00526 TGo4Log::Debug(" TaskHandler: Channel %s for client %s open!",name, client);
00527 return kTRUE;
00528 }
00529 
00530 Bool_t TGo4TaskHandler::ConnectClientChannel(const char* name, TGo4Socket * negotiator, TGo4Socket * channel, const char* host)
00531 {
00532 //
00533 Text_t* recvchar=0;
00534 Int_t port=0;
00535 if(negotiator==0 || !negotiator->IsOpen())
00536    {
00537      TGo4Log::Debug(" TaskHandler: Channel %s open ERROR: no negotiation channel ",name);
00538      return kFALSE;
00539    }
00540 if(channel==0)
00541    {
00542      TGo4Log::Debug(" TaskHandler: Channel %s open ERROR: no TGo4Socket instance ",name);
00543      return kFALSE;
00544    }
00545 
00546 recvchar=negotiator->RecvRaw("dummy");// get OK from server to connect first channel
00547 if(recvchar && !strcmp(recvchar,fgcOK))
00548    {
00549       // get portnumber  from server:
00550       recvchar=negotiator->RecvRaw("dummy");
00551       TString localbuffer = recvchar;
00552       port=atoi(localbuffer.Data());
00553 //      cout <<"------- TaskHandler::Connect client tries port  "<< port << "for Channel "<< name << endl;
00554       channel->Open(host,port);  // in client mode, we connect directly (main thread!)
00555       TGo4Log::Debug(" TaskHandler %s: Channel %s open!",GetName(), name );
00556       negotiator->Send(fgcOK); // tell server that open is ready
00557       return kTRUE;
00558    }
00559 else
00560    {
00561       TGo4Log::Debug(" TaskHandler %s; negotiation error, FAILED to open Channel %s ",
00562                GetName(), name);
00563       return kFALSE;
00564       //throw TGo4RuntimeException();
00565    } // if(!strcmp(recvchar,fgcOK))
00566 }
00567 Int_t TGo4TaskHandler::WaitGetPort(TGo4Socket* sock)
00568 
00569 {
00570    Int_t count=0;
00571    Int_t port=0;
00572    while(port==0)
00573    {
00574       port=sock->GetPort(); // get dynamically bound port number of server socket
00575 //      cout <<"------- WaitGetPort has next portnumber "<< port << endl;
00576       if(count>fgiPORTWAITCYCLES)
00577             {
00578                return -1;
00579             }
00580       else if(fxThreadManager->IsTerminating())
00581             {
00582                return -2;
00583             }
00584       else
00585             {
00586               TGo4Thread::Sleep(fguPORTWAITTIME);
00587               ++count;
00588             }
00589    }
00590    return port;
00591 
00592 }
00593 
00594 void TGo4TaskHandler::StartTransportThreads()
00595 {
00596 fxThreadHandler->Start(GetComName());         // start runnables
00597 fxThreadHandler->Start(GetStatName());
00598 fxThreadHandler->Start(GetDatName());
00599 }
00600 
00601 Bool_t TGo4TaskHandler::StopTransportThreads(Bool_t wait)
00602 {
00603 Bool_t rev=kTRUE;
00604 fxThreadHandler->Stop(GetComName());
00605 if(IsMasterMode())
00606    {
00607    TGo4BufferQueue* comq= dynamic_cast<TGo4BufferQueue*>(GetCommandQueue());
00608    if(comq)
00609       {
00610          //cout <<"SSSSSStopTransportThreads Waking command queue" << endl;
00611          comq->Wake();
00612       }
00613    }
00614 fxThreadHandler->Stop(GetStatName());
00615 fxThreadHandler->Stop(GetDatName());
00616 if(wait)
00617    {
00618          rev&=WaitThreadStop(GetComName());
00619          rev&=WaitThreadStop(GetStatName());
00620          rev&=WaitThreadStop(GetDatName());
00621    }
00622 return rev;
00623 }
00624 
00625 Bool_t TGo4TaskHandler::WaitThreadStop(const char* name)
00626 {
00627 if(name==0) return kFALSE;
00628 TGo4Thread* thread=fxThreadHandler->GetThread(name);
00629 if(thread==0) return kFALSE;
00630 Int_t t=0;
00631 Bool_t timeout=kFALSE;
00632 while(!thread->IsWaiting())
00633    {
00634       TGo4Log::Debug(" TaskHandler Disconnect --  waiting for runnable %s to stop... ",name);
00635       TGo4Thread::Sleep(TGo4TaskHandler::fguTHREADSTOPTIME);
00636       if((t++>=TGo4TaskHandler::fgiTHREADSTOPCYCLES))
00637          {
00638             timeout=kTRUE;
00639             break;
00640          }
00641    }
00642 return (!timeout);
00643 }
00644 
00645 void TGo4TaskHandler::SetAdminAccount(const char* name, const char* passwd)
00646 {
00647 if(name) fgxADMINISTRATORACCOUNT.SetName(name);
00648 if(passwd) fgxADMINISTRATORACCOUNT.SetTitle(passwd);
00649 }
00650 
00651 void TGo4TaskHandler::SetCtrlAccount(const char* name, const char* passwd)
00652 {
00653 if(name) fgxCONTROLLERACCOUNT.SetName(name);
00654 if(passwd) fgxCONTROLLERACCOUNT.SetTitle(passwd);
00655 }
00656 
00657 void TGo4TaskHandler::SetObservAccount(const char* name, const char* passwd)
00658 {
00659 if(name) fgxOBSERVERACCOUNT.SetName(name);
00660 if(passwd) fgxOBSERVERACCOUNT.SetTitle(passwd);
00661 }
00662 
00663 const char* TGo4TaskHandler::Get_fgcOK()
00664 {
00665    return fgcOK;
00666 }
00667 
00668 const char* TGo4TaskHandler::Get_fgcERROR()
00669 {
00670    return fgcERROR;
00671 }
00672 
00673 UInt_t TGo4TaskHandler::Get_fguPORTWAITTIME()
00674 {
00675    return fguPORTWAITTIME;
00676 }
00677 
00678 Int_t TGo4TaskHandler::Get_fgiPORTWAITCYCLES()
00679 {
00680    return fgiPORTWAITCYCLES;
00681 }
00682 
00683 //----------------------------END OF GO4 SOURCE FILE ---------------------

Generated on Fri Nov 28 12:59:30 2008 for Go4-v3.04-1 by  doxygen 1.4.2