XrdCpMthrQueue.cc

Go to the documentation of this file.
00001 //////////////////////////////////////////////////////////////////////////
00002 //                                                                      //
00003 // XrdCpMthrQueue                                                       //
00004 //                                                                      //
00005 // Author: Fabrizio Furano (INFN Padova, 2004)                          //
00006 //                                                                      //
00007 // A thread safe queue to be used for multithreaded producers-consumers //
00008 //                                                                      //
00009 //////////////////////////////////////////////////////////////////////////
00010 
00011 //       $Id: XrdCpMthrQueue.cc 30949 2009-11-02 16:37:58Z ganis $
00012 
00013 const char *XrdCpMthrQueueCVSID = "$Id: XrdCpMthrQueue.cc 30949 2009-11-02 16:37:58Z ganis $";
00014 
00015 #include "XrdClient/XrdCpMthrQueue.hh"
00016 #include "XrdSys/XrdSysPthread.hh"
00017 #include "XrdClient/XrdClientDebug.hh"
00018 
00019 XrdCpMthrQueue::XrdCpMthrQueue(): fReadSem(0) {
00020    // Constructor
00021 
00022    fMsgQue.Clear();
00023    fTotSize = 0;
00024 }
00025 
00026 XrdCpMthrQueue::~XrdCpMthrQueue() {
00027    // Destructor
00028 
00029 
00030 }
00031 
00032 int XrdCpMthrQueue::PutBuffer(void *buf, long long offs, int len) {
00033    XrdCpMessage *m;
00034    bool wantstowait = FALSE;
00035 
00036    {
00037       XrdSysMutexHelper mtx(fMutex);
00038       
00039       if (fTotSize > CPMTQ_BUFFSIZE) wantstowait = TRUE;
00040    }
00041    
00042    if (wantstowait) fWriteCnd.Wait(60);
00043 
00044    m = new XrdCpMessage;
00045    m->offs = offs;
00046    m->buf = buf;
00047    m->len = len;
00048 
00049    // Put message in the list
00050    {
00051       XrdSysMutexHelper mtx(fMutex);
00052     
00053       fMsgQue.Push_back(m);
00054       fTotSize += len;
00055    }
00056     
00057    fReadSem.Post(); 
00058 
00059    return 0;
00060 }
00061 
00062 int XrdCpMthrQueue::GetBuffer(void **buf, long long &offs, int &len) {
00063    XrdCpMessage *res;
00064 
00065    res = 0;
00066  
00067    // If there is no data for one hour, then give up with an error
00068    if (!fReadSem.Wait(3600)) {
00069          XrdSysMutexHelper mtx(fMutex);
00070 
00071          if (fMsgQue.GetSize() > 0) {
00072 
00073             // If there are messages to dequeue, we pick the oldest one
00074             res = fMsgQue.Pop_front();
00075             if (res) fTotSize -= res->len;
00076          }
00077       }
00078 
00079 
00080    if (res) {
00081       *buf = res->buf;
00082       len = res->len;
00083       offs = res->offs;
00084       delete res;
00085       fWriteCnd.Signal();
00086    }
00087 
00088    return (res != 0);
00089 }
00090 
00091 
00092 void XrdCpMthrQueue::Clear() {
00093    void *buf;
00094    int len;
00095    long long offs;
00096 
00097    while (GetBuffer(&buf, offs, len)) {
00098       free(buf);
00099    }
00100 
00101    fTotSize = 0;
00102 
00103 }
00104 
00105    

Generated on Tue Jul 5 14:46:22 2011 for ROOT_528-00b_version by  doxygen 1.5.1