00001 ////////////////////////////////////////////////////////////////////////// 00002 // // 00003 // XrdCpMthrQueue // 00004 // // 00005 // Author: Fabrizio Furano (INFN Padova, 2004) // 00006 // // 00007 // A thread safe queue to be used for multithreaded producers-consumers // 00008 // // 00009 ////////////////////////////////////////////////////////////////////////// 00010 00011 // $Id: XrdCpMthrQueue.cc 30949 2009-11-02 16:37:58Z ganis $ 00012 00013 const char *XrdCpMthrQueueCVSID = "$Id: XrdCpMthrQueue.cc 30949 2009-11-02 16:37:58Z ganis $"; 00014 00015 #include "XrdClient/XrdCpMthrQueue.hh" 00016 #include "XrdSys/XrdSysPthread.hh" 00017 #include "XrdClient/XrdClientDebug.hh" 00018 00019 XrdCpMthrQueue::XrdCpMthrQueue(): fReadSem(0) { 00020 // Constructor 00021 00022 fMsgQue.Clear(); 00023 fTotSize = 0; 00024 } 00025 00026 XrdCpMthrQueue::~XrdCpMthrQueue() { 00027 // Destructor 00028 00029 00030 } 00031 00032 int XrdCpMthrQueue::PutBuffer(void *buf, long long offs, int len) { 00033 XrdCpMessage *m; 00034 bool wantstowait = FALSE; 00035 00036 { 00037 XrdSysMutexHelper mtx(fMutex); 00038 00039 if (fTotSize > CPMTQ_BUFFSIZE) wantstowait = TRUE; 00040 } 00041 00042 if (wantstowait) fWriteCnd.Wait(60); 00043 00044 m = new XrdCpMessage; 00045 m->offs = offs; 00046 m->buf = buf; 00047 m->len = len; 00048 00049 // Put message in the list 00050 { 00051 XrdSysMutexHelper mtx(fMutex); 00052 00053 fMsgQue.Push_back(m); 00054 fTotSize += len; 00055 } 00056 00057 fReadSem.Post(); 00058 00059 return 0; 00060 } 00061 00062 int XrdCpMthrQueue::GetBuffer(void **buf, long long &offs, int &len) { 00063 XrdCpMessage *res; 00064 00065 res = 0; 00066 00067 // If there is no data for one hour, then give up with an error 00068 if (!fReadSem.Wait(3600)) { 00069 XrdSysMutexHelper mtx(fMutex); 00070 00071 if (fMsgQue.GetSize() > 0) { 00072 00073 // If there are messages to dequeue, we pick the oldest one 00074 res = fMsgQue.Pop_front(); 00075 if (res) fTotSize -= res->len; 00076 } 00077 } 00078 00079 00080 if (res) { 00081 *buf = res->buf; 00082 len = res->len; 00083 offs = res->offs; 00084 delete res; 00085 fWriteCnd.Signal(); 00086 } 00087 00088 return (res != 0); 00089 } 00090 00091 00092 void XrdCpMthrQueue::Clear() { 00093 void *buf; 00094 int len; 00095 long long offs; 00096 00097 while (GetBuffer(&buf, offs, len)) { 00098 free(buf); 00099 } 00100 00101 fTotSize = 0; 00102 00103 } 00104 00105