00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014 #include "TGo4TaskManager.h"
00015
00016 #include "Riostream.h"
00017 #include "TObjArray.h"
00018 #include "TMutex.h"
00019
00020 #include "TGo4Log.h"
00021 #include "TGo4LockGuard.h"
00022 #include "TGo4Thread.h"
00023 #include "TGo4Socket.h"
00024 #include "TGo4TaskHandler.h"
00025 #include "TGo4TerminateException.h"
00026 #include "TGo4ServerTask.h"
00027
00028 const Int_t TGo4TaskManager::fgiDISCONCYCLES=360;
00029
00030 const UInt_t TGo4TaskManager::fguDISCONTIME=500;
00031
00032 TGo4TaskManager::TGo4TaskManager(const char* name,
00033 TGo4ServerTask * server,
00034 UInt_t negotiationport,
00035 Bool_t createconnector)
00036 : TNamed(name,"This is a Go4TaskManager"),
00037 fuTaskCount(0),fuNegotiationPort(0), fbClientIsRemoved(kFALSE), fbHasControllerConnection(kFALSE)
00038 {
00039 fxServer=server;
00040
00041 if(negotiationport==0)
00042 {
00043
00044 fuNegotiationPort=TGo4TaskHandler::fguCONNECTORPORT;
00045 }
00046 else
00047 {
00048
00049 fuNegotiationPort=negotiationport;
00050 }
00051
00052 fxListMutex=new TMutex(kTRUE);
00053 fxTaskList=new TObjArray;
00054 fxTaskIter=fxTaskList->MakeIterator();
00055 fxTransport=0;
00056 if(createconnector)
00057 {
00058
00059
00060 TGo4Log::Debug("TaskManager: Created negotiation channel in ctor");
00061 fxTransport=new TGo4Socket(kFALSE);
00062 fxTransport->Open( "Server mode does not need hostname", 0, kTRUE);
00063
00064
00065 }
00066 }
00067
00068 TGo4TaskManager::~TGo4TaskManager()
00069 {
00070 if(fxTransport!=0) {
00071 fxTransport->Close();
00072 delete fxTransport;
00073 fxTransport=0;
00074 }
00075 delete fxTaskIter;
00076 delete fxTaskList;
00077 delete fxListMutex;
00078 }
00079
00080 Int_t TGo4TaskManager::ServeClient()
00081 {
00082
00083 Int_t rev=0;
00084 char* recvchar=0;
00085 TString cliname, hostname;
00086
00087 if(!fxTransport)
00088 {
00089
00090 fxTransport=new TGo4Socket(kFALSE);
00091 }
00092
00093 fxServer->SetConnect(fxTransport, "Server mode does not need hostname", 0, kTRUE);
00094
00095 Int_t waitresult=fxServer->WaitForOpen();
00096 if(waitresult<0)
00097 {
00098
00099 TGo4Log::Debug(" TaskManager: Negotiation channel open TIMEOUT");
00100 std::cerr <<" TaskManager TIMEOUT ERROR opening socket connection !!! Terminating..." << std::endl;
00101 throw TGo4TerminateException(fxServer);
00102
00103 }
00104 Int_t count=0;
00105 while(GetNegotiationPort()==0)
00106 {
00107 if(count>TGo4TaskHandler::Get_fgiPORTWAITCYCLES())
00108 {
00109 TGo4Log::Debug(" TaskManager: Negotiation port getter TIMEOUT");
00110 std::cerr <<" TaskManager TIMEOUT ERROR retrieving port number !!! Terminating..." << std::endl;
00111 throw TGo4TerminateException(fxServer);
00112
00113 }
00114 else if(fxServer->IsTerminating())
00115 {
00116
00117 return -1;
00118 }
00119 else
00120 {
00121 TGo4Thread::Sleep(TGo4TaskHandler::Get_fguPORTWAITTIME());
00122 ++count;
00123 }
00124 }
00125 std::cout << " Waiting for client connection on PORT: "<< fuNegotiationPort << std::endl;
00126 TGo4Log::Debug(" TaskManager is waiting to serve client request on port %d ... ",
00127 fuNegotiationPort);
00128 Int_t connectwaitseconds=fxServer->WaitForConnection();
00129
00130 if(connectwaitseconds<0)
00131 {
00132
00133
00134 return connectwaitseconds;
00135 }
00136 else
00137 {
00138
00139 }
00140
00141
00142
00143
00144 Go4CommandMode_t account=ClientLogin();
00145 if(account!=kGo4ComModeRefused)
00146 {
00147
00148
00149 fxTransport->Send(TGo4TaskHandler::Get_fgcOK());
00150 recvchar = fxTransport->RecvRaw("dummy");
00151 cliname = recvchar;
00152 recvchar = fxTransport->RecvRaw("dummy");
00153 hostname = recvchar;
00154
00155
00156
00157 recvchar=fxTransport->RecvRaw("dummy");
00158 if(recvchar && !strcmp(recvchar,TGo4TaskHandler::fgcCONNECT))
00159 {
00160
00161 rev = ConnectClient(cliname,hostname,account);
00162 }
00163 else if(recvchar && !strcmp(recvchar,TGo4TaskHandler::fgcDISCONNECT))
00164 {
00165
00166 rev = DisConnectClient(cliname);
00167 }
00168 else
00169 {
00170
00171 rev =0;
00172 }
00173 }
00174 else
00175 {
00176
00177
00178 fxTransport->Send(TGo4TaskHandler::Get_fgcERROR());
00179 TGo4Log::Debug(" TaskManager: ServeClient received invalid login, closing negotiation port ", cliname.Data());
00180 fxServer->SetDisConnect(fxTransport);
00181
00182 fxServer->WaitForClose();
00183
00184
00185
00186 TGo4Log::Debug(" TaskManager: Finished negotiations with client %s ", cliname.Data());
00187
00188 return 0;
00189 }
00190
00191
00192 recvchar=fxTransport->RecvRaw("dummy");
00193 if(recvchar && !strcmp(recvchar,TGo4TaskHandler::Get_fgcOK()))
00194 {
00195 fxServer->SetDisConnect(fxTransport);
00196 TGo4Log::Debug(" TaskManager: Waiting for timer Close() of negotiation to client %s ... ", cliname.Data());
00197 fxServer->WaitForClose();
00198 TGo4Log::Debug(" TaskManager: Finished negotiations with client %s ", cliname.Data());
00199 }
00200 else
00201 {
00202 TGo4Log::Debug(" TaskManager: ERROR on closing down negotiation channel, client %s ", cliname.Data());
00203 throw TGo4RuntimeException();
00204 }
00205 return rev;
00206 }
00207
00208 Go4CommandMode_t TGo4TaskManager::ClientLogin()
00209 {
00210 if(fxTransport==0) return kGo4ComModeRefused;
00211 TString purpose;
00212 TString account;
00213 TString passwd;
00214 char* recvchar=fxTransport->RecvRaw("dummy");
00215 if(recvchar && !strcmp(recvchar,TGo4TaskHandler::Get_fgcOK()))
00216 {
00217
00219 TGo4Log::Debug(" TaskManager::ClientLogin getting login...");
00220 recvchar=fxTransport->RecvRaw("dummy");
00221 if(recvchar==0) return kGo4ComModeRefused;
00222 purpose=recvchar;
00223
00224 recvchar=fxTransport->RecvRaw("dummy");
00225 if(recvchar==0) return kGo4ComModeRefused;
00226 account=recvchar;
00227
00228 recvchar=fxTransport->RecvRaw("dummy");
00229 if(recvchar==0) return kGo4ComModeRefused;
00230 passwd=recvchar;
00231
00232
00233
00234
00235
00236
00237 Bool_t matching=kFALSE;
00238 if(fxServer->IsMaster())
00239 {
00240 if(purpose==TGo4TaskHandler::fgcSLAVE) matching=kTRUE;
00241 }
00242 else
00243 {
00244 if(purpose==TGo4TaskHandler::fgcMASTER) matching=kTRUE;
00245 }
00246 if(!matching)
00247 {
00248 TGo4Log::Debug(" TaskManager: Client does not match Server, Login failed!!!");
00249 return kGo4ComModeRefused;
00250 }
00251
00252
00253 if(account==TGo4TaskHandler::fgxOBSERVERACCOUNT.GetName()
00254 && passwd==TGo4TaskHandler::fgxOBSERVERACCOUNT.GetTitle())
00255 {
00256 TGo4Log::Debug(" TaskManager: Client logged in as observer");
00257 return kGo4ComModeObserver;
00258 }
00259 else if(account==TGo4TaskHandler::fgxCONTROLLERACCOUNT.GetName()
00260 && passwd==TGo4TaskHandler::fgxCONTROLLERACCOUNT.GetTitle())
00261 {
00262
00263 if(fbHasControllerConnection)
00264 {
00265 TGo4Log::Debug(" TaskManager: Client logged in as 2nd controller, will be observer");
00266 return kGo4ComModeObserver;
00267 }
00268 else
00269 {
00270 TGo4Log::Debug(" TaskManager: Client logged in as controller");
00271 return kGo4ComModeController;
00272 }
00273 }
00274 else if(account==TGo4TaskHandler::fgxADMINISTRATORACCOUNT.GetName()
00275 && passwd==TGo4TaskHandler::fgxADMINISTRATORACCOUNT.GetTitle())
00276 {
00277
00278 if(fbHasControllerConnection)
00279 {
00280 TGo4Log::Warn(" TaskManager: Client logged in as 2nd controller, will be observer");
00281 return kGo4ComModeObserver;
00282 }
00283 else
00284 {
00285 TGo4Log::Debug(" TaskManager: Client logged in as administrator");
00286 return kGo4ComModeAdministrator;
00287 }
00288 }
00289
00290 else
00291 {
00292 TGo4Log::Debug(" TaskManager: Client Login failed!!!");
00293 return kGo4ComModeRefused;
00294 }
00295 }
00296 return kGo4ComModeRefused;
00297 }
00298
00299 Int_t TGo4TaskManager::ConnectClient(const char* client, const char* host, Go4CommandMode_t role)
00300 {
00301 Int_t rev=0;
00302
00303 TString cliname=client;
00304 if (!AddClient(cliname.Data(),host,role)) rev=1;
00305 return rev;
00306 }
00307
00308
00309 Int_t TGo4TaskManager::DisConnectClient(const char* name, Bool_t clientwait)
00310 {
00311 TGo4Log::Info("TaskManager is disconnecting client %s %s ...", name, clientwait ? "with waiting" : "with no wait!" );
00312
00313
00314
00315
00316
00317 Int_t rev=0;
00318 TGo4TaskHandler* han=GetTaskHandler(name);
00319 rev=DisConnectClient(han,clientwait);
00320 return rev;
00321 }
00322
00323 Int_t TGo4TaskManager::DisConnectClient(TGo4TaskHandler * taskhandler, Bool_t clientwait)
00324 {
00325 Int_t rev=0;
00326 if(taskhandler!=0)
00327 {
00328 fbClientIsRemoved=kFALSE;
00329 TString tname=taskhandler->GetName();
00330 Bool_t iscontrollertask=(taskhandler->GetRole()>kGo4ComModeObserver);
00331 fxServer->SendStopBuffers(tname);
00332 if(clientwait)
00333 {
00334
00335 char* revchar = fxTransport->RecvRaw("dummy");
00336 if(!(revchar && !strcmp(revchar,TGo4TaskHandler::Get_fgcOK())))
00337 {
00338 TGo4Log::Debug(" TaskManager %s; negotiation ERROR after client disconnect!",GetName());
00339 rev+=1;
00340
00341 }
00342 }
00343 if(!taskhandler->DisConnect(clientwait))rev+=1;
00344 if (!RemoveTaskHandler(tname.Data())) rev+=2;
00345 if (rev==0)
00346 {
00347
00348 fuTaskCount--;
00349 if(iscontrollertask) fbHasControllerConnection=kFALSE;
00350 fbClientIsRemoved=kTRUE;
00351 TGo4Log::Debug(" TaskManager: client %s has been disconnected. ", tname.Data());
00352 }
00353 else
00354 {
00355
00356 TGo4Log::Debug(" TaskManager: client %s disconnect ERROR %d occured !! ", tname.Data(),rev);
00357 }
00358
00359 }
00360
00361 else
00362 {
00363
00364 TGo4Log::Debug(" TaskManager: FAILED to disonnect client -- no such client! ");
00365 rev=-1;
00366 }
00367 return rev;
00368 }
00369
00370 Bool_t TGo4TaskManager::AddClient(const char* client, const char* host, Go4CommandMode_t role)
00371 {
00372 TGo4TaskHandler* han=NewTaskHandler(client);
00373 if (han==0)
00374 {
00375 TGo4Log::Warn(" !!! TaskManager::AddClient ERROR: client of name %s is already exisiting !!! ",client);
00376 fxTransport->Send(TGo4TaskHandler::Get_fgcERROR());
00377 return kFALSE;
00378 }
00379
00380 if(han->Connect(host,fxTransport))
00381 {
00382
00383 TGo4Log::Info(" TaskManager: Succesfully added new client %s (host %s, ports %d,%d,%d) ",
00384 client, host, han->GetComPort(), han->GetStatPort(), han->GetDatPort());
00385 fuTaskCount++;
00386 han->SetRole(role);
00387 if(role>kGo4ComModeObserver) fbHasControllerConnection=kTRUE;
00388 fxServer->SetCurrentTask(client);
00389 fxServer->SendStatusMessage(1,kTRUE,"%s::Client %s is logged in at %s as %s",
00390 client,client,fxServer->GetName(),TGo4Command::GetModeDescription(han->GetRole()) );
00391 return kTRUE;
00392 }
00393 else
00394 {
00395 TGo4Log::Error(" TaskManager: ERROR on connecting new client %s (host %s)", client, host);
00396 RemoveTaskHandler(client);
00397 return kFALSE;
00398 }
00399 return kFALSE;
00400 }
00401
00402
00403 Bool_t TGo4TaskManager::AddTaskHandler(TGo4TaskHandler* han)
00404 {
00405 Bool_t rev=kFALSE;
00406 {
00407 TGo4LockGuard listguard(fxListMutex);
00408 if(fxTaskList->FindObject(han)==0)
00409
00410 {
00411
00412 fxTaskList->AddLast(han);
00413 rev=kTRUE;
00414 }
00415 else
00416 {
00417
00418 rev=kFALSE;
00419 }
00420 }
00421 return rev;
00422 }
00423
00424
00425 TGo4TaskHandler* TGo4TaskManager::NewTaskHandler(const char* name)
00426 {
00427 TGo4TaskHandler* han=new TGo4TaskHandler(name,fxServer,kFALSE, fxServer->IsMaster());
00428 if(AddTaskHandler(han))
00429 {
00430
00431 return han;
00432 }
00433 else
00434 {
00435
00436 delete han;
00437 return 0;
00438
00439 }
00440 return 0;
00441 }
00442
00443 Bool_t TGo4TaskManager::RemoveTaskHandler(const char* name)
00444 {
00445 Bool_t rev=kTRUE;
00446 TGo4TaskHandler* taskhandler;
00447 {
00448 TGo4LockGuard listguard(fxListMutex);
00449 TObject* obj=fxTaskList->FindObject(name);
00450 taskhandler= (TGo4TaskHandler*) fxTaskList->Remove(obj);
00451
00452
00453 }
00454 if(taskhandler!=0)
00455 {
00456
00457 TGo4TaskHandler* currenttaskhandler=fxServer->GetCurrentTaskHandler();
00458 if(taskhandler==currenttaskhandler)
00459 {
00460
00461 fxServer->SetCurrentTask(0);
00462 }
00463 else
00464 {
00465
00466 fxServer->StartWorkThreads();
00467 }
00468 delete taskhandler;
00469 }
00470 else
00471 {
00472
00473 rev=kFALSE;
00474 }
00475 return rev;
00476 }
00477
00478
00479
00480 TGo4TaskHandler* TGo4TaskManager::GetTaskHandler(const char* name)
00481 {
00482 TGo4TaskHandler* th=0;
00483 {
00484 TGo4LockGuard listguard(fxListMutex);
00485 th= (TGo4TaskHandler*) fxTaskList->FindObject(name);
00486 }
00487 return th;
00488 }
00489
00490 TGo4TaskHandler* TGo4TaskManager::GetLastTaskHandler()
00491 {
00492 TGo4TaskHandler* th=0;
00493 {
00494 TGo4LockGuard listguard(fxListMutex);
00495 th= (TGo4TaskHandler*) fxTaskList->Last();
00496 }
00497 return th;
00498 }
00499
00500 TGo4TaskHandler* TGo4TaskManager::NextTaskHandler(Bool_t reset)
00501 {
00502 TGo4LockGuard listguard(fxListMutex);
00503 if(reset) fxTaskIter->Reset();
00504 TGo4TaskHandler* th=dynamic_cast<TGo4TaskHandler*>(fxTaskIter->Next());
00505 return th;
00506 }
00507
00508
00509 Int_t TGo4TaskManager::WaitForClientRemoved()
00510
00511 {
00512 Int_t count=0;
00513 while(!fbClientIsRemoved)
00514 {
00515 if(count>TGo4TaskManager::fgiDISCONCYCLES)
00516 {
00517 return -1;
00518 }
00519 else if(fxServer->IsTerminating())
00520 {
00521 return -2;
00522 }
00523 else
00524 {
00525 TGo4Thread::Sleep(TGo4TaskManager::fguDISCONTIME);
00526 ++count;
00527 }
00528 }
00529 fbClientIsRemoved=kFALSE;
00530 return count;
00531
00532 }
00533
00534 UInt_t TGo4TaskManager::GetNegotiationPort()
00535 {
00536 if(fxTransport)
00537 {
00538 fuNegotiationPort = fxTransport->GetPort();
00539 }
00540
00541 return fuNegotiationPort;
00542 }