00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014 #include "TGo4ServerTask.h"
00015
00016 #include "TMutex.h"
00017 #include "TApplication.h"
00018 #include "TObjArray.h"
00019
00020 #include "TGo4Log.h"
00021 #include "TGo4LockGuard.h"
00022 #include "TGo4Socket.h"
00023 #include "TGo4BufferQueue.h"
00024 #include "TGo4Thread.h"
00025 #include "TGo4ThreadHandler.h"
00026 #include "TGo4Status.h"
00027 #include "TGo4CommandInvoker.h"
00028 #include "TGo4TaskManager.h"
00029 #include "TGo4TaskHandler.h"
00030 #include "TGo4ConnectorRunnable.h"
00031 #include "TGo4Master.h"
00032 #include "TGo4Slave.h"
00033 #include "TGo4TaskConnectorTimer.h"
00034
00035
00036 const Int_t TGo4ServerTask::fgiOPENWAITCYCLES=100;
00037 const UInt_t TGo4ServerTask::fguOPENWAITCYCLETIME=100;
00038 const Int_t TGo4ServerTask::fgiCLOSEWAITCYCLES=100;
00039 const UInt_t TGo4ServerTask::fguCLOSEWAITCYCLETIME=100;
00040 const Int_t TGo4ServerTask::fgiCONNECTWAITCYCLES=20;
00041 const UInt_t TGo4ServerTask::fguCONNECTWAITCYCLETIME=200;
00042 const UInt_t TGo4ServerTask::fguCONNECTTIMERPERIOD=100;
00043
00044
00045 const char* TGo4ServerTask::fgcLAUNCHPREFSFILE = "etc/Go4LaunchPrefs.txt";
00046
00047
00048 TGo4ServerTask::TGo4ServerTask(const char* name,
00049 UInt_t negotiationport,
00050 Bool_t blockingmode,
00051 Bool_t standalone,
00052 Bool_t autostart,
00053 Bool_t autocreate,
00054 Bool_t ismaster)
00055 : TGo4Task(name,blockingmode, autostart,autocreate,ismaster),
00056 fxTaskManager(0),fxCurrentTaskHandler(0),
00057 fxConnectTransport(0), fxDisConnectTransport(0),
00058 fuConnectPort(0), fbKeepServerSocket(kFALSE),
00059 fbConnectRequest(kFALSE), fbDisConnectRequest(kFALSE),
00060 fbConnectIsOpen(kFALSE),fbConnectIsDone(kFALSE), fbConnectIsClose(kFALSE),
00061 fxConnectorTimer(0)
00062 {
00063 TString nomen("TaskManager of "); nomen += name;
00064 fxTaskManager= new TGo4TaskManager(nomen.Data(), this, negotiationport);
00065
00066 if(negotiationport!=42)
00067 {
00068
00069 nomen.Form("ConnectorRunnable of %s", name);
00070 TGo4ConnectorRunnable* conny = new TGo4ConnectorRunnable(nomen.Data(),this);
00071 nomen.Form("CONNECTOR-%s", name);
00072 fxConnectorName=nomen;
00073 fxWorkHandler->NewThread(GetConnectorName(), conny);
00074 }
00075 else {}
00076
00077 TGo4CommandInvoker::Instance();
00078 TGo4CommandInvoker::Register("ServerTask", this);
00079 fxConnectorTimer= new TGo4TaskConnectorTimer(this,fguCONNECTTIMERPERIOD);
00080 fxConnectorTimer->TurnOn();
00081 if(standalone)
00082 {
00083 Launch();
00084 }
00085 else
00086 {
00087
00088 }
00089 }
00090
00091
00092 TGo4ServerTask::~TGo4ServerTask()
00093 {
00094 if (GetWorkHandler()) GetWorkHandler()->CancelAll();
00095 delete fxConnectorTimer;
00096 delete fxTaskManager;
00097 TGo4CommandInvoker::UnRegister(this);
00098 }
00099
00100 Bool_t TGo4ServerTask::RemoveClient(const char* name, Bool_t clientwait, Bool_t isterminating)
00101 {
00102 Bool_t rev=kTRUE;
00103 TGo4TaskHandler* taskhandler=0;
00104 if(name && strstr(name,"current"))
00105 taskhandler=GetCurrentTaskHandler();
00106 else
00107 taskhandler=GetTaskHandler(name);
00108 if(taskhandler==0)
00109 {
00110
00111 TGo4Log::Debug(" ServerTask -- RemoveClient FAILED, no client %s !!! ",
00112 name);
00113 rev=kFALSE;
00114 }
00115 else
00116 {
00117 TGo4Log::Debug(" ServerTask -- removing client task %s ",name);
00118
00119 StopWorkThreads();
00120 if(clientwait)
00121 {
00122
00123
00124
00125
00126
00127
00128 if(IsMaster())
00129 SubmitEmergencyCommand(kComQuit);
00130 else
00131 SubmitEmergencyData(kComQuit, taskhandler->GetName());
00132 TGo4Log::Debug(" Server Task -- Waiting for client %s disconnection...",taskhandler->GetName());
00133 Int_t removeresult=fxTaskManager->WaitForClientRemoved();
00134
00135
00136 switch(removeresult)
00137 {
00138 case -1:
00139
00140 TGo4Log::Debug(" !!! Server Task -- client remove wait TIMEOUT !!! ");
00141 rev=fxTaskManager->DisConnectClient(taskhandler->GetName(),kFALSE);
00142 break;
00143
00144 case -2:
00145
00146 TGo4Log::Debug(" !!! Server Task -- client remove aborted for TERMINATION MODE !!! ");
00147 rev=kFALSE;
00148 break;
00149
00150 default:
00151
00152 TGo4Log::Debug(" Server Task -- waited %d cycles until client was removed. ",removeresult);
00153 rev=kTRUE;
00154 break;
00155
00156 }
00157 }
00158 else
00159 {
00160
00161 TGo4Log::Debug(" !!! Server Task -- removing client %s without waiting... ",
00162 taskhandler->GetName());
00163 SendStopBuffers(taskhandler->GetName());
00164 rev= (fxTaskManager->DisConnectClient(taskhandler->GetName(),kFALSE)==0);
00165 }
00166 if(!isterminating) StartWorkThreads();
00167 }
00168 return rev;
00169 }
00170
00171 Int_t TGo4ServerTask::RemoveAllClients(Bool_t force)
00172 {
00173 Int_t rev=0;
00174
00176 TGo4TaskHandler* taskhandler=0;
00177 TObjArray names;
00178 Bool_t reset=kTRUE;
00179 while((taskhandler=fxTaskManager->NextTaskHandler(reset)) !=0)
00180 {
00181 reset=kFALSE;
00182
00183 names.AddLast(new TNamed(taskhandler->GetName(), "title"));
00184 }
00185 TIter niter(&names);
00186 TObject* nomen=0;
00187 while((nomen =niter.Next()) !=0)
00188 {
00189
00190 RemoveClient(nomen->GetName(),!force,kTRUE);
00191 rev++;
00192 }
00193 names.Delete();
00194
00195 return rev;
00196 }
00197
00198
00199 Bool_t TGo4ServerTask::RemoveCurrentClient()
00200 {
00201 Bool_t rev=kTRUE;
00202 TGo4TaskHandler* taskhandler=GetCurrentTaskHandler();
00203 if(taskhandler!=0)
00204 {
00205
00206 TGo4Log::Debug(" Server task -- removing current client %s ",taskhandler->GetName());
00207 rev = RemoveClient(taskhandler->GetName());
00208 }
00209 else
00210 {
00211 rev=kFALSE;
00212 }
00213 return rev;
00214 }
00215
00216 void TGo4ServerTask::SetCurrentTask(const char* name)
00217 {
00218
00219 TGo4TaskHandler* han=0;
00220 if(fxTaskManager==0)
00221 {
00222 TGo4Log::Debug(" TGo4ServerTask ''%s'' ERROR- task manager not existing!!! ");
00223 }
00224 else
00225 {
00226
00227 if(IsWorkStopped())
00228 {
00229
00230
00231
00232
00233
00234 }
00235 else
00236 {
00237
00238 StopWorkThreads();
00239 }
00240 {
00241 if(name==0)
00242 {
00243
00244 fxCurrentTaskHandler=fxTaskManager->GetLastTaskHandler();
00245
00246 }
00247 else
00248 {
00249
00250 han=fxTaskManager->GetTaskHandler(name);
00251 if(han)
00252 {
00253 fxCurrentTaskHandler=han;
00254
00255 }
00256 else
00257 {
00258 TGo4Log::Debug(" ServerTask: FAILED setting current task to %s-- no such client! ",name);
00259 }
00260 }
00261
00262 }
00263
00264 StartWorkThreads();
00265 }
00266 }
00267
00268
00269 TGo4TaskHandler* TGo4ServerTask::GetTaskHandler(const char* name)
00270 {
00271 return (fxTaskManager->GetTaskHandler(name));
00272 }
00273
00274 TGo4TaskHandler* TGo4ServerTask::GetTaskHandler()
00275 {
00276 return (GetCurrentTaskHandler());
00277 }
00278
00279 TGo4TaskHandler* TGo4ServerTask::GetCurrentTaskHandler()
00280 {
00281 return fxCurrentTaskHandler;
00282 }
00283
00284 TGo4TaskManager* TGo4ServerTask::GetTaskManager()
00285 {
00286 return fxTaskManager;
00287 }
00288
00289 void TGo4ServerTask::SetConnect(TGo4Socket * trans, const char* host, UInt_t port, Bool_t keepserv)
00290 {
00291 fxConnectTransport=trans;
00292 fxConnectHost=host;
00293 fuConnectPort=port;
00294 fbConnectRequest=kTRUE;
00295 fbKeepServerSocket=keepserv;
00296 }
00297
00298 void TGo4ServerTask::SetDisConnect(TGo4Socket * trans)
00299 {
00300 fxDisConnectTransport=trans;
00301 fbDisConnectRequest=kTRUE;
00302 }
00303
00304 Int_t TGo4ServerTask::TimerConnect()
00305 {
00306 Int_t rev=0;
00309 if(fbDisConnectRequest)
00310 {
00311 if(fxDisConnectTransport!=0)
00312 {
00313
00314 fxDisConnectTransport->Close();
00315
00316
00317 fbConnectIsClose=kTRUE;
00318 fbDisConnectRequest=kFALSE;
00319 rev+=1;
00320 }
00321 else
00322 {
00323
00324 rev+=32;
00325 }
00326 }
00327 else
00328 {
00329
00330 rev+=2;
00331 }
00332
00335 if(fbConnectRequest)
00336 {
00337
00338 if(fxConnectTransport!=0)
00339 {
00340
00341 if(!fxConnectTransport->IsOpen())
00342 {
00343
00344
00345 fbConnectIsOpen=kTRUE;
00346 Int_t result=fxConnectTransport->Open(GetConnectHost(), fuConnectPort, fbKeepServerSocket);
00347 if(result==0)
00348 {
00349 fbConnectIsDone=kTRUE;
00350 fbConnectRequest=kFALSE;
00351 fbKeepServerSocket=kFALSE;
00352 rev+=4;
00353 }
00354 else
00355 {
00356 rev=-4;
00357
00358 }
00359 }
00360 else
00361 {
00362
00363
00364 fbConnectIsOpen=kTRUE;
00365 fbConnectIsDone=kTRUE;
00366 fbConnectRequest=kFALSE;
00367 fbKeepServerSocket=kFALSE;
00368 rev+=8;
00369 }
00370 }
00371 else
00372 {
00373 rev+=64;
00374
00375
00376
00377 }
00378 }
00379 else
00380 {
00381
00382 rev+=16;
00383 }
00384 return rev;
00385 }
00386
00387 Int_t TGo4ServerTask::WaitForOpen()
00388 {
00389 Int_t count=0;
00390 while(!fbConnectIsOpen)
00391 {
00392 if(count>TGo4ServerTask::fgiOPENWAITCYCLES)
00393 {
00394 count = -1;
00395 break;
00396 }
00397 else
00398 {
00399 TGo4Thread::Sleep(TGo4ServerTask::fguOPENWAITCYCLETIME);
00400 ++count;
00401 }
00402
00403 }
00404 fbConnectIsOpen=kFALSE;
00405 return count;
00406 }
00407
00408
00409 Int_t TGo4ServerTask::WaitForClose()
00410 {
00411 Int_t count=0;
00412 while(!fbConnectIsClose)
00413 {
00414 if(count>TGo4ServerTask::fgiCLOSEWAITCYCLES)
00415 {
00416 count = -1;
00417 break;
00418 }
00419 else
00420 {
00421 TGo4Thread::Sleep(TGo4ServerTask::fguCLOSEWAITCYCLETIME);
00422 ++count;
00423 }
00424
00425 }
00426 fbConnectIsClose=kFALSE;
00427 return count;
00428 }
00429
00430 Int_t TGo4ServerTask::WaitForConnection()
00431 {
00432 Int_t count=0;
00433 while(!fbConnectIsDone)
00434 {
00435 if(IsTerminating())
00436 {
00437 count = -2;
00438 break;
00439 }
00440
00441
00442
00443
00444
00445
00446 else
00447 {
00448 TGo4Thread::Sleep(TGo4ServerTask::fguCONNECTWAITCYCLETIME);
00449 ++count;
00450 }
00451
00452 }
00453 fbConnectIsDone=kFALSE;
00454 return count;
00455 }
00456
00457 TGo4Socket* TGo4ServerTask::GetConnectTransport()
00458 {
00459 return fxConnectTransport;
00460 }
00461
00462 TGo4BufferQueue* TGo4ServerTask::GetCommandQueue(const char* name)
00463 {
00464 TGo4BufferQueue* queue=0;
00465 TGo4TaskHandler* currenttask=0;
00466 if(name==0 || strstr(name,"current"))
00467 currenttask=GetCurrentTaskHandler();
00468 else
00469 currenttask=GetTaskHandler(name);
00470 if(currenttask)
00471 queue=dynamic_cast<TGo4BufferQueue*> (currenttask->GetCommandQueue());
00472 return queue;
00473 }
00474
00475 TGo4BufferQueue * TGo4ServerTask::GetStatusQueue(const char* name)
00476 {
00477 TGo4BufferQueue* queue=0;
00478 TGo4TaskHandler* currenttask=0;
00479 if(name==0)
00480 currenttask=GetCurrentTaskHandler();
00481 else
00482 currenttask=GetTaskHandler(name);
00483 if(currenttask)
00484 queue=dynamic_cast<TGo4BufferQueue*> (currenttask->GetStatusQueue());
00485 return queue;
00486 }
00487
00488
00489 TGo4BufferQueue * TGo4ServerTask::GetDataQueue(const char* name)
00490 {
00491 TGo4BufferQueue* queue=0;
00492 TGo4TaskHandler* currenttask=0;
00493 if(name==0 || strstr(name,"current"))
00494 currenttask=GetCurrentTaskHandler();
00495 else
00496 currenttask=GetTaskHandler(name);
00497 if(currenttask)
00498 queue=dynamic_cast<TGo4BufferQueue*> (currenttask->GetDataQueue());
00499 return queue;
00500
00501 }
00502
00503 TGo4Command* TGo4ServerTask::NextCommand()
00504 {
00505 if(IsMaster()) return 0;
00506 TGo4Command* com=0;
00507 TGo4TaskHandler* han=0;
00508 Bool_t reset=kTRUE;
00509 TGo4LockGuard taskmutex(fxTaskManager->GetMutex());
00510 while((han=fxTaskManager->NextTaskHandler(reset))!=0)
00511 {
00512 reset=kFALSE;
00513 TGo4BufferQueue * comq=dynamic_cast<TGo4BufferQueue*> (han->GetCommandQueue());
00514 if(comq==0) continue;
00515 if(!comq->IsEmpty())
00516 {
00517 com= dynamic_cast<TGo4Command*>(comq->WaitObjectFromBuffer());
00518 if(com)
00519 {
00520 com->SetTaskName(han->GetName());
00521 com->SetMode(han->GetRole());
00522 return com;
00523 }
00524 }
00525 }
00526 return com;
00527 }
00528
00529 void TGo4ServerTask::SendStatus(TGo4Status * stat, const char* receiver)
00530 {
00531 if(IsMaster()) return;
00532 if(stat==0) return;
00533 if(receiver!=0) {
00534 TGo4Task::SendStatus(stat,receiver);
00535 return;
00536 }
00537
00538 TGo4LockGuard taskmutex(fxTaskManager->GetMutex());
00539 TGo4TaskHandler* han = 0;
00540 Bool_t reset = kTRUE;
00541 while((han = fxTaskManager->NextTaskHandler(reset))!=0) {
00542 reset = kFALSE;
00543 TGo4BufferQueue * statq=dynamic_cast<TGo4BufferQueue*> (han->GetStatusQueue());
00544 if(statq==0) continue;
00545 TGo4Log::Debug(" Task - sending status %s to task %s", stat->ClassName(), han->GetName());
00546 statq->AddBufferFromObject(stat);
00547 }
00548 }
00549
00550 void TGo4ServerTask::SendStatusBuffer()
00551 {
00552 if(IsMaster()) return;
00553
00554 TGo4LockGuard statguard(fxStatusMutex);
00555
00556 TGo4LockGuard taskmutex(fxTaskManager->GetMutex());
00557 TGo4TaskHandler* han=0;
00558 Bool_t reset=kTRUE;
00559 while((han=fxTaskManager->NextTaskHandler(reset))!=0)
00560 {
00561 reset=kFALSE;
00562 TGo4BufferQueue * statq=dynamic_cast<TGo4BufferQueue*> (han->GetStatusQueue());
00563 if(statq==0) continue;
00564 TGo4Log::Debug(" Task - sending status buffer to task %s", han->GetName());
00565 statq->AddBuffer(fxStatusBuffer,kTRUE);
00566 }
00567 }
00568
00569
00570
00571 Bool_t TGo4ServerTask::StartConnectorThread()
00572 {
00573 return GetWorkHandler()->Start( GetConnectorName() );
00574 }
00575
00576 Bool_t TGo4ServerTask::StopConnectorThread()
00577 {
00578 Bool_t rev = GetWorkHandler()->Stop( GetConnectorName() );
00579
00580 const char* host = gSystem->HostName();
00581 Int_t negotiationport = fxTaskManager->GetNegotiationPort();
00582 TGo4Socket* connector = new TGo4Socket(kTRUE);
00583
00584 connector->Open(host,negotiationport);
00585 connector->Send(TGo4TaskHandler::Get_fgcERROR());
00586 connector->Close();
00587 delete connector;
00588 return rev;
00589 }
00590
00591
00592 Bool_t TGo4ServerTask::ConnectorThreadIsStopped()
00593 {
00594 TGo4Thread* conny = GetWorkHandler()->GetThread(GetConnectorName());
00595 return conny->IsWaiting();
00596 }
00597
00598 void TGo4ServerTask::Quit()
00599 {
00600 TGo4Log::Debug(" ServerTask Quit -- removing all connected clients ");
00601 SendStatusMessage(2,kTRUE,"ServerTask %s is shutting down now! All clients are removed...",GetName());
00602 TGo4Slave* slave=GetSlave();
00603 if(slave)
00604 {
00605 TGo4Log::Debug(" ServerTask Quit is stopping slave...");
00606 slave->Stop();
00607 }
00608 if(!IsMaster())
00609 {
00610
00611 fxTaskManager->GetMutex()->UnLock();
00612 }
00613 RemoveAllClients();
00614
00615 WakeCommandQueue(TGo4Task::Get_fgiTERMID());
00616 Terminate(!IsMaster());
00617
00618
00619 }
00620
00621
00622 void TGo4ServerTask::Shutdown()
00623 {
00624 TGo4Log::Debug(" ServerTask Shutdown without disconnect waiting");
00625 SendStatusMessage(2,kTRUE,"ServerTask %s is shutting down now! All clients are removed...",GetName());
00626 fxTaskManager->GetMutex()->UnLock();
00627 TGo4Thread::Sleep(10*fguCONNECTTIMERPERIOD);
00628 StopWorkThreads();
00629 WakeCommandQueue(TGo4Task::Get_fgiTERMID());
00630
00631 RemoveAllClients(true);
00632
00633 TGo4Slave* slave=GetSlave();
00634 if(slave)
00635 {
00636 TGo4Log::Debug(" ServerTask Shutdown stopping slave...");
00637 slave->Stop();
00638
00639 slave->SetTask(0,kFALSE);
00640 delete slave;
00641
00642 }
00643 fxTaskManager->GetMutex()->Lock();
00644 gApplication->Terminate();
00645 }
00646
00647
00648 void TGo4ServerTask::LockAll()
00649 {
00650
00651
00652 fxStatusMutex->Lock();
00653
00654 fxTaskManager->GetMutex()->Lock();
00655
00656 TGo4LockGuard::LockMainMutex();
00657
00658
00659 }
00660
00661 void TGo4ServerTask::UnLockAll()
00662 {
00663
00664 TGo4LockGuard::UnLockMainMutex();
00665
00666 fxTaskManager->GetMutex()->UnLock();
00667
00668 fxStatusMutex->UnLock();
00669
00670 }
00671
00672 const char* TGo4ServerTask::Get_fgcLAUNCHPREFSFILE()
00673 {
00674 return fgcLAUNCHPREFSFILE;
00675 }
00676
00677