24 fWithMutex(withmutex),
34 fBlockWhenUnconnected(false),
35 fBlockWhenConnected(true)
39 DOUT3(
"Create buffers queue %p",
this);
47 EOUT(
"Queue was not correctly disconnected %u", fConnected);
49 if (fQueue.Size() != 0) {
63 if (buf.
null())
return true;
84 EOUT(
"Buffer ref cnt %d bigger than 1, which means extra buffer instance inside thread", buf.
NumReferences());
91 if (fQueue.Full() && !((fConnected == MaskConn) ? fBlockWhenConnected : fBlockWhenUnconnected)) {
92 fQueue.PopBuffer(skipbuf);
95 if (!fQueue.PushBuffer(buf)) {
96 EOUT(
"Not able to push buffer into the %s -> %s queue, mutex: %s skipped: %s, queuefull: %s %u %s, connected: %s, blflags: %s %s",
97 fOut.ItemName().c_str(), fInp.ItemName().c_str(),
DBOOL(QueueMutex()!=0),
DBOOL(!skipbuf.
null()),
DBOOL(fQueue.Full()), fQueue.Size(), fQueue.Capacity(),
98 DBOOL(fConnected == MaskConn),
DBOOL(fBlockWhenConnected),
DBOOL(fBlockWhenUnconnected));
101 if (!buf.
null()) {
EOUT(
"Something went wrong - buffer is not null here"); exit(3); }
103 if (fSignalOut==2) fSignalOut = 3;
108 if (fConnected & MaskInp)
109 switch (fInpSignKind) {
113 if (fSignalInp == 3) { makesig =
true; fSignalInp = 1; }
117 if (fSignalInp == 3) { makesig =
true; fSignalInp = 2; }
149 if (!buf.
null()) {
EOUT(
"AAAAAAAAAA"); exit(432); }
151 fQueue.PopBuffer(buf);
153 if (fSignalInp == 2) fSignalInp = 3;
157 switch (fOutSignKind) {
162 if (fSignalOut == 3) { makesig =
true; fSignalOut = 1; }
166 if (fSignalOut == 3) { makesig =
true; fSignalOut = 2; }
200 unsigned id(0), evnt(0);
210 if (fSignalInp == 2) fSignalInp = 3;
214 switch (fOutSignKind) {
219 if (fSignalOut == 3) { makesig =
true; fSignalOut = 1; }
223 if (fSignalOut == 3) { makesig =
true; fSignalOut = 2; }
234 DOUT3(
"Producing output signal from DummyRecv");
255 if (fromoutputport) {
262 fSignalOut = fQueue.Full() ? 3 : 2;
270 fSignalInp = fQueue.Empty() ? 3 : 2;
303 fConnected = fConnected & ~(isinp ? MaskInp : MaskOut);
304 if (fConnected == 0) cleanup =
true;
307 DOUT3(
"Queue %p disconnected witherr %s isinp %s conn %u m1:%s m2:%s",
this,
DBOOL(witherr),
DBOOL(isinp), fConnected, m1.
GetName(), m2.
GetName());
318 DOUT3(
"Perform queue %p cleanup by disconnect",
this);
326 fQueue.Cleanup(QueueMutex());
359 std::string blocking = port_out.
Cfg(
"blocking", cmd).
AsStr();
360 if (blocking.empty()) blocking = port_inp.
Cfg(
"blocking").
AsStr(
"connected");
370 if (!q_out.
null() && !q_inp.
null()) {
371 EOUT(
"Both ports have existing queues - should not happen");
376 bool withmutex(
true), assign_out(
true), assign_inp(
true);
379 DOUT3(
"!!!! Can create queue without mutex !!!");
390 if (withmutex) q()->EnableMutex();
393 DOUT3(
"REUSE queue of output port %s", port_out.
ItemName().c_str());
397 if (withmutex) q()->EnableMutex();
399 DOUT3(
"REUSE queue of input port %s", port_inp.
ItemName().c_str());
404 if (blocking ==
"disconnected") {
405 q()->fBlockWhenUnconnected =
true;
406 q()->fBlockWhenConnected =
false;
408 if (blocking ==
"never") {
409 DOUT0(
"Never block output port %s", port_out.
ItemName().c_str());
410 q()->fBlockWhenUnconnected =
false;
411 q()->fBlockWhenConnected =
false;
413 if (blocking ==
"always") {
414 q()->fBlockWhenUnconnected =
true;
415 q()->fBlockWhenConnected =
true;
417 q()->fBlockWhenUnconnected =
false;
418 q()->fBlockWhenConnected =
true;
423 q()->fInpId = port_inp.
ItemId();
429 q()->fOutId = port_out.
ItemId();
457 if (m1running && !m2running)
460 if (!m1running && m2running)
Reference on memory from memory pool.
Represents command with its arguments.
bool SetStr(const std::string &name, const char *value)
bool SetRef(const std::string &name, Reference ref)
Set reference to the command.
Reference on the dabc::LocalTransport
Transport between two ports on the same node
void Disconnect(bool isinp, bool witherr=false)
static int ConnectPorts(Reference port1ref, Reference port2ref, Command cmd=nullptr)
virtual ~LocalTransport()
void ConfirmEvent(bool isoutput)
LocalTransport(unsigned capacity, bool withmutex)
void PortActivated(int itemkind, bool on)
Lock guard for posix mutex.
WorkerRef GetModule() const
Reference on dabc::Module class
Base class for most of the DABC classes.
void SetFlag(unsigned fl, bool on=true)
Changes the value of the selected flag; not thread-safe.
@ flAutoDestroy
object will be automatically destroyed when no references exist, normally set in constructor,...
Reference on the dabc::Port class
int GetSignalingKind()
Returns signaling method configured for the port.
unsigned QueueCapacity() const
Returns queue capacity of the port.
std::string AsStr(const std::string &dflt="") const
Reference on the arbitrary object
void Release()
Releases reference on the object.
const char * GetName() const
Returns the name of the referenced object; if no object is assigned, returns "---".
unsigned NumReferences() const
Returns number of references on the object.
bool null() const
Returns true if reference contains nullptr.
std::string ItemName(bool compact=true) const
Produces a string that can be used as the name argument in a dabc::mgr.FindItem(name) call.
Reference on dabc::Worker
RecordField Cfg(const std::string &name, Command cmd=nullptr) const
Returns configuration record of specified name.
bool FireEvent(uint16_t evid, uint32_t arg)
bool Execute(Command cmd, double tmout=-1.)
bool IsSameThread(const WorkerRef &ref)
Returns true if two workers share same thread.