00001 ////////////////////////////////////////////////////////////////////////// 00002 // // 00003 // XrdCpMthrQueue // 00004 // // 00005 // Author: Fabrizio Furano (INFN Padova, 2004) // 00006 // // 00007 // A thread safe queue to be used for multithreaded producers-consumers // 00008 // // 00009 ////////////////////////////////////////////////////////////////////////// 00010 00011 // $Id: XrdCpMthrQueue.hh 28902 2009-06-11 12:36:21Z ganis $ 00012 00013 #include "XrdSys/XrdSysPthread.hh" 00014 #include "XrdClient/XrdClientVector.hh" 00015 #include "XrdSys/XrdSysSemWait.hh" 00016 #include "XrdSys/XrdSysHeaders.hh" 00017 00018 using namespace std; 00019 00020 struct XrdCpMessage { 00021 void *buf; 00022 long long offs; 00023 int len; 00024 }; 00025 00026 // The max allowed size for this queue 00027 // If this value is reached, then the writer has to wait... 00028 #define CPMTQ_BUFFSIZE 50000000 00029 00030 class XrdCpMthrQueue { 00031 private: 00032 long fTotSize; 00033 XrdClientVector<XrdCpMessage*> fMsgQue; // queue for incoming messages 00034 int fMsgIter; // an iterator on it 00035 00036 XrdSysRecMutex fMutex; // mutex to protect data structures 00037 00038 XrdSysSemWait fReadSem; // variable to make the reader wait 00039 // until some data is available 00040 XrdSysCondVar fWriteCnd; // variable to make the writer wait 00041 // if the queue is full 00042 public: 00043 00044 XrdCpMthrQueue(); 00045 ~XrdCpMthrQueue(); 00046 00047 int PutBuffer(void *buf, long long offs, int len); 00048 int GetBuffer(void **buf, long long &offs, int &len); 00049 int GetLength() { return fMsgQue.GetSize(); } 00050 void Clear(); 00051 }; 00052