GSI Object Oriented Online Offline (Go4)  GO4-5.3.2
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
TGo4TaskManager.cxx
Go to the documentation of this file.
1 // $Id: TGo4TaskManager.cxx 999 2013-07-25 11:58:59Z linev $
2 //-----------------------------------------------------------------------
3 // The GSI Online Offline Object Oriented (Go4) Project
4 // Experiment Data Processing at EE department, GSI
5 //-----------------------------------------------------------------------
6 // Copyright (C) 2000- GSI Helmholtzzentrum für Schwerionenforschung GmbH
7 // Planckstr. 1, 64291 Darmstadt, Germany
8 // Contact: http://go4.gsi.de
9 //-----------------------------------------------------------------------
10 // This software can be used under the license agreements as stated
11 // in Go4License.txt file which is part of the distribution.
12 //-----------------------------------------------------------------------
13 
14 #include "TGo4TaskManager.h"
15 
16 #include "Riostream.h"
17 #include "TObjArray.h"
18 #include "TMutex.h"
19 
20 #include "TGo4Log.h"
21 #include "TGo4LockGuard.h"
22 #include "TGo4Thread.h"
23 #include "TGo4Socket.h"
24 #include "TGo4TaskHandler.h"
25 #include "TGo4TerminateException.h"
26 #include "TGo4ServerTask.h"
27 
28 const Int_t TGo4TaskManager::fgiDISCONCYCLES=360; // wait cycles 180
29 
30 const UInt_t TGo4TaskManager::fguDISCONTIME=500; // time in ms 1000
31 
33  TGo4ServerTask * server,
34  UInt_t negotiationport,
35  Bool_t createconnector)
36 : TNamed(name,"This is a Go4TaskManager"),
37  fuTaskCount(0),fuNegotiationPort(0), fbClientIsRemoved(kFALSE), fbHasControllerConnection(kFALSE)
38 {
39  fxServer=server;
40  // set port number for the client server negotiation channel:
41  if(negotiationport==0)
42  {
43  // default: use taskhandler intrinsic port number
45  }
46  else
47  {
48  // use dynamic port number given by main program
49  fuNegotiationPort=negotiationport;
50  }
51 
52  fxListMutex=new TMutex(kTRUE);
53  fxTaskList=new TObjArray;
54  fxTaskIter=fxTaskList->MakeIterator();
55  fxTransport=0;
56  if(createconnector)
57  {
58  // this mode is for server task created on the fly
59  // connector should be available immediately, independent of timer connect!
60  TGo4Log::Debug("TaskManager: Created negotiation channel in ctor");
61  fxTransport=new TGo4Socket(kFALSE); // use raw transport for negotiations
62  fxTransport->Open( "Server mode does not need hostname", 0, kTRUE);
63  // note: Open() return value is not 0 here, since we do not have
64  // accept finished yet! but portnumber is ready after this...
65  }
66 }
67 
69 {
70  if(fxTransport!=0) {
71  fxTransport->Close();
72  delete fxTransport;
73  fxTransport=0;
74  }
75  delete fxTaskIter;
76  delete fxTaskList;
77  delete fxListMutex;
78 }
79 
81 {
82  //std::cout <<"EEEEEEEEEEEnter TGo4TaskManager::ServeClient()" << std::endl;
83  Int_t rev=0;
84  char* recvchar=0;
85  TString cliname, hostname;
86  // open connection in server mode with default port as raw Socket, wait for client
87  if(!fxTransport)
88  {
89  //std::cout << "+++++TaskManager creating new negotiation transport server instance" << std::endl;
90  fxTransport=new TGo4Socket(kFALSE); // use raw transport for negotiations
91  }
92  // we delegate the actual TSocket open to the taskconnector timer:
93  fxServer->SetConnect(fxTransport, "Server mode does not need hostname", 0, kTRUE);
94  // for portscan, we keep exisiting server socket (keepserv=kTRUE)
95  Int_t waitresult=fxServer->WaitForOpen(); // wait for the server Open() call by timer
96  if(waitresult<0)
97  {
98  // open timeout
99  TGo4Log::Debug(" TaskManager: Negotiation channel open TIMEOUT");
100  std::cerr <<" TaskManager TIMEOUT ERROR opening socket connection !!! Terminating..." << std::endl;
102  //return kFALSE;
103  }
104  Int_t count=0;
105  while(GetNegotiationPort()==0)
106  {
108  {
109  TGo4Log::Debug(" TaskManager: Negotiation port getter TIMEOUT");
110  std::cerr <<" TaskManager TIMEOUT ERROR retrieving port number !!! Terminating..." << std::endl;
112  //return kFALSE;
113  }
114  else if(fxServer->IsTerminating())
115  {
116  //std::cout << "TTTTTT ServeClient sees terminating state and returns -1" << std::endl;
117  return -1;
118  }
119  else
120  {
122  ++count;
123  }
124  }
125  std::cout << " Waiting for client connection on PORT: "<< fuNegotiationPort << std::endl;
126  TGo4Log::Debug(" TaskManager is waiting to serve client request on port %d ... ",
128  Int_t connectwaitseconds=fxServer->WaitForConnection(); // timer tells us by flag when the transport is opened
129 
130  if(connectwaitseconds<0)
131  {
132  // case of threadmanager termination:
133  // connector runnable shall stop on return from ServeClient method
134  return connectwaitseconds;
135  }
136  else
137  {
138  // just proceed to the client server negotiations
139  }
140 
141  // check connected client: we expect correct ok string
142 // recvchar=fxTransport->RecvRaw("dummy");
143 // if(recvchar && !strcmp(recvchar,TGo4TaskHandler::Get_fgcOK()))
144  Go4CommandMode_t account=ClientLogin();
145  if(account!=kGo4ComModeRefused)
146  {
147  // client knows task handler, we keep talking
148  //
149  fxTransport->Send(TGo4TaskHandler::Get_fgcOK()); // handshake to assure the client
150  recvchar = fxTransport->RecvRaw("dummy");
151  cliname = recvchar; // get the client name
152  recvchar = fxTransport->RecvRaw("dummy");
153  hostname = recvchar; // get the host name
154  //
155  // check for connect or disconnect:
156  //
157  recvchar=fxTransport->RecvRaw("dummy"); // get string to tell us what to do...
158  if(recvchar && !strcmp(recvchar,TGo4TaskHandler::fgcCONNECT))
159  {
160  // request to connect a new client
161  rev = ConnectClient(cliname,hostname,account);
162  }
163  else if(recvchar && !strcmp(recvchar,TGo4TaskHandler::fgcDISCONNECT))
164  {
165  // request to disconnect an existing client
166  rev = DisConnectClient(cliname);
167  }
168  else
169  {
170  // unknown request
171  rev =0;
172  }
173  }
174  else
175  {
176  // no valid client
177  //
179  TGo4Log::Debug(" TaskManager: ServeClient received invalid login, closing negotiation port ", cliname.Data());
180  fxServer->SetDisConnect(fxTransport); // timer shall do the Close() of negotiation
181  //TGo4Log::Debug(" TaskManager: Waiting for timer Close() of negotiation port ... ", cliname);
182  fxServer->WaitForClose(); // poll until timer has returned from close
183 // delete fxTransport;
184 // fxTransport=0;
185 // TGo4Log::Debug(" TaskManager: Closed and deleted negotiation port");
186  TGo4Log::Debug(" TaskManager: Finished negotiations with client %s ", cliname.Data());
187 
188  return 0;
189  }
190 
191  // finally, we close the channel again...
192  recvchar=fxTransport->RecvRaw("dummy"); // get exit message
193  if(recvchar && !strcmp(recvchar,TGo4TaskHandler::Get_fgcOK()))
194  {
195  fxServer->SetDisConnect(fxTransport); // timer shall do the Close() of negotiation
196  TGo4Log::Debug(" TaskManager: Waiting for timer Close() of negotiation to client %s ... ", cliname.Data());
197  fxServer->WaitForClose(); // poll until timer has returned from close
198  TGo4Log::Debug(" TaskManager: Finished negotiations with client %s ", cliname.Data());
199  }
200  else // if (!strcmp(revchar,TGo4TaskHandler::Get_fgcOK()))
201  {
202  TGo4Log::Debug(" TaskManager: ERROR on closing down negotiation channel, client %s ", cliname.Data());
203  throw TGo4RuntimeException();
204  }
205  return rev;
206  }
207 
209 {
210 if(fxTransport==0) return kGo4ComModeRefused;
211 TString purpose;
212 TString account;
213 TString passwd;
214 char* recvchar=fxTransport->RecvRaw("dummy"); // first receive OK string
215 if(recvchar && !strcmp(recvchar,TGo4TaskHandler::Get_fgcOK()))
216 {
217  //return kGo4ComModeController; // old protocol: no password
219  TGo4Log::Debug(" TaskManager::ClientLogin getting login...");
220  recvchar=fxTransport->RecvRaw("dummy"); // get purpose of client (master or slave)
221  if(recvchar==0) return kGo4ComModeRefused;
222  purpose=recvchar;
223  //std::cout <<"ClientLogin got purpose "<<purpose.Data() << std::endl;
224  recvchar=fxTransport->RecvRaw("dummy"); // login account
225  if(recvchar==0) return kGo4ComModeRefused;
226  account=recvchar;
227  //std::cout <<"ClientLogin got account "<<account.Data() << std::endl;
228  recvchar=fxTransport->RecvRaw("dummy"); // login password
229  if(recvchar==0) return kGo4ComModeRefused;
230  passwd=recvchar;
231 // std::cout <<"ClientLogin got passwd "<<passwd.Data() << std::endl;
232 // std::cout <<"observer account is "<<TGo4TaskHandler::fgxOBSERVERACCOUNT.GetName()<<", "<<TGo4TaskHandler::fgxOBSERVERACCOUNT.GetTitle() << std::endl;
233 // std::cout <<"controller account is "<<TGo4TaskHandler::fgxCONTROLLERACCOUNT.GetName()<<", "<<TGo4TaskHandler::fgxCONTROLLERACCOUNT.GetTitle() << std::endl;
234 // std::cout <<"admin account is "<<TGo4TaskHandler::fgxADMINISTRATORACCOUNT.GetName()<<", "<<TGo4TaskHandler::fgxADMINISTRATORACCOUNT.GetTitle() << std::endl;
235 
236  // first check if client matches our own purpose:
237  Bool_t matching=kFALSE;
238  if(fxServer->IsMaster())
239  {
240  if(purpose==TGo4TaskHandler::fgcSLAVE) matching=kTRUE;
241  }
242  else
243  {
244  if(purpose==TGo4TaskHandler::fgcMASTER) matching=kTRUE;
245  }
246  if(!matching)
247  {
248  TGo4Log::Debug(" TaskManager: Client does not match Server, Login failed!!!");
249  return kGo4ComModeRefused;
250  }
251 
252  // check password and account:
253  if(account==TGo4TaskHandler::fgxOBSERVERACCOUNT.GetName()
254  && passwd==TGo4TaskHandler::fgxOBSERVERACCOUNT.GetTitle())
255  {
256  TGo4Log::Debug(" TaskManager: Client logged in as observer");
257  return kGo4ComModeObserver;
258  }
259  else if(account==TGo4TaskHandler::fgxCONTROLLERACCOUNT.GetName()
260  && passwd==TGo4TaskHandler::fgxCONTROLLERACCOUNT.GetTitle())
261  {
262  // avoid multiple controllers at this server:
264  {
265  TGo4Log::Debug(" TaskManager: Client logged in as 2nd controller, will be observer");
266  return kGo4ComModeObserver;
267  }
268  else
269  {
270  TGo4Log::Debug(" TaskManager: Client logged in as controller");
271  return kGo4ComModeController;
272  }
273  }
274  else if(account==TGo4TaskHandler::fgxADMINISTRATORACCOUNT.GetName()
275  && passwd==TGo4TaskHandler::fgxADMINISTRATORACCOUNT.GetTitle())
276  {
277  // avoid multiple controllers at this server:
279  {
280  TGo4Log::Warn(" TaskManager: Client logged in as 2nd controller, will be observer");
281  return kGo4ComModeObserver;
282  }
283  else
284  {
285  TGo4Log::Debug(" TaskManager: Client logged in as administrator");
287  }
288  }
289 
290  else
291  {
292  TGo4Log::Debug(" TaskManager: Client Login failed!!!");
293  return kGo4ComModeRefused;
294  }
295 }
296 return kGo4ComModeRefused;
297 }
298 
299 Int_t TGo4TaskManager::ConnectClient(const char* client, const char* host, Go4CommandMode_t role)
300 {
301  Int_t rev=0;
302  // check first if client of that name already exists:
303  TString cliname=client;
304  if (!AddClient(cliname.Data(),host,role)) rev=1;
305  return rev;
306 }
307 
308 
309 Int_t TGo4TaskManager::DisConnectClient(const char* name, Bool_t clientwait)
310 {
311  TGo4Log::Info("TaskManager is disconnecting client %s %s ...", name, clientwait ? "with waiting" : "with no wait!" );
312  //TGo4LockGuard listguard(fxListMutex);
313  // this mutex
314  // might deadlock between connector thread and local command thread
315  // in case of timeout: command thread inits disconnect by client request
316  // but if this fails, connector thread itself wants to finish disconnection hard
317  Int_t rev=0;
318  TGo4TaskHandler* han=GetTaskHandler(name);
319  rev=DisConnectClient(han,clientwait);
320  return rev;
321 }
322 
323 Int_t TGo4TaskManager::DisConnectClient(TGo4TaskHandler * taskhandler, Bool_t clientwait)
324 {
325 Int_t rev=0;
326 if(taskhandler!=0)
327  {
328  fbClientIsRemoved=kFALSE; // reset the flag for waiting commander thread
329  TString tname=taskhandler->GetName();
330  Bool_t iscontrollertask=(taskhandler->GetRole()>kGo4ComModeObserver);
331  fxServer->SendStopBuffers(tname); // suspend remote threads from socket receive
332  if(clientwait)
333  {
334  // wait for OK string sent by client over connector negotiation port
335  char* revchar = fxTransport->RecvRaw("dummy"); // wait for client close ok
336  if(!(revchar && !strcmp(revchar,TGo4TaskHandler::Get_fgcOK())))
337  {
338  TGo4Log::Debug(" TaskManager %s; negotiation ERROR after client disconnect!",GetName());
339  rev+=1;
340  //throw TGo4RuntimeException();
341  }
342  } // if(clientwait)
343  if(!taskhandler->DisConnect(clientwait))rev+=1;
344  if (!RemoveTaskHandler(tname.Data())) rev+=2;
345  if (rev==0)
346  {
347  // all right, we reset flags
348  fuTaskCount--; // set number of still connected client tasks
349  if(iscontrollertask) fbHasControllerConnection=kFALSE;
350  fbClientIsRemoved=kTRUE; // this flag tells the main thread we are done
351  TGo4Log::Debug(" TaskManager: client %s has been disconnected. ", tname.Data());
352  }
353  else
354  {
355  // something went wrong, warn the user
356  TGo4Log::Debug(" TaskManager: client %s disconnect ERROR %d occured !! ", tname.Data(),rev);
357  }
358 
359  }
360 
361 else
362  {
363  // no such client
364  TGo4Log::Debug(" TaskManager: FAILED to disonnect client -- no such client! ");
365  rev=-1;
366  }
367 return rev;
368 }
369 
370 Bool_t TGo4TaskManager::AddClient(const char* client, const char* host, Go4CommandMode_t role)
371 {
372  TGo4TaskHandler* han=NewTaskHandler(client);
373  if (han==0)
374  {
375  TGo4Log::Warn(" !!! TaskManager::AddClient ERROR: client of name %s is already exisiting !!! ",client);
376  fxTransport->Send(TGo4TaskHandler::Get_fgcERROR()); // tell client we refuse connection
377  return kFALSE;
378  }
379 
380 if(han->Connect(host,fxTransport))
381  {
382  // successful connection:
383  TGo4Log::Info(" TaskManager: Succesfully added new client %s (host %s, ports %d,%d,%d) ",
384  client, host, han->GetComPort(), han->GetStatPort(), han->GetDatPort());
385  fuTaskCount++;
386  han->SetRole(role);
388  fxServer->SetCurrentTask(client); // this will set the direct link to the new client handler
389  fxServer->SendStatusMessage(1,kTRUE,"%s::Client %s is logged in at %s as %s",
390  client,client,fxServer->GetName(),TGo4Command::GetModeDescription(han->GetRole()) );
391  return kTRUE;
392  }
393 else
394  {
395  TGo4Log::Error(" TaskManager: ERROR on connecting new client %s (host %s)", client, host);
396  RemoveTaskHandler(client);
397  return kFALSE;
398  }
399 return kFALSE;
400 }
401 
402 
404 {
405  Bool_t rev=kFALSE;
406  {
407  TGo4LockGuard listguard(fxListMutex);
408  if(fxTaskList->FindObject(han)==0)
409  // is taskhandler already in list?
410  {
411  //no, add the new taskhandler
412  fxTaskList->AddLast(han);
413  rev=kTRUE;
414  }
415  else
416  {
417  // yes, do nothing
418  rev=kFALSE;
419  }
420  } // TGo4LockGuard
421  return rev;
422 }
423 
424 
426 {
427  TGo4TaskHandler* han=new TGo4TaskHandler(name,fxServer,kFALSE, fxServer->IsMaster());
428  if(AddTaskHandler(han))
429  {
430  // success, taskhandler was not already existing
431  return han;
432  }
433  else
434  {
435  // error, taskhandler of this name was already there
436  delete han;
437  return 0;
438 
439  }
440  return 0;
441 }
442 
443 Bool_t TGo4TaskManager::RemoveTaskHandler(const char* name)
444 {
445  Bool_t rev=kTRUE;
446  TGo4TaskHandler* taskhandler;
447  {
448  TGo4LockGuard listguard(fxListMutex);
449  TObject* obj=fxTaskList->FindObject(name);
450  taskhandler= (TGo4TaskHandler*) fxTaskList->Remove(obj);
451  // Remove will do nothing if obj==0; on success, it returns pointer to
452  // removed object
453  } //TGo4LockGuard
454  if(taskhandler!=0)
455  {
456  // test if we have removed the currently active taskhandler
457  TGo4TaskHandler* currenttaskhandler=fxServer->GetCurrentTaskHandler();
458  if(taskhandler==currenttaskhandler)
459  {
460  // yes, then set current task to the next in list
461  fxServer->SetCurrentTask(0); // will also start the work threads again
462  }
463  else // if (taskhandler==currenttaskhandler)
464  {
465  // no, the current task remains
466  fxServer->StartWorkThreads(); // but need to start the work threads
467  }
468  delete taskhandler;
469  } // if (taskhandler!=0)
470  else
471  {
472  // no such handler, do nothing
473  rev=kFALSE;
474  }
475  return rev;
476 }
477 
478 
479 
481 {
482  TGo4TaskHandler* th=0;
483  {
484  TGo4LockGuard listguard(fxListMutex);
485  th= (TGo4TaskHandler*) fxTaskList->FindObject(name);
486  } //TGo4LockGuard
487  return th;
488 }
489 
491 {
492  TGo4TaskHandler* th=0;
493  {
494  TGo4LockGuard listguard(fxListMutex);
495  th= (TGo4TaskHandler*) fxTaskList->Last();
496  } //TGo4LockGuard
497  return th;
498 }
499 
501 {
502  TGo4LockGuard listguard(fxListMutex);
503  if(reset) fxTaskIter->Reset();
504  TGo4TaskHandler* th=dynamic_cast<TGo4TaskHandler*>(fxTaskIter->Next());
505  return th;
506 }
507 
508 
510 
511 {
512  Int_t count=0;
513  while(!fbClientIsRemoved)
514  {
516  {
517  return -1;
518  }
519  else if(fxServer->IsTerminating())
520  {
521  return -2;
522  }
523  else
524  {
526  ++count;
527  }
528  }
529  fbClientIsRemoved=kFALSE; // reset for next time
530  return count;
531 
532 }
533 
535 {
536  if(fxTransport)
537  {
539  }
540 // std::cout << "...........Taskmanager found negotiation port "<< fuNegotiationPort << std::endl;
541  return fuNegotiationPort;
542 }
Bool_t DisConnect(Bool_t waitforclient=kTRUE)
virtual Int_t StartWorkThreads()
Definition: TGo4Task.cxx:589
static TNamed fgxADMINISTRATORACCOUNT
Bool_t AddTaskHandler(TGo4TaskHandler *han)
virtual Int_t Close(Option_t *opt="")
Definition: TGo4Socket.cxx:226
static UInt_t Get_fguPORTWAITTIME()
Int_t GetStatPort() const
virtual ~TGo4TaskManager()
TGo4TaskHandler * NextTaskHandler(Bool_t reset=kFALSE)
virtual Int_t Send(TObject *obj)
Definition: TGo4Socket.cxx:352
Int_t WaitForConnection()
Int_t ConnectClient(const char *client, const char *host, Go4CommandMode_t role=kGo4ComModeController)
Go4CommandMode_t
Definition: TGo4Command.h:28
UInt_t GetNegotiationPort()
void SetDisConnect(TGo4Socket *trans)
static const char * Get_fgcERROR()
static void Warn(const char *text,...)
Definition: TGo4Log.cxx:296
Bool_t fbHasControllerConnection
static void Sleep(UInt_t millisecs)
Definition: TGo4Thread.cxx:336
Int_t GetDatPort() const
static const char * Get_fgcOK()
void SendStopBuffers(const char *taskname=0)
Definition: TGo4Task.cxx:601
virtual Int_t Open(const char *host, Int_t port, Bool_t keepservsock=kFALSE)
Definition: TGo4Socket.cxx:106
void SetConnect(TGo4Socket *trans, const char *host, UInt_t port, Bool_t keepserv=kFALSE)
TGo4ServerTask * fxServer
Go4CommandMode_t ClientLogin()
Int_t GetComPort() const
static const UInt_t fguDISCONTIME
Bool_t RemoveTaskHandler(const char *name)
static const char * fgcCONNECT
TIterator * fxTaskIter
Int_t DisConnectClient(const char *name, Bool_t clientwait=kTRUE)
TGo4TaskHandler * GetTaskHandler(const char *name)
static const char * fgcMASTER
void SendStatusMessage(Int_t level, Bool_t printout, const char *text,...)
Definition: TGo4Task.cxx:303
Bool_t AddClient(const char *client, const char *host, Go4CommandMode_t role)
static const Int_t fgiDISCONCYCLES
static const UInt_t fguCONNECTORPORT
static TNamed fgxCONTROLLERACCOUNT
Go4CommandMode_t GetRole()
static const char * fgcSLAVE
void SetRole(Go4CommandMode_t role)
TGo4Socket * fxTransport
static TNamed fgxOBSERVERACCOUNT
TObjArray * fxTaskList
TGo4TaskHandler * GetLastTaskHandler()
Bool_t IsTerminating() const
static Int_t Get_fgiPORTWAITCYCLES()
Int_t GetPort() const
Definition: TGo4Socket.h:40
TGo4TaskHandler * GetCurrentTaskHandler()
Int_t WaitForClientRemoved()
static const char * fgcDISCONNECT
virtual char * RecvRaw(const char *name=0)
Definition: TGo4Socket.cxx:421
void SetCurrentTask(const char *name)
static const char * GetModeDescription(Go4CommandMode_t mode)
static void Error(const char *text,...)
Definition: TGo4Log.cxx:309
TGo4TaskHandler * NewTaskHandler(const char *name)
static void Info(const char *text,...)
Definition: TGo4Log.cxx:283
Bool_t Connect(const char *host="localhost", TGo4Socket *negotiator=0)
static void Debug(const char *text,...)
Definition: TGo4Log.cxx:270
Bool_t IsMaster()
Definition: TGo4Task.h:94