00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #include "TGo4TaskHandler.h"
00017
00018 #include <iostream.h>
00019
00020 #include "TSystem.h"
00021
00022 #include "Go4Log/TGo4Log.h"
00023 #include "Go4LockGuard/TGo4LockGuard.h"
00024 #include "Go4ThreadManager/TGo4ThreadManager.h"
00025 #include "Go4ThreadManager/TGo4ThreadHandler.h"
00026 #include "Go4Socket/TGo4Socket.h"
00027 #include "Go4Queue/TGo4Queue.h"
00028 #include "Go4Queue/TGo4BufferQueue.h"
00029 #include "Go4Queue/TGo4ObjectQueue.h"
00030 #include "Go4CommandsBase/TGo4CommandInvoker.h"
00031 #include "Go4TaskHandler/TGo4TaskHandlerStatus.h"
00032 #include "TGo4DataRunnable.h"
00033 #include "TGo4StatusRunnable.h"
00034 #include "TGo4CommandRunnable.h"
00035
00036 #include "Go4TaskHandler/TGo4ServerTask.h"
00037 #include <TBuffer.h>
00038
00039 const UInt_t TGo4TaskHandler::fguCONNECTORPORT=5000;
00040
00041 const UInt_t TGo4TaskHandler::fguTRANSPORTCHECKDELAY=5000;
00042
00043 const UInt_t TGo4TaskHandler::fguSTATUSQUEUESIZE=1000;
00044 const UInt_t TGo4TaskHandler::fguDATAQUEUESIZE=1000;
00045 const UInt_t TGo4TaskHandler::fguCOMMANDQUEUESIZE=1000;
00046
00047
00048 const Int_t TGo4TaskHandler::fgiPORTWAITCYCLES=150;
00049
00050 const UInt_t TGo4TaskHandler::fguPORTWAITTIME=200;
00051
00052 const Int_t TGo4TaskHandler::fgiTHREADSTOPCYCLES=6;
00053
00054 const UInt_t TGo4TaskHandler::fguTHREADSTOPTIME=500;
00055
00056 const Text_t TGo4TaskHandler::fgcCONNECT[]="CONNECT-VERSION-210";
00057 const Text_t TGo4TaskHandler::fgcDISCONNECT[]="DISCONNECT-VERSION-210";
00058
00059 const Text_t TGo4TaskHandler::fgcOK[]="OK-VERSION-210";
00060 const Text_t TGo4TaskHandler::fgcERROR[]="ERROR-VERSION-210";
00061
00062 const Text_t TGo4TaskHandler::fgcMASTER[]="Master-VERSION-210";
00063 const Text_t TGo4TaskHandler::fgcSLAVE[]="Slave-VERSION-210";
00064
00065 const Text_t TGo4TaskHandler::fgcCOMMANDTHREAD[]="COMMAND-";
00066 const Text_t TGo4TaskHandler::fgcSTATUSTHREAD[]="STATUS-";
00067 const Text_t TGo4TaskHandler::fgcDATATHREAD[]="DATA-";
00068
00069 TNamed TGo4TaskHandler::fgxOBSERVERACCOUNT("observer","go4view");
00070 TNamed TGo4TaskHandler::fgxCONTROLLERACCOUNT("controller","go4ctrl");
00071 TNamed TGo4TaskHandler::fgxADMINISTRATORACCOUNT("admin","go4super");
00072
00073 TGo4TaskHandler::TGo4TaskHandler(const char* name, TGo4ThreadManager* threadmanager, Bool_t clientmode, Bool_t mastermode,UInt_t negotiationport)
00074 :TNamed(name,"This is a Go4 Task Handler"),
00075 fbIsAborting(kFALSE), fiComPort(0),fiStatPort(0),fiDatPort(0),fiRole(kGo4ComModeController)
00076 {
00077 fbClientMode=clientmode;
00078 fbMasterMode=mastermode;
00079 if(threadmanager==0)
00080 {
00081
00082 TGo4Log::Debug(" TaskHandler -- constructor error, unspecified ThreadManager: aborting ");
00083
00084 }
00085 else
00086 {
00087
00088 }
00089
00090
00091 if(negotiationport==0)
00092 {
00093
00094 fuNegPort=TGo4TaskHandler::fguCONNECTORPORT;
00095 }
00096 else
00097 {
00098
00099 fuNegPort=negotiationport;
00100 }
00101
00102 fxThreadManager=threadmanager;
00103 fxThreadHandler=fxThreadManager->GetWorkHandler();
00104 Text_t namebuffer[TGo4ThreadManager::fguTEXTLENGTH];
00105 fxInvoker=0;
00106 fxCommandQueue= new TGo4BufferQueue("Command");
00107 fxStatusQueue= new TGo4BufferQueue("Status");
00108 fxDataQueue= new TGo4BufferQueue("Data");
00109
00110 fxCommandTransport=new TGo4Socket(IsClientMode());
00111 fxStatusTransport=new TGo4Socket(IsClientMode());
00112 fxDataTransport=new TGo4Socket(IsClientMode());
00113 snprintf(namebuffer,TGo4ThreadManager::fguTEXTLENGTH-1, "CommandRunnable of %s",GetName());
00114
00115 fxCommandRun=new TGo4CommandRunnable(namebuffer, fxThreadManager, this, !IsMasterMode());
00116 snprintf(namebuffer,TGo4ThreadManager::fguTEXTLENGTH-1, "StatusRunnable of %s",GetName());
00117
00118 fxStatusRun=new TGo4StatusRunnable(namebuffer, fxThreadManager, this, IsMasterMode());
00119 snprintf(namebuffer,TGo4ThreadManager::fguTEXTLENGTH-1, "DataRunnable of %s",GetName());
00120
00121 fxDataRun=new TGo4DataRunnable(namebuffer, fxThreadManager, this, IsMasterMode());
00122
00123
00124 snprintf(namebuffer,TGo4ThreadManager::fguTEXTLENGTH-1,"%s%s",fgcCOMMANDTHREAD,GetName());
00125 fxComName=namebuffer;
00126 fxThreadHandler->NewThread(GetComName(), fxCommandRun);
00127 snprintf(namebuffer,TGo4ThreadManager::fguTEXTLENGTH-1,"%s%s",fgcSTATUSTHREAD,GetName());
00128 fxStatName=namebuffer;
00129 fxThreadHandler->NewThread(GetStatName(),fxStatusRun);
00130 snprintf(namebuffer,TGo4ThreadManager::fguTEXTLENGTH-1,"%s%s",fgcDATATHREAD,GetName());
00131 fxDatName=namebuffer;
00132 fxThreadHandler->NewThread(GetDatName(),fxDataRun);
00133 if(IsClientMode())
00134 TGo4Log::Debug(" New TaskHandler %s in client mode ",GetName());
00135 else
00136 TGo4Log::Debug(" New TaskHandler %s in server mode ",GetName());
00137
00138
00139 fxCommandQueue->SetMaxEntries(TGo4TaskHandler::fguCOMMANDQUEUESIZE);
00140 fxDataQueue->SetMaxEntries(TGo4TaskHandler::fguDATAQUEUESIZE);
00141 fxStatusQueue->SetMaxEntries(TGo4TaskHandler::fguSTATUSQUEUESIZE);
00142
00143
00144 }
00145
00146 TGo4TaskHandler::~TGo4TaskHandler()
00147 {
00148 fxThreadHandler->RemoveThread(GetComName());
00149 fxThreadHandler->RemoveThread(GetDatName());
00150 fxThreadHandler->RemoveThread(GetStatName());
00151 delete fxCommandTransport;
00152 delete fxStatusTransport;
00153 delete fxDataTransport;
00154 delete fxCommandQueue;
00155 delete fxStatusQueue;
00156 delete fxDataQueue;
00157 }
00158
00159 TGo4Socket* TGo4TaskHandler::ServerRequest(const char* host)
00160 {
00161 if(fbClientMode)
00162 {
00163
00164 TGo4Socket* connector=new TGo4Socket(kTRUE);
00165 connector->Open(host,fuNegPort);
00166 if(ServerLogin(connector, GetRole()))
00167 {
00168
00169 TString myname=fxThreadManager->GetName();
00170
00171 connector->Send(myname.Data());
00172 connector->Send(gSystem->HostName());
00173 return connector;
00174 }
00175 else
00176 {
00177
00178 connector->Send(fgcERROR);
00179 connector->Send(fgcERROR);
00180 connector->Close();
00181 delete connector;
00182 TGo4Log::Debug(" TaskHandler %s server connection ERROR ",GetName());
00183 return 0;
00184
00185 }
00186 return 0;
00187
00188 }
00189
00190 else
00191 {
00192
00193 return 0;
00194 }
00195 }
00196
00197
00198 Bool_t TGo4TaskHandler::Connect(const char* host, TGo4Socket* connector)
00199
00200 {
00201 TGo4Log::Debug(" TaskHandler %s connecting to host %s ...",GetName(),host);
00202 Text_t* recvchar;
00203 if(fbClientMode)
00204 {
00206 SetAborting(kFALSE);
00207 fxHostName=host;
00208 if(connector==0)
00209 {
00210
00211 connector=ServerRequest(host);
00212 }
00213 if(connector)
00214 {
00215
00216 connector->Send(fgcCONNECT);
00217 recvchar=connector->RecvRaw("dummy");
00218 if(recvchar==0)
00219 {
00220 TGo4Log::Debug(" TaskHandler %s; Error on server connection, abortin... ",GetName());
00221 connector->Close();
00222 throw TGo4RuntimeException();
00223 }
00224 if(!strcmp(recvchar,fgcERROR))
00225 {
00226
00227 TGo4Log::Debug(" TaskHandler %s; Server refuses Connection",GetName());
00228 connector->Send(fgcOK);
00229 connector->Close();
00230 throw TGo4RuntimeException();
00231 }
00232 if(!ConnectClientChannel("Command",connector,fxCommandTransport,host))
00233 {
00234 TGo4Log::Debug(" TaskHandler %s; negotiation error, FAILED to open Command Channel",GetName());
00235 throw TGo4RuntimeException();
00236 }
00237 if(!ConnectClientChannel("Status",connector,fxStatusTransport,host))
00238 {
00239 TGo4Log::Debug(" TaskHandler %s; negotiation error, FAILED to open Status Channel",GetName());
00240 throw TGo4RuntimeException();
00241 }
00242 if(!ConnectClientChannel("Data",connector,fxDataTransport,host))
00243 {
00244 TGo4Log::Debug(" TaskHandler %s; negotiation error, FAILED to open Data Channel",GetName());
00245 throw TGo4RuntimeException();
00246 }
00247 connector->Send(fgcOK);
00248 connector->Close();
00249 TGo4Log::Debug(" TaskHandler %s closed negotiation connection ",GetName());
00250 delete connector;
00251 }
00252 else
00253 {
00254
00255 TGo4Log::Debug(" TaskHandler %s server connection ERROR ",GetName());
00256 return kFALSE;
00257 }
00258 }
00259 else
00260 {
00262 const Text_t* client=GetName();
00263 if(connector==0) return kFALSE;
00264 connector->Send(TGo4TaskHandler::fgcOK);
00265
00266 if (!ConnectServerChannel("Command",connector, fxCommandTransport, host))
00267 {
00268 TGo4Log::Debug(" TaskHandler: Command channel connect ERROR for client %s ",client);
00269 return kFALSE;
00270 }
00271 if (!ConnectServerChannel("Status",connector, fxStatusTransport, host))
00272 {
00273 TGo4Log::Debug(" TaskManager: Status channel connect ERROR for client %s ",client);
00274 return kFALSE;
00275 }
00276 if (!ConnectServerChannel("Data",connector, fxDataTransport, host))
00277 {
00278 TGo4Log::Debug(" TaskManager: Data channel connect ERROR for client %s ",client);
00279 return kFALSE;
00280 }
00281 }
00282
00283 fiComPort=WaitGetPort(fxCommandTransport);
00284 fiStatPort=WaitGetPort(fxStatusTransport);
00285 fiDatPort=WaitGetPort(fxDataTransport);
00286 StartTransportThreads();
00287
00288 return kTRUE;
00289 }
00290
00291 Bool_t TGo4TaskHandler::ServerLogin(TGo4Socket* connector, Go4CommandMode_t account)
00292 {
00293 if(connector==0) return kFALSE;
00294
00295
00296
00297
00298
00299 connector->Send(fgcOK);
00300
00301
00302 if(fbMasterMode)
00303 connector->Send(fgcMASTER);
00304 else
00305 connector->Send(fgcSLAVE);
00306
00307
00308 switch (account)
00309 {
00310 case kGo4ComModeObserver:
00311 connector->Send(fgxOBSERVERACCOUNT.GetName());
00312 connector->Send(fgxOBSERVERACCOUNT.GetTitle());
00313 break;
00314
00315 case kGo4ComModeController:
00316 connector->Send(fgxCONTROLLERACCOUNT.GetName());
00317 connector->Send(fgxCONTROLLERACCOUNT.GetTitle());
00318 break;
00319
00320 case kGo4ComModeAdministrator:
00321 connector->Send(fgxADMINISTRATORACCOUNT.GetName());
00322 connector->Send(fgxADMINISTRATORACCOUNT.GetTitle());
00323 break;
00324
00325 default:
00326 connector->Send(fgcERROR);
00327 connector->Send(fgcERROR);
00328 break;
00329 }
00330
00331 char * recvchar=connector->RecvRaw("dummy");
00332 if(recvchar && !strcmp(recvchar,fgcOK)) return kTRUE;
00333 return kFALSE;
00334 }
00335
00336 Bool_t TGo4TaskHandler::DisConnect(Bool_t waitforclient)
00337 {
00338 TGo4Log::Debug(" TaskHandler %s disconnecting ",GetName());
00339 if(fbClientMode)
00340 {
00341 if(!IsAborting())
00342 {
00343
00344
00345 TGo4Socket* connector=ServerRequest(GetHostName());
00346 if(connector)
00347 {
00348
00349 connector->Send(fgcDISCONNECT);
00350 StopTransportThreads(kTRUE);
00351
00352 CloseChannels();
00353 connector->Send(fgcOK);
00354
00355
00356 connector->Send(fgcOK);
00357 connector->Close();
00358 delete connector;
00359 }
00360 else
00361 {
00362
00363 TGo4Log::Debug(" TaskHandler %s server disconnect ERROR ",GetName());
00364 return kFALSE;
00365 }
00366 }
00367 else
00368 {
00369
00370 TGo4Log::Debug(" Client Aborting mode: Fast DisConnect... ",GetName());
00371 StopTransportThreads(kFALSE);
00372 CloseChannels();
00373 }
00374
00375
00376 }
00377 else
00378 {
00379 StopTransportThreads(waitforclient);
00380 CloseChannels();
00381 }
00382 return kTRUE;
00383 }
00384
00385 void TGo4TaskHandler::CloseChannels()
00386 {
00387 fxDataTransport->Close();
00388 fxCommandTransport->Close();
00389 fxStatusTransport->Close();
00390 ClearQueues();
00391 }
00392
00393 void TGo4TaskHandler::ClearQueues()
00394 {
00395 fxDataQueue->Clear();
00396 fxCommandQueue->Clear();
00397 fxStatusQueue->Clear();
00398
00399 }
00400
00401 TGo4TaskHandlerStatus * TGo4TaskHandler::CreateStatus()
00402 {
00403 TGo4TaskHandlerStatus* state= new TGo4TaskHandlerStatus(GetName());
00404
00405 state->SetFlags(fbIsAborting);
00406 state->SetPorts(fuNegPort, fiComPort, fiStatPort, fiDatPort);
00407 state->SetNames(GetComName(),GetStatName(),GetDatName(),GetHostName());
00408 return state;
00409 }
00410
00411
00412
00413 Bool_t TGo4TaskHandler::ConnectServerChannel(const char* name, TGo4Socket* negotiator, TGo4Socket* channel, const char* host)
00414 {
00415 Text_t localbuffer[TGo4ThreadManager::fguTEXTLENGTH];
00416 Text_t* revchar=0;
00417 Int_t waitresult=0;
00418 UInt_t port=0;
00419 TGo4ServerTask* server=dynamic_cast<TGo4ServerTask*>(fxThreadManager);
00420 if(server==0)
00421 {
00422 TGo4Log::Debug(" TaskHandler: Channel %s open ERROR: no server task ",name);
00423 return kFALSE;
00424 }
00425 if(negotiator==0 || !negotiator->IsOpen())
00426 {
00427 TGo4Log::Debug(" TaskHandler: Channel %s open ERROR: no negotiation channel ",name);
00428 return kFALSE;
00429 }
00430 if(channel==0)
00431 {
00432 TGo4Log::Debug(" TaskHandler: Channel %s open ERROR: no TGo4Socket instance ",name);
00433 return kFALSE;
00434 }
00435 const Text_t* client=GetName();
00436
00437
00438
00439
00440 server->SetConnect(channel, host,0);
00441 waitresult=server->WaitForOpen();
00442 if(waitresult<0)
00443 {
00444
00445 TGo4Log::Debug(" TaskHandler: Channel %s open TIMEOUT for client %s ",name, client);
00446 return kFALSE;
00447 }
00448 else
00449 {
00450
00451 }
00452 port=WaitGetPort(channel);
00453 if (port<0)
00454 {
00455 TGo4Log::Debug(" TaskHandler: Channel %s getport TIMEOUT for client %s ",name, client);
00456 return kFALSE;
00457 }
00458 else {}
00459 negotiator->Send(TGo4TaskHandler::fgcOK);
00460 snprintf(localbuffer,TGo4ThreadManager::fguTEXTLENGTH-1,"%d",port);
00461 negotiator->Send(localbuffer);
00462
00463 revchar=negotiator->RecvRaw("dummy");
00464 if(revchar && !strcmp(revchar,TGo4TaskHandler::fgcOK))
00465 {
00466
00467 }
00468 else
00469 {
00470
00471 TGo4Log::Debug(" TaskHandler: Negotiation ERROR after Channel %s open for client %s ",
00472 name, client);
00473 return kFALSE;
00474
00475 }
00476 waitresult=server->WaitForConnection();
00477 if(waitresult<0)
00478 {
00479
00480 TGo4Log::Debug(" TaskHandler: Channel %s connect TIMEOUT for client %s ", name, client);
00481 return kFALSE;
00482 }
00483 else
00484 {
00485
00486 }
00487 TGo4Log::Debug(" TaskHandler: Channel %s for client %s open!",name, client);
00488 return kTRUE;
00489 }
00490
00491 Bool_t TGo4TaskHandler::ConnectClientChannel(const char* name, TGo4Socket * negotiator, TGo4Socket * channel, const char* host)
00492 {
00493
00494 Text_t localbuffer[TGo4ThreadManager::fguTEXTLENGTH];
00495 Text_t* recvchar=0;
00496 Int_t port=0;
00497 if(negotiator==0 || !negotiator->IsOpen())
00498 {
00499 TGo4Log::Debug(" TaskHandler: Channel %s open ERROR: no negotiation channel ",name);
00500 return kFALSE;
00501 }
00502 if(channel==0)
00503 {
00504 TGo4Log::Debug(" TaskHandler: Channel %s open ERROR: no TGo4Socket instance ",name);
00505 return kFALSE;
00506 }
00507
00508 recvchar=negotiator->RecvRaw("dummy");
00509 if(recvchar && !strcmp(recvchar,fgcOK))
00510 {
00511
00512 recvchar=negotiator->RecvRaw("dummy");
00513 strncpy(localbuffer, recvchar,TGo4ThreadManager::fguTEXTLENGTH -1);
00514 port=atoi(localbuffer);
00515
00516 channel->Open(host,port);
00517 TGo4Log::Debug(" TaskHandler %s: Channel %s open!",GetName(), name );
00518 negotiator->Send(fgcOK);
00519 return kTRUE;
00520 }
00521 else
00522 {
00523 TGo4Log::Debug(" TaskHandler %s; negotiation error, FAILED to open Channel %s ",
00524 GetName(), name);
00525 return kFALSE;
00526
00527 }
00528 }
00529 Int_t TGo4TaskHandler::WaitGetPort(TGo4Socket* sock)
00530
00531 {
00532 Int_t count=0;
00533 Int_t port=0;
00534 while(port==0)
00535 {
00536 port=sock->GetPort();
00537
00538 if(count>fgiPORTWAITCYCLES)
00539 {
00540 return -1;
00541 }
00542 else if(fxThreadManager->IsTerminating())
00543 {
00544 return -2;
00545 }
00546 else
00547 {
00548 TGo4Thread::Sleep(fguPORTWAITTIME);
00549 ++count;
00550 }
00551 }
00552 return port;
00553
00554 }
00555
00556 void TGo4TaskHandler::StartTransportThreads()
00557 {
00558 fxThreadHandler->Start(GetComName());
00559 fxThreadHandler->Start(GetStatName());
00560 fxThreadHandler->Start(GetDatName());
00561 }
00562
00563 Bool_t TGo4TaskHandler::StopTransportThreads(Bool_t wait)
00564 {
00565 Bool_t rev=kTRUE;
00566 fxThreadHandler->Stop(GetComName());
00567 if(IsMasterMode())
00568 {
00569 TGo4BufferQueue* comq= dynamic_cast<TGo4BufferQueue*>(GetCommandQueue());
00570 if(comq)
00571 {
00572
00573 comq->Wake();
00574 }
00575 }
00576 fxThreadHandler->Stop(GetStatName());
00577 fxThreadHandler->Stop(GetDatName());
00578 if(wait)
00579 {
00580 rev&=WaitThreadStop(GetComName());
00581 rev&=WaitThreadStop(GetStatName());
00582 rev&=WaitThreadStop(GetDatName());
00583 }
00584 return rev;
00585 }
00586
00587 Bool_t TGo4TaskHandler::WaitThreadStop(const char* name)
00588 {
00589 if(name==0) return kFALSE;
00590 TGo4Thread* thread=fxThreadHandler->GetThread(name);
00591 if(thread==0) return kFALSE;
00592 Int_t t=0;
00593 Bool_t timeout=kFALSE;
00594 while(!thread->IsWaiting())
00595 {
00596 TGo4Log::Debug(" TaskHandler Disconnect -- waiting for runnable %s to stop... ",name);
00597 TGo4Thread::Sleep(TGo4TaskHandler::fguTHREADSTOPTIME);
00598 if((t++>=TGo4TaskHandler::fgiTHREADSTOPCYCLES))
00599 {
00600 timeout=kTRUE;
00601 break;
00602 }
00603 }
00604 return (!timeout);
00605 }
00606
00607 void TGo4TaskHandler::SetAdminAccount(const char* name, const char* passwd)
00608 {
00609 if(name) fgxADMINISTRATORACCOUNT.SetName(name);
00610 if(passwd) fgxADMINISTRATORACCOUNT.SetTitle(passwd);
00611 }
00612
00613 void TGo4TaskHandler::SetCtrlAccount(const char* name, const char* passwd)
00614 {
00615 if(name) fgxCONTROLLERACCOUNT.SetName(name);
00616 if(passwd) fgxCONTROLLERACCOUNT.SetTitle(passwd);
00617 }
00618
00619 void TGo4TaskHandler::SetObservAccount(const char* name, const char* passwd)
00620 {
00621 if(name) fgxOBSERVERACCOUNT.SetName(name);
00622 if(passwd) fgxOBSERVERACCOUNT.SetTitle(passwd);
00623 }
00624
00625
00626
00627