DABC (Data Acquisition Backbone Core)  2.9.9
ConnectionManager.cxx
Go to the documentation of this file.
1 // $Id: ConnectionManager.cxx 4476 2020-04-15 14:12:38Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
16 #include "dabc/ConnectionManager.h"
17 
18 #include "dabc/Manager.h"
19 #include "dabc/Device.h"
20 
43  dabc::Command(CmdName())
44 {
45  SetStr(ReqArg(), req.ItemName());
46 }
47 
48 dabc::ConnectionManager::ConnectionManager(const std::string &name, Command cmd) :
49  ModuleAsync(name, cmd),
50  fRecs(),
51  fConnCmd(),
52  fDoingConnection(0),
53  fConnCounter(0)
54 {
55  // we want to see all events which are produced by any of the connection objects
57 
58  DOUT3("Connection manager created parent %p", GetParent());
59 }
60 
62 {
63  // the unregister call can stay commented out, since it will be done automatically anyway
64  // UnregisterForParameterEvent(ConnectionObject::ObjectName());
65 }
66 
68 {
69  // here one should analyze
70 
71  // when application terminated - do not start with new connections
72  if (dabc::mgr.IsTerminated()) return;
73 
74  std::string value = evnt.ParValue();
75 
78 
79  if (!ispending && !isbroken) return;
80 
82  if (req.null()) {
83  EOUT("Connection handle not found !!!! ");
84  return;
85  }
86 
87  if (fRecs.HasObject(req())) {
88  EOUT("Connection %s already registered as pending - that happened??", evnt.ParName().c_str());
89  return;
90  }
91 
92  req.ResetConnData();
93 
94  DOUT2("We starting connection for %s url %s", evnt.ParName().c_str(), req.GetRemoteUrl().c_str());
95 
96  // FIXME: derive connection id from unique application code
97  if (req.IsServerSide())
98  req.SetConnId(dabc::format("%s_Conn%d", dabc::mgr.GetLocalAddress().c_str(), fConnCounter++));
99 
100  fRecs.Add(req);
101 
102  // TODO: in the current implementation connection requests are collected and activated only when
103  // a special command is sent to the connection manager. Later one should react automatically to all connection
104  // changes and restart a connection if this is specified by the user
105 
106  if ((fDoingConnection == 0) && fConnCmd.null() && isbroken) {
107  DOUT0("Reactivate connection manager");
108  fDoingConnection = 1;
109  ActivateTimeout(0.);
110  }
111 
112 }
113 
114 
116 {
117  // UnregisterForParameterEvent(ConnectionObject::ObjectName());
118 
119  fDoingConnection = 0;
120 
121  fConnCmd.ReplyFalse();
122 
123  while (fRecs.GetSize()>0) {
124  ConnectionRequest req = fRecs.TakeLast();
126  }
127 
129 }
130 
131 
132 void dabc::ConnectionManager::CheckConnectionRecs(bool finish_command_dueto_timeout)
133 {
134  bool iserror = false, isonlyoptional = true;
135 
136  unsigned n = 0;
137 
138  while (n<fRecs.GetSize()) {
139 
140  ConnectionRequestFull req = fRecs[n];
141 
142  switch (req.progress()) {
143 
144  case progrFailed: {
145  req.ResetConnData();
147 
148  // TODO: later one can use optional flag to ignore connection which takes too much time
149  if (!req.IsOptional()) iserror = true;
150 
151  fRecs.RemoveAt(n);
152  break;
153  }
154 
155  case progrConnected: {
156 
157  req.ResetConnData();
159 
160  fRecs.RemoveAt(n);
161  break;
162  }
163 
164  default:
165  if (!req.IsOptional()) isonlyoptional = false;
166  n++;
167  break;
168  }
169  }
170 
171  if (iserror) {
172  fConnCmd.ReplyFalse();
173  DOUT2("SOME CONNECTIONS FINSIHED WITH FAILURE");
174  // rest of the connections will be continued - application should decide how to work
175  if (fRecs.GetSize()==0) fDoingConnection = 0;
176  } else
177  if (fRecs.GetSize()==0) {
178  fDoingConnection = 0;
179  fConnCmd.ReplyTrue();
180  DOUT0("ALL CONNECTIONS FINSIHED OK");
181  } else
182  if (finish_command_dueto_timeout) {
183  // we must finish command due to timeout, but if only optional requests are remaining
184  // we can indicate that command is done successfully
185  // in any case optional commands will be continued
186 
187  if (isonlyoptional) {
188  DOUT0("ALL NON-OPTIONAL CONNECTIONS FINSIHED OK, OPTIONAL WILL BE CONTINUED");
189  fConnCmd.ReplyTrue();
190  } else {
191  EOUT("CONNECTION COMMAND is TIMEDOUT");
192  fConnCmd.ReplyTimedout();
193  }
194  }
195 }
196 
197 
199 {
200  if (fDoingConnection==0) return -1.;
201 
202  double mindelay = 1.;
203 
204  for (unsigned n=0; n<fRecs.GetSize(); n++) {
205 
206  ConnectionRequestFull req = fRecs[n];
207  if (req.null()) continue;
208 
209  double tm = req()->CheckDelay(last_diff);
210  if (tm>0) {
211  if (tm<mindelay) mindelay = tm;
212  continue;
213  }
214 
215  switch (req.progress()) {
216  case progrInit: {
217 
218  PortRef port = req.GetPort();
219 
221 
222  if (dev.null() || port.null()) {
223  if (dev.null())
224  EOUT("Cannot find device %s for connection record", req.GetConnDevice().c_str());
225  if (port.null())
226  EOUT("Cannot find port %s for connection record", req.GetLocalUrl().c_str());
227  req.SetProgress(progrFailed);
228  } else {
229  // req.SetInlineDataSize(port()->InlineDataSize());
230 
231  req.SetProgress(progrDoingInit);
232 
233  req()->SetDelay(5, true); // let 5 second to prepare record, one
234 
235  DOUT0("REGISTERED CONN %s isserv %s", req.GetConnInfo().c_str(), DBOOL(req.IsServerSide()));
236 
237  //FIXME: specify 5 second for submitted command as well
238  dev.Submit(Assign(CmdConnectionManagerHandle(req)));
239  }
240 
241  break;
242  }
243 
244  case progrFailed: {
245  // ignore for the moment
246  break;
247  }
248 
249  case progrDoingInit: {
250  // wait for device reply
251 
252  EOUT("Device did not initialize record for so long time - one should do something. Now going in FAILURE");
253 
254  req.SetProgress(progrFailed);
255 
256  break;
257  }
258 
259  case progrPending: {
260  // should we send a request - only for the client
261 
262  if (req.IsServerSide()) {
263  // server just waiting when client connects
264  // can we do here more action - just declare connection as failed
265  req()->SetDelay(2, true);
266  break;
267  }
268 
269  bool islocal = false;
270  std::string remserver, remitem;
271 
272  if (!dabc::mgr.DecomposeAddress(req.GetRemoteUrl(), islocal, remserver, remitem)) {
273  EOUT("Fail to detect server from URL %s", req.GetRemoteUrl().c_str());
274  req.SetProgress(progrFailed);
275  break;
276  }
277 
279  // we swap the url order so that the other node can compare them directly
280  cmd.SetUrl1(req.GetRemoteUrl());
281  cmd.SetUrl2(req.GetLocalUrl());
282  cmd.SetStr("ClientId", req.GetClientId());
283 
284  cmd.SetReceiver(dabc::mgr.ComposeAddress(remserver, dabc::Manager::ConnMgrName()));
285 
286  req.SetProgress(progrWaitReply);
287 
288  // FIXME: this is important delay value, should be configurable, may be even in connect port method
289  // we use 1 sec more while command itself should be timed out correctly
290  req()->SetDelay(req.GetConnTimeout()+1., true);
291 
292  DOUT0("CONN %s isserv %s server %s receiver %s tmout %f", req.GetConnInfo().c_str(), DBOOL(req.IsServerSide()), remserver.c_str(), dabc::mgr.ComposeAddress(remserver, dabc::Manager::ConnMgrName()).c_str(), req.GetConnTimeout());
293 
294  cmd.SetTimeout(req.GetConnTimeout());
295 
296  // FIXME: delay should be specified also for submitted command
297  dabc::mgr.Submit(Assign(cmd));
298 
299  break;
300  }
301 
302  case progrDoingConnect: {
303  EOUT("Timeout when doing connect - device should be responsible here!!!");
304  break;
305  }
306 
307  case progrWaitReply: {
308  EOUT("Timeout when waiting for reply - command timeout should be used here till the end %5.1f!!!", fConnCmd.TimeTillTimeout());
309  break;
310  }
311 
312  default:
313  break;
314  }
315  }
316 
317  double cmd_tmout = fConnCmd.TimeTillTimeout(-0.5); // process timeout 0.5 sec before actual timeout happened
318 
319  if ((cmd_tmout>0) && (cmd_tmout<mindelay)) mindelay = cmd_tmout;
320 
321  CheckConnectionRecs(cmd_tmout==0.);
322 
323  return mindelay;
324 }
325 
326 dabc::ConnectionRequestFull dabc::ConnectionManager::FindConnection(const std::string &local, const std::string &remote)
327 {
328  for (unsigned n=0; n<fRecs.GetSize(); n++) {
329  ConnectionRequestFull req = fRecs[n];
330 
331  if (req.match(local, remote)) return req;
332  }
333 
334  return 0;
335 }
336 
337 
339 {
340  if (cmd.IsName(CmdGlobalConnect::CmdName())) {
341 
342  CmdGlobalConnect cmd1 = cmd;
343 
344  ConnectionRequestFull req = FindConnection(cmd1.GetUrl1(), cmd1.GetUrl2());
345 
346  DOUT2("Get request for %s -> %s found:%p",
347  cmd1.GetUrl1().c_str(), cmd1.GetUrl2().c_str(), req());
348 
349  if (req.null()) {
350  EOUT("Request from remote for undefined connection %s %s", cmd1.GetUrl1().c_str(), cmd1.GetUrl2().c_str());
351  return cmd_false;
352  }
353 
354  switch ( req.progress() ) {
355 
356  case progrInit:
357  // this is the situation when the request comes really too fast - even initialization has not yet started
358  // we reject the request, but the client can repeat it after a short time
359  return 77;
360 
361  case progrDoingInit:
362  // this happens when request comes too early - local record was not yet initialized by device
363  // we reject, but client can repeat it after short time
364 
365  return 77;
366 
367  case progrPending:
368  // this is normal situation when connection is pending -
369  // waiting that remote starts connecting
370  if (!FillAnswerOnRemoteConnectCmd(cmd, req)) return cmd_false;
371  return cmd_postponed;
372 
373  case progrWaitReply:
374  // this is the situation when client and server send requests simultaneously
375  // the server is forbidden to send requests, therefore it is definitely an error
376  EOUT("Two requests for %s meet together - FAILURE", req.GetConnInfo().c_str());
377  return cmd_false;
378 
379  default:
380  EOUT("CmdGlobalConnect received in wrong progress state %d", req.progress());
381  break;
382  }
383 
384  return cmd_false;
385 
386  } else
387  if (cmd.IsName("ActivateConnections")) {
388  fConnCmd.ReplyFalse();
389 
390  DOUT2("Start processing of connections number %u", fRecs.GetSize());
391 
392  fDoingConnection = 1;
393  fConnCmd = cmd;
394 
395  ActivateTimeout(0.);
396 
397  return cmd_postponed;
398 
399  } else
400  if (cmd.IsName("ShutdownConnection")) {
401  fConnCmd.ReplyFalse();
402 
403  fDoingConnection = -1;
404  fConnCmd = cmd;
405 
406  ActivateTimeout(0.);
407 
408  return cmd_postponed;
409  }
410 
412 }
413 
415 {
416  if (cmd.IsName(CmdConnectionManagerHandle::CmdName())) {
417  HandleConnectRequestCmdReply(cmd);
418  } else
419  if (cmd.IsName(CmdGlobalConnect::CmdName())) {
420  HandleCmdGlobalConnectReply(cmd);
421  }
422 
423  return true;
424 }
425 
427 {
428  if (cmd.null() || req.null()) return false;
429 
431 
432  if (dev.null()) {
433  EOUT("Cannot find device");
434  return false;
435  }
436 
437  if (req.IsServerSide()) {
438  // client must provide its id which can be useful for connection
439  req.SetClientId(cmd.GetStr("ClientId"));
440 
441  // server returns its identifier and connection id
442  cmd.SetStr("ConnectionId", req.GetConnId());
443  cmd.SetStr("ServerId", req.GetServerId());
444 
445  cmd.SetInt("ServerInlineSize", req.GetInlineDataSize());
446  cmd.SetDouble("ServerTimeout", req.GetConnTimeout());
448 
449  } else {
450  // should not happen
451  EOUT("NEVER COME HERE");
452  }
453 
454  // FIXME: delay should correspond to awaited time to establish connection
455  req()->SetDelay(100);
456 
457  req.SetRemoteCommand(cmd);
458 
459  req.SetProgress(progrDoingConnect);
460 
461  dev.Submit(Assign(CmdConnectionManagerHandle(req)));
462 
463  return true;
464 }
465 
466 
468 {
469  ConnectionRequestFull req = FindConnection(cmd.GetUrl2(), cmd.GetUrl1());
470  if (req.null()) {
471  EOUT("Did not find connection request for reply command");
472  return;
473  }
474 
475  int res = cmd.GetResult();
476 
477  switch (req.progress()) {
478  case progrWaitReply: {
479 
480  if (res!=cmd_true) {
481  // the request was rejected, let's retry it after some timeout
482  // TODO: one can get more info on why the connection was rejected and react smarter
483 
484  if (res==77) {
485  DOUT2("Connection %s was too early, try short again", req.GetConnInfo().c_str());
486 
487  req.SetProgress(progrPending);
488 
489  req()->SetDelay(req.IsServerSide() ? 2. : 0.2); // for server retry rate is slower
490  } else {
491  EOUT("Connection %s is rejected res = %d - why?", req.GetConnInfo().c_str(), res);
492 
493  req.SetProgress(progrPending);
494 
495  req()->SetDelay(1.); // retry connection after some time
496  }
497 
498  return;
499  }
500 
501  if (req.IsServerSide()) {
502  EOUT("NEVER COME HERE");
503  } else {
504  // we got server id which is required to establish connection
505  req.SetServerId(cmd.GetStr("ServerId", ""));
506  // to verify connection, client should use identifier provided by server
507  req.SetConnId(cmd.GetStr("ConnectionId", ""));
508 
509  // to know how long server will wait for connection
510  req.SetConnTimeoutDirectly(cmd.GetDouble("ServerTimeout", 10.));
511  // this acknowledge parameter of protocol, one can later code it inside serverid
513 
514  int inlinesize = cmd.GetInt("ServerInlineSize");
515  if (inlinesize != req.GetInlineDataSize()) {
516  EOUT("Mismatch in configured header sizes: %d %d", inlinesize, req.GetInlineDataSize());
517  req.SetInlineDataSize(inlinesize);
518  }
519  }
520 
522 
523  if (dev.null()) {
524  EOUT("Cannot find device");
525  return;
526  }
527 
528  // FIXME: delay should correspond to awaited time to establish connection
529  req()->SetDelay(100);
530 
531  req.SetProgress(progrDoingConnect);
532 
533  dev.Submit(Assign(CmdConnectionManagerHandle(req)));
534 
535  break;
536  }
537 
538  default:
539  EOUT("Reply on global connect in strange state");
540  break;
541  }
542 }
543 
545 {
547  if (req.null()) return;
548 
549  int res = cmd.GetResult();
550 
551  if (res != cmd_true) {
552  req.SetProgress(progrFailed);
553 
554  // if remote command was not replied
555  req.ReplyRemoteCommand(false);
556  return;
557  }
558 
559  switch (req.progress()) {
560  case progrDoingInit: {
561  // device confirm connection request, we can try to contact with client
562  req.SetProgress(progrPending);
563 
564  if (req.IsServerSide())
565  // let server wait some time to analyze what to do
566  req()->SetDelay(2.);
567  else
568  // client should try to connect immediately
569  req()->SetDelay(0);
570 
571  ActivateTimeout(0.);
572 
573  break;
574  }
575 
576  case progrDoingConnect: {
577  // this is confirmation from device finish with connection
578  // we reply to remote node that we are starting connection
579 
580  req.SetProgress(progrConnected);
581 
582  CheckConnectionRecs(false);
583 
584  break;
585  }
586 
587  default:
588  DOUT0("Command reply at state %d - that to do?", req.progress());
589  break;
590  }
591 }
592 
CmdConnectionManagerHandle(ConnectionRequestFull &req)
Here is a description of how a connection between two nodes is built and which states are used.
std::string GetUrl2() const
void SetUrl2(const std::string &url2)
void SetUrl1(const std::string &url1)
std::string GetUrl1() const
Represents command with its arguments.
Definition: Command.h:99
bool SetStr(const std::string &name, const char *value)
Definition: Command.h:134
bool SetBool(const std::string &name, bool v)
Definition: Command.h:141
double GetDouble(const std::string &name, double dflt=0.) const
Definition: Command.h:145
bool SetInt(const std::string &name, int v)
Definition: Command.h:138
std::string GetStr(const std::string &name, const std::string &dflt="") const
Definition: Command.h:136
Command & SetTimeout(double tm)
Set maximum time which can be used for command execution.
Definition: Command.cxx:108
Command & SetReceiver(const std::string &itemname)
These methods prepare command so, that one can submit command to the manager like: dabc::mgr....
Definition: Command.h:264
int GetResult() const
Definition: Command.h:174
bool GetBool(const std::string &name, bool dflt=false) const
Definition: Command.h:142
bool SetDouble(const std::string &name, double v)
Definition: Command.h:144
int GetInt(const std::string &name, int dflt=0) const
Definition: Command.h:139
ConnectionRequestFull FindConnection(const std::string &url1, const std::string &url2)
virtual bool ReplyCommand(Command cmd)
Reimplement this method to react on command reply Return true if command can be destroyed by framewor...
void HandleCmdGlobalConnectReply(CmdGlobalConnect cmd)
React on the reply of global connect command.
virtual double ProcessTimeout(double last_diff)
Check status of connections establishing.
void CheckConnectionRecs(bool finish_command=false)
Check current situation with connections.
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
virtual void ProcessParameterEvent(const ParameterEvent &evnt)
Process changes in connection recs.
bool FillAnswerOnRemoteConnectCmd(Command cmd, ConnectionRequestFull &req)
Fill answer on remote connection request and invoke device to start connection When device confirms t...
ConnectionManager(const std::string &name, Command cmd=nullptr)
virtual void ModuleCleanup()
Destroy all connections, if necessary - request to cleanup custom data by device.
void HandleConnectRequestCmdReply(CmdConnectionManagerHandle cmd)
Analyze reply of the command, send to the device.
static const char * ObjectName()
@ sConnected
connection is up and working
@ sPending
connection is pending (want to be connected)
@ sBroken
connection is broken and should be reactivated by connection manager
@ sFailed
connection cannot be established by connection manager
static const char * GetStateName(EState state)
Full description of connection request.
std::string GetConnId() const
void SetRemoteCommand(dabc::Command cmd)
bool match(const std::string localurl, const std::string remoteurl)
std::string GetServerId() const
void SetConnTimeoutDirectly(double tm)
std::string GetClientId() const
void SetClientId(const std::string &id)
void SetServerId(const std::string &id)
void SetConnId(const std::string &id)
Connection request.
bool GetUseAckn() const
Use of acknowledge in protocol.
std::string GetLocalUrl() const
bool IsOptional() const
indicate if connection is optional and therefore may be ignored during failure or long timeout
void ChangeState(ConnectionObject::EState state, bool force)
double GetConnTimeout() const
time required to establish connection, if expired connection will be switched to "failed" state
std::string GetRemoteUrl() const
Return url of data source to which connection should be established.
std::string GetConnDevice() const
Device name which may be used to create connection (depends from url)
Reference GetPort() const
bool IsServerSide() const
Indicates if local node in connection is server or client.
Reference on dabc::Device class
Definition: Device.h:79
WorkerRef FindDevice(const std::string &name)
Definition: Manager.cxx:2013
Parameter FindPar(const std::string &parname)
Definition: Manager.cxx:2069
std::string ComposeAddress(const std::string &server, const std::string &itemname="")
Definition: Manager.h:603
static const char * ConnMgrName()
Definition: Manager.h:478
Base class for user-derived code, implementing event-processing.
Definition: ModuleAsync.h:32
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
Definition: Module.h:232
virtual void ModuleCleanup()
Method, which can be reimplemented by user and should cleanup all references on buffers and other obj...
Definition: Module.h:249
Object * GetParent() const
Returns pointer on parent object, thread safe
Definition: Object.h:286
std::string ParName() const
Definition: Worker.h:543
std::string ParValue() const
Definition: Worker.h:544
Reference on the dabc::Port class
Definition: Port.h:195
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
Definition: Reference.cxx:177
bool null() const
Returns true if reference contains nullptr.
Definition: Reference.h:151
std::string ItemName(bool compact=true) const
Produce string, which can be used as name argument in dabc::mgr.FindItem(name) call.
Definition: Reference.cxx:241
bool Submit(Command cmd)
Definition: Worker.cxx:1139
bool RegisterForParameterEvent(const std::string &mask, bool onlychangeevent=true)
Subscribe to parameter events from local or remote node.
Definition: Worker.cxx:1050
#define DOUT2(args ...)
Definition: logging.h:170
#define DOUT0(args ...)
Definition: logging.h:156
#define DOUT3(args ...)
Definition: logging.h:176
#define EOUT(args ...)
Definition: logging.h:150
#define DBOOL(arg)
Definition: logging.h:191
Event manipulation API.
Definition: api.h:23
ManagerRef mgr
Definition: Manager.cxx:42
std::string format(const char *fmt,...)
Definition: string.cxx:49
const char * xmlUseAcknowledge
Definition: Object.cxx:60
@ cmd_postponed
Definition: Command.h:42
@ cmd_false
Definition: Command.h:37
@ cmd_true
Definition: Command.h:38