Main Page   Namespace List   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members  

/Go4TaskHandler/TGo4TaskManager.cxx

Go to the documentation of this file.
00001 //---------------------------------------------------------------
00002 //        Go4 Release Package v2.10-5 (build 21005) 
00003 //                      03-Nov-2005
00004 //---------------------------------------------------------------
00005 //       The GSI Online Offline Object Oriented (Go4) Project
00006 //       Experiment Data Processing at DVEE department, GSI
00007 //---------------------------------------------------------------
00008 //
00009 //Copyright (C) 2000- Gesellschaft f. Schwerionenforschung, GSI
00010 //                    Planckstr. 1, 64291 Darmstadt, Germany
00011 //Contact:            http://go4.gsi.de
00012 //----------------------------------------------------------------
00013 //This software can be used under the license agreements as stated
00014 //in Go4License.txt file which is part of the distribution.
00015 //----------------------------------------------------------------
00016 #include "TGo4TaskManager.h"
00017 
00018 #include <iostream.h>
00019 
00020 #include "TObjArray.h"
00021 
00022 #include "Go4Log/TGo4Log.h"
00023 #include "Go4LockGuard/TGo4LockGuard.h"
00024 #include "Go4ThreadManager/TGo4Thread.h"
00025 #include "Go4Queue/TGo4BufferQueue.h"
00026 #include "Go4Socket/TGo4Socket.h"
00027 #include "Go4CommandsBase/TGo4Command.h"
00028 #include "TGo4TaskHandler.h"
00029 #include "Go4TaskHandlerExceptions.h"
00030 #include "TGo4ServerTask.h"
00031 
00032 const Int_t TGo4TaskManager::fgiDISCONCYCLES=360; // wait cycles 180
00033 
00034 const UInt_t TGo4TaskManager::fguDISCONTIME=500; // time in ms 1000
00035 
00036 TGo4TaskManager::TGo4TaskManager(const char* name,
00037                                  TGo4ServerTask * server,
00038                                  UInt_t negotiationport)
00039 : TNamed(name,"This is a Go4TaskManager"), 
00040    fuTaskCount(0),fuNegotiationPort(0), fbClientIsRemoved(kFALSE), fbHasControllerConnection(kFALSE)
00041 {
00042    fxServer=server;
00043  // set port number for the client server negotiation channel:
00044    if(negotiationport==0)
00045       {
00046          // default: use taskhandler intrinsic port number
00047          fuNegotiationPort=TGo4TaskHandler::fguCONNECTORPORT;
00048       }
00049    else
00050       {
00051          // use dynamic port number given by main program
00052          fuNegotiationPort=negotiationport;
00053       }
00054 
00055    fxListMutex=new TMutex(kTRUE);
00056    fxTaskList=new TObjArray;
00057    fxTaskIter=fxTaskList->MakeIterator();
00058    fxTransport=0;
00059 }
00060 
00061 TGo4TaskManager::~TGo4TaskManager()
00062 {
00063    if(fxTransport)
00064       {
00065          fxTransport->Close();
00066          delete fxTransport;
00067          fxTransport=0;
00068       }
00069    delete fxTaskIter;
00070    delete fxTaskList;
00071    delete fxListMutex;
00072 }
00073 
00074 Int_t TGo4TaskManager::ServeClient()
00075 {
00076    Int_t rev=0;
00077    char* recvchar=0;
00078    char cliname[TGo4ThreadManager::fguTEXTLENGTH];
00079    char hostname[TGo4ThreadManager::fguTEXTLENGTH];
00080    // open connection in server mode with default port as raw Socket, wait for client
00081    if(!fxTransport)
00082       {
00083          //cout << "+++++TaskManager creating new negotiation transport server instance" <<endl;
00084          fxTransport=new TGo4Socket(kFALSE); // use raw transport for negotiations
00085       }
00086    // we delegate the actual TSocket open to the taskconnector timer:
00087    fxServer->SetConnect(fxTransport, "Server mode does not need hostname", 0, kTRUE);
00088       // for portscan, we keep exisiting server socket (keepserv=kTRUE)
00089    Int_t waitresult=fxServer->WaitForOpen(); // wait for the server Open() call by timer
00090    if(waitresult<0)
00091    {
00092       // open timeout
00093       TGo4Log::Debug(" TaskManager: Negotiation channel open TIMEOUT");
00094       cerr <<" TaskManager TIMEOUT ERROR opening socket connection !!! Terminating..." << endl;
00095       throw TGo4TerminateException(fxServer);
00096       //return kFALSE;
00097    }
00098    Int_t count=0;
00099    while(GetNegotiationPort()==0)
00100    {
00101       if(count>TGo4TaskHandler::fgiPORTWAITCYCLES)
00102             {
00103                TGo4Log::Debug(" TaskManager: Negotiation port getter TIMEOUT");
00104                cerr <<" TaskManager TIMEOUT ERROR retrieving port number  !!! Terminating..." << endl;
00105                throw TGo4TerminateException(fxServer);
00106                //return kFALSE;
00107             }
00108       else if(fxServer->IsTerminating())
00109             {
00110                return -1;
00111             }
00112       else
00113             {
00114               TGo4Thread::Sleep(TGo4TaskHandler::fguPORTWAITTIME);
00115               ++count;
00116             }
00117    }
00118    cout << " Waiting for client connection on PORT: "<< fuNegotiationPort << endl;
00119    TGo4Log::Debug(" TaskManager is waiting to serve client request on port %d ... ",
00120             fuNegotiationPort);
00121    Int_t connectwaitseconds=fxServer->WaitForConnection(); // timer tells us by flag when the transport is opened
00122 
00123    if(connectwaitseconds<0)
00124       {
00125          // case of threadmanager termination:
00126          // connector runnable shall stop on return from ServeClient method
00127          return connectwaitseconds;
00128       }
00129    else
00130       {
00131          // just proceed to the client server negotiations
00132       }
00133 
00134    // check connected client: we expect correct ok string
00135 //   recvchar=fxTransport->RecvRaw("dummy");
00136 //   if(recvchar && !strcmp(recvchar,TGo4TaskHandler::fgcOK))
00137      Go4CommandMode_t account=ClientLogin(); 
00138      if(account!=kGo4ComModeRefused)
00139          {
00140          // client knows task handler, we keep talking
00141          //
00142          fxTransport->Send(TGo4TaskHandler::fgcOK); // handshake to assure the client
00143          recvchar=fxTransport->RecvRaw("dummy");
00144          strncpy(cliname, recvchar, TGo4ThreadManager::fguTEXTLENGTH -1); // get the client name
00145          recvchar=fxTransport->RecvRaw("dummy");
00146          strncpy(hostname, recvchar, TGo4ThreadManager::fguTEXTLENGTH -1); // get the host name
00147          //
00148          // check for connect or disconnect:
00149          //
00150          recvchar=fxTransport->RecvRaw("dummy"); // get string to tell us what to do...
00151          if(recvchar && !strcmp(recvchar,TGo4TaskHandler::fgcCONNECT))
00152             {
00153                // request to connect a new client
00154                rev  = ConnectClient(cliname,hostname,account);
00155             }
00156          else if(recvchar && !strcmp(recvchar,TGo4TaskHandler::fgcDISCONNECT))
00157             {
00158                // request to disconnect an existing client
00159              rev = DisConnectClient(cliname);
00160             }
00161          else
00162             {
00163                // unknown request
00164                rev =0;
00165             }
00166       }
00167    else
00168       {
00169          // no valid client
00170          //
00171       fxTransport->Send(TGo4TaskHandler::fgcERROR);
00172       TGo4Log::Debug(" TaskManager: ServeClient received invalid login, closing negotiation port ", cliname);
00173       fxServer->SetDisConnect(fxTransport); // timer shall do the Close() of negotiation
00174       //TGo4Log::Debug(" TaskManager: Waiting for timer Close() of negotiation port ... ", cliname);
00175       fxServer->WaitForClose(); // poll until timer has returned from close
00176 //      delete fxTransport; 
00177 //      fxTransport=0; 
00178 //      TGo4Log::Debug(" TaskManager: Closed and deleted negotiation port");
00179       TGo4Log::Debug(" TaskManager: Finished negotiations with client %s ", cliname);
00180  
00181       return 0;
00182       }
00183 
00184    // finally, we close the channel again...
00185    recvchar=fxTransport->RecvRaw("dummy"); // get exit message
00186    if(recvchar && !strcmp(recvchar,TGo4TaskHandler::fgcOK))
00187       {
00188       fxServer->SetDisConnect(fxTransport); // timer shall do the Close() of negotiation
00189       TGo4Log::Debug(" TaskManager: Waiting for timer Close() of negotiation to client %s ... ", cliname);
00190       fxServer->WaitForClose(); // poll until timer has returned from close
00191       TGo4Log::Debug(" TaskManager: Finished negotiations with client %s ", cliname);
00192       }
00193    else // if (!strcmp(revchar,TGo4TaskHandler::fgcOK))
00194       {
00195       TGo4Log::Debug(" TaskManager: ERROR on closing down negotiation channel, client %s   ", cliname);
00196       throw TGo4RuntimeException();
00197       }
00198    return rev;
00199    }
00200 
00201 Go4CommandMode_t TGo4TaskManager::ClientLogin()
00202 {
00203 if(fxTransport==0) return kGo4ComModeRefused;
00204 TString purpose;
00205 TString account;
00206 TString passwd;
00207 char* recvchar=fxTransport->RecvRaw("dummy"); // first receive OK string   
00208 if(recvchar && !strcmp(recvchar,TGo4TaskHandler::fgcOK))
00209 {
00210    //return kGo4ComModeController; // old protocol: no password
00212    TGo4Log::Debug(" TaskManager::ClientLogin getting login...");
00213    recvchar=fxTransport->RecvRaw("dummy"); // get purpose of client (master or slave)
00214    if(recvchar==0) return kGo4ComModeRefused;
00215    purpose=recvchar;
00216    recvchar=fxTransport->RecvRaw("dummy"); // login account
00217    if(recvchar==0) return kGo4ComModeRefused;
00218    account=recvchar;
00219    //cout <<"ClientLogin got account "<<account.Data() << endl;
00220    recvchar=fxTransport->RecvRaw("dummy"); // login password
00221    if(recvchar==0) return kGo4ComModeRefused;
00222    passwd=recvchar;
00223 //   cout <<"ClientLogin got purpose "<<purpose.Data() << endl;
00224 //   cout <<"ClientLogin got passwd "<<passwd.Data() << endl;
00225 //   cout <<"observer account is "<<TGo4TaskHandler::fgxOBSERVERACCOUNT.GetName()<<", "<<TGo4TaskHandler::fgxOBSERVERACCOUNT.GetTitle() << endl;
00226 //   cout <<"controller account is "<<TGo4TaskHandler::fgxCONTROLLERACCOUNT.GetName()<<", "<<TGo4TaskHandler::fgxCONTROLLERACCOUNT.GetTitle() << endl;
00227  
00228    // first check if client matches our own purpose:
00229    Bool_t matching=kFALSE;
00230    if(fxServer->IsMaster())
00231       {
00232            if(purpose==TGo4TaskHandler::fgcSLAVE) matching=kTRUE;
00233       }
00234    else
00235       {
00236            if(purpose==TGo4TaskHandler::fgcMASTER) matching=kTRUE;
00237       }
00238    if(!matching)   
00239       {
00240          TGo4Log::Debug(" TaskManager: Client does not match Server, Login failed!!!");
00241          return kGo4ComModeRefused;
00242       }
00243  
00244    // check password and account:
00245    if(account==TGo4TaskHandler::fgxOBSERVERACCOUNT.GetName()
00246       && passwd==TGo4TaskHandler::fgxOBSERVERACCOUNT.GetTitle())
00247       {
00248          TGo4Log::Debug(" TaskManager: Client logged in as observer");
00249          return kGo4ComModeObserver;
00250       }
00251    else if(account==TGo4TaskHandler::fgxCONTROLLERACCOUNT.GetName()
00252          && passwd==TGo4TaskHandler::fgxCONTROLLERACCOUNT.GetTitle())    
00253       {
00254          // avoid multiple controllers at this server:
00255          if(fbHasControllerConnection)
00256             {
00257                TGo4Log::Debug(" TaskManager: Client logged in as 2nd controller, will be observer");
00258                return kGo4ComModeObserver;
00259             }
00260          else
00261             {
00262                TGo4Log::Debug(" TaskManager: Client logged in as controller");
00263                return kGo4ComModeController;
00264             }
00265       }
00266    else if(account==TGo4TaskHandler::fgxADMINISTRATORACCOUNT.GetName()
00267          && passwd==TGo4TaskHandler::fgxADMINISTRATORACCOUNT.GetTitle())    
00268       {
00269          // avoid multiple controllers at this server:
00270          if(fbHasControllerConnection)
00271             {
00272                TGo4Log::Warn(" TaskManager: Client logged in as 2nd controller, will be observer");
00273                return kGo4ComModeObserver;
00274             }
00275          else
00276             {
00277                TGo4Log::Debug(" TaskManager: Client logged in as administrator");
00278                return kGo4ComModeAdministrator;
00279             }
00280       }   
00281       
00282    else
00283       {
00284          TGo4Log::Debug(" TaskManager: Client Login failed!!!");
00285          return kGo4ComModeRefused;
00286       }
00287 }
00288 return kGo4ComModeRefused;   
00289 } 
00290 
00291 Int_t TGo4TaskManager::ConnectClient(const char* client, const char* host, Go4CommandMode_t role)
00292 {
00293    //cout <<"llllllll TGo4TaskManager::ConnectClient requesting list mutex"<< endl;
00294  
00295    TGo4LockGuard listguard(fxListMutex);
00296    Int_t rev=0;
00297    // check first if client of that name already exists:
00298    TString cliname=client;  
00299    //if(fxTaskList->FindObject(cliname.Data())) cliname+=GetTaskCount()+1;
00300    if(fxTaskList->FindObject(cliname.Data())) return 1; // do not allow second connection of same client
00301    //cout <<"CCCCCCCC Connecting client with name "<<cliname.Data() << endl;
00302    if (!AddClient(cliname.Data(),host,role)) rev=1;
00303    return rev;
00304 }
00305 
00306 
00307 Int_t TGo4TaskManager::DisConnectClient(const char* name, Bool_t clientwait)
00308 {
00309    TGo4Log::Info("TaskManager is disconnecting client %s %s ...", name, clientwait ? "with waiting" : "with no wait!" );
00310    //TGo4LockGuard listguard(fxListMutex); 
00311       // this mutex    
00312       // might deadlock between connector thread and local command thread
00313       // in case of timeout: command thread inits disconnect by client request
00314       // but if this fails, connector thread itself wants to finish disconnection hard
00315    Int_t rev=0;
00316    TGo4TaskHandler* han=GetTaskHandler(name);
00317    rev=DisConnectClient(han,clientwait);
00318    return rev;
00319 }
00320 
00321 Int_t TGo4TaskManager::DisConnectClient(TGo4TaskHandler * taskhandler, Bool_t clientwait)
00322 {
00323 Int_t rev=0;
00324 if(taskhandler!=0)
00325    {
00326       fbClientIsRemoved=kFALSE; // reset the flag for waiting commander thread
00327       TString tname=taskhandler->GetName();
00328       Bool_t iscontrollertask=(taskhandler->GetRole()>kGo4ComModeObserver);
00329       
00330       fxServer->SendStopBuffers(tname); // suspend remote threads from socket receive
00331       if(clientwait)
00332          {
00333          // wait for OK string sent by client over connector negotiation port
00334          Text_t* revchar=0;
00335          revchar=fxTransport->RecvRaw("dummy"); // wait for client close ok
00336          if(!(revchar && !strcmp(revchar,TGo4TaskHandler::fgcOK)))
00337             {
00338                TGo4Log::Debug(" TaskManager %s; negotiation ERROR after client disconnect!",GetName());
00339                rev+=1;
00340                //throw TGo4RuntimeException();
00341             }
00342          } // if(clientwait)      
00343       if(!taskhandler->DisConnect(clientwait))rev+=1;
00344       if (!RemoveTaskHandler(tname.Data()))   rev+=2;
00345       if (rev==0)
00346          {
00347             // all right, we reset flags
00348             fuTaskCount--;            // set number of still connected client tasks
00349             if(iscontrollertask) fbHasControllerConnection=kFALSE;
00350             fbClientIsRemoved=kTRUE; // this flag tells the main thread we are done
00351             TGo4Log::Debug(" TaskManager: client %s has been disconnected.  ", tname.Data());
00352          }
00353       else
00354          {
00355             // something went wrong, warn the user
00356             TGo4Log::Debug(" TaskManager: client %s disconnect ERROR %d occured !! ", tname.Data(),rev);
00357          }
00358 
00359    }
00360 
00361 else 
00362    {
00363       // no such client
00364       TGo4Log::Debug(" TaskManager: FAILED to disonnect client -- no such client! ");
00365       rev=-1;
00366    }
00367 return rev;
00368 }
00369 
00370 Bool_t TGo4TaskManager::AddClient(const char* client, const char* host, Go4CommandMode_t role)
00371 {
00372   TGo4TaskHandler* han=NewTaskHandler(client);
00373   if (han==0)
00374    {
00375        TGo4Log::Warn(" !!! TaskManager::AddClient ERROR: client of name %s is already exisiting !!! ",client);
00376        fxTransport->Send(TGo4TaskHandler::fgcERROR); // tell client we refuse connection
00377        return kFALSE;
00378    }
00379 
00380 if(han->Connect(host,fxTransport))
00381    {
00382       // successful connection:
00383       TGo4Log::Info(" TaskManager: Succesfully added new client %s (host %s, ports %d,%d,%d) ",
00384                client, host, han->GetComPort(), han->GetStatPort(), han->GetDatPort());
00385       fuTaskCount++;
00386       han->SetRole(role);
00387       if(role>kGo4ComModeObserver) fbHasControllerConnection=kTRUE;
00388       fxServer->SetCurrentTask(client); // this will set the direct link to the new client handler
00389       fxServer->SendStatusMessage(1,kTRUE,"%s::Client %s is logged in at %s as %s",
00390          client,client,fxServer->GetName(),TGo4Command::GetModeDescription(han->GetRole()) );
00391       return kTRUE;
00392     }
00393 else
00394    {
00395       TGo4Log::Error(" TaskManager: ERROR on connecting new client %s (host %s)  ",
00396                client,host);
00397       RemoveTaskHandler(client);
00398       return kFALSE;
00399    }
00400 return kFALSE;
00401 }
00402 
00403 
00404 Bool_t TGo4TaskManager::AddTaskHandler(TGo4TaskHandler* han)
00405 {
00406    Bool_t rev=kFALSE;
00407    {
00408    //cout <<"llllllll TGo4TaskManager::AddTaskHandler requesting list mutex"<< endl;
00409    TGo4LockGuard listguard(fxListMutex);
00410       if(fxTaskList->FindObject(han)==0)
00411          // is taskhandler already in list?
00412             {
00413                //no, add the new taskhandler
00414                fxTaskList->AddLast(han);
00415                rev=kTRUE;
00416             }
00417          else
00418             {
00419                // yes, do nothing
00420                rev=kFALSE;
00421             }
00422    } //  TGo4LockGuard
00423    return rev;
00424 }
00425 
00426 
00427 TGo4TaskHandler* TGo4TaskManager::NewTaskHandler(const char* name)
00428 {
00429    TGo4TaskHandler* han=new TGo4TaskHandler(name,fxServer,kFALSE, fxServer->IsMaster());
00430    if(AddTaskHandler(han))
00431       {
00432          // success, taskhandler was not already existing
00433          return han;
00434       }
00435    else
00436       {
00437       // error, taskhandler of this name was already there
00438       delete han;
00439       return 0;
00440 
00441       }
00442    return 0;
00443 }
00444 
00445 Bool_t TGo4TaskManager::RemoveTaskHandler(const char* name)
00446 {
00447    Bool_t rev=kTRUE;
00448    TGo4TaskHandler* taskhandler;
00449    {
00450    //cout <<"llllllll TGo4TaskManager::RemoveTaskHandler requesting list mutex for "<< name << endl;
00451    TGo4LockGuard listguard(fxListMutex);
00452       TObject* obj=fxTaskList->FindObject(name);
00453       taskhandler= (TGo4TaskHandler*) fxTaskList->Remove(obj);
00454             // Remove will do nothing if obj==0; on success, it returns pointer to
00455             // removed object
00456    } //TGo4LockGuard
00457    if(taskhandler!=0)
00458       {
00459          // test if we have removed the currently active taskhandler
00460          TGo4TaskHandler* currenttaskhandler=fxServer->GetCurrentTaskHandler();
00461          if(taskhandler==currenttaskhandler)
00462             {
00463                // yes, then set current task to the next in list
00464                fxServer->SetCurrentTask(0); // will also start the work threads again
00465             }
00466          else // if (taskhandler==currenttaskhandler)
00467             {
00468                // no, the current task remains
00469                fxServer->StartWorkThreads(); // but need to start the work threads
00470             }
00471          delete taskhandler;
00472    } // if (taskhandler!=0)
00473    else
00474       {
00475          // no such handler, do nothing
00476          rev=kFALSE;
00477       }
00478    return rev;
00479 }
00480 
00481 
00482 
00483 TGo4TaskHandler* TGo4TaskManager::GetTaskHandler(const char* name)
00484 {
00485    TGo4TaskHandler* th=0;
00486     {
00487       //cout <<"llllllll TGo4TaskManager::GetTaskHandler requesting list mutex for "<< name << endl;
00488       TGo4LockGuard listguard(fxListMutex);
00489       th= (TGo4TaskHandler*) fxTaskList->FindObject(name);
00490     } //TGo4LockGuard
00491    return th;
00492 }
00493 
00494 TGo4TaskHandler* TGo4TaskManager::GetLastTaskHandler()
00495 {
00496    TGo4TaskHandler* th=0;
00497    {
00498    //cout <<"llllllll TGo4TaskManager::GetLastTaskHandler requesting list mutex"<< endl;
00499    TGo4LockGuard listguard(fxListMutex);
00500       th= (TGo4TaskHandler*) fxTaskList->Last();
00501     } //TGo4LockGuard
00502    return th;
00503 }
00504 
00505 TGo4TaskHandler* TGo4TaskManager::NextTaskHandler(Bool_t reset)
00506 {
00507    //cout <<"llllllll TGo4TaskManager::NextTaskHandler requesting list mutex"<< endl;
00508    TGo4LockGuard listguard(fxListMutex);   
00509    if(reset) fxTaskIter->Reset();
00510    TGo4TaskHandler* th=dynamic_cast<TGo4TaskHandler*>(fxTaskIter->Next());   
00511    return th;
00512 }
00513 
00514 
00515 Int_t TGo4TaskManager::WaitForClientRemoved()
00516 
00517 {
00518    Int_t count=0;
00519    while(!fbClientIsRemoved)
00520       {
00521          if(count>TGo4TaskManager::fgiDISCONCYCLES)
00522             {
00523                return -1;
00524             }
00525          else if(fxServer->IsTerminating())
00526             {
00527                return -2;
00528             }
00529          else
00530             {
00531                TGo4Thread::Sleep(TGo4TaskManager::fguDISCONTIME);
00532                ++count;
00533             }
00534       }
00535    fbClientIsRemoved=kFALSE; //  reset for next time
00536    return count;
00537 
00538 }
00539 
00540 UInt_t TGo4TaskManager::GetNegotiationPort()
00541 {
00542    if(fxTransport)
00543       {
00544          fuNegotiationPort = fxTransport->GetPort();
00545       }
00546 //   cout << "...........Taskmanager found negotiation port "<< fuNegotiationPort << endl;
00547    return fuNegotiationPort;
00548 }
00549 
00550 //----------------------------END OF GO4 SOURCE FILE ---------------------

Generated on Tue Nov 8 10:56:06 2005 for Go4-v2.10-5 by doxygen1.2.15