summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosh Blum2012-12-20 00:45:12 -0800
committerJosh Blum2012-12-20 00:45:12 -0800
commitb1093925bff9a104ec34ee058780bb247e319bc2 (patch)
treef29de5fcb695cacc45a4412f4a22be03ac48cbf7
parentb6dd30a39ffcd028c7ac5ab8cd7221b158050097 (diff)
downloadsandhi-b1093925bff9a104ec34ee058780bb247e319bc2.tar.gz
sandhi-b1093925bff9a104ec34ee058780bb247e319bc2.tar.bz2
sandhi-b1093925bff9a104ec34ee058780bb247e319bc2.zip
toying with circ buff and stitching
m---------PMC0
-rw-r--r--lib/block_allocator.cpp2
-rw-r--r--lib/circular_buffer.cpp28
-rw-r--r--lib/gras_impl/input_buffer_queues.hpp43
4 files changed, 31 insertions, 42 deletions
diff --git a/PMC b/PMC
-Subproject 7a850055339a8a7d1744de8ba12d3b95c786827
+Subproject cc42002165934e8a8800b0265ee5fe9707583f0
diff --git a/lib/block_allocator.cpp b/lib/block_allocator.cpp
index 54654ba..4c486ab 100644
--- a/lib/block_allocator.cpp
+++ b/lib/block_allocator.cpp
@@ -92,7 +92,7 @@ BufferQueueSptr Block::output_buffer_allocator(
config.length = recommend_length;
config.affinity = (*this)->block->buffer_affinity;
config.token = token;
- return BufferQueue::make_pool(config, THIS_MANY_BUFFERS);
+ return BufferQueue::make_circ(config, THIS_MANY_BUFFERS);
}
BufferQueueSptr Block::input_buffer_allocator(
diff --git a/lib/circular_buffer.cpp b/lib/circular_buffer.cpp
index 1021a12..5e56469 100644
--- a/lib/circular_buffer.cpp
+++ b/lib/circular_buffer.cpp
@@ -5,6 +5,7 @@
#include <boost/format.hpp>
#include <boost/bind.hpp>
#include <boost/interprocess/shared_memory_object.hpp>
+#include <boost/interprocess/anonymous_shared_memory.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/thread/thread.hpp>
@@ -58,35 +59,12 @@ CircularBuffer::CircularBuffer(const size_t num_bytes)
////////////////////////////////////////////////////////////////
// Step 0) Find an address that can be mapped across 2x length:
- // Allocate physical memory 2x the required size.
- // Map a virtual memory region across this memory.
- // Now we have a 2x length swath of virtual memory.
////////////////////////////////////////////////////////////////
{
- shm_name = omg_so_unique();
-
- //std::cout << "make shmem 2x\n" << std::endl;
- ipc::shared_memory_object shm_obj_2x(
- ipc::create_only, //only create
- shm_name.c_str(), //name
- ipc::read_write //read-write mode
- );
-
- //std::cout << "truncate 2x\n" << std::endl;
- shm_obj_2x.truncate(2*len);
-
- //std::cout << "map region 0\n" << std::endl;
- ipc::mapped_region region0(
- shm_obj_2x, //Memory-mappable object
- ipc::read_write, //Access mode
- 0, //Offset from the beginning of shm
- 2*len //Length of the region
- );
- //std::cout << "region0.get_address() " << size_t(region0.get_address()) << std::endl;
-
- ipc::shared_memory_object::remove(shm_name.c_str());
+ ipc::mapped_region region0(ipc::anonymous_shared_memory(len*2));
buff_addr = (char *)region0.get_address();
}
+ std::cout << "reserve addr " << std::hex << size_t(buff_addr) << std::dec << std::endl;
////////////////////////////////////////////////////////////////
// Step 1) Allocate a chunk of physical memory of length bytes
diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp
index 7f70d39..071f2ac 100644
--- a/lib/gras_impl/input_buffer_queues.hpp
+++ b/lib/gras_impl/input_buffer_queues.hpp
@@ -52,6 +52,8 @@ struct InputBufferQueues
//special case when the null buffer is possible
if (_queues[i].empty()) return get_null_buff();
+ this->try_stitch(i);
+
//there are enough enqueued bytes, but not in the front buffer
const bool must_accumulate = _queues[i].front().length < _reserve_bytes[i];
@@ -60,7 +62,7 @@ struct InputBufferQueues
const bool light_front = _queues[i].front().length <= _maximum_bytes[i]/2;
const bool should_accumulate = heavy_load and light_front;
- if (must_accumulate or should_accumulate) this->accumulate(i);
+ if (must_accumulate/* or should_accumulate*/) this->accumulate(i);
ASSERT(_queues[i].front().length >= _reserve_bytes[i]);
@@ -69,6 +71,28 @@ struct InputBufferQueues
return _queues[i].front();
}
+ GRAS_FORCE_INLINE void try_stitch(const size_t i)
+ {
+ if (_queues[i].size() < 2) return;
+ SBuffer &b0 = _queues[i][0];
+ SBuffer &b1 = _queues[i][1];
+
+ if (b0.offset > b0.get_actual_length())
+ {
+ const size_t xfer_bytes = b0.length;
+ ASSERT(b1.offset >= xfer_bytes);
+ b1.offset -= xfer_bytes;
+ b1.length += xfer_bytes;
+ _queues[i].pop_front();
+ return;
+ }
+
+ const size_t xfer_bytes = b1.length;
+ b0.length += xfer_bytes;
+ b1.length = 0;
+ b1.offset += xfer_bytes;
+ }
+
//! Call when input bytes consumed by work
void consume(const size_t i, const size_t bytes_consumed);
@@ -104,20 +128,7 @@ struct InputBufferQueues
{
ASSERT(not _queues[i].full());
if (buffer.length == 0) return;
-
- //does this buffer starts where the last one ends?
- //perform buffer stitching into back of buffer
- if (
- not _queues[i].empty() and _queues[i].back() == buffer and
- (_queues[i].back().length + _queues[i].back().offset) == buffer.offset
- ){
- _queues[i].back().length += buffer.length;
- }
- else
- {
- _queues[i].push_back(buffer);
- }
-
+ _queues[i].push_back(buffer);
_enqueued_bytes[i] += buffer.length;
__update(i);
}
@@ -287,7 +298,7 @@ GRAS_FORCE_INLINE void InputBufferQueues::consume(const size_t i, const size_t b
//update bounds on the current buffer
front.offset += bytes_consumed;
front.length -= bytes_consumed;
- ASSERT(front.offset <= front.get_actual_length());
+ //ASSERT(front.offset <= front.get_actual_length());
ASSERT((_queues[i].front().length % _items_sizes[i]) == 0);
if (front.length == 0) this->pop(i);