00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014 #include "TGo4TaskManager.h"
00015
00016 #include "Riostream.h"
00017
00018 #include "TObjArray.h"
00019 #include "TMutex.h"
00020
00021 #include "TGo4Log.h"
00022 #include "TGo4LockGuard.h"
00023 #include "TGo4Thread.h"
00024 #include "TGo4Socket.h"
00025 #include "TGo4TaskHandler.h"
00026 #include "TGo4TerminateException.h"
00027 #include "TGo4ServerTask.h"
00028
00029 const Int_t TGo4TaskManager::fgiDISCONCYCLES=360;
00030
00031 const UInt_t TGo4TaskManager::fguDISCONTIME=500;
00032
00033 TGo4TaskManager::TGo4TaskManager(const char* name,
00034 TGo4ServerTask * server,
00035 UInt_t negotiationport,
00036 Bool_t createconnector)
00037 : TNamed(name,"This is a Go4TaskManager"),
00038 fuTaskCount(0),fuNegotiationPort(0), fbClientIsRemoved(kFALSE), fbHasControllerConnection(kFALSE)
00039 {
00040 fxServer=server;
00041
00042 if(negotiationport==0)
00043 {
00044
00045 fuNegotiationPort=TGo4TaskHandler::fguCONNECTORPORT;
00046 }
00047 else
00048 {
00049
00050 fuNegotiationPort=negotiationport;
00051 }
00052
00053 fxListMutex=new TMutex(kTRUE);
00054 fxTaskList=new TObjArray;
00055 fxTaskIter=fxTaskList->MakeIterator();
00056 fxTransport=0;
00057 if(createconnector)
00058 {
00059
00060
00061 TGo4Log::Debug("TaskManager: Created negotiation channel in ctor");
00062 fxTransport=new TGo4Socket(kFALSE);
00063 fxTransport->Open( "Server mode does not need hostname", 0, kTRUE);
00064
00065
00066 }
00067 }
00068
00069 TGo4TaskManager::~TGo4TaskManager()
00070 {
00071 if(fxTransport!=0) {
00072 fxTransport->Close();
00073 delete fxTransport;
00074 fxTransport=0;
00075 }
00076 delete fxTaskIter;
00077 delete fxTaskList;
00078 delete fxListMutex;
00079 }
00080
00081 Int_t TGo4TaskManager::ServeClient()
00082 {
00083
00084 Int_t rev=0;
00085 char* recvchar=0;
00086 TString cliname, hostname;
00087
00088 if(!fxTransport)
00089 {
00090
00091 fxTransport=new TGo4Socket(kFALSE);
00092 }
00093
00094 fxServer->SetConnect(fxTransport, "Server mode does not need hostname", 0, kTRUE);
00095
00096 Int_t waitresult=fxServer->WaitForOpen();
00097 if(waitresult<0)
00098 {
00099
00100 TGo4Log::Debug(" TaskManager: Negotiation channel open TIMEOUT");
00101 cerr <<" TaskManager TIMEOUT ERROR opening socket connection !!! Terminating..." << endl;
00102 throw TGo4TerminateException(fxServer);
00103
00104 }
00105 Int_t count=0;
00106 while(GetNegotiationPort()==0)
00107 {
00108 if(count>TGo4TaskHandler::Get_fgiPORTWAITCYCLES())
00109 {
00110 TGo4Log::Debug(" TaskManager: Negotiation port getter TIMEOUT");
00111 cerr <<" TaskManager TIMEOUT ERROR retrieving port number !!! Terminating..." << endl;
00112 throw TGo4TerminateException(fxServer);
00113
00114 }
00115 else if(fxServer->IsTerminating())
00116 {
00117
00118 return -1;
00119 }
00120 else
00121 {
00122 TGo4Thread::Sleep(TGo4TaskHandler::Get_fguPORTWAITTIME());
00123 ++count;
00124 }
00125 }
00126 cout << " Waiting for client connection on PORT: "<< fuNegotiationPort << endl;
00127 TGo4Log::Debug(" TaskManager is waiting to serve client request on port %d ... ",
00128 fuNegotiationPort);
00129 Int_t connectwaitseconds=fxServer->WaitForConnection();
00130
00131 if(connectwaitseconds<0)
00132 {
00133
00134
00135 return connectwaitseconds;
00136 }
00137 else
00138 {
00139
00140 }
00141
00142
00143
00144
00145 Go4CommandMode_t account=ClientLogin();
00146 if(account!=kGo4ComModeRefused)
00147 {
00148
00149
00150 fxTransport->Send(TGo4TaskHandler::Get_fgcOK());
00151 recvchar = fxTransport->RecvRaw("dummy");
00152 cliname = recvchar;
00153 recvchar = fxTransport->RecvRaw("dummy");
00154 hostname = recvchar;
00155
00156
00157
00158 recvchar=fxTransport->RecvRaw("dummy");
00159 if(recvchar && !strcmp(recvchar,TGo4TaskHandler::fgcCONNECT))
00160 {
00161
00162 rev = ConnectClient(cliname,hostname,account);
00163 }
00164 else if(recvchar && !strcmp(recvchar,TGo4TaskHandler::fgcDISCONNECT))
00165 {
00166
00167 rev = DisConnectClient(cliname);
00168 }
00169 else
00170 {
00171
00172 rev =0;
00173 }
00174 }
00175 else
00176 {
00177
00178
00179 fxTransport->Send(TGo4TaskHandler::Get_fgcERROR());
00180 TGo4Log::Debug(" TaskManager: ServeClient received invalid login, closing negotiation port ", cliname.Data());
00181 fxServer->SetDisConnect(fxTransport);
00182
00183 fxServer->WaitForClose();
00184
00185
00186
00187 TGo4Log::Debug(" TaskManager: Finished negotiations with client %s ", cliname.Data());
00188
00189 return 0;
00190 }
00191
00192
00193 recvchar=fxTransport->RecvRaw("dummy");
00194 if(recvchar && !strcmp(recvchar,TGo4TaskHandler::Get_fgcOK()))
00195 {
00196 fxServer->SetDisConnect(fxTransport);
00197 TGo4Log::Debug(" TaskManager: Waiting for timer Close() of negotiation to client %s ... ", cliname.Data());
00198 fxServer->WaitForClose();
00199 TGo4Log::Debug(" TaskManager: Finished negotiations with client %s ", cliname.Data());
00200 }
00201 else
00202 {
00203 TGo4Log::Debug(" TaskManager: ERROR on closing down negotiation channel, client %s ", cliname.Data());
00204 throw TGo4RuntimeException();
00205 }
00206 return rev;
00207 }
00208
00209 Go4CommandMode_t TGo4TaskManager::ClientLogin()
00210 {
00211 if(fxTransport==0) return kGo4ComModeRefused;
00212 TString purpose;
00213 TString account;
00214 TString passwd;
00215 char* recvchar=fxTransport->RecvRaw("dummy");
00216 if(recvchar && !strcmp(recvchar,TGo4TaskHandler::Get_fgcOK()))
00217 {
00218
00220 TGo4Log::Debug(" TaskManager::ClientLogin getting login...");
00221 recvchar=fxTransport->RecvRaw("dummy");
00222 if(recvchar==0) return kGo4ComModeRefused;
00223 purpose=recvchar;
00224
00225 recvchar=fxTransport->RecvRaw("dummy");
00226 if(recvchar==0) return kGo4ComModeRefused;
00227 account=recvchar;
00228
00229 recvchar=fxTransport->RecvRaw("dummy");
00230 if(recvchar==0) return kGo4ComModeRefused;
00231 passwd=recvchar;
00232
00233
00234
00235
00236
00237
00238 Bool_t matching=kFALSE;
00239 if(fxServer->IsMaster())
00240 {
00241 if(purpose==TGo4TaskHandler::fgcSLAVE) matching=kTRUE;
00242 }
00243 else
00244 {
00245 if(purpose==TGo4TaskHandler::fgcMASTER) matching=kTRUE;
00246 }
00247 if(!matching)
00248 {
00249 TGo4Log::Debug(" TaskManager: Client does not match Server, Login failed!!!");
00250 return kGo4ComModeRefused;
00251 }
00252
00253
00254 if(account==TGo4TaskHandler::fgxOBSERVERACCOUNT.GetName()
00255 && passwd==TGo4TaskHandler::fgxOBSERVERACCOUNT.GetTitle())
00256 {
00257 TGo4Log::Debug(" TaskManager: Client logged in as observer");
00258 return kGo4ComModeObserver;
00259 }
00260 else if(account==TGo4TaskHandler::fgxCONTROLLERACCOUNT.GetName()
00261 && passwd==TGo4TaskHandler::fgxCONTROLLERACCOUNT.GetTitle())
00262 {
00263
00264 if(fbHasControllerConnection)
00265 {
00266 TGo4Log::Debug(" TaskManager: Client logged in as 2nd controller, will be observer");
00267 return kGo4ComModeObserver;
00268 }
00269 else
00270 {
00271 TGo4Log::Debug(" TaskManager: Client logged in as controller");
00272 return kGo4ComModeController;
00273 }
00274 }
00275 else if(account==TGo4TaskHandler::fgxADMINISTRATORACCOUNT.GetName()
00276 && passwd==TGo4TaskHandler::fgxADMINISTRATORACCOUNT.GetTitle())
00277 {
00278
00279 if(fbHasControllerConnection)
00280 {
00281 TGo4Log::Warn(" TaskManager: Client logged in as 2nd controller, will be observer");
00282 return kGo4ComModeObserver;
00283 }
00284 else
00285 {
00286 TGo4Log::Debug(" TaskManager: Client logged in as administrator");
00287 return kGo4ComModeAdministrator;
00288 }
00289 }
00290
00291 else
00292 {
00293 TGo4Log::Debug(" TaskManager: Client Login failed!!!");
00294 return kGo4ComModeRefused;
00295 }
00296 }
00297 return kGo4ComModeRefused;
00298 }
00299
00300 Int_t TGo4TaskManager::ConnectClient(const char* client, const char* host, Go4CommandMode_t role)
00301 {
00302 Int_t rev=0;
00303
00304 TString cliname=client;
00305 if (!AddClient(cliname.Data(),host,role)) rev=1;
00306 return rev;
00307 }
00308
00309
00310 Int_t TGo4TaskManager::DisConnectClient(const char* name, Bool_t clientwait)
00311 {
00312 TGo4Log::Info("TaskManager is disconnecting client %s %s ...", name, clientwait ? "with waiting" : "with no wait!" );
00313
00314
00315
00316
00317
00318 Int_t rev=0;
00319 TGo4TaskHandler* han=GetTaskHandler(name);
00320 rev=DisConnectClient(han,clientwait);
00321 return rev;
00322 }
00323
00324 Int_t TGo4TaskManager::DisConnectClient(TGo4TaskHandler * taskhandler, Bool_t clientwait)
00325 {
00326 Int_t rev=0;
00327 if(taskhandler!=0)
00328 {
00329 fbClientIsRemoved=kFALSE;
00330 TString tname=taskhandler->GetName();
00331 Bool_t iscontrollertask=(taskhandler->GetRole()>kGo4ComModeObserver);
00332 fxServer->SendStopBuffers(tname);
00333 if(clientwait)
00334 {
00335
00336 char* revchar = fxTransport->RecvRaw("dummy");
00337 if(!(revchar && !strcmp(revchar,TGo4TaskHandler::Get_fgcOK())))
00338 {
00339 TGo4Log::Debug(" TaskManager %s; negotiation ERROR after client disconnect!",GetName());
00340 rev+=1;
00341
00342 }
00343 }
00344 if(!taskhandler->DisConnect(clientwait))rev+=1;
00345 if (!RemoveTaskHandler(tname.Data())) rev+=2;
00346 if (rev==0)
00347 {
00348
00349 fuTaskCount--;
00350 if(iscontrollertask) fbHasControllerConnection=kFALSE;
00351 fbClientIsRemoved=kTRUE;
00352 TGo4Log::Debug(" TaskManager: client %s has been disconnected. ", tname.Data());
00353 }
00354 else
00355 {
00356
00357 TGo4Log::Debug(" TaskManager: client %s disconnect ERROR %d occured !! ", tname.Data(),rev);
00358 }
00359
00360 }
00361
00362 else
00363 {
00364
00365 TGo4Log::Debug(" TaskManager: FAILED to disonnect client -- no such client! ");
00366 rev=-1;
00367 }
00368 return rev;
00369 }
00370
00371 Bool_t TGo4TaskManager::AddClient(const char* client, const char* host, Go4CommandMode_t role)
00372 {
00373 TGo4TaskHandler* han=NewTaskHandler(client);
00374 if (han==0)
00375 {
00376 TGo4Log::Warn(" !!! TaskManager::AddClient ERROR: client of name %s is already exisiting !!! ",client);
00377 fxTransport->Send(TGo4TaskHandler::Get_fgcERROR());
00378 return kFALSE;
00379 }
00380
00381 if(han->Connect(host,fxTransport))
00382 {
00383
00384 TGo4Log::Info(" TaskManager: Succesfully added new client %s (host %s, ports %d,%d,%d) ",
00385 client, host, han->GetComPort(), han->GetStatPort(), han->GetDatPort());
00386 fuTaskCount++;
00387 han->SetRole(role);
00388 if(role>kGo4ComModeObserver) fbHasControllerConnection=kTRUE;
00389 fxServer->SetCurrentTask(client);
00390 fxServer->SendStatusMessage(1,kTRUE,"%s::Client %s is logged in at %s as %s",
00391 client,client,fxServer->GetName(),TGo4Command::GetModeDescription(han->GetRole()) );
00392 return kTRUE;
00393 }
00394 else
00395 {
00396 TGo4Log::Error(" TaskManager: ERROR on connecting new client %s (host %s) ",
00397 client,host);
00398 RemoveTaskHandler(client);
00399 return kFALSE;
00400 }
00401 return kFALSE;
00402 }
00403
00404
00405 Bool_t TGo4TaskManager::AddTaskHandler(TGo4TaskHandler* han)
00406 {
00407 Bool_t rev=kFALSE;
00408 {
00409 TGo4LockGuard listguard(fxListMutex);
00410 if(fxTaskList->FindObject(han)==0)
00411
00412 {
00413
00414 fxTaskList->AddLast(han);
00415 rev=kTRUE;
00416 }
00417 else
00418 {
00419
00420 rev=kFALSE;
00421 }
00422 }
00423 return rev;
00424 }
00425
00426
00427 TGo4TaskHandler* TGo4TaskManager::NewTaskHandler(const char* name)
00428 {
00429 TGo4TaskHandler* han=new TGo4TaskHandler(name,fxServer,kFALSE, fxServer->IsMaster());
00430 if(AddTaskHandler(han))
00431 {
00432
00433 return han;
00434 }
00435 else
00436 {
00437
00438 delete han;
00439 return 0;
00440
00441 }
00442 return 0;
00443 }
00444
00445 Bool_t TGo4TaskManager::RemoveTaskHandler(const char* name)
00446 {
00447 Bool_t rev=kTRUE;
00448 TGo4TaskHandler* taskhandler;
00449 {
00450 TGo4LockGuard listguard(fxListMutex);
00451 TObject* obj=fxTaskList->FindObject(name);
00452 taskhandler= (TGo4TaskHandler*) fxTaskList->Remove(obj);
00453
00454
00455 }
00456 if(taskhandler!=0)
00457 {
00458
00459 TGo4TaskHandler* currenttaskhandler=fxServer->GetCurrentTaskHandler();
00460 if(taskhandler==currenttaskhandler)
00461 {
00462
00463 fxServer->SetCurrentTask(0);
00464 }
00465 else
00466 {
00467
00468 fxServer->StartWorkThreads();
00469 }
00470 delete taskhandler;
00471 }
00472 else
00473 {
00474
00475 rev=kFALSE;
00476 }
00477 return rev;
00478 }
00479
00480
00481
00482 TGo4TaskHandler* TGo4TaskManager::GetTaskHandler(const char* name)
00483 {
00484 TGo4TaskHandler* th=0;
00485 {
00486 TGo4LockGuard listguard(fxListMutex);
00487 th= (TGo4TaskHandler*) fxTaskList->FindObject(name);
00488 }
00489 return th;
00490 }
00491
00492 TGo4TaskHandler* TGo4TaskManager::GetLastTaskHandler()
00493 {
00494 TGo4TaskHandler* th=0;
00495 {
00496 TGo4LockGuard listguard(fxListMutex);
00497 th= (TGo4TaskHandler*) fxTaskList->Last();
00498 }
00499 return th;
00500 }
00501
00502 TGo4TaskHandler* TGo4TaskManager::NextTaskHandler(Bool_t reset)
00503 {
00504 TGo4LockGuard listguard(fxListMutex);
00505 if(reset) fxTaskIter->Reset();
00506 TGo4TaskHandler* th=dynamic_cast<TGo4TaskHandler*>(fxTaskIter->Next());
00507 return th;
00508 }
00509
00510
00511 Int_t TGo4TaskManager::WaitForClientRemoved()
00512
00513 {
00514 Int_t count=0;
00515 while(!fbClientIsRemoved)
00516 {
00517 if(count>TGo4TaskManager::fgiDISCONCYCLES)
00518 {
00519 return -1;
00520 }
00521 else if(fxServer->IsTerminating())
00522 {
00523 return -2;
00524 }
00525 else
00526 {
00527 TGo4Thread::Sleep(TGo4TaskManager::fguDISCONTIME);
00528 ++count;
00529 }
00530 }
00531 fbClientIsRemoved=kFALSE;
00532 return count;
00533
00534 }
00535
00536 UInt_t TGo4TaskManager::GetNegotiationPort()
00537 {
00538 if(fxTransport)
00539 {
00540 fuNegotiationPort = fxTransport->GetPort();
00541 }
00542
00543 return fuNegotiationPort;
00544 }