24 dabc::SocketIOAddon(fd, false, true),
35 fHasExtraRequest(false)
42 DOUT3(
"Destroy ServerOutputAddon %p",
this);
47 memset(&fServInfo, 0,
sizeof(fServInfo));
49 fServInfo.iEndian = 1;
50 fServInfo.iMaxBytes = maxbytes;
51 fServInfo.iBuffers = 1;
52 fServInfo.iStreams = isnewformat ? 0 : 1;
60 StartSend(&fServInfo,
sizeof(fServInfo));
62 memset(f_sbuf, 0,
sizeof(f_sbuf));
63 StartRecv(f_sbuf, 12);
81 DOUT4(
"Send info completed");
85 fState = oWaitingBuffer;
89 fState = oWaitingBuffer;
92 case oSendingEvents: {
96 if (evsize % 2) evsize++;
98 fSubHdr.InitFull(fSubevId);
99 fSubHdr.SetRawDataSize(evsize);
101 fEvHdr.Init(fEvCounter++);
102 fEvHdr.SetFullSize(evsize +
sizeof(fEvHdr) +
sizeof(fSubHdr));
104 StartSend(&fEvHdr,
sizeof(fEvHdr), &fSubHdr,
sizeof(fSubHdr), iter->
Event(), evsize);
108 fState = oSendingLastEvent;
113 case oSendingLastEvent:
119 fState = fHasExtraRequest ? oWaitingBuffer : oWaitingReq;
121 fState = oWaitingBuffer;
122 fHasExtraRequest =
false;
132 EOUT(
"Send complete at wrong state %d", fState);
141 if (strcmp(f_sbuf,
"CLOSE")==0) {
142 OnSocketError(0,
"get CLOSE event");
146 if (strcmp(f_sbuf,
"GETEVT")!=0)
147 EOUT(
"Wrong request string %s", f_sbuf);
149 memset(f_sbuf, 0,
sizeof(f_sbuf));
150 StartRecv(f_sbuf, 12);
151 fHasExtraRequest =
false;
156 EOUT(
"Get data request before send server info was completed");
164 fState = oWaitingBuffer;
166 case oWaitingReqBack:
168 fState = oWaitingBuffer;
171 case oSendingLastEvent:
173 fHasExtraRequest =
true;
176 EOUT(
"Get request at wrong state %d", fState);
187 EOUT(
"Didnot found OutputTransport on other side worker %p", fWorker());
209 fState = oWaitingReqBack;
219 EOUT(
"Write_Check at wrong state %d", fState);
241 unsigned rawsize = 0;
246 if (evsize % 2) evsize++;
260 fHeader.SetUsedBufferSize(sendsize);
266 fState = oSendingBuffer;
267 StartNetSend(&fHeader,
sizeof(fHeader), buf);
269 fState = oSendingEvents;
270 StartSend(&fHeader,
sizeof(fHeader));
282 case oWaitingReqBack:
283 fState = (err==0) ? oDoingClose : oError;
287 case oDoingClose:
return;
291 fState = (err==0) ? oDoingClose : oError;
299 dabc::Transport(cmd, 0, outport),
302 fSlaveQueueLength(5),
338 DOUT0(
"Create MBS server fd:%d kind:%s port:%d limit:%d blocking:%s deliverall:%s",
362 if (cmd.
IsName(
"SocketConnect")) {
364 int fd = cmd.
GetInt(
"fd", -1);
370 for (
unsigned n=0;n<NumOutputs();n++)
371 if (IsOutputConnected(n)) {
374 if (portindx<0) portindx = n;
377 if ((fClientsLimit>0) && (numconn>=fClientsLimit)) {
378 DOUT0(
"Reject connection %d, maximum number %d is achieved ", fd, numconn);
383 DOUT3(
"Get new connection request with fd %d canrecv %s", fd,
DBOOL(CanRecv()));
387 if (!fIterKind.empty()) {
390 EOUT(
"Fail to create events iterator %s", fIterKind.c_str());
403 if (portindx<0) portindx = CreateOutput(
dabc::format(
"Slave%u",NumOutputs()), fSlaveQueueLength);
407 tr()->AssignToThread(thread(),
true);
413 DOUT3(
"mbs::ServerTransport create new connection at running=%s",
DBOOL(isTransportRunning()));
418 if (cmd.
IsName(
"GetTransportStatistic")) {
421 std::vector<uint64_t> cansend;
422 for(
unsigned n=0;n<NumOutputs();n++) {
423 if (IsOutputConnected(n)) {
424 cnt++; cansend.push_back(NumCanSend(n));
428 cmd.
SetInt(
"NumClients", cnt);
429 cmd.
SetInt(
"NumCanRecv", NumCanRecv());
431 if (cnt==1) cmd.
SetField(
"NumCanSend", cansend[0]);
else
432 if (cnt>1) cmd.
SetField(
"NumCanSend", cansend);
else
436 cmd.
SetInt(
"MbsPort", fPortNum);
447 if (!CanRecv())
return false;
450 if ((NumOutputs()==0) && fBlocking )
return false;
452 bool allcansend = CanSendToAllOutputs(
true);
461 if (fDeliverAll)
return false;
463 if (!RecvQueueFull()) {
466 SignalRecvWhenFull();
478 SendToAllOutputs(buf);
481 DOUT2(
"Server transport saw EOF buffer");
484 if ((NumOutputs()==0) || !fBlocking) {
485 DOUT2(
"One could close server transport immediately");
486 CloseTransport(
false);
492 return fDoingClose == 0;
498 if (name==InputName()) {
501 DOUT2(
"mbs::ServerTransport detect new client on %s %s", name.c_str(), (on ?
"CONNECTED" :
"DISCONNECTED") );
504 ProcessInputEvent(0);
508 if (!on) FindPort(name).Disconnect();
510 if (fDoingClose == 1) {
512 for (
unsigned n=0;n<NumOutputs();n++)
513 if (IsOutputConnected(n)) isany =
true;
515 DOUT2(
"Close server transport while all clients are closed");
516 CloseTransport(
false);
Reference on memory from memory pool.
unsigned GetTypeId() const
BufferSize_t GetTotalSize() const
Return total size of all buffer segments.
Represents command with its arguments.
bool SetStr(const std::string &name, const char *value)
bool SetInt(const std::string &name, int v)
int GetInt(const std::string &name, int dflt=0) const
Iterator over events in dabc::Buffer class.
virtual bool Assign(const Buffer &buf)=0
virtual BufferSize_t EventSize()=0
virtual bool NextEvent()=0
static int ConnectPorts(Reference port1ref, Reference port2ref, Command cmd=nullptr)
Reference CreateObject(const std::string &classname, const std::string &objname)
Base class for output transport implementations.
void Write_CallBack(unsigned arg)
Reference on the dabc::Port class
bool SetField(const std::string &name, const RecordField &v)
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
bool null() const
Returns true if reference contains nullptr.
Socket addon for handling connection requests on server side.
static bool SetNoDelaySocket(int fd)
Reference on dabc::Transport class
virtual bool StopTransport()
virtual bool StartTransport()
Methods activated by Port, when transport starts/stops.
virtual void ProcessConnectionActivated(const std::string &name, bool on)
Method called when module on other side is started.
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
Uniform Resource Locator interpreter.
std::string GetOptionStr(const std::string &optname, const std::string &dflt="") const
bool HasOption(const std::string &optname) const
int GetOptionInt(const std::string &optname, int dflt=0) const
virtual void OnThreadAssigned()
void AssignAddon(WorkerAddon *addon)
Assigns addon to the worker Should be called before worker assigned to the thread.
Addon for output of server-side different kinds of MBS server
ServerOutputAddon(int fd, int kind, dabc::EventsIteratorRef &iter, uint32_t subid)
virtual unsigned Write_Buffer(dabc::Buffer &buf)
Start writing of buffer to output.
virtual void OnRecvCompleted()
Method called when receive operation is completed.
virtual double ProcessTimeout(double last_diff)
virtual void OnSocketError(int err, const std::string &info)
Generic error handler.
virtual unsigned Write_Check()
Check if output can be done.
virtual void OnThreadAssigned()
void FillServInfo(int32_t maxbytes, bool isnewformat)
void MakeCallback(unsigned sz)
virtual void OnSendCompleted()
Method called when send operation is completed.
virtual ~ServerOutputAddon()
int fKind
kind: stream or transport
uint32_t fSubevId
subevent id when non-MBS events are used
int fPortNum
used port number (only for info)
ServerTransport(dabc::Command cmd, const dabc::PortRef &outport, int kind, int portnum, dabc::SocketServerAddon *connaddon, const dabc::Url &url)
int fClientsLimit
maximum number of simultaneous clients
bool fBlocking
if true, server will block buffers until it can be delivered
bool fDeliverAll
if true, server will try deliver all events when clients are there (default for transport)
virtual bool StopTransport()
virtual int ExecuteCommand(dabc::Command cmd)
Main method where commands are executed.
int fSlaveQueueLength
queue length, used for slaves connections
void ProcessConnectionActivated(const std::string &name, bool on)
Method called when module on other side is started.
virtual bool StartTransport()
Methods activated by Port, when transport starts/stops.
std::string fIterKind
iterator kind when non-mbs events should be delivered to clients
virtual ~ServerTransport()
std::string format(const char *fmt,...)
const char * ServerKindToStr(int kind)