//_HADES_CLASS_DESCRIPTION
//////////////////////////////////////////////////////////////////////
// Copyright:
// GSI, Gesellschaft fuer Schwerionenforschung mbH
// Planckstr. 1
// D-64291 Darmstadt
// Germany
//////////////////////////////////////////////////////////////////////
// HRevBuffer.cc
// ROOT client package for multithreaded Hades remote event server
// created 14. 5.1999 by Horst Goeringer
//////////////////////////////////////////////////////////////////////
//
// Imported into Hydra as hrevbuffer.cc and hrevbuffer.h by
// Simon Lang, 08.12.05
//
//////////////////////////////////////////////////////////////////////
#include <signal.h>
#include <unistd.h>
#include <time.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/select.h>
using namespace std;
#include "hrevbuffer.h" // class definition
ClassImp(HRevBuffer)
int iTimeOut; // needed in exitCli to handle CTL C
int imySig = 0; // needed in exitCli to handle CTL C
int iOutMode = 0; // needed in exitCli to handle CTL C
/* client requests events from server */
typedef struct
{
int iSize; /* size of comm. buffer without this element (bytes) */
int iMode; /* must be 1: client requests data */
int iIdent;/* =1: NOT IN NET REPR. to identify client endian type */
int iBufRequ; /* number of buffers requested */
} srevComm;
/* server sends info to client */
typedef struct
{
int iSize; /* size of info buffer without this element (bytes) */
int iMode; /* must be 1: server sends info */
/* else: client tries swap */
int iHeadPar; /* no. of header parms in buffer */
int iTimeOut; /* wait time client after CTL C */
} srevInfo;
/* server sends dummy buffer to client, if no DAQ events available */
typedef struct
{
int iSize; /* size of data following */
int iBufNo; /* current buffer number */
int iEvtNo; /* no. of events in buffer (= 0) */
} sptrevDummy;
static void exitCli(int signal) /* handles Ctl C */
{
int iSleep; /* sleep time here */
int iSocket;
iSocket = imySig;
imySig = -1;
iSleep = iTimeOut;
printf("\n-I- user specified CTL C: ");
if (iSocket > 0)
{
if (iOutMode)
printf("close connection (socket %d), hold client for %d s\n",
iSocket, iSleep);
else printf("close connection\n");
if (iSleep) sleep(iSleep);
}
else printf("\n");
}
static int rclose(int *piSocket, int iMode)
{
int iSocket;
int iRC;
int iDebug = 0;
int iError = 0;
int iClose = 1;
char cModule[32] = "rclose";
char cMsg[128] = "";
if (iMode < 0)
{
iMode = -iMode;
iClose = 0; /* no shutdown, only close */
}
if ( (iMode < 0) || (iMode > 3) )
{
if (iClose == 0) iMode = -iMode;
printf("-E- %s: invalid shutdown mode: %d\n", cModule, iMode);
iError = 2;
}
iSocket = *piSocket;
if (iSocket > 0)
{
if (iMode < 3)
{
iRC = shutdown(iSocket, iMode);
if (iRC)
{
sprintf(cMsg, "-E- %s: shutdown(%d) rc = %d",
cModule, iMode, iRC);
perror(cMsg);
iError = -1;
}
else if (iDebug)
printf(" %s: shutdown(%d) successfull\n",
cModule, iMode);
}
if (iClose)
{
iRC = close(iSocket);
if (iRC)
{
sprintf(cMsg, "-E- %s: close rc = %d", cModule, iRC);
perror(cMsg);
iError = -2;
}
else if (iDebug)
printf(" %s: connection closed\n", cModule);
} /* (iClose) */
} /* (iSocket > 0) */
else
{
printf("-E- %s: invalid socket: %d\n", cModule, iSocket);
iError = 1;
}
*piSocket = 0;
return(iError);
}
static long swaplw(long *pp_source, long l_len, long* pp_dest)
{
char unsigned *p_source, *p_dest, *p_s, *p_d;
long unsigned lu_save;
/* +++ action +++ */
p_source = (unsigned char *) pp_source;
p_dest = (unsigned char *) pp_dest;
switch ( (int) p_dest)
{
case 0: /* source == destination */
for (p_d = (unsigned char *) p_source,
p_s = (unsigned char *) &lu_save;
p_d < p_source + (l_len * 4);
)
{
lu_save = *( (long *) p_d);
p_s += 4; /* increment source */
*(p_d++) = *(--p_s);
*(p_d++) = *(--p_s);
*(p_d++) = *(--p_s);
*(p_d++) = *(--p_s);
}
break;
default: /* source != destination */
for (p_s = (unsigned char *) p_source,
p_d = (unsigned char *) p_dest;
p_s < p_source + (l_len * 4);
p_s += 4)
{
p_s += 4; /* increment source */
*(p_d++) = *(--p_s);
*(p_d++) = *(--p_s);
*(p_d++) = *(--p_s);
*(p_d++) = *(--p_s);
}
break;
} /* switch */
return(1);
}
HRevBuffer::HRevBuffer(Int_t iMode)
{
iSwap = 0;
iSocket = 0;
iEvtPar = 0;
iBufSize = 0;
signal(SIGINT, &exitCli); // from now catch Ctl C
iDebug = iMode;
iOutMode = iMode;
if (iDebug == 1)
cout << "-I- client runs in debug mode (1)" << endl;
else if (iDebug == 2)
cout << "-I- client shows buffer numbers and select/receive (mode 2)"
<< endl;
else if (iDebug == 3)
cout << "-I- client shows buffer numbers (mode 3)" << endl;
iBufSizeAlloc = 32768;
piBuf = new int [iBufSizeAlloc/sizeof(int)]; // 32k buffer
// cout << " HRevBuffer() executed" << endl;
} // constructor
HRevBuffer::~HRevBuffer()
{
// cout << " ~HRevBuffer() ..." << endl;
delete [] piBuf;
piNextEvt = 0;
// cout << " ~HRevBuffer() executed" << endl;
} // destructor
TSocket *HRevBuffer::RevOpen ( char *pNode, Int_t iPort, Int_t iEvent)
{
if (iEvent < 0)
{
cout << "-E- number of requested events (" << iEvent
<< ") invalid" << endl;
return(0);
}
iEvtMax = iEvent;
iEvtNo = 0;
iBufNo = 0;
iBufNo1 = 0;
iBufNo2 = 0;
if (!iSocket)
{
iEvtNo = -1; // initialization (info buffer) required
if (iPort == 0) iPort = 7031; // default for Hades
cout << "-I- open connection to server " << pNode
<< ":" << iPort << endl;
pTSocket = new TSocket(pNode, iPort);
if ( !pTSocket->IsValid() )
{
cout << "-E- open connection to server " << pNode
<< " failed" << endl;
return(0);
}
cout << " connection to server " << pNode
<< ":" << iPort << " okay" << endl;
iSocket = pTSocket->GetDescriptor();
imySig = iSocket;
if (iDebug == 1)
cout << " new socket " << iSocket << endl;
}
else if (iDebug == 1)
cout << "-D- still socket " << iSocket << endl;
return(pTSocket);
} // RevOpen
UInt_t *HRevBuffer::RevGet(TSocket *pSocket, Int_t iFlush)
{
Int_t iint = sizeof(int);
Int_t ibyte, ilen, inew = 0;
Int_t imaxSE = 20;
Int_t indSize[20]; // index imaxSE
Int_t ioff=0, irem, ind, ii, jj;
Char_t cMsg[128] = "";
Char_t *pcBuf;
Int_t iSize, iRC;
long lRC;
Int_t iError = 0;
Int_t iRetry;
Int_t iRetryMax = 1000;
Int_t iRetryFirst;
Int_t iRetryRecv = 0; // count retries of recv call
Int_t iRetryRecvLim = 1; // max. no. of succeeding retries
Int_t *piComm;
srevComm sComm;
Int_t iCommSize = sizeof(sComm); // size comm. buffer (byte)
Int_t *piInfo;
srevInfo sInfo;
Int_t iInfoSize = sizeof(sInfo); // size info buffer (byte)
Int_t iBufNoServ; // buffer no., from server
if (iEvtNo >= iEvtMax) goto gEndGet;
piComm = &(sComm.iSize); // communication buffer
sComm.iSize = htonl(iCommSize-iint); // size of data following
sComm.iMode = htonl(1); // required: get events
sComm.iIdent = 1; // required: tell endian type
sComm.iBufRequ = htonl(1); // send one buffer
// initialize communication with server
if (iEvtNo == -1)
{
if (iDebug == 1)
cout << "-D- commbuf (data size " << ntohl(sComm.iSize)
<< " byte): mode(1) " << ntohl(sComm.iMode)
<< ", request " << ntohl(sComm.iBufRequ)
<< " event buffer(s)" << endl;
// request event buffer from server
ilen = pSocket->SendRaw(piComm, iCommSize, kDefault);
if (ilen < 0)
{
cout << "-E- sending request for events to server, rc = "
<< ilen << endl;
iError = 1;
goto gEndGet;
}
if (iDebug == 1)
cout << " communication buffer sent (request info buffer) "
<< endl;
// receive info buffer from server
piInfo = &(sInfo.iSize);
ilen = pSocket->RecvRaw(piInfo, iInfoSize, kDefault);
if (ilen < 0)
{
cout << "-E- receiving info buffer from server, rc = "
<< ilen << endl;
iError = 1;
goto gEndGet;
}
iHeadPar = ntohl(sInfo.iHeadPar);
iTimeOut = ntohl(sInfo.iTimeOut);
if (iDebug == 1)
{
cout << "-D- info buffer received:" << endl;
cout << " size data " << ntohl(sInfo.iSize)
<< ", mode (1) " << ntohl(sInfo.iMode)
<< ", header parms " << iHeadPar
<< ", timeout " << iTimeOut << endl;
}
if ( (ntohl(sInfo.iMode) != 1) ||
( (int) ntohl(sInfo.iSize) != iInfoSize-iint) )
{
cout << "-E- invalid info buffer received: " << endl;
cout << " size data ( " << iInfoSize-iint
<< ") " << ntohl(sInfo.iSize)
<< ", mode (1) " << ntohl(sInfo.iMode)
<< ", header parms " << iHeadPar
<< ", timeout " << iTimeOut << endl;
iError = 1;
goto gEndGet;
}
iEvtNo = 0; // initilization done
inew = 1; // request new buffer
} // (iEvtNo == -1)
else
{
if (iFlush)
{
inew = 1; // request new buffer
if (iDebug == 1)
cout << "-D- skip current buffer" << endl;
}
else
{
if (iEvtNo >= 0)
{
// check if new buffer needed
if (iEvtRel >= iEvtBuf)
{
if (iDebug == 1)
cout << " request new buffer" << endl;
inew = 1;
}
else inew = 0;
} // (iEvtNo > 0)
} // (!iFlush)
} // (iEvtNo != -1)
// request new buffer
if (inew)
{
gRetryBuf:
iEvtRel = 0;
iRetry = 0;
iRetryFirst = 1;
if (imySig == -1)
sComm.iBufRequ = htonl(0); // signal finish to server
// request next buffer or finish
ilen = pSocket->SendRaw(piComm, iCommSize, kDefault);
if (ilen < 0)
{
cout << "-E- sending request for buffer " << iBufNo+1
<< " to server, rc = " << ilen << endl;
iError = 1;
goto gEndGet;
}
if (imySig == -1) goto gEndGet;
if (iDebug == 1)
cout << "-D- communication buffer sent (request next buffer) "
<< endl;
gRetryLen:
// get size of data following
piBuf[0] = -1; // enables receive check
iSize = iint;
pcBuf = (char *) piBuf;
while(iSize > 0)
{
if ( (imySig == -1) && (iDebug) )
cout << " CTL C detected (before recv len)" << endl;
gNextRecvL:
iRC = recv(iSocket, pcBuf, iSize, 0);
if (iRC < 0)
{
if (imySig == -1)
{
if (iDebug)
{
sprintf(cMsg, "n-E- receiving data length from server");
perror(cMsg);
cout << " CTL C detected (during recv len)" << endl;
}
goto gNextRecvL;
}
else
{
sprintf(cMsg, "n-E- receiving data length from server");
perror(cMsg);
if (iDebug) cout << " retry" << endl;
iRetryRecv++; // count no. of retries to limit them
if (iRetryRecv > iRetryRecvLim) // avoid infinite loop
{
iError = 1;
goto gEndGet;
}
else goto gNextRecvL;
}
}
if ( iRC == 0 )
{
if ( (iDebug == 2) || (iDebug == 3) )
cout << endl;
cout << "-E- receiving data length: connection closed by server"
<< endl;
iError = 1;
goto gEndGet;
}
iRetryRecv = 0;
iSize -= iRC;
pcBuf += iRC;
} /* while(iSize > 0) */
if (iDebug == 2)
{
printf("Rl:");
fflush(stdout);
}
if ( (imySig == -1) && (iDebug) )
cout << " CTL C detected (after recv len)" << endl;
iBufSize = ntohl(piBuf[0]);
if (iDebug == 1)
cout << " data size received: " << iBufSize << endl;
if (iBufSize <= 0)
{
if (iBufSize == 0)
{
if (iDebug)
cout << endl;
cout << "-W- server closed connection" << endl;
cout << " " << iEvtNo << " of " << iEvtMax
<< " events received" << endl;
iError = 1;
goto gEndGet;
}
if (iBufSize == -1)
{
if (iRetryFirst)
{
cout << endl << "-E- no data length received: ";
iRetryFirst = 0;
}
iRetry++;
if (iRetry > iRetryMax)
{
cout << iRetryMax << "times" << endl;
iError = 1;
goto gEndGet;
}
goto gRetryLen;
}
else
{
cout << endl << "-E- invalid data length received: "
<< iBufSize << endl;
iError = 1;
}
goto gEndGet;
}
if (iRetry)
cout << iRetry << "times" << endl;
// increase data buffer, if necessary
if (iBufSize+iint > iBufSizeAlloc)
{
delete [] piBuf;
iBufSizeAlloc = iBufSize+iint;
// new total buffer size (including length field)
piBuf = new int [iBufSizeAlloc/iint];
piBuf[0] = iBufSize;
// keep sent buffer size (without length field)
if (iDebug == 1)
cout << "-I- total buffer increased to "
<< iBufSizeAlloc << " bytes" << endl;
}
// get event buffer without length field
piBuf[1] = -1; // enables receive check
iSize = iBufSize;
pcBuf = (char *) &(piBuf[1]);
while(iSize > 0)
{
if ( (imySig == -1) && (iDebug) )
cout << " CTL C detected (before recv data)" << endl;
gNextRecvD:
iRC = recv(iSocket, pcBuf, iSize, 0);
if (iRC < 0)
{
if (imySig == -1)
{
if (iDebug)
{
sprintf(cMsg, "n-E- receiving data from server");
perror(cMsg);
cout << " CTL C detected (during recv data)" << endl;
}
goto gNextRecvD;
}
else
{
sprintf(cMsg, "n-E- receiving data from server");
perror(cMsg);
iRetryRecv++; // count no. of retries to limit them
if (iRetryRecv > iRetryRecvLim) // avoid infinite loop
{
iError = 1;
goto gEndGet;
}
else goto gNextRecvD;
}
}
if ( iRC == 0 )
{
if ( (iDebug == 2) || (iDebug == 3) )
cout << endl;
cout << "-E- receiving data: connection closed by server"
<< endl;
iError = 1;
goto gEndGet;
}
iRetryRecv = 0;
iSize -= iRC;
pcBuf += iRC;
} /* while(iSize > 0) */
if (iDebug == 2)
{
printf("Rd:");
fflush(stdout);
}
if (imySig == -1)
{
if (iDebug)
cout << " CTL C detected (after recv data)" << endl;
goto gEndGet;
}
iBufNoServ = ntohl(piBuf[1]);
iEvtBuf = ntohl(piBuf[2]);
if (iEvtBuf == 0)
{
if (iDebug == 1)
printf(" dummy buffer no. %d, %d eventsn",
iBufNoServ, iEvtBuf);
printf("*** connection to remote event server okay, but currently no DAQ events (%d)n",
iBufNoServ);
goto gRetryBuf;
}
if (!iSwap)
{
if ( (piBuf[3] < 1) || (piBuf[3] >= iBufSize) )
iSwap = 1; // DDD dirty fix! check piBuf[4] !
}
// swap events in buffer (behind buffer header)
if (iSwap)
{
/*
* Hint: There are a lot of basically unnecessary type casts in
* this version of swaplw. One should clean it up.
*/
lRC = swaplw( (long*)(&piBuf[1+iHeadPar]), (iBufSize/iint)-iHeadPar, 0);
if ( (iBufNo == 0) && (iDebug) )
cout << " Event data swapped" << endl;
}
iBufNo++;
if (iEvtNo == 0)
{
iBufNo = 1; // restart counting
iBufNo1 = iBufNoServ; // keep first buffer no.
}
iBufNo2 = iBufNoServ; // keep last buffer no.
if (iDebug >= 2)
{
printf("%d:", iBufNoServ);
fflush(stdout);
}
if (iDebug == 1)
{
cout << endl << "buffer " << iBufNo
<< " (" << iBufNoServ << "): "
<< " size "
<< iBufSize << " byte" << endl;
}
iEvtRel = 1; // first event in buffer
piNextEvt = piBuf + iHeadPar+1; // ptr first element in buffer
} // new buffer
else
{
iEvtRel++;
piNextEvt += iEvtPar; // skip previous event
//printf("DDD new event length: %dn", *piNextEvt);
// first event after reopen existing connection, no new buffer
if (iEvtNo == 0)
{
iBufNo = 1; // restart counting
iBufNoServ = ntohl(piBuf[1]);
iBufNo1 = iBufNoServ;
} // (iEvtNo == 0)
} // continue with current buffer
iEvtNo++; // total event no.
ibyte = (piNextEvt[0]/8)*8;
if ( ibyte < piNextEvt[0] ) ibyte += 8;
iEvtPar = ibyte/iint; // no. of parameters with padding
if (iDebug == 1)
{
cout << " event no. " << iEvtNo << " (" << piNextEvt[3]
<< "): id " << piNextEvt[2] << ", size " << piNextEvt[0]
<< endl;
ii = 0;
indSize[ii] = 0; // index event length
irem = piNextEvt[0] - 32; // remaining bytes for subevents
while ( (irem > 0) && (ii < imaxSE) )
{
ii++; // count subevents
if (ii == 1) ioff = 8; // skip event header
else ioff += ibyte/iint;
indSize[ii] = ioff; // buffer index of length current subevent
ibyte = (piNextEvt[ioff]/8)*8;
if ( ibyte < piNextEvt[ioff] )
ibyte += 8; // padded subevent size
if (irem <= piNextEvt[ioff]) // last subevent
irem -= piNextEvt[ioff];
else irem -= ibyte;
cout << " subevent " << ii << ": size "
<< piNextEvt[ioff];
if (irem <= 0) cout << endl;
else cout << ", padded " << ibyte
<< ", remainder " << irem << endl;
if (ii == imaxSE-1)
cout << "-W- only " << ii << " subevents tested" << endl;
} // while (irem)
if (iDebug == 2) for (jj=1; jj<=ii; jj++)
{
ind = indSize[jj];
cout << " subevent " << jj << ": size " << piNextEvt[ind]
<< " (index " << ind << ")" << endl;
}
} // (iDebug == 1)
if (imySig == -1)
{
cout << endl << "-D- CTL C specified";
if (iDebug) cout << " (at end RevGet)" << endl;
else cout << endl;
goto gEndGet;
}
if (iEvtNo == iEvtMax)
{
cout << endl << "-I- all required events ("
<< iEvtMax << ") received: " << iBufNo << " buffers ("
<< iBufNo1 << " - " << iBufNo2 << ")" << endl;
}
return( (UInt_t *) piNextEvt);
gEndGet:
if ( (iError) || (imySig == -1) )
{
if (iDebug)
cout << " RevGet: closing connection to server";
iRC = rclose(&iSocket, 2);
if ( (iDebug) && (iRC == 0) )
cout << " - done" << endl;
imySig = 0; // notify CTL C handler
}
else if (iDebug)
cout << " RevGet: keeping connection to server" << endl;
return 0 ;
} // RevGet
Int_t HRevBuffer::RevBufsize()
{
return iBufSize;
} // RevBufsize
void HRevBuffer::RevBufWait( Int_t iWait )
{
if (iWait > 0) sleep(iWait);
} // RevBufWait
void HRevBuffer::RevClose( TSocket *pSocket )
{
Int_t iRC;
Int_t *piComm;
srevComm sComm;
Int_t iCommSize = sizeof(sComm); // size comm. buffer (byte)
if (imySig < 0) return; // CTL Y: connection closed elsewhere
if (iSocket == 0) return;
// tell server that no more events needed
piComm = &(sComm.iSize); // communication buffer
sComm.iSize = htonl(iCommSize-sizeof(int));// size of data following
sComm.iMode = htonl(1); // required
sComm.iIdent = 1; // required: tell endian type
sComm.iBufRequ = htonl(0); // no more event buffers
if (iDebug == 1)
cout << "-D- send communication buffer (close request, size data "
<< ntohl(sComm.iSize) << " byte): "
<< ntohl(sComm.iMode) << ", "
<< ntohl(sComm.iBufRequ) << endl;
iRC = pSocket->SendRaw(piComm, iCommSize, kDefault);
if (iRC < 0)
cout << "-E- sending close request to server, rc = "
<< iRC << endl;
else if (iDebug == 1)
cout << " close request sent" << endl;
if (iDebug)
cout << " RevClose: closing connection to server";
iRC = rclose(&iSocket, 2);
if ( (iDebug) && (iRC == 0) )
cout << " - done" << endl;
imySig = 0; // notify CTL C handler
cout << "-I- connection to server closed" << endl;
} // RevClose
ROOT page - Class index - Class Hierarchy - Top of the page
This page has been automatically generated. If you have any comments or suggestions about the page layout send a mail to ROOT support, or contact the developers with any questions or problems regarding ROOT.