00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
// Version-control identification string for this source file.
const char *XrdBufferCVSID =
    "$Id: XrdBuffer.cc 25553 2008-09-26 07:57:42Z ganis $";
00014
#include <time.h>
#include <unistd.h>
#if !defined(__macos__) && !defined(__FreeBSD__)
#include <malloc.h>
#endif
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>

#include <new>

#include "XrdSys/XrdSysError.hh"
#include "XrdSys/XrdSysPlatform.hh"
#include "XrdSys/XrdSysTimer.hh"
#include "Xrd/XrdBuffer.hh"
#include "Xrd/XrdTrace.hh"
00029
00030
00031
00032
00033
00034 void *XrdReshaper(void *pp)
00035 {
00036 XrdBuffManager *bmp = (XrdBuffManager *)pp;
00037 bmp->Reshape();
00038 return (void *)0;
00039 }
00040
00041
00042
00043
00044
00045 extern XrdSysError XrdLog;
00046
00047 #ifndef NODEBUG
00048 extern XrdOucTrace XrdTrace;
00049 #endif
00050
00051 const char *XrdBuffManager::TraceID = "BuffManager";
00052
00053
00054
00055
00056
00057 XrdBuffManager::XrdBuffManager(int minrst) :
00058 slots(XRD_BUCKETS),
00059 shift(XRD_BUSHIFT),
00060 pagsz(getpagesize()),
00061 maxsz(1<<(XRD_BUSHIFT+XRD_BUCKETS-1)),
00062 Reshaper(0, "buff reshaper")
00063 {
00064
00065
00066
00067 totbuf = 0;
00068 totreq = 0;
00069 totalo = 0;
00070 totadj = 0;
00071 #ifdef _SC_PHYS_PAGES
00072 maxalo = static_cast<long long>(pagsz)/8
00073 * static_cast<long long>(sysconf(_SC_PHYS_PAGES));
00074 #else
00075 maxalo = 0x7ffffff;
00076 #endif
00077 rsinprog = 0;
00078 minrsw = minrst;
00079 memset(static_cast<void *>(bucket), 0, sizeof(bucket));
00080 }
00081
00082
00083
00084
00085
00086 void XrdBuffManager::Init()
00087 {
00088 pthread_t tid;
00089 int rc;
00090
00091
00092
00093 if ((rc = XrdSysThread::Run(&tid, XrdReshaper, static_cast<void *>(this), 0,
00094 "Buffer Manager reshaper")))
00095 XrdLog.Emsg("BuffManager", rc, "create reshaper thread");
00096 }
00097
00098
00099
00100
00101
00102 XrdBuffer *XrdBuffManager::Obtain(int sz)
00103 {
00104 long long ik, mk, pk;
00105 int bindex = 0;
00106 XrdBuffer *bp;
00107 char *memp;
00108
00109
00110
00111 if (sz <= 0 || sz > maxsz) return 0;
00112
00113
00114
00115 ik = mk = sz >> shift;
00116 while((ik = ik>>1)) bindex++;
00117 if ((mk = 1 << (shift+bindex)) < sz) {bindex++; mk = mk << 1;}
00118 if (bindex >= slots) return 0;
00119
00120
00121
00122 Reshaper.Lock();
00123 totreq++;
00124 bucket[bindex].numreq++;
00125 if ((bp = bucket[bindex].bnext))
00126 {bucket[bindex].bnext = bp->next; bucket[bindex].numbuf--;}
00127 Reshaper.UnLock();
00128
00129
00130
00131 if (bp) return bp;
00132
00133
00134
00135 pk = (mk < pagsz ? mk : pagsz);
00136 if (!(memp = static_cast<char *>(memalign(pk, mk)))) return 0;
00137
00138
00139
00140 if (!(bp = new XrdBuffer(memp, mk, bindex))) {free(memp); return 0;}
00141
00142
00143
00144 Reshaper.Lock();
00145 totbuf++;
00146 if ((totalo += mk) > maxalo && !rsinprog)
00147 {rsinprog = 1; Reshaper.Signal();}
00148 Reshaper.UnLock();
00149 return bp;
00150 }
00151
00152
00153
00154
00155
00156 int XrdBuffManager::Recalc(int sz)
00157 {
00158 int ik, mk;
00159 int bindex = 0;
00160
00161
00162
00163 if (sz <= 0 || sz > maxsz) return 0;
00164
00165
00166
00167 ik = mk = sz >> shift;
00168 while((ik = ik>>1)) bindex++;
00169 if ((mk = 1 << (shift+bindex)) < sz) {bindex++; mk = mk << 1;}
00170 if (bindex >= slots) return 0;
00171
00172
00173
00174 return mk;
00175 }
00176
00177
00178
00179
00180
00181 void XrdBuffManager::Release(XrdBuffer *bp)
00182 {
00183 int bindex = bp->bindex;
00184
00185
00186
00187 Reshaper.Lock();
00188 bp->next = bucket[bp->bindex].bnext;
00189 bucket[bp->bindex].bnext = bp;
00190 bucket[bindex].numbuf++;
00191 Reshaper.UnLock();
00192 }
00193
00194
00195
00196
00197
00198 void XrdBuffManager::Reshape()
00199 {
00200 int i, bufprof[XRD_BUCKETS], numfreed;
00201 time_t delta, lastshape = time(0);
00202 long long memslot, memhave, memtarget = (long long)(.80*(float)maxalo);
00203 XrdSysTimer Timer;
00204 float requests, buffers;
00205 XrdBuffer *bp;
00206
00207
00208
00209 while(1)
00210 {Reshaper.Lock();
00211 while(Reshaper.Wait(minrsw) && totalo <= maxalo)
00212 {TRACE(MEM, "Reshaper has " <<(totalo>>10) <<"K; target " <<(memtarget>>10) <<"K");}
00213 if ((delta = (time(0) - lastshape)) < minrsw)
00214 {Reshaper.UnLock();
00215 Timer.Wait((minrsw-delta)*1000);
00216 Reshaper.Lock();
00217 }
00218
00219
00220
00221 if (totreq > slots)
00222 {requests = (float)totreq;
00223 buffers = (float)totbuf;
00224 for (i = 0; i < slots; i++)
00225 {bufprof[i] = (int)(buffers*(((float)bucket[i].numreq)/requests));
00226 bucket[i].numreq = 0;
00227 }
00228 totreq = 0; memhave = totalo;
00229 } else memhave = 0;
00230 Reshaper.UnLock();
00231
00232
00233
00234 memslot = maxsz; numfreed = 0;
00235 for (i = slots-1; i >= 0 && memhave > memtarget; i--)
00236 {Reshaper.Lock();
00237 while(bucket[i].numbuf > bufprof[i])
00238 if ((bp = bucket[i].bnext))
00239 {bucket[i].bnext = bp->next;
00240 delete bp;
00241 bucket[i].numbuf--; numfreed++;
00242 memhave -= memslot; totalo -= memslot;
00243 } else {bucket[i].numbuf = 0; break;}
00244 Reshaper.UnLock();
00245 memslot = memslot>>1;
00246 }
00247
00248
00249
00250 totadj += numfreed;
00251 TRACE(MEM, "Pool reshaped; " <<numfreed <<" freed; have " <<(memhave>>10) <<"K; target " <<(memtarget>>10) <<"K");
00252 lastshape = time(0);
00253 rsinprog = 0;
00254 }
00255 }
00256
00257
00258
00259
00260
00261 void XrdBuffManager::Set(int maxmem, int minw)
00262 {
00263
00264
00265
00266 Reshaper.Lock();
00267 if (maxmem > 0) maxalo = (long long)maxmem;
00268 if (minw > 0) minrsw = minw;
00269 Reshaper.UnLock();
00270 }
00271
00272
00273
00274
00275
00276 int XrdBuffManager::Stats(char *buff, int blen, int do_sync)
00277 {
00278 static char statfmt[] = "<stats id=\"buff\"><reqs>%d</reqs>"
00279 "<mem>%lld</mem><buffs>%d</buffs><adj>%d</adj></stats>";
00280 int nlen;
00281
00282
00283
00284 if (!buff) return sizeof(statfmt) + 16*4;
00285
00286
00287
00288 if (do_sync) Reshaper.Lock();
00289 nlen = snprintf(buff, blen, statfmt, totreq, totalo, totbuf, totadj);
00290 if (do_sync) Reshaper.UnLock();
00291 return nlen;
00292 }