XrdClientInputBuffer.cc

Go to the documentation of this file.
00001 //////////////////////////////////////////////////////////////////////////
00002 //                                                                      //
00003 // XrdClientInputBuffer                                                 //
00004 //                                                                      //
00005 // Author: Fabrizio Furano (INFN Padova, 2004)                          //
00006 // Adapted from TXNetFile (root.cern.ch) originally done by             //
00007 //  Alvise Dorigo, Fabrizio Furano                                      //
00008 //          INFN Padova, 2003                                           //
00009 //                                                                      //
00010 // Buffer for incoming messages (responses)                             //
00011 //  Handles the waiting (with timeout) for a message to come            //
00012 //   belonging to a logical streamid                                    //
00013 //  Multithread friendly                                                //
00014 //                                                                      //
00015 //////////////////////////////////////////////////////////////////////////
00016 
00017 //       $Id: XrdClientInputBuffer.cc 30949 2009-11-02 16:37:58Z ganis $
00018 
00019 const char *XrdClientInputBufferCVSID = "$Id: XrdClientInputBuffer.cc 30949 2009-11-02 16:37:58Z ganis $";
00020 
00021 #include "XrdClient/XrdClientInputBuffer.hh"
00022 #include "XrdSys/XrdSysPthread.hh"
00023 #include "XrdClient/XrdClientDebug.hh"
00024 #ifndef WIN32
00025 #include <sys/time.h>
00026 #endif
00027 #include <stdio.h>
00028 
00029 using namespace std;
00030 
00031 //________________________________________________________________________
00032 int XrdClientInputBuffer::MsgForStreamidCnt(int streamid)
00033 {
00034     // Counts the number of messages belonging to the given streamid
00035 
00036     int cnt = 0;
00037     XrdClientMessage *m = 0;
00038 
00039     for (fMsgIter = 0; fMsgIter < fMsgQue.GetSize(); ++fMsgIter) {
00040        m = fMsgQue[fMsgIter];
00041        if (m->MatchStreamid(streamid))
00042           cnt++;
00043     }
00044 
00045     return cnt;
00046 }
00047 
00048 
00049 
00050 //________________________________________________________________________
00051 int XrdClientInputBuffer::WipeStreamid(int streamid)
00052 {
00053     // Remove all the pending messages for the given streamid
00054     // Healthy after connection shutdowns
00055 
00056     int cnt = 0;
00057     XrdClientMessage *m = 0;
00058    {
00059       XrdSysMutexHelper mtx(fMutex);
00060 
00061       for (fMsgIter = fMsgQue.GetSize()-1; fMsgIter >= 0; --fMsgIter) {
00062          m = fMsgQue[fMsgIter];
00063          if (m->MatchStreamid(streamid)) {
00064             delete m;
00065             fMsgQue.Erase(fMsgIter);
00066             cnt++;
00067          }
00068 
00069       }
00070    }
00071 
00072     return cnt;
00073 }
00074 
00075 //________________________________________________________________________
00076 XrdSysSemWait *XrdClientInputBuffer::GetSyncObjOrMakeOne(int streamid) {
00077    // Gets the right sync obj to wait for messages for a given streamid
00078    // If the semaphore is not available, it creates one.
00079 
00080    XrdSysSemWait *sem;
00081 
00082    {
00083       XrdSysMutexHelper mtx(fMutex);
00084       char buf[20];
00085 
00086       snprintf(buf, 20, "%d", streamid);
00087 
00088       sem = fSyncobjRepo.Find(buf);
00089 
00090       if (!sem) {
00091          sem = new XrdSysSemWait(0);
00092 
00093          fSyncobjRepo.Rep(buf, sem);
00094          return sem;
00095 
00096       } else
00097          return sem;
00098    }
00099 
00100 }
00101 
00102 
00103 
00104 //_______________________________________________________________________
00105 XrdClientInputBuffer::XrdClientInputBuffer() {
00106    // Constructor
00107 
00108    fMsgQue.Clear();
00109 }
00110 
00111 
00112 
00113 //_______________________________________________________________________
00114 int DeleteHashItem(const char *key, XrdSysSemWait *sem, void *Arg) {
00115 
00116    // This makes the Apply method delete the entry
00117    return -1;
00118 }
00119 
00120 XrdClientInputBuffer::~XrdClientInputBuffer() {
00121    // Destructor
00122 
00123    // Delete all the syncobjs
00124    {
00125       XrdSysMutexHelper mtx(fMutex);
00126 
00127 
00128       // Delete the content of the queue
00129       for (fMsgIter = 0; fMsgIter < fMsgQue.GetSize(); ++fMsgIter) {
00130          if (fMsgQue[fMsgIter]) delete fMsgQue[fMsgIter];
00131          fMsgQue[fMsgIter] = 0;
00132       }
00133 
00134       fMsgQue.Clear();
00135 
00136       // Delete all the syncobjs
00137       fSyncobjRepo.Apply(DeleteHashItem, 0);
00138 
00139    }
00140 
00141 
00142 }
00143 
00144 //_______________________________________________________________________
00145 int XrdClientInputBuffer::PutMsg(XrdClientMessage* m)
00146 {
00147    // Put message in the list
00148   int sz;
00149   XrdSysSemWait *sem = 0;
00150 
00151    {
00152       XrdSysMutexHelper mtx(fMutex);
00153     
00154       fMsgQue.Push_back(m);
00155       sz = MexSize();
00156     
00157       // Is anybody sleeping ?
00158       if (m)
00159          sem = GetSyncObjOrMakeOne( m->HeaderSID() );
00160 
00161    }
00162 
00163    if (sem) {
00164       sem->Post();
00165    }
00166 
00167    return sz;
00168 }
00169 
00170 
00171 //_______________________________________________________________________
00172 XrdClientMessage *XrdClientInputBuffer::GetMsg(int streamid, int secstimeout)
00173 {
00174    // Gets the first XrdClientMessage from the queue, given a matching streamid.
00175    // If there are no XrdClientMessages for the streamid, it waits for a number
00176    // of seconds for something to come
00177 
00178    XrdSysSemWait *sem = 0;
00179    XrdClientMessage *res = 0, *m = 0;
00180 
00181    // Find the sem where to wait for a msg
00182    sem = GetSyncObjOrMakeOne(streamid);
00183 
00184    int to = secstimeout;
00185    int dt = (to > 2) ? 2 : to;  // 2 secs steps
00186    while (to > 0) {
00187      int rc = sem->Wait(dt);
00188      if (!rc) {
00189        // make sure is not a spurious signal ...
00190        XrdSysMutexHelper mtx(fMutex);
00191        if (fMsgQue.GetSize() > 0) {
00192 
00193          // We were awakened. Or the timeout elapsed. The mtx is again locked.
00194          // If there are messages to dequeue, we pick the oldest one
00195          for (fMsgIter = 0; fMsgIter < fMsgQue.GetSize(); ++fMsgIter) {
00196            m = fMsgQue[fMsgIter];
00197            if ((!m) || m->IsError() || m->MatchStreamid(streamid)) {
00198              res = fMsgQue[fMsgIter];
00199              fMsgQue.Erase(fMsgIter);
00200              if (!m) return 0;
00201              break;
00202            }
00203          }
00204          break;
00205        }
00206      } else
00207        to -= dt;
00208    }
00209 
00210   return res;
00211 }

Generated on Tue Jul 5 14:46:18 2011 for ROOT_528-00b_version by  doxygen 1.5.1