23 dabc::SocketIOAddon(fd),
34 DOUT3(
"Create mbs::ClientTransport::ClientTransport() %p fd:%d kind:%d",
this, fd, kind);
39 DOUT3(
"Destroy mbs::ClientTransport::~ClientTransport() %p",
this);
44 DOUT3(
"mbs::ClientTransport::ObjectCleanup\n");
46 if ((fState!=ioError) && (fState!=ioClosed)) {
47 strcpy(fSendBuf,
"CLOSE");
48 DoSendBuffer(fSendBuf, 12);
58 return fServInfo.iStreams==0;
63 uint32_t sz = fHeader.BufferLength();
64 if (sz <
sizeof(fHeader)) {
65 EOUT(
"Wrong buffer length %u", sz);
90 if (fServInfo.iEndian != 1) {
95 if (fServInfo.iEndian != 1) {
96 EOUT(
"Cannot correctly define server endian");
100 if ((fState != ioError) && (fServInfo.iStreams != 0) && (fServInfo.iBuffers != 1)) {
101 DOUT0(
"Number of buffers %u per stream bigger than 1", fServInfo.iBuffers);
102 DOUT0(
"This will lead to event spanning which is not optimal for DABC");
103 DOUT0(
"Set buffers number to 1 or call 'enable dabc' on mbs side");
108 if (fState != ioError) {
109 std::string info =
"";
110 if (fServInfo.iStreams > 0)
dabc::formats(info,
"streams = %u", fServInfo.iStreams);
112 DOUT0(
"Get MBS server info: %s buf_per_stream = %u, swap = %s spanning %s",
113 info.c_str(), fServInfo.iBuffers,
DBOOL(fSwapping),
DBOOL(fSpanning));
117 fPendingStart =
false;
129 if (ReadBufferSize() > (
unsigned) fServInfo.iMaxBytes) {
130 EOUT(
"Buffer size %u bigger than allowed by info record %d", ReadBufferSize(), fServInfo.iMaxBytes);
134 if (ReadBufferSize() == 0) {
136 DOUT0(
"Keep alive buffer from MBS side");
139 fState = ioWaitBuffer;
153 if (fHeader.UsedBufferSize() > 0) {
154 fState = ioComplBuffer;
158 if (IsDabcEnabledOnMbsSide()) {
159 EOUT(
"Empty buffer from mbs when dabc enabled?");
163 DOUT1(
"Keep alive buffer from MBS");
170 EOUT(
"One should not complete recv in such state %d", fState);
178 DOUT3(
"MBS client Socket close\n");
186 DOUT3(
"MBS client Socket Error\n");
198 if (fState != ioInit) {
199 EOUT(
"Get thread assigned in not-init state - check why");
203 StartRecv(&fServInfo,
sizeof(fServInfo));
214 if (fState == ioRecvInfo) {
215 EOUT(
"Did not get server info in reasonable time");
225 strcpy(fSendBuf,
"GETEVT");
226 StartSend(fSendBuf, 12);
229 StartRecv(&fHeader,
sizeof(fHeader));
230 fState = ioRecvHeader;
238 EOUT(
"Didnot found DataInputTransport on other side worker %p", fWorker());
252 if (fPendingStart)
EOUT(
"Start already pending???");
253 fPendingStart =
true;
259 EOUT(
"Get read_size at wrong state %d", fState);
269 DOUT4(
"BUFFER_START %u USED %u h_beg %u h_end %u", ReadBufferSize(), fHeader.UsedBufferSize(), fHeader.h_begin, fHeader.h_end);
271 if (fState != ioWaitBuffer) {
272 EOUT(
"Start reading at wrong place");
277 EOUT(
"Provided buffer size too small %u, required %u",
282 bool started =
false;
284 if (!fSpanBuffer.null()) {
286 if (fHeader.h_begin==0) {
287 EOUT(
"We expecting spanned buffer in the begin, but didnot get it");
293 if (extra.
GetTotalSize() < ReadBufferSize() + fSpanBuffer.GetTotalSize()) {
294 EOUT(
"Buffer size %u not enough to read %u and add spanned buffer %u", extra.
GetTotalSize(), ReadBufferSize(), fSpanBuffer.GetTotalSize());
302 started = StartRecv(extra, ReadBufferSize());
304 started = StartRecv(buf, ReadBufferSize());
309 fState = ioRecvBuffer;
318 if (fState!=ioComplBuffer) {
319 EOUT(
"Reading complete at strange place!!!");
323 unsigned read_shift = 0;
324 if (!fSpanBuffer.null()) read_shift = fSpanBuffer.GetTotalSize() -
sizeof(
mbs::Header);
330 if (read_shift>0) ptr.
shift(read_shift);
333 while (!ptr.
null()) {
341 if (!fSpanBuffer.null()) {
343 ptr.
shift(read_shift);
346 unsigned new_block_size = hdr->
FullSize();
350 DOUT4(
"Copy block %u to begin", fSpanBuffer.GetTotalSize());
356 fSpanBuffer.Release();
361 buf.
SetTotalSize(read_shift + fHeader.UsedBufferSize());
366 if (fHeader.h_end != 0) {
370 unsigned useful_sz(0), last_sz(0);
373 useful_sz += last_sz;
378 if (fSpanBuffer.null()) {
379 EOUT(
"FAIL to duplicate buffer!!!");
386 fSpanBuffer.CutFromBegin(useful_sz);
388 DOUT4(
"Left block %u from the end", fSpanBuffer.GetTotalSize());
395 DOUT0(
"EXTREME CASE - FULL BUFFER IS JUST PEACE FROM THE MIDDLE");
Reference on memory from memory pool.
Buffer Duplicate() const
Duplicates instance of Buffer with new segments list independent from source.
void CutFromBegin(BufferSize_t len)
Remove part of buffer from the beginning.
void SetTotalSize(BufferSize_t len)
Set total length of the buffer to specified value Size cannot be bigger than original size of the buf...
BufferSize_t CopyFrom(const Buffer &srcbuf, BufferSize_t len=0)
Copy content from source buffer.
BufferSize_t GetTotalSize() const
Return total size of all buffer segments.
void SetTypeId(unsigned tid)
void * SegmentPtr(unsigned n=0) const
Returns pointer on the segment, no any boundary checks.
Represents command with its arguments.
Manipulator with dabc::Buffer class.
BufferSize_t shift(BufferSize_t sz)
void setfullsize(BufferSize_t sz)
BufferSize_t rawsize() const
virtual void ProcessEvent(const EventId &)
virtual void ObjectCleanup()
User method to cleanup object content before it will be destroyed Main motivation is to release any r...
virtual void OnThreadAssigned()
bool IsDabcEnabledOnMbsSide()
void MakeCallback(unsigned sz)
ClientTransport(int fd, int kind)
virtual unsigned Read_Size()
Defines required buffer size for next operation.
virtual unsigned Read_Complete(dabc::Buffer &buf)
Complete reading of the buffer from source,.
virtual void OnSocketError(int err, const std::string &info)
Generic error handler.
virtual void OnSendCompleted()
Method called when send operation is completed.
virtual unsigned Read_Start(dabc::Buffer &buf)
Prepare buffer for reading (if required)
unsigned ReadBufferSize()
virtual double ProcessTimeout(double last_diff)
mbs::TransportInfo fServInfo
virtual ~ClientTransport()
virtual void OnThreadAssigned()
virtual void OnRecvCompleted()
Method called when receive operation is completed.
virtual void ProcessEvent(const dabc::EventId &)
virtual void ObjectCleanup()
User method to cleanup object content before it will be destroyed Main motivation is to release any r...
Read iterator for MBS events/subevents.
EventHeader * evnt() const
void formats(std::string &sbuf, const char *fmt,...)
void SwapData(void *data, unsigned bytessize)
Event structure, exchanged between DABC threads.