00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014 #include "TGo4TaskHandler.h"
00015
00016 #include "Riostream.h"
00017 #include <stdlib.h>
00018 #include "TSystem.h"
00019
00020 #include "TGo4Log.h"
00021 #include "TGo4Thread.h"
00022 #include "TGo4ThreadHandler.h"
00023 #include "TGo4Socket.h"
00024 #include "TGo4BufferQueue.h"
00025 #include "TGo4RuntimeException.h"
00026 #include "TGo4DataRunnable.h"
00027 #include "TGo4StatusRunnable.h"
00028 #include "TGo4CommandRunnable.h"
00029 #include "TGo4ServerTask.h"
00030 #include "TGo4TaskHandlerStatus.h"
00031
00032 const UInt_t TGo4TaskHandler::fguCONNECTORPORT=5000;
00033
00034 const UInt_t TGo4TaskHandler::fguTRANSPORTCHECKDELAY=5000;
00035
00036 const UInt_t TGo4TaskHandler::fguSTATUSQUEUESIZE=1000;
00037 const UInt_t TGo4TaskHandler::fguDATAQUEUESIZE=1000;
00038 const UInt_t TGo4TaskHandler::fguCOMMANDQUEUESIZE=1000;
00039
00040
00041 const Int_t TGo4TaskHandler::fgiPORTWAITCYCLES=150;
00042
00043 const UInt_t TGo4TaskHandler::fguPORTWAITTIME=200;
00044
00045 const Int_t TGo4TaskHandler::fgiTHREADSTOPCYCLES=6;
00046
00047 const UInt_t TGo4TaskHandler::fguTHREADSTOPTIME=500;
00048
00049 const char* TGo4TaskHandler::fgcCONNECT="CONNECT-VERSION-300";
00050 const char* TGo4TaskHandler::fgcDISCONNECT="DISCONNECT-VERSION-300";
00051
00052 const char* TGo4TaskHandler::fgcOK = "OK-VERSION-300";
00053 const char* TGo4TaskHandler::fgcERROR = "ERROR-VERSION-300";
00054
00055 const char* TGo4TaskHandler::fgcMASTER="Master-VERSION-300";
00056 const char* TGo4TaskHandler::fgcSLAVE="Slave-VERSION-300";
00057
00058 const char* TGo4TaskHandler::fgcCOMMANDTHREAD="COMMAND-";
00059 const char* TGo4TaskHandler::fgcSTATUSTHREAD="STATUS-";
00060 const char* TGo4TaskHandler::fgcDATATHREAD="DATA-";
00061
00062 TNamed TGo4TaskHandler::fgxOBSERVERACCOUNT("observer","go4view");
00063 TNamed TGo4TaskHandler::fgxCONTROLLERACCOUNT("controller","go4ctrl");
00064 TNamed TGo4TaskHandler::fgxADMINISTRATORACCOUNT("admin","go4super");
00065
00066 TGo4TaskHandler::TGo4TaskHandler(const char* name, TGo4ThreadManager* threadmanager, Bool_t clientmode, Bool_t mastermode,UInt_t negotiationport)
00067 :TNamed(name,"This is a Go4 Task Handler"),
00068 fbIsAborting(kFALSE), fiComPort(0),fiStatPort(0),fiDatPort(0),fiRole(kGo4ComModeController)
00069 {
00070 fbClientMode=clientmode;
00071 fbMasterMode=mastermode;
00072 if(threadmanager==0)
00073 {
00074
00075 TGo4Log::Debug(" TaskHandler -- constructor error, unspecified ThreadManager: aborting ");
00076
00077 }
00078 else
00079 {
00080
00081 }
00082
00083
00084 if(negotiationport==0)
00085 {
00086
00087 fuNegPort=TGo4TaskHandler::fguCONNECTORPORT;
00088 }
00089 else
00090 {
00091
00092 fuNegPort=negotiationport;
00093 }
00094
00095 fxThreadManager=threadmanager;
00096 fxThreadHandler=fxThreadManager->GetWorkHandler();
00097 TString namebuffer;
00098 fxInvoker=0;
00099 fxCommandQueue= new TGo4BufferQueue("Command");
00100 fxStatusQueue= new TGo4BufferQueue("Status");
00101 fxDataQueue= new TGo4BufferQueue("Data");
00102
00103 fxCommandTransport=new TGo4Socket(IsClientMode());
00104 fxStatusTransport=new TGo4Socket(IsClientMode());
00105 fxDataTransport=new TGo4Socket(IsClientMode());
00106 namebuffer.Form("CommandRunnable of %s",GetName());
00107
00108 fxCommandRun=new TGo4CommandRunnable(namebuffer.Data(), fxThreadManager, this, !IsMasterMode());
00109
00110 namebuffer.Form("StatusRunnable of %s",GetName());
00111
00112 fxStatusRun=new TGo4StatusRunnable(namebuffer.Data(), fxThreadManager, this, IsMasterMode());
00113
00114 namebuffer.Form("DataRunnable of %s",GetName());
00115
00116 fxDataRun=new TGo4DataRunnable(namebuffer.Data(), fxThreadManager, this, IsMasterMode());
00117
00118
00119 namebuffer.Form("%s%s",fgcCOMMANDTHREAD,GetName());
00120 fxComName=namebuffer;
00121 fxThreadHandler->NewThread(GetComName(), fxCommandRun);
00122 namebuffer.Form("%s%s",fgcSTATUSTHREAD,GetName());
00123 fxStatName=namebuffer;
00124 fxThreadHandler->NewThread(GetStatName(),fxStatusRun);
00125 namebuffer.Form("%s%s",fgcDATATHREAD,GetName());
00126 fxDatName=namebuffer;
00127 fxThreadHandler->NewThread(GetDatName(),fxDataRun);
00128 if(IsClientMode())
00129 TGo4Log::Debug(" New TaskHandler %s in client mode ",GetName());
00130 else
00131 TGo4Log::Debug(" New TaskHandler %s in server mode ",GetName());
00132
00133
00134 fxCommandQueue->SetMaxEntries(TGo4TaskHandler::fguCOMMANDQUEUESIZE);
00135 fxDataQueue->SetMaxEntries(TGo4TaskHandler::fguDATAQUEUESIZE);
00136 fxStatusQueue->SetMaxEntries(TGo4TaskHandler::fguSTATUSQUEUESIZE);
00137
00138
00139 }
00140
00141 TGo4TaskHandler::~TGo4TaskHandler()
00142 {
00143 fxThreadHandler->RemoveThread(GetComName());
00144 fxThreadHandler->RemoveThread(GetDatName());
00145 fxThreadHandler->RemoveThread(GetStatName());
00146 delete fxCommandTransport;
00147 delete fxStatusTransport;
00148 delete fxDataTransport;
00149 delete fxCommandQueue;
00150 delete fxStatusQueue;
00151 delete fxDataQueue;
00152 }
00153
00154 TGo4Socket* TGo4TaskHandler::ServerRequest(const char* host)
00155 {
00156 if(fbClientMode)
00157 {
00158
00159 TGo4Socket* connector=new TGo4Socket(kTRUE);
00160 connector->Open(host,fuNegPort);
00161 if(ServerLogin(connector, GetRole()))
00162 {
00163
00164 TString myname=fxThreadManager->GetName();
00165
00166 connector->Send(myname.Data());
00167 connector->Send(gSystem->HostName());
00168 return connector;
00169 }
00170 else
00171 {
00172
00173 connector->Send(Get_fgcERROR());
00174 connector->Send(Get_fgcERROR());
00175 #ifdef WIN32
00176 gSystem->Sleep(1000);
00177 #endif
00178 connector->Close();
00179 delete connector;
00180 TGo4Log::Debug(" TaskHandler %s server connection ERROR ",GetName());
00181 return 0;
00182
00183 }
00184 return 0;
00185
00186 }
00187
00188 else
00189 {
00190
00191 return 0;
00192 }
00193 }
00194
00195
00196 Bool_t TGo4TaskHandler::Connect(const char* host, TGo4Socket* connector)
00197
00198 {
00199 TGo4Log::Debug(" TaskHandler %s connecting to host %s ...",GetName(),host);
00200 char* recvchar;
00201 if(fbClientMode)
00202 {
00204 SetAborting(kFALSE);
00205 fxHostName=host;
00206 if(connector==0)
00207 {
00208
00209 connector=ServerRequest(host);
00210 }
00211 if(connector)
00212 {
00213
00214 connector->Send(fgcCONNECT);
00215 recvchar=connector->RecvRaw("dummy");
00216 if(recvchar==0)
00217 {
00218 TGo4Log::Debug(" TaskHandler %s; Error on server connection, abortin... ",GetName());
00219 connector->Close();
00220 throw TGo4RuntimeException();
00221 }
00222 if(!strcmp(recvchar,Get_fgcERROR()))
00223 {
00224
00225 TGo4Log::Debug(" TaskHandler %s; Server refuses Connection",GetName());
00226 connector->Send(fgcOK);
00227 #ifdef WIN32
00228 gSystem->Sleep(1000);
00229 #endif
00230 connector->Close();
00231 throw TGo4RuntimeException();
00232 }
00233 if(!ConnectClientChannel("Command",connector,fxCommandTransport,host))
00234 {
00235 TGo4Log::Debug(" TaskHandler %s; negotiation error, FAILED to open Command Channel",GetName());
00236 throw TGo4RuntimeException();
00237 }
00238 if(!ConnectClientChannel("Status",connector,fxStatusTransport,host))
00239 {
00240 TGo4Log::Debug(" TaskHandler %s; negotiation error, FAILED to open Status Channel",GetName());
00241 throw TGo4RuntimeException();
00242 }
00243 if(!ConnectClientChannel("Data",connector,fxDataTransport,host))
00244 {
00245 TGo4Log::Debug(" TaskHandler %s; negotiation error, FAILED to open Data Channel",GetName());
00246 throw TGo4RuntimeException();
00247 }
00248 connector->Send(fgcOK);
00249 #ifdef WIN32
00250 gSystem->Sleep(1000);
00251 #endif
00252 connector->Close();
00253 TGo4Log::Debug(" TaskHandler %s closed negotiation connection ",GetName());
00254 delete connector;
00255 }
00256 else
00257 {
00258
00259 TGo4Log::Debug(" TaskHandler %s server connection ERROR ",GetName());
00260 return kFALSE;
00261 }
00262 }
00263 else
00264 {
00266 const char* client=GetName();
00267 if(connector==0) return kFALSE;
00268 connector->Send(TGo4TaskHandler::fgcOK);
00269
00270 if (!ConnectServerChannel("Command",connector, fxCommandTransport, host))
00271 {
00272 TGo4Log::Debug(" TaskHandler: Command channel connect ERROR for client %s ",client);
00273 return kFALSE;
00274 }
00275 if (!ConnectServerChannel("Status",connector, fxStatusTransport, host))
00276 {
00277 TGo4Log::Debug(" TaskManager: Status channel connect ERROR for client %s ",client);
00278 return kFALSE;
00279 }
00280 if (!ConnectServerChannel("Data",connector, fxDataTransport, host))
00281 {
00282 TGo4Log::Debug(" TaskManager: Data channel connect ERROR for client %s ",client);
00283 return kFALSE;
00284 }
00285 }
00286
00287 fiComPort=WaitGetPort(fxCommandTransport);
00288 fiStatPort=WaitGetPort(fxStatusTransport);
00289 fiDatPort=WaitGetPort(fxDataTransport);
00290 StartTransportThreads();
00291
00292 return kTRUE;
00293 }
00294
00295 Bool_t TGo4TaskHandler::ServerLogin(TGo4Socket* connector, Go4CommandMode_t account)
00296 {
00297 if(connector==0) return kFALSE;
00298
00299
00300
00301
00302
00303 connector->Send(fgcOK);
00304
00305
00306 if(fbMasterMode)
00307 connector->Send(fgcMASTER);
00308 else
00309 connector->Send(fgcSLAVE);
00310
00311
00312 switch (account)
00313 {
00314 case kGo4ComModeObserver:
00315 connector->Send(fgxOBSERVERACCOUNT.GetName());
00316 connector->Send(fgxOBSERVERACCOUNT.GetTitle());
00317 break;
00318
00319 case kGo4ComModeController:
00320 connector->Send(fgxCONTROLLERACCOUNT.GetName());
00321 connector->Send(fgxCONTROLLERACCOUNT.GetTitle());
00322 break;
00323
00324 case kGo4ComModeAdministrator:
00325 connector->Send(fgxADMINISTRATORACCOUNT.GetName());
00326 connector->Send(fgxADMINISTRATORACCOUNT.GetTitle());
00327 break;
00328
00329 default:
00330 connector->Send(Get_fgcERROR());
00331 connector->Send(Get_fgcERROR());
00332 break;
00333 }
00334
00335 char * recvchar=connector->RecvRaw("dummy");
00336 if(recvchar && !strcmp(recvchar,fgcOK)) return kTRUE;
00337 return kFALSE;
00338 }
00339
00340 Bool_t TGo4TaskHandler::DisConnect(Bool_t waitforclient)
00341 {
00342 TGo4Log::Debug(" TaskHandler %s disconnecting ",GetName());
00343
00344 if(fbClientMode)
00345 {
00346 if(!IsAborting())
00347 {
00348
00349
00350 TGo4Socket* connector=ServerRequest(GetHostName());
00351 if(connector)
00352 {
00353
00354
00355
00356
00357
00358
00359
00361 connector->Send(fgcDISCONNECT);
00362 StopTransportThreads(kTRUE);
00363
00364 CloseChannels();
00365 connector->Send(fgcOK);
00366
00367
00368 connector->Send(fgcOK);
00369 #ifdef WIN32
00370 gSystem->Sleep(1000);
00371 #endif
00372 connector->Close();
00373 delete connector;
00374 }
00375 else
00376 {
00377
00378 TGo4Log::Debug(" TaskHandler %s server disconnect login ERROR - Trying Fast DisConnect... ",GetName());
00379 StopTransportThreads(kFALSE);
00380 CloseChannels("force");
00381
00382 }
00383 }
00384 else
00385 {
00386
00387 TGo4Log::Debug(" Client Aborting mode: Fast DisConnect... ",GetName());
00388 StopTransportThreads(kFALSE);
00389 CloseChannels("force");
00390 }
00391
00392
00393 }
00394 else
00395 {
00396 StopTransportThreads(waitforclient);
00397 CloseChannels();
00398 }
00399 return kTRUE;
00400 }
00401
00402 void TGo4TaskHandler::CloseChannels(Option_t* opt)
00403 {
00404
00405
00406
00407
00408
00409
00410
00411
00412
00413
00414
00415
00416
00417
00418
00419
00420
00421 fxDataTransport->Close(opt);
00422 fxCommandTransport->Close(opt);
00423 fxStatusTransport->Close(opt);
00424 ClearQueues();
00425
00426 }
00427
00428 void TGo4TaskHandler::ClearQueues()
00429 {
00430 fxDataQueue->Clear();
00431 fxCommandQueue->Clear();
00432 fxStatusQueue->Clear();
00433
00434 }
00435
00436 TGo4TaskHandlerStatus * TGo4TaskHandler::CreateStatus()
00437 {
00438 TGo4TaskHandlerStatus* state= new TGo4TaskHandlerStatus(GetName());
00439
00440 state->SetFlags(fbIsAborting);
00441 state->SetPorts(fuNegPort, fiComPort, fiStatPort, fiDatPort);
00442 state->SetNames(GetComName(),GetStatName(),GetDatName(),GetHostName());
00443 return state;
00444 }
00445
00446
00447
00448 Bool_t TGo4TaskHandler::ConnectServerChannel(const char* name, TGo4Socket* negotiator, TGo4Socket* channel, const char* host)
00449 {
00450 char* revchar=0;
00451 Int_t waitresult=0;
00452 UInt_t port=0;
00453 TGo4ServerTask* server=dynamic_cast<TGo4ServerTask*>(fxThreadManager);
00454 if(server==0)
00455 {
00456 TGo4Log::Debug(" TaskHandler: Channel %s open ERROR: no server task ",name);
00457 return kFALSE;
00458 }
00459 if(negotiator==0 || !negotiator->IsOpen())
00460 {
00461 TGo4Log::Debug(" TaskHandler: Channel %s open ERROR: no negotiation channel ",name);
00462 return kFALSE;
00463 }
00464 if(channel==0)
00465 {
00466 TGo4Log::Debug(" TaskHandler: Channel %s open ERROR: no TGo4Socket instance ",name);
00467 return kFALSE;
00468 }
00469 const char* client=GetName();
00470
00471
00472
00473 server->SetConnect(channel, host,0,kTRUE);
00474
00475 waitresult=server->WaitForOpen();
00476 if(waitresult<0)
00477 {
00478
00479 TGo4Log::Debug(" TaskHandler: Channel %s open TIMEOUT for client %s ",name, client);
00480 return kFALSE;
00481 }
00482 else
00483 {
00484
00485 }
00486 port=WaitGetPort(channel);
00487 if (port<0)
00488 {
00489 TGo4Log::Debug(" TaskHandler: Channel %s getport TIMEOUT for client %s ",name, client);
00490 return kFALSE;
00491 }
00492 else {}
00493 negotiator->Send(TGo4TaskHandler::fgcOK);
00494 TString localbuffer;
00495 localbuffer.Form("%d",port);
00496 negotiator->Send(localbuffer.Data());
00497
00498 revchar=negotiator->RecvRaw("dummy");
00499 if(revchar && !strcmp(revchar,TGo4TaskHandler::fgcOK))
00500 {
00501
00502 }
00503 else
00504 {
00505
00506 TGo4Log::Debug(" TaskHandler: Negotiation ERROR after Channel %s open for client %s ",
00507 name, client);
00508 return kFALSE;
00509
00510 }
00511
00512 waitresult=server->WaitForConnection();
00513 if(waitresult<0)
00514 {
00515
00516 TGo4Log::Debug(" TaskHandler: Channel %s connect TIMEOUT for client %s ", name, client);
00517 return kFALSE;
00518 }
00519 else
00520 {
00521
00522 }
00523 TGo4Log::Debug(" TaskHandler: Channel %s for client %s open!",name, client);
00524 return kTRUE;
00525 }
00526
00527 Bool_t TGo4TaskHandler::ConnectClientChannel(const char* name, TGo4Socket * negotiator, TGo4Socket * channel, const char* host)
00528 {
00529
00530 char* recvchar=0;
00531 Int_t port=0;
00532 if(negotiator==0 || !negotiator->IsOpen())
00533 {
00534 TGo4Log::Debug(" TaskHandler: Channel %s open ERROR: no negotiation channel ",name);
00535 return kFALSE;
00536 }
00537 if(channel==0)
00538 {
00539 TGo4Log::Debug(" TaskHandler: Channel %s open ERROR: no TGo4Socket instance ",name);
00540 return kFALSE;
00541 }
00542
00543 recvchar=negotiator->RecvRaw("dummy");
00544 if(recvchar && !strcmp(recvchar,fgcOK))
00545 {
00546
00547 recvchar=negotiator->RecvRaw("dummy");
00548 TString localbuffer = recvchar;
00549 port=atoi(localbuffer.Data());
00550
00551 channel->Open(host,port);
00552 TGo4Log::Debug(" TaskHandler %s: Channel %s open!",GetName(), name );
00553 negotiator->Send(fgcOK);
00554 return kTRUE;
00555 }
00556 else
00557 {
00558 TGo4Log::Debug(" TaskHandler %s; negotiation error, FAILED to open Channel %s ",
00559 GetName(), name);
00560 return kFALSE;
00561
00562 }
00563 }
00564 Int_t TGo4TaskHandler::WaitGetPort(TGo4Socket* sock)
00565
00566 {
00567 Int_t count=0;
00568 Int_t port=0;
00569 while(port==0)
00570 {
00571 port=sock->GetPort();
00572
00573 if(count>fgiPORTWAITCYCLES)
00574 {
00575 return -1;
00576 }
00577 else if(fxThreadManager->IsTerminating())
00578 {
00579 return -2;
00580 }
00581 else
00582 {
00583 TGo4Thread::Sleep(fguPORTWAITTIME);
00584 ++count;
00585 }
00586 }
00587 return port;
00588
00589 }
00590
00591 void TGo4TaskHandler::StartTransportThreads()
00592 {
00593 fxThreadHandler->Start(GetComName());
00594 fxThreadHandler->Start(GetStatName());
00595 fxThreadHandler->Start(GetDatName());
00596 }
00597
00598 Bool_t TGo4TaskHandler::StopTransportThreads(Bool_t wait)
00599 {
00600 Bool_t rev=kTRUE;
00601 fxThreadHandler->Stop(GetComName());
00602 if(IsMasterMode())
00603 {
00604 TGo4BufferQueue* comq= dynamic_cast<TGo4BufferQueue*>(GetCommandQueue());
00605 if(comq)
00606 {
00607
00608 comq->Wake();
00609 }
00610 }
00611 fxThreadHandler->Stop(GetStatName());
00612 fxThreadHandler->Stop(GetDatName());
00613 if(wait)
00614 {
00615 rev&=WaitThreadStop(GetComName());
00616 rev&=WaitThreadStop(GetStatName());
00617 rev&=WaitThreadStop(GetDatName());
00618 }
00619 return rev;
00620 }
00621
00622 Bool_t TGo4TaskHandler::WaitThreadStop(const char* name)
00623 {
00624 if(name==0) return kFALSE;
00625 TGo4Thread* thread=fxThreadHandler->GetThread(name);
00626 if(thread==0) return kFALSE;
00627 Int_t t=0;
00628 Bool_t timeout=kFALSE;
00629 while(!thread->IsWaiting())
00630 {
00631 TGo4Log::Debug(" TaskHandler Disconnect -- waiting for runnable %s to stop... ",name);
00632 TGo4Thread::Sleep(TGo4TaskHandler::fguTHREADSTOPTIME);
00633 if((t++>=TGo4TaskHandler::fgiTHREADSTOPCYCLES))
00634 {
00635 timeout=kTRUE;
00636 break;
00637 }
00638 }
00639 return (!timeout);
00640 }
00641
00642 void TGo4TaskHandler::SetAdminAccount(const char* name, const char* passwd)
00643 {
00644 if(name) fgxADMINISTRATORACCOUNT.SetName(name);
00645 if(passwd) fgxADMINISTRATORACCOUNT.SetTitle(passwd);
00646 }
00647
00648 void TGo4TaskHandler::SetCtrlAccount(const char* name, const char* passwd)
00649 {
00650 if(name) fgxCONTROLLERACCOUNT.SetName(name);
00651 if(passwd) fgxCONTROLLERACCOUNT.SetTitle(passwd);
00652 }
00653
00654 void TGo4TaskHandler::SetObservAccount(const char* name, const char* passwd)
00655 {
00656 if(name) fgxOBSERVERACCOUNT.SetName(name);
00657 if(passwd) fgxOBSERVERACCOUNT.SetTitle(passwd);
00658 }
00659
00660 const char* TGo4TaskHandler::Get_fgcOK()
00661 {
00662 return fgcOK;
00663 }
00664
00665 const char* TGo4TaskHandler::Get_fgcERROR()
00666 {
00667 return fgcERROR;
00668 }
00669
00670 UInt_t TGo4TaskHandler::Get_fguPORTWAITTIME()
00671 {
00672 return fguPORTWAITTIME;
00673 }
00674
00675 Int_t TGo4TaskHandler::Get_fgiPORTWAITCYCLES()
00676 {
00677 return fgiPORTWAITCYCLES;
00678 }