00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #include "TGo4TaskManager.h"
00017
00018 #include <iostream.h>
00019
00020 #include "TObjArray.h"
00021
00022 #include "Go4Log/TGo4Log.h"
00023 #include "Go4LockGuard/TGo4LockGuard.h"
00024 #include "Go4ThreadManager/TGo4Thread.h"
00025 #include "Go4Queue/TGo4BufferQueue.h"
00026 #include "Go4Socket/TGo4Socket.h"
00027 #include "Go4CommandsBase/TGo4Command.h"
00028 #include "TGo4TaskHandler.h"
00029 #include "Go4TaskHandlerExceptions.h"
00030 #include "TGo4ServerTask.h"
00031
00032 const Int_t TGo4TaskManager::fgiDISCONCYCLES=360;
00033
00034 const UInt_t TGo4TaskManager::fguDISCONTIME=500;
00035
00036 TGo4TaskManager::TGo4TaskManager(const char* name,
00037 TGo4ServerTask * server,
00038 UInt_t negotiationport)
00039 : TNamed(name,"This is a Go4TaskManager"),
00040 fuTaskCount(0),fuNegotiationPort(0), fbClientIsRemoved(kFALSE), fbHasControllerConnection(kFALSE)
00041 {
00042 fxServer=server;
00043
00044 if(negotiationport==0)
00045 {
00046
00047 fuNegotiationPort=TGo4TaskHandler::fguCONNECTORPORT;
00048 }
00049 else
00050 {
00051
00052 fuNegotiationPort=negotiationport;
00053 }
00054
00055 fxListMutex=new TMutex(kTRUE);
00056 fxTaskList=new TObjArray;
00057 fxTaskIter=fxTaskList->MakeIterator();
00058 fxTransport=0;
00059 }
00060
00061 TGo4TaskManager::~TGo4TaskManager()
00062 {
00063 if(fxTransport)
00064 {
00065 fxTransport->Close();
00066 delete fxTransport;
00067 fxTransport=0;
00068 }
00069 delete fxTaskIter;
00070 delete fxTaskList;
00071 delete fxListMutex;
00072 }
00073
00074 Int_t TGo4TaskManager::ServeClient()
00075 {
00076 Int_t rev=0;
00077 char* recvchar=0;
00078 char cliname[TGo4ThreadManager::fguTEXTLENGTH];
00079 char hostname[TGo4ThreadManager::fguTEXTLENGTH];
00080
00081 if(!fxTransport)
00082 {
00083
00084 fxTransport=new TGo4Socket(kFALSE);
00085 }
00086
00087 fxServer->SetConnect(fxTransport, "Server mode does not need hostname", 0, kTRUE);
00088
00089 Int_t waitresult=fxServer->WaitForOpen();
00090 if(waitresult<0)
00091 {
00092
00093 TGo4Log::Debug(" TaskManager: Negotiation channel open TIMEOUT");
00094 cerr <<" TaskManager TIMEOUT ERROR opening socket connection !!! Terminating..." << endl;
00095 throw TGo4TerminateException(fxServer);
00096
00097 }
00098 Int_t count=0;
00099 while(GetNegotiationPort()==0)
00100 {
00101 if(count>TGo4TaskHandler::fgiPORTWAITCYCLES)
00102 {
00103 TGo4Log::Debug(" TaskManager: Negotiation port getter TIMEOUT");
00104 cerr <<" TaskManager TIMEOUT ERROR retrieving port number !!! Terminating..." << endl;
00105 throw TGo4TerminateException(fxServer);
00106
00107 }
00108 else if(fxServer->IsTerminating())
00109 {
00110 return -1;
00111 }
00112 else
00113 {
00114 TGo4Thread::Sleep(TGo4TaskHandler::fguPORTWAITTIME);
00115 ++count;
00116 }
00117 }
00118 cout << " Waiting for client connection on PORT: "<< fuNegotiationPort << endl;
00119 TGo4Log::Debug(" TaskManager is waiting to serve client request on port %d ... ",
00120 fuNegotiationPort);
00121 Int_t connectwaitseconds=fxServer->WaitForConnection();
00122
00123 if(connectwaitseconds<0)
00124 {
00125
00126
00127 return connectwaitseconds;
00128 }
00129 else
00130 {
00131
00132 }
00133
00134
00135
00136
00137 Go4CommandMode_t account=ClientLogin();
00138 if(account!=kGo4ComModeRefused)
00139 {
00140
00141
00142 fxTransport->Send(TGo4TaskHandler::fgcOK);
00143 recvchar=fxTransport->RecvRaw("dummy");
00144 strncpy(cliname, recvchar, TGo4ThreadManager::fguTEXTLENGTH -1);
00145 recvchar=fxTransport->RecvRaw("dummy");
00146 strncpy(hostname, recvchar, TGo4ThreadManager::fguTEXTLENGTH -1);
00147
00148
00149
00150 recvchar=fxTransport->RecvRaw("dummy");
00151 if(recvchar && !strcmp(recvchar,TGo4TaskHandler::fgcCONNECT))
00152 {
00153
00154 rev = ConnectClient(cliname,hostname,account);
00155 }
00156 else if(recvchar && !strcmp(recvchar,TGo4TaskHandler::fgcDISCONNECT))
00157 {
00158
00159 rev = DisConnectClient(cliname);
00160 }
00161 else
00162 {
00163
00164 rev =0;
00165 }
00166 }
00167 else
00168 {
00169
00170
00171 fxTransport->Send(TGo4TaskHandler::fgcERROR);
00172 TGo4Log::Debug(" TaskManager: ServeClient received invalid login, closing negotiation port ", cliname);
00173 fxServer->SetDisConnect(fxTransport);
00174
00175 fxServer->WaitForClose();
00176
00177
00178
00179 TGo4Log::Debug(" TaskManager: Finished negotiations with client %s ", cliname);
00180
00181 return 0;
00182 }
00183
00184
00185 recvchar=fxTransport->RecvRaw("dummy");
00186 if(recvchar && !strcmp(recvchar,TGo4TaskHandler::fgcOK))
00187 {
00188 fxServer->SetDisConnect(fxTransport);
00189 TGo4Log::Debug(" TaskManager: Waiting for timer Close() of negotiation to client %s ... ", cliname);
00190 fxServer->WaitForClose();
00191 TGo4Log::Debug(" TaskManager: Finished negotiations with client %s ", cliname);
00192 }
00193 else
00194 {
00195 TGo4Log::Debug(" TaskManager: ERROR on closing down negotiation channel, client %s ", cliname);
00196 throw TGo4RuntimeException();
00197 }
00198 return rev;
00199 }
00200
00201 Go4CommandMode_t TGo4TaskManager::ClientLogin()
00202 {
00203 if(fxTransport==0) return kGo4ComModeRefused;
00204 TString purpose;
00205 TString account;
00206 TString passwd;
00207 char* recvchar=fxTransport->RecvRaw("dummy");
00208 if(recvchar && !strcmp(recvchar,TGo4TaskHandler::fgcOK))
00209 {
00210
00212
00213 recvchar=fxTransport->RecvRaw("dummy");
00214 if(recvchar==0) return kGo4ComModeRefused;
00215 purpose=recvchar;
00216 recvchar=fxTransport->RecvRaw("dummy");
00217 if(recvchar==0) return kGo4ComModeRefused;
00218 account=recvchar;
00219
00220 recvchar=fxTransport->RecvRaw("dummy");
00221 if(recvchar==0) return kGo4ComModeRefused;
00222 passwd=recvchar;
00223
00224
00225
00226
00227
00228
00229 Bool_t matching=kFALSE;
00230 if(fxServer->IsMaster())
00231 {
00232 if(purpose==TGo4TaskHandler::fgcSLAVE) matching=kTRUE;
00233 }
00234 else
00235 {
00236 if(purpose==TGo4TaskHandler::fgcMASTER) matching=kTRUE;
00237 }
00238 if(!matching)
00239 {
00240 TGo4Log::Debug(" TaskManager: Client does not match Server, Login failed!!!");
00241 return kGo4ComModeRefused;
00242 }
00243
00244
00245 if(account==TGo4TaskHandler::fgxOBSERVERACCOUNT.GetName()
00246 && passwd==TGo4TaskHandler::fgxOBSERVERACCOUNT.GetTitle())
00247 {
00248 TGo4Log::Debug(" TaskManager: Client logged in as observer");
00249 return kGo4ComModeObserver;
00250 }
00251 else if(account==TGo4TaskHandler::fgxCONTROLLERACCOUNT.GetName()
00252 && passwd==TGo4TaskHandler::fgxCONTROLLERACCOUNT.GetTitle())
00253 {
00254
00255 if(fbHasControllerConnection)
00256 {
00257 TGo4Log::Debug(" TaskManager: Client logged in as 2nd controller, will be observer");
00258 return kGo4ComModeObserver;
00259 }
00260 else
00261 {
00262 TGo4Log::Debug(" TaskManager: Client logged in as controller");
00263 return kGo4ComModeController;
00264 }
00265 }
00266 else if(account==TGo4TaskHandler::fgxADMINISTRATORACCOUNT.GetName()
00267 && passwd==TGo4TaskHandler::fgxADMINISTRATORACCOUNT.GetTitle())
00268 {
00269
00270 if(fbHasControllerConnection)
00271 {
00272 TGo4Log::Warn(" TaskManager: Client logged in as 2nd controller, will be observer");
00273 return kGo4ComModeObserver;
00274 }
00275 else
00276 {
00277 TGo4Log::Debug(" TaskManager: Client logged in as administrator");
00278 return kGo4ComModeAdministrator;
00279 }
00280 }
00281
00282 else
00283 {
00284 TGo4Log::Debug(" TaskManager: Client Login failed!!!");
00285 return kGo4ComModeRefused;
00286 }
00287 }
00288 return kGo4ComModeRefused;
00289 }
00290
00291 Int_t TGo4TaskManager::ConnectClient(const char* client, const char* host, Go4CommandMode_t role)
00292 {
00293
00294
00295 TGo4LockGuard listguard(fxListMutex);
00296 Int_t rev=0;
00297
00298 TString cliname=client;
00299
00300 if(fxTaskList->FindObject(cliname.Data())) return 1;
00301
00302 if (!AddClient(cliname.Data(),host,role)) rev=1;
00303 return rev;
00304 }
00305
00306
00307 Int_t TGo4TaskManager::DisConnectClient(const char* name, Bool_t clientwait)
00308 {
00309 TGo4Log::Info("TaskManager is disconnecting client %s %s ...", name, clientwait ? "with waiting" : "with no wait!" );
00310
00311
00312
00313
00314
00315 Int_t rev=0;
00316 TGo4TaskHandler* han=GetTaskHandler(name);
00317 rev=DisConnectClient(han,clientwait);
00318 return rev;
00319 }
00320
00321 Int_t TGo4TaskManager::DisConnectClient(TGo4TaskHandler * taskhandler, Bool_t clientwait)
00322 {
00323 Int_t rev=0;
00324 if(taskhandler!=0)
00325 {
00326 fbClientIsRemoved=kFALSE;
00327 TString tname=taskhandler->GetName();
00328 Bool_t iscontrollertask=(taskhandler->GetRole()>kGo4ComModeObserver);
00329
00330 fxServer->SendStopBuffers(tname);
00331 if(clientwait)
00332 {
00333
00334 Text_t* revchar=0;
00335 revchar=fxTransport->RecvRaw("dummy");
00336 if(!(revchar && !strcmp(revchar,TGo4TaskHandler::fgcOK)))
00337 {
00338 TGo4Log::Debug(" TaskManager %s; negotiation ERROR after client disconnect!",GetName());
00339 rev+=1;
00340
00341 }
00342 }
00343 if(!taskhandler->DisConnect(clientwait))rev+=1;
00344 if (!RemoveTaskHandler(tname.Data())) rev+=2;
00345 if (rev==0)
00346 {
00347
00348 fuTaskCount--;
00349 if(iscontrollertask) fbHasControllerConnection=kFALSE;
00350 fbClientIsRemoved=kTRUE;
00351 TGo4Log::Debug(" TaskManager: client %s has been disconnected. ", tname.Data());
00352 }
00353 else
00354 {
00355
00356 TGo4Log::Debug(" TaskManager: client %s disconnect ERROR %d occured !! ", tname.Data(),rev);
00357 }
00358
00359 }
00360
00361 else
00362 {
00363
00364 TGo4Log::Debug(" TaskManager: FAILED to disonnect client -- no such client! ");
00365 rev=-1;
00366 }
00367 return rev;
00368 }
00369
00370 Bool_t TGo4TaskManager::AddClient(const char* client, const char* host, Go4CommandMode_t role)
00371 {
00372 TGo4TaskHandler* han=NewTaskHandler(client);
00373 if (han==0)
00374 {
00375 TGo4Log::Warn(" !!! TaskManager::AddClient ERROR: client of name %s is already exisiting !!! ",client);
00376 fxTransport->Send(TGo4TaskHandler::fgcERROR);
00377 return kFALSE;
00378 }
00379
00380 if(han->Connect(host,fxTransport))
00381 {
00382
00383 TGo4Log::Info(" TaskManager: Succesfully added new client %s (host %s, ports %d,%d,%d) ",
00384 client, host, han->GetComPort(), han->GetStatPort(), han->GetDatPort());
00385 fuTaskCount++;
00386 han->SetRole(role);
00387 if(role>kGo4ComModeObserver) fbHasControllerConnection=kTRUE;
00388 fxServer->SetCurrentTask(client);
00389 fxServer->SendStatusMessage(1,kTRUE,"%s::Client %s is logged in at %s as %s",
00390 client,client,fxServer->GetName(),TGo4Command::GetModeDescription(han->GetRole()) );
00391 return kTRUE;
00392 }
00393 else
00394 {
00395 TGo4Log::Error(" TaskManager: ERROR on connecting new client %s (host %s) ",
00396 client,host);
00397 RemoveTaskHandler(client);
00398 return kFALSE;
00399 }
00400 return kFALSE;
00401 }
00402
00403
00404 Bool_t TGo4TaskManager::AddTaskHandler(TGo4TaskHandler* han)
00405 {
00406 Bool_t rev=kFALSE;
00407 {
00408
00409 TGo4LockGuard listguard(fxListMutex);
00410 if(fxTaskList->FindObject(han)==0)
00411
00412 {
00413
00414 fxTaskList->AddLast(han);
00415 rev=kTRUE;
00416 }
00417 else
00418 {
00419
00420 rev=kFALSE;
00421 }
00422 }
00423 return rev;
00424 }
00425
00426
00427 TGo4TaskHandler* TGo4TaskManager::NewTaskHandler(const char* name)
00428 {
00429 TGo4TaskHandler* han=new TGo4TaskHandler(name,fxServer,kFALSE, fxServer->IsMaster());
00430 if(AddTaskHandler(han))
00431 {
00432
00433 return han;
00434 }
00435 else
00436 {
00437
00438 delete han;
00439 return 0;
00440
00441 }
00442 return 0;
00443 }
00444
00445 Bool_t TGo4TaskManager::RemoveTaskHandler(const char* name)
00446 {
00447 Bool_t rev=kTRUE;
00448 TGo4TaskHandler* taskhandler;
00449 {
00450
00451 TGo4LockGuard listguard(fxListMutex);
00452 TObject* obj=fxTaskList->FindObject(name);
00453 taskhandler= (TGo4TaskHandler*) fxTaskList->Remove(obj);
00454
00455
00456 }
00457 if(taskhandler!=0)
00458 {
00459
00460 TGo4TaskHandler* currenttaskhandler=fxServer->GetCurrentTaskHandler();
00461 if(taskhandler==currenttaskhandler)
00462 {
00463
00464 fxServer->SetCurrentTask(0);
00465 }
00466 else
00467 {
00468
00469 fxServer->StartWorkThreads();
00470 }
00471 delete taskhandler;
00472 }
00473 else
00474 {
00475
00476 rev=kFALSE;
00477 }
00478 return rev;
00479 }
00480
00481
00482
00483 TGo4TaskHandler* TGo4TaskManager::GetTaskHandler(const char* name)
00484 {
00485 TGo4TaskHandler* th=0;
00486 {
00487
00488 TGo4LockGuard listguard(fxListMutex);
00489 th= (TGo4TaskHandler*) fxTaskList->FindObject(name);
00490 }
00491 return th;
00492 }
00493
00494 TGo4TaskHandler* TGo4TaskManager::GetLastTaskHandler()
00495 {
00496 TGo4TaskHandler* th=0;
00497 {
00498
00499 TGo4LockGuard listguard(fxListMutex);
00500 th= (TGo4TaskHandler*) fxTaskList->Last();
00501 }
00502 return th;
00503 }
00504
00505 TGo4TaskHandler* TGo4TaskManager::NextTaskHandler(Bool_t reset)
00506 {
00507
00508 TGo4LockGuard listguard(fxListMutex);
00509 if(reset) fxTaskIter->Reset();
00510 TGo4TaskHandler* th=dynamic_cast<TGo4TaskHandler*>(fxTaskIter->Next());
00511 return th;
00512 }
00513
00514
00515 Int_t TGo4TaskManager::WaitForClientRemoved()
00516
00517 {
00518 Int_t count=0;
00519 while(!fbClientIsRemoved)
00520 {
00521 if(count>TGo4TaskManager::fgiDISCONCYCLES)
00522 {
00523 return -1;
00524 }
00525 else if(fxServer->IsTerminating())
00526 {
00527 return -2;
00528 }
00529 else
00530 {
00531 TGo4Thread::Sleep(TGo4TaskManager::fguDISCONTIME);
00532 ++count;
00533 }
00534 }
00535 fbClientIsRemoved=kFALSE;
00536 return count;
00537
00538 }
00539
00540 UInt_t TGo4TaskManager::GetNegotiationPort()
00541 {
00542 if(fxTransport)
00543 {
00544 fuNegotiationPort = fxTransport->GetPort();
00545 }
00546
00547 return fuNegotiationPort;
00548 }
00549
00550