00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #include "TGo4ServerTask.h"
00017
00018 #include <iostream.h>
00019
00020 #include "TApplication.h"
00021 #include "Go4Log/TGo4Log.h"
00022 #include "Go4LockGuard/TGo4LockGuard.h"
00023 #include "Go4Socket/TGo4Socket.h"
00024 #include "Go4Queue/TGo4BufferQueue.h"
00025 #include "Go4Queue/TGo4ObjectQueue.h"
00026 #include "Go4StatusBase/TGo4Status.h"
00027 #include "Go4CommandsTaskHandler/Go4CommandsTaskHandler.h"
00028 #include "Go4CommandsBase/TGo4CommandInvoker.h"
00029 #include "TGo4TaskManager.h"
00030 #include "TGo4TaskHandler.h"
00031 #include "TGo4ConnectorRunnable.h"
00032 #include "TGo4Master.h"
00033 #include "TGo4Slave.h"
00034 #include <Rtypes.h>
00035
00036
00037 const Int_t TGo4ServerTask::fgiOPENWAITCYCLES=100;
00038 const UInt_t TGo4ServerTask::fguOPENWAITCYCLETIME=100;
00039 const Int_t TGo4ServerTask::fgiCLOSEWAITCYCLES=100;
00040 const UInt_t TGo4ServerTask::fguCLOSEWAITCYCLETIME=100;
00041 const Int_t TGo4ServerTask::fgiCONNECTWAITCYCLES=20;
00042 const UInt_t TGo4ServerTask::fguCONNECTWAITCYCLETIME=200;
00043 const UInt_t TGo4ServerTask::fguCONNECTTIMERPERIOD=100;
00044
00045
00046 const Text_t TGo4ServerTask::fgcLAUNCHPREFSFILE[]="Go4Library/Go4LaunchClientPrefs.txt";
00047
00048
00049 TGo4ServerTask::TGo4ServerTask(const char* name,
00050 UInt_t negotiationport,
00051 Bool_t blockingmode,
00052 Bool_t standalone,
00053 Bool_t autostart,
00054 Bool_t autocreate,
00055 Bool_t ismaster)
00056 : TGo4Task(name,blockingmode, autostart,autocreate,ismaster),
00057 fxTaskManager(0),fxCurrentTaskHandler(0),
00058 fxConnectTransport(0), fxDisConnectTransport(0),
00059 fuConnectPort(0), fbKeepServerSocket(kFALSE),
00060 fbConnectRequest(kFALSE), fbDisConnectRequest(kFALSE),
00061 fbConnectIsOpen(kFALSE),fbConnectIsDone(kFALSE), fbConnectIsClose(kFALSE),
00062 fxConnectorTimer(0)
00063 {
00064 Text_t nomen[TGo4ThreadManager::fguTEXTLENGTH];
00065
00066 snprintf(nomen,TGo4ThreadManager::fguTEXTLENGTH -1, "TaskManager of %s", name);
00067 fxTaskManager= new TGo4TaskManager(nomen,this,negotiationport);
00068
00069 if(negotiationport!=42)
00070 {
00071
00072 snprintf(nomen,TGo4ThreadManager::fguTEXTLENGTH -1,"ConnectorRunnable of %s", name);
00073 TGo4ConnectorRunnable* conny = new TGo4ConnectorRunnable(nomen,this);
00074 snprintf(nomen,TGo4ThreadManager::fguTEXTLENGTH -1, "CONNECTOR-%s", name);
00075 fxConnectorName=nomen;
00076 fxWorkHandler->NewThread(GetConnectorName(), conny);
00077 }
00078 else {}
00079
00080 TGo4CommandInvoker::Instance();
00081 TGo4CommandInvoker::Register("ServerTask", this);
00082 fxConnectorTimer= new TGo4TaskConnectorTimer(this,fguCONNECTTIMERPERIOD);
00083 fxConnectorTimer->TurnOn();
00084 if(standalone)
00085 {
00086 Launch();
00087 }
00088 else
00089 {
00090
00091 }
00092 }
00093
00094
00095 TGo4ServerTask::~TGo4ServerTask()
00096 {
00097 if (GetWorkHandler()) GetWorkHandler()->CancelAll();
00098 delete fxConnectorTimer;
00099 delete fxTaskManager;
00100 }
00101
00102 Bool_t TGo4ServerTask::RemoveClient(const char* name, Bool_t clientwait, Bool_t isterminating)
00103 {
00104 Bool_t rev=kTRUE;
00105 TGo4TaskHandler* taskhandler=0;
00106 if(name && strstr(name,"current"))
00107 {
00108
00109 taskhandler=GetCurrentTaskHandler();
00110 }
00111 else
00112 {
00113
00114 taskhandler=GetTaskHandler(name);
00115 }
00116 if(taskhandler==0)
00117 {
00118
00119 TGo4Log::Debug(" ServerTask -- RemoveClient FAILED, no client %s !!! ",
00120 name);
00121 rev=kFALSE;
00122 }
00123 else
00124 {
00125 TGo4Log::Debug(" ServerTask -- removing client task %s ",name);
00126
00127 StopWorkThreads();
00128 if(clientwait)
00129 {
00130
00131
00132
00133
00134
00135 if(IsMaster())
00136 SubmitEmergencyCommand(kComQuit);
00137 else
00138 SubmitEmergencyData(kComQuit, taskhandler->GetName());
00139 TGo4Log::Debug(" Server Task -- Waiting for client %s disconnection...",taskhandler->GetName());
00140 Int_t removeresult=fxTaskManager->WaitForClientRemoved();
00141
00142
00143 switch(removeresult)
00144 {
00145 case -1:
00146
00147 TGo4Log::Debug(" !!! Server Task -- client remove wait TIMEOUT !!! ");
00148 rev=fxTaskManager->DisConnectClient(taskhandler->GetName(),kFALSE);
00149 cout <<"client remove wait TIMEOUT after DisConnectClient " << endl;
00150 break;
00151
00152 case -2:
00153
00154 TGo4Log::Debug(" !!! Server Task -- client remove aborted for TERMINATION MODE !!! ");
00155 rev=kFALSE;
00156 break;
00157
00158 default:
00159
00160 TGo4Log::Debug(" Server Task -- waited %d cycles until client was removed. ",removeresult);
00161 rev=kTRUE;
00162 break;
00163
00164 }
00165 }
00166 else
00167 {
00168
00169 TGo4Log::Debug(" !!! Server Task -- removing client %s without waiting... ",
00170 taskhandler->GetName());
00171 SendStopBuffers(taskhandler->GetName());
00172 rev= (fxTaskManager->DisConnectClient(taskhandler->GetName(),kFALSE)==0);
00173 }
00174 if(!isterminating) StartWorkThreads();
00175 }
00176 return rev;
00177 }
00178
00179 Int_t TGo4ServerTask::RemoveAllClients()
00180 {
00181 Int_t rev=0;
00182
00184
00185 TObjArray names;
00186 Bool_t reset=kTRUE;
00187 while((taskhandler=fxTaskManager->NextTaskHandler(reset)) !=0)
00188 {
00189 reset=kFALSE;
00190
00191 names.AddLast(new TNamed(taskhandler->GetName(), "title"));
00192 }
00193 TIter niter(&names);
00194 TObject* nomen=0;
00195 while((nomen =niter.Next()) !=0)
00196 {
00197
00198 RemoveClient(nomen->GetName(),kTRUE,kTRUE);
00199 rev++;
00200 }
00201 names.Delete();
00202
00203 return rev;
00204 }
00205
00206
00207 Bool_t TGo4ServerTask::RemoveCurrentClient()
00208 {
00209 Bool_t rev=kTRUE;
00210 TGo4TaskHandler* taskhandler=GetCurrentTaskHandler();
00211 if(taskhandler!=0)
00212 {
00213
00214 TGo4Log::Debug(" Server task -- removing current client %s ",taskhandler->GetName());
00215 rev = RemoveClient(taskhandler->GetName());
00216 }
00217 else
00218 {
00219 rev=kFALSE;
00220 }
00221 return rev;
00222 }
00223
00224 void TGo4ServerTask::SetCurrentTask(const char* name)
00225 {
00226
00227 TGo4TaskHandler* han=0;
00228 if(fxTaskManager==0)
00229 {
00230 TGo4Log::Debug(" TGo4ServerTask ''%s'' ERROR- task manager not existing!!! ");
00231 }
00232 else
00233 {
00234
00235 if(IsWorkStopped())
00236 {
00237
00238
00239
00240
00241
00242 }
00243 else
00244 {
00245
00246 StopWorkThreads();
00247 }
00248 {
00249 if(name==0)
00250 {
00251
00252 fxCurrentTaskHandler=fxTaskManager->GetLastTaskHandler();
00253
00254 }
00255 else
00256 {
00257
00258 han=fxTaskManager->GetTaskHandler(name);
00259 if(han)
00260 {
00261 fxCurrentTaskHandler=han;
00262
00263 }
00264 else
00265 {
00266 TGo4Log::Debug(" ServerTask: FAILED setting current task to %s-- no such client! ",name);
00267 }
00268 }
00269
00270 }
00271
00272 StartWorkThreads();
00273 }
00274 }
00275
00276
00277 TGo4TaskHandler* TGo4ServerTask::GetTaskHandler(const char* name)
00278 {
00279 return (fxTaskManager->GetTaskHandler(name));
00280 }
00281
00282 TGo4TaskHandler* TGo4ServerTask::GetTaskHandler()
00283 {
00284 return (GetCurrentTaskHandler());
00285 }
00286
00287 TGo4TaskHandler* TGo4ServerTask::GetCurrentTaskHandler()
00288 {
00289 return fxCurrentTaskHandler;
00290 }
00291
00292 TGo4TaskManager* TGo4ServerTask::GetTaskManager()
00293 {
00294 return fxTaskManager;
00295 }
00296
00297 void TGo4ServerTask::SetConnect(TGo4Socket * trans, const char* host, UInt_t port, Bool_t keepserv)
00298 {
00299 fxConnectTransport=trans;
00300 fxConnectHost=host;
00301 fuConnectPort=port;
00302 fbConnectRequest=kTRUE;
00303 fbKeepServerSocket=keepserv;
00304 }
00305
00306 void TGo4ServerTask::SetDisConnect(TGo4Socket * trans)
00307 {
00308 fxDisConnectTransport=trans;
00309 fbDisConnectRequest=kTRUE;
00310 }
00311
00312 Int_t TGo4ServerTask::TimerConnect()
00313 {
00314 Int_t rev=0;
00317 if(fbDisConnectRequest)
00318 {
00319 if(fxDisConnectTransport!=0)
00320 {
00321
00322 fxDisConnectTransport->Close();
00323
00324
00325 fbConnectIsClose=kTRUE;
00326 fbDisConnectRequest=kFALSE;
00327 rev+=1;
00328 }
00329 else
00330 {
00331
00332 rev+=32;
00333 }
00334 }
00335 else
00336 {
00337
00338 rev+=2;
00339 }
00340
00343 if(fbConnectRequest)
00344 {
00345
00346 if(fxConnectTransport!=0)
00347 {
00348
00349 if(!fxConnectTransport->IsOpen())
00350 {
00351
00352 fbConnectIsOpen=kTRUE;
00353 Int_t result=fxConnectTransport->Open(GetConnectHost(), fuConnectPort, fbKeepServerSocket);
00354 if(result==0)
00355 {
00356 fbConnectIsDone=kTRUE;
00357 fbConnectRequest=kFALSE;
00358 fbKeepServerSocket=kFALSE;
00359 rev+=4;
00360 }
00361 else
00362 {
00363 rev=-4;
00364
00365 }
00366 }
00367 else
00368 {
00369
00370 rev+=8;
00371 }
00372 }
00373 else
00374 {
00375 rev+=64;
00376
00377
00378
00379 }
00380 }
00381 else
00382 {
00383
00384 rev+=16;
00385 }
00386 return rev;
00387 }
00388
00389 Int_t TGo4ServerTask::WaitForOpen()
00390 {
00391 Int_t count=0;
00392 while(!fbConnectIsOpen)
00393 {
00394 if(count>TGo4ServerTask::fgiOPENWAITCYCLES)
00395 {
00396 count = -1;
00397 break;
00398 }
00399 else
00400 {
00401 TGo4Thread::Sleep(TGo4ServerTask::fguOPENWAITCYCLETIME);
00402 ++count;
00403 }
00404
00405 }
00406 fbConnectIsOpen=kFALSE;
00407 return count;
00408 }
00409
00410
00411 Int_t TGo4ServerTask::WaitForClose()
00412 {
00413 Int_t count=0;
00414 while(!fbConnectIsClose)
00415 {
00416 if(count>TGo4ServerTask::fgiCLOSEWAITCYCLES)
00417 {
00418 count = -1;
00419 break;
00420 }
00421 else
00422 {
00423 TGo4Thread::Sleep(TGo4ServerTask::fguCLOSEWAITCYCLETIME);
00424 ++count;
00425 }
00426
00427 }
00428 fbConnectIsClose=kFALSE;
00429 return count;
00430 }
00431
00432 Int_t TGo4ServerTask::WaitForConnection()
00433 {
00434 Int_t count=0;
00435 while(!fbConnectIsDone)
00436 {
00437 if(IsTerminating())
00438 {
00439 count = -2;
00440 break;
00441 }
00442
00443
00444
00445
00446
00447
00448 else
00449 {
00450 TGo4Thread::Sleep(TGo4ServerTask::fguCONNECTWAITCYCLETIME);
00451 ++count;
00452 }
00453
00454 }
00455 fbConnectIsDone=kFALSE;
00456 return count;
00457 }
00458
00459 TGo4Socket* TGo4ServerTask::GetConnectTransport()
00460 {
00461 return fxConnectTransport;
00462 }
00463
00464 TGo4BufferQueue* TGo4ServerTask::GetCommandQueue(const char* name)
00465 {
00466 TGo4BufferQueue* queue=0;
00467 TGo4TaskHandler* currenttask=0;
00468 if(name==0 || strstr(name,"current"))
00469 currenttask=GetCurrentTaskHandler();
00470 else
00471 currenttask=GetTaskHandler(name);
00472 if(currenttask)
00473 queue=dynamic_cast<TGo4BufferQueue*> (currenttask->GetCommandQueue());
00474 return queue;
00475 }
00476
00477 TGo4BufferQueue * TGo4ServerTask::GetStatusQueue(const char* name)
00478 {
00479 TGo4BufferQueue* queue=0;
00480 TGo4TaskHandler* currenttask=0;
00481 if(name==0)
00482 currenttask=GetCurrentTaskHandler();
00483 else
00484 currenttask=GetTaskHandler(name);
00485 if(currenttask)
00486 queue=dynamic_cast<TGo4BufferQueue*> (currenttask->GetStatusQueue());
00487 return queue;
00488 }
00489
00490
00491 TGo4BufferQueue * TGo4ServerTask::GetDataQueue(const char* name)
00492 {
00493 TGo4BufferQueue* queue=0;
00494 TGo4TaskHandler* currenttask=0;
00495 if(name==0 || strstr(name,"current"))
00496 currenttask=GetCurrentTaskHandler();
00497 else
00498 currenttask=GetTaskHandler(name);
00499 if(currenttask)
00500 queue=dynamic_cast<TGo4BufferQueue*> (currenttask->GetDataQueue());
00501 return queue;
00502
00503 }
00504
00505 TGo4Command* TGo4ServerTask::NextCommand()
00506 {
00507 if(IsMaster()) return 0;
00508 TGo4Command* com=0;
00509 TGo4TaskHandler* han=0;
00510 Bool_t reset=kTRUE;
00511 TGo4LockGuard taskmutex(fxTaskManager->GetMutex());
00512 while((han=fxTaskManager->NextTaskHandler(reset))!=0)
00513 {
00514 reset=kFALSE;
00515 TGo4BufferQueue * comq=dynamic_cast<TGo4BufferQueue*> (han->GetCommandQueue());
00516 if(comq==0) continue;
00517 if(!comq->IsEmpty())
00518 {
00519 com= dynamic_cast<TGo4Command*>(comq->WaitObjectFromBuffer());
00520 if(com)
00521 {
00522 com->SetTaskName(han->GetName());
00523 com->SetMode(han->GetRole());
00524 return com;
00525 }
00526 }
00527 }
00528 return com;
00529 }
00530
00531 void TGo4ServerTask::SendStatus(TGo4Status * stat, const char* receiver)
00532 {
00533 if(IsMaster()) return;
00534 if(stat==0) return;
00535 if(receiver!=0)
00536 {
00537 TGo4Task::SendStatus(stat,receiver);
00538 }
00539 else
00540 {
00541
00542 TGo4LockGuard taskmutex(fxTaskManager->GetMutex());
00543 TGo4TaskHandler* han=0;
00544 Bool_t reset=kTRUE;
00545 while((han=fxTaskManager->NextTaskHandler(reset))!=0)
00546 {
00547 reset=kFALSE;
00548 TGo4BufferQueue * statq=dynamic_cast<TGo4BufferQueue*> (han->GetStatusQueue());
00549 if(statq==0) continue;
00550 TGo4Log::Debug(" Task - sending status %s to task %s", stat->ClassName(), han->GetName());
00551 statq->AddBufferFromObject(stat);
00552 }
00553 }
00554 }
00555
00556 void TGo4ServerTask::SendStatusBuffer()
00557 {
00558 if(IsMaster()) return;
00559 TGo4LockGuard statguard(fxStatusMutex);
00560 TGo4LockGuard taskmutex(fxTaskManager->GetMutex());
00561 TGo4TaskHandler* han=0;
00562 Bool_t reset=kTRUE;
00563 while((han=fxTaskManager->NextTaskHandler(reset))!=0)
00564 {
00565 reset=kFALSE;
00566 TGo4BufferQueue * statq=dynamic_cast<TGo4BufferQueue*> (han->GetStatusQueue());
00567 if(statq==0) continue;
00568 TGo4Log::Debug(" Task - sending status buffer to task %s", han->GetName());
00569 statq->AddBuffer(fxStatusBuffer,kTRUE);
00570 }
00571 }
00572
00573
00574
00575 Bool_t TGo4ServerTask::StartConnectorThread()
00576 {
00577 Bool_t rev=kTRUE;
00578 rev= ( GetWorkHandler()->Start( GetConnectorName() ) );
00579
00580 return rev;
00581 }
00582
00583 Bool_t TGo4ServerTask::StopConnectorThread()
00584 {
00585 Bool_t rev=kTRUE;
00586 rev= ( GetWorkHandler()->Stop( GetConnectorName() ) );
00587
00588 const char* host = gSystem->HostName();
00589 Int_t negotiationport=fxTaskManager->GetNegotiationPort();
00590 TGo4Socket* connector= new TGo4Socket(kTRUE);
00591
00592 connector->Open(host,negotiationport);
00593 connector->Send(TGo4TaskHandler::fgcERROR);
00594 connector->Close();
00595 delete connector;
00596
00597 return rev;
00598 }
00599
00600
00601 Bool_t TGo4ServerTask::ConnectorThreadIsStopped()
00602 {
00603 Bool_t rev=kTRUE;
00604 TGo4Thread* conny= GetWorkHandler()->GetThread(GetConnectorName());
00605 rev= conny->IsWaiting();
00606 return rev;
00607 }
00608
00609 void TGo4ServerTask::Quit()
00610 {
00611 TGo4Log::Debug(" ServerTask Quit -- removing all connected clients ");
00612 SendStatusMessage(2,kTRUE,"ServerTask %s is shutting down now! All clients are removed...",GetName());
00613 RemoveAllClients();
00614
00615 WakeCommandQueue(TGo4Task::fgiTERMID);
00616 Terminate(!IsMaster());
00617 }
00618
00619
00620 void TGo4ServerTask::Shutdown()
00621 {
00622 TGo4Log::Debug(" ServerTask Shutdown without disconnect waiting");
00623 SendStatusMessage(2,kTRUE,"ServerTask %s is shutting down now! All clients are removed...",GetName());
00624 TGo4Thread::Sleep(10*fguCONNECTTIMERPERIOD);
00625 StopWorkThreads();
00626 WakeCommandQueue(TGo4Task::fgiTERMID);
00627 Terminate(!IsMaster());
00628 TGo4Slave* slave=GetSlave();
00629 if(slave)
00630 {
00631 slave->Stop();
00632
00633 slave->SetTask(0,kFALSE);
00634 delete slave;
00635 }
00636 gApplication->Terminate();
00637 }
00638
00639
00640
00641
00642 Int_t TGo4ServerTask::LaunchClient(const char* name,
00643 const char* remotehost,
00644 const char* remotedir,
00645 const char* remotecommand, Int_t mode)
00646 {
00647 Int_t rev = 0;
00648 Text_t commandstring[5*TGo4ThreadManager::fguTEXTLENGTH];
00649 Text_t formatstring[2*TGo4ThreadManager::fguTEXTLENGTH];
00650 Text_t filename[2*TGo4ThreadManager::fguTEXTLENGTH];
00651 Text_t shcom[TGo4ThreadManager::fguTEXTLENGTH];
00652 Text_t serverdisplay[TGo4ThreadManager::fguTEXTLENGTH];
00653 if(mode & kSecureShell)
00654 sprintf(shcom,"ssh ");
00655 else
00656 sprintf(shcom,"rsh -n");
00657 const Text_t* serverhost=gSystem->HostName();
00658 const Text_t* sdisplay=gSystem->Getenv("DISPLAY");
00659 const Text_t* go4system=gSystem->Getenv("GO4SYS");
00660 const Text_t* rootsystem=gSystem->Getenv("ROOTSYS");
00661 const Text_t* path=gSystem->Getenv("PATH");
00662 const Text_t* ldpath=gSystem->Getenv("LD_LIBRARY_PATH");
00663 UInt_t negport=GetTaskManager()->GetNegotiationPort();
00664 snprintf(serverdisplay, 2*TGo4ThreadManager::fguTEXTLENGTH,"-display %s",sdisplay);
00665 if(remotehost==0)
00666 {
00667 remotehost="localhost";
00668 }
00669 else
00670 { }
00671 if(GetMaster()) GetMaster()->SetSlave(remotehost,shcom,remotecommand);
00672
00673
00674 snprintf(filename, 2*TGo4ThreadManager::fguTEXTLENGTH,
00675 "%s/%s",go4system, TGo4ServerTask::fgcLAUNCHPREFSFILE);
00676 ifstream launchprefs(filename);
00677 if(!launchprefs)
00678 {
00679 TGo4Log::Debug(" ServerTask -- ERROR: Preferences file %s not existing, could not launch client ",
00680 filename);
00681 return -1;
00682 }
00683 StopConnectorThread();
00684 while(!ConnectorThreadIsStopped())
00685 {
00686 TGo4Log::Debug(" ServerTask -- waiting for connector thread to be stopped...");
00687 TGo4Thread::Sleep(500);
00688 }
00689
00690 TGo4Log::Debug(" ServerTask -- Launching new client process %s on host %s, port %d ",
00691 name, remotehost, negport);
00692
00693 launchprefs.getline(formatstring, 2*TGo4ThreadManager::fguTEXTLENGTH, '\n' );
00694 if((mode & kGuiEmbed) && GetMaster()!=0)
00695 {
00696
00697
00698
00699 snprintf(commandstring,
00700 5*TGo4ThreadManager::fguTEXTLENGTH-1,
00701 formatstring,
00702 shcom, remotehost, go4system, rootsystem,
00703 path, ldpath, remotedir, remotecommand, name, serverhost, negport);
00704
00705 GetMaster()->StartSlaveWindow(commandstring);
00706 }
00707 else
00708 {
00709 if (mode & kSecureShell) {
00710
00711
00712 launchprefs.getline(formatstring, 2*TGo4ThreadManager::fguTEXTLENGTH, '\n');
00713 snprintf(commandstring,
00714 5*TGo4ThreadManager::fguTEXTLENGTH-1,
00715 formatstring,
00716 shcom, remotehost, "", name, remotehost, go4system, rootsystem,
00717 path, ldpath, remotedir, remotecommand, name, serverhost, negport);
00718 }else {
00719
00720 launchprefs.getline(formatstring, 2*TGo4ThreadManager::fguTEXTLENGTH, '\n');
00721 snprintf(commandstring,
00722 5*TGo4ThreadManager::fguTEXTLENGTH-1,
00723 formatstring,
00724 shcom, remotehost, serverdisplay, name, remotehost, go4system, rootsystem,
00725 path, ldpath, remotedir, remotecommand, name, serverhost, negport);
00726 }
00727
00728 rev=gSystem->Exec(commandstring);
00729 }
00730
00731
00732
00733 StartConnectorThread();
00734 return rev;
00735 }
00736
00737
00738 ClassImp(TGo4ServerTask)
00739
00740
00741
00742
00743