summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r--include/gras/CMakeLists.txt1
-rw-r--r--include/gras/block.hpp9
-rw-r--r--include/gras/block.i1
-rw-r--r--include/gras/buffer_queue.hpp73
-rw-r--r--lib/CMakeLists.txt3
-rw-r--r--lib/block.cpp15
-rw-r--r--lib/block_allocator.cpp28
-rw-r--r--lib/block_task.cpp30
-rw-r--r--lib/buffer_queue_circ.cpp137
-rw-r--r--lib/buffer_queue_pool.cpp67
-rw-r--r--lib/circular_buffer.cpp132
-rw-r--r--lib/gras_impl/block_actor.hpp13
-rw-r--r--lib/gras_impl/debug.hpp2
-rw-r--r--lib/gras_impl/input_buffer_queues.hpp49
-rw-r--r--lib/gras_impl/messages.hpp21
-rw-r--r--lib/gras_impl/output_buffer_queues.hpp84
-rw-r--r--lib/gras_impl/simple_buffer_queue.hpp (renamed from lib/gras_impl/buffer_queue.hpp)14
-rw-r--r--lib/input_handlers.cpp17
-rw-r--r--lib/output_handlers.cpp17
-rw-r--r--lib/register_messages.cpp3
-rw-r--r--lib/topology_handler.cpp29
21 files changed, 596 insertions, 149 deletions
diff --git a/include/gras/CMakeLists.txt b/include/gras/CMakeLists.txt
index e8fb85e..f0b0d3a 100644
--- a/include/gras/CMakeLists.txt
+++ b/include/gras/CMakeLists.txt
@@ -21,6 +21,7 @@ install(FILES
thread_pool.hpp
top_block.hpp
work_buffer.hpp
+ buffer_queue.hpp
DESTINATION include/gras
COMPONENT ${GRAS_COMP_DEVEL}
diff --git a/include/gras/block.hpp b/include/gras/block.hpp
index 3899561..524ff00 100644
--- a/include/gras/block.hpp
+++ b/include/gras/block.hpp
@@ -8,6 +8,7 @@
#include <gras/tag_iter.hpp>
#include <gras/tags.hpp>
#include <gras/work_buffer.hpp>
+#include <gras/buffer_queue.hpp>
#include <vector>
#include <string>
@@ -337,9 +338,9 @@ struct GRAS_API Block : Element
* \param which_output the output port index number
* \param token the token for the buffer's returner
* \param recommend_length the schedulers recommended length in bytes
- * \return the token used for the buffer allocation (may be the same)
+ * \return a shared ptr to a new buffer queue object
*/
- virtual SBufferToken output_buffer_allocator(
+ virtual BufferQueueSptr output_buffer_allocator(
const size_t which_output,
const SBufferToken &token,
const size_t recommend_length
@@ -356,9 +357,9 @@ struct GRAS_API Block : Element
* \param which_input the input port index number
* \param token the token for the buffer's returner
* \param recommend_length the schedulers recommended length in bytes
- * \return the token used for the buffer allocation (may be the same)
+ * \return a shared ptr to a new buffer queue object
*/
- virtual SBufferToken input_buffer_allocator(
+ virtual BufferQueueSptr input_buffer_allocator(
const size_t which_input,
const SBufferToken &token,
const size_t recommend_length
diff --git a/include/gras/block.i b/include/gras/block.i
index 80c3905..ef2738c 100644
--- a/include/gras/block.i
+++ b/include/gras/block.i
@@ -11,6 +11,7 @@
%import <gras/tags.i>
%include <gras/tag_iter.i>
%import <gras/sbuffer.i>
+%include <gras/buffer_queue.hpp>
%include <gras/block.hpp>
#endif /*INCLUDED_GRAS_BLOCK_I*/
diff --git a/include/gras/buffer_queue.hpp b/include/gras/buffer_queue.hpp
new file mode 100644
index 0000000..da8de41
--- /dev/null
+++ b/include/gras/buffer_queue.hpp
@@ -0,0 +1,73 @@
+// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
+
+#ifndef INCLUDED_GRAS_BUFFER_QUEUE_HPP
+#define INCLUDED_GRAS_BUFFER_QUEUE_HPP
+
+#include <gras/gras.hpp>
+#include <gras/sbuffer.hpp>
+#include <boost/shared_ptr.hpp>
+
+namespace gras
+{
+
+struct BufferQueue;
+
+typedef boost::shared_ptr<BufferQueue> BufferQueueSptr;
+
+//! Buffer Queue is an interface enabling us to create custom buffer allocators.
+struct BufferQueue
+{
+
+    /*!
+     * Virtual destructor for the interface.
+     * Implementations are handed out as BufferQueueSptr (pointer to base),
+     * so destruction through a base pointer must reach the derived class.
+     */
+    virtual ~BufferQueue(void){}
+
+    /*!
+     * Create a buffer queue object using the pool allocator.
+     * A pool of buffers contains individual buffer allocations.
+     *
+     * The config param is used to pass information
+     * about per-buffer size, affinity, and token.
+     *
+     * \param config used to alloc one buffer
+     * \param num_buffs alloc this many buffs
+     * \return a new buffer queue sptr
+     */
+    GRAS_API static BufferQueueSptr make_pool(
+        const SBufferConfig &config,
+        const size_t num_buffs
+    );
+
+    /*!
+     * Create a buffer queue object using the circular allocator.
+     * The circular allocator contains one large double-mapped buffer.
+     * This buffer is virtually mapped so that buff[n+len] = buff[n].
+     *
+     * Pieces of this buffer can be passed around in smaller chunks.
+     * The number of chunks is dictated by num_buffs,
+     * the size of the chunks is dictated by config.length.
+     * The size of the circular buffer will be num_buffs*config.length
+     *
+     * \param config used to alloc one buffer
+     * \param num_buffs this many smaller chunks
+     *
+     * \return a new buffer queue sptr
+     */
+    GRAS_API static BufferQueueSptr make_circ(
+        const SBufferConfig &config,
+        const size_t num_buffs
+    );
+
+    //! Get a reference to the buffer at the front of the queue
+    virtual SBuffer &front(void) = 0;
+
+    //! Pop off the used portion of the queue
+    virtual void pop(void) = 0;
+
+    //! Push a used buffer back into the queue
+    virtual void push(const SBuffer &buff) = 0;
+
+    //! Is the queue empty?
+    virtual bool empty(void) const = 0;
+
+};
+
+} //namespace gras
+
+#endif /*INCLUDED_GRAS_BUFFER_QUEUE_HPP*/
diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt
index 8f6f46c..bc38d06 100644
--- a/lib/CMakeLists.txt
+++ b/lib/CMakeLists.txt
@@ -40,6 +40,9 @@ list(APPEND GRAS_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/debug.cpp
${CMAKE_CURRENT_SOURCE_DIR}/element.cpp
${CMAKE_CURRENT_SOURCE_DIR}/sbuffer.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/circular_buffer.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/buffer_queue_circ.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/buffer_queue_pool.cpp
${CMAKE_CURRENT_SOURCE_DIR}/tags.cpp
${CMAKE_CURRENT_SOURCE_DIR}/block.cpp
${CMAKE_CURRENT_SOURCE_DIR}/block_actor.cpp
diff --git a/lib/block.cpp b/lib/block.cpp
index 85f7654..c806063 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -34,7 +34,6 @@ Block::Block(const std::string &name):
(*this)->block->name = name; //for debug purposes
//setup some state variables
- (*this)->block->topology_init = false;
(*this)->block->block_ptr = this;
(*this)->block->block_state = BlockActor::BLOCK_STATE_INIT;
@@ -89,8 +88,11 @@ InputPortConfig Block::get_input_config(const size_t which_input) const
void Block::set_input_config(const size_t which_input, const InputPortConfig &config)
{
vector_set((*this)->block->input_configs, config, which_input);
- if ((*this)->block->topology_init)
- (*this)->block->Push(UpdateInputsMessage(), Theron::Address());
+ {
+ InputUpdateMessage message;
+ message.index = which_input;
+ (*this)->block->Push(message, Theron::Address());
+ }
}
OutputPortConfig Block::get_output_config(const size_t which_output) const
@@ -101,8 +103,11 @@ OutputPortConfig Block::get_output_config(const size_t which_output) const
void Block::set_output_config(const size_t which_output, const OutputPortConfig &config)
{
vector_set((*this)->block->output_configs, config, which_output);
- if ((*this)->block->topology_init)
- (*this)->block->Push(UpdateInputsMessage(), Theron::Address());
+ {
+ OutputUpdateMessage message;
+ message.index = which_output;
+ (*this)->block->Push(message, Theron::Address());
+ }
}
void Block::consume(const size_t which_input, const size_t num_items)
diff --git a/lib/block_allocator.cpp b/lib/block_allocator.cpp
index 64b3aed..4c486ab 100644
--- a/lib/block_allocator.cpp
+++ b/lib/block_allocator.cpp
@@ -58,7 +58,6 @@ void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address
//allocate output buffers which will also wake up the task
const size_t num_outputs = this->get_num_outputs();
- this->output_buffer_tokens.resize(num_outputs);
for (size_t i = 0; i < num_outputs; i++)
{
const size_t bytes = recommend_length(
@@ -71,7 +70,8 @@ void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address
SBufferDeleter deleter = boost::bind(&BlockActor::buffer_returner, this, i, _1);
SBufferToken token = SBufferToken(new SBufferDeleter(deleter));
- this->output_buffer_tokens[i] = block_ptr->output_buffer_allocator(i, token, bytes);
+ BufferQueueSptr queue = block_ptr->output_buffer_allocator(i, token, bytes);
+ this->output_queues.set_buffer_queue(i, queue);
InputAllocMessage message;
message.token = SBufferToken(new SBufferDeleter(deleter));
@@ -82,29 +82,23 @@ void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address
this->Send(0, from); //ACK
}
-SBufferToken Block::output_buffer_allocator(
+BufferQueueSptr Block::output_buffer_allocator(
const size_t,
const SBufferToken &token,
const size_t recommend_length
){
- for (size_t j = 0; j < THIS_MANY_BUFFERS; j++)
- {
- SBufferConfig config;
- config.memory = NULL;
- config.length = recommend_length;
- config.affinity = (*this)->block->buffer_affinity;
- config.token = token;
- SBuffer buff(config);
- std::memset(buff.get_actual_memory(), 0, buff.get_actual_length());
- //buffer derefs here and the token messages it back to the block
- }
- return token;
+ SBufferConfig config;
+ config.memory = NULL;
+ config.length = recommend_length;
+ config.affinity = (*this)->block->buffer_affinity;
+ config.token = token;
+ return BufferQueue::make_circ(config, THIS_MANY_BUFFERS);
}
-SBufferToken Block::input_buffer_allocator(
+BufferQueueSptr Block::input_buffer_allocator(
const size_t,
const SBufferToken &,
const size_t
){
- return SBufferToken(); //null
+ return BufferQueueSptr(); //null
}
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index 9e5917d..4f44de3 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -31,9 +31,6 @@ void BlockActor::mark_done(void)
//release upstream, downstream, and executor tokens
this->token_pool.clear();
- //release allocator tokens, buffers can now call deleters
- this->output_buffer_tokens.clear();
-
//release all buffers in queues
this->input_queues.flush_all();
this->output_queues.flush_all();
@@ -95,8 +92,8 @@ void BlockActor::output_fail(const size_t i)
throw std::runtime_error("output_fail called on maximum_items buffer");
}
- //force pop so next work() gets a new buffer
- this->flush_output(i, true);
+ //mark fail: not ready until a new buffer appears
+ this->output_queues.fail(i);
}
GRAS_FORCE_INLINE bool BlockActor::is_work_allowed(void)
@@ -156,7 +153,8 @@ void BlockActor::handle_task(void)
this->input_items.max() = std::max(this->input_items.max(), items);
//inline dealings, how and when input buffers can be inlined into output buffers
- //continue;
+ continue; //FIXME to implement needs change
+ /*
if (
buff.unique() and
input_configs[i].inline_buffer and
@@ -170,6 +168,7 @@ void BlockActor::handle_task(void)
this->output_queues.push_front(output_inline_index, new_obuff); //you got inlined!
output_inline_index++; //done do this output port again
}
+ */
}
//------------------------------------------------------------------
@@ -234,7 +233,7 @@ void BlockActor::handle_task(void)
void BlockActor::consume(const size_t i, const size_t items)
{
#ifdef ITEM_CONSPROD
- std::cerr << "consume " << items << std::endl;
+ std::cerr << name << " consume " << items << std::endl;
#endif
this->items_consumed[i] += items;
const size_t bytes = items*this->input_items_sizes[i];
@@ -245,7 +244,7 @@ void BlockActor::consume(const size_t i, const size_t items)
void BlockActor::produce(const size_t i, const size_t items)
{
#ifdef ITEM_CONSPROD
- std::cerr << "produce " << items << std::endl;
+ std::cerr << name << " produce " << items << std::endl;
#endif
SBuffer &buff = this->output_queues.front(i);
this->items_produced[i] += items;
@@ -263,9 +262,9 @@ void BlockActor::produce_buffer(const size_t i, const SBuffer &buffer)
this->post_downstream(i, buff_msg);
}
-GRAS_FORCE_INLINE void BlockActor::flush_output(const size_t i, const bool force_pop)
+GRAS_FORCE_INLINE void BlockActor::flush_output(const size_t i)
{
- if (not this->output_queues.ready(i) or this->output_queues.front(i).length == 0) return;
+ if (this->output_queues.empty(i) or this->output_queues.front(i).length == 0) return;
SBuffer &buff = this->output_queues.front(i);
InputBufferMessage buff_msg;
buff_msg.buffer = buff;
@@ -275,13 +274,6 @@ GRAS_FORCE_INLINE void BlockActor::flush_output(const size_t i, const bool force
buff.offset += buff.length;
buff.length = 0;
- //when to pop the buffer and give next work a new one
- const size_t reserve_bytes = this->output_configs[i].reserve_items/this->output_items_sizes[i];
- if (
- force_pop or (buff.offset*2 > buff.get_actual_length()) or
- (buff.get_actual_length() - buff.offset) < reserve_bytes
- )
- {
- this->output_queues.pop(i);
- }
+ //release whatever has been used of the output buffer
+ this->output_queues.pop(i);
}
diff --git a/lib/buffer_queue_circ.cpp b/lib/buffer_queue_circ.cpp
new file mode 100644
index 0000000..60a2b51
--- /dev/null
+++ b/lib/buffer_queue_circ.cpp
@@ -0,0 +1,137 @@
+// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
+
+#include <gras/buffer_queue.hpp>
+#include <gras_impl/debug.hpp>
+#include <boost/circular_buffer.hpp>
+#include <vector>
+
+using namespace gras;
+
+SBuffer make_circular_buffer(const size_t num_bytes); //circular_buffer.cpp
+
+static void buffer_dummy_delete(SBuffer &, const SBuffer /*ref holder*/)
+{
+ //NOP: intentionally empty, the second argument exists only to hold a reference
+}
+
+struct BufferQueueCirc : BufferQueue
+{
+    BufferQueueCirc(const SBufferConfig &config, const size_t num_buffs);
+
+    //! Release the token first, then every buffer reference held by the queue
+    ~BufferQueueCirc(void)
+    {
+        _token.reset();
+        _available_buffers.clear();
+        _returned_buffers.clear();
+    }
+
+    SBuffer &front(void);
+
+    void pop(void);
+
+    void push(const SBuffer &buff);
+
+    //! Empty when no bytes are available or no buffer container is queued
+    bool empty(void) const
+    {
+        if (_bytes_avail == 0) return true;
+        return _available_buffers.empty();
+    }
+
+    SBufferToken _token; //copy of config.token
+    SBuffer _circ_buff; //the buffer returned by make_circular_buffer
+    char *_write_ptr; //address stored into the front buffer by front()
+    size_t _bytes_avail; //decreased by pop(), increased by push()
+    size_t _ack_index; //next user index that push() will acknowledge
+    boost::circular_buffer<SBuffer> _available_buffers;
+    std::vector<SBuffer> _returned_buffers; //indexed by user index
+    std::vector<size_t> _outgone_bytes; //bytes recorded by pop(), per user index
+
+};
+
+BufferQueueCirc::BufferQueueCirc(const SBufferConfig &config, const size_t num_buffs):
+    _token(config.token),
+    _ack_index(0)
+{
+    //one large buffer sized for num_buffs chunks of config.length bytes
+    _circ_buff = make_circular_buffer(config.length * num_buffs);
+    _write_ptr = (char *)_circ_buff.get_actual_memory();
+    _bytes_avail = _circ_buff.get_actual_length();
+
+    //size the bookkeeping containers: one slot per chunk
+    _available_buffers.set_capacity(num_buffs);
+    _returned_buffers.resize(num_buffs);
+    _outgone_bytes.resize(num_buffs, 0);
+
+    //each chunk shares this config; the no-op deleter is bound to a copy
+    //of _circ_buff so that every chunk holds a reference to the circ buff
+    SBufferConfig chunk_config = config;
+    chunk_config.deleter = SBufferDeleter(boost::bind(&buffer_dummy_delete, _1, _circ_buff));
+    chunk_config.memory = _circ_buff.get_actual_memory();
+
+    for (size_t index = 0; index != num_buffs; ++index)
+    {
+        chunk_config.user_index = index;
+        SBuffer chunk(chunk_config);
+        //chunk derefs here and returns to this queue thru token callback
+    }
+}
+
+//! Get the front available buffer, with its memory set to the current write pointer
+SBuffer &BufferQueueCirc::front(void)
+{
+ ASSERT(not this->empty());
+ SBuffer &front = _available_buffers.front();
+ ASSERT(front);
+ //the container is re-pointed on every call: pop() moves _write_ptr
+ front->config.memory = _write_ptr;
+ return front;
+}
+
+//! Remove the front buffer and account for the bytes recorded in its offset
+void BufferQueueCirc::pop(void)
+{
+    ASSERT(not this->empty());
+
+    //the front buffer's offset is taken as the byte count going out
+    SBuffer &head = _available_buffers.front();
+    const size_t num_bytes = head.offset;
+
+    //remember the count so push() can give it back for this user index
+    _outgone_bytes[head.get_user_index()] = num_bytes;
+
+    //drop the internal reference; head must not be used past this point
+    _available_buffers.pop_front();
+
+    //advance the write pointer and wrap once it passes the end of the buffer
+    _write_ptr += num_bytes;
+    const size_t circ_length = _circ_buff.get_actual_length();
+    char *const wrap_limit = (char *)_circ_buff.get(circ_length);
+    if (_write_ptr > wrap_limit) _write_ptr -= circ_length;
+
+    //these bytes stay unavailable until the buffer comes back
+    ASSERT(_bytes_avail >= num_bytes);
+    _bytes_avail -= num_bytes;
+}
+
+//! Accept a returned buffer and acknowledge returns in user index order
+void BufferQueueCirc::push(const SBuffer &buff)
+{
+    //park the buffer in the slot that matches its user index
+    _returned_buffers[buff.get_user_index()] = buff;
+
+    //walk forward from the expected index while the slots are occupied
+    const size_t num_slots = _returned_buffers.size();
+    while (true)
+    {
+        SBuffer &slot = _returned_buffers[_ack_index];
+        if (not slot) break;
+
+        //the bytes recorded by pop() become available again
+        _bytes_avail += _outgone_bytes[_ack_index];
+
+        //move the buffer container from the slot into the queue
+        _available_buffers.push_back(slot);
+        slot.reset();
+
+        //step to the next expected index, wrapping at the end
+        _ack_index = (_ack_index + 1 == num_slots)? 0 : _ack_index + 1;
+    }
+}
+
+//! Factory: wrap a new BufferQueueCirc in the shared pointer type
+BufferQueueSptr BufferQueue::make_circ(
+    const SBufferConfig &config,
+    const size_t num_buffs
+){
+    BufferQueueSptr queue(new BufferQueueCirc(config, num_buffs));
+    return queue;
+}
diff --git a/lib/buffer_queue_pool.cpp b/lib/buffer_queue_pool.cpp
new file mode 100644
index 0000000..db9b68a
--- /dev/null
+++ b/lib/buffer_queue_pool.cpp
@@ -0,0 +1,67 @@
+// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
+
+#include <gras/buffer_queue.hpp>
+#include <gras_impl/debug.hpp>
+#include <boost/circular_buffer.hpp>
+
+using namespace gras;
+
+struct BufferQueuePool : BufferQueue
+{
+    BufferQueuePool(const SBufferConfig &config, const size_t num):
+        _token(config.token), //keep a copy of the token from the config
+        _queue(num) //capacity for num buffers
+    {
+        //NOP
+    }
+
+    //! Release the token first, then any buffers still queued
+    ~BufferQueuePool(void)
+    {
+        _token.reset();
+        _queue.clear();
+    }
+
+    SBuffer &front(void)
+    {
+        ASSERT(not _queue.empty());
+        ASSERT(_queue.front());
+        return _queue.front();
+    }
+
+    void pop(void)
+    {
+        ASSERT(not _queue.empty());
+        SBuffer &head = _queue.front();
+        head.reset(); //dont hold ref
+        _queue.pop_front();
+    }
+
+    void push(const SBuffer &buff)
+    {
+        _queue.push_back(buff);
+    }
+
+    bool empty(void) const
+    {
+        return _queue.empty();
+    }
+
+    SBufferToken _token;
+    boost::circular_buffer<SBuffer> _queue;
+
+};
+
+//! Factory: create the pool queue, then allocate and zero num_buffs buffers
+BufferQueueSptr BufferQueue::make_pool(
+    const SBufferConfig &config,
+    const size_t num_buffs
+){
+    BufferQueueSptr queue(new BufferQueuePool(config, num_buffs));
+
+    for (size_t made = 0; made != num_buffs; ++made)
+    {
+        SBuffer buff(config);
+        std::memset(buff.get_actual_memory(), 0, buff.get_actual_length());
+        //buff derefs at the end of this scope
+        //and returns to this queue thru token callback
+    }
+
+    return queue;
+}
diff --git a/lib/circular_buffer.cpp b/lib/circular_buffer.cpp
new file mode 100644
index 0000000..5e56469
--- /dev/null
+++ b/lib/circular_buffer.cpp
@@ -0,0 +1,132 @@
+// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
+
+#include <gras/buffer_queue.hpp>
+#include <gras_impl/debug.hpp>
+#include <boost/format.hpp>
+#include <boost/bind.hpp>
+#include <boost/interprocess/shared_memory_object.hpp>
+#include <boost/interprocess/anonymous_shared_memory.hpp>
+#include <boost/interprocess/mapped_region.hpp>
+#include <boost/lexical_cast.hpp>
+#include <boost/thread/thread.hpp>
+#include <boost/thread/mutex.hpp>
+
+using namespace gras;
+namespace ipc = boost::interprocess;
+
+static boost::mutex alloc_mutex;
+
+/*!
+* This routine generates a name for the shared memory allocation.
+* The name is built from the calling thread's id and a running counter.
+*
+* Because we are using boost IPC, and it expects that we share the memory,
+* IPC allocation requires a unique name to share amongst the processes.
+* Since we are not actually using IPC, the sharing aspect isn't very useful,
+* but we still need a unique name for the shared memory allocation anyway.
+* (I would like it if an empty string would suffice as an anonymous allocation)
+*/
+static std::string omg_so_unique(void)
+{
+    const std::string tid = boost::lexical_cast<std::string>(boost::this_thread::get_id());
+    static size_t count = 0;
+    const size_t serial = count++;
+    boost::format name("shmem-%s-%u");
+    name % tid % serial;
+    return boost::str(name);
+}
+
+//! Owns the shared memory object and the two mapped regions created from it
+struct CircularBuffer
+{
+    CircularBuffer(const size_t num_bytes);
+
+    //! Remove the named shared memory object created by the constructor
+    ~CircularBuffer(void)
+    {
+        const char *const name = shm_name.c_str();
+        ipc::shared_memory_object::remove(name);
+    }
+
+    char *buff_addr; //address of the first mapped region
+    size_t actual_length; //length of one mapped region in bytes
+    std::string shm_name; //name used to create and remove the shm object
+    ipc::shared_memory_object shm_obj;
+    ipc::mapped_region region1; //mapped at buff_addr
+    ipc::mapped_region region2; //mapped at buff_addr + actual_length
+};
+
+//! Map one shared memory object twice, back to back, for num_bytes rounded up to whole pages
+CircularBuffer::CircularBuffer(const size_t num_bytes)
+{
+ //the whole construction runs under the file-level allocation mutex
+ boost::mutex::scoped_lock lock(alloc_mutex);
+
+ //round the requested size up to a multiple of the page size
+ const size_t chunk = ipc::mapped_region::get_page_size();
+ const size_t len = chunk*((num_bytes + chunk - 1)/chunk);
+ actual_length = len;
+
+ ////////////////////////////////////////////////////////////////
+ // Step 0) Find an address that can be mapped across 2x length:
+ // region0 is destroyed at the end of this scope, only its address is kept.
+ // NOTE(review): nothing visible here keeps that range reserved until
+ // steps 2 and 3 map into it - confirm this is acceptable.
+ ////////////////////////////////////////////////////////////////
+ {
+ ipc::mapped_region region0(ipc::anonymous_shared_memory(len*2));
+ buff_addr = (char *)region0.get_address();
+ }
+ std::cout << "reserve addr " << std::hex << size_t(buff_addr) << std::dec << std::endl;
+
+ ////////////////////////////////////////////////////////////////
+ // Step 1) Create a named shared memory object and size it to len bytes
+ ////////////////////////////////////////////////////////////////
+ //std::cout << "make shmem\n" << std::endl;
+ shm_name = omg_so_unique();
+ shm_obj = ipc::shared_memory_object(
+ ipc::create_only, //only create
+ shm_name.c_str(), //name
+ ipc::read_write //read-write mode
+ );
+
+ //std::cout << "truncate\n" << std::endl;
+ shm_obj.truncate(len);
+
+ ////////////////////////////////////////////////////////////////
+ // Step 2) Map the object as region1 at buff_addr
+ ////////////////////////////////////////////////////////////////
+ //std::cout << "map region 1\n" << std::endl;
+ region1 = ipc::mapped_region(
+ shm_obj, //Memory-mappable object
+ ipc::read_write, //Access mode
+ 0, //Offset from the beginning of shm
+ len, //Length of the region
+ buff_addr
+ );
+ //std::cout << "region1.get_address() " << size_t(region1.get_address()) << std::endl;
+
+ ////////////////////////////////////////////////////////////////
+ // Step 3) Map the same object again as region2 at buff_addr + len
+ ////////////////////////////////////////////////////////////////
+ //std::cout << "map region 2\n" << std::endl;
+ region2 = ipc::mapped_region(
+ shm_obj, //Memory-mappable object
+ ipc::read_write, //Access mode
+ 0, //Offset from the beginning of shm
+ len, //Length of the region
+ buff_addr + len
+ );
+
+ //std::cout << "region2.get_address() " << size_t(region2.get_address()) << std::endl;
+ //std::cout << "diff " << (long(region2.get_address()) - long(region1.get_address())) << std::endl;
+
+ ////////////////////////////////////////////////////////////////
+ // Step 4) Zero out the memory for good measure
+ ////////////////////////////////////////////////////////////////
+ std::memset(region1.get_address(), 0, region1.get_size());
+ std::memset(region2.get_address(), 0, region2.get_size());
+}
+
+//! Deleter callback: destroy the CircularBuffer bound to it, the SBuffer argument is unused
+static void circular_buffer_delete(SBuffer &, CircularBuffer *circ_buff)
+{
+    delete circ_buff;
+}
+
+//! Allocate a CircularBuffer and wrap its memory in an SBuffer whose deleter frees it
+SBuffer make_circular_buffer(const size_t num_bytes)
+{
+    CircularBuffer *circ_buff = new CircularBuffer(num_bytes);
+
+    //describe the mapped memory; the bound deleter destroys circ_buff
+    SBufferConfig config;
+    config.memory = circ_buff->buff_addr;
+    config.length = circ_buff->actual_length;
+    config.deleter = SBufferDeleter(boost::bind(&circular_buffer_delete, _1, circ_buff));
+
+    return SBuffer(config);
+}
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index d204ec4..afc248e 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -46,15 +46,16 @@ struct BlockActor : Apology::Worker
this->RegisterHandler(this, &BlockActor::handle_input_token);
this->RegisterHandler(this, &BlockActor::handle_input_check);
this->RegisterHandler(this, &BlockActor::handle_input_alloc);
+ this->RegisterHandler(this, &BlockActor::handle_input_update);
this->RegisterHandler(this, &BlockActor::handle_output_buffer);
this->RegisterHandler(this, &BlockActor::handle_output_token);
this->RegisterHandler(this, &BlockActor::handle_output_check);
this->RegisterHandler(this, &BlockActor::handle_output_hint);
this->RegisterHandler(this, &BlockActor::handle_output_alloc);
+ this->RegisterHandler(this, &BlockActor::handle_output_update);
this->RegisterHandler(this, &BlockActor::handle_self_kick);
- this->RegisterHandler(this, &BlockActor::handle_update_inputs);
}
//handlers
@@ -72,15 +73,16 @@ struct BlockActor : Apology::Worker
void handle_input_token(const InputTokenMessage &, const Theron::Address);
void handle_input_check(const InputCheckMessage &, const Theron::Address);
void handle_input_alloc(const InputAllocMessage &, const Theron::Address);
+ void handle_input_update(const InputUpdateMessage &, const Theron::Address);
void handle_output_buffer(const OutputBufferMessage &, const Theron::Address);
void handle_output_token(const OutputTokenMessage &, const Theron::Address);
void handle_output_check(const OutputCheckMessage &, const Theron::Address);
void handle_output_hint(const OutputHintMessage &, const Theron::Address);
void handle_output_alloc(const OutputAllocMessage &, const Theron::Address);
+ void handle_output_update(const OutputUpdateMessage &, const Theron::Address);
void handle_self_kick(const SelfKickMessage &, const Theron::Address);
- void handle_update_inputs(const UpdateInputsMessage &, const Theron::Address);
//helpers
void buffer_returner(const size_t index, SBuffer &buffer);
@@ -93,7 +95,7 @@ struct BlockActor : Apology::Worker
void produce(const size_t index, const size_t items);
void consume(const size_t index, const size_t items);
void produce_buffer(const size_t index, const SBuffer &buffer);
- void flush_output(const size_t index, const bool force_pop = false);
+ void flush_output(const size_t index);
bool is_work_allowed(void);
GRAS_FORCE_INLINE bool is_input_done(const size_t i)
@@ -122,11 +124,10 @@ struct BlockActor : Apology::Worker
BitSet inputs_done;
BitSet outputs_done;
std::set<Token> token_pool;
- std::vector<SBufferToken> output_buffer_tokens;
//buffer queues and ready conditions
InputBufferQueues input_queues;
- OutputBufferQueues<SBuffer> output_queues;
+ OutputBufferQueues output_queues;
BitSet inputs_available;
//tag tracking
@@ -155,8 +156,6 @@ struct BlockActor : Apology::Worker
long buffer_affinity;
std::vector<std::vector<OutputHintMessage> > output_allocation_hints;
-
- bool topology_init;
};
} //namespace gras
diff --git a/lib/gras_impl/debug.hpp b/lib/gras_impl/debug.hpp
index a90eb6c..c8c7dfe 100644
--- a/lib/gras_impl/debug.hpp
+++ b/lib/gras_impl/debug.hpp
@@ -25,7 +25,7 @@ extern void *operator new(std::size_t n) throw (std::bad_alloc);
//-- define to enable these debugs:
//----------------------------------------------------------------------
//#define WORK_DEBUG
-//#define ASSERTING
+#define ASSERTING
//#define MESSAGE_TRACING
//#define ITEM_CONSPROD
diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp
index 83ed44b..071f2ac 100644
--- a/lib/gras_impl/input_buffer_queues.hpp
+++ b/lib/gras_impl/input_buffer_queues.hpp
@@ -5,7 +5,7 @@
#include <gras_impl/debug.hpp>
#include <gras_impl/bitset.hpp>
-#include <gras_impl/buffer_queue.hpp>
+#include <gras_impl/simple_buffer_queue.hpp>
#include <gras/sbuffer.hpp>
#include <vector>
#include <queue>
@@ -52,6 +52,8 @@ struct InputBufferQueues
//special case when the null buffer is possible
if (_queues[i].empty()) return get_null_buff();
+ this->try_stitch(i);
+
//there are enough enqueued bytes, but not in the front buffer
const bool must_accumulate = _queues[i].front().length < _reserve_bytes[i];
@@ -60,7 +62,7 @@ struct InputBufferQueues
const bool light_front = _queues[i].front().length <= _maximum_bytes[i]/2;
const bool should_accumulate = heavy_load and light_front;
- if (must_accumulate or should_accumulate) this->accumulate(i);
+ if (must_accumulate/* or should_accumulate*/) this->accumulate(i);
ASSERT(_queues[i].front().length >= _reserve_bytes[i]);
@@ -69,6 +71,28 @@ struct InputBufferQueues
return _queues[i].front();
}
+ GRAS_FORCE_INLINE void try_stitch(const size_t i)
+ {
+ if (_queues[i].size() < 2) return;
+ SBuffer &b0 = _queues[i][0];
+ SBuffer &b1 = _queues[i][1];
+
+ if (b0.offset > b0.get_actual_length())
+ {
+ const size_t xfer_bytes = b0.length;
+ ASSERT(b1.offset >= xfer_bytes);
+ b1.offset -= xfer_bytes;
+ b1.length += xfer_bytes;
+ _queues[i].pop_front();
+ return;
+ }
+
+ const size_t xfer_bytes = b1.length;
+ b0.length += xfer_bytes;
+ b1.length = 0;
+ b1.offset += xfer_bytes;
+ }
+
//! Call when input bytes consumed by work
void consume(const size_t i, const size_t bytes_consumed);
@@ -104,20 +128,7 @@ struct InputBufferQueues
{
ASSERT(not _queues[i].full());
if (buffer.length == 0) return;
-
- //does this buffer starts where the last one ends?
- //perform buffer stitching into back of buffer
- if (
- not _queues[i].empty() and _queues[i].back() == buffer and
- (_queues[i].back().length + _queues[i].back().offset) == buffer.offset
- ){
- _queues[i].back().length += buffer.length;
- }
- else
- {
- _queues[i].push_back(buffer);
- }
-
+ _queues[i].push_back(buffer);
_enqueued_bytes[i] += buffer.length;
__update(i);
}
@@ -172,7 +183,7 @@ struct InputBufferQueues
std::vector<size_t> _maximum_bytes;
std::vector<boost::circular_buffer<SBuffer> > _queues;
std::vector<size_t> _preload_bytes;
- std::vector<boost::shared_ptr<BufferQueue> > _aux_queues;
+ std::vector<boost::shared_ptr<SimpleBufferQueue> > _aux_queues;
};
@@ -207,7 +218,7 @@ inline void InputBufferQueues::update_config(
_aux_queues[i]->empty() or
_aux_queues[i]->front().get_actual_length() != _maximum_bytes[i]
){
- _aux_queues[i] = boost::shared_ptr<BufferQueue>(new BufferQueue());
+ _aux_queues[i].reset(new SimpleBufferQueue());
_aux_queues[i]->allocate_one(_maximum_bytes[i]);
_aux_queues[i]->allocate_one(_maximum_bytes[i]);
_aux_queues[i]->allocate_one(_maximum_bytes[i]);
@@ -287,7 +298,7 @@ GRAS_FORCE_INLINE void InputBufferQueues::consume(const size_t i, const size_t b
//update bounds on the current buffer
front.offset += bytes_consumed;
front.length -= bytes_consumed;
- ASSERT(front.offset <= front.get_actual_length());
+ //ASSERT(front.offset <= front.get_actual_length());
ASSERT((_queues[i].front().length % _items_sizes[i]) == 0);
if (front.length == 0) this->pop(i);
diff --git a/lib/gras_impl/messages.hpp b/lib/gras_impl/messages.hpp
index eae3258..b113e0d 100644
--- a/lib/gras_impl/messages.hpp
+++ b/lib/gras_impl/messages.hpp
@@ -3,6 +3,7 @@
#ifndef INCLUDED_LIBGRAS_IMPL_MESSAGES_HPP
#define INCLUDED_LIBGRAS_IMPL_MESSAGES_HPP
+#include <gras/buffer_queue.hpp>
#include <gras/sbuffer.hpp>
#include <gras/tags.hpp>
#include <gras/sbuffer.hpp>
@@ -72,6 +73,11 @@ struct InputCheckMessage
size_t index;
};
+struct InputUpdateMessage
+{
+ size_t index;
+};
+
//----------------------------------------------------------------------
//-- message to an output port
//-- do not ack
@@ -104,7 +110,12 @@ struct OutputHintMessage
struct OutputAllocMessage
{
size_t index;
- SBufferToken token;
+ BufferQueueSptr queue;
+};
+
+struct OutputUpdateMessage
+{
+ size_t index;
};
//----------------------------------------------------------------------
@@ -117,11 +128,6 @@ struct SelfKickMessage
//empty
};
-struct UpdateInputsMessage
-{
- //empty
-};
-
} //namespace gras
#include <Theron/Register.h>
@@ -141,14 +147,15 @@ THERON_DECLARE_REGISTERED_MESSAGE(gras::InputBufferMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::InputTokenMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::InputCheckMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::InputAllocMessage);
+THERON_DECLARE_REGISTERED_MESSAGE(gras::InputUpdateMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputBufferMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputTokenMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputCheckMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputHintMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputAllocMessage);
+THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputUpdateMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::SelfKickMessage);
-THERON_DECLARE_REGISTERED_MESSAGE(gras::UpdateInputsMessage);
#endif /*INCLUDED_LIBGRAS_IMPL_MESSAGES_HPP*/
diff --git a/lib/gras_impl/output_buffer_queues.hpp b/lib/gras_impl/output_buffer_queues.hpp
index 722a4c2..3618b1b 100644
--- a/lib/gras_impl/output_buffer_queues.hpp
+++ b/lib/gras_impl/output_buffer_queues.hpp
@@ -3,83 +3,76 @@
#ifndef INCLUDED_LIBGRAS_IMPL_OUTPUT_BUFFER_QUEUES_HPP
#define INCLUDED_LIBGRAS_IMPL_OUTPUT_BUFFER_QUEUES_HPP
+#include <gras/buffer_queue.hpp>
#include <gras_impl/bitset.hpp>
#include <vector>
-#include <boost/circular_buffer.hpp>
namespace gras
{
-template <typename T>
struct OutputBufferQueues
{
- enum {MAX_QUEUE_SIZE = 128};
- BitSet _bitset;
- std::vector<boost::circular_buffer<T> > _queues;
-
- GRAS_FORCE_INLINE void resize(const size_t size)
+ void set_buffer_queue(const size_t i, BufferQueueSptr queue)
{
- _bitset.resize(size);
- _queues.resize(size, boost::circular_buffer<T>(MAX_QUEUE_SIZE));
+ _queues[i] = queue;
+ _update(i);
}
- GRAS_FORCE_INLINE void push(const size_t i, const T &value)
+ void set_reserve_bytes(const size_t i, const size_t num_bytes)
{
- _queues[i].push_back(value);
- _bitset.set(i);
+ _reserve_bytes[i] = num_bytes;
+ if (_queues[i]) _update(i);
}
- //! used for input buffer inlining
- GRAS_FORCE_INLINE void push_front(const size_t i, const T &value)
+ GRAS_FORCE_INLINE void resize(const size_t size)
{
- ASSERT(not _queues[i].full());
- _queues[i].push_front(value);
- _bitset.set(i);
+ _bitset.resize(size);
+ _queues.resize(size);
+ _reserve_bytes.resize(size, 1);
}
- GRAS_FORCE_INLINE const T &front(const size_t i) const
+ GRAS_FORCE_INLINE void push(const size_t i, const SBuffer &buff)
{
- return _queues[i].front();
+ if (not _queues[i]) return; //block is likely done, throw out buffer
+ _queues[i]->push(buff);
+ _update(i);
}
- GRAS_FORCE_INLINE T &front(const size_t i)
+ GRAS_FORCE_INLINE void flush_all(void)
{
- ASSERT(not _queues[i].empty());
- return _queues[i].front();
+ const size_t old_size = this->size();
+ this->resize(0);
+ this->resize(old_size);
}
- GRAS_FORCE_INLINE void pop(const size_t i)
+ GRAS_FORCE_INLINE SBuffer &front(const size_t i)
{
- _queues[i].front() = T();
- _queues[i].pop_front();
- _bitset.set(i, not _queues[i].empty());
+ ASSERT(not this->empty(i));
+ return _queues[i]->front();
}
- GRAS_FORCE_INLINE void fail(const size_t i)
+ GRAS_FORCE_INLINE void pop(const size_t i)
{
- _bitset.reset(i);
+ ASSERT(_queues[i]);
+ ASSERT(not _queues[i]->empty());
+ _queues[i]->pop();
+ _update(i);
}
- GRAS_FORCE_INLINE void flush(const size_t i)
+ GRAS_FORCE_INLINE void fail(const size_t i)
{
- _queues[i].clear();
_bitset.reset(i);
}
- GRAS_FORCE_INLINE void flush_all(void)
- {
- for (size_t i = 0; i < this->size(); i++) this->flush(i);
- }
-
GRAS_FORCE_INLINE bool ready(const size_t i) const
{
- return not _queues[i].empty();
+ return _bitset[i];
}
GRAS_FORCE_INLINE bool empty(const size_t i) const
{
- return _queues[i].empty();
+ return (not _queues[i] or _queues[i]->empty());
}
GRAS_FORCE_INLINE bool all_ready(void) const
@@ -91,6 +84,23 @@ struct OutputBufferQueues
{
return _queues.size();
}
+
+ GRAS_FORCE_INLINE void _update(const size_t i)
+ {
+ if (not _queues[i] or _queues[i]->empty())
+ {
+ _bitset.reset(i);
+ return;
+ }
+ const SBuffer &front = _queues[i]->front();
+ const size_t avail = front.get_actual_length() - front.offset - front.length;
+ _bitset.set(i, avail >= _reserve_bytes[i]);
+ }
+
+ BitSet _bitset;
+ std::vector<BufferQueueSptr> _queues;
+ std::vector<size_t> _reserve_bytes;
+
};
} //namespace gras
diff --git a/lib/gras_impl/buffer_queue.hpp b/lib/gras_impl/simple_buffer_queue.hpp
index 0e6be9e..db6ecc0 100644
--- a/lib/gras_impl/buffer_queue.hpp
+++ b/lib/gras_impl/simple_buffer_queue.hpp
@@ -1,7 +1,7 @@
// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
-#ifndef INCLUDED_LIBGRAS_IMPL_BUFFER_QUEUE_HPP
-#define INCLUDED_LIBGRAS_IMPL_BUFFER_QUEUE_HPP
+#ifndef INCLUDED_LIBGRAS_IMPL_SIMPLE_BUFFER_QUEUE_HPP
+#define INCLUDED_LIBGRAS_IMPL_SIMPLE_BUFFER_QUEUE_HPP
#include <gras/sbuffer.hpp>
#include <boost/bind.hpp>
@@ -10,15 +10,15 @@
namespace gras
{
-struct BufferQueue : std::queue<SBuffer>
+struct SimpleBufferQueue : std::queue<SBuffer>
{
- BufferQueue(void)
+ SimpleBufferQueue(void)
{
- SBufferDeleter deleter = boost::bind(&BufferQueue::push_back, this, _1);
+ SBufferDeleter deleter = boost::bind(&SimpleBufferQueue::push_back, this, _1);
_token = SBufferToken(new SBufferDeleter(deleter));
}
- ~BufferQueue(void)
+ ~SimpleBufferQueue(void)
{
_token.reset();
while (not this->empty())
@@ -47,4 +47,4 @@ struct BufferQueue : std::queue<SBuffer>
} //namespace gras
-#endif /*INCLUDED_LIBGRAS_IMPL_BUFFER_QUEUE_HPP*/
+#endif /*INCLUDED_LIBGRAS_IMPL_SIMPLE_BUFFER_QUEUE_HPP*/
diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp
index 02f62f3..f6a81bc 100644
--- a/lib/input_handlers.cpp
+++ b/lib/input_handlers.cpp
@@ -66,8 +66,21 @@ void BlockActor::handle_input_alloc(const InputAllocMessage &message, const Ther
//handle the upstream block allocation request
OutputAllocMessage new_msg;
- new_msg.token = block_ptr->input_buffer_allocator(
+ new_msg.queue = block_ptr->input_buffer_allocator(
index, message.token, message.recommend_length
);
- if (new_msg.token) this->post_upstream(index, new_msg);
+ if (new_msg.queue) this->post_upstream(index, new_msg);
+}
+
+void BlockActor::handle_input_update(const InputUpdateMessage &message, const Theron::Address)
+{
+ MESSAGE_TRACER();
+ const size_t i = message.index;
+
+ //update buffer queue configuration
+ if (i >= this->input_queues.size()) return;
+ const size_t preload_bytes = this->input_items_sizes[i]*this->input_configs[i].preload_items;
+ const size_t reserve_bytes = this->input_items_sizes[i]*this->input_configs[i].reserve_items;
+ const size_t maximum_bytes = this->input_items_sizes[i]*this->input_configs[i].maximum_items;
+ this->input_queues.update_config(i, this->input_items_sizes[i], preload_bytes, reserve_bytes, maximum_bytes);
}
diff --git a/lib/output_handlers.cpp b/lib/output_handlers.cpp
index 5876f89..aaf5544 100644
--- a/lib/output_handlers.cpp
+++ b/lib/output_handlers.cpp
@@ -70,9 +70,16 @@ void BlockActor::handle_output_alloc(const OutputAllocMessage &message, const Th
const size_t index = message.index;
//return of a positive downstream allocation
- //reset the token, and clear old output buffers
- //the new token from the downstream is installed
- this->output_buffer_tokens[index].reset();
- this->output_queues.flush(index);
- this->output_buffer_tokens[index] = message.token;
+ this->output_queues.set_buffer_queue(index, message.queue);
+}
+
+void BlockActor::handle_output_update(const OutputUpdateMessage &message, const Theron::Address)
+{
+ MESSAGE_TRACER();
+ const size_t i = message.index;
+
+ //update buffer queue configuration
+ if (i >= this->output_queues.size()) return;
+ const size_t reserve_bytes = this->output_items_sizes[i]*this->output_configs[i].reserve_items;
+ this->output_queues.set_reserve_bytes(i, reserve_bytes);
}
diff --git a/lib/register_messages.cpp b/lib/register_messages.cpp
index d112157..caa8ed8 100644
--- a/lib/register_messages.cpp
+++ b/lib/register_messages.cpp
@@ -16,12 +16,13 @@ THERON_DEFINE_REGISTERED_MESSAGE(gras::InputBufferMessage);
THERON_DEFINE_REGISTERED_MESSAGE(gras::InputTokenMessage);
THERON_DEFINE_REGISTERED_MESSAGE(gras::InputCheckMessage);
THERON_DEFINE_REGISTERED_MESSAGE(gras::InputAllocMessage);
+THERON_DEFINE_REGISTERED_MESSAGE(gras::InputUpdateMessage);
THERON_DEFINE_REGISTERED_MESSAGE(gras::OutputBufferMessage);
THERON_DEFINE_REGISTERED_MESSAGE(gras::OutputTokenMessage);
THERON_DEFINE_REGISTERED_MESSAGE(gras::OutputCheckMessage);
THERON_DEFINE_REGISTERED_MESSAGE(gras::OutputHintMessage);
THERON_DEFINE_REGISTERED_MESSAGE(gras::OutputAllocMessage);
+THERON_DEFINE_REGISTERED_MESSAGE(gras::OutputUpdateMessage);
THERON_DEFINE_REGISTERED_MESSAGE(gras::SelfKickMessage);
-THERON_DEFINE_REGISTERED_MESSAGE(gras::UpdateInputsMessage);
diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp
index cf757d5..f813b57 100644
--- a/lib/topology_handler.cpp
+++ b/lib/topology_handler.cpp
@@ -79,25 +79,18 @@ void BlockActor::handle_topology(
this->mark_done();
}
- this->topology_init = true;
- this->handle_update_inputs(UpdateInputsMessage(), Theron::Address());
-
- this->Send(0, from); //ACK
-}
-
-void BlockActor::handle_update_inputs(
- const UpdateInputsMessage &,
- const Theron::Address
-){
- MESSAGE_TRACER();
- const size_t num_inputs = this->get_num_inputs();
- this->input_queues.resize(num_inputs);
-
for (size_t i = 0; i < num_inputs; i++)
{
- const size_t preload_bytes = this->input_items_sizes[i]*this->input_configs[i].preload_items;
- const size_t reserve_bytes = this->input_items_sizes[i]*this->input_configs[i].reserve_items;
- const size_t maximum_bytes = this->input_items_sizes[i]*this->input_configs[i].maximum_items;
- this->input_queues.update_config(i, this->input_items_sizes[i], preload_bytes, reserve_bytes, maximum_bytes);
+ InputUpdateMessage message;
+ message.index = i;
+ this->handle_input_update(message, Theron::Address());
+ }
+ for (size_t i = 0; i < num_outputs; i++)
+ {
+ OutputUpdateMessage message;
+ message.index = i;
+ this->handle_output_update(message, Theron::Address());
}
+
+ this->Send(0, from); //ACK
}