diff options
m--------- | PMC | 0 | ||||
-rw-r--r-- | lib/block_allocator.cpp | 2 | ||||
-rw-r--r-- | lib/circular_buffer.cpp | 28 | ||||
-rw-r--r-- | lib/gras_impl/input_buffer_queues.hpp | 43 |
4 files changed, 31 insertions, 42 deletions
diff --git a/PMC b/PMC -Subproject 7a850055339a8a7d1744de8ba12d3b95c786827 +Subproject cc42002165934e8a8800b0265ee5fe9707583f0 diff --git a/lib/block_allocator.cpp b/lib/block_allocator.cpp index 54654ba..4c486ab 100644 --- a/lib/block_allocator.cpp +++ b/lib/block_allocator.cpp @@ -92,7 +92,7 @@ BufferQueueSptr Block::output_buffer_allocator( config.length = recommend_length; config.affinity = (*this)->block->buffer_affinity; config.token = token; - return BufferQueue::make_pool(config, THIS_MANY_BUFFERS); + return BufferQueue::make_circ(config, THIS_MANY_BUFFERS); } BufferQueueSptr Block::input_buffer_allocator( diff --git a/lib/circular_buffer.cpp b/lib/circular_buffer.cpp index 1021a12..5e56469 100644 --- a/lib/circular_buffer.cpp +++ b/lib/circular_buffer.cpp @@ -5,6 +5,7 @@ #include <boost/format.hpp> #include <boost/bind.hpp> #include <boost/interprocess/shared_memory_object.hpp> +#include <boost/interprocess/anonymous_shared_memory.hpp> #include <boost/interprocess/mapped_region.hpp> #include <boost/lexical_cast.hpp> #include <boost/thread/thread.hpp> @@ -58,35 +59,12 @@ CircularBuffer::CircularBuffer(const size_t num_bytes) //////////////////////////////////////////////////////////////// // Step 0) Find an address that can be mapped across 2x length: - // Allocate physical memory 2x the required size. - // Map a virtual memory region across this memory. - // Now we have a 2x length swath of virtual memory. //////////////////////////////////////////////////////////////// { - shm_name = omg_so_unique(); - - //std::cout << "make shmem 2x\n" << std::endl; - ipc::shared_memory_object shm_obj_2x( - ipc::create_only, //only create - shm_name.c_str(), //name - ipc::read_write //read-write mode - ); - - //std::cout << "truncate 2x\n" << std::endl; - shm_obj_2x.truncate(2*len); - - //std::cout << "map region 0\n" << std::endl; - ipc::mapped_region region0( - shm_obj_2x, //Memory-mappable object - ipc::read_write, //Access mode - 0, //Offset from the beginning of shm - 2*len //Length of the region - ); - //std::cout << "region0.get_address() " << size_t(region0.get_address()) << std::endl; - - ipc::shared_memory_object::remove(shm_name.c_str()); + ipc::mapped_region region0(ipc::anonymous_shared_memory(len*2)); buff_addr = (char *)region0.get_address(); } + std::cout << "reserve addr " << std::hex << size_t(buff_addr) << std::dec << std::endl; //////////////////////////////////////////////////////////////// // Step 1) Allocate a chunk of physical memory of length bytes diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp index 7f70d39..071f2ac 100644 --- a/lib/gras_impl/input_buffer_queues.hpp +++ b/lib/gras_impl/input_buffer_queues.hpp @@ -52,6 +52,8 @@ struct InputBufferQueues //special case when the null buffer is possible if (_queues[i].empty()) return get_null_buff(); + this->try_stitch(i); + //there are enough enqueued bytes, but not in the front buffer const bool must_accumulate = _queues[i].front().length < _reserve_bytes[i]; @@ -60,7 +62,7 @@ struct InputBufferQueues const bool light_front = _queues[i].front().length <= _maximum_bytes[i]/2; const bool should_accumulate = heavy_load and light_front; - if (must_accumulate or should_accumulate) this->accumulate(i); + if (must_accumulate/* or should_accumulate*/) this->accumulate(i); ASSERT(_queues[i].front().length >= _reserve_bytes[i]); @@ -69,6 +71,28 @@ struct InputBufferQueues return _queues[i].front(); } + GRAS_FORCE_INLINE void try_stitch(const size_t i) + { + if (_queues[i].size() < 2) return; + SBuffer &b0 = _queues[i][0]; + SBuffer &b1 = _queues[i][1]; + + if (b0.offset > b0.get_actual_length()) + { + const size_t xfer_bytes = b0.length; + ASSERT(b1.offset >= xfer_bytes); + b1.offset -= xfer_bytes; + b1.length += xfer_bytes; + _queues[i].pop_front(); + return; + } + + const size_t xfer_bytes = b1.length; + b0.length += xfer_bytes; + b1.length = 0; + b1.offset += xfer_bytes; + } + //! Call when input bytes consumed by work void consume(const size_t i, const size_t bytes_consumed); @@ -104,20 +128,7 @@ struct InputBufferQueues { ASSERT(not _queues[i].full()); if (buffer.length == 0) return; - - //does this buffer starts where the last one ends? - //perform buffer stitching into back of buffer - if ( - not _queues[i].empty() and _queues[i].back() == buffer and - (_queues[i].back().length + _queues[i].back().offset) == buffer.offset - ){ - _queues[i].back().length += buffer.length; - } - else - { - _queues[i].push_back(buffer); - } - + _queues[i].push_back(buffer); _enqueued_bytes[i] += buffer.length; __update(i); } @@ -287,7 +298,7 @@ GRAS_FORCE_INLINE void InputBufferQueues::consume(const size_t i, const size_t b //update bounds on the current buffer front.offset += bytes_consumed; front.length -= bytes_consumed; - ASSERT(front.offset <= front.get_actual_length()); + //ASSERT(front.offset <= front.get_actual_length()); ASSERT((_queues[i].front().length % _items_sizes[i]) == 0); if (front.length == 0) this->pop(i); |