00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #include "TGo4TaskManager.h"
00017
00018 #include "Riostream.h"
00019
00020 #include "TObjArray.h"
00021 #include "TMutex.h"
00022
00023 #include "TGo4Log.h"
00024 #include "TGo4LockGuard.h"
00025 #include "TGo4Thread.h"
00026 #include "TGo4Socket.h"
00027 #include "TGo4TaskHandler.h"
00028 #include "TGo4TerminateException.h"
00029 #include "TGo4ServerTask.h"
00030
00031 const Int_t TGo4TaskManager::fgiDISCONCYCLES=360;
00032
00033 const UInt_t TGo4TaskManager::fguDISCONTIME=500;
00034
00035 TGo4TaskManager::TGo4TaskManager(const char* name,
00036 TGo4ServerTask * server,
00037 UInt_t negotiationport,
00038 Bool_t createconnector)
00039 : TNamed(name,"This is a Go4TaskManager"),
00040 fuTaskCount(0),fuNegotiationPort(0), fbClientIsRemoved(kFALSE), fbHasControllerConnection(kFALSE)
00041 {
00042 fxServer=server;
00043
00044 if(negotiationport==0)
00045 {
00046
00047 fuNegotiationPort=TGo4TaskHandler::fguCONNECTORPORT;
00048 }
00049 else
00050 {
00051
00052 fuNegotiationPort=negotiationport;
00053 }
00054
00055 fxListMutex=new TMutex(kTRUE);
00056 fxTaskList=new TObjArray;
00057 fxTaskIter=fxTaskList->MakeIterator();
00058 fxTransport=0;
00059 if(createconnector)
00060 {
00061
00062
00063 TGo4Log::Debug("TaskManager: Created negotiation channel in ctor");
00064 fxTransport=new TGo4Socket(kFALSE);
00065 fxTransport->Open( "Server mode does not need hostname", 0, kTRUE);
00066
00067
00068 }
00069 }
00070
00071 TGo4TaskManager::~TGo4TaskManager()
00072 {
00073 if(fxTransport!=0) {
00074 fxTransport->Close();
00075 delete fxTransport;
00076 fxTransport=0;
00077 }
00078 delete fxTaskIter;
00079 delete fxTaskList;
00080 delete fxListMutex;
00081 }
00082
00083 Int_t TGo4TaskManager::ServeClient()
00084 {
00085
00086 Int_t rev=0;
00087 char* recvchar=0;
00088 TString cliname, hostname;
00089
00090 if(!fxTransport)
00091 {
00092
00093 fxTransport=new TGo4Socket(kFALSE);
00094 }
00095
00096 fxServer->SetConnect(fxTransport, "Server mode does not need hostname", 0, kTRUE);
00097
00098 Int_t waitresult=fxServer->WaitForOpen();
00099 if(waitresult<0)
00100 {
00101
00102 TGo4Log::Debug(" TaskManager: Negotiation channel open TIMEOUT");
00103 cerr <<" TaskManager TIMEOUT ERROR opening socket connection !!! Terminating..." << endl;
00104 throw TGo4TerminateException(fxServer);
00105
00106 }
00107 Int_t count=0;
00108 while(GetNegotiationPort()==0)
00109 {
00110 if(count>TGo4TaskHandler::Get_fgiPORTWAITCYCLES())
00111 {
00112 TGo4Log::Debug(" TaskManager: Negotiation port getter TIMEOUT");
00113 cerr <<" TaskManager TIMEOUT ERROR retrieving port number !!! Terminating..." << endl;
00114 throw TGo4TerminateException(fxServer);
00115
00116 }
00117 else if(fxServer->IsTerminating())
00118 {
00119
00120 return -1;
00121 }
00122 else
00123 {
00124 TGo4Thread::Sleep(TGo4TaskHandler::Get_fguPORTWAITTIME());
00125 ++count;
00126 }
00127 }
00128 cout << " Waiting for client connection on PORT: "<< fuNegotiationPort << endl;
00129 TGo4Log::Debug(" TaskManager is waiting to serve client request on port %d ... ",
00130 fuNegotiationPort);
00131 Int_t connectwaitseconds=fxServer->WaitForConnection();
00132
00133 if(connectwaitseconds<0)
00134 {
00135
00136
00137 return connectwaitseconds;
00138 }
00139 else
00140 {
00141
00142 }
00143
00144
00145
00146
00147 Go4CommandMode_t account=ClientLogin();
00148 if(account!=kGo4ComModeRefused)
00149 {
00150
00151
00152 fxTransport->Send(TGo4TaskHandler::Get_fgcOK());
00153 recvchar = fxTransport->RecvRaw("dummy");
00154 cliname = recvchar;
00155 recvchar = fxTransport->RecvRaw("dummy");
00156 hostname = recvchar;
00157
00158
00159
00160 recvchar=fxTransport->RecvRaw("dummy");
00161 if(recvchar && !strcmp(recvchar,TGo4TaskHandler::fgcCONNECT))
00162 {
00163
00164 rev = ConnectClient(cliname,hostname,account);
00165 }
00166 else if(recvchar && !strcmp(recvchar,TGo4TaskHandler::fgcDISCONNECT))
00167 {
00168
00169 rev = DisConnectClient(cliname);
00170 }
00171 else
00172 {
00173
00174 rev =0;
00175 }
00176 }
00177 else
00178 {
00179
00180
00181 fxTransport->Send(TGo4TaskHandler::Get_fgcERROR());
00182 TGo4Log::Debug(" TaskManager: ServeClient received invalid login, closing negotiation port ", cliname.Data());
00183 fxServer->SetDisConnect(fxTransport);
00184
00185 fxServer->WaitForClose();
00186
00187
00188
00189 TGo4Log::Debug(" TaskManager: Finished negotiations with client %s ", cliname.Data());
00190
00191 return 0;
00192 }
00193
00194
00195 recvchar=fxTransport->RecvRaw("dummy");
00196 if(recvchar && !strcmp(recvchar,TGo4TaskHandler::Get_fgcOK()))
00197 {
00198 fxServer->SetDisConnect(fxTransport);
00199 TGo4Log::Debug(" TaskManager: Waiting for timer Close() of negotiation to client %s ... ", cliname.Data());
00200 fxServer->WaitForClose();
00201 TGo4Log::Debug(" TaskManager: Finished negotiations with client %s ", cliname.Data());
00202 }
00203 else
00204 {
00205 TGo4Log::Debug(" TaskManager: ERROR on closing down negotiation channel, client %s ", cliname.Data());
00206 throw TGo4RuntimeException();
00207 }
00208 return rev;
00209 }
00210
00211 Go4CommandMode_t TGo4TaskManager::ClientLogin()
00212 {
00213 if(fxTransport==0) return kGo4ComModeRefused;
00214 TString purpose;
00215 TString account;
00216 TString passwd;
00217 char* recvchar=fxTransport->RecvRaw("dummy");
00218 if(recvchar && !strcmp(recvchar,TGo4TaskHandler::Get_fgcOK()))
00219 {
00220
00222 TGo4Log::Debug(" TaskManager::ClientLogin getting login...");
00223 recvchar=fxTransport->RecvRaw("dummy");
00224 if(recvchar==0) return kGo4ComModeRefused;
00225 purpose=recvchar;
00226 recvchar=fxTransport->RecvRaw("dummy");
00227 if(recvchar==0) return kGo4ComModeRefused;
00228 account=recvchar;
00229
00230 recvchar=fxTransport->RecvRaw("dummy");
00231 if(recvchar==0) return kGo4ComModeRefused;
00232 passwd=recvchar;
00233
00234
00235
00236
00237
00238
00239 Bool_t matching=kFALSE;
00240 if(fxServer->IsMaster())
00241 {
00242 if(purpose==TGo4TaskHandler::fgcSLAVE) matching=kTRUE;
00243 }
00244 else
00245 {
00246 if(purpose==TGo4TaskHandler::fgcMASTER) matching=kTRUE;
00247 }
00248 if(!matching)
00249 {
00250 TGo4Log::Debug(" TaskManager: Client does not match Server, Login failed!!!");
00251 return kGo4ComModeRefused;
00252 }
00253
00254
00255 if(account==TGo4TaskHandler::fgxOBSERVERACCOUNT.GetName()
00256 && passwd==TGo4TaskHandler::fgxOBSERVERACCOUNT.GetTitle())
00257 {
00258 TGo4Log::Debug(" TaskManager: Client logged in as observer");
00259 return kGo4ComModeObserver;
00260 }
00261 else if(account==TGo4TaskHandler::fgxCONTROLLERACCOUNT.GetName()
00262 && passwd==TGo4TaskHandler::fgxCONTROLLERACCOUNT.GetTitle())
00263 {
00264
00265 if(fbHasControllerConnection)
00266 {
00267 TGo4Log::Debug(" TaskManager: Client logged in as 2nd controller, will be observer");
00268 return kGo4ComModeObserver;
00269 }
00270 else
00271 {
00272 TGo4Log::Debug(" TaskManager: Client logged in as controller");
00273 return kGo4ComModeController;
00274 }
00275 }
00276 else if(account==TGo4TaskHandler::fgxADMINISTRATORACCOUNT.GetName()
00277 && passwd==TGo4TaskHandler::fgxADMINISTRATORACCOUNT.GetTitle())
00278 {
00279
00280 if(fbHasControllerConnection)
00281 {
00282 TGo4Log::Warn(" TaskManager: Client logged in as 2nd controller, will be observer");
00283 return kGo4ComModeObserver;
00284 }
00285 else
00286 {
00287 TGo4Log::Debug(" TaskManager: Client logged in as administrator");
00288 return kGo4ComModeAdministrator;
00289 }
00290 }
00291
00292 else
00293 {
00294 TGo4Log::Debug(" TaskManager: Client Login failed!!!");
00295 return kGo4ComModeRefused;
00296 }
00297 }
00298 return kGo4ComModeRefused;
00299 }
00300
00301 Int_t TGo4TaskManager::ConnectClient(const char* client, const char* host, Go4CommandMode_t role)
00302 {
00303 Int_t rev=0;
00304
00305 TString cliname=client;
00306 if (!AddClient(cliname.Data(),host,role)) rev=1;
00307 return rev;
00308 }
00309
00310
00311 Int_t TGo4TaskManager::DisConnectClient(const char* name, Bool_t clientwait)
00312 {
00313 TGo4Log::Info("TaskManager is disconnecting client %s %s ...", name, clientwait ? "with waiting" : "with no wait!" );
00314
00315
00316
00317
00318
00319 Int_t rev=0;
00320 TGo4TaskHandler* han=GetTaskHandler(name);
00321 rev=DisConnectClient(han,clientwait);
00322 return rev;
00323 }
00324
00325 Int_t TGo4TaskManager::DisConnectClient(TGo4TaskHandler * taskhandler, Bool_t clientwait)
00326 {
00327 Int_t rev=0;
00328 if(taskhandler!=0)
00329 {
00330 fbClientIsRemoved=kFALSE;
00331 TString tname=taskhandler->GetName();
00332 Bool_t iscontrollertask=(taskhandler->GetRole()>kGo4ComModeObserver);
00333 fxServer->SendStopBuffers(tname);
00334 if(clientwait)
00335 {
00336
00337 Text_t* revchar=0;
00338 revchar=fxTransport->RecvRaw("dummy");
00339 if(!(revchar && !strcmp(revchar,TGo4TaskHandler::Get_fgcOK())))
00340 {
00341 TGo4Log::Debug(" TaskManager %s; negotiation ERROR after client disconnect!",GetName());
00342 rev+=1;
00343
00344 }
00345 }
00346 if(!taskhandler->DisConnect(clientwait))rev+=1;
00347 if (!RemoveTaskHandler(tname.Data())) rev+=2;
00348 if (rev==0)
00349 {
00350
00351 fuTaskCount--;
00352 if(iscontrollertask) fbHasControllerConnection=kFALSE;
00353 fbClientIsRemoved=kTRUE;
00354 TGo4Log::Debug(" TaskManager: client %s has been disconnected. ", tname.Data());
00355 }
00356 else
00357 {
00358
00359 TGo4Log::Debug(" TaskManager: client %s disconnect ERROR %d occured !! ", tname.Data(),rev);
00360 }
00361
00362 }
00363
00364 else
00365 {
00366
00367 TGo4Log::Debug(" TaskManager: FAILED to disonnect client -- no such client! ");
00368 rev=-1;
00369 }
00370 return rev;
00371 }
00372
00373 Bool_t TGo4TaskManager::AddClient(const char* client, const char* host, Go4CommandMode_t role)
00374 {
00375 TGo4TaskHandler* han=NewTaskHandler(client);
00376 if (han==0)
00377 {
00378 TGo4Log::Warn(" !!! TaskManager::AddClient ERROR: client of name %s is already exisiting !!! ",client);
00379 fxTransport->Send(TGo4TaskHandler::Get_fgcERROR());
00380 return kFALSE;
00381 }
00382
00383 if(han->Connect(host,fxTransport))
00384 {
00385
00386 TGo4Log::Info(" TaskManager: Succesfully added new client %s (host %s, ports %d,%d,%d) ",
00387 client, host, han->GetComPort(), han->GetStatPort(), han->GetDatPort());
00388 fuTaskCount++;
00389 han->SetRole(role);
00390 if(role>kGo4ComModeObserver) fbHasControllerConnection=kTRUE;
00391 fxServer->SetCurrentTask(client);
00392 fxServer->SendStatusMessage(1,kTRUE,"%s::Client %s is logged in at %s as %s",
00393 client,client,fxServer->GetName(),TGo4Command::GetModeDescription(han->GetRole()) );
00394 return kTRUE;
00395 }
00396 else
00397 {
00398 TGo4Log::Error(" TaskManager: ERROR on connecting new client %s (host %s) ",
00399 client,host);
00400 RemoveTaskHandler(client);
00401 return kFALSE;
00402 }
00403 return kFALSE;
00404 }
00405
00406
00407 Bool_t TGo4TaskManager::AddTaskHandler(TGo4TaskHandler* han)
00408 {
00409 Bool_t rev=kFALSE;
00410 {
00411 TGo4LockGuard listguard(fxListMutex);
00412 if(fxTaskList->FindObject(han)==0)
00413
00414 {
00415
00416 fxTaskList->AddLast(han);
00417 rev=kTRUE;
00418 }
00419 else
00420 {
00421
00422 rev=kFALSE;
00423 }
00424 }
00425 return rev;
00426 }
00427
00428
00429 TGo4TaskHandler* TGo4TaskManager::NewTaskHandler(const char* name)
00430 {
00431 TGo4TaskHandler* han=new TGo4TaskHandler(name,fxServer,kFALSE, fxServer->IsMaster());
00432 if(AddTaskHandler(han))
00433 {
00434
00435 return han;
00436 }
00437 else
00438 {
00439
00440 delete han;
00441 return 0;
00442
00443 }
00444 return 0;
00445 }
00446
00447 Bool_t TGo4TaskManager::RemoveTaskHandler(const char* name)
00448 {
00449 Bool_t rev=kTRUE;
00450 TGo4TaskHandler* taskhandler;
00451 {
00452 TGo4LockGuard listguard(fxListMutex);
00453 TObject* obj=fxTaskList->FindObject(name);
00454 taskhandler= (TGo4TaskHandler*) fxTaskList->Remove(obj);
00455
00456
00457 }
00458 if(taskhandler!=0)
00459 {
00460
00461 TGo4TaskHandler* currenttaskhandler=fxServer->GetCurrentTaskHandler();
00462 if(taskhandler==currenttaskhandler)
00463 {
00464
00465 fxServer->SetCurrentTask(0);
00466 }
00467 else
00468 {
00469
00470 fxServer->StartWorkThreads();
00471 }
00472 delete taskhandler;
00473 }
00474 else
00475 {
00476
00477 rev=kFALSE;
00478 }
00479 return rev;
00480 }
00481
00482
00483
00484 TGo4TaskHandler* TGo4TaskManager::GetTaskHandler(const char* name)
00485 {
00486 TGo4TaskHandler* th=0;
00487 {
00488 TGo4LockGuard listguard(fxListMutex);
00489 th= (TGo4TaskHandler*) fxTaskList->FindObject(name);
00490 }
00491 return th;
00492 }
00493
00494 TGo4TaskHandler* TGo4TaskManager::GetLastTaskHandler()
00495 {
00496 TGo4TaskHandler* th=0;
00497 {
00498 TGo4LockGuard listguard(fxListMutex);
00499 th= (TGo4TaskHandler*) fxTaskList->Last();
00500 }
00501 return th;
00502 }
00503
00504 TGo4TaskHandler* TGo4TaskManager::NextTaskHandler(Bool_t reset)
00505 {
00506 TGo4LockGuard listguard(fxListMutex);
00507 if(reset) fxTaskIter->Reset();
00508 TGo4TaskHandler* th=dynamic_cast<TGo4TaskHandler*>(fxTaskIter->Next());
00509 return th;
00510 }
00511
00512
00513 Int_t TGo4TaskManager::WaitForClientRemoved()
00514
00515 {
00516 Int_t count=0;
00517 while(!fbClientIsRemoved)
00518 {
00519 if(count>TGo4TaskManager::fgiDISCONCYCLES)
00520 {
00521 return -1;
00522 }
00523 else if(fxServer->IsTerminating())
00524 {
00525 return -2;
00526 }
00527 else
00528 {
00529 TGo4Thread::Sleep(TGo4TaskManager::fguDISCONTIME);
00530 ++count;
00531 }
00532 }
00533 fbClientIsRemoved=kFALSE;
00534 return count;
00535
00536 }
00537
00538 UInt_t TGo4TaskManager::GetNegotiationPort()
00539 {
00540 if(fxTransport)
00541 {
00542 fuNegotiationPort = fxTransport->GetPort();
00543 }
00544
00545 return fuNegotiationPort;
00546 }
00547
00548