23    dabc::SocketIOAddon(fd),
 
   34    DOUT3(
"Create mbs::ClientTransport::ClientTransport() %p fd:%d kind:%d", 
this, fd, kind);
 
   39    DOUT3(
"Destroy mbs::ClientTransport::~ClientTransport() %p", 
this);
 
   44    DOUT3(
"mbs::ClientTransport::ObjectCleanup\n");
 
   46    if ((fState!=ioError) && (fState!=ioClosed)) {
 
   47       strcpy(fSendBuf, 
"CLOSE");
 
   48       DoSendBuffer(fSendBuf, 12);
 
   58    return fServInfo.iStreams==0;
 
   63    uint32_t sz = fHeader.BufferLength();
 
   64    if (sz < 
sizeof(fHeader)) {
 
   65       EOUT(
"Wrong buffer length %u", sz);
 
   90          if (fServInfo.iEndian != 1) {
 
   95          if (fServInfo.iEndian != 1) {
 
   96             EOUT(
"Cannot correctly define server endian");
 
  100          if ((fState != ioError) && (fServInfo.iStreams != 0) && (fServInfo.iBuffers != 1)) {
 
  101             DOUT0(
"Number of buffers %u per stream bigger than 1", fServInfo.iBuffers);
 
  102             DOUT0(
"This will lead to event spanning which is not optimal for DABC");
 
  103             DOUT0(
"Set buffers number to 1 or call 'enable dabc' on mbs side");
 
  108          if (fState != ioError) {
 
  109             std::string info = 
"";
 
  110             if (fServInfo.iStreams > 0) 
dabc::formats(info, 
"streams = %u", fServInfo.iStreams);
 
  112             DOUT0(
"Get MBS server info: %s buf_per_stream = %u, swap = %s spanning %s",
 
  113                   info.c_str(), fServInfo.iBuffers, 
DBOOL(fSwapping), 
DBOOL(fSpanning));
 
  117             fPendingStart = 
false;
 
  129          if (ReadBufferSize() > (
unsigned) fServInfo.iMaxBytes) {
 
  130             EOUT(
"Buffer size %u bigger than allowed by info record %d", ReadBufferSize(), fServInfo.iMaxBytes);
 
  134          if (ReadBufferSize() == 0) {
 
  136             DOUT0(
"Keep alive buffer from MBS side");
 
  139             fState = ioWaitBuffer;
 
  153          if (fHeader.UsedBufferSize() > 0) {
 
  154             fState = ioComplBuffer;
 
  158          if (IsDabcEnabledOnMbsSide()) {
 
  159             EOUT(
"Empty buffer from mbs when dabc enabled?");
 
  163             DOUT1(
"Keep alive buffer from MBS");
 
  170          EOUT(
"One should not complete recv in such state %d", fState);
 
  178       DOUT3(
"MBS client Socket close\n");
 
  186       DOUT3(
"MBS client Socket Error\n");
 
  198    if (fState != ioInit) {
 
  199       EOUT(
"Get thread assigned in not-init state - check why");
 
  203    StartRecv(&fServInfo, 
sizeof(fServInfo));
 
  214    if (fState == ioRecvInfo) {
 
  215       EOUT(
"Did not get server info in reasonable time");
 
  225       strcpy(fSendBuf, 
"GETEVT");
 
  226       StartSend(fSendBuf, 12);
 
  229    StartRecv(&fHeader, 
sizeof(fHeader));
 
  230    fState = ioRecvHeader;
 
  238       EOUT(
"Didnot found DataInputTransport on other side worker %p", fWorker());
 
  252          if (fPendingStart) 
EOUT(
"Start already pending???");
 
  253          fPendingStart = 
true;
 
  259          EOUT(
"Get read_size at wrong state %d", fState);
 
  269    DOUT4(
"BUFFER_START %u USED %u h_beg %u h_end %u", ReadBufferSize(), fHeader.UsedBufferSize(), fHeader.h_begin, fHeader.h_end);
 
  271    if (fState != ioWaitBuffer) {
 
  272       EOUT(
"Start reading at wrong place");
 
  277       EOUT(
"Provided buffer size too small %u, required %u",
 
  282    bool started = 
false;
 
  284    if (!fSpanBuffer.null()) {
 
  286       if (fHeader.h_begin==0) {
 
  287          EOUT(
"We expecting spanned buffer in the begin, but didnot get it");
 
  293       if (extra.
GetTotalSize() < ReadBufferSize() + fSpanBuffer.GetTotalSize()) {
 
  294          EOUT(
"Buffer size %u not enough to read %u and add spanned buffer %u", extra.
GetTotalSize(), ReadBufferSize(), fSpanBuffer.GetTotalSize());
 
  302       started = StartRecv(extra, ReadBufferSize());
 
  304       started = StartRecv(buf, ReadBufferSize());
 
  309    fState = ioRecvBuffer;
 
  318    if (fState!=ioComplBuffer) {
 
  319       EOUT(
"Reading complete at strange place!!!");
 
  323    unsigned read_shift = 0;
 
  324    if (!fSpanBuffer.null()) read_shift = fSpanBuffer.GetTotalSize() - 
sizeof(
mbs::Header);
 
  330       if (read_shift>0) ptr.
shift(read_shift);
 
  333       while (!ptr.
null()) {
 
  341    if (!fSpanBuffer.null()) {
 
  343       ptr.
shift(read_shift);
 
  346       unsigned new_block_size = hdr->
FullSize();
 
  350       DOUT4(
"Copy block %u to begin", fSpanBuffer.GetTotalSize());
 
  356       fSpanBuffer.Release();
 
  361    buf.
SetTotalSize(read_shift + fHeader.UsedBufferSize());
 
  366       if (fHeader.h_end != 0) {
 
  370          unsigned useful_sz(0), last_sz(0);
 
  373             useful_sz += last_sz;
 
  378          if (fSpanBuffer.null()) {
 
  379             EOUT(
"FAIL to duplicate buffer!!!");
 
  386          fSpanBuffer.CutFromBegin(useful_sz);
 
  388          DOUT4(
"Left block %u from the end", fSpanBuffer.GetTotalSize());
 
  395       DOUT0(
"EXTREME CASE - FULL BUFFER IS JUST PEACE FROM THE MIDDLE");
 
Reference to memory from the memory pool.
 
Buffer Duplicate() const
Duplicates this Buffer instance with a new segments list that is independent of the source.
 
void CutFromBegin(BufferSize_t len)
Remove part of buffer from the beginning.
 
void SetTotalSize(BufferSize_t len)
Set total length of the buffer to the specified value. Size cannot be bigger than original size of the buf...
 
BufferSize_t CopyFrom(const Buffer &srcbuf, BufferSize_t len=0)
Copy content from source buffer.
 
BufferSize_t GetTotalSize() const
Return total size of all buffer segments.
 
void SetTypeId(unsigned tid)
 
void * SegmentPtr(unsigned n=0) const
Returns a pointer to the segment; no boundary checks are performed.
 
Represents command with its arguments.
 
Manipulator with dabc::Buffer class.
 
BufferSize_t shift(BufferSize_t sz)
 
void setfullsize(BufferSize_t sz)
 
BufferSize_t rawsize() const
 
virtual void ProcessEvent(const EventId &)
 
virtual void ObjectCleanup()
User method to clean up object content before it is destroyed. Main motivation is to release any r...
 
virtual void OnThreadAssigned()
 
bool IsDabcEnabledOnMbsSide()
 
void MakeCallback(unsigned sz)
 
ClientTransport(int fd, int kind)
 
virtual unsigned Read_Size()
Defines required buffer size for next operation.
 
virtual unsigned Read_Complete(dabc::Buffer &buf)
Complete reading of the buffer from source.
 
virtual void OnSocketError(int err, const std::string &info)
Generic error handler.
 
virtual void OnSendCompleted()
Method called when send operation is completed.
 
virtual unsigned Read_Start(dabc::Buffer &buf)
Prepare buffer for reading (if required).
 
unsigned ReadBufferSize()
 
virtual double ProcessTimeout(double last_diff)
 
mbs::TransportInfo fServInfo
 
virtual ~ClientTransport()
 
virtual void OnThreadAssigned()
 
virtual void OnRecvCompleted()
Method called when receive operation is completed.
 
virtual void ProcessEvent(const dabc::EventId &)
 
virtual void ObjectCleanup()
User method to clean up object content before it is destroyed. Main motivation is to release any r...
 
Read iterator for MBS events/subevents.
 
EventHeader * evnt() const
 
void formats(std::string &sbuf, const char *fmt,...)
 
void SwapData(void *data, unsigned bytessize)
 
Event structure, exchanged between DABC threads.