DABC (Data Acquisition Backbone Core)  2.9.9
MemoryPool.cxx
Go to the documentation of this file.
1 // $Id: MemoryPool.cxx 4506 2020-05-26 07:55:57Z linev $
2 
3 /************************************************************
4  * The Data Acquisition Backbone Core (DABC) *
5  ************************************************************
6  * Copyright (C) 2009 - *
7  * GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
8  * Planckstr. 1, 64291 Darmstadt, Germany *
9  * Contact: http://dabc.gsi.de *
10  ************************************************************
11  * This software can be used under the GPL license *
12  * agreements as stated in LICENSE.txt file *
13  * which is part of the distribution. *
14  ************************************************************/
15 
#include "dabc/MemoryPool.h"

#include <cstdlib>
#include <limits>

#include "dabc/defines.h"
21 
22 namespace dabc {
23 
25 
26 
27  class MemoryBlock {
28  public:
29 
30  struct Entry {
31  void *buf;
33  bool owner;
34  int refcnt;
35  };
36 
37  typedef Queue<unsigned, false> FreeQueue;
38 
39  Entry* fArr;
40  unsigned fNumber;
42 
43  MemoryBlock() :
44  fArr(0),
45  fNumber(0),
46  fFree()
47  {
48  }
49 
50  virtual ~MemoryBlock()
51  {
52  Release();
53  }
54 
55  inline bool IsAnyFree() const { return !fFree.Empty(); }
56 
57  void Release()
58  {
59  if (fArr==0) return;
60 
61  for (unsigned n=0;n<fNumber;n++) {
62  if (fArr[n].buf && fArr[n].owner) {
63  free(fArr[n].buf);
64  }
65  fArr[n].buf = 0;
66  fArr[n].owner = false;
67  }
68 
69  delete[] fArr;
70  fArr = 0;
71  fNumber = 0;
72 
73  fFree.Reset();
74  }
75 
76  bool Allocate(unsigned number, unsigned size, unsigned align)
77  {
78  Release();
79 
80  fArr = new Entry[number];
81  fNumber = number;
82 
83  fFree.Allocate(number);
84 
85  for (unsigned n=0;n<fNumber;n++) {
86 
87  void* buf(0);
88  int res = posix_memalign(&buf, align, size);
89 
90  if ((res!=0) || (buf==0)) {
91  EOUT("Cannot allocate data for new Memory Block");
92  throw dabc::Exception(ex_Pool, "Cannot allocate buffer", "MemBlock");
93  return false;
94  }
95 
96  fArr[n].buf = buf;
97  fArr[n].size = size;
98  fArr[n].owner = true;
99  fArr[n].refcnt = 0;
100 
101  fFree.Push(n);
102  }
103 
104  return true;
105  }
106 
107  bool Assign(bool isowner, const std::vector<void*>& bufs, const std::vector<unsigned>& sizes) throw()
108  {
109  Release();
110 
111  fArr = new Entry[bufs.size()];
112  fNumber = bufs.size();
113 
114  fFree.Allocate(bufs.size());
115 
116  for (unsigned n=0;n<fNumber;n++) {
117  fArr[n].buf = bufs[n];
118  fArr[n].size = sizes[n];
119  fArr[n].owner = isowner;
120  fArr[n].refcnt = 0;
121 
122  fFree.Push(n);
123  }
124  return true;
125  }
126 
127  };
128 
129 }
130 
131 // ---------------------------------------------------------------------------------
132 
134 unsigned dabc::MemoryPool::fDfltBufSize = 4096;
135 
136 
137 dabc::MemoryPool::MemoryPool(const std::string &name, bool withmanager) :
138  dabc::ModuleAsync(std::string(withmanager ? "" : "#") + name),
139  fMem(0),
140  fAlignment(fDfltAlignment),
141  fReqests(),
142  fPending(16), // size 16 is preliminary and always can be extended
143  fEvntFired(false),
144  fProcessingReq(false),
145  fChangeCounter(0),
146  fUseThread(false)
147 {
148  DOUT3("MemoryPool %p name %s constructor", this, GetName());
149  SetAutoStop(false);
150 }
151 
153 {
154  Release();
155 }
156 
157 bool dabc::MemoryPool::Find(ConfigIO &cfg)
158 {
159  DOUT4("Module::Find %p name = %s parent %p", this, GetName(), GetParent());
160 
161  if (GetParent()==0) return false;
162 
163  // module will always have tag "Module", class could be specified with attribute
164  while (cfg.FindItem(xmlMemoryPoolNode)) {
165  if (cfg.CheckAttr(xmlNameAttr, GetName())) return true;
166  }
167 
168  return false;
169 }
170 
171 
172 bool dabc::MemoryPool::SetAlignment(unsigned align)
173 {
174  LockGuard lock(ObjectMutex());
175  if (fMem!=0) return false;
176  fAlignment = align;
177  return true;
178 }
179 
180 
181 unsigned dabc::MemoryPool::GetAlignment() const
182 {
183  LockGuard lock(ObjectMutex());
184  return fAlignment;
185 }
186 
187 bool dabc::MemoryPool::_Allocate(BufferSize_t bufsize, unsigned number) throw()
188 {
189  if (fMem!=0) return false;
190 
191  if (bufsize*number==0) return false;
192 
193  DOUT3("POOL:%s Create num:%u X size:%u buffers align:%u", GetName(), number, bufsize, fAlignment);
194 
195  fMem = new MemoryBlock;
196  fMem->Allocate(number, bufsize, fAlignment);
197 
198  fChangeCounter++;
199 
200  return true;
201 }
202 
203 bool dabc::MemoryPool::Allocate(BufferSize_t bufsize, unsigned number) throw()
204 {
205  LockGuard lock(ObjectMutex());
206  return _Allocate(bufsize, number);
207 }
208 
209 bool dabc::MemoryPool::Assign(bool isowner, const std::vector<void*>& bufs, const std::vector<unsigned>& sizes) throw()
210 {
211  LockGuard lock(ObjectMutex());
212 
213  if (fMem!=0) return false;
214 
215  if ((bufs.size() != sizes.size()) || (bufs.size()==0)) return false;
216 
217  fMem = new MemoryBlock;
218  fMem->Assign(isowner, bufs, sizes);
219 
220  return true;
221 }
222 
223 bool dabc::MemoryPool::Release() throw()
224 {
225  LockGuard lock(ObjectMutex());
226 
227  if (fMem) {
228  delete fMem;
229  fMem = 0;
230  fChangeCounter++;
231  }
232 
233  return true;
234 }
235 
236 
237 bool dabc::MemoryPool::IsEmpty() const
238 {
239  LockGuard lock(ObjectMutex());
240 
241  return fMem==0;
242 }
243 
244 unsigned dabc::MemoryPool::GetNumBuffers() const
245 {
246  LockGuard lock(ObjectMutex());
247 
248  return (fMem==0) ? 0 : fMem->fNumber;
249 }
250 
251 unsigned dabc::MemoryPool::GetBufferSize(unsigned id) const
252 {
253  LockGuard lock(ObjectMutex());
254 
255  return (fMem==0) || (id>=fMem->fNumber) ? 0 : fMem->fArr[id].size;
256 }
257 
258 unsigned dabc::MemoryPool::GetMaxBufSize() const
259 {
260  LockGuard lock(ObjectMutex());
261 
262  if (fMem==0) return 0;
263 
264  unsigned max(0);
265 
266  for (unsigned id=0;id<fMem->fNumber;id++)
267  if (fMem->fArr[id].size > max) max = fMem->fArr[id].size;
268 
269  return max;
270 }
271 
272 unsigned dabc::MemoryPool::GetMinBufSize() const
273 {
274  LockGuard lock(ObjectMutex());
275 
276  if (fMem==0) return 0;
277 
278  unsigned min(0);
279 
280  for (unsigned id=0;id<fMem->fNumber;id++)
281  if ((min==0) || (fMem->fArr[id].size < min)) min = fMem->fArr[id].size;
282 
283  return min;
284 }
285 
286 
287 void* dabc::MemoryPool::GetBufferLocation(unsigned id) const
288 {
289  LockGuard lock(ObjectMutex());
290 
291  return (fMem==0) || (id>=fMem->fNumber) ? 0 : fMem->fArr[id].buf;
292 }
293 
294 bool dabc::MemoryPool::_GetPoolInfo(std::vector<void*>& bufs, std::vector<unsigned>& sizes, unsigned* changecnt)
295 {
296  if (changecnt!=0) {
297  if (*changecnt == fChangeCounter) return false;
298  }
299 
300  if (fMem!=0)
301  for(unsigned n=0;n<fMem->fNumber;n++) {
302  bufs.push_back(fMem->fArr[n].buf);
303  sizes.push_back(fMem->fArr[n].size);
304  }
305 
306  if (changecnt!=0) *changecnt = fChangeCounter;
307 
308  return true;
309 }
310 
311 bool dabc::MemoryPool::GetPoolInfo(std::vector<void*>& bufs, std::vector<unsigned>& sizes)
312 {
313  LockGuard lock(ObjectMutex());
314  return _GetPoolInfo(bufs, sizes);
315 }
316 
317 
318 bool dabc::MemoryPool::TakeRawBuffer(unsigned& indx)
319 {
320  LockGuard lock(ObjectMutex());
321  if ((fMem==0) || !fMem->IsAnyFree()) return false;
322  indx = fMem->fFree.Pop();
323  return true;
324 }
325 
326 void dabc::MemoryPool::ReleaseRawBuffer(unsigned indx)
327 {
328  LockGuard lock(ObjectMutex());
329  if (fMem) fMem->fFree.Push(indx);
330 }
331 
332 
333 dabc::Buffer dabc::MemoryPool::_TakeBuffer(BufferSize_t size, bool except, bool reserve_memory)
334 {
335  Buffer res;
336 
337 // DOUT0("_TakeBuffer obj %p", &res);
338 
339  // if no memory is available, try to allocate it
340  if (fMem==0) _Allocate();
341 
342  if (fMem==0) {
343  if (except) throw dabc::Exception(ex_Pool, "No memory allocated in the pool", ItemName());
344  return res;
345  }
346 
347  if (!fMem->IsAnyFree() && reserve_memory) {
348  if (except) throw dabc::Exception(ex_Pool, "No any memory is available in the pool", ItemName());
349  return res;
350  }
351 
352  if ((size==0) && reserve_memory) size = fMem->fArr[fMem->fFree.Front()].size;
353 
354  // first check if required size is available
355  BufferSize_t sum(0);
356  unsigned cnt(0);
357  while (sum<size) {
358  if (cnt>=fMem->fFree.Size()) {
359  if (except) throw dabc::Exception(ex_Pool, "Cannot reserve buffer of requested size", ItemName());
360  return res;
361  }
362 
363  unsigned id = fMem->fFree.Item(cnt);
364  sum += fMem->fArr[id].size;
365  cnt++;
366  }
367 
368  res.AllocateContainer(cnt < 8 ? 8 : cnt);
369 
370  sum = 0;
371  cnt = 0;
372  MemSegment* segs = res.Segments();
373 
374  while (sum<size) {
375  unsigned id = fMem->fFree.Pop();
376 
377  if (fMem->fArr[id].refcnt!=0)
378  throw dabc::Exception(ex_Pool, "Buffer is not free even is declared so", ItemName());
379 
380  if (cnt>=res.GetObject()->fCapacity)
381  throw dabc::Exception(ex_Pool, "All mem segments does not fit into preallocated list", ItemName());
382 
383  segs[cnt].buffer = fMem->fArr[id].buf;
384  segs[cnt].datasize = fMem->fArr[id].size;
385  segs[cnt].id = id;
386 
387  // Provide buffer of exactly requested size
388  BufferSize_t restsize = size - sum;
389  if (restsize < segs[cnt].datasize) segs[cnt].datasize = restsize;
390 
391  sum += fMem->fArr[id].size;
392 
393  // increment reference counter on the memory space
394  fMem->fArr[id].refcnt++;
395 
396  cnt++;
397  }
398 
399  res.GetObject()->fPool.SetObject(this, false);
400 
401  res.GetObject()->fNumSegments = cnt;
402 
403  res.SetTypeId(mbt_Generic);
404 
405  return res;
406 }
407 
408 
410 {
411  dabc::Buffer res;
412 
413  {
414  LockGuard lock(ObjectMutex());
415 
416  res = _TakeBuffer(size, true);
417  }
418 
419  return res;
420 }
421 
422 
423 void dabc::MemoryPool::IncreaseSegmRefs(MemSegment* segm, unsigned num)
424 {
425  LockGuard lock(ObjectMutex());
426 
427  if (fMem==0)
428  throw dabc::Exception(ex_Pool, "Memory was not allocated in the pool", ItemName());
429 
430  for (unsigned cnt=0;cnt<num;cnt++) {
431  unsigned id = segm[cnt].id;
432  if (id>fMem->fNumber)
433  throw dabc::Exception(ex_Pool, "Wrong buffer id in the segments list of buffer", ItemName());
434 
435  if (fMem->fArr[id].refcnt + 1 == 0)
436  throw dabc::Exception(ex_Pool, "To many references on single segments - how it can be", ItemName());
437 
438  fMem->fArr[id].refcnt++;
439  }
440 }
441 
442 bool dabc::MemoryPool::IsSingleSegmRefs(MemSegment* segm, unsigned num)
443 {
444  LockGuard lock(ObjectMutex());
445 
446  if (!fMem)
447  throw dabc::Exception(ex_Pool, "Memory was not allocated in the pool", ItemName());
448 
449  for (unsigned cnt=0;cnt<num;cnt++) {
450  unsigned id = segm[cnt].id;
451 
452  if (id>fMem->fNumber)
453  throw dabc::Exception(ex_Pool, "Wrong buffer id in the segments list of buffer", ItemName());
454 
455  if (fMem->fArr[id].refcnt != 1) return false;
456  }
457 
458  return true;
459 }
460 
461 
462 void dabc::MemoryPool::DecreaseSegmRefs(MemSegment* segm, unsigned num)
463 {
464  LockGuard lock(ObjectMutex());
465 
466  if (!fMem)
467  throw dabc::Exception(ex_Pool, "Memory was not allocated in the pool", ItemName());
468 
469  for (unsigned cnt=0;cnt<num;cnt++) {
470  unsigned id = segm[cnt].id;
471  if (id > fMem->fNumber)
472  throw dabc::Exception(ex_Pool, "Wrong buffer id in the segments list of buffer", ItemName());
473 
474  if (fMem->fArr[id].refcnt == 0)
475  throw dabc::Exception(ex_Pool, "Reference counter of specified segment is already 0", ItemName());
476 
477  if (--(fMem->fArr[id].refcnt) == 0) fMem->fFree.Push(id);
478  }
479 
480 }
481 
482 
483 bool dabc::MemoryPool::ProcessSend(unsigned port)
484 {
485  if (!fReqests[port].pending) {
486  fPending.Push(port);
487  fReqests[port].pending = true;
488  }
489 
490  RecheckRequests(true);
491 
492  return true; // always
493 }
494 
495 
496 void dabc::MemoryPool::ProcessEvent(const EventId& ev)
497 {
498 
499  if (ev.GetCode() == evntProcessRequests) {
500  {
501  LockGuard guard(ObjectMutex());
502  fEvntFired = false;
503  }
504 
505  RecheckRequests();
506 
507  return;
508  }
509 
511 }
512 
513 bool dabc::MemoryPool::RecheckRequests(bool from_recv)
514 {
515  // method called when one need to check if any requester need buffer and
516  // was blocked by lack of free memory
517 
518  if (fProcessingReq) {
519  EOUT("Event processing mismatch in the POOL - called for the second time");
520  return false;
521  }
522 
523  int cnt(100);
524 
525  fProcessingReq = true;
526 
527  while (!fPending.Empty() && (cnt-->0)) {
528 
529  // no any pending requests, nothing to do
530  unsigned portid = fPending.Front();
531 
532  if (portid>=fReqests.size()) {
533  EOUT("Old requests is not yet removed!!!");
534  fPending.Pop();
535  continue;
536  }
537 
538  if (!IsOutputConnected(portid)) {
539  // if port was disconnected, just skip all pending requests
540  fReqests[portid].pending = false;
541  fPending.Pop();
542  continue;
543  }
544 
545  if (!fReqests[portid].pending) {
546  EOUT("Request %u was not pending", portid);
547  fReqests[portid].pending = true;
548  }
549 
550  if (!CanSend(portid)) {
551  EOUT("Cannot send buffer to output %u", portid);
552  fPending.Pop();
553  continue;
554  }
555 
556  BufferSize_t sz = fReqests[portid].size;
557  Buffer buf;
558 
559  {
560  LockGuard lock(ObjectMutex());
561 
562  buf = _TakeBuffer(sz, false, true);
563 
564  if (buf.null()) { fProcessingReq = false; return false; }
565  }
566 
567  // we cannot get buffer, break loop and do not
568 
569  DOUT5("Memory pool %s send buffer size %u to output %u", GetName(), buf.GetTotalSize(), portid);
570 
571  if (CanSend(portid)) {
572  Send(portid, buf);
573  } else {
574  EOUT("Buffer %u is ready, but cannot be add to the queue", buf.GetTotalSize());
575  buf.Release();
576  }
577 
578  fPending.Pop();
579  fReqests[portid].pending = false;
580 
581  if (from_recv && fPending.Empty()) {
582  fProcessingReq = false;
583  return true;
584  }
585 
586  // if there are still requests pending, put it in the queue back
587  if (CanSend(portid)) {
588  fPending.Push(portid);
589  fReqests[portid].pending = true;
590  }
591  }
592 
593  // we need to fire event again while not all requests was processed
594  if (!fPending.Empty() && (cnt<=0))
595  FireEvent(evntProcessRequests);
596 
597  fProcessingReq = false;
598 
599  return fPending.Empty();
600 }
601 
602 
603 bool dabc::MemoryPool::CheckChangeCounter(unsigned &cnt)
604 {
605  bool res = cnt!=fChangeCounter;
606 
607  cnt = fChangeCounter;
608 
609  return res;
610 }
611 
612 bool dabc::MemoryPool::Reconstruct(Command cmd)
613 {
614  unsigned buffersize = Cfg(xmlBufferSize, cmd).AsUInt(GetDfltBufSize());
615 
616 // dabc::SetDebugLevel(2);
617  unsigned numbuffers = Cfg(xmlNumBuffers, cmd).AsUInt();
618 // dabc::SetDebugLevel(1);
619 
620  unsigned align = Cfg(xmlAlignment, cmd).AsUInt(GetDfltAlignment());
621 
622  DOUT1("POOL:%s bufsize:%u X num:%u", GetName(), buffersize, numbuffers);
623 
624  if (align) SetAlignment(align);
625 
626  return Allocate(buffersize, numbuffers);
627 }
628 
629 double dabc::MemoryPool::GetUsedRatio() const
630 {
631  LockGuard lock(ObjectMutex());
632 
633  if (fMem==0) return 0.;
634 
635  double sum1(0.), sum2(0.);
636  for(unsigned n=0;n<fMem->fNumber;n++) {
637  sum1 += fMem->fArr[n].size;
638  if (fMem->fArr[n].refcnt>0)
639  sum2 += fMem->fArr[n].size;
640  }
641 
642  return sum1>0. ? sum2/sum1 : 0.;
643 }
644 
645 
646 int dabc::MemoryPool::ExecuteCommand(Command cmd)
647 {
648  if (cmd.IsName("CreateNewRequester")) {
649 
650  unsigned portindx = NumOutputs();
651  std::string name = dabc::format("Output%u", portindx);
652 
653  for (unsigned n=0;n<fReqests.size();n++)
654  if (fReqests[n].disconn) {
655  portindx = n;
656  name = OutputName(portindx);
657  break;
658  }
659 
660  if (portindx==NumOutputs()) {
661  CreateOutput(name, 1);
662  portindx = FindOutput(name);
663  if (portindx != fReqests.size()) {
664  EOUT("Cannot create port");
665  exit(444);
666  }
667  fReqests.push_back(RequesterReq());
668  }
669 
670  cmd.SetStr("PortName", name);
671 
672  return cmd_true;
673  }
674 
675  return ModuleAsync::ExecuteCommand(cmd);
676 }
677 
678 void dabc::MemoryPool::ProcessConnectionActivated(const std::string &name, bool on)
679 {
680  DOUT4(" MemoryPool %s Port %s active=%s", GetName(), name.c_str(), DBOOL(on));
681 
682  if (on) ProduceOutputEvent(FindOutput(name));
683 }
684 
685 
686 void dabc::MemoryPool::ProcessConnectEvent(const std::string &name, bool on)
687 {
688  unsigned portid = FindOutput(name);
689  if (portid >= fReqests.size()) return;
690  if (!on) fReqests[portid].disconn = true;
691 }
692 
693 
694 // =========================================================================
695 
697 {
698  if (GetObject()==0) return dabc::Reference();
699 
700  dabc::Command cmd("CreateNewRequester");
701 
702  if (Execute(cmd)==dabc::cmd_true)
703  return FindPort(cmd.GetStr("PortName"));
704 
705  return dabc::Reference();
706 }
707 
708 
Reference on memory from memory pool.
Definition: Buffer.h:135
Represents command with its arguments.
Definition: Command.h:99
DABC exception.
Definition: Exception.h:57
bool Allocate(unsigned number, unsigned size, unsigned align)
Definition: MemoryPool.cxx:76
virtual ~MemoryBlock()
Definition: MemoryPool.cxx:50
Entry * fArr
array of buffers
Definition: MemoryPool.cxx:39
bool Assign(bool isowner, const std::vector< void * > &bufs, const std::vector< unsigned > &sizes)
Definition: MemoryPool.cxx:107
unsigned fNumber
number of buffers
Definition: MemoryPool.cxx:40
FreeQueue fFree
list of free buffers
Definition: MemoryPool.cxx:41
bool IsAnyFree() const
Definition: MemoryPool.cxx:55
Queue< unsigned, false > FreeQueue
Definition: MemoryPool.cxx:37
Reference CreateNewRequester()
Definition: MemoryPool.cxx:696
static unsigned fDfltAlignment
default alignment for memory allocation
Definition: MemoryPool.h:70
virtual void ProcessEvent(const EventId &)
Definition: MemoryPool.cxx:496
virtual ~MemoryPool()
Definition: MemoryPool.cxx:152
Buffer _TakeBuffer(BufferSize_t size, bool except, bool reserve_memory=true)
Central method, which reserves memory from pool and fill structures of buffer.
Definition: MemoryPool.cxx:333
bool Release()
Release memory and structures, allocated by memory pool.
Definition: MemoryPool.cxx:223
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
Definition: MemoryPool.cxx:646
bool SetAlignment(unsigned align)
Following methods could be used for configuration of pool before memory pool is allocated.
Definition: MemoryPool.cxx:172
bool IsEmpty() const
Return true, if memory pool is not yet created.
Definition: MemoryPool.cxx:237
bool Assign(bool isowner, const std::vector< void * > &bufs, const std::vector< unsigned > &sizes)
This is alternative method to supply memory to the pool.
Definition: MemoryPool.cxx:209
void ReleaseRawBuffer(unsigned indx)
Release raw buffer, allocated before by TakeRawBuffer.
Definition: MemoryPool.cxx:326
bool Reconstruct(Command cmd)
Reconstruct memory pool base on command configuration.
Definition: MemoryPool.cxx:612
virtual void ProcessConnectionActivated(const std::string &name, bool on)
Method called when port started or stopped.
Definition: MemoryPool.cxx:678
void IncreaseSegmRefs(MemSegment *segm, unsigned num)
Method increases reference counters of all segments.
Definition: MemoryPool.cxx:423
bool _Allocate(BufferSize_t bufsize=0, unsigned number=0)
Method to allocate memory for the pool, mutex should be locked.
Definition: MemoryPool.cxx:187
bool GetPoolInfo(std::vector< void * > &bufs, std::vector< unsigned > &sizes)
Return pointers and sizes of all memory buffers in the pool.
Definition: MemoryPool.cxx:311
unsigned GetMaxBufSize() const
Return maximum buffer size in the pool.
Definition: MemoryPool.cxx:258
bool CheckChangeCounter(unsigned &cnt)
Check if memory pool structure was changed since last call, do not involves memory pool mutex.
Definition: MemoryPool.cxx:603
bool IsSingleSegmRefs(MemSegment *segm, unsigned num)
Return true when all segments has refcnt==1.
Definition: MemoryPool.cxx:442
unsigned GetBufferSize(unsigned id) const
Returns size of preallocated buffer.
Definition: MemoryPool.cxx:251
bool TakeRawBuffer(unsigned &indx)
Reserve raw buffer without creating Buffer instance.
Definition: MemoryPool.cxx:318
double GetUsedRatio() const
Return relative usage of memory pool buffers.
Definition: MemoryPool.cxx:629
bool Allocate(BufferSize_t bufsize=0, unsigned number=0)
Allocates memory for the memory pool and creates references.
Definition: MemoryPool.cxx:203
bool _GetPoolInfo(std::vector< void * > &bufs, std::vector< unsigned > &sizes, unsigned *changecnt=0)
Return pointers and sizes of all memory buffers in the pool Could be used by devices and transport to...
Definition: MemoryPool.cxx:294
Buffer TakeBuffer(BufferSize_t size=0)
Returns Buffer object with exclusive access rights.
Definition: MemoryPool.cxx:409
virtual bool Find(ConfigIO &cfg)
Method to locate object in xml file.
Definition: MemoryPool.cxx:157
unsigned GetMinBufSize() const
Return minimum buffer size in the pool.
Definition: MemoryPool.cxx:272
unsigned GetAlignment() const
Following methods should be used after memory pool is created.
Definition: MemoryPool.cxx:181
bool RecheckRequests(bool from_recv=false)
Definition: MemoryPool.cxx:513
unsigned GetNumBuffers() const
Returns number of preallocated buffers.
Definition: MemoryPool.cxx:244
MemoryPool(const std::string &name, bool withmanager=false)
Definition: MemoryPool.cxx:137
void DecreaseSegmRefs(MemSegment *segm, unsigned num)
Decrease references of specified segments.
Definition: MemoryPool.cxx:462
bool ProcessSend(unsigned port)
Method called by framework when at least one buffer can be sent to output port.
Definition: MemoryPool.cxx:483
virtual void ProcessConnectEvent(const std::string &name, bool on)
Method called by framework when connection state of the item is changed.
Definition: MemoryPool.cxx:686
static unsigned fDfltBufSize
default buffer size
Definition: MemoryPool.h:71
void * GetBufferLocation(unsigned id) const
Returns location of preallocated buffer.
Definition: MemoryPool.cxx:287
virtual void ProcessEvent(const EventId &)
Definition: Module.cxx:822
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
Definition: Module.h:232
bool Empty() const
Definition: Queue.h:325
void Push(T val)
Definition: Queue.h:215
void Reset()
Definition: Queue.h:312
void Allocate(unsigned capacity)
Definition: Queue.h:115
Reference on the arbitrary object
Definition: Reference.h:73
#define DOUT5(args ...)
Definition: logging.h:188
#define DOUT3(args ...)
Definition: logging.h:176
#define EOUT(args ...)
Definition: logging.h:150
#define DOUT1(args ...)
Definition: logging.h:162
#define DBOOL(arg)
Definition: logging.h:191
#define DOUT4(args ...)
Definition: logging.h:182
XMLNodePointer_t GetParent(XMLNodePointer_t xmlnode)
Definition: XmlEngine.cxx:917
Event manipulation API.
Definition: api.h:23
const char * xmlBufferSize
Definition: Object.cxx:49
uint32_t BufferSize_t
Definition: Buffer.h:32
std::string format(const char *fmt,...)
Definition: string.cxx:49
const char * xmlAlignment
Definition: Object.cxx:51
@ evntProcessRequests
Definition: MemoryPool.cxx:24
@ mbt_Generic
Definition: Buffer.h:40
const char * xmlMemoryPoolNode
Definition: Object.cxx:32
const char * xmlNumBuffers
Definition: Object.cxx:50
@ evntModuleLast
Definition: ModuleItem.h:51
@ ex_Pool
Definition: Exception.h:37
const char * xmlNameAttr
Definition: ConfigBase.cxx:33
@ cmd_true
Definition: Command.h:38
void * buf
pointer on raw memory
Definition: MemoryPool.cxx:31
BufferSize_t size
size of the block
Definition: MemoryPool.cxx:32
int refcnt
usage counter - number of references on the memory
Definition: MemoryPool.cxx:34
bool owner
whether the memory should be released
Definition: MemoryPool.cxx:33