20 #include "dabc/defines.h"
61 for (
unsigned n=0;n<
fNumber;n++) {
76 bool Allocate(
unsigned number,
unsigned size,
unsigned align)
80 fArr =
new Entry[number];
85 for (
unsigned n=0;n<
fNumber;n++) {
88 int res = posix_memalign(&buf, align, size);
90 if ((res!=0) || (buf==0)) {
91 EOUT(
"Cannot allocate data for new Memory Block");
107 bool Assign(
bool isowner,
const std::vector<void*>& bufs,
const std::vector<unsigned>& sizes)
throw()
111 fArr =
new Entry[bufs.size()];
116 for (
unsigned n=0;n<
fNumber;n++) {
138 dabc::ModuleAsync(std::string(withmanager ?
"" :
"#") + name),
140 fAlignment(fDfltAlignment),
144 fProcessingReq(false),
148 DOUT3(
"MemoryPool %p name %s constructor",
this, GetName());
159 DOUT4(
"Module::Find %p name = %s parent %p",
this, GetName(),
GetParent());
165 if (cfg.CheckAttr(
xmlNameAttr, GetName()))
return true;
174 LockGuard lock(ObjectMutex());
175 if (fMem!=0)
return false;
183 LockGuard lock(ObjectMutex());
189 if (fMem!=0)
return false;
191 if (bufsize*number==0)
return false;
193 DOUT3(
"POOL:%s Create num:%u X size:%u buffers align:%u", GetName(), number, bufsize, fAlignment);
195 fMem =
new MemoryBlock;
196 fMem->Allocate(number, bufsize, fAlignment);
205 LockGuard lock(ObjectMutex());
206 return _Allocate(bufsize, number);
209 bool dabc::MemoryPool::Assign(
bool isowner,
const std::vector<void*>& bufs,
const std::vector<unsigned>& sizes)
throw()
211 LockGuard lock(ObjectMutex());
213 if (fMem!=0)
return false;
215 if ((bufs.size() != sizes.size()) || (bufs.size()==0))
return false;
217 fMem =
new MemoryBlock;
218 fMem->Assign(isowner, bufs, sizes);
225 LockGuard lock(ObjectMutex());
239 LockGuard lock(ObjectMutex());
246 LockGuard lock(ObjectMutex());
248 return (fMem==0) ? 0 : fMem->fNumber;
253 LockGuard lock(ObjectMutex());
255 return (fMem==0) || (
id>=fMem->fNumber) ? 0 : fMem->fArr[
id].size;
260 LockGuard lock(ObjectMutex());
262 if (fMem==0)
return 0;
266 for (
unsigned id=0;
id<fMem->fNumber;
id++)
267 if (fMem->fArr[
id].size > max) max = fMem->fArr[id].size;
274 LockGuard lock(ObjectMutex());
276 if (fMem==0)
return 0;
280 for (
unsigned id=0;
id<fMem->fNumber;
id++)
281 if ((min==0) || (fMem->fArr[
id].size < min)) min = fMem->fArr[id].size;
289 LockGuard lock(ObjectMutex());
291 return (fMem==0) || (
id>=fMem->fNumber) ? 0 : fMem->fArr[
id].buf;
297 if (*changecnt == fChangeCounter)
return false;
301 for(
unsigned n=0;n<fMem->fNumber;n++) {
302 bufs.push_back(fMem->fArr[n].buf);
303 sizes.push_back(fMem->fArr[n].size);
306 if (changecnt!=0) *changecnt = fChangeCounter;
313 LockGuard lock(ObjectMutex());
314 return _GetPoolInfo(bufs, sizes);
320 LockGuard lock(ObjectMutex());
321 if ((fMem==0) || !fMem->IsAnyFree())
return false;
322 indx = fMem->fFree.Pop();
328 LockGuard lock(ObjectMutex());
329 if (fMem) fMem->fFree.Push(indx);
340 if (fMem==0) _Allocate();
347 if (!fMem->IsAnyFree() && reserve_memory) {
352 if ((size==0) && reserve_memory) size = fMem->fArr[fMem->fFree.Front()].size;
358 if (cnt>=fMem->fFree.Size()) {
363 unsigned id = fMem->fFree.Item(cnt);
364 sum += fMem->fArr[id].size;
368 res.AllocateContainer(cnt < 8 ? 8 : cnt);
372 MemSegment* segs = res.Segments();
375 unsigned id = fMem->fFree.Pop();
377 if (fMem->fArr[
id].refcnt!=0)
380 if (cnt>=res.GetObject()->fCapacity)
383 segs[cnt].buffer = fMem->fArr[id].buf;
384 segs[cnt].datasize = fMem->fArr[id].size;
389 if (restsize < segs[cnt].datasize) segs[cnt].datasize = restsize;
391 sum += fMem->fArr[id].size;
394 fMem->fArr[id].refcnt++;
399 res.GetObject()->fPool.SetObject(
this,
false);
401 res.GetObject()->fNumSegments = cnt;
414 LockGuard lock(ObjectMutex());
416 res = _TakeBuffer(size,
true);
425 LockGuard lock(ObjectMutex());
430 for (
unsigned cnt=0;cnt<num;cnt++) {
431 unsigned id = segm[cnt].id;
432 if (
id>fMem->fNumber)
435 if (fMem->fArr[
id].refcnt + 1 == 0)
438 fMem->fArr[id].refcnt++;
444 LockGuard lock(ObjectMutex());
449 for (
unsigned cnt=0;cnt<num;cnt++) {
450 unsigned id = segm[cnt].id;
452 if (
id>fMem->fNumber)
455 if (fMem->fArr[
id].refcnt != 1)
return false;
464 LockGuard lock(ObjectMutex());
469 for (
unsigned cnt=0;cnt<num;cnt++) {
470 unsigned id = segm[cnt].id;
471 if (
id > fMem->fNumber)
474 if (fMem->fArr[
id].refcnt == 0)
477 if (--(fMem->fArr[
id].refcnt) == 0) fMem->fFree.Push(
id);
485 if (!fReqests[port].pending) {
487 fReqests[port].pending =
true;
490 RecheckRequests(
true);
501 LockGuard guard(ObjectMutex());
518 if (fProcessingReq) {
519 EOUT(
"Event processing mismatch in the POOL - called for the second time");
525 fProcessingReq =
true;
527 while (!fPending.Empty() && (cnt-->0)) {
530 unsigned portid = fPending.Front();
532 if (portid>=fReqests.size()) {
533 EOUT(
"Old requests is not yet removed!!!");
538 if (!IsOutputConnected(portid)) {
540 fReqests[portid].pending =
false;
545 if (!fReqests[portid].pending) {
546 EOUT(
"Request %u was not pending", portid);
547 fReqests[portid].pending =
true;
550 if (!CanSend(portid)) {
551 EOUT(
"Cannot send buffer to output %u", portid);
560 LockGuard lock(ObjectMutex());
562 buf = _TakeBuffer(sz,
false,
true);
564 if (buf.null()) { fProcessingReq =
false;
return false; }
569 DOUT5(
"Memory pool %s send buffer size %u to output %u", GetName(), buf.GetTotalSize(), portid);
571 if (CanSend(portid)) {
574 EOUT(
"Buffer %u is ready, but cannot be add to the queue", buf.GetTotalSize());
579 fReqests[portid].pending =
false;
581 if (from_recv && fPending.Empty()) {
582 fProcessingReq =
false;
587 if (CanSend(portid)) {
588 fPending.Push(portid);
589 fReqests[portid].pending =
true;
594 if (!fPending.Empty() && (cnt<=0))
597 fProcessingReq =
false;
599 return fPending.Empty();
605 bool res = cnt!=fChangeCounter;
607 cnt = fChangeCounter;
614 unsigned buffersize = Cfg(
xmlBufferSize, cmd).AsUInt(GetDfltBufSize());
620 unsigned align = Cfg(
xmlAlignment, cmd).AsUInt(GetDfltAlignment());
622 DOUT1(
"POOL:%s bufsize:%u X num:%u", GetName(), buffersize, numbuffers);
624 if (align) SetAlignment(align);
626 return Allocate(buffersize, numbuffers);
631 LockGuard lock(ObjectMutex());
633 if (fMem==0)
return 0.;
635 double sum1(0.), sum2(0.);
636 for(
unsigned n=0;n<fMem->fNumber;n++) {
637 sum1 += fMem->fArr[n].size;
638 if (fMem->fArr[n].refcnt>0)
639 sum2 += fMem->fArr[n].size;
642 return sum1>0. ? sum2/sum1 : 0.;
648 if (cmd.IsName(
"CreateNewRequester")) {
650 unsigned portindx = NumOutputs();
653 for (
unsigned n=0;n<fReqests.size();n++)
654 if (fReqests[n].disconn) {
656 name = OutputName(portindx);
660 if (portindx==NumOutputs()) {
661 CreateOutput(name, 1);
662 portindx = FindOutput(name);
663 if (portindx != fReqests.size()) {
664 EOUT(
"Cannot create port");
667 fReqests.push_back(RequesterReq());
670 cmd.SetStr(
"PortName", name);
680 DOUT4(
" MemoryPool %s Port %s active=%s", GetName(), name.c_str(),
DBOOL(on));
682 if (on) ProduceOutputEvent(FindOutput(name));
688 unsigned portid = FindOutput(name);
689 if (portid >= fReqests.size())
return;
690 if (!on) fReqests[portid].disconn =
true;
703 return FindPort(cmd.GetStr(
"PortName"));
Reference on memory from memory pool.
Represents command with its arguments.
bool Allocate(unsigned number, unsigned size, unsigned align)
Entry * fArr
array of buffers
bool Assign(bool isowner, const std::vector< void * > &bufs, const std::vector< unsigned > &sizes)
unsigned fNumber
number of buffers
FreeQueue fFree
list of free buffers
Queue< unsigned, false > FreeQueue
Reference CreateNewRequester()
static unsigned fDfltAlignment
default alignment for memory allocation
virtual void ProcessEvent(const EventId &)
Buffer _TakeBuffer(BufferSize_t size, bool except, bool reserve_memory=true)
Central method, which reserves memory from the pool and fills the structures of the buffer.
bool Release()
Release memory and structures, allocated by memory pool.
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
bool SetAlignment(unsigned align)
Following methods could be used for configuration of pool before memory pool is allocated.
bool IsEmpty() const
Return true, if memory pool is not yet created.
bool Assign(bool isowner, const std::vector< void * > &bufs, const std::vector< unsigned > &sizes)
This is alternative method to supply memory to the pool.
void ReleaseRawBuffer(unsigned indx)
Release raw buffer, allocated before by TakeRawBuffer.
bool Reconstruct(Command cmd)
Reconstruct memory pool based on command configuration.
virtual void ProcessConnectionActivated(const std::string &name, bool on)
Method called when port started or stopped.
void IncreaseSegmRefs(MemSegment *segm, unsigned num)
Method increases reference counters of all segments.
bool _Allocate(BufferSize_t bufsize=0, unsigned number=0)
Method to allocate memory for the pool, mutex should be locked.
bool GetPoolInfo(std::vector< void * > &bufs, std::vector< unsigned > &sizes)
Return pointers and sizes of all memory buffers in the pool.
unsigned GetMaxBufSize() const
Return maximum buffer size in the pool.
bool CheckChangeCounter(unsigned &cnt)
Check if memory pool structure was changed since last call; does not involve the memory pool mutex.
bool IsSingleSegmRefs(MemSegment *segm, unsigned num)
Return true when all segments have refcnt==1.
unsigned GetBufferSize(unsigned id) const
Returns size of preallocated buffer.
bool TakeRawBuffer(unsigned &indx)
Reserve raw buffer without creating Buffer instance.
double GetUsedRatio() const
Return relative usage of memory pool buffers.
bool Allocate(BufferSize_t bufsize=0, unsigned number=0)
Allocates memory for the memory pool and creates references.
bool _GetPoolInfo(std::vector< void * > &bufs, std::vector< unsigned > &sizes, unsigned *changecnt=0)
Return pointers and sizes of all memory buffers in the pool. Could be used by devices and transport to...
Buffer TakeBuffer(BufferSize_t size=0)
Returns Buffer object with exclusive access rights.
virtual bool Find(ConfigIO &cfg)
Method to locate object in xml file.
unsigned GetMinBufSize() const
Return minimum buffer size in the pool.
unsigned GetAlignment() const
Following methods should be used after memory pool is created.
bool RecheckRequests(bool from_recv=false)
unsigned GetNumBuffers() const
Returns number of preallocated buffers.
MemoryPool(const std::string &name, bool withmanager=false)
void DecreaseSegmRefs(MemSegment *segm, unsigned num)
Decrease references of specified segments.
bool ProcessSend(unsigned port)
Method called by framework when at least one buffer can be sent to output port.
virtual void ProcessConnectEvent(const std::string &name, bool on)
Method called by framework when connection state of the item is changed.
static unsigned fDfltBufSize
default buffer size
void * GetBufferLocation(unsigned id) const
Returns location of preallocated buffer.
virtual void ProcessEvent(const EventId &)
virtual int ExecuteCommand(Command cmd)
Main method where commands are executed.
void Allocate(unsigned capacity)
Reference on the arbitrary object
XMLNodePointer_t GetParent(XMLNodePointer_t xmlnode)
const char * xmlBufferSize
std::string format(const char *fmt,...)
const char * xmlAlignment
const char * xmlMemoryPoolNode
const char * xmlNumBuffers
void * buf
pointer on raw memory
BufferSize_t size
size of the block
int refcnt
usage counter - number of references on the memory
bool owner
whether the memory should be released