GSI Object Oriented Online Offline (Go4)  GO4-6.3.0
TGo4TaskManager.cxx
Go to the documentation of this file.
1 // $Id$
2 //-----------------------------------------------------------------------
3 // The GSI Online Offline Object Oriented (Go4) Project
4 // Experiment Data Processing at EE department, GSI
5 //-----------------------------------------------------------------------
6 // Copyright (C) 2000- GSI Helmholtzzentrum fuer Schwerionenforschung GmbH
7 // Planckstr. 1, 64291 Darmstadt, Germany
8 // Contact: http://go4.gsi.de
9 //-----------------------------------------------------------------------
10 // This software can be used under the license agreements as stated
11 // in Go4License.txt file which is part of the distribution.
12 //-----------------------------------------------------------------------
13 
14 #include "TGo4TaskManager.h"
15 
16 #include <iostream>
17 
18 #include "TObjArray.h"
19 #include "TMutex.h"
20 
21 #include "TGo4Log.h"
22 #include "TGo4LockGuard.h"
23 #include "TGo4Thread.h"
24 #include "TGo4Socket.h"
25 #include "TGo4TaskHandler.h"
26 #include "TGo4TerminateException.h"
27 #include "TGo4ServerTask.h"
28 
29 const Int_t TGo4TaskManager::fgiDISCONCYCLES=360; // wait cycles 180
30 
31 const UInt_t TGo4TaskManager::fguDISCONTIME=500; // time in ms 1000
32 
34  TGo4ServerTask *server,
35  UInt_t negotiationport,
36  Bool_t createconnector)
37 : TNamed(name,"This is a Go4TaskManager"),
38  fuTaskCount(0),fuNegotiationPort(0), fbClientIsRemoved(kFALSE), fbHasControllerConnection(kFALSE)
39 {
40  fxServer = server;
41  // set port number for the client server negotiation channel:
42  if(negotiationport == 0)
43  {
44  // default: use taskhandler intrinsic port number
46  }
47  else
48  {
49  // use dynamic port number given by main program
50  fuNegotiationPort = negotiationport;
51  }
52 
53  fxListMutex = new TMutex(kTRUE);
54  fxTaskList = new TObjArray;
55  fxTaskIter = fxTaskList->MakeIterator();
56  fxTransport = nullptr;
57  if(createconnector) {
58  // this mode is for server task created on the fly
59  // connector should be available immediately, independent of timer connect!
60  TGo4Log::Debug("TaskManager: Created negotiation channel in ctor");
61  fxTransport = new TGo4Socket(kFALSE); // use raw transport for negotiations
62  fxTransport->Open( "Server mode does not need hostname", negotiationport, kTRUE);
63  // note: Open() return value is not 0 here, since we do not have
64  // accept finished yet! but portnumber is ready after this...
65  }
66 }
67 
69 {
70  if(fxTransport) {
71  fxTransport->Close();
72  delete fxTransport;
73  fxTransport = nullptr;
74  }
75  delete fxTaskIter;
76  delete fxTaskList;
77  delete fxListMutex;
78 }
79 
81 {
82  Int_t rev = 0;
83  char *recvchar = nullptr;
84  TString cliname, hostname;
85  // open connection in server mode with default port as raw Socket, wait for client
86  if (!fxTransport) {
87  fxTransport = new TGo4Socket(kFALSE); // use raw transport for negotiations
88  }
89  // we delegate the actual TSocket open to the taskconnector timer:
90  fxServer->SetConnect(fxTransport, "Server mode does not need hostname", 0, kTRUE);
91  // for portscan, we keep existing server socket (keepserv=kTRUE)
92  Int_t waitresult = fxServer->WaitForOpen(); // wait for the server Open() call by timer
93  if(waitresult < 0)
94  {
95  // open timeout
96  TGo4Log::Debug(" TaskManager: Negotiation channel open TIMEOUT");
97  std::cerr <<" TaskManager TIMEOUT ERROR opening socket connection !!! Terminating..." << std::endl;
99  //return kFALSE;
100  }
101  Int_t count = 0;
102  while(GetNegotiationPort() == 0)
103  {
105  {
106  TGo4Log::Debug(" TaskManager: Negotiation port getter TIMEOUT");
107  std::cerr <<" TaskManager TIMEOUT ERROR retrieving port number !!! Terminating..." << std::endl;
109  //return kFALSE;
110  }
111  else if(fxServer->IsTerminating())
112  {
113  return -1;
114  }
115  else
116  {
118  ++count;
119  }
120  }
121  std::cout << " Waiting for client connection on PORT: "<< fuNegotiationPort << std::endl;
122  TGo4Log::Debug(" TaskManager is waiting to serve client request on port %d ... ",
124  Int_t connectwaitseconds = fxServer->WaitForConnection(); // timer tells us by flag when the transport is opened
125 
126  if(connectwaitseconds < 0)
127  {
128  // case of threadmanager termination:
129  // connector runnable shall stop on return from ServeClient method
130  return connectwaitseconds;
131  }
132  else
133  {
134  // just proceed to the client server negotiations
135  }
136 
137  // check connected client: we expect correct ok string
138 // recvchar=fxTransport->RecvRaw("dummy");
139 // if(recvchar && !strcmp(recvchar,TGo4TaskHandler::Get_fgcOK()))
140  Go4CommandMode_t account = ClientLogin();
141  if(account != kGo4ComModeRefused)
142  {
143  // client knows task handler, we keep talking
144  //
145  fxTransport->Send(TGo4TaskHandler::Get_fgcOK()); // handshake to assure the client
146  recvchar = fxTransport->RecvRaw("dummy");
147  cliname = recvchar; // get the client name
148  recvchar = fxTransport->RecvRaw("dummy");
149  hostname = recvchar; // get the host name
150  //
151  // check for connect or disconnect:
152  //
153  recvchar=fxTransport->RecvRaw("dummy"); // get string to tell us what to do...
154  if(recvchar && !strcmp(recvchar,TGo4TaskHandler::fgcCONNECT))
155  {
156  // request to connect a new client
157  rev = ConnectClient(cliname,hostname,account);
158  }
159  else if(recvchar && !strcmp(recvchar,TGo4TaskHandler::fgcDISCONNECT))
160  {
161  // request to disconnect an existing client
162  rev = DisConnectClient(cliname);
163  }
164  else
165  {
166  // unknown request
167  rev = 0;
168  }
169  }
170  else
171  {
172  // no valid client
173  //
175  TGo4Log::Debug(" TaskManager: client %s received invalid login, closing negotiation port ", cliname.Data());
176  fxServer->SetDisConnect(fxTransport); // timer shall do the Close() of negotiation
177  //TGo4Log::Debug(" TaskManager: Waiting for timer Close() of negotiation port ... ", cliname);
178  fxServer->WaitForClose(); // poll until timer has returned from close
179 // delete fxTransport;
180 // fxTransport = nullptr;
181 // TGo4Log::Debug(" TaskManager: Closed and deleted negotiation port");
182  TGo4Log::Debug(" TaskManager: Finished negotiations with client %s ", cliname.Data());
183 
184  return 0;
185  }
186 
187  // finally, we close the channel again...
188  recvchar=fxTransport->RecvRaw("dummy"); // get exit message
189  if(recvchar && !strcmp(recvchar,TGo4TaskHandler::Get_fgcOK()))
190  {
191  fxServer->SetDisConnect(fxTransport); // timer shall do the Close() of negotiation
192  TGo4Log::Debug(" TaskManager: Waiting for timer Close() of negotiation to client %s ... ", cliname.Data());
193  fxServer->WaitForClose(); // poll until timer has returned from close
194  TGo4Log::Debug(" TaskManager: Finished negotiations with client %s ", cliname.Data());
195  }
196  else // if (!strcmp(revchar,TGo4TaskHandler::Get_fgcOK()))
197  {
198  TGo4Log::Debug(" TaskManager: ERROR on closing down negotiation channel, client %s ", cliname.Data());
199  throw TGo4RuntimeException();
200  }
201  return rev;
202  }
203 
205 {
206  if (!fxTransport)
207  return kGo4ComModeRefused;
208  TString purpose;
209  TString account;
210  TString passwd;
211  char *recvchar = fxTransport->RecvRaw("dummy"); // first receive OK string
212  if (recvchar && !strcmp(recvchar, TGo4TaskHandler::Get_fgcOK())) {
213  // return kGo4ComModeController; // old protocol: no password
215  TGo4Log::Debug(" TaskManager::ClientLogin getting login...");
216  recvchar = fxTransport->RecvRaw("dummy"); // get purpose of client (master or slave)
217  if (!recvchar)
218  return kGo4ComModeRefused;
219  purpose = recvchar;
220  recvchar = fxTransport->RecvRaw("dummy"); // login account
221  if (!recvchar)
222  return kGo4ComModeRefused;
223  account = recvchar;
224  recvchar = fxTransport->RecvRaw("dummy"); // login password
225  if (!recvchar)
226  return kGo4ComModeRefused;
227  passwd = recvchar;
228  // std::cout <<"ClientLogin got passwd "<<passwd.Data() << std::endl;
229  // std::cout <<"observer account is "<<TGo4TaskHandler::fgxOBSERVERACCOUNT.GetName()<<",
230  // "<<TGo4TaskHandler::fgxOBSERVERACCOUNT.GetTitle() << std::endl; std::cout <<"controller account is
231  // "<<TGo4TaskHandler::fgxCONTROLLERACCOUNT.GetName()<<", "<<TGo4TaskHandler::fgxCONTROLLERACCOUNT.GetTitle() <<
232  // std::endl; std::cout <<"admin account is "<<TGo4TaskHandler::fgxADMINISTRATORACCOUNT.GetName()<<",
233  // "<<TGo4TaskHandler::fgxADMINISTRATORACCOUNT.GetTitle() << std::endl;
234 
235  // first check if client matches our own purpose:
236  Bool_t matching = kFALSE;
237  if (fxServer->IsMaster()) {
238  if (purpose == TGo4TaskHandler::fgcSLAVE)
239  matching = kTRUE;
240  } else {
241  if (purpose == TGo4TaskHandler::fgcMASTER)
242  matching = kTRUE;
243  }
244  if (!matching) {
245  TGo4Log::Debug(" TaskManager: Client does not match Server, Login failed!!!");
246  return kGo4ComModeRefused;
247  }
248 
249  // check password and account:
250  if (account == TGo4TaskHandler::fgxOBSERVERACCOUNT.GetName() &&
251  passwd == TGo4TaskHandler::fgxOBSERVERACCOUNT.GetTitle()) {
252  TGo4Log::Debug(" TaskManager: Client logged in as observer");
253  return kGo4ComModeObserver;
254  } else if (account == TGo4TaskHandler::fgxCONTROLLERACCOUNT.GetName() &&
255  passwd == TGo4TaskHandler::fgxCONTROLLERACCOUNT.GetTitle()) {
256  // avoid multiple controllers at this server:
258  TGo4Log::Debug(" TaskManager: Client logged in as 2nd controller, will be observer");
259  return kGo4ComModeObserver;
260  } else {
261  TGo4Log::Debug(" TaskManager: Client logged in as controller");
262  return kGo4ComModeController;
263  }
264  } else if (account == TGo4TaskHandler::fgxADMINISTRATORACCOUNT.GetName() &&
265  passwd == TGo4TaskHandler::fgxADMINISTRATORACCOUNT.GetTitle()) {
266  // avoid multiple controllers at this server:
268  TGo4Log::Warn(" TaskManager: Client logged in as 2nd controller, will be observer");
269  return kGo4ComModeObserver;
270  } else {
271  TGo4Log::Debug(" TaskManager: Client logged in as administrator");
273  }
274  }
275 
276  else {
277  TGo4Log::Debug(" TaskManager: Client Login failed!!!");
278  return kGo4ComModeRefused;
279  }
280  }
281  return kGo4ComModeRefused;
282 }
283 
284 Int_t TGo4TaskManager::ConnectClient(const char *client, const char *host, Go4CommandMode_t role)
285 {
286  Int_t rev = 0;
287  // check first if client of that name already exists:
288  TString cliname = client;
289  if (!AddClient(cliname.Data(),host,role)) rev = 1;
290  return rev;
291 }
292 
293 
294 Int_t TGo4TaskManager::DisConnectClient(const char *name, Bool_t clientwait)
295 {
296  TGo4Log::Info("TaskManager is disconnecting client %s %s ...", name, clientwait ? "with waiting" : "with no wait!" );
297  //TGo4LockGuard listguard(fxListMutex);
298  // this mutex
299  // might deadlock between connector thread and local command thread
300  // in case of timeout: command thread inits disconnect by client request
301  // but if this fails, connector thread itself wants to finish disconnection hard
302  Int_t rev = 0;
303  TGo4TaskHandler *han = GetTaskHandler(name);
304  rev = DisConnectClient(han,clientwait);
305  return rev;
306 }
307 
308 Int_t TGo4TaskManager::DisConnectClient(TGo4TaskHandler * taskhandler, Bool_t clientwait)
309 {
310  Int_t rev = 0;
311  if (taskhandler) {
312  fbClientIsRemoved = kFALSE; // reset the flag for waiting commander thread
313  TString tname = taskhandler->GetName();
314  Bool_t iscontrollertask = (taskhandler->GetRole() > kGo4ComModeObserver);
315  fxServer->SendStopBuffers(tname); // suspend remote threads from socket receive
316  if (clientwait) {
317  // wait for OK string sent by client over connector negotiation port
318  char *revchar = fxTransport->RecvRaw("dummy"); // wait for client close ok
319  if (!(revchar && !strcmp(revchar, TGo4TaskHandler::Get_fgcOK()))) {
320  TGo4Log::Debug(" TaskManager %s; negotiation ERROR after client disconnect!", GetName());
321  rev += 1;
322  // throw TGo4RuntimeException();
323  }
324  } // if(clientwait)
325  if (!taskhandler->DisConnect(clientwait))
326  rev += 1;
327  if (!RemoveTaskHandler(tname.Data()))
328  rev += 2;
329  if (rev == 0) {
330  // all right, we reset flags
331  fuTaskCount--; // set number of still connected client tasks
332  if (iscontrollertask)
333  fbHasControllerConnection = kFALSE;
334  fbClientIsRemoved = kTRUE; // this flag tells the main thread we are done
335  TGo4Log::Debug(" TaskManager: client %s has been disconnected. ", tname.Data());
336  } else {
337  // something went wrong, warn the user
338  TGo4Log::Debug(" TaskManager: client %s disconnect ERROR %d occured !! ", tname.Data(), rev);
339  }
340  } else {
341  // no such client
342  TGo4Log::Debug(" TaskManager: FAILED to disonnect client -- no such client! ");
343  rev = -1;
344  }
345  return rev;
346 }
347 
348 Bool_t TGo4TaskManager::AddClient(const char *client, const char *host, Go4CommandMode_t role)
349 {
350  TGo4TaskHandler *han = NewTaskHandler(client);
351  if (!han) {
352  TGo4Log::Warn(" !!! TaskManager::AddClient ERROR: client of name %s is already existing !!! ",client);
353  fxTransport->Send(TGo4TaskHandler::Get_fgcERROR()); // tell client we refuse connection
354  return kFALSE;
355  }
356 
357  if(han->Connect(host,fxTransport)) {
358  // successful connection:
359  TGo4Log::Info(" TaskManager: Succesfully added new client %s (host %s, ports %d,%d,%d) ",
360  client, host, han->GetComPort(), han->GetStatPort(), han->GetDatPort());
361  fuTaskCount++;
362  han->SetRole(role);
364  fxServer->SetCurrentTask(client); // this will set the direct link to the new client handler
365  fxServer->SendStatusMessage(1,kTRUE,"%s::Client %s is logged in at %s as %s",
366  client,client,fxServer->GetName(),TGo4Command::GetModeDescription(han->GetRole()) );
367  return kTRUE;
368  }
369 
370  TGo4Log::Error(" TaskManager: ERROR on connecting new client %s (host %s)", client, host);
371  RemoveTaskHandler(client);
372  return kFALSE;
373 }
374 
375 
377 {
378  Bool_t rev = kFALSE;
379  {
380  TGo4LockGuard listguard(fxListMutex);
381  // is taskhandler already in list?
382  if (!fxTaskList->FindObject(han)) {
383  // no, add the new taskhandler
384  fxTaskList->AddLast(han);
385  rev = kTRUE;
386  } else {
387  // yes, do nothing
388  rev = kFALSE;
389  }
390  } // TGo4LockGuard
391  return rev;
392 }
393 
394 
396 {
397  TGo4TaskHandler *han=new TGo4TaskHandler(name,fxServer,kFALSE, fxServer->IsMaster());
398  // success, taskhandler was not already existing
399  if(AddTaskHandler(han)) return han;
400 
401  // error, taskhandler of this name was already there
402  delete han;
403  return nullptr;
404 }
405 
406 Bool_t TGo4TaskManager::RemoveTaskHandler(const char *name)
407 {
408  Bool_t rev=kTRUE;
409  TGo4TaskHandler *taskhandler = nullptr;
410  {
411  TGo4LockGuard listguard(fxListMutex);
412  TObject *obj = fxTaskList->FindObject(name);
413  taskhandler = (TGo4TaskHandler *) fxTaskList->Remove(obj);
414  // Remove will do nothing if obj == 0; on success, it returns pointer to
415  // removed object
416  } //TGo4LockGuard
417  if (taskhandler) {
418  // test if we have removed the currently active taskhandler
419  TGo4TaskHandler *currenttaskhandler = fxServer->GetCurrentTaskHandler();
420  if (taskhandler == currenttaskhandler) {
421  // yes, then set current task to the next in list
422  fxServer->SetCurrentTask(nullptr); // will also start the work threads again
423  } else {
424  // no, the current task remains
425  fxServer->StartWorkThreads(); // but need to start the work threads
426  }
427  delete taskhandler;
428  } else {
429  // no such handler, do nothing
430  rev = kFALSE;
431  }
432  return rev;
433 }
434 
436 {
437  TGo4TaskHandler *th = nullptr;
438  {
439  TGo4LockGuard listguard(fxListMutex);
440  th = (TGo4TaskHandler *) fxTaskList->FindObject(name);
441  } //TGo4LockGuard
442  return th;
443 }
444 
446 {
447  TGo4TaskHandler *th = nullptr;
448  {
449  TGo4LockGuard listguard(fxListMutex);
450  th = (TGo4TaskHandler *) fxTaskList->Last();
451  } //TGo4LockGuard
452  return th;
453 }
454 
456 {
457  TGo4LockGuard listguard(fxListMutex);
458  if(reset) fxTaskIter->Reset();
459  return dynamic_cast<TGo4TaskHandler *>(fxTaskIter->Next());
460 }
461 
463 {
464  Int_t count = 0;
465  while (!fbClientIsRemoved) {
466  if (count > TGo4TaskManager::fgiDISCONCYCLES) {
467  return -1;
468  } else if (fxServer->IsTerminating()) {
469  return -2;
470  } else {
472  ++count;
473  }
474  }
475  fbClientIsRemoved = kFALSE; // reset for next time
476  return count;
477 
478 }
479 
481 {
482  if(fxTransport)
484  return fuNegotiationPort;
485 }
Bool_t DisConnect(Bool_t waitforclient=kTRUE)
virtual Int_t StartWorkThreads()
Definition: TGo4Task.cxx:548
static TNamed fgxADMINISTRATORACCOUNT
Bool_t AddTaskHandler(TGo4TaskHandler *han)
virtual Int_t Close(Option_t *opt="")
Definition: TGo4Socket.cxx:210
Bool_t Connect(const char *host="localhost", TGo4Socket *negotiator=nullptr)
static UInt_t Get_fguPORTWAITTIME()
virtual ~TGo4TaskManager()
TGo4TaskHandler * NextTaskHandler(Bool_t reset=kFALSE)
virtual Int_t Send(TObject *obj)
Definition: TGo4Socket.cxx:332
Int_t WaitForConnection()
Int_t ConnectClient(const char *client, const char *host, Go4CommandMode_t role=kGo4ComModeController)
Go4CommandMode_t
Definition: TGo4Command.h:27
UInt_t GetNegotiationPort()
static void Info(const char *text,...) GO4_PRINTF_ARGS
Definition: TGo4Log.cxx:294
void SetDisConnect(TGo4Socket *trans)
void SendStopBuffers(const char *taskname=nullptr)
Definition: TGo4Task.cxx:560
static const char * Get_fgcERROR()
Bool_t fbHasControllerConnection
static void Sleep(UInt_t millisecs)
Definition: TGo4Thread.cxx:295
static const char * Get_fgcOK()
virtual Int_t Open(const char *host, Int_t port, Bool_t keepservsock=kFALSE)
Definition: TGo4Socket.cxx:92
void SetConnect(TGo4Socket *trans, const char *host, UInt_t port, Bool_t keepserv=kFALSE)
TGo4ServerTask * fxServer
Go4CommandMode_t ClientLogin()
static const UInt_t fguDISCONTIME
Int_t GetStatPort() const
Bool_t RemoveTaskHandler(const char *name)
static void Debug(const char *text,...) GO4_PRINTF_ARGS
Definition: TGo4Log.cxx:281
static const char * fgcCONNECT
TIterator * fxTaskIter
Int_t DisConnectClient(const char *name, Bool_t clientwait=kTRUE)
Int_t GetPort() const
Definition: TGo4Socket.h:41
virtual char * RecvRaw(const char *name=nullptr)
Definition: TGo4Socket.cxx:401
TGo4TaskHandler * GetTaskHandler(const char *name)
Int_t GetComPort() const
static const char * fgcMASTER
Int_t GetDatPort() const
void SendStatusMessage(Int_t level, Bool_t printout, const char *text,...)
Definition: TGo4Task.cxx:272
static void Error(const char *text,...) GO4_PRINTF_ARGS
Definition: TGo4Log.cxx:320
Bool_t AddClient(const char *client, const char *host, Go4CommandMode_t role)
static const Int_t fgiDISCONCYCLES
static const UInt_t fguCONNECTORPORT
static TNamed fgxCONTROLLERACCOUNT
Go4CommandMode_t GetRole()
static const char * fgcSLAVE
void SetRole(Go4CommandMode_t role)
Bool_t IsMaster() const
Definition: TGo4Task.h:93
TGo4Socket * fxTransport
static TNamed fgxOBSERVERACCOUNT
TObjArray * fxTaskList
TGo4TaskHandler * GetLastTaskHandler()
Bool_t IsTerminating() const
static Int_t Get_fgiPORTWAITCYCLES()
TGo4TaskHandler * GetCurrentTaskHandler()
Int_t WaitForClientRemoved()
static const char * fgcDISCONNECT
void SetCurrentTask(const char *name)
static const char * GetModeDescription(Go4CommandMode_t mode)
static void Warn(const char *text,...) GO4_PRINTF_ARGS
Definition: TGo4Log.cxx:307
TGo4TaskHandler * NewTaskHandler(const char *name)