26 fQueueCapacity(queuesize),
28 fSignal(SignalConfirm),
35 fDoingReconnect(false),
42 DOUT3(
"PORT %s: destructor %p queue %p", GetName(),
this, fQueue());
48 fQueueCapacity = Cfg(
xmlQueueAttr).AsInt(fQueueCapacity);
49 fMaxLoopLength = Cfg(
xmlLoopAttr).AsInt(fMaxLoopLength);
51 if (signal ==
"none") fSignal = SignalNone;
else
52 if ((signal ==
"confirm") || (signal ==
"normal")) fSignal = SignalConfirm;
else
53 if (signal ==
"oper") fSignal = SignalConfirm;
else
54 if (signal ==
"every") fSignal = SignalEvery;
58 ConfigureOnError(Cfg(
"onerror").AsStr());
67 EOUT(
"Cannot change signaling kind with connected port!!!");
79 if (!req.
null() || !force)
return req;
104 return fQueueCapacity;
123 if (fQueue.null())
return;
125 fQueue()->SetConnected(IsInput());
129 fQueueCapacity = fQueue()->Capacity();
135 fQueue.PortActivated(GetType(),
true);
140 fQueue.PortActivated(GetType(),
false);
151 DOUT3(
"Port %s cleanup inp:%s out:%s", ItemName().c_str(),
DBOOL(IsInput()),
DBOOL(IsOutput()));
166 if (fRate.GetUnits().empty())
175 switch (SignalingKind()) {
179 case SignalOperation:
180 return fMaxLoopLength ? fMaxLoopLength : QueueCapacity();
189 if (IsInput())
return fQueue.SubmitCommandTo(
false, cmd);
190 if (IsOutput())
return fQueue.SubmitCommandTo(
true, cmd);
198 if ((fReconnectPeriod>0) && ((fReconnectLimit < 0) || (fReconnectLimit-- > 0)))
return true;
200 SetDoingReconnect(
false);
202 if (fOnError ==
"none") {
204 }
else if (fOnError ==
"stop") {
207 }
else if (fOnError ==
"exit") {
208 DOUT0(
"Exit application due to error on port %s", ItemName().c_str());
210 }
else if (fOnError ==
"abort") {
211 DOUT0(
"Abort application due to error on port %s", ItemName().c_str());
213 }
else if (can_disconnect) {
227 cmd.
SetStr(
"Port", GetObject()->GetName());
228 if (GetModule().Execute(cmd) ==
cmd_true)
229 return cmd.
GetInt(
"Kind");
235 if (GetObject()==0)
return false;
237 cmd.
SetStr(
"Port", GetObject()->GetName());
238 cmd.
SetBool(
"WithErr", witherr);
239 return GetModule().Execute(cmd) ==
cmd_true;
245 if (GetObject()) name = GetObject()->GetBindName();
246 if (name.empty())
return 0;
247 return GetModule().
FindChild(name.c_str());
252 if (GetObject()==0)
return false;
254 cmd.
SetStr(
"Port", GetObject()->GetName());
255 return GetModule().Execute(cmd) ==
cmd_true;
263 if (
null() || GetModule().
null())
return req;
266 cmd.
SetStr(
"Port", GetObject()->GetName());
268 cmd.
SetBool(
"IsServer", isserver);
270 if (GetModule().Execute(cmd) ==
cmd_true)
271 req = cmd.
GetRef(
"ConnReq");
280 const std::string &name,
281 unsigned queuesize) :
291 DOUT4(
"PORT: destructor %p",
this);
297 if (!CanRecv())
return false;
308 switch (SignalingKind()) {
312 case SignalOperation:
313 return CanRecv() ? 1 : 0;
332 const std::string &name,
333 unsigned queuesize) :
343 DOUT4(
"PORT: destructor %p",
this);
351 bool res = fQueue.Send(buf);
353 if (!buf.
null() && !fQueue.null() && res)
354 EOUT(
"Should not happen queue %p buf %p", fQueue.GetObject(), buf.
GetObject());
357 EOUT(
"PORT %s fail to send buffer %u", ItemName().c_str(), buf.
GetTotalSize());
367 switch (SignalingKind()) {
371 case SignalOperation:
372 return CanSend() ? 1 : 0;
385 const std::string &name,
386 unsigned queuesize) :
397 DOUT4(
"PoolHandle: destructor %p",
this);
403 switch (SignalingKind()) {
407 case SignalOperation:
408 return (NumRequestedBuffer() > 0) ? 1 : 0;
410 return NumRequestedBuffer();
418 if (QueueCapacity()==0)
return ((
MemoryPool*)fPool())->TakeBuffer(size);
420 return TakeRequestedBuffer();
Reference on memory from memory pool.
BufferSize_t GetTotalSize() const
Return total size of all buffer segments.
Represents command with its arguments.
bool SetStr(const std::string &name, const char *value)
bool SetBool(const std::string &name, bool v)
Reference GetRef(const std::string &name)
Returns reference from the command, can be called only once.
int GetInt(const std::string &name, int dflt=0) const
Interface class between xml configuration and dabc objects.
bool ReadRecordField(Object *obj, const std::string &name, RecordField *field, RecordFieldsMap *fieldsmap)
Container for connection parameters.
static const char * ObjectName()
Lock guard for posix mutex.
Base class for module items like ports, timers, pool handles.
OutputPort(Reference parent, const std::string &name, unsigned queuesize)
virtual unsigned NumStartEvents()
Return number of events which should be produced when async module starts.
bool Send(dabc::Buffer &buf)
Parameter & SetUnits(const std::string &unit)
Set units field of parameter.
virtual unsigned NumStartEvents()
Return number of events which should be produced when async module starts.
PoolHandle(Reference parent, Reference pool, const std::string &name, unsigned queuesize)
Buffer TakeBuffer(BufferSize_t size=0)
Reference on the dabc::Port class
bool IsConnected()
Returns true if port is connected.
int GetSignalingKind()
Returns signaling method configured for the port.
PortRef GetBindPort()
Return reference on the bind port.
bool Disconnect(bool witherr=false)
Disconnect port
ConnectionRequest MakeConnReq(const std::string &url, bool isserver=false)
Create connection request to specified url.
Base class for input and output ports.
void SetRateMeter(const Parameter &ref)
Set port ratemeter - must be used from module thread.
unsigned QueueCapacity() const
Method returns actual queue capacity of the port, object mutex is used.
ConnectionRequest GetConnReq(bool force=false)
Return reference on existing request object.
Port(int kind, Reference parent, const std::string &name, unsigned queuesize)
bool SubmitCommandToTransport(Command cmd)
Submit command to connected transport.
bool SetSignaling(EventsProducing kind)
Specifies how often port event will be produced.
int GetMaxLoopLength()
Return maximum number of events, which could be processed at once.
void SetBindName(const std::string &name)
Set name of bind port - when input and output ports should use same transport.
bool TryNextReconnect(bool caused_by_error, bool can_disconnect=true)
Returns true when reconnection should be attempted.
void RemoveConnReq()
Remove connection request - it does not automatically means that port will be disconnected.
virtual void ObjectCleanup()
Inherited method, should cleanup everything.
std::string GetBindName() const
Returns name of bind port.
virtual void DoCleanup()
Called when module object is cleaned up - should release all references if any.
void SetQueue(Reference &ref)
void ReadPortConfiguration()
Should be called from constructors of derived classes to read port configuration like queue and so on...
Reference on the arbitrary object
void Release()
Releases reference on the object.
Object * GetObject() const
Return pointer on the object.
Reference FindChild(const char *name) const
Searches for child in referenced object.
bool null() const
Returns true if reference contains nullptr.
virtual void ObjectCleanup()
Central cleanup method for worker.
XMLNodePointer_t GetParent(XMLNodePointer_t xmlnode)
@ parCreated
produced once when parameter is created
const char * xmlNumReconnAttr
const char * xmlReconnectAttr
const char * xmlSignalAttr
const char * xmlQueueAttr