#include <signal.h>
#include <unistd.h>
#include <time.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/select.h>
using namespace std;
#include "hrevbuffer.h"
// ROOT class-implementation macro for HRevBuffer.
ClassImp(HRevBuffer)
// File-level state shared between the HRevBuffer methods and the SIGINT
// handler exitCli().
//
// iTimeOut: filled in RevGet() from the server's info buffer
// (srevInfo::iTimeOut); exitCli() passes it to sleep(), i.e. it is used
// as a number of seconds.
Int_t iTimeOut;
// imySig: RevOpen() stores the socket descriptor here, exitCli() sets it
// to -1 when CTL C was pressed, RevGet()/RevClose() reset it to 0 after the
// connection has been closed.
Int_t imySig = 0;
// iOutMode: copy of the constructor's iMode argument; a non-zero value makes
// exitCli() print the verbose message (socket number and hold time).
Int_t iOutMode = 0;
// Communication (request) buffer sent from this client to the event server.
// RevGet() and RevClose() fill it and transmit it with TSocket::SendRaw().
typedef struct
{
// Number of bytes following this field (sizeof(srevComm) - sizeof(int)),
// stored in network byte order via htonl().
Int_t iSize;
// Always stored as htonl(1) by this client.
Int_t iMode;
// Stored as plain 1 WITHOUT htonl() - presumably lets the server detect the
// client's byte order; TODO confirm against the server protocol.
Int_t iIdent;
// htonl(1): request a buffer; htonl(0): close request (sent by RevClose()
// and by RevGet() after CTL C).
Int_t iBufRequ;
} srevComm;
// Info buffer received from the server on the first RevGet() call after
// RevOpen(). All fields are converted with ntohl() before use.
typedef struct
{
// Must equal sizeof(srevInfo) - sizeof(int), otherwise RevGet() rejects
// the buffer.
Int_t iSize;
// Must be 1, otherwise RevGet() rejects the buffer.
Int_t iMode;
// Copied to the member iHeadPar; RevGet() uses it as the number of 32-bit
// header words preceding the first event in a data buffer.
Int_t iHeadPar;
// Copied to the global iTimeOut, which exitCli() passes to sleep().
Int_t iTimeOut;
} srevInfo;
// NOTE(review): this type is not referenced anywhere in the visible part of
// the file. Judging by the field names it presumably mirrors the header of a
// "dummy" buffer (size, buffer number, event count) - confirm before use.
typedef struct
{
Int_t iSize;
Int_t iBufNo;
Int_t iEvtNo;
} sptrevDummy;
static void exitCli(Int_t signal)
{
Int_t iSleep;
Int_t iSocket;
iSocket = imySig;
imySig = -1;
iSleep = iTimeOut;
printf("\n-I- user specified CTL C: ");
if (iSocket > 0)
{
if (iOutMode)
printf("close connection (socket %d), hold client for %d s\n",
iSocket, iSleep);
else printf("close connection\n");
if (iSleep) sleep(iSleep);
}
else printf("\n");
}
// Shut down and/or close a socket connection.
//
// piSocket: in: address of the socket descriptor; out: always set to 0.
// iMode:    0, 1 or 2: shutdown() with that 'how' value, then close();
//           3: close() only;
//           negative (-1 ... -3): same shutdown as |iMode| but NO close().
//
// Returns  0 on success,
//          1 invalid socket (descriptor <= 0),
//          2 invalid mode (no shutdown() is attempted with the invalid value,
//            the socket is still closed unless iMode was negative),
//         -1 shutdown() failed,
//         -2 close() failed (takes precedence over -1 and 2).
static Int_t rclose(Int_t *piSocket, Int_t iMode)
{
   Int_t iRC;
   Int_t iDebug = 0;                     // local switch for success messages
   Int_t iError = 0;
   Int_t iClose = 1;
   Int_t iModeValid = 1;
   Char_t cModule[32] = "rclose";
   Char_t cMsg[128] = "";

   // a negative mode means "shutdown only"
   if (iMode < 0)
   {
      iMode = -iMode;
      iClose = 0;
   }

   if ( (iMode < 0) || (iMode > 3) )
   {
      // report the value as specified by the caller
      printf("-E- %s: invalid shutdown mode: %d\n",
             cModule, iClose ? iMode : -iMode);
      iError = 2;
      iModeValid = 0;                    // never hand an invalid mode to shutdown()
   }

   const Int_t iSocket = *piSocket;
   if (iSocket > 0)
   {
      if ( iModeValid && (iMode < 3) )
      {
         iRC = shutdown(iSocket, iMode);
         if (iRC)
         {
            snprintf(cMsg, sizeof(cMsg), "-E- %s: shutdown(%d) rc = %d",
                     cModule, iMode, iRC);
            perror(cMsg);
            iError = -1;
         }
         else if (iDebug)
            printf(" %s: shutdown(%d) successfull\n",
                   cModule, iMode);
      }

      if (iClose)
      {
         iRC = close(iSocket);
         if (iRC)
         {
            snprintf(cMsg, sizeof(cMsg), "-E- %s: close rc = %d",
                     cModule, iRC);
            perror(cMsg);
            iError = -2;
         }
         else if (iDebug)
            printf(" %s: connection closed\n", cModule);
      }
   }
   else
   {
      printf("-E- %s: invalid socket: %d\n", cModule, iSocket);
      iError = 1;
   }

   // the caller's descriptor is invalidated in every case
   *piSocket = 0;
   return(iError);
}
// Reverse the byte order of l_len consecutive 32-bit words.
//
// pp_source: first word to convert (treated as a plain byte sequence).
// l_len:     number of 32-bit words; values <= 0 convert nothing.
// pp_dest:   destination for the swapped words, or 0 to swap in place.
//            pp_dest == pp_source is also handled as an in-place swap.
//
// Always returns 1.
//
// The words are accessed byte by byte. Reading them through the Long_t
// pointer would load sizeof(Long_t) bytes per word, which is 8 on LP64
// platforms: an unaligned access that reads 4 bytes beyond the last word.
static Long_t swaplw(Long_t *pp_source, Long_t l_len, Long_t* pp_dest)
{
   UChar_t *p_s = (UChar_t *) pp_source;
   UChar_t *p_d = pp_dest ? (UChar_t *) pp_dest : p_s;

   for (Long_t l_word = 0; l_word < l_len; l_word++, p_s += 4, p_d += 4)
   {
      // fetch the complete word first so that source and destination may coincide
      const UChar_t b0 = p_s[0];
      const UChar_t b1 = p_s[1];
      const UChar_t b2 = p_s[2];
      const UChar_t b3 = p_s[3];

      p_d[0] = b3;
      p_d[1] = b2;
      p_d[2] = b1;
      p_d[3] = b0;
   }

   return(1);
}
// Create a remote event buffer client.
//
// iMode: debug/output mode, stored in iDebug and in the global iOutMode:
//        0 no additional output, 1 debug mode,
//        2 show buffer numbers and select/receive, 3 show buffer numbers.
//
// Installs exitCli() as SIGINT handler and allocates the initial receive
// buffer of 32768 bytes. No connection is opened here, see RevOpen().
HRevBuffer::HRevBuffer(Int_t iMode)
{
   iSwap = 0;
   iSocket = 0;
   iEvtPar = 0;
   iBufSize = 0;

   // Give every member that RevGet() reads a defined value, so that a call
   // before a successful RevOpen() does not act on indeterminate data
   // (with iEvtNo == iEvtMax == 0 RevGet() returns 0 immediately).
   iEvtNo = 0;
   iEvtMax = 0;
   iEvtRel = 0;
   iEvtBuf = 0;
   iBufNo = 0;
   iBufNo1 = 0;
   iBufNo2 = 0;
   iHeadPar = 0;
   pTSocket = 0;
   piNextEvt = 0;

   signal(SIGINT, &exitCli);

   iDebug = iMode;
   iOutMode = iMode;
   if (iDebug == 1)
      cout << "-I- client runs in debug mode (1)" << endl;
   else if (iDebug == 2)
      cout << "-I- client shows buffer numbers and select/receive (mode 2)"
           << endl;
   else if (iDebug == 3)
      cout << "-I- client shows buffer numbers (mode 3)" << endl;

   // initial receive buffer, enlarged on demand in RevGet()
   iBufSizeAlloc = 32768;
   piBuf = new Int_t [iBufSizeAlloc/sizeof(int)];
}
// Release the receive buffer; the event pointer into it is cleared as well.
HRevBuffer::~HRevBuffer()
{
   piNextEvt = 0;
   delete [] piBuf;
}
// Open the connection to the remote event server (or reuse the open one)
// and reset the event/buffer counters for a new request.
//
// pNode:  name of the server node.
// iPort:  server port; 0 selects the default port 7031.
// iEvent: number of events to be delivered by RevGet(); must not be negative.
//
// Returns the TSocket of the connection, or 0 if iEvent is invalid or the
// connection could not be established.
TSocket *HRevBuffer::RevOpen (const Char_t *pNode, Int_t iPort, Int_t iEvent)
{
   if (iEvent < 0)
   {
      cout << "-E- number of requested events (" << iEvent
           << ") invalid" << endl;
      return(0);
   }

   iEvtMax = iEvent;
   iEvtNo = 0;
   iBufNo = 0;
   iBufNo1 = 0;
   iBufNo2 = 0;

   if (!iSocket)
   {
      // -1 tells RevGet() to exchange the info buffer first
      iEvtNo = -1;
      if (iPort == 0) iPort = 7031;

      cout << "-I- open connection to server " << pNode
           << ":" << iPort << endl;
      pTSocket = new TSocket(pNode, iPort);
      if ( !pTSocket->IsValid() )
      {
         cout << "-E- open connection to server " << pNode
              << " failed" << endl;
         // do not leak the unusable socket object and do not keep a
         // pointer to it in the member
         delete pTSocket;
         pTSocket = 0;
         return(0);
      }

      cout << " connection to server " << pNode
           << ":" << iPort << " okay" << endl;
      iSocket = pTSocket->GetDescriptor();
      imySig = iSocket;                  // make the descriptor known to exitCli()
      if (iDebug == 1)
         cout << " new socket " << iSocket << endl;
   }
   else if (iDebug == 1)
      cout << "-D- still socket " << iSocket << endl;

   return(pTSocket);
}
// Deliver the next event from the remote event server.
//
// On the first call after RevOpen() (iEvtNo == -1) a communication buffer is
// sent and the server's info buffer is received and checked. Whenever the
// current data buffer is used up, or iFlush is set, a new buffer is
// requested: its length word and then its data are read with recv() on the
// descriptor iSocket. Buffers reporting 0 events are skipped and a new
// buffer is requested. If required, the event data are byte swapped.
//
// pSocket: socket returned by RevOpen(), used for SendRaw()/RecvRaw().
// iFlush:  non-zero: skip the rest of the current buffer, request a new one.
//
// Returns a pointer to the event within the internal buffer, or 0
//  - if the requested number of events was already delivered
//    (connection is kept),
//  - after an error or after CTL C (connection is closed via rclose()).
UInt_t *HRevBuffer::RevGet(TSocket *pSocket, Int_t iFlush)
{
   Int_t iint = sizeof(int);
   Int_t ibyte, ilen, inew = 0;
   Int_t imaxSE = 20;                    // size of indSize[]
   Int_t indSize[20];
   Int_t ioff=0, irem, ind, ii, jj;
   Char_t cMsg[128] = "";
   Char_t *pcBuf;
   Int_t iSize, iRC;
   Int_t iError = 0;
   Int_t iRetry;
   Int_t iRetryMax = 1000;
   Int_t iRetryFirst;
   Int_t iRetryRecv = 0;
   Int_t iRetryRecvLim = 1;
   Int_t *piComm;
   srevComm sComm;
   Int_t iCommSize = sizeof(sComm);
   Int_t *piInfo;
   srevInfo sInfo;
   Int_t iInfoSize = sizeof(sInfo);
   Int_t iBufNoServ;

   // all requested events already delivered
   if (iEvtNo >= iEvtMax) goto gEndGet;

   // prepare the request; iIdent is deliberately sent without htonl()
   piComm = &(sComm.iSize);
   sComm.iSize = htonl(iCommSize-iint);
   sComm.iMode = htonl(1);
   sComm.iIdent = 1;
   sComm.iBufRequ = htonl(1);

   if (iEvtNo == -1)
   {
      // first call after RevOpen(): exchange communication and info buffer
      if (iDebug == 1)
         cout << "-D- commbuf (data size " << ntohl(sComm.iSize)
              << " byte): mode(1) " << ntohl(sComm.iMode)
              << ", request " << ntohl(sComm.iBufRequ)
              << " event buffer(s)" << endl;

      ilen = pSocket->SendRaw(piComm, iCommSize, kDefault);
      if (ilen < 0)
      {
         cout << "-E- sending request for events to server, rc = "
              << ilen << endl;
         iError = 1;
         goto gEndGet;
      }
      if (iDebug == 1)
         cout << " communication buffer sent (request info buffer) "
              << endl;

      piInfo = &(sInfo.iSize);
      ilen = pSocket->RecvRaw(piInfo, iInfoSize, kDefault);
      // anything but a complete info buffer (error, connection closed,
      // short read) must not be interpreted: sInfo would be indeterminate
      if (ilen < iInfoSize)
      {
         cout << "-E- receiving info buffer from server, rc = "
              << ilen << endl;
         iError = 1;
         goto gEndGet;
      }

      iHeadPar = ntohl(sInfo.iHeadPar);
      iTimeOut = ntohl(sInfo.iTimeOut);
      if (iDebug == 1)
      {
         cout << "-D- info buffer received:" << endl;
         cout << " size data " << ntohl(sInfo.iSize)
              << ", mode (1) " << ntohl(sInfo.iMode)
              << ", header parms " << iHeadPar
              << ", timeout " << iTimeOut << endl;
      }

      if ( (ntohl(sInfo.iMode) != 1) ||
           ( (int) ntohl(sInfo.iSize) != iInfoSize-iint) )
      {
         cout << "-E- invalid info buffer received: " << endl;
         cout << " size data ( " << iInfoSize-iint
              << ") " << ntohl(sInfo.iSize)
              << ", mode (1) " << ntohl(sInfo.iMode)
              << ", header parms " << iHeadPar
              << ", timeout " << iTimeOut << endl;
         iError = 1;
         goto gEndGet;
      }

      iEvtNo = 0;
      inew = 1;
   }
   else
   {
      if (iFlush)
      {
         inew = 1;
         if (iDebug == 1)
            cout << "-D- skip current buffer" << endl;
      }
      else
      {
         if (iEvtNo >= 0)
         {
            // current buffer used up?
            if (iEvtRel >= iEvtBuf)
            {
               if (iDebug == 1)
                  cout << " request new buffer" << endl;
               inew = 1;
            }
            else inew = 0;
         }
      }
   }

   if (inew)
   {
gRetryBuf:
      iEvtRel = 0;
      iRetry = 0;
      iRetryFirst = 1;

      // after CTL C the request is turned into a close request
      if (imySig == -1)
         sComm.iBufRequ = htonl(0);

      ilen = pSocket->SendRaw(piComm, iCommSize, kDefault);
      if (ilen < 0)
      {
         cout << "-E- sending request for buffer " << iBufNo+1
              << " to server, rc = " << ilen << endl;
         iError = 1;
         goto gEndGet;
      }
      if (imySig == -1) goto gEndGet;

      if (iDebug == 1)
         cout << "-D- communication buffer sent (request next buffer) "
              << endl;

gRetryLen:
      // receive the length word; -1 marks "nothing received"
      piBuf[0] = -1;
      iSize = iint;
      pcBuf = (Char_t *) piBuf;
      while(iSize > 0)
      {
         if ( (imySig == -1) && (iDebug) )
            cout << " CTL C detected (before recv len)" << endl;
gNextRecvL:
         iRC = recv(iSocket, pcBuf, iSize, 0);
         if (iRC < 0)
         {
            if (imySig == -1)
            {
               // NOTE(review): retried without limit while CTL C is
               // pending - presumably meant for recv() interrupted by the
               // signal; a persistent error would loop here. Confirm.
               if (iDebug)
               {
                  sprintf(cMsg, "\n-E- receiving data length from server");
                  perror(cMsg);
                  cout << " CTL C detected (during recv len)" << endl;
               }
               goto gNextRecvL;
            }
            else
            {
               sprintf(cMsg, "\n-E- receiving data length from server");
               perror(cMsg);
               if (iDebug) cout << " retry" << endl;
               iRetryRecv++;
               if (iRetryRecv > iRetryRecvLim)
               {
                  iError = 1;
                  goto gEndGet;
               }
               else goto gNextRecvL;
            }
         }
         if ( iRC == 0 )
         {
            if ( (iDebug == 2) || (iDebug == 3) )
               cout << endl;
            cout << "-E- receiving data length: connection closed by server"
                 << endl;
            iError = 1;
            goto gEndGet;
         }
         iRetryRecv = 0;
         iSize -= iRC;
         pcBuf += iRC;
      }

      if (iDebug == 2)
      {
         printf("Rl:");
         fflush(stdout);
      }
      if ( (imySig == -1) && (iDebug) )
         cout << " CTL C detected (after recv len)" << endl;

      iBufSize = ntohl(piBuf[0]);
      if (iDebug == 1)
         cout << " data size received: " << iBufSize << endl;

      if (iBufSize <= 0)
      {
         if (iBufSize == 0)
         {
            if (iDebug)
               cout << endl;
            cout << "-W- server closed connection" << endl;
            cout << " " << iEvtNo << " of " << iEvtMax
                 << " events received" << endl;
            iError = 1;
            goto gEndGet;
         }
         if (iBufSize == -1)
         {
            if (iRetryFirst)
            {
               cout << endl << "-E- no data length received: ";
               iRetryFirst = 0;
            }
            iRetry++;
            if (iRetry > iRetryMax)
            {
               cout << iRetryMax << "times" << endl;
               iError = 1;
               goto gEndGet;
            }
            goto gRetryLen;
         }
         else
         {
            cout << endl << "-E- invalid data length received: "
                 << iBufSize << endl;
            iError = 1;
         }
         goto gEndGet;
      }
      if (iRetry)
         cout << iRetry << "times" << endl;

      // enlarge the buffer if length word + data do not fit
      if (iBufSize+iint > iBufSizeAlloc)
      {
         delete [] piBuf;
         // round up to whole words: the element count below is
         // iBufSizeAlloc/iint, a truncating division would leave the
         // buffer up to 3 bytes too short for recv()
         iBufSizeAlloc = ((iBufSize + iint + iint - 1) / iint) * iint;
         piBuf = new Int_t [iBufSizeAlloc/iint];
         piBuf[0] = iBufSize;
         if (iDebug == 1)
            cout << "-I- total buffer increased to "
                 << iBufSizeAlloc << " bytes" << endl;
      }

      // receive the data behind the length word
      piBuf[1] = -1;
      iSize = iBufSize;
      pcBuf = (Char_t *) &(piBuf[1]);
      while(iSize > 0)
      {
         if ( (imySig == -1) && (iDebug) )
            cout << " CTL C detected (before recv data)" << endl;
gNextRecvD:
         iRC = recv(iSocket, pcBuf, iSize, 0);
         if (iRC < 0)
         {
            if (imySig == -1)
            {
               if (iDebug)
               {
                  sprintf(cMsg, "\n-E- receiving data from server");
                  perror(cMsg);
                  cout << " CTL C detected (during recv data)" << endl;
               }
               goto gNextRecvD;
            }
            else
            {
               sprintf(cMsg, "\n-E- receiving data from server");
               perror(cMsg);
               iRetryRecv++;
               if (iRetryRecv > iRetryRecvLim)
               {
                  iError = 1;
                  goto gEndGet;
               }
               else goto gNextRecvD;
            }
         }
         if ( iRC == 0 )
         {
            if ( (iDebug == 2) || (iDebug == 3) )
               cout << endl;
            cout << "-E- receiving data: connection closed by server"
                 << endl;
            iError = 1;
            goto gEndGet;
         }
         iRetryRecv = 0;
         iSize -= iRC;
         pcBuf += iRC;
      }

      if (iDebug == 2)
      {
         printf("Rd:");
         fflush(stdout);
      }
      if (imySig == -1)
      {
         if (iDebug)
            cout << " CTL C detected (after recv data)" << endl;
         goto gEndGet;
      }

      // buffer header: server buffer number and number of events
      iBufNoServ = ntohl(piBuf[1]);
      iEvtBuf = ntohl(piBuf[2]);
      if (iEvtBuf == 0)
      {
         if (iDebug == 1)
            printf(" dummy buffer no. %d, %d events\n",
                   iBufNoServ, iEvtBuf);
         printf("*** connection to remote event server okay, but currently no DAQ events (%d)\n",
                iBufNoServ);
         goto gRetryBuf;
      }

      // an implausible value in word 3 is taken as opposite byte order
      if (!iSwap)
      {
         if ( (piBuf[3] < 1) || (piBuf[3] >= iBufSize) )
            iSwap = 1;
      }
      if (iSwap)
      {
         // swap everything behind the buffer header
         (void) swaplw( (Long_t*)(&piBuf[1+iHeadPar]), (iBufSize/iint)-iHeadPar, 0);
         if ( (iBufNo == 0) && (iDebug) )
            cout << " Event data swapped" << endl;
      }

      iBufNo++;
      if (iEvtNo == 0)
      {
         iBufNo = 1;
         iBufNo1 = iBufNoServ;
      }
      iBufNo2 = iBufNoServ;

      if (iDebug >= 2)
      {
         printf("%d:", iBufNoServ);
         fflush(stdout);
      }
      if (iDebug == 1)
      {
         cout << endl << "buffer " << iBufNo
              << " (" << iBufNoServ << "): "
              << " size "
              << iBufSize << " byte" << endl;
      }

      // first event follows length word and buffer header
      iEvtRel = 1;
      piNextEvt = piBuf + iHeadPar+1;
   }
   else
   {
      // next event in the current buffer
      iEvtRel++;
      piNextEvt += iEvtPar;
      if (iEvtNo == 0)
      {
         iBufNo = 1;
         iBufNoServ = ntohl(piBuf[1]);
         iBufNo1 = iBufNoServ;
      }
   }

   iEvtNo++;

   // event length in words, with the byte count padded to a multiple of 8
   ibyte = (piNextEvt[0]/8)*8;
   if ( ibyte < piNextEvt[0] ) ibyte += 8;
   iEvtPar = ibyte/iint;

   if (iDebug == 1)
   {
      cout << " event no. " << iEvtNo << " (" << piNextEvt[3]
           << "): id " << piNextEvt[2] << ", size " << piNextEvt[0]
           << endl;

      ii = 0;
      indSize[ii] = 0;
      irem = piNextEvt[0] - 32;
      // ii is incremented before indSize[ii] is written, hence the limit
      // imaxSE-1: the highest valid index of indSize[] is imaxSE-1
      while ( (irem > 0) && (ii < imaxSE-1) )
      {
         ii++;
         if (ii == 1) ioff = 8;
         else ioff += ibyte/iint;
         indSize[ii] = ioff;
         ibyte = (piNextEvt[ioff]/8)*8;
         if ( ibyte < piNextEvt[ioff] )
            ibyte += 8;
         if (irem <= piNextEvt[ioff])
            irem -= piNextEvt[ioff];
         else irem -= ibyte;
         cout << " subevent " << ii << ": size "
              << piNextEvt[ioff];
         if (irem <= 0) cout << endl;
         else cout << ", padded " << ibyte
                   << ", remainder " << irem << endl;
         if (ii == imaxSE-1)
            cout << "-W- only " << ii << " subevents tested" << endl;
      }

      // NOTE(review): cannot be reached, this code is only executed for
      // iDebug == 1; kept unchanged
      if (iDebug == 2) for (jj=1; jj<=ii; jj++)
      {
         ind = indSize[jj];
         cout << " subevent " << jj << ": size " << piNextEvt[ind]
              << " (index " << ind << ")" << endl;
      }
   }

   if (imySig == -1)
   {
      cout << endl << "-D- CTL C specified";
      if (iDebug) cout << " (at end RevGet)" << endl;
      else cout << endl;
      goto gEndGet;
   }

   if (iEvtNo == iEvtMax)
   {
      cout << endl << "-I- all required events ("
           << iEvtMax << ") received: " << iBufNo << " buffers ("
           << iBufNo1 << " - " << iBufNo2 << ")" << endl;
   }
   return( (UInt_t *) piNextEvt);

gEndGet:
   // close the connection after an error or CTL C, otherwise keep it
   if ( (iError) || (imySig == -1) )
   {
      if (iDebug)
         cout << " RevGet: closing connection to server";
      iRC = rclose(&iSocket, 2);
      if ( (iDebug) && (iRC == 0) )
         cout << " - done" << endl;
      imySig = 0;
   }
   else if (iDebug)
      cout << " RevGet: keeping connection to server" << endl;

   return 0 ;
}
// Size in bytes of the data buffer last received by RevGet()
// (0 as long as no buffer has been received).
Int_t HRevBuffer::RevBufsize()
{
   return this->iBufSize;
}
// Suspend the caller for iWait seconds; values <= 0 return immediately.
void HRevBuffer::RevBufWait( Int_t iWait )
{
   if (iWait <= 0) return;
   sleep(iWait);
}
// Send a close request to the server and close the connection.
//
// Nothing is done if CTL C is pending (imySig < 0) or if no connection is
// open (iSocket == 0); in these cases RevGet() has closed the connection or
// will do so.
//
// pSocket: socket returned by RevOpen(), used to send the close request.
void HRevBuffer::RevClose( TSocket *pSocket )
{
   if ( (imySig < 0) || (iSocket == 0) ) return;

   // close request: same layout as a buffer request, but 0 buffers wanted
   srevComm sRequest;
   Int_t iRequestSize = sizeof(sRequest);
   sRequest.iSize = htonl(iRequestSize-sizeof(int));
   sRequest.iMode = htonl(1);
   sRequest.iIdent = 1;
   sRequest.iBufRequ = htonl(0);
   Int_t *piRequest = &(sRequest.iSize);

   if (iDebug == 1)
   {
      cout << "-D- send communication buffer (close request, size data "
           << ntohl(sRequest.iSize) << " byte): "
           << ntohl(sRequest.iMode) << ", "
           << ntohl(sRequest.iBufRequ) << endl;
   }

   Int_t iStatus = pSocket->SendRaw(piRequest, iRequestSize, kDefault);
   if (iStatus < 0)
   {
      cout << "-E- sending close request to server, rc = "
           << iStatus << endl;
   }
   else if (iDebug == 1)
   {
      cout << " close request sent" << endl;
   }

   if (iDebug)
      cout << " RevClose: closing connection to server";

   iStatus = rclose(&iSocket, 2);
   if ( iDebug && (iStatus == 0) )
      cout << " - done" << endl;

   imySig = 0;
   cout << "-I- connection to server closed" << endl;
}
// Last change: Sat May 22 13:08:05 2010
// Last generated: 2010-05-22 13:08
// This page has been automatically generated. If you have any comments or suggestions about the page layout, send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.