00001 ////////////////////////////////////////////////////////////////////////// 00002 // // 00003 // XrdClientInputBuffer // 00004 // // 00005 // Author: Fabrizio Furano (INFN Padova, 2004) // 00006 // Adapted from TXNetFile (root.cern.ch) originally done by // 00007 // Alvise Dorigo, Fabrizio Furano // 00008 // INFN Padova, 2003 // 00009 // // 00010 // Buffer for incoming messages (responses) // 00011 // Handles the waiting (with timeout) for a message to come // 00012 // belonging to a logical streamid // 00013 // Multithread friendly // 00014 // // 00015 ////////////////////////////////////////////////////////////////////////// 00016 00017 // $Id: XrdClientInputBuffer.cc 30949 2009-11-02 16:37:58Z ganis $ 00018 00019 const char *XrdClientInputBufferCVSID = "$Id: XrdClientInputBuffer.cc 30949 2009-11-02 16:37:58Z ganis $"; 00020 00021 #include "XrdClient/XrdClientInputBuffer.hh" 00022 #include "XrdSys/XrdSysPthread.hh" 00023 #include "XrdClient/XrdClientDebug.hh" 00024 #ifndef WIN32 00025 #include <sys/time.h> 00026 #endif 00027 #include <stdio.h> 00028 00029 using namespace std; 00030 00031 //________________________________________________________________________ 00032 int XrdClientInputBuffer::MsgForStreamidCnt(int streamid) 00033 { 00034 // Counts the number of messages belonging to the given streamid 00035 00036 int cnt = 0; 00037 XrdClientMessage *m = 0; 00038 00039 for (fMsgIter = 0; fMsgIter < fMsgQue.GetSize(); ++fMsgIter) { 00040 m = fMsgQue[fMsgIter]; 00041 if (m->MatchStreamid(streamid)) 00042 cnt++; 00043 } 00044 00045 return cnt; 00046 } 00047 00048 00049 00050 //________________________________________________________________________ 00051 int XrdClientInputBuffer::WipeStreamid(int streamid) 00052 { 00053 // Remove all the pending messages for the given streamid 00054 // Healthy after connection shutdowns 00055 00056 int cnt = 0; 00057 XrdClientMessage *m = 0; 00058 { 00059 XrdSysMutexHelper mtx(fMutex); 00060 00061 for (fMsgIter = fMsgQue.GetSize()-1; fMsgIter >= 0; --fMsgIter) { 00062 m = fMsgQue[fMsgIter]; 00063 if (m->MatchStreamid(streamid)) { 00064 delete m; 00065 fMsgQue.Erase(fMsgIter); 00066 cnt++; 00067 } 00068 00069 } 00070 } 00071 00072 return cnt; 00073 } 00074 00075 //________________________________________________________________________ 00076 XrdSysSemWait *XrdClientInputBuffer::GetSyncObjOrMakeOne(int streamid) { 00077 // Gets the right sync obj to wait for messages for a given streamid 00078 // If the semaphore is not available, it creates one. 00079 00080 XrdSysSemWait *sem; 00081 00082 { 00083 XrdSysMutexHelper mtx(fMutex); 00084 char buf[20]; 00085 00086 snprintf(buf, 20, "%d", streamid); 00087 00088 sem = fSyncobjRepo.Find(buf); 00089 00090 if (!sem) { 00091 sem = new XrdSysSemWait(0); 00092 00093 fSyncobjRepo.Rep(buf, sem); 00094 return sem; 00095 00096 } else 00097 return sem; 00098 } 00099 00100 } 00101 00102 00103 00104 //_______________________________________________________________________ 00105 XrdClientInputBuffer::XrdClientInputBuffer() { 00106 // Constructor 00107 00108 fMsgQue.Clear(); 00109 } 00110 00111 00112 00113 //_______________________________________________________________________ 00114 int DeleteHashItem(const char *key, XrdSysSemWait *sem, void *Arg) { 00115 00116 // This makes the Apply method delete the entry 00117 return -1; 00118 } 00119 00120 XrdClientInputBuffer::~XrdClientInputBuffer() { 00121 // Destructor 00122 00123 // Delete all the syncobjs 00124 { 00125 XrdSysMutexHelper mtx(fMutex); 00126 00127 00128 // Delete the content of the queue 00129 for (fMsgIter = 0; fMsgIter < fMsgQue.GetSize(); ++fMsgIter) { 00130 if (fMsgQue[fMsgIter]) delete fMsgQue[fMsgIter]; 00131 fMsgQue[fMsgIter] = 0; 00132 } 00133 00134 fMsgQue.Clear(); 00135 00136 // Delete all the syncobjs 00137 fSyncobjRepo.Apply(DeleteHashItem, 0); 00138 00139 } 00140 00141 00142 } 00143 00144 //_______________________________________________________________________ 00145 int XrdClientInputBuffer::PutMsg(XrdClientMessage* m) 00146 { 00147 // Put message in the list 00148 int sz; 00149 XrdSysSemWait *sem = 0; 00150 00151 { 00152 XrdSysMutexHelper mtx(fMutex); 00153 00154 fMsgQue.Push_back(m); 00155 sz = MexSize(); 00156 00157 // Is anybody sleeping ? 00158 if (m) 00159 sem = GetSyncObjOrMakeOne( m->HeaderSID() ); 00160 00161 } 00162 00163 if (sem) { 00164 sem->Post(); 00165 } 00166 00167 return sz; 00168 } 00169 00170 00171 //_______________________________________________________________________ 00172 XrdClientMessage *XrdClientInputBuffer::GetMsg(int streamid, int secstimeout) 00173 { 00174 // Gets the first XrdClientMessage from the queue, given a matching streamid. 00175 // If there are no XrdClientMessages for the streamid, it waits for a number 00176 // of seconds for something to come 00177 00178 XrdSysSemWait *sem = 0; 00179 XrdClientMessage *res = 0, *m = 0; 00180 00181 // Find the sem where to wait for a msg 00182 sem = GetSyncObjOrMakeOne(streamid); 00183 00184 int to = secstimeout; 00185 int dt = (to > 2) ? 2 : to; // 2 secs steps 00186 while (to > 0) { 00187 int rc = sem->Wait(dt); 00188 if (!rc) { 00189 // make sure is not a spurious signal ... 00190 XrdSysMutexHelper mtx(fMutex); 00191 if (fMsgQue.GetSize() > 0) { 00192 00193 // We were awakened. Or the timeout elapsed. The mtx is again locked. 00194 // If there are messages to dequeue, we pick the oldest one 00195 for (fMsgIter = 0; fMsgIter < fMsgQue.GetSize(); ++fMsgIter) { 00196 m = fMsgQue[fMsgIter]; 00197 if ((!m) || m->IsError() || m->MatchStreamid(streamid)) { 00198 res = fMsgQue[fMsgIter]; 00199 fMsgQue.Erase(fMsgIter); 00200 if (!m) return 0; 00201 break; 00202 } 00203 } 00204 break; 00205 } 00206 } else 00207 to -= dt; 00208 } 00209 00210 return res; 00211 }