00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #include "TGo4TaskHandler.h"
00017
00018 #include "Riostream.h"
00019 #include <stdlib.h>
00020 #include "TSystem.h"
00021 #include "snprintf.h"
00022
00023 #include "TGo4Log.h"
00024 #include "TGo4Thread.h"
00025 #include "TGo4ThreadHandler.h"
00026 #include "TGo4Socket.h"
00027 #include "TGo4BufferQueue.h"
00028 #include "TGo4RuntimeException.h"
00029 #include "TGo4DataRunnable.h"
00030 #include "TGo4StatusRunnable.h"
00031 #include "TGo4CommandRunnable.h"
00032 #include "TGo4ServerTask.h"
00033 #include "TGo4TaskHandlerStatus.h"
00034
00035 const UInt_t TGo4TaskHandler::fguCONNECTORPORT=5000;
00036
00037 const UInt_t TGo4TaskHandler::fguTRANSPORTCHECKDELAY=5000;
00038
00039 const UInt_t TGo4TaskHandler::fguSTATUSQUEUESIZE=1000;
00040 const UInt_t TGo4TaskHandler::fguDATAQUEUESIZE=1000;
00041 const UInt_t TGo4TaskHandler::fguCOMMANDQUEUESIZE=1000;
00042
00043
00044 const Int_t TGo4TaskHandler::fgiPORTWAITCYCLES=150;
00045
00046 const UInt_t TGo4TaskHandler::fguPORTWAITTIME=200;
00047
00048 const Int_t TGo4TaskHandler::fgiTHREADSTOPCYCLES=6;
00049
00050 const UInt_t TGo4TaskHandler::fguTHREADSTOPTIME=500;
00051
00052 const Text_t TGo4TaskHandler::fgcCONNECT[]="CONNECT-VERSION-300";
00053 const Text_t TGo4TaskHandler::fgcDISCONNECT[]="DISCONNECT-VERSION-300";
00054
00055 const char* TGo4TaskHandler::fgcOK = "OK-VERSION-300";
00056 const char* TGo4TaskHandler::fgcERROR = "ERROR-VERSION-300";
00057
00058 const Text_t TGo4TaskHandler::fgcMASTER[]="Master-VERSION-300";
00059 const Text_t TGo4TaskHandler::fgcSLAVE[]="Slave-VERSION-300";
00060
00061 const Text_t TGo4TaskHandler::fgcCOMMANDTHREAD[]="COMMAND-";
00062 const Text_t TGo4TaskHandler::fgcSTATUSTHREAD[]="STATUS-";
00063 const Text_t TGo4TaskHandler::fgcDATATHREAD[]="DATA-";
00064
00065 TNamed TGo4TaskHandler::fgxOBSERVERACCOUNT("observer","go4view");
00066 TNamed TGo4TaskHandler::fgxCONTROLLERACCOUNT("controller","go4ctrl");
00067 TNamed TGo4TaskHandler::fgxADMINISTRATORACCOUNT("admin","go4super");
00068
00069 TGo4TaskHandler::TGo4TaskHandler(const char* name, TGo4ThreadManager* threadmanager, Bool_t clientmode, Bool_t mastermode,UInt_t negotiationport)
00070 :TNamed(name,"This is a Go4 Task Handler"),
00071 fbIsAborting(kFALSE), fiComPort(0),fiStatPort(0),fiDatPort(0),fiRole(kGo4ComModeController)
00072 {
00073 fbClientMode=clientmode;
00074 fbMasterMode=mastermode;
00075 if(threadmanager==0)
00076 {
00077
00078 TGo4Log::Debug(" TaskHandler -- constructor error, unspecified ThreadManager: aborting ");
00079
00080 }
00081 else
00082 {
00083
00084 }
00085
00086
00087 if(negotiationport==0)
00088 {
00089
00090 fuNegPort=TGo4TaskHandler::fguCONNECTORPORT;
00091 }
00092 else
00093 {
00094
00095 fuNegPort=negotiationport;
00096 }
00097
00098 fxThreadManager=threadmanager;
00099 fxThreadHandler=fxThreadManager->GetWorkHandler();
00100 TString namebuffer;
00101 fxInvoker=0;
00102 fxCommandQueue= new TGo4BufferQueue("Command");
00103 fxStatusQueue= new TGo4BufferQueue("Status");
00104 fxDataQueue= new TGo4BufferQueue("Data");
00105
00106 fxCommandTransport=new TGo4Socket(IsClientMode());
00107 fxStatusTransport=new TGo4Socket(IsClientMode());
00108 fxDataTransport=new TGo4Socket(IsClientMode());
00109 namebuffer.Form("CommandRunnable of %s",GetName());
00110
00111 fxCommandRun=new TGo4CommandRunnable(namebuffer.Data(), fxThreadManager, this, !IsMasterMode());
00112
00113 namebuffer.Form("StatusRunnable of %s",GetName());
00114
00115 fxStatusRun=new TGo4StatusRunnable(namebuffer.Data(), fxThreadManager, this, IsMasterMode());
00116
00117 namebuffer.Form("DataRunnable of %s",GetName());
00118
00119 fxDataRun=new TGo4DataRunnable(namebuffer.Data(), fxThreadManager, this, IsMasterMode());
00120
00121
00122 namebuffer.Form("%s%s",fgcCOMMANDTHREAD,GetName());
00123 fxComName=namebuffer;
00124 fxThreadHandler->NewThread(GetComName(), fxCommandRun);
00125 namebuffer.Form("%s%s",fgcSTATUSTHREAD,GetName());
00126 fxStatName=namebuffer;
00127 fxThreadHandler->NewThread(GetStatName(),fxStatusRun);
00128 namebuffer.Form("%s%s",fgcDATATHREAD,GetName());
00129 fxDatName=namebuffer;
00130 fxThreadHandler->NewThread(GetDatName(),fxDataRun);
00131 if(IsClientMode())
00132 TGo4Log::Debug(" New TaskHandler %s in client mode ",GetName());
00133 else
00134 TGo4Log::Debug(" New TaskHandler %s in server mode ",GetName());
00135
00136
00137 fxCommandQueue->SetMaxEntries(TGo4TaskHandler::fguCOMMANDQUEUESIZE);
00138 fxDataQueue->SetMaxEntries(TGo4TaskHandler::fguDATAQUEUESIZE);
00139 fxStatusQueue->SetMaxEntries(TGo4TaskHandler::fguSTATUSQUEUESIZE);
00140
00141
00142 }
00143
00144 TGo4TaskHandler::~TGo4TaskHandler()
00145 {
00146 fxThreadHandler->RemoveThread(GetComName());
00147 fxThreadHandler->RemoveThread(GetDatName());
00148 fxThreadHandler->RemoveThread(GetStatName());
00149 delete fxCommandTransport;
00150 delete fxStatusTransport;
00151 delete fxDataTransport;
00152 delete fxCommandQueue;
00153 delete fxStatusQueue;
00154 delete fxDataQueue;
00155 }
00156
00157 TGo4Socket* TGo4TaskHandler::ServerRequest(const char* host)
00158 {
00159 if(fbClientMode)
00160 {
00161
00162 TGo4Socket* connector=new TGo4Socket(kTRUE);
00163 connector->Open(host,fuNegPort);
00164 if(ServerLogin(connector, GetRole()))
00165 {
00166
00167 TString myname=fxThreadManager->GetName();
00168
00169 connector->Send(myname.Data());
00170 connector->Send(gSystem->HostName());
00171 return connector;
00172 }
00173 else
00174 {
00175
00176 connector->Send(Get_fgcERROR());
00177 connector->Send(Get_fgcERROR());
00178 #ifdef WIN32
00179 gSystem->Sleep(1000);
00180 #endif
00181 connector->Close();
00182 delete connector;
00183 TGo4Log::Debug(" TaskHandler %s server connection ERROR ",GetName());
00184 return 0;
00185
00186 }
00187 return 0;
00188
00189 }
00190
00191 else
00192 {
00193
00194 return 0;
00195 }
00196 }
00197
00198
00199 Bool_t TGo4TaskHandler::Connect(const char* host, TGo4Socket* connector)
00200
00201 {
00202 TGo4Log::Debug(" TaskHandler %s connecting to host %s ...",GetName(),host);
00203 Text_t* recvchar;
00204 if(fbClientMode)
00205 {
00207 SetAborting(kFALSE);
00208 fxHostName=host;
00209 if(connector==0)
00210 {
00211
00212 connector=ServerRequest(host);
00213 }
00214 if(connector)
00215 {
00216
00217 connector->Send(fgcCONNECT);
00218 recvchar=connector->RecvRaw("dummy");
00219 if(recvchar==0)
00220 {
00221 TGo4Log::Debug(" TaskHandler %s; Error on server connection, abortin... ",GetName());
00222 connector->Close();
00223 throw TGo4RuntimeException();
00224 }
00225 if(!strcmp(recvchar,Get_fgcERROR()))
00226 {
00227
00228 TGo4Log::Debug(" TaskHandler %s; Server refuses Connection",GetName());
00229 connector->Send(fgcOK);
00230 #ifdef WIN32
00231 gSystem->Sleep(1000);
00232 #endif
00233 connector->Close();
00234 throw TGo4RuntimeException();
00235 }
00236 if(!ConnectClientChannel("Command",connector,fxCommandTransport,host))
00237 {
00238 TGo4Log::Debug(" TaskHandler %s; negotiation error, FAILED to open Command Channel",GetName());
00239 throw TGo4RuntimeException();
00240 }
00241 if(!ConnectClientChannel("Status",connector,fxStatusTransport,host))
00242 {
00243 TGo4Log::Debug(" TaskHandler %s; negotiation error, FAILED to open Status Channel",GetName());
00244 throw TGo4RuntimeException();
00245 }
00246 if(!ConnectClientChannel("Data",connector,fxDataTransport,host))
00247 {
00248 TGo4Log::Debug(" TaskHandler %s; negotiation error, FAILED to open Data Channel",GetName());
00249 throw TGo4RuntimeException();
00250 }
00251 connector->Send(fgcOK);
00252 #ifdef WIN32
00253 gSystem->Sleep(1000);
00254 #endif
00255 connector->Close();
00256 TGo4Log::Debug(" TaskHandler %s closed negotiation connection ",GetName());
00257 delete connector;
00258 }
00259 else
00260 {
00261
00262 TGo4Log::Debug(" TaskHandler %s server connection ERROR ",GetName());
00263 return kFALSE;
00264 }
00265 }
00266 else
00267 {
00269 const Text_t* client=GetName();
00270 if(connector==0) return kFALSE;
00271 connector->Send(TGo4TaskHandler::fgcOK);
00272
00273 if (!ConnectServerChannel("Command",connector, fxCommandTransport, host))
00274 {
00275 TGo4Log::Debug(" TaskHandler: Command channel connect ERROR for client %s ",client);
00276 return kFALSE;
00277 }
00278 if (!ConnectServerChannel("Status",connector, fxStatusTransport, host))
00279 {
00280 TGo4Log::Debug(" TaskManager: Status channel connect ERROR for client %s ",client);
00281 return kFALSE;
00282 }
00283 if (!ConnectServerChannel("Data",connector, fxDataTransport, host))
00284 {
00285 TGo4Log::Debug(" TaskManager: Data channel connect ERROR for client %s ",client);
00286 return kFALSE;
00287 }
00288 }
00289
00290 fiComPort=WaitGetPort(fxCommandTransport);
00291 fiStatPort=WaitGetPort(fxStatusTransport);
00292 fiDatPort=WaitGetPort(fxDataTransport);
00293 StartTransportThreads();
00294
00295 return kTRUE;
00296 }
00297
00298 Bool_t TGo4TaskHandler::ServerLogin(TGo4Socket* connector, Go4CommandMode_t account)
00299 {
00300 if(connector==0) return kFALSE;
00301
00302
00303
00304
00305
00306 connector->Send(fgcOK);
00307
00308
00309 if(fbMasterMode)
00310 connector->Send(fgcMASTER);
00311 else
00312 connector->Send(fgcSLAVE);
00313
00314
00315 switch (account)
00316 {
00317 case kGo4ComModeObserver:
00318 connector->Send(fgxOBSERVERACCOUNT.GetName());
00319 connector->Send(fgxOBSERVERACCOUNT.GetTitle());
00320 break;
00321
00322 case kGo4ComModeController:
00323 connector->Send(fgxCONTROLLERACCOUNT.GetName());
00324 connector->Send(fgxCONTROLLERACCOUNT.GetTitle());
00325 break;
00326
00327 case kGo4ComModeAdministrator:
00328 connector->Send(fgxADMINISTRATORACCOUNT.GetName());
00329 connector->Send(fgxADMINISTRATORACCOUNT.GetTitle());
00330 break;
00331
00332 default:
00333 connector->Send(Get_fgcERROR());
00334 connector->Send(Get_fgcERROR());
00335 break;
00336 }
00337
00338 char * recvchar=connector->RecvRaw("dummy");
00339 if(recvchar && !strcmp(recvchar,fgcOK)) return kTRUE;
00340 return kFALSE;
00341 }
00342
00343 Bool_t TGo4TaskHandler::DisConnect(Bool_t waitforclient)
00344 {
00345 TGo4Log::Debug(" TaskHandler %s disconnecting ",GetName());
00346
00347 if(fbClientMode)
00348 {
00349 if(!IsAborting())
00350 {
00351
00352
00353 TGo4Socket* connector=ServerRequest(GetHostName());
00354 if(connector)
00355 {
00356
00357
00358
00359
00360
00361
00362
00364 connector->Send(fgcDISCONNECT);
00365 StopTransportThreads(kTRUE);
00366
00367 CloseChannels();
00368 connector->Send(fgcOK);
00369
00370
00371 connector->Send(fgcOK);
00372 #ifdef WIN32
00373 gSystem->Sleep(1000);
00374 #endif
00375 connector->Close();
00376 delete connector;
00377 }
00378 else
00379 {
00380
00381 TGo4Log::Debug(" TaskHandler %s server disconnect login ERROR - Trying Fast DisConnect... ",GetName());
00382 StopTransportThreads(kFALSE);
00383 CloseChannels("force");
00384
00385 }
00386 }
00387 else
00388 {
00389
00390 TGo4Log::Debug(" Client Aborting mode: Fast DisConnect... ",GetName());
00391 StopTransportThreads(kFALSE);
00392 CloseChannels("force");
00393 }
00394
00395
00396 }
00397 else
00398 {
00399 StopTransportThreads(waitforclient);
00400 CloseChannels();
00401 }
00402 return kTRUE;
00403 }
00404
00405 void TGo4TaskHandler::CloseChannels(Option_t* opt)
00406 {
00407
00408
00409
00410
00411
00412
00413
00414
00415
00416
00417
00418
00419
00420
00421
00422
00423
00424 fxDataTransport->Close(opt);
00425 fxCommandTransport->Close(opt);
00426 fxStatusTransport->Close(opt);
00427 ClearQueues();
00428
00429 }
00430
00431 void TGo4TaskHandler::ClearQueues()
00432 {
00433 fxDataQueue->Clear();
00434 fxCommandQueue->Clear();
00435 fxStatusQueue->Clear();
00436
00437 }
00438
00439 TGo4TaskHandlerStatus * TGo4TaskHandler::CreateStatus()
00440 {
00441 TGo4TaskHandlerStatus* state= new TGo4TaskHandlerStatus(GetName());
00442
00443 state->SetFlags(fbIsAborting);
00444 state->SetPorts(fuNegPort, fiComPort, fiStatPort, fiDatPort);
00445 state->SetNames(GetComName(),GetStatName(),GetDatName(),GetHostName());
00446 return state;
00447 }
00448
00449
00450
00451 Bool_t TGo4TaskHandler::ConnectServerChannel(const char* name, TGo4Socket* negotiator, TGo4Socket* channel, const char* host)
00452 {
00453 Text_t* revchar=0;
00454 Int_t waitresult=0;
00455 UInt_t port=0;
00456 TGo4ServerTask* server=dynamic_cast<TGo4ServerTask*>(fxThreadManager);
00457 if(server==0)
00458 {
00459 TGo4Log::Debug(" TaskHandler: Channel %s open ERROR: no server task ",name);
00460 return kFALSE;
00461 }
00462 if(negotiator==0 || !negotiator->IsOpen())
00463 {
00464 TGo4Log::Debug(" TaskHandler: Channel %s open ERROR: no negotiation channel ",name);
00465 return kFALSE;
00466 }
00467 if(channel==0)
00468 {
00469 TGo4Log::Debug(" TaskHandler: Channel %s open ERROR: no TGo4Socket instance ",name);
00470 return kFALSE;
00471 }
00472 const Text_t* client=GetName();
00473
00474
00475
00476 server->SetConnect(channel, host,0,kTRUE);
00477
00478 waitresult=server->WaitForOpen();
00479 if(waitresult<0)
00480 {
00481
00482 TGo4Log::Debug(" TaskHandler: Channel %s open TIMEOUT for client %s ",name, client);
00483 return kFALSE;
00484 }
00485 else
00486 {
00487
00488 }
00489 port=WaitGetPort(channel);
00490 if (port<0)
00491 {
00492 TGo4Log::Debug(" TaskHandler: Channel %s getport TIMEOUT for client %s ",name, client);
00493 return kFALSE;
00494 }
00495 else {}
00496 negotiator->Send(TGo4TaskHandler::fgcOK);
00497 TString localbuffer;
00498 localbuffer.Form("%d",port);
00499 negotiator->Send(localbuffer.Data());
00500
00501 revchar=negotiator->RecvRaw("dummy");
00502 if(revchar && !strcmp(revchar,TGo4TaskHandler::fgcOK))
00503 {
00504
00505 }
00506 else
00507 {
00508
00509 TGo4Log::Debug(" TaskHandler: Negotiation ERROR after Channel %s open for client %s ",
00510 name, client);
00511 return kFALSE;
00512
00513 }
00514
00515 waitresult=server->WaitForConnection();
00516 if(waitresult<0)
00517 {
00518
00519 TGo4Log::Debug(" TaskHandler: Channel %s connect TIMEOUT for client %s ", name, client);
00520 return kFALSE;
00521 }
00522 else
00523 {
00524
00525 }
00526 TGo4Log::Debug(" TaskHandler: Channel %s for client %s open!",name, client);
00527 return kTRUE;
00528 }
00529
00530 Bool_t TGo4TaskHandler::ConnectClientChannel(const char* name, TGo4Socket * negotiator, TGo4Socket * channel, const char* host)
00531 {
00532
00533 Text_t* recvchar=0;
00534 Int_t port=0;
00535 if(negotiator==0 || !negotiator->IsOpen())
00536 {
00537 TGo4Log::Debug(" TaskHandler: Channel %s open ERROR: no negotiation channel ",name);
00538 return kFALSE;
00539 }
00540 if(channel==0)
00541 {
00542 TGo4Log::Debug(" TaskHandler: Channel %s open ERROR: no TGo4Socket instance ",name);
00543 return kFALSE;
00544 }
00545
00546 recvchar=negotiator->RecvRaw("dummy");
00547 if(recvchar && !strcmp(recvchar,fgcOK))
00548 {
00549
00550 recvchar=negotiator->RecvRaw("dummy");
00551 TString localbuffer = recvchar;
00552 port=atoi(localbuffer.Data());
00553
00554 channel->Open(host,port);
00555 TGo4Log::Debug(" TaskHandler %s: Channel %s open!",GetName(), name );
00556 negotiator->Send(fgcOK);
00557 return kTRUE;
00558 }
00559 else
00560 {
00561 TGo4Log::Debug(" TaskHandler %s; negotiation error, FAILED to open Channel %s ",
00562 GetName(), name);
00563 return kFALSE;
00564
00565 }
00566 }
00567 Int_t TGo4TaskHandler::WaitGetPort(TGo4Socket* sock)
00568
00569 {
00570 Int_t count=0;
00571 Int_t port=0;
00572 while(port==0)
00573 {
00574 port=sock->GetPort();
00575
00576 if(count>fgiPORTWAITCYCLES)
00577 {
00578 return -1;
00579 }
00580 else if(fxThreadManager->IsTerminating())
00581 {
00582 return -2;
00583 }
00584 else
00585 {
00586 TGo4Thread::Sleep(fguPORTWAITTIME);
00587 ++count;
00588 }
00589 }
00590 return port;
00591
00592 }
00593
00594 void TGo4TaskHandler::StartTransportThreads()
00595 {
00596 fxThreadHandler->Start(GetComName());
00597 fxThreadHandler->Start(GetStatName());
00598 fxThreadHandler->Start(GetDatName());
00599 }
00600
00601 Bool_t TGo4TaskHandler::StopTransportThreads(Bool_t wait)
00602 {
00603 Bool_t rev=kTRUE;
00604 fxThreadHandler->Stop(GetComName());
00605 if(IsMasterMode())
00606 {
00607 TGo4BufferQueue* comq= dynamic_cast<TGo4BufferQueue*>(GetCommandQueue());
00608 if(comq)
00609 {
00610
00611 comq->Wake();
00612 }
00613 }
00614 fxThreadHandler->Stop(GetStatName());
00615 fxThreadHandler->Stop(GetDatName());
00616 if(wait)
00617 {
00618 rev&=WaitThreadStop(GetComName());
00619 rev&=WaitThreadStop(GetStatName());
00620 rev&=WaitThreadStop(GetDatName());
00621 }
00622 return rev;
00623 }
00624
00625 Bool_t TGo4TaskHandler::WaitThreadStop(const char* name)
00626 {
00627 if(name==0) return kFALSE;
00628 TGo4Thread* thread=fxThreadHandler->GetThread(name);
00629 if(thread==0) return kFALSE;
00630 Int_t t=0;
00631 Bool_t timeout=kFALSE;
00632 while(!thread->IsWaiting())
00633 {
00634 TGo4Log::Debug(" TaskHandler Disconnect -- waiting for runnable %s to stop... ",name);
00635 TGo4Thread::Sleep(TGo4TaskHandler::fguTHREADSTOPTIME);
00636 if((t++>=TGo4TaskHandler::fgiTHREADSTOPCYCLES))
00637 {
00638 timeout=kTRUE;
00639 break;
00640 }
00641 }
00642 return (!timeout);
00643 }
00644
00645 void TGo4TaskHandler::SetAdminAccount(const char* name, const char* passwd)
00646 {
00647 if(name) fgxADMINISTRATORACCOUNT.SetName(name);
00648 if(passwd) fgxADMINISTRATORACCOUNT.SetTitle(passwd);
00649 }
00650
00651 void TGo4TaskHandler::SetCtrlAccount(const char* name, const char* passwd)
00652 {
00653 if(name) fgxCONTROLLERACCOUNT.SetName(name);
00654 if(passwd) fgxCONTROLLERACCOUNT.SetTitle(passwd);
00655 }
00656
00657 void TGo4TaskHandler::SetObservAccount(const char* name, const char* passwd)
00658 {
00659 if(name) fgxOBSERVERACCOUNT.SetName(name);
00660 if(passwd) fgxOBSERVERACCOUNT.SetTitle(passwd);
00661 }
00662
00663 const char* TGo4TaskHandler::Get_fgcOK()
00664 {
00665 return fgcOK;
00666 }
00667
00668 const char* TGo4TaskHandler::Get_fgcERROR()
00669 {
00670 return fgcERROR;
00671 }
00672
00673 UInt_t TGo4TaskHandler::Get_fguPORTWAITTIME()
00674 {
00675 return fguPORTWAITTIME;
00676 }
00677
00678 Int_t TGo4TaskHandler::Get_fgiPORTWAITCYCLES()
00679 {
00680 return fgiPORTWAITCYCLES;
00681 }
00682
00683