DABC (Data Acquisition Backbone Core)  2.9.9
SocketCommandChannel.cxx
Go to the documentation of this file.
1 // $Id: SocketCommandChannel.cxx 4764 2021-04-23 07:01:04Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
17 
#include <cstdlib>
#include <new>
#include <unistd.h>

#include "dabc/Manager.h"
#include "dabc/Configuration.h"
#include "dabc/Publisher.h"
#include "dabc/Url.h"
#include "dabc/SocketDevice.h"
26 
28  SocketAddon* addon,
29  const std::string &hostname,
30  double reconnect) :
31  dabc::Worker(MakePair(parent, name)),
32  fRemoteHostName(hostname),
33  fReconnectPeriod(reconnect),
34  fState(stConnecting),
35  fSendHdr(),
36  fSendBuf(),
37  fSendRawData(),
38  fSendingActive(true), // mark as active until I/O object is not yet assigned
39  fRecvHdr(),
40  fRecvBuf(0),
41  fRecvBufSize(0),
42  fSendQueue(),
43  fWaitQueue(),
44  fRecvState(recvInit),
45  fRemoteObserver(false),
46  fRemoteName(),
47  fMasterConn(false),
48  fClientNameSufix()
49 {
50  AssignAddon(addon);
51 
52  SetAutoDestroy(true);
53 
54  SocketIOAddon* io = dynamic_cast<SocketIOAddon*> (addon);
55 
56  // we are getting I/O in the constructor, means connection is established
57  if (io!=0) {
58  fState = stWorking;
59  fSendingActive = false;
61  io->StartRecv(&fRecvHdr, sizeof(fRecvHdr));
62  }
63 }
64 
66 {
   // EnsureRecvBuffer(0) only releases the currently allocated receive buffer
   // (fRecvBuf) and resets fRecvBufSize; no new buffer is allocated for size 0.
67  EnsureRecvBuffer(0);
68 }
69 
71 {
73 
74  if (!fRemoteHostName.empty() && (fReconnectPeriod>0) && fAddon.null())
75  ActivateTimeout(0.);
76  else
77  ActivateTimeout(1.);
78 }
79 
80 
82 {
83  if ((strsize>0) && (strsize< fRecvBufSize)) return true;
84 
85  if (fRecvBufSize > 0) {
86  delete [] fRecvBuf;
87  fRecvBuf = nullptr;
88  fRecvBufSize = 0;
89  }
90 
91  if (strsize == 0) return true;
92 
93  fRecvBufSize = 2048;
94  while (fRecvBufSize <= strsize) fRecvBufSize *=2;
95 
96  fRecvBuf = new char[fRecvBufSize];
97 
98  if (fRecvBuf==0) {
99  EOUT("Cannot allocate buffer %u", fRecvBufSize);
100  fRecvBufSize = 0;
101  return false;
102  }
103 
104  DOUT3("%s ALLOCATE %u", ItemName().c_str(), fRecvBufSize);
105 
106  return true;
107 }
108 
109 void dabc::SocketCommandClient::CloseClient(bool iserr, const char* msg)
110 {
111  if (msg!=0) {
112  if (iserr)
113  EOUT("%s closing connection due to error %s", ItemName().c_str(), msg);
114  else
115  DOUT2("%s closing connection due to %s", ItemName().c_str(), msg);
116  }
117 
118  if (!fRemoteHostName.empty() && (fReconnectPeriod>0)) {
119  AssignAddon(nullptr); // we destroy current addon
120  DOUT2("Try to reconnect worker %s to remote node %s", ItemName().c_str(), fRemoteHostName.c_str());
121  fState = stConnecting;
122  ActivateTimeout(fReconnectPeriod);
123  } else {
124  fState = iserr ? stFailure : stClosing;
125  DeleteThis();
126  }
127 }
128 
129 
131 {
132  if (cmd.IsName("SocketConnect")) {
133 
134  int fd = cmd.GetInt("fd", -1);
135 
136  if (fd<=0) return dabc::cmd_false;
137 
138  if (fState != stConnecting) {
139  EOUT("Fatal error - connection again when it was established???");
140  return dabc::cmd_false;
141  }
142 
143  fState = stWorking;
144 
145  SocketIOAddon* addon = new SocketIOAddon(fd);
146  addon->SetDeliverEventsToWorker(true);
147 
148  DOUT0("SocketCommand - create client side fd:%d worker:%s for:%s", fd, GetName(), fRemoteHostName.c_str());
149 
150  AssignAddon(addon);
151 
152  fSendingActive = false;
153 
154  // DOUT0("Did addon assign numrefs:%u", NumReferences());
155 
156  fRecvState = recvHeader;
157  // start receiving of header immediately
158  addon->StartRecv(&fRecvHdr, sizeof(fRecvHdr));
159 
160  if (fMasterConn) {
161  dabc::Command cmd("AcceptClient");
162  std::string myname = dabc::SocketThread::DefineHostName();
163  if (myname.empty()) myname = "Client";
164  if (!fClientNameSufix.empty())
165  myname+="_"+fClientNameSufix;
166 
167  // this name is used somewhere in hierarchy
168  cmd.SetStr("name", myname);
169 
170  // this name will be used to identify our node and deliver commands
171  cmd.SetStr("globalname", dabc::mgr.GetLocalAddress());
172 
173  SendCommand(cmd);
174  }
175 
176  SendSubmittedCommands();
177 
178  return dabc::cmd_true;
179  }
180 
181  return dabc::Worker::ExecuteCommand(cmd);
182 }
183 
185 {
186  if (cmd.IsName("AcceptClient")) {
187  DOUT0("We allow to transform connection to the monitoring channel");
188  cmd.SetResult(cmd_true);
189 
190  fRemoteObserver = true;
191 
192  std::string name = cmd.GetStr("name");
193  std::string globalname = cmd.GetStr("globalname");
194  if (!name.empty()) fRemoteName = name;
195  if (!globalname.empty() && fRemoteHostName.empty()) {
196  fRemoteHostName = globalname;
197  DOUT0("ACCEPT remote %s", fRemoteHostName.c_str());
198  }
199 
200  // here we inform publisher that new node want to add its hierarchy to the master
201  // publisher will request hierarchy itself
202  PublisherRef(GetPublisher()).AddRemote(fRemoteHostName, ItemName());
203 
204  return true;
205  }
206 
207  return false;
208 }
209 
211 {
212  if (fRecvHdr.data_kind == kindDisconnect) {
213  CloseClient(false, "disconnect packet");
214  return;
215  }
216 
217  dabc::Command cmd;
218 
219  if (fRecvHdr.data_cmdsize==0) {
220  CloseClient(true, "received empty command");
221  return;
222  }
223 
224  dabc::Buffer cmddata = dabc::Buffer::CreateBuffer(fRecvBuf, fRecvHdr.data_cmdsize, false, true);
225  if (!cmd.ReadFromBuffer(cmddata)) {
226  CloseClient(true, "cannot decode command");
227  return;
228  }
229 
230 // DOUT0("Worker: %s get command %s from remote kind %u ", ItemName().c_str(), cmd.GetName(), (unsigned) fRecvHdr.data_kind);
231 
232  switch (fRecvHdr.data_kind) {
233 
234  case kindCommand:
235 
236  if (fRecvHdr.data_rawsize>0) {
237  dabc::Buffer rawdata = dabc::Buffer::CreateBuffer(fRecvBuf + fRecvHdr.data_cmdsize, fRecvHdr.data_rawsize, false, true);
238  cmd.SetRawData(rawdata);
239  }
240 
241  if (ExecuteCommandByItself(cmd)) {
242  cmd.RemoveReceiver();
243  AddCommand(cmd, true);
244  cmd.Release();
245  } else {
246 
247  double tmout = fRecvHdr.data_timeout * 0.001;
248 
249  if (tmout>0) cmd.SetTimeout(tmout);
250 
251  DOUT2("Submit command %s rcv:%s for execution", cmd.GetName(), cmd.GetReceiver().c_str());
252 
253  // indicate that command must be executed locally
254  // done while receiver may contain different address format and may not recognized by Manager
255  cmd.SetBool("#local_cmd", true);
256 
257  dabc::mgr.Submit(Assign(cmd));
258  }
259 
260  break;
261 
262  case kindReply:
263  case kindCancel: {
264 
265  unsigned cmdid = cmd.GetUInt("__send_cmdid__");
266  cmd.RemoveField("__send_cmdid__");
267 
268  dabc::Command maincmd = fWaitQueue.PopWithId(cmdid);
269 
270  if (maincmd.null()) {
271  EOUT("No command found with searched %u", cmdid);
272  } else
273  if (fRecvHdr.data_kind == kindCancel) {
274  maincmd.Reply(cmd_timedout);
275  } else {
276 
277  maincmd.AddValuesFrom(cmd);
278 
279  if (fRecvHdr.data_rawsize>0) {
280  dabc::Buffer rawdata = dabc::Buffer::CreateBuffer(fRecvBuf + fRecvHdr.data_cmdsize, fRecvHdr.data_rawsize, false, true);
281  maincmd.SetRawData(rawdata);
282  }
283 
284  maincmd.Reply(cmd.GetResult());
285  }
286 
287  break;
288  }
289  }
290 
291  fRecvState = recvHeader;
292  // first of all, sumbit next recv header operation
293  SocketIOAddon *io = dynamic_cast<SocketIOAddon*> (fAddon());
294  io->StartRecv(&fRecvHdr, sizeof(fRecvHdr));
295 }
296 
298 {
299  // DOUT0("Get command reply %s", cmd.GetName());
300 
301  if (cmd.GetBool("#local_cmd")) {
302  cmd.RemoveField("#local_cmd");
303  AddCommand(cmd, true);
304  return true;
305  }
306 
307  return dabc::Worker::ReplyCommand(cmd);
308 }
309 
310 
312 {
313  switch (evnt.GetCode()) {
315 
316  fSendingActive = false;
317 
318  // immediately try to send next commands
319  SendSubmittedCommands();
320 
321  break;
322 
324 
325  if (fRecvState == recvInit) {
326  CloseClient(true, "Receive data in init state");
327  return;
328  }
329 
330  if (fRecvState == recvData) {
331  fRecvState = recvInit;
332  // here raw buffer is received and must be processed
333  ProcessRecvPacket();
334  return;
335  }
336 
337  if (fRecvState == recvHeader) {
338  // received header
339 
340  SocketIOAddon* io = dynamic_cast<SocketIOAddon*> (fAddon());
341 
342  if (fRecvHdr.dabc_header == dabc::SocketDevice::headerConnect) {
343 
344  SocketCommandChannel* ch = dynamic_cast<SocketCommandChannel*> (GetParent());
345  if ((ch==0) || ch->fRedirectDevice.empty()) {
346  CloseClient(true, "Wrong socket device connect from network");
347  return;
348  }
349 
350  EnsureRecvBuffer(dabc::SocketDevice::ProtocolMsgSize);
351  memcpy(fRecvBuf, &fRecvHdr, sizeof(fRecvHdr));
352  fRecvState = recvDevConnect;
353  io->StartRecv(fRecvBuf + sizeof(fRecvHdr), dabc::SocketDevice::ProtocolMsgSize - sizeof(fRecvHdr));
354  return;
355  }
356 
357  if (fRecvHdr.dabc_header != headerDabc) {
358  CloseClient(true, "Wrong packet from network");
359  return;
360  }
361 
362  if (fRecvHdr.data_size == 0) {
363  fRecvState = recvInit;
364  // when no additional data, process packet
365  ProcessRecvPacket();
366  return;
367  }
368 
369  if (!EnsureRecvBuffer(fRecvHdr.data_size)) {
370  CloseClient(true, "memory allocation");
371  return;
372  }
373 
374  fRecvState = recvData;
375  io->StartRecv(fRecvBuf, fRecvHdr.data_size);
376 
377  // DOUT0("Start command recv data_size %u", fRecvHdr.data_size);
378  }
379 
380  if (fRecvState == recvDevConnect) {
381  // here we should get all data for connection
382  // redirect it to the device
383 
384  SocketCommandChannel* ch = dynamic_cast<SocketCommandChannel*> (GetParent());
385  SocketIOAddon* io = dynamic_cast<SocketIOAddon*> (fAddon());
386 
387  dabc::Command cmd("RedirectConnect");
388  cmd.SetInt("Socket", io->TakeSocket());
390 
391  cmd.SetReceiver(ch->fRedirectDevice);
392  dabc::mgr.Submit(cmd);
393 
394  CloseClient(false, "redirect connect");
395  }
396 
397  break;
398  }
399 
401  CloseClient(true, "Socket error");
402  break;
403 
404 
406  CloseClient(false, "Socket closed");
407  break;
408 
409 
410  default:
412  }
413 
414 }
415 
417 {
418  fSendQueue.Push(cmd, asreply ? CommandsQueue::kindReply : CommandsQueue::kindSubmit);
419 
420  SendSubmittedCommands();
421 }
422 
423 
425 {
426  while (!fSendingActive && (fSendQueue.Size()>0)) {
427  bool isreply = (fSendQueue.FrontKind() == CommandsQueue::kindReply);
428  SendCommand(fSendQueue.Pop(), isreply);
429  }
430 }
431 
433 {
434  double send_tmout = 0;
435 
436  if (!asreply) {
437  // first check that command is timedout
438  send_tmout = cmd.TimeTillTimeout();
439  if (send_tmout == 0.) {
440  cmd.Reply(cmd_timedout);
441  return;
442  }
443 
444  uint32_t cmdid = fWaitQueue.Push(cmd);
445  cmd.SetUInt("__send_cmdid__", cmdid);
446  }
447 
448  // DOUT0("RAWSEND cmd %s debugid %d", fSendQueue.Front().GetName(), fSendQueue.Front().GetInt("debugid"));
449 
450  fSendHdr.dabc_header = headerDabc;
451  fSendHdr.data_kind = asreply ? kindReply : kindCommand;
452  fSendHdr.data_timeout = send_tmout > 0 ? (unsigned) send_tmout*1000. : 0;
453  fSendHdr.data_size = 0;
454  fSendHdr.data_cmdsize = 0;
455  fSendHdr.data_rawsize = 0;
456 
457  fSendBuf.Release();
458  fSendRawData.Release();
459 
460  if (cmd.IsCanceled()) fSendHdr.data_kind = kindCancel;
461 
462  fSendBuf = cmd.SaveToBuffer();
463 
464  fSendRawData = cmd.GetRawData();
465 
466  fSendHdr.data_cmdsize = fSendBuf.GetTotalSize(); // transport 0-terminated string as is
467  fSendHdr.data_rawsize = fSendRawData.GetTotalSize();
468  void* rawdata = nullptr;
469  if (fSendHdr.data_rawsize > 0) rawdata = fSendRawData.SegmentPtr();
470  fSendHdr.data_size = fSendHdr.data_cmdsize + fSendHdr.data_rawsize;
471 
472  SocketIOAddon* addon = dynamic_cast<SocketIOAddon*> (fAddon());
473 
474  if (!addon) {
475  EOUT("Cannot send command %s addon %p", cmd.GetName(), fAddon());
476 
477  CloseClient(true, "I/O object missing");
478  return;
479  }
480 
481  // DOUT0("Start command send fullsize:%u cmd:%u raw:%u", fSendHdr.data_size, fSendHdr.data_cmdsize, fSendHdr.data_rawsize);
482 
483  if (!addon->StartSend(&fSendHdr, sizeof(fSendHdr),
484  fSendBuf.SegmentPtr(), fSendHdr.data_cmdsize,
485  rawdata, fSendHdr.data_rawsize)) {
486  CloseClient(true, "Fail to send command");
487  return;
488  }
489 
490  // DOUT0("Send command %s asreply %s", cmd.GetName(), DBOOL(asreply));
491 
492  fSendingActive = true;
493 }
494 
495 
497 {
498  double next_tmout = 1.;
499 
500  if (fAddon.null() && !fRemoteHostName.empty() && (fReconnectPeriod>0)) {
501 
503 
504  if (client!=0) {
505  client->SetRetryOpt(2000000000, fReconnectPeriod);
506 
507  client->SetDeliverEventsToWorker(true);
508 
509  AssignAddon(client);
510  } else {
511  next_tmout = fReconnectPeriod;
512  }
513  }
514 
515  // check when commands are timedout
516  fWaitQueue.ReplyTimedout();
517  fSendQueue.ReplyTimedout();
518 
519  return next_tmout;
520 }
521 
522 // =======================================================================================
523 
524 
526  dabc::Worker(MakePair(name.empty() ? "/CommandChannel" : name)),
527  fNodeId(0),
528  fClientsAllowed(true),
529  fClientCnt(0)
530 {
531  fNodeId = dabc::mgr()->cfg()->MgrNodeId();
532 
533  fClientsAllowed = cmd.GetBool("ClientsAllowed", true);
534 
535  AssignAddon(connaddon);
536 
537  // object is owner its childs - autodestroy flag will be automatically set to the new add object
538  SetOwner(true);
539 }
540 
541 std::string dabc::SocketCommandChannel::GetRemoteNode(const std::string &url_str)
542 {
543  std::string server, itemname;
544  bool islocal(true);
545 
546  if (dabc::mgr.DecomposeAddress(url_str, islocal, server, itemname))
547  if (!islocal) return server;
548 
549  return std::string();
550 }
551 
552 
553 dabc::SocketCommandClientRef dabc::SocketCommandChannel::ProvideWorker(const std::string &remnodename, double conn_tmout)
554 {
555  SocketCommandClientRef worker;
556 
557  if (remnodename.empty()) return worker;
558 
559  for (unsigned n=0;n<NumChilds();n++) {
560  worker = GetChildRef(n);
561  if (!worker.null() && (worker()->fRemoteHostName == remnodename)) break;
562  worker.Release();
563  }
564 
565  if (!worker.null() || (conn_tmout<0)) return worker;
566 
567  // we create worker only if address with port is specified
568  // dabc::Url url(remnodename);
569  // if (url.GetPort()<=0) return worker;
570 
571  std::string worker_name = dabc::format("Client%d", fClientCnt++);
572 
573  worker = new SocketCommandClient(this, worker_name, 0, remnodename, conn_tmout);
574 
575  worker()->AssignToThread(thread());
576 
577  return worker;
578 }
579 
580 
582 {
583  std::string receiver = cmd.GetReceiver();
584  if (receiver.empty()) return dabc::Worker::PreviewCommand(cmd);
585 
586  std::string remnodename = GetRemoteNode(receiver);
587 
588  dabc::SocketCommandClientRef worker = ProvideWorker(remnodename, 1);
589 
590  DOUT2("SEARCH node %s receiver %s worker %p", remnodename.c_str(), cmd.GetReceiver().c_str(), worker());
591 
592  if (worker.null()) return dabc::Worker::PreviewCommand(cmd);
593 
594  DOUT4("Append command %s to client %s", cmd.GetName(), worker.ItemName().c_str());
595 
596  worker()->AddCommand(cmd, false);
597 
598  // worker.Release();
599 
600  return cmd_postponed;
601 }
602 
604 {
605  if (cmd.IsName("SocketConnect")) {
606 
607  int fd = cmd.GetInt("fd", -1);
608 
609  if (fd<=0) return dabc::cmd_false;
610 
611  std::string worker_name = dabc::format("Server%d", fClientCnt++);
612 
613  SocketCommandClientRef worker = FindChildRef(worker_name.c_str());
614 
615  if (!worker.null()) {
616  EOUT("How such possible - client %s already exists", worker_name.c_str());
617  close(fd);
618  return dabc::cmd_false;
619  }
620 
621  SocketIOAddon* io = new SocketIOAddon(fd);
622  io->SetDeliverEventsToWorker(true);
623 
624  worker = new SocketCommandClient(this, worker_name, io);
625 
626  worker()->AssignToThread(thread());
627 
628  DOUT3("SocketCommand - create server side fd:%d worker:%s thread:%s", fd, worker_name.c_str(), thread().GetName());
629 
630  worker.Release();
631 
632  return dabc::cmd_true;
633  } else
634  if (cmd.IsName("disconnect") || cmd.IsName("close")) {
635  std::string remnodename = GetRemoteNode(cmd.GetStr("host"));
636 
637  if (remnodename.empty()) return dabc::cmd_false;
638 
639  dabc::Url url(remnodename);
640  if (url.IsValid() && (url.GetProtocol()=="dabc")) {
641 
642  SocketCommandClientRef worker = ProvideWorker(url.GetHostNameWithPort(defaultDabcPort));
643 
644  if (!worker.null()) {
645  DOUT0("Close connection to %s", remnodename.c_str());
646  worker.Destroy();
647  return dabc::cmd_true;
648  }
649  }
650 
651  DOUT0("No connection to %s exists", remnodename.c_str());
652 
653  return dabc::cmd_false;
654  } else
655  if (cmd.IsName("ConfigureMaster")) {
656 
657  Url url(cmd.GetStr("Master"));
658 
659  // std::string dabc::Manager::ComposeAddress(const std::string &server, const std::string &itemname)
660 
661  std::string remnode = url.GetHostNameWithPort(defaultDabcPort);
662 
663  dabc::SocketCommandClientRef worker = ProvideWorker(remnode, 3.);
664 
665  // DOUT0("Start worker %s to connect with master %s", worker.GetName(), remnode.c_str());
666 
667  if (worker.null()) return dabc::cmd_true;
668 
669  // indicate that this is special connection to master node
670  // each time it is reconnected, it will append special command to register itself in remote master
671  worker()->fMasterConn = true;
672  worker()->fClientNameSufix = cmd.GetStr("NameSufix");
673 
674  return dabc::cmd_true;
675  } else
676  if (cmd.IsName("RedirectSocketConnect")) {
677  SocketServerAddon* addon = dynamic_cast<SocketServerAddon*> (fAddon());
678  if (addon && fRedirectDevice.empty()) {
679  fRedirectDevice = cmd.GetStr("Device");
680  cmd.SetStr("ServerId", addon->ServerId());
681  return dabc::cmd_true;
682  }
683  return dabc::cmd_false;
684  }
685 
686  return dabc::Worker::ExecuteCommand(cmd);
687 }
688 
690 {
   // No periodic work is done here, the method always returns -1.
   // NOTE(review): presumably a negative value tells the framework not to
   // re-activate the timeout - confirm against dabc::Worker.
691  return -1;
692 }
693 
Reference on memory from memory pool.
Definition: Buffer.h:135
BufferSize_t GetTotalSize() const
Return total size of all buffer segments.
Definition: Buffer.cxx:91
static Buffer CreateBuffer(BufferSize_t sz)
This static method create independent buffer for any other memory pools Therefore it can be used in s...
Definition: Buffer.cxx:419
Represents command with its arguments.
Definition: Command.h:99
double TimeTillTimeout(double extra=0.) const
Returns time which remains until command should be timed out.
Definition: Command.cxx:132
unsigned GetUInt(const std::string &name, unsigned dflt=0) const
Definition: Command.h:148
bool SetStr(const std::string &name, const char *value)
Definition: Command.h:134
bool SetBool(const std::string &name, bool v)
Definition: Command.h:141
bool SetInt(const std::string &name, int v)
Definition: Command.h:138
bool SetRawData(Buffer rawdata)
Set raw data to the command, which can be transported also between nodes.
Definition: Command.cxx:334
std::string GetStr(const std::string &name, const std::string &dflt="") const
Definition: Command.h:136
Command & SetTimeout(double tm)
Set maximum time which can be used for command execution.
Definition: Command.cxx:108
void SetResult(int res)
Definition: Command.h:173
bool SetUInt(const std::string &name, unsigned v)
Definition: Command.h:147
void Release()
Method used to clean command - all internal data will be cleaned, command container will be released.
Definition: Command.cxx:198
void AddValuesFrom(const Command &cmd, bool canoverwrite=true)
Definition: Command.cxx:187
Command & SetReceiver(const std::string &itemname)
These methods prepare command so, that one can submit command to the manager like: dabc::mgr....
Definition: Command.h:264
int GetResult() const
Definition: Command.h:174
bool GetBool(const std::string &name, bool dflt=false) const
Definition: Command.h:142
bool IsCanceled()
Return true if command was canceled.
Definition: Command.cxx:214
int GetInt(const std::string &name, int dflt=0) const
Definition: Command.h:139
Buffer GetRawData()
Returns reference on raw data Can be called only once - raw data reference will be cleaned.
Definition: Command.cxx:347
void Reply(int res=cmd_noresult)
Replied on the command.
Definition: Command.cxx:225
void RemoveReceiver()
Definition: Command.h:268
std::string GetReceiver() const
Definition: Command.h:267
@ kindSubmit
command submitted for execution
Definition: CommandsQueue.h:45
@ kindReply
command replied to the worker
Definition: CommandsQueue.h:46
void SetAutoDestroy(bool on=true)
Set autodestroy flag for the object Once enabled, object will be destroyed when last reference will b...
Definition: Object.cxx:160
void SetOwner(bool on=true)
Specifies if object will be owner of its new childs.
Definition: Object.cxx:154
bool AddRemote(const std::string &remnode, const std::string &workername)
Definition: Publisher.h:239
dabc::Buffer SaveToBuffer()
Definition: Record.cxx:1712
bool ReadFromBuffer(const dabc::Buffer &buf)
Definition: Record.cxx:1735
bool RemoveField(const std::string &name)
Definition: Record.h:501
Reference on the arbitrary object
Definition: Reference.h:73
void Release()
Releases reference on the object.
Definition: Reference.cxx:138
const char * GetName() const
Return name of referenced object, if object not assigned, returns "---".
Definition: Reference.cxx:167
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
Definition: Reference.cxx:177
bool null() const
Returns true if reference contains nullptr.
Definition: Reference.h:151
std::string ItemName(bool compact=true) const
Produce string, which can be used as name argument in dabc::mgr.FindItem(name) call.
Definition: Reference.cxx:241
void Destroy()
Release reference and starts destroyment of referenced object.
Definition: Reference.cxx:148
Special addon class for handling of socket and socket events.
Definition: SocketThread.h:52
@ evntSocketSendInfo
event delivered to worker when write is completed
Definition: SocketThread.h:96
@ evntSocketRecvInfo
event delivered to worker when read is completed
Definition: SocketThread.h:95
@ evntSocketCloseInfo
event delivered to worker when socket is closed
Definition: SocketThread.h:98
@ evntSocketErrorInfo
event delivered to worker when error is detected
Definition: SocketThread.h:97
void SetDeliverEventsToWorker(bool on=true)
Definition: SocketThread.h:121
Socket addon for handling connection on client side.
Definition: SocketThread.h:322
void SetRetryOpt(int nretry, double tmout=1.)
Provides command channel to the dabc process.
SocketCommandClientRef ProvideWorker(const std::string &remnodename, double conn_tmout=-1)
Provide client for remote node.
SocketCommandChannel(const std::string &name, SocketServerAddon *connaddon, Command cmd)
bool fClientsAllowed
when true, incoming clients are allowed
virtual int PreviewCommand(Command cmd)
This method called before command will be executed.
std::string fRedirectDevice
name of socket device, which can get redirection
std::string GetRemoteNode(const std::string &url)
Provide string for connection to remote node.
virtual double ProcessTimeout(double last_diff)
timeout used in channel to update node hierarchy, which then can be requested from remote
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
Client side of command connection between two nodes.
void SendSubmittedCommands()
Send submitted commands to remote.
bool EnsureRecvBuffer(unsigned strsize)
void ProcessRecvPacket()
Method called, when complete packet (header + raw data) is received.
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
bool ExecuteCommandByItself(Command cmd)
@ stWorking
normal state when execution of different commands are done
virtual double ProcessTimeout(double last_diff)
EState fState
current state of the worker
void AddCommand(dabc::Command cmd, bool asreply=false)
Submit command to send queue, if queue is empty start sending immediately.
SocketCommandClient(Reference parent, const std::string &name, SocketAddon *addon, const std::string &hostname="", double reconnect=0.)
virtual void ProcessEvent(const EventId &)
ERecvState fRecvState
state that happens with receiver (server)
void CloseClient(bool iserr=false, const char *msg=0)
Called when connection must be closed due to the error.
virtual bool ReplyCommand(Command cmd)
Reimplement this method to react on command reply Return true if command can be destroyed by framewor...
void SendCommand(dabc::Command cmd, bool asreply=false)
Send next command to the remote.
SocketCmdPacket fRecvHdr
buffer for receiving header
bool fSendingActive
indicate if currently send active
Socket addon for handling I/O events.
Definition: SocketThread.h:144
bool StartSend(const void *buf, unsigned size, const void *buf2=0, unsigned size2=0, const void *buf3=0, unsigned size3=0)
bool StartRecv(void *buf, size_t size)
Socket addon for handling connection requests on server side.
Definition: SocketThread.h:285
std::string ServerId()
Definition: SocketThread.h:308
static std::string DefineHostName(bool force=true)
Return current host name.
static SocketClientAddon * CreateClientAddon(const std::string &servid, int dflt_port=-1)
Uniform Resource Locator interpreter.
Definition: Url.h:33
std::string GetProtocol() const
Definition: Url.h:57
std::string GetHostNameWithPort(int dfltport=0) const
Definition: Url.cxx:139
bool IsValid() const
Definition: Url.h:55
bool Submit(Command cmd)
Definition: Worker.cxx:1139
Active object, which is working inside dabc::Thread.
Definition: Worker.h:116
virtual int PreviewCommand(Command cmd)
This method called before command will be executed.
Definition: Worker.cxx:650
virtual void OnThreadAssigned()
Definition: Worker.h:392
void AssignAddon(WorkerAddon *addon)
Assigns addon to the worker Should be called before worker assigned to the thread.
Definition: Worker.cxx:277
virtual void ProcessEvent(const EventId &)
Definition: Worker.cxx:499
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
Definition: Worker.cxx:851
virtual bool ReplyCommand(Command cmd)
Reimplement this method to react on command reply Return true if command can be destroyed by framewor...
Definition: Worker.cxx:856
bool reconnect
Definition: hldprint.cxx:994
#define DOUT2(args ...)
Definition: logging.h:170
#define DOUT0(args ...)
Definition: logging.h:156
#define DOUT3(args ...)
Definition: logging.h:176
#define EOUT(args ...)
Definition: logging.h:150
#define DOUT4(args ...)
Definition: logging.h:182
XMLNodePointer_t GetParent(XMLNodePointer_t xmlnode)
Definition: XmlEngine.cxx:917
Event manipulation API.
Definition: api.h:23
ManagerRef mgr
Definition: Manager.cxx:42
std::string format(const char *fmt,...)
Definition: string.cxx:49
@ cmd_postponed
Definition: Command.h:42
@ cmd_false
Definition: Command.h:37
@ cmd_timedout
Definition: Command.h:40
@ cmd_true
Definition: Command.h:38
@ defaultDabcPort
Definition: ConfigBase.h:92
Event structure, exchanged between DABC threads.
Definition: Thread.h:70
uint16_t GetCode() const
Definition: Thread.h:92