24    dabc::SocketIOAddon(fd, false, true),
 
   35    fHasExtraRequest(false)
 
   42    DOUT3(
"Destroy ServerOutputAddon %p", 
this);
 
   47    memset(&fServInfo, 0, 
sizeof(fServInfo));
 
   49    fServInfo.iEndian = 1;              
 
   50    fServInfo.iMaxBytes = maxbytes;     
 
   51    fServInfo.iBuffers = 1;             
 
   52    fServInfo.iStreams = isnewformat ? 0 : 1;     
 
   60       StartSend(&fServInfo, 
sizeof(fServInfo));
 
   62    memset(f_sbuf, 0, 
sizeof(f_sbuf));
 
   63    StartRecv(f_sbuf, 12);
 
   81          DOUT4(
"Send info completed");
 
   85             fState = oWaitingBuffer;
 
   89          fState = oWaitingBuffer;
 
   92       case oSendingEvents: {
 
   96          if (evsize % 2) evsize++;
 
   98          fSubHdr.InitFull(fSubevId);
 
   99          fSubHdr.SetRawDataSize(evsize);
 
  101          fEvHdr.Init(fEvCounter++);
 
  102          fEvHdr.SetFullSize(evsize + 
sizeof(fEvHdr) + 
sizeof(fSubHdr));
 
  104          StartSend(&fEvHdr, 
sizeof(fEvHdr), &fSubHdr, 
sizeof(fSubHdr), iter->
Event(), evsize);
 
  108             fState = oSendingLastEvent;
 
  113       case oSendingLastEvent:
 
  119             fState = fHasExtraRequest ? oWaitingBuffer : oWaitingReq;
 
  121             fState = oWaitingBuffer;
 
  122          fHasExtraRequest = 
false;
 
  132          EOUT(
"Send complete at wrong state %d", fState);
 
  141    if (strcmp(f_sbuf, 
"CLOSE")==0) {
 
  142       OnSocketError(0, 
"get CLOSE event"); 
 
  146    if (strcmp(f_sbuf, 
"GETEVT")!=0)
 
  147      EOUT(
"Wrong request string %s", f_sbuf);
 
  149    memset(f_sbuf, 0, 
sizeof(f_sbuf));
 
  150    StartRecv(f_sbuf, 12);
 
  151    fHasExtraRequest = 
false;
 
  156          EOUT(
"Get data request before send server info was completed");
 
  164          fState = oWaitingBuffer;
 
  166       case oWaitingReqBack:
 
  168          fState = oWaitingBuffer;
 
  171       case oSendingLastEvent:
 
  173          fHasExtraRequest = 
true;
 
  176          EOUT(
"Get request at wrong state %d", fState);
 
  187       EOUT(
"Didnot found OutputTransport on other side worker %p", fWorker());
 
  209          fState = oWaitingReqBack;
 
  219          EOUT(
"Write_Check at wrong state %d", fState);
 
  241       unsigned rawsize = 0;
 
  246          if (evsize % 2) evsize++;
 
  260    fHeader.SetUsedBufferSize(sendsize);
 
  266       fState = oSendingBuffer;
 
  267       StartNetSend(&fHeader, 
sizeof(fHeader), buf);
 
  269       fState = oSendingEvents;
 
  270       StartSend(&fHeader, 
sizeof(fHeader));
 
  282       case oWaitingReqBack:
 
  283          fState = (err==0) ? oDoingClose : oError;
 
  287       case oDoingClose: 
return;
 
  291          fState = (err==0) ? oDoingClose : oError;
 
  299    dabc::Transport(cmd, 0, outport),
 
  302    fSlaveQueueLength(5),
 
  338    DOUT0(
"Create MBS server fd:%d kind:%s port:%d limit:%d blocking:%s deliverall:%s",
 
  362    if (cmd.
IsName(
"SocketConnect")) {
 
  364       int fd = cmd.
GetInt(
"fd", -1);
 
  370       for (
unsigned n=0;n<NumOutputs();n++)
 
  371          if (IsOutputConnected(n)) {
 
  374             if (portindx<0) portindx = n;
 
  377       if ((fClientsLimit>0) && (numconn>=fClientsLimit)) {
 
  378          DOUT0(
"Reject connection %d, maximum number %d is achieved ", fd, numconn);
 
  383       DOUT3(
"Get new connection request with fd %d canrecv %s", fd, 
DBOOL(CanRecv()));
 
  387       if (!fIterKind.empty()) {
 
  390             EOUT(
"Fail to create events iterator %s", fIterKind.c_str());
 
  403       if (portindx<0) portindx = CreateOutput(
dabc::format(
"Slave%u",NumOutputs()), fSlaveQueueLength);
 
  407       tr()->AssignToThread(thread(), 
true);
 
  413       DOUT3(
"mbs::ServerTransport create new connection at running=%s", 
DBOOL(isTransportRunning()));
 
  418    if (cmd.
IsName(
"GetTransportStatistic")) {
 
  421       std::vector<uint64_t> cansend;
 
  422       for(
unsigned n=0;n<NumOutputs();n++) {
 
  423          if (IsOutputConnected(n)) {
 
  424             cnt++; cansend.push_back(NumCanSend(n));
 
  428       cmd.
SetInt(
"NumClients", cnt);
 
  429       cmd.
SetInt(
"NumCanRecv", NumCanRecv());
 
  431       if (cnt==1) cmd.
SetField(
"NumCanSend", cansend[0]); 
else 
  432       if (cnt>1) cmd.
SetField(
"NumCanSend", cansend); 
else 
  436       cmd.
SetInt(
"MbsPort", fPortNum);
 
  447    if (!CanRecv()) 
return false;
 
  450    if ((NumOutputs()==0) && fBlocking ) 
return false;
 
  452    bool allcansend = CanSendToAllOutputs(
true);
 
  461       if (fDeliverAll) 
return false;
 
  463       if (!RecvQueueFull()) {
 
  466          SignalRecvWhenFull();
 
  478    SendToAllOutputs(buf);
 
  481       DOUT2(
"Server transport saw EOF buffer");
 
  484       if ((NumOutputs()==0) || !fBlocking) {
 
  485          DOUT2(
"One could close server transport immediately");
 
  486          CloseTransport(
false);
 
  492    return fDoingClose == 0;
 
  498    if (name==InputName()) {
 
  501       DOUT2(
"mbs::ServerTransport detect new client on %s %s", name.c_str(), (on ? 
"CONNECTED" : 
"DISCONNECTED") );
 
  504          ProcessInputEvent(0);
 
  508       if (!on) FindPort(name).Disconnect();
 
  510       if (fDoingClose == 1) {
 
  512          for (
unsigned n=0;n<NumOutputs();n++)
 
  513             if (IsOutputConnected(n)) isany = 
true;
 
  515             DOUT2(
"Close server transport while all clients are closed");
 
  516             CloseTransport(
false);
 
Reference on memory from memory pool.
unsigned GetTypeId() const
BufferSize_t GetTotalSize() const
Return total size of all buffer segments.
Represents command with its arguments.
bool SetStr(const std::string &name, const char *value)
bool SetInt(const std::string &name, int v)
int GetInt(const std::string &name, int dflt=0) const
Iterator over events in dabc::Buffer class.
virtual bool Assign(const Buffer &buf)=0
virtual BufferSize_t EventSize()=0
virtual bool NextEvent()=0
static int ConnectPorts(Reference port1ref, Reference port2ref, Command cmd=nullptr)
Reference CreateObject(const std::string &classname, const std::string &objname)
Base class for output transport implementations.
void Write_CallBack(unsigned arg)
Reference on the dabc::Port class
bool SetField(const std::string &name, const RecordField &v)
bool IsName(const char *name) const
Returns true if object name is the same as specified one.
bool null() const
Returns true if reference contains nullptr.
Socket addon for handling connection requests on server side.
static bool SetNoDelaySocket(int fd)
Reference on dabc::Transport class
virtual bool StopTransport()
virtual bool StartTransport()
Methods activated by Port, when transport starts/stops.
virtual void ProcessConnectionActivated(const std::string &name, bool on)
Method called when module on other side is started.
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
Uniform Resource Locator interpreter.
std::string GetOptionStr(const std::string &optname, const std::string &dflt="") const
bool HasOption(const std::string &optname) const
int GetOptionInt(const std::string &optname, int dflt=0) const
virtual void OnThreadAssigned()
void AssignAddon(WorkerAddon *addon)
Assigns addon to the worker. Should be called before the worker is assigned to the thread.
Addon for server-side output of different kinds of MBS server
ServerOutputAddon(int fd, int kind, dabc::EventsIteratorRef &iter, uint32_t subid)
virtual unsigned Write_Buffer(dabc::Buffer &buf)
Start writing of buffer to output.
virtual void OnRecvCompleted()
Method called when receive operation is completed.
virtual double ProcessTimeout(double last_diff)
virtual void OnSocketError(int err, const std::string &info)
Generic error handler.
virtual unsigned Write_Check()
Check if output can be done.
virtual void OnThreadAssigned()
void FillServInfo(int32_t maxbytes, bool isnewformat)
void MakeCallback(unsigned sz)
virtual void OnSendCompleted()
Method called when send operation is completed.
virtual ~ServerOutputAddon()
int fKind
kind: stream or transport
uint32_t fSubevId
subevent id when non-MBS events are used
int fPortNum
used port number (only for info)
ServerTransport(dabc::Command cmd, const dabc::PortRef &outport, int kind, int portnum, dabc::SocketServerAddon *connaddon, const dabc::Url &url)
int fClientsLimit
maximum number of simultaneous clients
bool fBlocking
if true, server will block buffers until they can be delivered
bool fDeliverAll
if true, server will try to deliver all events when clients are there (default for transport)
virtual bool StopTransport()
virtual int ExecuteCommand(dabc::Command cmd)
Main method where commands are executed.
int fSlaveQueueLength
queue length, used for slaves connections
void ProcessConnectionActivated(const std::string &name, bool on)
Method called when module on other side is started.
virtual bool StartTransport()
Methods activated by Port, when transport starts/stops.
std::string fIterKind
iterator kind when non-mbs events should be delivered to clients
virtual ~ServerTransport()
std::string format(const char *fmt,...)
const char * ServerKindToStr(int kind)