00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #include "TGo4ServerTask.h"
00017
00018 #include "Riostream.h"
00019
00020 #include "TMutex.h"
00021 #include "TApplication.h"
00022 #include "TObjArray.h"
00023 #include "snprintf.h"
00024
00025 #include "TGo4Log.h"
00026 #include "TGo4LockGuard.h"
00027 #include "TGo4Socket.h"
00028 #include "TGo4BufferQueue.h"
00029 #include "TGo4Thread.h"
00030 #include "TGo4ThreadHandler.h"
00031 #include "TGo4Status.h"
00032 #include "TGo4CommandInvoker.h"
00033 #include "TGo4TaskManager.h"
00034 #include "TGo4TaskHandler.h"
00035 #include "TGo4ConnectorRunnable.h"
00036 #include "TGo4Master.h"
00037 #include "TGo4Slave.h"
00038 #include "TGo4TaskConnectorTimer.h"
00039
00040
00041 const Int_t TGo4ServerTask::fgiOPENWAITCYCLES=100;
00042 const UInt_t TGo4ServerTask::fguOPENWAITCYCLETIME=100;
00043 const Int_t TGo4ServerTask::fgiCLOSEWAITCYCLES=100;
00044 const UInt_t TGo4ServerTask::fguCLOSEWAITCYCLETIME=100;
00045 const Int_t TGo4ServerTask::fgiCONNECTWAITCYCLES=20;
00046 const UInt_t TGo4ServerTask::fguCONNECTWAITCYCLETIME=200;
00047 const UInt_t TGo4ServerTask::fguCONNECTTIMERPERIOD=100;
00048
00049
00050 const char* TGo4ServerTask::fgcLAUNCHPREFSFILE = "etc/Go4LaunchClientPrefs.txt";
00051
00052
00053 TGo4ServerTask::TGo4ServerTask(const char* name,
00054 UInt_t negotiationport,
00055 Bool_t blockingmode,
00056 Bool_t standalone,
00057 Bool_t autostart,
00058 Bool_t autocreate,
00059 Bool_t ismaster)
00060 : TGo4Task(name,blockingmode, autostart,autocreate,ismaster),
00061 fxTaskManager(0),fxCurrentTaskHandler(0),
00062 fxConnectTransport(0), fxDisConnectTransport(0),
00063 fuConnectPort(0), fbKeepServerSocket(kFALSE),
00064 fbConnectRequest(kFALSE), fbDisConnectRequest(kFALSE),
00065 fbConnectIsOpen(kFALSE),fbConnectIsDone(kFALSE), fbConnectIsClose(kFALSE),
00066 fxConnectorTimer(0)
00067 {
00068 TString nomen("TaskManager of "); nomen += name;
00069 fxTaskManager= new TGo4TaskManager(nomen.Data(), this, negotiationport);
00070
00071 if(negotiationport!=42)
00072 {
00073
00074 nomen.Form("ConnectorRunnable of %s", name);
00075 TGo4ConnectorRunnable* conny = new TGo4ConnectorRunnable(nomen.Data(),this);
00076 nomen.Form("CONNECTOR-%s", name);
00077 fxConnectorName=nomen;
00078 fxWorkHandler->NewThread(GetConnectorName(), conny);
00079 }
00080 else {}
00081
00082 TGo4CommandInvoker::Instance();
00083 TGo4CommandInvoker::Register("ServerTask", this);
00084 fxConnectorTimer= new TGo4TaskConnectorTimer(this,fguCONNECTTIMERPERIOD);
00085 fxConnectorTimer->TurnOn();
00086 if(standalone)
00087 {
00088 Launch();
00089 }
00090 else
00091 {
00092
00093 }
00094 }
00095
00096
00097 TGo4ServerTask::~TGo4ServerTask()
00098 {
00099 if (GetWorkHandler()) GetWorkHandler()->CancelAll();
00100 delete fxConnectorTimer;
00101 delete fxTaskManager;
00102 TGo4CommandInvoker::UnRegister(this);
00103 }
00104
00105 Bool_t TGo4ServerTask::RemoveClient(const char* name, Bool_t clientwait, Bool_t isterminating)
00106 {
00107 Bool_t rev=kTRUE;
00108 TGo4TaskHandler* taskhandler=0;
00109 if(name && strstr(name,"current"))
00110 taskhandler=GetCurrentTaskHandler();
00111 else
00112 taskhandler=GetTaskHandler(name);
00113 if(taskhandler==0)
00114 {
00115
00116 TGo4Log::Debug(" ServerTask -- RemoveClient FAILED, no client %s !!! ",
00117 name);
00118 rev=kFALSE;
00119 }
00120 else
00121 {
00122 TGo4Log::Debug(" ServerTask -- removing client task %s ",name);
00123
00124 StopWorkThreads();
00125 if(clientwait)
00126 {
00127
00128
00129
00130
00131
00132 if(IsMaster())
00133 SubmitEmergencyCommand(kComQuit);
00134 else
00135 SubmitEmergencyData(kComQuit, taskhandler->GetName());
00136 TGo4Log::Debug(" Server Task -- Waiting for client %s disconnection...",taskhandler->GetName());
00137 Int_t removeresult=fxTaskManager->WaitForClientRemoved();
00138
00139
00140 switch(removeresult)
00141 {
00142 case -1:
00143
00144 TGo4Log::Debug(" !!! Server Task -- client remove wait TIMEOUT !!! ");
00145 rev=fxTaskManager->DisConnectClient(taskhandler->GetName(),kFALSE);
00146 break;
00147
00148 case -2:
00149
00150 TGo4Log::Debug(" !!! Server Task -- client remove aborted for TERMINATION MODE !!! ");
00151 rev=kFALSE;
00152 break;
00153
00154 default:
00155
00156 TGo4Log::Debug(" Server Task -- waited %d cycles until client was removed. ",removeresult);
00157 rev=kTRUE;
00158 break;
00159
00160 }
00161 }
00162 else
00163 {
00164
00165 TGo4Log::Debug(" !!! Server Task -- removing client %s without waiting... ",
00166 taskhandler->GetName());
00167 SendStopBuffers(taskhandler->GetName());
00168 rev= (fxTaskManager->DisConnectClient(taskhandler->GetName(),kFALSE)==0);
00169 }
00170 if(!isterminating) StartWorkThreads();
00171 }
00172 return rev;
00173 }
00174
00175 Int_t TGo4ServerTask::RemoveAllClients(Bool_t force)
00176 {
00177 Int_t rev=0;
00178
00180 TGo4TaskHandler* taskhandler=0;
00181 TObjArray names;
00182 Bool_t reset=kTRUE;
00183 while((taskhandler=fxTaskManager->NextTaskHandler(reset)) !=0)
00184 {
00185 reset=kFALSE;
00186
00187 names.AddLast(new TNamed(taskhandler->GetName(), "title"));
00188 }
00189 TIter niter(&names);
00190 TObject* nomen=0;
00191 while((nomen =niter.Next()) !=0)
00192 {
00193
00194 RemoveClient(nomen->GetName(),!force,kTRUE);
00195 rev++;
00196 }
00197 names.Delete();
00198
00199 return rev;
00200 }
00201
00202
00203 Bool_t TGo4ServerTask::RemoveCurrentClient()
00204 {
00205 Bool_t rev=kTRUE;
00206 TGo4TaskHandler* taskhandler=GetCurrentTaskHandler();
00207 if(taskhandler!=0)
00208 {
00209
00210 TGo4Log::Debug(" Server task -- removing current client %s ",taskhandler->GetName());
00211 rev = RemoveClient(taskhandler->GetName());
00212 }
00213 else
00214 {
00215 rev=kFALSE;
00216 }
00217 return rev;
00218 }
00219
00220 void TGo4ServerTask::SetCurrentTask(const char* name)
00221 {
00222
00223 TGo4TaskHandler* han=0;
00224 if(fxTaskManager==0)
00225 {
00226 TGo4Log::Debug(" TGo4ServerTask ''%s'' ERROR- task manager not existing!!! ");
00227 }
00228 else
00229 {
00230
00231 if(IsWorkStopped())
00232 {
00233
00234
00235
00236
00237
00238 }
00239 else
00240 {
00241
00242 StopWorkThreads();
00243 }
00244 {
00245 if(name==0)
00246 {
00247
00248 fxCurrentTaskHandler=fxTaskManager->GetLastTaskHandler();
00249
00250 }
00251 else
00252 {
00253
00254 han=fxTaskManager->GetTaskHandler(name);
00255 if(han)
00256 {
00257 fxCurrentTaskHandler=han;
00258
00259 }
00260 else
00261 {
00262 TGo4Log::Debug(" ServerTask: FAILED setting current task to %s-- no such client! ",name);
00263 }
00264 }
00265
00266 }
00267
00268 StartWorkThreads();
00269 }
00270 }
00271
00272
00273 TGo4TaskHandler* TGo4ServerTask::GetTaskHandler(const char* name)
00274 {
00275 return (fxTaskManager->GetTaskHandler(name));
00276 }
00277
00278 TGo4TaskHandler* TGo4ServerTask::GetTaskHandler()
00279 {
00280 return (GetCurrentTaskHandler());
00281 }
00282
00283 TGo4TaskHandler* TGo4ServerTask::GetCurrentTaskHandler()
00284 {
00285 return fxCurrentTaskHandler;
00286 }
00287
00288 TGo4TaskManager* TGo4ServerTask::GetTaskManager()
00289 {
00290 return fxTaskManager;
00291 }
00292
00293 void TGo4ServerTask::SetConnect(TGo4Socket * trans, const char* host, UInt_t port, Bool_t keepserv)
00294 {
00295 fxConnectTransport=trans;
00296 fxConnectHost=host;
00297 fuConnectPort=port;
00298 fbConnectRequest=kTRUE;
00299 fbKeepServerSocket=keepserv;
00300 }
00301
00302 void TGo4ServerTask::SetDisConnect(TGo4Socket * trans)
00303 {
00304 fxDisConnectTransport=trans;
00305 fbDisConnectRequest=kTRUE;
00306 }
00307
00308 Int_t TGo4ServerTask::TimerConnect()
00309 {
00310 Int_t rev=0;
00313 if(fbDisConnectRequest)
00314 {
00315 if(fxDisConnectTransport!=0)
00316 {
00317
00318 fxDisConnectTransport->Close();
00319
00320
00321 fbConnectIsClose=kTRUE;
00322 fbDisConnectRequest=kFALSE;
00323 rev+=1;
00324 }
00325 else
00326 {
00327
00328 rev+=32;
00329 }
00330 }
00331 else
00332 {
00333
00334 rev+=2;
00335 }
00336
00339 if(fbConnectRequest)
00340 {
00341
00342 if(fxConnectTransport!=0)
00343 {
00344
00345 if(!fxConnectTransport->IsOpen())
00346 {
00347
00348
00349 fbConnectIsOpen=kTRUE;
00350 Int_t result=fxConnectTransport->Open(GetConnectHost(), fuConnectPort, fbKeepServerSocket);
00351 if(result==0)
00352 {
00353 fbConnectIsDone=kTRUE;
00354 fbConnectRequest=kFALSE;
00355 fbKeepServerSocket=kFALSE;
00356 rev+=4;
00357 }
00358 else
00359 {
00360 rev=-4;
00361
00362 }
00363 }
00364 else
00365 {
00366
00367
00368 fbConnectIsOpen=kTRUE;
00369 fbConnectIsDone=kTRUE;
00370 fbConnectRequest=kFALSE;
00371 fbKeepServerSocket=kFALSE;
00372 rev+=8;
00373 }
00374 }
00375 else
00376 {
00377 rev+=64;
00378
00379
00380
00381 }
00382 }
00383 else
00384 {
00385
00386 rev+=16;
00387 }
00388 return rev;
00389 }
00390
00391 Int_t TGo4ServerTask::WaitForOpen()
00392 {
00393 Int_t count=0;
00394 while(!fbConnectIsOpen)
00395 {
00396 if(count>TGo4ServerTask::fgiOPENWAITCYCLES)
00397 {
00398 count = -1;
00399 break;
00400 }
00401 else
00402 {
00403 TGo4Thread::Sleep(TGo4ServerTask::fguOPENWAITCYCLETIME);
00404 ++count;
00405 }
00406
00407 }
00408 fbConnectIsOpen=kFALSE;
00409 return count;
00410 }
00411
00412
00413 Int_t TGo4ServerTask::WaitForClose()
00414 {
00415 Int_t count=0;
00416 while(!fbConnectIsClose)
00417 {
00418 if(count>TGo4ServerTask::fgiCLOSEWAITCYCLES)
00419 {
00420 count = -1;
00421 break;
00422 }
00423 else
00424 {
00425 TGo4Thread::Sleep(TGo4ServerTask::fguCLOSEWAITCYCLETIME);
00426 ++count;
00427 }
00428
00429 }
00430 fbConnectIsClose=kFALSE;
00431 return count;
00432 }
00433
00434 Int_t TGo4ServerTask::WaitForConnection()
00435 {
00436 Int_t count=0;
00437 while(!fbConnectIsDone)
00438 {
00439 if(IsTerminating())
00440 {
00441 count = -2;
00442 break;
00443 }
00444
00445
00446
00447
00448
00449
00450 else
00451 {
00452 TGo4Thread::Sleep(TGo4ServerTask::fguCONNECTWAITCYCLETIME);
00453 ++count;
00454 }
00455
00456 }
00457 fbConnectIsDone=kFALSE;
00458 return count;
00459 }
00460
00461 TGo4Socket* TGo4ServerTask::GetConnectTransport()
00462 {
00463 return fxConnectTransport;
00464 }
00465
00466 TGo4BufferQueue* TGo4ServerTask::GetCommandQueue(const char* name)
00467 {
00468 TGo4BufferQueue* queue=0;
00469 TGo4TaskHandler* currenttask=0;
00470 if(name==0 || strstr(name,"current"))
00471 currenttask=GetCurrentTaskHandler();
00472 else
00473 currenttask=GetTaskHandler(name);
00474 if(currenttask)
00475 queue=dynamic_cast<TGo4BufferQueue*> (currenttask->GetCommandQueue());
00476 return queue;
00477 }
00478
00479 TGo4BufferQueue * TGo4ServerTask::GetStatusQueue(const char* name)
00480 {
00481 TGo4BufferQueue* queue=0;
00482 TGo4TaskHandler* currenttask=0;
00483 if(name==0)
00484 currenttask=GetCurrentTaskHandler();
00485 else
00486 currenttask=GetTaskHandler(name);
00487 if(currenttask)
00488 queue=dynamic_cast<TGo4BufferQueue*> (currenttask->GetStatusQueue());
00489 return queue;
00490 }
00491
00492
00493 TGo4BufferQueue * TGo4ServerTask::GetDataQueue(const char* name)
00494 {
00495 TGo4BufferQueue* queue=0;
00496 TGo4TaskHandler* currenttask=0;
00497 if(name==0 || strstr(name,"current"))
00498 currenttask=GetCurrentTaskHandler();
00499 else
00500 currenttask=GetTaskHandler(name);
00501 if(currenttask)
00502 queue=dynamic_cast<TGo4BufferQueue*> (currenttask->GetDataQueue());
00503 return queue;
00504
00505 }
00506
00507 TGo4Command* TGo4ServerTask::NextCommand()
00508 {
00509 if(IsMaster()) return 0;
00510 TGo4Command* com=0;
00511 TGo4TaskHandler* han=0;
00512 Bool_t reset=kTRUE;
00513 TGo4LockGuard taskmutex(fxTaskManager->GetMutex());
00514 while((han=fxTaskManager->NextTaskHandler(reset))!=0)
00515 {
00516 reset=kFALSE;
00517 TGo4BufferQueue * comq=dynamic_cast<TGo4BufferQueue*> (han->GetCommandQueue());
00518 if(comq==0) continue;
00519 if(!comq->IsEmpty())
00520 {
00521 com= dynamic_cast<TGo4Command*>(comq->WaitObjectFromBuffer());
00522 if(com)
00523 {
00524 com->SetTaskName(han->GetName());
00525 com->SetMode(han->GetRole());
00526 return com;
00527 }
00528 }
00529 }
00530 return com;
00531 }
00532
00533 void TGo4ServerTask::SendStatus(TGo4Status * stat, const char* receiver)
00534 {
00535 if(IsMaster()) return;
00536 if(stat==0) return;
00537 if(receiver!=0)
00538 {
00539
00540 TGo4Task::SendStatus(stat,receiver);
00541 }
00542 else
00543 {
00544
00545 TGo4LockGuard taskmutex(fxTaskManager->GetMutex());
00546 TGo4TaskHandler* han=0;
00547 Bool_t reset=kTRUE;
00548 while((han=fxTaskManager->NextTaskHandler(reset))!=0)
00549 {
00550 reset=kFALSE;
00551 TGo4BufferQueue * statq=dynamic_cast<TGo4BufferQueue*> (han->GetStatusQueue());
00552 if(statq==0) continue;
00553 TGo4Log::Debug(" Task - sending status %s to task %s", stat->ClassName(), han->GetName());
00554 statq->AddBufferFromObject(stat);
00555 }
00556 }
00557 }
00558
00559 void TGo4ServerTask::SendStatusBuffer()
00560 {
00561 if(IsMaster()) return;
00562
00563 TGo4LockGuard statguard(fxStatusMutex);
00564
00565 TGo4LockGuard taskmutex(fxTaskManager->GetMutex());
00566 TGo4TaskHandler* han=0;
00567 Bool_t reset=kTRUE;
00568 while((han=fxTaskManager->NextTaskHandler(reset))!=0)
00569 {
00570 reset=kFALSE;
00571 TGo4BufferQueue * statq=dynamic_cast<TGo4BufferQueue*> (han->GetStatusQueue());
00572 if(statq==0) continue;
00573 TGo4Log::Debug(" Task - sending status buffer to task %s", han->GetName());
00574 statq->AddBuffer(fxStatusBuffer,kTRUE);
00575 }
00576 }
00577
00578
00579
00580 Bool_t TGo4ServerTask::StartConnectorThread()
00581 {
00582 Bool_t rev=kTRUE;
00583 rev= ( GetWorkHandler()->Start( GetConnectorName() ) );
00584
00585 return rev;
00586 }
00587
00588 Bool_t TGo4ServerTask::StopConnectorThread()
00589 {
00590 Bool_t rev=kTRUE;
00591 rev= ( GetWorkHandler()->Stop( GetConnectorName() ) );
00592
00593 const char* host = gSystem->HostName();
00594 Int_t negotiationport=fxTaskManager->GetNegotiationPort();
00595 TGo4Socket* connector= new TGo4Socket(kTRUE);
00596
00597 connector->Open(host,negotiationport);
00598 connector->Send(TGo4TaskHandler::Get_fgcERROR());
00599 connector->Close();
00600 delete connector;
00601 return rev;
00602 }
00603
00604
00605 Bool_t TGo4ServerTask::ConnectorThreadIsStopped()
00606 {
00607 Bool_t rev=kTRUE;
00608 TGo4Thread* conny= GetWorkHandler()->GetThread(GetConnectorName());
00609 rev= conny->IsWaiting();
00610 return rev;
00611 }
00612
00613 void TGo4ServerTask::Quit()
00614 {
00615 TGo4Log::Debug(" ServerTask Quit -- removing all connected clients ");
00616 SendStatusMessage(2,kTRUE,"ServerTask %s is shutting down now! All clients are removed...",GetName());
00617 RemoveAllClients();
00618
00619 WakeCommandQueue(TGo4Task::Get_fgiTERMID());
00620 Terminate(!IsMaster());
00621 }
00622
00623
00624 void TGo4ServerTask::Shutdown()
00625 {
00626 TGo4Log::Debug(" ServerTask Shutdown without disconnect waiting");
00627 SendStatusMessage(2,kTRUE,"ServerTask %s is shutting down now! All clients are removed...",GetName());
00628 TGo4Thread::Sleep(10*fguCONNECTTIMERPERIOD);
00629 StopWorkThreads();
00630 WakeCommandQueue(TGo4Task::Get_fgiTERMID());
00631 RemoveAllClients(true);
00632 TGo4Slave* slave=GetSlave();
00633 if(slave)
00634 {
00635 slave->Stop();
00636
00637 slave->SetTask(0,kFALSE);
00638 delete slave;
00639
00640 }
00641 gApplication->Terminate();
00642 }
00643
00644
00645 void TGo4ServerTask::LockAll()
00646 {
00647
00648
00649 fxStatusMutex->Lock();
00650
00651 fxTaskManager->GetMutex()->Lock();
00652
00653 TGo4LockGuard::LockMainMutex();
00654
00655
00656 }
00657
00658 void TGo4ServerTask::UnLockAll()
00659 {
00660
00661 TGo4LockGuard::UnLockMainMutex();
00662
00663 fxTaskManager->GetMutex()->UnLock();
00664
00665 fxStatusMutex->UnLock();
00666
00667 }
00668
00669 const char* TGo4ServerTask::Get_fgcLAUNCHPREFSFILE()
00670 {
00671 return fgcLAUNCHPREFSFILE;
00672 }
00673
00674
00675
00676