GSI Object Oriented Online Offline (Go4) GO4-6.4.0
Loading...
Searching...
No Matches
TGo4TaskManager.cxx
Go to the documentation of this file.
1// $Id$
2//-----------------------------------------------------------------------
3// The GSI Online Offline Object Oriented (Go4) Project
4// Experiment Data Processing at EE department, GSI
5//-----------------------------------------------------------------------
6// Copyright (C) 2000- GSI Helmholtzzentrum fuer Schwerionenforschung GmbH
7// Planckstr. 1, 64291 Darmstadt, Germany
8// Contact: http://go4.gsi.de
9//-----------------------------------------------------------------------
10// This software can be used under the license agreements as stated
11// in Go4License.txt file which is part of the distribution.
12//-----------------------------------------------------------------------
13
14#include "TGo4TaskManager.h"
15
16#include <iostream>
17
18#include "TObjArray.h"
19#include "TMutex.h"
20
21#include "TGo4Log.h"
22#include "TGo4LockGuard.h"
23#include "TGo4Thread.h"
24#include "TGo4Socket.h"
25#include "TGo4TaskHandler.h"
27#include "TGo4ServerTask.h"
28
29const Int_t TGo4TaskManager::fgiDISCONCYCLES=360; // wait cycles 180
30
31const UInt_t TGo4TaskManager::fguDISCONTIME=500; // time in ms 1000
32
34 TGo4ServerTask *server,
35 UInt_t negotiationport,
36 Bool_t createconnector)
37: TNamed(name,"This is a Go4TaskManager"),
39{
40 fxServer = server;
41 // set port number for the client server negotiation channel:
42 if(negotiationport == 0)
43 {
44 // default: use taskhandler intrinsic port number
46 }
47 else
48 {
49 // use dynamic port number given by main program
50 fuNegotiationPort = negotiationport;
51 }
52
53 fxListMutex = new TMutex(kTRUE);
54 fxTaskList = new TObjArray;
55 fxTaskIter = fxTaskList->MakeIterator();
56 fxTransport = nullptr;
57 if(createconnector) {
58 // this mode is for server task created on the fly
59 // connector should be available immediately, independent of timer connect!
60 TGo4Log::Debug("TaskManager: Created negotiation channel in ctor");
61 fxTransport = new TGo4Socket(kFALSE); // use raw transport for negotiations
62 fxTransport->Open( "Server mode does not need hostname", negotiationport, kTRUE);
63 // note: Open() return value is not 0 here, since we do not have
64 // accept finished yet! but portnumber is ready after this...
65 }
66}
67
69{
70 if(fxTransport) {
71 fxTransport->Close();
72 delete fxTransport;
73 fxTransport = nullptr;
74 }
75 delete fxTaskIter;
76 delete fxTaskList;
77 delete fxListMutex;
78}
79
81{
82 Int_t rev = 0;
83 char *recvchar = nullptr;
84 TString cliname, hostname;
85 // open connection in server mode with default port as raw Socket, wait for client
86 if (!fxTransport) {
87 fxTransport = new TGo4Socket(kFALSE); // use raw transport for negotiations
88 }
89 // we delegate the actual TSocket open to the taskconnector timer:
90 fxServer->SetConnect(fxTransport, "Server mode does not need hostname", 0, kTRUE);
91 // for portscan, we keep existing server socket (keepserv=kTRUE)
92 Int_t waitresult = fxServer->WaitForOpen(); // wait for the server Open() call by timer
93 if(waitresult < 0)
94 {
95 // open timeout
96 TGo4Log::Debug(" TaskManager: Negotiation channel open TIMEOUT");
97 std::cerr <<" TaskManager TIMEOUT ERROR opening socket connection !!! Terminating..." << std::endl;
99 //return kFALSE;
100 }
101 Int_t count = 0;
102 while(GetNegotiationPort() == 0)
103 {
105 {
106 TGo4Log::Debug(" TaskManager: Negotiation port getter TIMEOUT");
107 std::cerr <<" TaskManager TIMEOUT ERROR retrieving port number !!! Terminating..." << std::endl;
109 //return kFALSE;
110 }
111 else if(fxServer->IsTerminating())
112 {
113 return -1;
114 }
115 else
116 {
118 ++count;
119 }
120 }
121 std::cout << " Waiting for client connection on PORT: "<< fuNegotiationPort << std::endl;
122 TGo4Log::Debug(" TaskManager is waiting to serve client request on port %d ... ",
124 Int_t connectwaitseconds = fxServer->WaitForConnection(); // timer tells us by flag when the transport is opened
125
126 if(connectwaitseconds < 0)
127 {
128 // case of threadmanager termination:
129 // connector runnable shall stop on return from ServeClient method
130 return connectwaitseconds;
131 }
132 else
133 {
134 // just proceed to the client server negotiations
135 }
136
137 // check connected client: we expect correct ok string
138// recvchar=fxTransport->RecvRaw("dummy");
139// if(recvchar && !strcmp(recvchar,TGo4TaskHandler::Get_fgcOK()))
140 Go4CommandMode_t account = ClientLogin();
141 if(account != kGo4ComModeRefused)
142 {
143 // client knows task handler, we keep talking
144 //
145 fxTransport->Send(TGo4TaskHandler::Get_fgcOK()); // handshake to assure the client
146 recvchar = fxTransport->RecvRaw("dummy");
147 cliname = recvchar; // get the client name
148 recvchar = fxTransport->RecvRaw("dummy");
149 hostname = recvchar; // get the host name
150 //
151 // check for connect or disconnect:
152 //
153 recvchar=fxTransport->RecvRaw("dummy"); // get string to tell us what to do...
154 if(recvchar && !strcmp(recvchar,TGo4TaskHandler::fgcCONNECT))
155 {
156 // request to connect a new client
157 rev = ConnectClient(cliname,hostname,account);
158 }
159 else if(recvchar && !strcmp(recvchar,TGo4TaskHandler::fgcDISCONNECT))
160 {
161 // request to disconnect an existing client
162 rev = DisConnectClient(cliname);
163 }
164 else
165 {
166 // unknown request
167 rev = 0;
168 }
169 }
170 else
171 {
172 // no valid client
173 //
175 TGo4Log::Debug(" TaskManager: client %s received invalid login, closing negotiation port ", cliname.Data());
176 fxServer->SetDisConnect(fxTransport); // timer shall do the Close() of negotiation
177 //TGo4Log::Debug(" TaskManager: Waiting for timer Close() of negotiation port ... ", cliname);
178 fxServer->WaitForClose(); // poll until timer has returned from close
179// delete fxTransport;
180// fxTransport = nullptr;
181// TGo4Log::Debug(" TaskManager: Closed and deleted negotiation port");
182 TGo4Log::Debug(" TaskManager: Finished negotiations with client %s ", cliname.Data());
183
184 return 0;
185 }
186
187 // finally, we close the channel again...
188 recvchar=fxTransport->RecvRaw("dummy"); // get exit message
189 if(recvchar && !strcmp(recvchar,TGo4TaskHandler::Get_fgcOK()))
190 {
191 fxServer->SetDisConnect(fxTransport); // timer shall do the Close() of negotiation
192 TGo4Log::Debug(" TaskManager: Waiting for timer Close() of negotiation to client %s ... ", cliname.Data());
193 fxServer->WaitForClose(); // poll until timer has returned from close
194 TGo4Log::Debug(" TaskManager: Finished negotiations with client %s ", cliname.Data());
195 }
196 else // if (!strcmp(revchar,TGo4TaskHandler::Get_fgcOK()))
197 {
198 TGo4Log::Debug(" TaskManager: ERROR on closing down negotiation channel, client %s ", cliname.Data());
199 throw TGo4RuntimeException();
200 }
201 return rev;
202 }
203
205{
206 if (!fxTransport)
207 return kGo4ComModeRefused;
208 TString purpose;
209 TString account;
210 TString passwd;
211 char *recvchar = fxTransport->RecvRaw("dummy"); // first receive OK string
212 if (recvchar && !strcmp(recvchar, TGo4TaskHandler::Get_fgcOK())) {
213 // return kGo4ComModeController; // old protocol: no password
215 TGo4Log::Debug(" TaskManager::ClientLogin getting login...");
216 recvchar = fxTransport->RecvRaw("dummy"); // get purpose of client (master or slave)
217 if (!recvchar)
218 return kGo4ComModeRefused;
219 purpose = recvchar;
220 recvchar = fxTransport->RecvRaw("dummy"); // login account
221 if (!recvchar)
222 return kGo4ComModeRefused;
223 account = recvchar;
224 recvchar = fxTransport->RecvRaw("dummy"); // login password
225 if (!recvchar)
226 return kGo4ComModeRefused;
227 passwd = recvchar;
228 // std::cout <<"ClientLogin got passwd "<<passwd.Data() << std::endl;
229 // std::cout <<"observer account is "<<TGo4TaskHandler::fgxOBSERVERACCOUNT.GetName()<<",
230 // "<<TGo4TaskHandler::fgxOBSERVERACCOUNT.GetTitle() << std::endl; std::cout <<"controller account is
231 // "<<TGo4TaskHandler::fgxCONTROLLERACCOUNT.GetName()<<", "<<TGo4TaskHandler::fgxCONTROLLERACCOUNT.GetTitle() <<
232 // std::endl; std::cout <<"admin account is "<<TGo4TaskHandler::fgxADMINISTRATORACCOUNT.GetName()<<",
233 // "<<TGo4TaskHandler::fgxADMINISTRATORACCOUNT.GetTitle() << std::endl;
234
235 // first check if client matches our own purpose:
236 Bool_t matching = kFALSE;
237 if (fxServer->IsMaster()) {
238 if (purpose == TGo4TaskHandler::fgcSLAVE)
239 matching = kTRUE;
240 } else {
241 if (purpose == TGo4TaskHandler::fgcMASTER)
242 matching = kTRUE;
243 }
244 if (!matching) {
245 TGo4Log::Debug(" TaskManager: Client does not match Server, Login failed!!!");
246 return kGo4ComModeRefused;
247 }
248
249 // check password and account:
250 if (account == TGo4TaskHandler::fgxOBSERVERACCOUNT.GetName() &&
251 passwd == TGo4TaskHandler::fgxOBSERVERACCOUNT.GetTitle()) {
252 TGo4Log::Debug(" TaskManager: Client logged in as observer");
253 return kGo4ComModeObserver;
254 } else if (account == TGo4TaskHandler::fgxCONTROLLERACCOUNT.GetName() &&
255 passwd == TGo4TaskHandler::fgxCONTROLLERACCOUNT.GetTitle()) {
256 // avoid multiple controllers at this server:
258 TGo4Log::Debug(" TaskManager: Client logged in as 2nd controller, will be observer");
259 return kGo4ComModeObserver;
260 } else {
261 TGo4Log::Debug(" TaskManager: Client logged in as controller");
263 }
264 } else if (account == TGo4TaskHandler::fgxADMINISTRATORACCOUNT.GetName() &&
265 passwd == TGo4TaskHandler::fgxADMINISTRATORACCOUNT.GetTitle()) {
266 // avoid multiple controllers at this server:
268 TGo4Log::Warn(" TaskManager: Client logged in as 2nd controller, will be observer");
269 return kGo4ComModeObserver;
270 } else {
271 TGo4Log::Debug(" TaskManager: Client logged in as administrator");
273 }
274 }
275
276 else {
277 TGo4Log::Debug(" TaskManager: Client Login failed!!!");
278 return kGo4ComModeRefused;
279 }
280 }
281 return kGo4ComModeRefused;
282}
283
284Int_t TGo4TaskManager::ConnectClient(const char *client, const char *host, Go4CommandMode_t role)
285{
286 Int_t rev = 0;
287 // check first if client of that name already exists:
288 TString cliname = client;
289 if (!AddClient(cliname.Data(),host,role)) rev = 1;
290 return rev;
291}
292
293
294Int_t TGo4TaskManager::DisConnectClient(const char *name, Bool_t clientwait)
295{
296 TGo4Log::Info("TaskManager is disconnecting client %s %s ...", name, clientwait ? "with waiting" : "with no wait!" );
297 //TGo4LockGuard listguard(fxListMutex);
298 // this mutex
299 // might deadlock between connector thread and local command thread
300 // in case of timeout: command thread inits disconnect by client request
301 // but if this fails, connector thread itself wants to finish disconnection hard
302 Int_t rev = 0;
303 TGo4TaskHandler *han = GetTaskHandler(name);
304 rev = DisConnectClient(han,clientwait);
305 return rev;
306}
307
308Int_t TGo4TaskManager::DisConnectClient(TGo4TaskHandler * taskhandler, Bool_t clientwait)
309{
310 Int_t rev = 0;
311 if (taskhandler) {
312 fbClientIsRemoved = kFALSE; // reset the flag for waiting commander thread
313 TString tname = taskhandler->GetName();
314 Bool_t iscontrollertask = (taskhandler->GetRole() > kGo4ComModeObserver);
315 fxServer->SendStopBuffers(tname); // suspend remote threads from socket receive
316 if (clientwait) {
317 // wait for OK string sent by client over connector negotiation port
318 char *revchar = fxTransport->RecvRaw("dummy"); // wait for client close ok
319 if (!(revchar && !strcmp(revchar, TGo4TaskHandler::Get_fgcOK()))) {
320 TGo4Log::Debug(" TaskManager %s; negotiation ERROR after client disconnect!", GetName());
321 rev += 1;
322 // throw TGo4RuntimeException();
323 }
324 } // if(clientwait)
325 if (!taskhandler->DisConnect(clientwait))
326 rev += 1;
327 if (!RemoveTaskHandler(tname.Data()))
328 rev += 2;
329 if (rev == 0) {
330 // all right, we reset flags
331 fuTaskCount--; // set number of still connected client tasks
332 if (iscontrollertask)
334 fbClientIsRemoved = kTRUE; // this flag tells the main thread we are done
335 TGo4Log::Debug(" TaskManager: client %s has been disconnected. ", tname.Data());
336 } else {
337 // something went wrong, warn the user
338 TGo4Log::Debug(" TaskManager: client %s disconnect ERROR %d occured !! ", tname.Data(), rev);
339 }
340 } else {
341 // no such client
342 TGo4Log::Debug(" TaskManager: FAILED to disonnect client -- no such client! ");
343 rev = -1;
344 }
345 return rev;
346}
347
348Bool_t TGo4TaskManager::AddClient(const char *client, const char *host, Go4CommandMode_t role)
349{
350 TGo4TaskHandler *han = NewTaskHandler(client);
351 if (!han) {
352 TGo4Log::Warn(" !!! TaskManager::AddClient ERROR: client of name %s is already existing !!! ",client);
353 fxTransport->Send(TGo4TaskHandler::Get_fgcERROR()); // tell client we refuse connection
354 return kFALSE;
355 }
356
357 if(han->Connect(host,fxTransport)) {
358 // successful connection:
359 TGo4Log::Info(" TaskManager: Succesfully added new client %s (host %s, ports %d,%d,%d) ",
360 client, host, han->GetComPort(), han->GetStatPort(), han->GetDatPort());
361 fuTaskCount++;
362 han->SetRole(role);
364 fxServer->SetCurrentTask(client); // this will set the direct link to the new client handler
365 fxServer->SendStatusMessage(1,kTRUE,"%s::Client %s is logged in at %s as %s",
366 client,client,fxServer->GetName(),TGo4Command::GetModeDescription(han->GetRole()) );
367 return kTRUE;
368 }
369
370 TGo4Log::Error(" TaskManager: ERROR on connecting new client %s (host %s)", client, host);
371 RemoveTaskHandler(client);
372 return kFALSE;
373}
374
375
377{
378 Bool_t rev = kFALSE;
379 {
380 TGo4LockGuard listguard(fxListMutex);
381 // is taskhandler already in list?
382 if (!fxTaskList->FindObject(han)) {
383 // no, add the new taskhandler
384 fxTaskList->AddLast(han);
385 rev = kTRUE;
386 } else {
387 // yes, do nothing
388 rev = kFALSE;
389 }
390 } // TGo4LockGuard
391 return rev;
392}
393
394
396{
397 TGo4TaskHandler *han=new TGo4TaskHandler(name,fxServer,kFALSE, fxServer->IsMaster());
398 // success, taskhandler was not already existing
399 if(AddTaskHandler(han)) return han;
400
401 // error, taskhandler of this name was already there
402 delete han;
403 return nullptr;
404}
405
407{
408 Bool_t rev=kTRUE;
409 TGo4TaskHandler *taskhandler = nullptr;
410 {
411 TGo4LockGuard listguard(fxListMutex);
412 TObject *obj = fxTaskList->FindObject(name);
413 taskhandler = (TGo4TaskHandler *) fxTaskList->Remove(obj);
414 // Remove will do nothing if obj == 0; on success, it returns pointer to
415 // removed object
416 } //TGo4LockGuard
417 if (taskhandler) {
418 // test if we have removed the currently active taskhandler
419 TGo4TaskHandler *currenttaskhandler = fxServer->GetCurrentTaskHandler();
420 if (taskhandler == currenttaskhandler) {
421 // yes, then set current task to the next in list
422 fxServer->SetCurrentTask(nullptr); // will also start the work threads again
423 } else {
424 // no, the current task remains
425 fxServer->StartWorkThreads(); // but need to start the work threads
426 }
427 delete taskhandler;
428 } else {
429 // no such handler, do nothing
430 rev = kFALSE;
431 }
432 return rev;
433}
434
436{
437 TGo4TaskHandler *th = nullptr;
438 {
439 TGo4LockGuard listguard(fxListMutex);
440 th = (TGo4TaskHandler *) fxTaskList->FindObject(name);
441 } //TGo4LockGuard
442 return th;
443}
444
446{
447 TGo4TaskHandler *th = nullptr;
448 {
449 TGo4LockGuard listguard(fxListMutex);
450 th = (TGo4TaskHandler *) fxTaskList->Last();
451 } //TGo4LockGuard
452 return th;
453}
454
456{
457 TGo4LockGuard listguard(fxListMutex);
458 if(reset) fxTaskIter->Reset();
459 return dynamic_cast<TGo4TaskHandler *>(fxTaskIter->Next());
460}
461
463{
464 Int_t count = 0;
465 while (!fbClientIsRemoved) {
467 return -1;
468 } else if (fxServer->IsTerminating()) {
469 return -2;
470 } else {
472 ++count;
473 }
474 }
475 fbClientIsRemoved = kFALSE; // reset for next time
476 return count;
477
478}
479
#define TGo4LockGuard
Go4CommandMode_t
These values define command execution rights.
Definition TGo4Command.h:27
@ kGo4ComModeController
Definition TGo4Command.h:30
@ kGo4ComModeRefused
Definition TGo4Command.h:28
@ kGo4ComModeObserver
Definition TGo4Command.h:29
@ kGo4ComModeAdministrator
Definition TGo4Command.h:31
static const char * GetModeDescription(Go4CommandMode_t mode)
text description of current execution mode
static void Warn(const char *text,...) GO4_PRINTF_ARGS
User shortcut for message with prio 2.
Definition TGo4Log.cxx:307
static void Info(const char *text,...) GO4_PRINTF_ARGS
User shortcut for message with prio 1.
Definition TGo4Log.cxx:294
static void Debug(const char *text,...) GO4_PRINTF_ARGS
User shortcut for message with prio 0.
Definition TGo4Log.cxx:281
static void Error(const char *text,...) GO4_PRINTF_ARGS
User shortcut for message with prio 3.
Definition TGo4Log.cxx:320
Server task.
This class is responsible for the interconnection of two tasks: provided are three communication chan...
Int_t GetDatPort() const
static TNamed fgxADMINISTRATORACCOUNT
This keeps account for admin connection.
static UInt_t Get_fguPORTWAITTIME()
static const char * fgcCONNECT
Initial string for connect request (raw transport)
static const UInt_t fguCONNECTORPORT
Default port number of negotiation connection (raw transport)
static const char * Get_fgcERROR()
Int_t GetStatPort() const
Bool_t DisConnect(Bool_t waitforclient=kTRUE)
Closes the connections of all three transport channels.
static TNamed fgxOBSERVERACCOUNT
This keeps account for observer connection.
static const char * fgcMASTER
Task identifier for client connect negotiations (raw transport)
static const char * fgcDISCONNECT
Initial string for disconnect request (raw transport)
void SetRole(Go4CommandMode_t role)
static const char * Get_fgcOK()
Go4CommandMode_t GetRole()
Bool_t Connect(const char *host="localhost", TGo4Socket *negotiator=nullptr)
establishes the connections of all three transport channels and starts the service threads
static const char * fgcSLAVE
Task identifier for client connect negotiations (raw transport)
static TNamed fgxCONTROLLERACCOUNT
This keeps account for controller connection.
Int_t GetComPort() const
static Int_t Get_fgiPORTWAITCYCLES()
TGo4TaskHandler * NextTaskHandler(Bool_t reset=kFALSE)
For iteration over all connected task handlers.
TGo4TaskHandler * NewTaskHandler(const char *name)
creates new task handler with given name and adds it into array
TIterator * fxTaskIter
Iterator over list of tasks.
Go4CommandMode_t ClientLogin()
Check account and password of the client that requests a connection to this server.
TGo4TaskHandler * GetLastTaskHandler()
returns last task handler in list
UInt_t fuNegotiationPort
port number for the server client negotiation connections
Bool_t AddClient(const char *client, const char *host, Go4CommandMode_t role)
adds client task of name to manager: create server task handler and try to connect to client Command ...
Bool_t RemoveTaskHandler(const char *name)
removes task handler from array by name
UInt_t GetNegotiationPort()
returns the portnumber for client server negotiation port which is actually used by the running taskm...
TGo4Socket * fxTransport
Bool_t AddTaskHandler(TGo4TaskHandler *han)
adds external task handler to array
Int_t ServeClient()
used by connector runnable to wait for a client connect/disonnect request
TGo4TaskHandler * GetTaskHandler(const char *name)
returns certain task handler by name
TGo4ServerTask * fxServer
Bool_t fbHasControllerConnection
True if this server already has one connection to a master client that has the controller role.
static const Int_t fgiDISCONCYCLES
cycles to wait for client disconnection
TObjArray * fxTaskList
static const UInt_t fguDISCONTIME
time for each disonnection wait cycle
Bool_t fbClientIsRemoved
True if the last specified client is removed from server.
virtual ~TGo4TaskManager()
Int_t ConnectClient(const char *client, const char *host, Go4CommandMode_t role=kGo4ComModeController)
used by connector runnable to wait for a client request to connect to this server task.
Int_t DisConnectClient(const char *name, Bool_t clientwait=kTRUE)
disonnect an existing client by name, wait for negotiation OK if clientwait is true
Exception which terminates the threadmanager and the application.
static void Sleep(UInt_t millisecs)
wrapper for gSystem->Sleep with consecutive TThread::CancelPoint - necessary for proper pthread termi...