29 const std::string &hostname,
32 fRemoteHostName(hostname),
45 fRemoteObserver(false),
74 if (!fRemoteHostName.empty() && (fReconnectPeriod>0) && fAddon.null())
83 if ((strsize>0) && (strsize< fRecvBufSize))
return true;
85 if (fRecvBufSize > 0) {
91 if (strsize == 0)
return true;
94 while (fRecvBufSize <= strsize) fRecvBufSize *=2;
96 fRecvBuf =
new char[fRecvBufSize];
99 EOUT(
"Cannot allocate buffer %u", fRecvBufSize);
104 DOUT3(
"%s ALLOCATE %u", ItemName().c_str(), fRecvBufSize);
113 EOUT(
"%s closing connection due to error %s", ItemName().c_str(), msg);
115 DOUT2(
"%s closing connection due to %s", ItemName().c_str(), msg);
118 if (!fRemoteHostName.empty() && (fReconnectPeriod>0)) {
119 AssignAddon(
nullptr);
120 DOUT2(
"Try to reconnect worker %s to remote node %s", ItemName().c_str(), fRemoteHostName.c_str());
121 fState = stConnecting;
122 ActivateTimeout(fReconnectPeriod);
124 fState = iserr ? stFailure : stClosing;
132 if (cmd.
IsName(
"SocketConnect")) {
134 int fd = cmd.
GetInt(
"fd", -1);
138 if (fState != stConnecting) {
139 EOUT(
"Fatal error - connection again when it was established???");
148 DOUT0(
"SocketCommand - create client side fd:%d worker:%s for:%s", fd, GetName(), fRemoteHostName.c_str());
152 fSendingActive =
false;
156 fRecvState = recvHeader;
158 addon->
StartRecv(&fRecvHdr,
sizeof(fRecvHdr));
163 if (myname.empty()) myname =
"Client";
164 if (!fClientNameSufix.empty())
165 myname+=
"_"+fClientNameSufix;
168 cmd.
SetStr(
"name", myname);
176 SendSubmittedCommands();
186 if (cmd.
IsName(
"AcceptClient")) {
187 DOUT0(
"We allow to transform connection to the monitoring channel");
190 fRemoteObserver =
true;
192 std::string name = cmd.
GetStr(
"name");
193 std::string globalname = cmd.
GetStr(
"globalname");
194 if (!name.empty()) fRemoteName = name;
195 if (!globalname.empty() && fRemoteHostName.empty()) {
196 fRemoteHostName = globalname;
197 DOUT0(
"ACCEPT remote %s", fRemoteHostName.c_str());
212 if (fRecvHdr.data_kind == kindDisconnect) {
213 CloseClient(
false,
"disconnect packet");
219 if (fRecvHdr.data_cmdsize==0) {
220 CloseClient(
true,
"received empty command");
226 CloseClient(
true,
"cannot decode command");
232 switch (fRecvHdr.data_kind) {
236 if (fRecvHdr.data_rawsize>0) {
241 if (ExecuteCommandByItself(cmd)) {
243 AddCommand(cmd,
true);
247 double tmout = fRecvHdr.data_timeout * 0.001;
255 cmd.
SetBool(
"#local_cmd",
true);
265 unsigned cmdid = cmd.
GetUInt(
"__send_cmdid__");
270 if (maincmd.
null()) {
271 EOUT(
"No command found with searched %u", cmdid);
273 if (fRecvHdr.data_kind == kindCancel) {
279 if (fRecvHdr.data_rawsize>0) {
291 fRecvState = recvHeader;
294 io->
StartRecv(&fRecvHdr,
sizeof(fRecvHdr));
301 if (cmd.
GetBool(
"#local_cmd")) {
303 AddCommand(cmd,
true);
316 fSendingActive =
false;
319 SendSubmittedCommands();
325 if (fRecvState == recvInit) {
326 CloseClient(
true,
"Receive data in init state");
330 if (fRecvState == recvData) {
331 fRecvState = recvInit;
337 if (fRecvState == recvHeader) {
346 CloseClient(
true,
"Wrong socket device connect from network");
351 memcpy(fRecvBuf, &fRecvHdr,
sizeof(fRecvHdr));
352 fRecvState = recvDevConnect;
357 if (fRecvHdr.dabc_header != headerDabc) {
358 CloseClient(
true,
"Wrong packet from network");
362 if (fRecvHdr.data_size == 0) {
363 fRecvState = recvInit;
369 if (!EnsureRecvBuffer(fRecvHdr.data_size)) {
370 CloseClient(
true,
"memory allocation");
374 fRecvState = recvData;
375 io->
StartRecv(fRecvBuf, fRecvHdr.data_size);
380 if (fRecvState == recvDevConnect) {
394 CloseClient(
false,
"redirect connect");
401 CloseClient(
true,
"Socket error");
406 CloseClient(
false,
"Socket closed");
420 SendSubmittedCommands();
426 while (!fSendingActive && (fSendQueue.Size()>0)) {
428 SendCommand(fSendQueue.Pop(), isreply);
434 double send_tmout = 0;
439 if (send_tmout == 0.) {
444 uint32_t cmdid = fWaitQueue.Push(cmd);
445 cmd.
SetUInt(
"__send_cmdid__", cmdid);
450 fSendHdr.dabc_header = headerDabc;
451 fSendHdr.data_kind = asreply ? kindReply : kindCommand;
452 fSendHdr.data_timeout = send_tmout > 0 ? (unsigned) send_tmout*1000. : 0;
453 fSendHdr.data_size = 0;
454 fSendHdr.data_cmdsize = 0;
455 fSendHdr.data_rawsize = 0;
458 fSendRawData.Release();
460 if (cmd.
IsCanceled()) fSendHdr.data_kind = kindCancel;
467 fSendHdr.data_rawsize = fSendRawData.GetTotalSize();
468 void* rawdata =
nullptr;
469 if (fSendHdr.data_rawsize > 0) rawdata = fSendRawData.SegmentPtr();
470 fSendHdr.data_size = fSendHdr.data_cmdsize + fSendHdr.data_rawsize;
475 EOUT(
"Cannot send command %s addon %p", cmd.
GetName(), fAddon());
477 CloseClient(
true,
"I/O object missing");
483 if (!addon->
StartSend(&fSendHdr,
sizeof(fSendHdr),
484 fSendBuf.SegmentPtr(), fSendHdr.data_cmdsize,
485 rawdata, fSendHdr.data_rawsize)) {
486 CloseClient(
true,
"Fail to send command");
492 fSendingActive =
true;
498 double next_tmout = 1.;
500 if (fAddon.null() && !fRemoteHostName.empty() && (fReconnectPeriod>0)) {
511 next_tmout = fReconnectPeriod;
516 fWaitQueue.ReplyTimedout();
517 fSendQueue.ReplyTimedout();
526 dabc::
Worker(MakePair(name.empty() ?
"/CommandChannel" : name)),
528 fClientsAllowed(true),
543 std::string server, itemname;
546 if (
dabc::mgr.DecomposeAddress(url_str, islocal, server, itemname))
547 if (!islocal)
return server;
549 return std::string();
557 if (remnodename.empty())
return worker;
559 for (
unsigned n=0;n<NumChilds();n++) {
560 worker = GetChildRef(n);
561 if (!worker.
null() && (worker()->fRemoteHostName == remnodename))
break;
565 if (!worker.
null() || (conn_tmout<0))
return worker;
571 std::string worker_name =
dabc::format(
"Client%d", fClientCnt++);
575 worker()->AssignToThread(thread());
586 std::string remnodename = GetRemoteNode(receiver);
590 DOUT2(
"SEARCH node %s receiver %s worker %p", remnodename.c_str(), cmd.
GetReceiver().c_str(), worker());
596 worker()->AddCommand(cmd,
false);
605 if (cmd.
IsName(
"SocketConnect")) {
607 int fd = cmd.
GetInt(
"fd", -1);
611 std::string worker_name =
dabc::format(
"Server%d", fClientCnt++);
615 if (!worker.
null()) {
616 EOUT(
"How such possible - client %s already exists", worker_name.c_str());
626 worker()->AssignToThread(thread());
628 DOUT3(
"SocketCommand - create server side fd:%d worker:%s thread:%s", fd, worker_name.c_str(), thread().GetName());
635 std::string remnodename = GetRemoteNode(cmd.
GetStr(
"host"));
644 if (!worker.
null()) {
645 DOUT0(
"Close connection to %s", remnodename.c_str());
651 DOUT0(
"No connection to %s exists", remnodename.c_str());
655 if (cmd.
IsName(
"ConfigureMaster")) {
671 worker()->fMasterConn =
true;
672 worker()->fClientNameSufix = cmd.
GetStr(
"NameSufix");
676 if (cmd.
IsName(
"RedirectSocketConnect")) {
678 if (addon && fRedirectDevice.empty()) {
679 fRedirectDevice = cmd.
GetStr(
"Device");
Reference on memory from memory pool.
BufferSize_t GetTotalSize() const
Return total size of all buffer segments.
static Buffer CreateBuffer(BufferSize_t sz)
This static method creates an independent buffer for any other memory pools. Therefore it can be used in s...
Represents command with its arguments.
double TimeTillTimeout(double extra=0.) const
Returns time which remains until command should be timed out.
unsigned GetUInt(const std::string &name, unsigned dflt=0) const
bool SetStr(const std::string &name, const char *value)
bool SetBool(const std::string &name, bool v)
bool SetInt(const std::string &name, int v)
bool SetRawData(Buffer rawdata)
Set raw data to the command, which can be transported also between nodes.
std::string GetStr(const std::string &name, const std::string &dflt="") const
Command & SetTimeout(double tm)
Set maximum time which can be used for command execution.
bool SetUInt(const std::string &name, unsigned v)
void Release()
Method used to clean command - all internal data will be cleaned, command container will be released.
void AddValuesFrom(const Command &cmd, bool canoverwrite=true)
Command & SetReceiver(const std::string &itemname)
These methods prepare the command so that one can submit it to the manager like: dabc::mgr....
bool GetBool(const std::string &name, bool dflt=false) const
bool IsCanceled()
Return true if command was canceled.
int GetInt(const std::string &name, int dflt=0) const
Buffer GetRawData()
Returns reference on raw data. Can be called only once - raw data reference will be cleaned.
void Reply(int res=cmd_noresult)
Replies to the command.
std::string GetReceiver() const
@ kindSubmit
command submitted for execution
@ kindReply
command replied to the worker
void SetAutoDestroy(bool on=true)
Set autodestroy flag for the object. Once enabled, object will be destroyed when last reference will b...
void SetOwner(bool on=true)
Specifies if object will be owner of its new children.
bool AddRemote(const std::string &remnode, const std::string &workername)
dabc::Buffer SaveToBuffer()
bool ReadFromBuffer(const dabc::Buffer &buf)
bool RemoveField(const std::string &name)
Reference on the arbitrary object
void Release()
Releases reference on the object.
const char * GetName() const
Return name of referenced object, if object not assigned, returns "---".
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
bool null() const
Returns true if reference contains nullptr.
std::string ItemName(bool compact=true) const
Produce string, which can be used as name argument in dabc::mgr.FindItem(name) call.
void Destroy()
Releases reference and starts destruction of referenced object.
Special addon class for handling of socket and socket events.
@ evntSocketSendInfo
event delivered to worker when write is completed
@ evntSocketRecvInfo
event delivered to worker when read is completed
@ evntSocketCloseInfo
event delivered to worker when socket is closed
@ evntSocketErrorInfo
event delivered to worker when error is detected
void SetDeliverEventsToWorker(bool on=true)
Socket addon for handling connection on client side.
void SetRetryOpt(int nretry, double tmout=1.)
Provides command channel to the dabc process.
SocketCommandClientRef ProvideWorker(const std::string &remnodename, double conn_tmout=-1)
Provide client for remote node.
SocketCommandChannel(const std::string &name, SocketServerAddon *connaddon, Command cmd)
bool fClientsAllowed
when true, incoming clients are allowed
virtual int PreviewCommand(Command cmd)
This method is called before the command is executed.
std::string fRedirectDevice
name of socket device, which can get redirection
std::string GetRemoteNode(const std::string &url)
Provide string for connection to remote node.
virtual double ProcessTimeout(double last_diff)
timeout used in channel to update node hierarchy, which then can be requested from remote
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
int fNodeId
current node id
Client side of command connection between two nodes.
void SendSubmittedCommands()
Send submitted commands to remote.
bool EnsureRecvBuffer(unsigned strsize)
void ProcessRecvPacket()
Method called, when complete packet (header + raw data) is received.
virtual void OnThreadAssigned()
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
bool ExecuteCommandByItself(Command cmd)
@ stWorking
normal state when execution of different commands is done
virtual double ProcessTimeout(double last_diff)
EState fState
current state of the worker
void AddCommand(dabc::Command cmd, bool asreply=false)
Submit command to send queue, if queue is empty start sending immediately.
SocketCommandClient(Reference parent, const std::string &name, SocketAddon *addon, const std::string &hostname="", double reconnect=0.)
virtual void ProcessEvent(const EventId &)
ERecvState fRecvState
state that happens with receiver (server)
void CloseClient(bool iserr=false, const char *msg=0)
Called when connection must be closed due to the error.
virtual bool ReplyCommand(Command cmd)
Reimplement this method to react on command reply. Return true if command can be destroyed by framewor...
virtual ~SocketCommandClient()
void SendCommand(dabc::Command cmd, bool asreply=false)
Send next command to the remote.
SocketCmdPacket fRecvHdr
buffer for receiving header
bool fSendingActive
indicate if currently send active
Socket addon for handling I/O events.
bool StartSend(const void *buf, unsigned size, const void *buf2=0, unsigned size2=0, const void *buf3=0, unsigned size3=0)
bool StartRecv(void *buf, size_t size)
Socket addon for handling connection requests on server side.
static std::string DefineHostName(bool force=true)
Return current host name.
static SocketClientAddon * CreateClientAddon(const std::string &servid, int dflt_port=-1)
Uniform Resource Locator interpreter.
std::string GetProtocol() const
std::string GetHostNameWithPort(int dfltport=0) const
Active object, which is working inside dabc::Thread.
virtual int PreviewCommand(Command cmd)
This method is called before the command is executed.
virtual void OnThreadAssigned()
void AssignAddon(WorkerAddon *addon)
Assigns addon to the worker. Should be called before worker is assigned to the thread.
virtual void ProcessEvent(const EventId &)
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
virtual bool ReplyCommand(Command cmd)
Reimplement this method to react on command reply. Return true if command can be destroyed by framewor...
XMLNodePointer_t GetParent(XMLNodePointer_t xmlnode)
std::string format(const char *fmt,...)
Event structure, exchanged between DABC threads.