00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014 #include "TGo4ServerTask.h"
00015
00016 #include "Riostream.h"
00017
00018 #include "TMutex.h"
00019 #include "TApplication.h"
00020 #include "TObjArray.h"
00021
00022 #include "TGo4Log.h"
00023 #include "TGo4LockGuard.h"
00024 #include "TGo4Socket.h"
00025 #include "TGo4BufferQueue.h"
00026 #include "TGo4Thread.h"
00027 #include "TGo4ThreadHandler.h"
00028 #include "TGo4Status.h"
00029 #include "TGo4CommandInvoker.h"
00030 #include "TGo4TaskManager.h"
00031 #include "TGo4TaskHandler.h"
00032 #include "TGo4ConnectorRunnable.h"
00033 #include "TGo4Master.h"
00034 #include "TGo4Slave.h"
00035 #include "TGo4TaskConnectorTimer.h"
00036
00037
00038 const Int_t TGo4ServerTask::fgiOPENWAITCYCLES=100;
00039 const UInt_t TGo4ServerTask::fguOPENWAITCYCLETIME=100;
00040 const Int_t TGo4ServerTask::fgiCLOSEWAITCYCLES=100;
00041 const UInt_t TGo4ServerTask::fguCLOSEWAITCYCLETIME=100;
00042 const Int_t TGo4ServerTask::fgiCONNECTWAITCYCLES=20;
00043 const UInt_t TGo4ServerTask::fguCONNECTWAITCYCLETIME=200;
00044 const UInt_t TGo4ServerTask::fguCONNECTTIMERPERIOD=100;
00045
00046
00047 const char* TGo4ServerTask::fgcLAUNCHPREFSFILE = "etc/Go4LaunchPrefs.txt";
00048
00049
00050 TGo4ServerTask::TGo4ServerTask(const char* name,
00051 UInt_t negotiationport,
00052 Bool_t blockingmode,
00053 Bool_t standalone,
00054 Bool_t autostart,
00055 Bool_t autocreate,
00056 Bool_t ismaster)
00057 : TGo4Task(name,blockingmode, autostart,autocreate,ismaster),
00058 fxTaskManager(0),fxCurrentTaskHandler(0),
00059 fxConnectTransport(0), fxDisConnectTransport(0),
00060 fuConnectPort(0), fbKeepServerSocket(kFALSE),
00061 fbConnectRequest(kFALSE), fbDisConnectRequest(kFALSE),
00062 fbConnectIsOpen(kFALSE),fbConnectIsDone(kFALSE), fbConnectIsClose(kFALSE),
00063 fxConnectorTimer(0)
00064 {
00065 TString nomen("TaskManager of "); nomen += name;
00066 fxTaskManager= new TGo4TaskManager(nomen.Data(), this, negotiationport);
00067
00068 if(negotiationport!=42)
00069 {
00070
00071 nomen.Form("ConnectorRunnable of %s", name);
00072 TGo4ConnectorRunnable* conny = new TGo4ConnectorRunnable(nomen.Data(),this);
00073 nomen.Form("CONNECTOR-%s", name);
00074 fxConnectorName=nomen;
00075 fxWorkHandler->NewThread(GetConnectorName(), conny);
00076 }
00077 else {}
00078
00079 TGo4CommandInvoker::Instance();
00080 TGo4CommandInvoker::Register("ServerTask", this);
00081 fxConnectorTimer= new TGo4TaskConnectorTimer(this,fguCONNECTTIMERPERIOD);
00082 fxConnectorTimer->TurnOn();
00083 if(standalone)
00084 {
00085 Launch();
00086 }
00087 else
00088 {
00089
00090 }
00091 }
00092
00093
00094 TGo4ServerTask::~TGo4ServerTask()
00095 {
00096 if (GetWorkHandler()) GetWorkHandler()->CancelAll();
00097 delete fxConnectorTimer;
00098 delete fxTaskManager;
00099 TGo4CommandInvoker::UnRegister(this);
00100 }
00101
00102 Bool_t TGo4ServerTask::RemoveClient(const char* name, Bool_t clientwait, Bool_t isterminating)
00103 {
00104 Bool_t rev=kTRUE;
00105 TGo4TaskHandler* taskhandler=0;
00106 if(name && strstr(name,"current"))
00107 taskhandler=GetCurrentTaskHandler();
00108 else
00109 taskhandler=GetTaskHandler(name);
00110 if(taskhandler==0)
00111 {
00112
00113 TGo4Log::Debug(" ServerTask -- RemoveClient FAILED, no client %s !!! ",
00114 name);
00115 rev=kFALSE;
00116 }
00117 else
00118 {
00119 TGo4Log::Debug(" ServerTask -- removing client task %s ",name);
00120
00121 StopWorkThreads();
00122 if(clientwait)
00123 {
00124
00125
00126
00127
00128
00129
00130 if(IsMaster())
00131 SubmitEmergencyCommand(kComQuit);
00132 else
00133 SubmitEmergencyData(kComQuit, taskhandler->GetName());
00134 TGo4Log::Debug(" Server Task -- Waiting for client %s disconnection...",taskhandler->GetName());
00135 Int_t removeresult=fxTaskManager->WaitForClientRemoved();
00136
00137
00138 switch(removeresult)
00139 {
00140 case -1:
00141
00142 TGo4Log::Debug(" !!! Server Task -- client remove wait TIMEOUT !!! ");
00143 rev=fxTaskManager->DisConnectClient(taskhandler->GetName(),kFALSE);
00144 break;
00145
00146 case -2:
00147
00148 TGo4Log::Debug(" !!! Server Task -- client remove aborted for TERMINATION MODE !!! ");
00149 rev=kFALSE;
00150 break;
00151
00152 default:
00153
00154 TGo4Log::Debug(" Server Task -- waited %d cycles until client was removed. ",removeresult);
00155 rev=kTRUE;
00156 break;
00157
00158 }
00159 }
00160 else
00161 {
00162
00163 TGo4Log::Debug(" !!! Server Task -- removing client %s without waiting... ",
00164 taskhandler->GetName());
00165 SendStopBuffers(taskhandler->GetName());
00166 rev= (fxTaskManager->DisConnectClient(taskhandler->GetName(),kFALSE)==0);
00167 }
00168 if(!isterminating) StartWorkThreads();
00169 }
00170 return rev;
00171 }
00172
00173 Int_t TGo4ServerTask::RemoveAllClients(Bool_t force)
00174 {
00175 Int_t rev=0;
00176
00178 TGo4TaskHandler* taskhandler=0;
00179 TObjArray names;
00180 Bool_t reset=kTRUE;
00181 while((taskhandler=fxTaskManager->NextTaskHandler(reset)) !=0)
00182 {
00183 reset=kFALSE;
00184
00185 names.AddLast(new TNamed(taskhandler->GetName(), "title"));
00186 }
00187 TIter niter(&names);
00188 TObject* nomen=0;
00189 while((nomen =niter.Next()) !=0)
00190 {
00191
00192 RemoveClient(nomen->GetName(),!force,kTRUE);
00193 rev++;
00194 }
00195 names.Delete();
00196
00197 return rev;
00198 }
00199
00200
00201 Bool_t TGo4ServerTask::RemoveCurrentClient()
00202 {
00203 Bool_t rev=kTRUE;
00204 TGo4TaskHandler* taskhandler=GetCurrentTaskHandler();
00205 if(taskhandler!=0)
00206 {
00207
00208 TGo4Log::Debug(" Server task -- removing current client %s ",taskhandler->GetName());
00209 rev = RemoveClient(taskhandler->GetName());
00210 }
00211 else
00212 {
00213 rev=kFALSE;
00214 }
00215 return rev;
00216 }
00217
00218 void TGo4ServerTask::SetCurrentTask(const char* name)
00219 {
00220
00221 TGo4TaskHandler* han=0;
00222 if(fxTaskManager==0)
00223 {
00224 TGo4Log::Debug(" TGo4ServerTask ''%s'' ERROR- task manager not existing!!! ");
00225 }
00226 else
00227 {
00228
00229 if(IsWorkStopped())
00230 {
00231
00232
00233
00234
00235
00236 }
00237 else
00238 {
00239
00240 StopWorkThreads();
00241 }
00242 {
00243 if(name==0)
00244 {
00245
00246 fxCurrentTaskHandler=fxTaskManager->GetLastTaskHandler();
00247
00248 }
00249 else
00250 {
00251
00252 han=fxTaskManager->GetTaskHandler(name);
00253 if(han)
00254 {
00255 fxCurrentTaskHandler=han;
00256
00257 }
00258 else
00259 {
00260 TGo4Log::Debug(" ServerTask: FAILED setting current task to %s-- no such client! ",name);
00261 }
00262 }
00263
00264 }
00265
00266 StartWorkThreads();
00267 }
00268 }
00269
00270
00271 TGo4TaskHandler* TGo4ServerTask::GetTaskHandler(const char* name)
00272 {
00273 return (fxTaskManager->GetTaskHandler(name));
00274 }
00275
00276 TGo4TaskHandler* TGo4ServerTask::GetTaskHandler()
00277 {
00278 return (GetCurrentTaskHandler());
00279 }
00280
00281 TGo4TaskHandler* TGo4ServerTask::GetCurrentTaskHandler()
00282 {
00283 return fxCurrentTaskHandler;
00284 }
00285
00286 TGo4TaskManager* TGo4ServerTask::GetTaskManager()
00287 {
00288 return fxTaskManager;
00289 }
00290
00291 void TGo4ServerTask::SetConnect(TGo4Socket * trans, const char* host, UInt_t port, Bool_t keepserv)
00292 {
00293 fxConnectTransport=trans;
00294 fxConnectHost=host;
00295 fuConnectPort=port;
00296 fbConnectRequest=kTRUE;
00297 fbKeepServerSocket=keepserv;
00298 }
00299
00300 void TGo4ServerTask::SetDisConnect(TGo4Socket * trans)
00301 {
00302 fxDisConnectTransport=trans;
00303 fbDisConnectRequest=kTRUE;
00304 }
00305
00306 Int_t TGo4ServerTask::TimerConnect()
00307 {
00308 Int_t rev=0;
00311 if(fbDisConnectRequest)
00312 {
00313 if(fxDisConnectTransport!=0)
00314 {
00315
00316 fxDisConnectTransport->Close();
00317
00318
00319 fbConnectIsClose=kTRUE;
00320 fbDisConnectRequest=kFALSE;
00321 rev+=1;
00322 }
00323 else
00324 {
00325
00326 rev+=32;
00327 }
00328 }
00329 else
00330 {
00331
00332 rev+=2;
00333 }
00334
00337 if(fbConnectRequest)
00338 {
00339
00340 if(fxConnectTransport!=0)
00341 {
00342
00343 if(!fxConnectTransport->IsOpen())
00344 {
00345
00346
00347 fbConnectIsOpen=kTRUE;
00348 Int_t result=fxConnectTransport->Open(GetConnectHost(), fuConnectPort, fbKeepServerSocket);
00349 if(result==0)
00350 {
00351 fbConnectIsDone=kTRUE;
00352 fbConnectRequest=kFALSE;
00353 fbKeepServerSocket=kFALSE;
00354 rev+=4;
00355 }
00356 else
00357 {
00358 rev=-4;
00359
00360 }
00361 }
00362 else
00363 {
00364
00365
00366 fbConnectIsOpen=kTRUE;
00367 fbConnectIsDone=kTRUE;
00368 fbConnectRequest=kFALSE;
00369 fbKeepServerSocket=kFALSE;
00370 rev+=8;
00371 }
00372 }
00373 else
00374 {
00375 rev+=64;
00376
00377
00378
00379 }
00380 }
00381 else
00382 {
00383
00384 rev+=16;
00385 }
00386 return rev;
00387 }
00388
00389 Int_t TGo4ServerTask::WaitForOpen()
00390 {
00391 Int_t count=0;
00392 while(!fbConnectIsOpen)
00393 {
00394 if(count>TGo4ServerTask::fgiOPENWAITCYCLES)
00395 {
00396 count = -1;
00397 break;
00398 }
00399 else
00400 {
00401 TGo4Thread::Sleep(TGo4ServerTask::fguOPENWAITCYCLETIME);
00402 ++count;
00403 }
00404
00405 }
00406 fbConnectIsOpen=kFALSE;
00407 return count;
00408 }
00409
00410
00411 Int_t TGo4ServerTask::WaitForClose()
00412 {
00413 Int_t count=0;
00414 while(!fbConnectIsClose)
00415 {
00416 if(count>TGo4ServerTask::fgiCLOSEWAITCYCLES)
00417 {
00418 count = -1;
00419 break;
00420 }
00421 else
00422 {
00423 TGo4Thread::Sleep(TGo4ServerTask::fguCLOSEWAITCYCLETIME);
00424 ++count;
00425 }
00426
00427 }
00428 fbConnectIsClose=kFALSE;
00429 return count;
00430 }
00431
00432 Int_t TGo4ServerTask::WaitForConnection()
00433 {
00434 Int_t count=0;
00435 while(!fbConnectIsDone)
00436 {
00437 if(IsTerminating())
00438 {
00439 count = -2;
00440 break;
00441 }
00442
00443
00444
00445
00446
00447
00448 else
00449 {
00450 TGo4Thread::Sleep(TGo4ServerTask::fguCONNECTWAITCYCLETIME);
00451 ++count;
00452 }
00453
00454 }
00455 fbConnectIsDone=kFALSE;
00456 return count;
00457 }
00458
00459 TGo4Socket* TGo4ServerTask::GetConnectTransport()
00460 {
00461 return fxConnectTransport;
00462 }
00463
00464 TGo4BufferQueue* TGo4ServerTask::GetCommandQueue(const char* name)
00465 {
00466 TGo4BufferQueue* queue=0;
00467 TGo4TaskHandler* currenttask=0;
00468 if(name==0 || strstr(name,"current"))
00469 currenttask=GetCurrentTaskHandler();
00470 else
00471 currenttask=GetTaskHandler(name);
00472 if(currenttask)
00473 queue=dynamic_cast<TGo4BufferQueue*> (currenttask->GetCommandQueue());
00474 return queue;
00475 }
00476
00477 TGo4BufferQueue * TGo4ServerTask::GetStatusQueue(const char* name)
00478 {
00479 TGo4BufferQueue* queue=0;
00480 TGo4TaskHandler* currenttask=0;
00481 if(name==0)
00482 currenttask=GetCurrentTaskHandler();
00483 else
00484 currenttask=GetTaskHandler(name);
00485 if(currenttask)
00486 queue=dynamic_cast<TGo4BufferQueue*> (currenttask->GetStatusQueue());
00487 return queue;
00488 }
00489
00490
00491 TGo4BufferQueue * TGo4ServerTask::GetDataQueue(const char* name)
00492 {
00493 TGo4BufferQueue* queue=0;
00494 TGo4TaskHandler* currenttask=0;
00495 if(name==0 || strstr(name,"current"))
00496 currenttask=GetCurrentTaskHandler();
00497 else
00498 currenttask=GetTaskHandler(name);
00499 if(currenttask)
00500 queue=dynamic_cast<TGo4BufferQueue*> (currenttask->GetDataQueue());
00501 return queue;
00502
00503 }
00504
00505 TGo4Command* TGo4ServerTask::NextCommand()
00506 {
00507 if(IsMaster()) return 0;
00508 TGo4Command* com=0;
00509 TGo4TaskHandler* han=0;
00510 Bool_t reset=kTRUE;
00511 TGo4LockGuard taskmutex(fxTaskManager->GetMutex());
00512 while((han=fxTaskManager->NextTaskHandler(reset))!=0)
00513 {
00514 reset=kFALSE;
00515 TGo4BufferQueue * comq=dynamic_cast<TGo4BufferQueue*> (han->GetCommandQueue());
00516 if(comq==0) continue;
00517 if(!comq->IsEmpty())
00518 {
00519 com= dynamic_cast<TGo4Command*>(comq->WaitObjectFromBuffer());
00520 if(com)
00521 {
00522 com->SetTaskName(han->GetName());
00523 com->SetMode(han->GetRole());
00524 return com;
00525 }
00526 }
00527 }
00528 return com;
00529 }
00530
00531 void TGo4ServerTask::SendStatus(TGo4Status * stat, const char* receiver)
00532 {
00533 if(IsMaster()) return;
00534 if(stat==0) return;
00535 if(receiver!=0)
00536 {
00537
00538 TGo4Task::SendStatus(stat,receiver);
00539 }
00540 else
00541 {
00542
00543 TGo4LockGuard taskmutex(fxTaskManager->GetMutex());
00544 TGo4TaskHandler* han=0;
00545 Bool_t reset=kTRUE;
00546 while((han=fxTaskManager->NextTaskHandler(reset))!=0)
00547 {
00548 reset=kFALSE;
00549 TGo4BufferQueue * statq=dynamic_cast<TGo4BufferQueue*> (han->GetStatusQueue());
00550 if(statq==0) continue;
00551 TGo4Log::Debug(" Task - sending status %s to task %s", stat->ClassName(), han->GetName());
00552 statq->AddBufferFromObject(stat);
00553 }
00554 }
00555 }
00556
00557 void TGo4ServerTask::SendStatusBuffer()
00558 {
00559 if(IsMaster()) return;
00560
00561 TGo4LockGuard statguard(fxStatusMutex);
00562
00563 TGo4LockGuard taskmutex(fxTaskManager->GetMutex());
00564 TGo4TaskHandler* han=0;
00565 Bool_t reset=kTRUE;
00566 while((han=fxTaskManager->NextTaskHandler(reset))!=0)
00567 {
00568 reset=kFALSE;
00569 TGo4BufferQueue * statq=dynamic_cast<TGo4BufferQueue*> (han->GetStatusQueue());
00570 if(statq==0) continue;
00571 TGo4Log::Debug(" Task - sending status buffer to task %s", han->GetName());
00572 statq->AddBuffer(fxStatusBuffer,kTRUE);
00573 }
00574 }
00575
00576
00577
00578 Bool_t TGo4ServerTask::StartConnectorThread()
00579 {
00580 Bool_t rev=kTRUE;
00581 rev= ( GetWorkHandler()->Start( GetConnectorName() ) );
00582
00583 return rev;
00584 }
00585
00586 Bool_t TGo4ServerTask::StopConnectorThread()
00587 {
00588 Bool_t rev=kTRUE;
00589 rev= ( GetWorkHandler()->Stop( GetConnectorName() ) );
00590
00591 const char* host = gSystem->HostName();
00592 Int_t negotiationport=fxTaskManager->GetNegotiationPort();
00593 TGo4Socket* connector= new TGo4Socket(kTRUE);
00594
00595 connector->Open(host,negotiationport);
00596 connector->Send(TGo4TaskHandler::Get_fgcERROR());
00597 connector->Close();
00598 delete connector;
00599 return rev;
00600 }
00601
00602
00603 Bool_t TGo4ServerTask::ConnectorThreadIsStopped()
00604 {
00605 Bool_t rev=kTRUE;
00606 TGo4Thread* conny= GetWorkHandler()->GetThread(GetConnectorName());
00607 rev= conny->IsWaiting();
00608 return rev;
00609 }
00610
00611 void TGo4ServerTask::Quit()
00612 {
00613 TGo4Log::Debug(" ServerTask Quit -- removing all connected clients ");
00614 SendStatusMessage(2,kTRUE,"ServerTask %s is shutting down now! All clients are removed...",GetName());
00615 TGo4Slave* slave=GetSlave();
00616 if(slave)
00617 {
00618 TGo4Log::Debug(" ServerTask Quit is stopping slave...");
00619 slave->Stop();
00620 }
00621 if(!IsMaster())
00622 {
00623
00624 fxTaskManager->GetMutex()->UnLock();
00625 }
00626 RemoveAllClients();
00627
00628 WakeCommandQueue(TGo4Task::Get_fgiTERMID());
00629 Terminate(!IsMaster());
00630
00631
00632 }
00633
00634
00635 void TGo4ServerTask::Shutdown()
00636 {
00637 TGo4Log::Debug(" ServerTask Shutdown without disconnect waiting");
00638 SendStatusMessage(2,kTRUE,"ServerTask %s is shutting down now! All clients are removed...",GetName());
00639 fxTaskManager->GetMutex()->UnLock();
00640 TGo4Thread::Sleep(10*fguCONNECTTIMERPERIOD);
00641 StopWorkThreads();
00642 WakeCommandQueue(TGo4Task::Get_fgiTERMID());
00643
00644 RemoveAllClients(true);
00645
00646 TGo4Slave* slave=GetSlave();
00647 if(slave)
00648 {
00649 TGo4Log::Debug(" ServerTask Shutdown stopping slave...");
00650 slave->Stop();
00651
00652 slave->SetTask(0,kFALSE);
00653 delete slave;
00654
00655 }
00656 fxTaskManager->GetMutex()->Lock();
00657 gApplication->Terminate();
00658 }
00659
00660
00661 void TGo4ServerTask::LockAll()
00662 {
00663
00664
00665 fxStatusMutex->Lock();
00666
00667 fxTaskManager->GetMutex()->Lock();
00668
00669 TGo4LockGuard::LockMainMutex();
00670
00671
00672 }
00673
00674 void TGo4ServerTask::UnLockAll()
00675 {
00676
00677 TGo4LockGuard::UnLockMainMutex();
00678
00679 fxTaskManager->GetMutex()->UnLock();
00680
00681 fxStatusMutex->UnLock();
00682
00683 }
00684
00685 const char* TGo4ServerTask::Get_fgcLAUNCHPREFSFILE()
00686 {
00687 return fgcLAUNCHPREFSFILE;
00688 }
00689
00690