GSI Object Oriented Online Offline (Go4)  GO4-6.1.4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
TGo4TaskManager.cxx
Go to the documentation of this file.
1 // $Id: TGo4TaskManager.cxx 3061 2021-03-12 15:13:42Z linev $
2 //-----------------------------------------------------------------------
3 // The GSI Online Offline Object Oriented (Go4) Project
4 // Experiment Data Processing at EE department, GSI
5 //-----------------------------------------------------------------------
6 // Copyright (C) 2000- GSI Helmholtzzentrum fuer Schwerionenforschung GmbH
7 // Planckstr. 1, 64291 Darmstadt, Germany
8 // Contact: http://go4.gsi.de
9 //-----------------------------------------------------------------------
10 // This software can be used under the license agreements as stated
11 // in Go4License.txt file which is part of the distribution.
12 //-----------------------------------------------------------------------
13 
14 #include "TGo4TaskManager.h"
15 
16 #include <iostream>
17 
18 #include "TObjArray.h"
19 #include "TMutex.h"
20 
21 #include "TGo4Log.h"
22 #include "TGo4LockGuard.h"
23 #include "TGo4Thread.h"
24 #include "TGo4Socket.h"
25 #include "TGo4TaskHandler.h"
26 #include "TGo4TerminateException.h"
27 #include "TGo4ServerTask.h"
28 
// Definitions of the two static disconnect constants of TGo4TaskManager.
// NOTE(review): no use of either constant is visible in this listing; judging by
// the names they are a poll-cycle limit and a per-cycle time in ms — confirm.
// NOTE(review): the trailing "180" / "1000" in the comments look like earlier
// values kept for reference — confirm.
29 const Int_t TGo4TaskManager::fgiDISCONCYCLES=360; // wait cycles 180
30 
31 const UInt_t TGo4TaskManager::fguDISCONTIME=500; // time in ms 1000
32 
34  TGo4ServerTask * server,
35  UInt_t negotiationport,
36  Bool_t createconnector)
37 : TNamed(name,"This is a Go4TaskManager"),
38  fuTaskCount(0),fuNegotiationPort(0), fbClientIsRemoved(kFALSE), fbHasControllerConnection(kFALSE)
39 {
 // Constructor body: stores the owning server, selects the negotiation port,
 // creates the list mutex, the task handler list and its iterator, and
 // optionally opens the negotiation socket right away.
 // NOTE(review): the first signature line (listing line 33) is not part of
 // this view.
40  fxServer=server;
41  // set port number for the client server negotiation channel:
42  if(negotiationport==0)
43  {
44  // default: use taskhandler intrinsic port number
 // NOTE(review): the statement of this branch (listing line 45) is missing
 // from this view; presumably it assigns the task handler's default
 // connector port to fuNegotiationPort — confirm.
46  }
47  else
48  {
49  // use dynamic port number given by main program
50  fuNegotiationPort=negotiationport;
51  }
52 
53  fxListMutex=new TMutex(kTRUE);
54  fxTaskList=new TObjArray;
55  fxTaskIter=fxTaskList->MakeIterator();
56  fxTransport=0;
57  if(createconnector)
58  {
59  // this mode is for server task created on the fly
60  // connector should be available immediately, independent of timer connect!
61  TGo4Log::Debug("TaskManager: Created negotiation channel in ctor");
62  fxTransport=new TGo4Socket(kFALSE); // use raw transport for negotiations
63  fxTransport->Open( "Server mode does not need hostname", 0, kTRUE);
64  // note: Open() return value is not 0 here, since we do not have
65  // accept finished yet! but portnumber is ready after this...
66  }
67 }
68 
70 {
 // Presumably the body of ~TGo4TaskManager() (signature line not visible here).
 // Closes and deletes the negotiation socket if one exists, then deletes the
 // iterator, the task list and the list mutex, in that order.
 // NOTE(review): whether deleting fxTaskList also deletes the handlers still
 // stored in it depends on the list's ownership setting, which is not
 // visible here — confirm.
71  if(fxTransport!=0) {
72  fxTransport->Close();
73  delete fxTransport;
74  fxTransport=0;
75  }
76  delete fxTaskIter;
77  delete fxTaskList;
78  delete fxListMutex;
79 }
80 
82 {
 // Presumably the body of TGo4TaskManager::ServeClient() (signature line not
 // visible here). Handles one client request on the negotiation channel:
 //  1. make sure a negotiation socket exists and let the server open it,
 //  2. wait until the negotiation port number is known,
 //  3. wait for a client connection,
 //  4. check the login via ClientLogin(),
 //  5. dispatch a connect or disconnect request,
 //  6. close the negotiation channel again.
 // Returns -1 if the server is terminating while waiting for the port, the
 // negative WaitForConnection() result if that wait fails, 0 for a refused
 // login or an unknown request, otherwise the result of ConnectClient() /
 // DisConnectClient(). Throws TGo4RuntimeException if the final exit
 // handshake is not the OK string.
 // NOTE(review): several statements (listing lines 102, 108, 112, 122, 128,
 // 179) are missing from this view; the notes below mark the gaps.
83  //std::cout <<"EEEEEEEEEEEnter TGo4TaskManager::ServeClient()" << std::endl;
84  Int_t rev=0;
85  char* recvchar=0;
86  TString cliname, hostname;
87  // open connection in server mode with default port as raw Socket, wait for client
88  if(!fxTransport)
89  {
90  //std::cout << "+++++TaskManager creating new negotiation transport server instance" << std::endl;
91  fxTransport=new TGo4Socket(kFALSE); // use raw transport for negotiations
92  }
93  // we delegate the actual TSocket open to the taskconnector timer:
94  fxServer->SetConnect(fxTransport, "Server mode does not need hostname", 0, kTRUE);
95  // for portscan, we keep exisiting server socket (keepserv=kTRUE)
96  Int_t waitresult=fxServer->WaitForOpen(); // wait for the server Open() call by timer
97  if(waitresult<0)
98  {
99  // open timeout
100  TGo4Log::Debug(" TaskManager: Negotiation channel open TIMEOUT");
101  std::cerr <<" TaskManager TIMEOUT ERROR opening socket connection !!! Terminating..." << std::endl;
 // NOTE(review): listing line 102 is missing here; given the
 // "Terminating..." message it presumably throws a terminate exception —
 // confirm.
103  //return kFALSE;
104  }
105  Int_t count=0;
106  while(GetNegotiationPort()==0)
107  {
 // NOTE(review): the condition opening this if/else chain (listing line
 // 108) is missing; presumably it compares count against a port wait
 // cycle limit — confirm.
109  {
110  TGo4Log::Debug(" TaskManager: Negotiation port getter TIMEOUT");
111  std::cerr <<" TaskManager TIMEOUT ERROR retrieving port number !!! Terminating..." << std::endl;
 // NOTE(review): listing line 112 is missing here (presumably the
 // terminating throw) — confirm.
113  //return kFALSE;
114  }
115  else if(fxServer->IsTerminating())
116  {
117  //std::cout << "TTTTTT ServeClient sees terminating state and returns -1" << std::endl;
118  return -1;
119  }
120  else
121  {
 // NOTE(review): listing line 122 is missing here (presumably a sleep
 // between polls) — confirm.
123  ++count;
124  }
125  }
126  std::cout << " Waiting for client connection on PORT: "<< fuNegotiationPort << std::endl;
127  TGo4Log::Debug(" TaskManager is waiting to serve client request on port %d ... ",
 // NOTE(review): the argument line closing this Debug() call (listing line
 // 128) is missing from this view.
129  Int_t connectwaitseconds=fxServer->WaitForConnection(); // timer tells us by flag when the transport is opened
130 
131  if(connectwaitseconds<0)
132  {
133  // case of threadmanager termination:
134  // connector runnable shall stop on return from ServeClient method
135  return connectwaitseconds;
136  }
137  else
138  {
139  // just proceed to the client server negotiations
140  }
141 
142  // check connected client: we expect correct ok string
143 // recvchar=fxTransport->RecvRaw("dummy");
144 // if(recvchar && !strcmp(recvchar,TGo4TaskHandler::Get_fgcOK()))
145  Go4CommandMode_t account=ClientLogin();
146  if(account!=kGo4ComModeRefused)
147  {
148  // client knows task handler, we keep talking
149  //
150  fxTransport->Send(TGo4TaskHandler::Get_fgcOK()); // handshake to assure the client
151  recvchar = fxTransport->RecvRaw("dummy");
152  cliname = recvchar; // get the client name
153  recvchar = fxTransport->RecvRaw("dummy");
154  hostname = recvchar; // get the host name
155  //
156  // check for connect or disconnect:
157  //
158  recvchar=fxTransport->RecvRaw("dummy"); // get string to tell us what to do...
159  if(recvchar && !strcmp(recvchar,TGo4TaskHandler::fgcCONNECT))
160  {
161  // request to connect a new client
162  rev = ConnectClient(cliname,hostname,account);
163  }
164  else if(recvchar && !strcmp(recvchar,TGo4TaskHandler::fgcDISCONNECT))
165  {
166  // request to disconnect an existing client
167  rev = DisConnectClient(cliname);
168  }
169  else
170  {
171  // unknown request
172  rev =0;
173  }
174  }
175  else
176  {
177  // no valid client
178  //
 // NOTE(review): listing line 179 is missing from this view.
 // NOTE(review): cliname has not been assigned in this branch, and the
 // format string of the next Debug() call has no conversion for the extra
 // cliname.Data() argument.
180  TGo4Log::Debug(" TaskManager: ServeClient received invalid login, closing negotiation port ", cliname.Data());
181  fxServer->SetDisConnect(fxTransport); // timer shall do the Close() of negotiation
182  //TGo4Log::Debug(" TaskManager: Waiting for timer Close() of negotiation port ... ", cliname);
183  fxServer->WaitForClose(); // poll until timer has returned from close
184 // delete fxTransport;
185 // fxTransport=0;
186 // TGo4Log::Debug(" TaskManager: Closed and deleted negotiation port");
187  TGo4Log::Debug(" TaskManager: Finished negotiations with client %s ", cliname.Data());
188 
189  return 0;
190  }
191 
192  // finally, we close the channel again...
193  recvchar=fxTransport->RecvRaw("dummy"); // get exit message
194  if(recvchar && !strcmp(recvchar,TGo4TaskHandler::Get_fgcOK()))
195  {
196  fxServer->SetDisConnect(fxTransport); // timer shall do the Close() of negotiation
197  TGo4Log::Debug(" TaskManager: Waiting for timer Close() of negotiation to client %s ... ", cliname.Data());
198  fxServer->WaitForClose(); // poll until timer has returned from close
199  TGo4Log::Debug(" TaskManager: Finished negotiations with client %s ", cliname.Data());
200  }
201  else // if (!strcmp(revchar,TGo4TaskHandler::Get_fgcOK()))
202  {
203  TGo4Log::Debug(" TaskManager: ERROR on closing down negotiation channel, client %s ", cliname.Data());
204  throw TGo4RuntimeException();
205  }
206  return rev;
207  }
208 
210 {
// Presumably the body of TGo4TaskManager::ClientLogin() (signature line not
// visible here). Reads the login sequence from the negotiation socket: first
// the OK string, then purpose, account name and password. The purpose must be
// the counterpart of this server's own role (slave for a master server, master
// otherwise). Account and password are then compared with the observer,
// controller and administrator accounts of TGo4TaskHandler.
// Returns the granted command mode, or kGo4ComModeRefused when there is no
// socket, a receive yields a null pointer, the purpose does not match, or the
// credentials match no account.
// NOTE(review): listing lines 219, 264, 279 and 287 are missing from this view;
// the notes below mark the gaps.
211 if(fxTransport==0) return kGo4ComModeRefused;
212 TString purpose;
213 TString account;
214 TString passwd;
215 char* recvchar=fxTransport->RecvRaw("dummy"); // first receive OK string
216 if(recvchar && !strcmp(recvchar,TGo4TaskHandler::Get_fgcOK()))
217 {
218  //return kGo4ComModeController; // old protocol: no password
220  TGo4Log::Debug(" TaskManager::ClientLogin getting login...");
221  recvchar=fxTransport->RecvRaw("dummy"); // get purpose of client (master or slave)
222  if(recvchar==0) return kGo4ComModeRefused;
223  purpose=recvchar;
224  //std::cout <<"ClientLogin got purpose "<<purpose.Data() << std::endl;
225  recvchar=fxTransport->RecvRaw("dummy"); // login account
226  if(recvchar==0) return kGo4ComModeRefused;
227  account=recvchar;
228  //std::cout <<"ClientLogin got account "<<account.Data() << std::endl;
229  recvchar=fxTransport->RecvRaw("dummy"); // login password
230  if(recvchar==0) return kGo4ComModeRefused;
231  passwd=recvchar;
232 // std::cout <<"ClientLogin got passwd "<<passwd.Data() << std::endl;
233 // std::cout <<"observer account is "<<TGo4TaskHandler::fgxOBSERVERACCOUNT.GetName()<<", "<<TGo4TaskHandler::fgxOBSERVERACCOUNT.GetTitle() << std::endl;
234 // std::cout <<"controller account is "<<TGo4TaskHandler::fgxCONTROLLERACCOUNT.GetName()<<", "<<TGo4TaskHandler::fgxCONTROLLERACCOUNT.GetTitle() << std::endl;
235 // std::cout <<"admin account is "<<TGo4TaskHandler::fgxADMINISTRATORACCOUNT.GetName()<<", "<<TGo4TaskHandler::fgxADMINISTRATORACCOUNT.GetTitle() << std::endl;
236 
237  // first check if client matches our own purpose:
238  Bool_t matching=kFALSE;
239  if(fxServer->IsMaster())
240  {
241  if(purpose==TGo4TaskHandler::fgcSLAVE) matching=kTRUE;
242  }
243  else
244  {
245  if(purpose==TGo4TaskHandler::fgcMASTER) matching=kTRUE;
246  }
247  if(!matching)
248  {
249  TGo4Log::Debug(" TaskManager: Client does not match Server, Login failed!!!");
250  return kGo4ComModeRefused;
251  }
252 
253  // check password and account:
 // Each account's name is read with GetName() and its password with GetTitle().
254  if(account==TGo4TaskHandler::fgxOBSERVERACCOUNT.GetName()
255  && passwd==TGo4TaskHandler::fgxOBSERVERACCOUNT.GetTitle())
256  {
257  TGo4Log::Debug(" TaskManager: Client logged in as observer");
258  return kGo4ComModeObserver;
259  }
260  else if(account==TGo4TaskHandler::fgxCONTROLLERACCOUNT.GetName()
261  && passwd==TGo4TaskHandler::fgxCONTROLLERACCOUNT.GetTitle())
262  {
263  // avoid multiple controllers at this server:
 // NOTE(review): the condition of this if (listing line 264) is missing;
 // presumably it tests fbHasControllerConnection — confirm.
265  {
266  TGo4Log::Debug(" TaskManager: Client logged in as 2nd controller, will be observer");
267  return kGo4ComModeObserver;
268  }
269  else
270  {
271  TGo4Log::Debug(" TaskManager: Client logged in as controller");
272  return kGo4ComModeController;
273  }
274  }
275  else if(account==TGo4TaskHandler::fgxADMINISTRATORACCOUNT.GetName()
276  && passwd==TGo4TaskHandler::fgxADMINISTRATORACCOUNT.GetTitle())
277  {
278  // avoid multiple controllers at this server:
 // NOTE(review): the condition of this if (listing line 279) is missing;
 // presumably the same controller-connection test as above — confirm.
280  {
281  TGo4Log::Warn(" TaskManager: Client logged in as 2nd controller, will be observer");
282  return kGo4ComModeObserver;
283  }
284  else
285  {
286  TGo4Log::Debug(" TaskManager: Client logged in as administrator");
 // NOTE(review): listing line 287 is missing; presumably it returns the
 // administrator mode. As shown, this branch would fall through to the
 // final "refused" return — confirm.
288  }
289  }
290 
291  else
292  {
293  TGo4Log::Debug(" TaskManager: Client Login failed!!!");
294  return kGo4ComModeRefused;
295  }
296 }
297 return kGo4ComModeRefused;
298 }
299 
300 Int_t TGo4TaskManager::ConnectClient(const char* client, const char* host, Go4CommandMode_t role)
301 {
302  Int_t rev=0;
303  // check first if client of that name already exists:
304  TString cliname=client;
305  if (!AddClient(cliname.Data(),host,role)) rev=1;
306  return rev;
307 }
308 
309 
310 Int_t TGo4TaskManager::DisConnectClient(const char* name, Bool_t clientwait)
311 {
312  TGo4Log::Info("TaskManager is disconnecting client %s %s ...", name, clientwait ? "with waiting" : "with no wait!" );
313  //TGo4LockGuard listguard(fxListMutex);
314  // this mutex
315  // might deadlock between connector thread and local command thread
316  // in case of timeout: command thread inits disconnect by client request
317  // but if this fails, connector thread itself wants to finish disconnection hard
318  Int_t rev=0;
319  TGo4TaskHandler* han=GetTaskHandler(name);
320  rev=DisConnectClient(han,clientwait);
321  return rev;
322 }
323 
324 Int_t TGo4TaskManager::DisConnectClient(TGo4TaskHandler * taskhandler, Bool_t clientwait)
325 {
326 Int_t rev=0;
327 if(taskhandler!=0)
328  {
329  fbClientIsRemoved=kFALSE; // reset the flag for waiting commander thread
330  TString tname=taskhandler->GetName();
331  Bool_t iscontrollertask=(taskhandler->GetRole()>kGo4ComModeObserver);
332  fxServer->SendStopBuffers(tname); // suspend remote threads from socket receive
333  if(clientwait)
334  {
335  // wait for OK string sent by client over connector negotiation port
336  char* revchar = fxTransport->RecvRaw("dummy"); // wait for client close ok
337  if(!(revchar && !strcmp(revchar,TGo4TaskHandler::Get_fgcOK())))
338  {
339  TGo4Log::Debug(" TaskManager %s; negotiation ERROR after client disconnect!",GetName());
340  rev+=1;
341  //throw TGo4RuntimeException();
342  }
343  } // if(clientwait)
344  if(!taskhandler->DisConnect(clientwait))rev+=1;
345  if (!RemoveTaskHandler(tname.Data())) rev+=2;
346  if (rev==0)
347  {
348  // all right, we reset flags
349  fuTaskCount--; // set number of still connected client tasks
350  if(iscontrollertask) fbHasControllerConnection=kFALSE;
351  fbClientIsRemoved=kTRUE; // this flag tells the main thread we are done
352  TGo4Log::Debug(" TaskManager: client %s has been disconnected. ", tname.Data());
353  }
354  else
355  {
356  // something went wrong, warn the user
357  TGo4Log::Debug(" TaskManager: client %s disconnect ERROR %d occured !! ", tname.Data(),rev);
358  }
359 
360  }
361 
362 else
363  {
364  // no such client
365  TGo4Log::Debug(" TaskManager: FAILED to disonnect client -- no such client! ");
366  rev=-1;
367  }
368 return rev;
369 }
370 
371 Bool_t TGo4TaskManager::AddClient(const char* client, const char* host, Go4CommandMode_t role)
372 {
373  TGo4TaskHandler* han = NewTaskHandler(client);
374  if (!han) {
375  TGo4Log::Warn(" !!! TaskManager::AddClient ERROR: client of name %s is already exisiting !!! ",client);
376  fxTransport->Send(TGo4TaskHandler::Get_fgcERROR()); // tell client we refuse connection
377  return kFALSE;
378  }
379 
380  if(han->Connect(host,fxTransport)) {
381  // successful connection:
382  TGo4Log::Info(" TaskManager: Succesfully added new client %s (host %s, ports %d,%d,%d) ",
383  client, host, han->GetComPort(), han->GetStatPort(), han->GetDatPort());
384  fuTaskCount++;
385  han->SetRole(role);
387  fxServer->SetCurrentTask(client); // this will set the direct link to the new client handler
388  fxServer->SendStatusMessage(1,kTRUE,"%s::Client %s is logged in at %s as %s",
389  client,client,fxServer->GetName(),TGo4Command::GetModeDescription(han->GetRole()) );
390  return kTRUE;
391  }
392 
393  TGo4Log::Error(" TaskManager: ERROR on connecting new client %s (host %s)", client, host);
394  RemoveTaskHandler(client);
395  return kFALSE;
396 }
397 
398 
400 {
 // Presumably the body of TGo4TaskManager::AddTaskHandler(TGo4TaskHandler* han)
 // (signature line not visible here). Appends han to the task list under the
 // list mutex unless FindObject(han) already finds it.
 // Returns kTRUE if han was appended, kFALSE if it was found.
 // NOTE(review): FindObject() is called with the object pointer, not with a
 // name. Whether a different handler with the same name counts as "already
 // in list" depends on how the handler compares equal — confirm, because
 // NewTaskHandler() relies on this to reject duplicate names.
401  Bool_t rev=kFALSE;
402  {
403  TGo4LockGuard listguard(fxListMutex);
404  if(fxTaskList->FindObject(han)==0)
405  // is taskhandler already in list?
406  {
407  //no, add the new taskhandler
408  fxTaskList->AddLast(han);
409  rev=kTRUE;
410  }
411  else
412  {
413  // yes, do nothing
414  rev=kFALSE;
415  }
416  } // TGo4LockGuard
417  return rev;
418 }
419 
420 
422 {
 // Presumably the body of TGo4TaskManager::NewTaskHandler(const char* name)
 // (signature line not visible here). Allocates a handler for the given name,
 // bound to fxServer, and tries to register it with AddTaskHandler().
 // Returns the registered handler, or 0 after deleting it if AddTaskHandler()
 // refused it.
423  TGo4TaskHandler* han=new TGo4TaskHandler(name,fxServer,kFALSE, fxServer->IsMaster());
424  // success, taskhandler was not already existing
425  if(AddTaskHandler(han)) return han;
426 
427  // error, taskhandler of this name was already there
428  delete han;
429  return 0;
430 }
431 
432 Bool_t TGo4TaskManager::RemoveTaskHandler(const char* name)
433 {
434  Bool_t rev=kTRUE;
435  TGo4TaskHandler* taskhandler;
436  {
437  TGo4LockGuard listguard(fxListMutex);
438  TObject* obj=fxTaskList->FindObject(name);
439  taskhandler= (TGo4TaskHandler*) fxTaskList->Remove(obj);
440  // Remove will do nothing if obj==0; on success, it returns pointer to
441  // removed object
442  } //TGo4LockGuard
443  if(taskhandler!=0)
444  {
445  // test if we have removed the currently active taskhandler
446  TGo4TaskHandler* currenttaskhandler=fxServer->GetCurrentTaskHandler();
447  if(taskhandler==currenttaskhandler)
448  {
449  // yes, then set current task to the next in list
450  fxServer->SetCurrentTask(0); // will also start the work threads again
451  }
452  else // if (taskhandler==currenttaskhandler)
453  {
454  // no, the current task remains
455  fxServer->StartWorkThreads(); // but need to start the work threads
456  }
457  delete taskhandler;
458  } // if (taskhandler!=0)
459  else
460  {
461  // no such handler, do nothing
462  rev=kFALSE;
463  }
464  return rev;
465 }
466 
467 
468 
470 {
 // Presumably the body of TGo4TaskManager::GetTaskHandler(const char* name)
 // (signature line not visible here). Looks the name up in the task list
 // while holding the list mutex and returns the result cast to a handler
 // pointer.
 // NOTE(review): the mutex is released before returning, so the pointer is
 // not protected against a later removal of the handler.
471  TGo4TaskHandler* th=0;
472  {
473  TGo4LockGuard listguard(fxListMutex);
474  th= (TGo4TaskHandler*) fxTaskList->FindObject(name);
475  } //TGo4LockGuard
476  return th;
477 }
478 
480 {
 // Presumably the body of TGo4TaskManager::GetLastTaskHandler() (signature
 // line not visible here). Fetches the result of fxTaskList->Last() while
 // holding the list mutex and returns it cast to a handler pointer.
481  TGo4TaskHandler* th=0;
482  {
483  TGo4LockGuard listguard(fxListMutex);
484  th= (TGo4TaskHandler*) fxTaskList->Last();
485  } //TGo4LockGuard
486  return th;
487 }
488 
490 {
 // Presumably the body of TGo4TaskManager::NextTaskHandler(Bool_t reset)
 // (signature line not visible here). Advances the shared task list iterator
 // by one step under the list mutex; with reset set, the iterator is first
 // reset.
 // Returns the next object as a handler, or a null pointer if Next()
 // delivers none or the dynamic_cast fails.
 // The lock guard covers only this single step, not a whole iteration.
491  TGo4LockGuard listguard(fxListMutex);
492  if(reset) fxTaskIter->Reset();
493  TGo4TaskHandler* th=dynamic_cast<TGo4TaskHandler*>(fxTaskIter->Next());
494  return th;
495 }
496 
497 
499 
500 {
 // Presumably the body of TGo4TaskManager::WaitForClientRemoved() (signature
 // line not visible here). Polls fbClientIsRemoved until it is set.
 // Returns the number of completed poll cycles on success, -1 from the first
 // branch of the loop, and -2 if the server is terminating. The flag is
 // cleared again before a successful return.
501  Int_t count=0;
502  while(!fbClientIsRemoved)
503  {
 // NOTE(review): the condition opening this if/else chain (listing line
 // 504) is missing; presumably it compares count against a cycle limit
 // (timeout) — confirm.
505  {
506  return -1;
507  }
508  else if(fxServer->IsTerminating())
509  {
510  return -2;
511  }
512  else
513  {
 // NOTE(review): listing line 514 is missing here (presumably a sleep
 // between polls) — confirm.
515  ++count;
516  }
517  }
518  fbClientIsRemoved=kFALSE; // reset for next time
519  return count;
520 
521 }
522 
524 {
 // Presumably the body of TGo4TaskManager::GetNegotiationPort() (signature
 // line not visible here). Returns fuNegotiationPort.
525  if(fxTransport)
526  {
 // NOTE(review): the statement of this block (listing line 527) is missing
 // from this view; presumably it refreshes fuNegotiationPort from the
 // negotiation socket — confirm.
528  }
529 // std::cout << "...........Taskmanager found negotiation port "<< fuNegotiationPort << std::endl;
530  return fuNegotiationPort;
531 }
Bool_t DisConnect(Bool_t waitforclient=kTRUE)
virtual Int_t StartWorkThreads()
Definition: TGo4Task.cxx:569
static TNamed fgxADMINISTRATORACCOUNT
Bool_t AddTaskHandler(TGo4TaskHandler *han)
virtual Int_t Close(Option_t *opt="")
Definition: TGo4Socket.cxx:212
static UInt_t Get_fguPORTWAITTIME()
Int_t GetStatPort() const
virtual ~TGo4TaskManager()
TGo4TaskHandler * NextTaskHandler(Bool_t reset=kFALSE)
virtual Int_t Send(TObject *obj)
Definition: TGo4Socket.cxx:338
Int_t WaitForConnection()
Int_t ConnectClient(const char *client, const char *host, Go4CommandMode_t role=kGo4ComModeController)
Go4CommandMode_t
Definition: TGo4Command.h:27
UInt_t GetNegotiationPort()
void SetDisConnect(TGo4Socket *trans)
static const char * Get_fgcERROR()
static void Warn(const char *text,...)
Definition: TGo4Log.cxx:300
Bool_t fbHasControllerConnection
static void Sleep(UInt_t millisecs)
Definition: TGo4Thread.cxx:335
Int_t GetDatPort() const
static const char * Get_fgcOK()
void SendStopBuffers(const char *taskname=0)
Definition: TGo4Task.cxx:581
virtual Int_t Open(const char *host, Int_t port, Bool_t keepservsock=kFALSE)
Definition: TGo4Socket.cxx:92
void SetConnect(TGo4Socket *trans, const char *host, UInt_t port, Bool_t keepserv=kFALSE)
TGo4ServerTask * fxServer
Go4CommandMode_t ClientLogin()
Int_t GetComPort() const
static const UInt_t fguDISCONTIME
Bool_t RemoveTaskHandler(const char *name)
static const char * fgcCONNECT
TIterator * fxTaskIter
Int_t DisConnectClient(const char *name, Bool_t clientwait=kTRUE)
TGo4TaskHandler * GetTaskHandler(const char *name)
static const char * fgcMASTER
void SendStatusMessage(Int_t level, Bool_t printout, const char *text,...)
Definition: TGo4Task.cxx:285
Bool_t AddClient(const char *client, const char *host, Go4CommandMode_t role)
static const Int_t fgiDISCONCYCLES
static const UInt_t fguCONNECTORPORT
static TNamed fgxCONTROLLERACCOUNT
Go4CommandMode_t GetRole()
static const char * fgcSLAVE
void SetRole(Go4CommandMode_t role)
TGo4Socket * fxTransport
static TNamed fgxOBSERVERACCOUNT
TObjArray * fxTaskList
TGo4TaskHandler * GetLastTaskHandler()
Bool_t IsTerminating() const
static Int_t Get_fgiPORTWAITCYCLES()
Int_t GetPort() const
Definition: TGo4Socket.h:40
TGo4TaskHandler * GetCurrentTaskHandler()
Int_t WaitForClientRemoved()
static const char * fgcDISCONNECT
virtual char * RecvRaw(const char *name=0)
Definition: TGo4Socket.cxx:407
void SetCurrentTask(const char *name)
static const char * GetModeDescription(Go4CommandMode_t mode)
static void Error(const char *text,...)
Definition: TGo4Log.cxx:313
TGo4TaskHandler * NewTaskHandler(const char *name)
static void Info(const char *text,...)
Definition: TGo4Log.cxx:287
Bool_t Connect(const char *host="localhost", TGo4Socket *negotiator=0)
static void Debug(const char *text,...)
Definition: TGo4Log.cxx:274
Bool_t IsMaster()
Definition: TGo4Task.h:93