diff options
-rw-r--r-- | include/gras/CMakeLists.txt | 1 | ||||
-rw-r--r-- | include/gras/block.hpp | 9 | ||||
-rw-r--r-- | include/gras/block.i | 1 | ||||
-rw-r--r-- | include/gras/buffer_queue.hpp | 73 | ||||
-rw-r--r-- | lib/CMakeLists.txt | 3 | ||||
-rw-r--r-- | lib/block.cpp | 15 | ||||
-rw-r--r-- | lib/block_allocator.cpp | 28 | ||||
-rw-r--r-- | lib/block_task.cpp | 30 | ||||
-rw-r--r-- | lib/buffer_queue_circ.cpp | 137 | ||||
-rw-r--r-- | lib/buffer_queue_pool.cpp | 67 | ||||
-rw-r--r-- | lib/circular_buffer.cpp | 132 | ||||
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 13 | ||||
-rw-r--r-- | lib/gras_impl/debug.hpp | 2 | ||||
-rw-r--r-- | lib/gras_impl/input_buffer_queues.hpp | 49 | ||||
-rw-r--r-- | lib/gras_impl/messages.hpp | 21 | ||||
-rw-r--r-- | lib/gras_impl/output_buffer_queues.hpp | 84 | ||||
-rw-r--r-- | lib/gras_impl/simple_buffer_queue.hpp (renamed from lib/gras_impl/buffer_queue.hpp) | 14 | ||||
-rw-r--r-- | lib/input_handlers.cpp | 17 | ||||
-rw-r--r-- | lib/output_handlers.cpp | 17 | ||||
-rw-r--r-- | lib/register_messages.cpp | 3 | ||||
-rw-r--r-- | lib/topology_handler.cpp | 29 |
21 files changed, 596 insertions, 149 deletions
diff --git a/include/gras/CMakeLists.txt b/include/gras/CMakeLists.txt index e8fb85e..f0b0d3a 100644 --- a/include/gras/CMakeLists.txt +++ b/include/gras/CMakeLists.txt @@ -21,6 +21,7 @@ install(FILES thread_pool.hpp top_block.hpp work_buffer.hpp + buffer_queue.hpp DESTINATION include/gras COMPONENT ${GRAS_COMP_DEVEL} diff --git a/include/gras/block.hpp b/include/gras/block.hpp index 3899561..524ff00 100644 --- a/include/gras/block.hpp +++ b/include/gras/block.hpp @@ -8,6 +8,7 @@ #include <gras/tag_iter.hpp> #include <gras/tags.hpp> #include <gras/work_buffer.hpp> +#include <gras/buffer_queue.hpp> #include <vector> #include <string> @@ -337,9 +338,9 @@ struct GRAS_API Block : Element * \param which_output the output port index number * \param token the token for the buffer's returner * \param recommend_length the schedulers recommended length in bytes - * \return the token used for the buffer allocation (may be the same) + * \return a shared ptr to a new buffer queue object */ - virtual SBufferToken output_buffer_allocator( + virtual BufferQueueSptr output_buffer_allocator( const size_t which_output, const SBufferToken &token, const size_t recommend_length @@ -356,9 +357,9 @@ struct GRAS_API Block : Element * \param which_input the input port index number * \param token the token for the buffer's returner * \param recommend_length the schedulers recommended length in bytes - * \return the token used for the buffer allocation (may be the same) + * \return a shared ptr to a new buffer queue object */ - virtual SBufferToken input_buffer_allocator( + virtual BufferQueueSptr input_buffer_allocator( const size_t which_input, const SBufferToken &token, const size_t recommend_length diff --git a/include/gras/block.i b/include/gras/block.i index 80c3905..ef2738c 100644 --- a/include/gras/block.i +++ b/include/gras/block.i @@ -11,6 +11,7 @@ %import <gras/tags.i> %include <gras/tag_iter.i> %import <gras/sbuffer.i> +%include <gras/buffer_queue.hpp> %include <gras/block.hpp> #endif /*INCLUDED_GRAS_BLOCK_I*/ diff --git a/include/gras/buffer_queue.hpp b/include/gras/buffer_queue.hpp new file mode 100644 index 0000000..da8de41 --- /dev/null +++ b/include/gras/buffer_queue.hpp @@ -0,0 +1,73 @@ +// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information. + +#ifndef INCLUDED_GRAS_BUFFER_QUEUE_HPP +#define INCLUDED_GRAS_BUFFER_QUEUE_HPP + +#include <gras/gras.hpp> +#include <gras/sbuffer.hpp> +#include <boost/shared_ptr.hpp> + +namespace gras +{ + +struct BufferQueue; + +typedef boost::shared_ptr<BufferQueue> BufferQueueSptr; + +//! Buffer Queue is an interface enabling us to create custom buffer allocators. +struct BufferQueue +{ + + /*! + * Create a buffer queue object using the pool allocator. + * A pool of buffers contains individual buffer allocations. + * + * The config param is used to pass information + * about per-buffer size, affinity, and token. + * + * \param config used to alloc one buffer + * \param num_buffs alloc this many buffs + * \return a new buffer queue sptr + */ + GRAS_API static BufferQueueSptr make_pool( + const SBufferConfig &config, + const size_t num_buffs + ); + + /*! + * Create a buffer queue object using the circular allocator. + * The circular allocator contains one large double-mapped buffer. + * This buffer is virtually mapped so that buff[n+len] = buff[n]. + * + * Pieces of this buffer can be passed around in smaller chunks. + * The number of chunks is dictated by num_buffs, + * the size of the chunks is dictated by config.length. + * The size of the circular buffer will be num_buffs*config.length + * + * \param config used to alloc one buffer + * \param num_buffs this many smaller chunks + * + * \return a new buffer queue sptr + */ + GRAS_API static BufferQueueSptr make_circ( + const SBufferConfig &config, + const size_t num_buffs + ); + + //! Get a reference to the buffer at the front of the queue + virtual SBuffer &front(void) = 0; + + //! Pop off the used portion of the queue + virtual void pop(void) = 0; + + //! Push a used buffer back into the queue + virtual void push(const SBuffer &buff) = 0; + + //! Is the queue empty? + virtual bool empty(void) const = 0; + +}; + +} //namespace gras + +#endif /*INCLUDED_GRAS_BUFFER_QUEUE_HPP*/ diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index 8f6f46c..bc38d06 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -40,6 +40,9 @@ list(APPEND GRAS_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/debug.cpp ${CMAKE_CURRENT_SOURCE_DIR}/element.cpp ${CMAKE_CURRENT_SOURCE_DIR}/sbuffer.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/circular_buffer.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/buffer_queue_circ.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/buffer_queue_pool.cpp ${CMAKE_CURRENT_SOURCE_DIR}/tags.cpp ${CMAKE_CURRENT_SOURCE_DIR}/block.cpp ${CMAKE_CURRENT_SOURCE_DIR}/block_actor.cpp diff --git a/lib/block.cpp b/lib/block.cpp index 85f7654..c806063 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -34,7 +34,6 @@ Block::Block(const std::string &name): (*this)->block->name = name; //for debug purposes //setup some state variables - (*this)->block->topology_init = false; (*this)->block->block_ptr = this; (*this)->block->block_state = BlockActor::BLOCK_STATE_INIT; @@ -89,8 +88,11 @@ InputPortConfig Block::get_input_config(const size_t which_input) const void Block::set_input_config(const size_t which_input, const InputPortConfig &config) { vector_set((*this)->block->input_configs, config, which_input); - if ((*this)->block->topology_init) - (*this)->block->Push(UpdateInputsMessage(), Theron::Address()); + { + InputUpdateMessage message; + message.index = which_input; + (*this)->block->Push(message, Theron::Address()); + } } OutputPortConfig Block::get_output_config(const size_t which_output) const @@ -101,8 +103,11 @@ OutputPortConfig Block::get_output_config(const size_t which_output) const void Block::set_output_config(const size_t which_output, const OutputPortConfig &config) { vector_set((*this)->block->output_configs, config, which_output); - if ((*this)->block->topology_init) - (*this)->block->Push(UpdateInputsMessage(), Theron::Address()); + { + OutputUpdateMessage message; + message.index = which_output; + (*this)->block->Push(message, Theron::Address()); + } } void Block::consume(const size_t which_input, const size_t num_items) diff --git a/lib/block_allocator.cpp b/lib/block_allocator.cpp index 64b3aed..4c486ab 100644 --- a/lib/block_allocator.cpp +++ b/lib/block_allocator.cpp @@ -58,7 +58,6 @@ void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address //allocate output buffers which will also wake up the task const size_t num_outputs = this->get_num_outputs(); - this->output_buffer_tokens.resize(num_outputs); for (size_t i = 0; i < num_outputs; i++) { const size_t bytes = recommend_length( @@ -71,7 +70,8 @@ void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address SBufferDeleter deleter = boost::bind(&BlockActor::buffer_returner, this, i, _1); SBufferToken token = SBufferToken(new SBufferDeleter(deleter)); - this->output_buffer_tokens[i] = block_ptr->output_buffer_allocator(i, token, bytes); + BufferQueueSptr queue = block_ptr->output_buffer_allocator(i, token, bytes); + this->output_queues.set_buffer_queue(i, queue); InputAllocMessage message; message.token = SBufferToken(new SBufferDeleter(deleter)); @@ -82,29 +82,23 @@ void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address this->Send(0, from); //ACK } -SBufferToken Block::output_buffer_allocator( +BufferQueueSptr Block::output_buffer_allocator( const size_t, const SBufferToken &token, const size_t recommend_length ){ - for (size_t j = 0; j < THIS_MANY_BUFFERS; j++) - { - SBufferConfig config; - config.memory = NULL; - config.length = recommend_length; - config.affinity = (*this)->block->buffer_affinity; - config.token = token; - SBuffer buff(config); - std::memset(buff.get_actual_memory(), 0, buff.get_actual_length()); - //buffer derefs here and the token messages it back to the block - } - return token; + SBufferConfig config; + config.memory = NULL; + config.length = recommend_length; + config.affinity = (*this)->block->buffer_affinity; + config.token = token; + return BufferQueue::make_circ(config, THIS_MANY_BUFFERS); } -SBufferToken Block::input_buffer_allocator( +BufferQueueSptr Block::input_buffer_allocator( const size_t, const SBufferToken &, const size_t ){ - return SBufferToken(); //null + return BufferQueueSptr(); //null } diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 9e5917d..4f44de3 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -31,9 +31,6 @@ void BlockActor::mark_done(void) //release upstream, downstream, and executor tokens this->token_pool.clear(); - //release allocator tokens, buffers can now call deleters - this->output_buffer_tokens.clear(); - //release all buffers in queues this->input_queues.flush_all(); this->output_queues.flush_all(); @@ -95,8 +92,8 @@ void BlockActor::output_fail(const size_t i) throw std::runtime_error("output_fail called on maximum_items buffer"); } - //force pop so next work() gets a new buffer - this->flush_output(i, true); + //mark fail: not ready until a new buffer appears + this->output_queues.fail(i); } GRAS_FORCE_INLINE bool BlockActor::is_work_allowed(void) @@ -156,7 +153,8 @@ void BlockActor::handle_task(void) this->input_items.max() = std::max(this->input_items.max(), items); //inline dealings, how and when input buffers can be inlined into output buffers - //continue; + continue; //FIXME to implement needs change + /* if ( buff.unique() and input_configs[i].inline_buffer and @@ -170,6 +168,7 @@ void BlockActor::handle_task(void) this->output_queues.push_front(output_inline_index, new_obuff); //you got inlined! output_inline_index++; //done do this output port again } + */ } //------------------------------------------------------------------ @@ -234,7 +233,7 @@ void BlockActor::handle_task(void) void BlockActor::consume(const size_t i, const size_t items) { #ifdef ITEM_CONSPROD - std::cerr << "consume " << items << std::endl; + std::cerr << name << " consume " << items << std::endl; #endif this->items_consumed[i] += items; const size_t bytes = items*this->input_items_sizes[i]; @@ -245,7 +244,7 @@ void BlockActor::consume(const size_t i, const size_t items) void BlockActor::produce(const size_t i, const size_t items) { #ifdef ITEM_CONSPROD - std::cerr << "produce " << items << std::endl; + std::cerr << name << " produce " << items << std::endl; #endif SBuffer &buff = this->output_queues.front(i); this->items_produced[i] += items; @@ -263,9 +262,9 @@ void BlockActor::produce_buffer(const size_t i, const SBuffer &buffer) this->post_downstream(i, buff_msg); } -GRAS_FORCE_INLINE void BlockActor::flush_output(const size_t i, const bool force_pop) +GRAS_FORCE_INLINE void BlockActor::flush_output(const size_t i) { - if (not this->output_queues.ready(i) or this->output_queues.front(i).length == 0) return; + if (this->output_queues.empty(i) or this->output_queues.front(i).length == 0) return; SBuffer &buff = this->output_queues.front(i); InputBufferMessage buff_msg; buff_msg.buffer = buff; @@ -275,13 +274,6 @@ GRAS_FORCE_INLINE void BlockActor::flush_output(const size_t i, const bool force buff.offset += buff.length; buff.length = 0; - //when to pop the buffer and give next work a new one - const size_t reserve_bytes = this->output_configs[i].reserve_items/this->output_items_sizes[i]; - if ( - force_pop or (buff.offset*2 > buff.get_actual_length()) or - (buff.get_actual_length() - buff.offset) < reserve_bytes - ) - { - this->output_queues.pop(i); - } + //release whatever has been used of the output buffer + this->output_queues.pop(i); } diff --git a/lib/buffer_queue_circ.cpp b/lib/buffer_queue_circ.cpp new file mode 100644 index 0000000..60a2b51 --- /dev/null +++ b/lib/buffer_queue_circ.cpp @@ -0,0 +1,137 @@ +// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information. + +#include <gras/buffer_queue.hpp> +#include <gras_impl/debug.hpp> +#include <boost/circular_buffer.hpp> +#include <vector> + +using namespace gras; + +SBuffer make_circular_buffer(const size_t num_bytes); //circular_buffer.cpp + +static void buffer_dummy_delete(SBuffer &, const SBuffer /*ref holder*/) +{ + //NOP +} + +struct BufferQueueCirc : BufferQueue +{ + BufferQueueCirc(const SBufferConfig &config, const size_t num); + + ~BufferQueueCirc(void) + { + _token.reset(); + _available_buffers.clear(); + _returned_buffers.clear(); + } + + SBuffer &front(void); + + void pop(void); + + void push(const SBuffer &buff); + + bool empty(void) const + { + return _bytes_avail == 0 or _available_buffers.empty(); + } + + SBufferToken _token; + SBuffer _circ_buff; + char *_write_ptr; + size_t _bytes_avail; + size_t _ack_index; + boost::circular_buffer<SBuffer> _available_buffers; + std::vector<SBuffer> _returned_buffers; + std::vector<size_t> _outgone_bytes; + +}; + +BufferQueueCirc::BufferQueueCirc(const SBufferConfig &config, const size_t num_buffs): + _token(config.token), + _ack_index(0) +{ + //allocate a large buffer + const size_t num_bytes = config.length * num_buffs; + _circ_buff = make_circular_buffer(num_bytes); + _write_ptr = (char *)_circ_buff.get_actual_memory(); + _bytes_avail = _circ_buff.get_actual_length(); + + //create dummy deleter to hold ref to circ buff + SBufferDeleter deleter = boost::bind(&buffer_dummy_delete, _1, _circ_buff); + + //allocate pool of sbuffers + _available_buffers.set_capacity(num_buffs); + _returned_buffers.resize(num_buffs); + _outgone_bytes.resize(num_buffs, 0); + SBufferConfig sconfig = config; + sconfig.deleter = deleter; + sconfig.memory = _circ_buff.get_actual_memory(); + for (size_t i = 0; i < num_buffs; i++) + { + sconfig.user_index = i; + SBuffer buff(sconfig); + //buffer derefs and returns to this queue thru token callback + } +} + +SBuffer &BufferQueueCirc::front(void) +{ + ASSERT(not this->empty()); + SBuffer &front = _available_buffers.front(); + ASSERT(front); + front->config.memory = _write_ptr; + return front; +} + +void BufferQueueCirc::pop(void) +{ + ASSERT(not this->empty()); + SBuffer &front = _available_buffers.front(); + const size_t num_bytes = front.offset; + + //store number of bytes for buffer return + _outgone_bytes[front.get_user_index()] = num_bytes; + + //pop the buffer from internal reference + _available_buffers.pop_front(); + + //adjust the write pointer + _write_ptr += num_bytes; + + //handle circular wrap + if (_write_ptr > (char *)_circ_buff.get(_circ_buff.get_actual_length())) + { + _write_ptr -= _circ_buff.get_actual_length(); + } + + //subtract out of available bytes + ASSERT(_bytes_avail >= num_bytes); + _bytes_avail -= num_bytes; +} + +void BufferQueueCirc::push(const SBuffer &buff) +{ + _returned_buffers[buff.get_user_index()] = buff; + + //ack starting at the expected index and up + while (_returned_buffers[_ack_index]) + { + //return the held bytes to the available + _bytes_avail += _outgone_bytes[_ack_index]; + + //remove the buffer container into the queue + _available_buffers.push_back(_returned_buffers[_ack_index]); + _returned_buffers[_ack_index].reset(); + + //increment the ack index for the next run + if (++_ack_index == _returned_buffers.size()) _ack_index = 0; + } +} + +BufferQueueSptr BufferQueue::make_circ( + const SBufferConfig &config, + const size_t num_buffs +){ + return BufferQueueSptr(new BufferQueueCirc(config, num_buffs)); +} diff --git a/lib/buffer_queue_pool.cpp b/lib/buffer_queue_pool.cpp new file mode 100644 index 0000000..db9b68a --- /dev/null +++ b/lib/buffer_queue_pool.cpp @@ -0,0 +1,67 @@ +// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information. + +#include <gras/buffer_queue.hpp> +#include <gras_impl/debug.hpp> +#include <boost/circular_buffer.hpp> + +using namespace gras; + +struct BufferQueuePool : BufferQueue +{ + BufferQueuePool(const SBufferConfig &config, const size_t num): + _token(config.token), //save config, its holds token + _queue(boost::circular_buffer<SBuffer>(num)) + { + //NOP + } + + ~BufferQueuePool(void) + { + _token.reset(); + _queue.clear(); + } + + SBuffer &front(void) + { + ASSERT(not _queue.empty()); + ASSERT(_queue.front()); + return _queue.front(); + } + + void pop(void) + { + ASSERT(not _queue.empty()); + _queue.front().reset(); //dont hold ref + _queue.pop_front(); + } + + void push(const SBuffer &buff) + { + _queue.push_back(buff); + } + + bool empty(void) const + { + return _queue.empty(); + } + + SBufferToken _token; + boost::circular_buffer<SBuffer> _queue; + +}; + +BufferQueueSptr BufferQueue::make_pool( + const SBufferConfig &config, + const size_t num_buffs +){ + BufferQueueSptr queue(new BufferQueuePool(config, num_buffs)); + + for (size_t i = 0; i < num_buffs; i++) + { + SBuffer buff(config); + std::memset(buff.get_actual_memory(), 0, buff.get_actual_length()); + //buffer derefs and returns to this queue thru token callback + } + + return queue; +} diff --git a/lib/circular_buffer.cpp b/lib/circular_buffer.cpp new file mode 100644 index 0000000..5e56469 --- /dev/null +++ b/lib/circular_buffer.cpp @@ -0,0 +1,132 @@ +// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information. + +#include <gras/buffer_queue.hpp> +#include <gras_impl/debug.hpp> +#include <boost/format.hpp> +#include <boost/bind.hpp> +#include <boost/interprocess/shared_memory_object.hpp> +#include <boost/interprocess/anonymous_shared_memory.hpp> +#include <boost/interprocess/mapped_region.hpp> +#include <boost/lexical_cast.hpp> +#include <boost/thread/thread.hpp> +#include <boost/thread/mutex.hpp> + +using namespace gras; +namespace ipc = boost::interprocess; + +static boost::mutex alloc_mutex; + +/*! +* This routine generates an incredibly unique name for the allocation. +* +* Because we are using boost IPC, and it expects that we share the memory; +* IPC allocation requires a unique name to share amongst the processes. +* Since we are not actually using IPC, the sharing aspect isnt very useful, +* but we still need a unique name for the shared memory allocation anyway. +* (I would like if a empty string would suffice as an anonymous allocation) +*/ +static std::string omg_so_unique(void) +{ + const std::string tid = boost::lexical_cast<std::string>(boost::this_thread::get_id()); + static size_t count = 0; + return boost::str(boost::format("shmem-%s-%u") % tid % count++); +} + +struct CircularBuffer +{ + CircularBuffer(const size_t); + + ~CircularBuffer(void) + { + ipc::shared_memory_object::remove(shm_name.c_str()); + } + + char *buff_addr; + size_t actual_length; + std::string shm_name; + ipc::shared_memory_object shm_obj; + ipc::mapped_region region1; + ipc::mapped_region region2; +}; + +CircularBuffer::CircularBuffer(const size_t num_bytes) +{ + boost::mutex::scoped_lock lock(alloc_mutex); + + const size_t chunk = ipc::mapped_region::get_page_size(); + const size_t len = chunk*((num_bytes + chunk - 1)/chunk); + actual_length = len; + + //////////////////////////////////////////////////////////////// + // Step 0) Find an address that can be mapped across 2x length: + //////////////////////////////////////////////////////////////// + { + ipc::mapped_region region0(ipc::anonymous_shared_memory(len*2)); + buff_addr = (char *)region0.get_address(); + } + std::cout << "reserve addr " << std::hex << size_t(buff_addr) << std::dec << std::endl; + + //////////////////////////////////////////////////////////////// + // Step 1) Allocate a chunk of physical memory of length bytes + //////////////////////////////////////////////////////////////// + //std::cout << "make shmem\n" << std::endl; + shm_name = omg_so_unique(); + shm_obj = ipc::shared_memory_object( + ipc::create_only, //only create + shm_name.c_str(), //name + ipc::read_write //read-write mode + ); + + //std::cout << "truncate\n" << std::endl; + shm_obj.truncate(len); + + //////////////////////////////////////////////////////////////// + //Step 2) Remap region1 of the virtual memory space + //////////////////////////////////////////////////////////////// + //std::cout << "map region 1\n" << std::endl; + region1 = ipc::mapped_region( + shm_obj, //Memory-mappable object + ipc::read_write, //Access mode + 0, //Offset from the beginning of shm + len, //Length of the region + buff_addr + ); + //std::cout << "region1.get_address() " << size_t(region1.get_address()) << std::endl; + + //////////////////////////////////////////////////////////////// + //Step 3) Remap region2 of the virtual memory space + //////////////////////////////////////////////////////////////// + //std::cout << "map region 2\n" << std::endl; + region2 = ipc::mapped_region( + shm_obj, //Memory-mappable object + ipc::read_write, //Access mode + 0, //Offset from the beginning of shm + len, //Length of the region + buff_addr + len + ); + + //std::cout << "region2.get_address() " << size_t(region2.get_address()) << std::endl; + //std::cout << "diff " << (long(region2.get_address()) - long(region1.get_address())) << std::endl; + + //////////////////////////////////////////////////////////////// + //4) Zero out the memory for good measure + //////////////////////////////////////////////////////////////// + std::memset(region1.get_address(), 0, region1.get_size()); + std::memset(region2.get_address(), 0, region2.get_size()); +} + +static void circular_buffer_delete(SBuffer &buff, CircularBuffer *circ_buff) +{ + delete circ_buff; +} + +SBuffer make_circular_buffer(const size_t num_bytes) +{ + CircularBuffer *circ_buff = new CircularBuffer(num_bytes); + SBufferDeleter deleter = boost::bind(&circular_buffer_delete, _1, circ_buff); + SBufferConfig config; + config.memory = circ_buff->buff_addr; + config.length = circ_buff->actual_length; + config.deleter = deleter; + return SBuffer(config); +} diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index d204ec4..afc248e 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -46,15 +46,16 @@ struct BlockActor : Apology::Worker this->RegisterHandler(this, &BlockActor::handle_input_token); this->RegisterHandler(this, &BlockActor::handle_input_check); this->RegisterHandler(this, &BlockActor::handle_input_alloc); + this->RegisterHandler(this, &BlockActor::handle_input_update); this->RegisterHandler(this, &BlockActor::handle_output_buffer); this->RegisterHandler(this, &BlockActor::handle_output_token); this->RegisterHandler(this, &BlockActor::handle_output_check); this->RegisterHandler(this, &BlockActor::handle_output_hint); this->RegisterHandler(this, &BlockActor::handle_output_alloc); + this->RegisterHandler(this, &BlockActor::handle_output_update); this->RegisterHandler(this, &BlockActor::handle_self_kick); - this->RegisterHandler(this, &BlockActor::handle_update_inputs); } //handlers @@ -72,15 +73,16 @@ struct BlockActor : Apology::Worker void handle_input_token(const InputTokenMessage &, const Theron::Address); void handle_input_check(const InputCheckMessage &, const Theron::Address); void handle_input_alloc(const InputAllocMessage &, const Theron::Address); + void handle_input_update(const InputUpdateMessage &, const Theron::Address); void handle_output_buffer(const OutputBufferMessage &, const Theron::Address); void handle_output_token(const OutputTokenMessage &, const Theron::Address); void handle_output_check(const OutputCheckMessage &, const Theron::Address); void handle_output_hint(const OutputHintMessage &, const Theron::Address); void handle_output_alloc(const OutputAllocMessage &, const Theron::Address); + void handle_output_update(const OutputUpdateMessage &, const Theron::Address); void handle_self_kick(const SelfKickMessage &, const Theron::Address); - void handle_update_inputs(const UpdateInputsMessage &, const Theron::Address); //helpers void buffer_returner(const size_t index, SBuffer &buffer); @@ -93,7 +95,7 @@ struct BlockActor : Apology::Worker void produce(const size_t index, const size_t items); void consume(const size_t index, const size_t items); void produce_buffer(const size_t index, const SBuffer &buffer); - void flush_output(const size_t index, const bool force_pop = false); + void flush_output(const size_t index); bool is_work_allowed(void); GRAS_FORCE_INLINE bool is_input_done(const size_t i) @@ -122,11 +124,10 @@ struct BlockActor : Apology::Worker BitSet inputs_done; BitSet outputs_done; std::set<Token> token_pool; - std::vector<SBufferToken> output_buffer_tokens; //buffer queues and ready conditions InputBufferQueues input_queues; - OutputBufferQueues<SBuffer> output_queues; + OutputBufferQueues output_queues; BitSet inputs_available; //tag tracking @@ -155,8 +156,6 @@ struct BlockActor : Apology::Worker long buffer_affinity; std::vector<std::vector<OutputHintMessage> > output_allocation_hints; - - bool topology_init; }; } //namespace gras diff --git a/lib/gras_impl/debug.hpp b/lib/gras_impl/debug.hpp index a90eb6c..c8c7dfe 100644 --- a/lib/gras_impl/debug.hpp +++ b/lib/gras_impl/debug.hpp @@ -25,7 +25,7 @@ extern void *operator new(std::size_t n) throw (std::bad_alloc); //-- define to enable these debugs: //---------------------------------------------------------------------- //#define WORK_DEBUG -//#define ASSERTING +#define ASSERTING //#define MESSAGE_TRACING //#define ITEM_CONSPROD diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp index 83ed44b..071f2ac 100644 --- a/lib/gras_impl/input_buffer_queues.hpp +++ b/lib/gras_impl/input_buffer_queues.hpp @@ -5,7 +5,7 @@ #include <gras_impl/debug.hpp> #include <gras_impl/bitset.hpp> -#include <gras_impl/buffer_queue.hpp> +#include <gras_impl/simple_buffer_queue.hpp> #include <gras/sbuffer.hpp> #include <vector> #include <queue> @@ -52,6 +52,8 @@ struct InputBufferQueues //special case when the null buffer is possible if (_queues[i].empty()) return get_null_buff(); + this->try_stitch(i); + //there are enough enqueued bytes, but not in the front buffer const bool must_accumulate = _queues[i].front().length < _reserve_bytes[i]; @@ -60,7 +62,7 @@ struct InputBufferQueues const bool light_front = _queues[i].front().length <= _maximum_bytes[i]/2; const bool should_accumulate = heavy_load and light_front; - if (must_accumulate or should_accumulate) this->accumulate(i); + if (must_accumulate/* or should_accumulate*/) this->accumulate(i); ASSERT(_queues[i].front().length >= _reserve_bytes[i]); @@ -69,6 +71,28 @@ struct InputBufferQueues return _queues[i].front(); } + GRAS_FORCE_INLINE void try_stitch(const size_t i) + { + if (_queues[i].size() < 2) return; + SBuffer &b0 = _queues[i][0]; + SBuffer &b1 = _queues[i][1]; + + if (b0.offset > b0.get_actual_length()) + { + const size_t xfer_bytes = b0.length; + ASSERT(b1.offset >= xfer_bytes); + b1.offset -= xfer_bytes; + b1.length += xfer_bytes; + _queues[i].pop_front(); + return; + } + + const size_t xfer_bytes = b1.length; + b0.length += xfer_bytes; + b1.length = 0; + b1.offset += xfer_bytes; + } + //! Call when input bytes consumed by work void consume(const size_t i, const size_t bytes_consumed); @@ -104,20 +128,7 @@ struct InputBufferQueues { ASSERT(not _queues[i].full()); if (buffer.length == 0) return; - - //does this buffer starts where the last one ends? - //perform buffer stitching into back of buffer - if ( - not _queues[i].empty() and _queues[i].back() == buffer and - (_queues[i].back().length + _queues[i].back().offset) == buffer.offset - ){ - _queues[i].back().length += buffer.length; - } - else - { - _queues[i].push_back(buffer); - } - + _queues[i].push_back(buffer); _enqueued_bytes[i] += buffer.length; __update(i); } @@ -172,7 +183,7 @@ struct InputBufferQueues std::vector<size_t> _maximum_bytes; std::vector<boost::circular_buffer<SBuffer> > _queues; std::vector<size_t> _preload_bytes; - std::vector<boost::shared_ptr<BufferQueue> > _aux_queues; + std::vector<boost::shared_ptr<SimpleBufferQueue> > _aux_queues; }; @@ -207,7 +218,7 @@ inline void InputBufferQueues::update_config( _aux_queues[i]->empty() or _aux_queues[i]->front().get_actual_length() != _maximum_bytes[i] ){ - _aux_queues[i] = boost::shared_ptr<BufferQueue>(new BufferQueue()); + _aux_queues[i].reset(new SimpleBufferQueue()); _aux_queues[i]->allocate_one(_maximum_bytes[i]); _aux_queues[i]->allocate_one(_maximum_bytes[i]); _aux_queues[i]->allocate_one(_maximum_bytes[i]); @@ -287,7 +298,7 @@ GRAS_FORCE_INLINE void InputBufferQueues::consume(const size_t i, const size_t b //update bounds on the current buffer front.offset += bytes_consumed; front.length -= bytes_consumed; - ASSERT(front.offset <= front.get_actual_length()); + //ASSERT(front.offset <= front.get_actual_length()); ASSERT((_queues[i].front().length % _items_sizes[i]) == 0); if (front.length == 0) this->pop(i); diff --git a/lib/gras_impl/messages.hpp b/lib/gras_impl/messages.hpp index eae3258..b113e0d 100644 --- a/lib/gras_impl/messages.hpp +++ b/lib/gras_impl/messages.hpp @@ -3,6 +3,7 @@ #ifndef INCLUDED_LIBGRAS_IMPL_MESSAGES_HPP #define INCLUDED_LIBGRAS_IMPL_MESSAGES_HPP +#include <gras/buffer_queue.hpp> #include <gras/sbuffer.hpp> #include <gras/tags.hpp> #include <gras/sbuffer.hpp> @@ -72,6 +73,11 @@ struct InputCheckMessage size_t index; }; +struct InputUpdateMessage +{ + size_t index; +}; + //---------------------------------------------------------------------- //-- message to an output port //-- do not ack @@ -104,7 +110,12 @@ struct OutputHintMessage struct OutputAllocMessage { size_t index; - SBufferToken token; + BufferQueueSptr queue; +}; + +struct OutputUpdateMessage +{ + size_t index; }; //---------------------------------------------------------------------- @@ -117,11 +128,6 @@ struct SelfKickMessage //empty }; -struct UpdateInputsMessage -{ - //empty -}; - } //namespace gras #include <Theron/Register.h> @@ -141,14 +147,15 @@ THERON_DECLARE_REGISTERED_MESSAGE(gras::InputBufferMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::InputTokenMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::InputCheckMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::InputAllocMessage); +THERON_DECLARE_REGISTERED_MESSAGE(gras::InputUpdateMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputBufferMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputTokenMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputCheckMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputHintMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputAllocMessage); +THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputUpdateMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::SelfKickMessage); -THERON_DECLARE_REGISTERED_MESSAGE(gras::UpdateInputsMessage); #endif /*INCLUDED_LIBGRAS_IMPL_MESSAGES_HPP*/ diff --git a/lib/gras_impl/output_buffer_queues.hpp b/lib/gras_impl/output_buffer_queues.hpp index 722a4c2..3618b1b 100644 --- a/lib/gras_impl/output_buffer_queues.hpp +++ b/lib/gras_impl/output_buffer_queues.hpp @@ -3,83 +3,76 @@ #ifndef INCLUDED_LIBGRAS_IMPL_OUTPUT_BUFFER_QUEUES_HPP #define INCLUDED_LIBGRAS_IMPL_OUTPUT_BUFFER_QUEUES_HPP +#include <gras/buffer_queue.hpp> #include <gras_impl/bitset.hpp> #include <vector> -#include <boost/circular_buffer.hpp> namespace gras { -template <typename T> struct OutputBufferQueues { - enum {MAX_QUEUE_SIZE = 128}; - BitSet _bitset; - std::vector<boost::circular_buffer<T> > _queues; - - GRAS_FORCE_INLINE void resize(const size_t size) + void set_buffer_queue(const size_t i, BufferQueueSptr queue) { - _bitset.resize(size); - _queues.resize(size, boost::circular_buffer<T>(MAX_QUEUE_SIZE)); + _queues[i] = queue; + _update(i); } - GRAS_FORCE_INLINE void push(const size_t i, const T &value) + void set_reserve_bytes(const size_t i, const size_t num_bytes) { - _queues[i].push_back(value); - _bitset.set(i); + _reserve_bytes[i] = num_bytes; + if (_queues[i]) _update(i); } - //! used for input buffer inlining - GRAS_FORCE_INLINE void push_front(const size_t i, const T &value) + GRAS_FORCE_INLINE void resize(const size_t size) { - ASSERT(not _queues[i].full()); - _queues[i].push_front(value); - _bitset.set(i); + _bitset.resize(size); + _queues.resize(size); + _reserve_bytes.resize(size, 1); } - GRAS_FORCE_INLINE const T &front(const size_t i) const + GRAS_FORCE_INLINE void push(const size_t i, const SBuffer &buff) { - return _queues[i].front(); + if (not _queues[i]) return; //block is likely done, throw out buffer + _queues[i]->push(buff); + _update(i); } - GRAS_FORCE_INLINE T &front(const size_t i) + GRAS_FORCE_INLINE void flush_all(void) { - ASSERT(not _queues[i].empty()); - return _queues[i].front(); + const size_t old_size = this->size(); + this->resize(0); + this->resize(old_size); } - GRAS_FORCE_INLINE void pop(const size_t i) + GRAS_FORCE_INLINE SBuffer &front(const size_t i) { - _queues[i].front() = T(); - _queues[i].pop_front(); - _bitset.set(i, not _queues[i].empty()); + ASSERT(not this->empty(i)); + return _queues[i]->front(); } - GRAS_FORCE_INLINE void fail(const size_t i) + GRAS_FORCE_INLINE void pop(const size_t i) { - _bitset.reset(i); + ASSERT(_queues[i]); + ASSERT(not _queues[i]->empty()); + _queues[i]->pop(); + _update(i); } - GRAS_FORCE_INLINE void flush(const size_t i) + GRAS_FORCE_INLINE void fail(const size_t i) { - _queues[i].clear(); _bitset.reset(i); } - GRAS_FORCE_INLINE void flush_all(void) - { - for (size_t i = 0; i < this->size(); i++) this->flush(i); - } - GRAS_FORCE_INLINE bool ready(const size_t i) const { - return not _queues[i].empty(); + return _bitset[i]; } GRAS_FORCE_INLINE bool empty(const size_t i) const { - return _queues[i].empty(); + return (not _queues[i] or _queues[i]->empty()); } GRAS_FORCE_INLINE bool all_ready(void) const @@ -91,6 +84,23 @@ struct OutputBufferQueues { return _queues.size(); } + + GRAS_FORCE_INLINE void _update(const size_t i) + { + if (not _queues[i] or _queues[i]->empty()) + { + _bitset.reset(i); + return; + } + const SBuffer &front = _queues[i]->front(); + const size_t avail = front.get_actual_length() - front.offset - front.length; + _bitset.set(i, avail >= _reserve_bytes[i]); + } + + BitSet _bitset; + std::vector<BufferQueueSptr> _queues; + std::vector<size_t> _reserve_bytes; + }; } //namespace gras diff --git a/lib/gras_impl/buffer_queue.hpp b/lib/gras_impl/simple_buffer_queue.hpp index 0e6be9e..db6ecc0 100644 --- a/lib/gras_impl/buffer_queue.hpp +++ b/lib/gras_impl/simple_buffer_queue.hpp @@ -1,7 +1,7 @@ // Copyright (C) by Josh Blum. See LICENSE.txt for licensing information. -#ifndef INCLUDED_LIBGRAS_IMPL_BUFFER_QUEUE_HPP -#define INCLUDED_LIBGRAS_IMPL_BUFFER_QUEUE_HPP +#ifndef INCLUDED_LIBGRAS_IMPL_SIMPLE_BUFFER_QUEUE_HPP +#define INCLUDED_LIBGRAS_IMPL_SIMPLE_BUFFER_QUEUE_HPP #include <gras/sbuffer.hpp> #include <boost/bind.hpp> @@ -10,15 +10,15 @@ namespace gras { -struct BufferQueue : std::queue<SBuffer> +struct SimpleBufferQueue : std::queue<SBuffer> { - BufferQueue(void) + SimpleBufferQueue(void) { - SBufferDeleter deleter = boost::bind(&BufferQueue::push_back, this, _1); + SBufferDeleter deleter = boost::bind(&SimpleBufferQueue::push_back, this, _1); _token = SBufferToken(new SBufferDeleter(deleter)); } - ~BufferQueue(void) + ~SimpleBufferQueue(void) { _token.reset(); while (not this->empty()) @@ -47,4 +47,4 @@ struct BufferQueue : std::queue<SBuffer> } //namespace gras -#endif /*INCLUDED_LIBGRAS_IMPL_BUFFER_QUEUE_HPP*/ +#endif /*INCLUDED_LIBGRAS_IMPL_SIMPLE_BUFFER_QUEUE_HPP*/ diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp index 02f62f3..f6a81bc 100644 --- a/lib/input_handlers.cpp +++ b/lib/input_handlers.cpp @@ -66,8 +66,21 @@ void BlockActor::handle_input_alloc(const InputAllocMessage &message, const Ther //handle the upstream block allocation request OutputAllocMessage new_msg; - new_msg.token = block_ptr->input_buffer_allocator( + new_msg.queue = block_ptr->input_buffer_allocator( index, message.token, message.recommend_length ); - if (new_msg.token) this->post_upstream(index, new_msg); + if (new_msg.queue) this->post_upstream(index, new_msg); +} + +void BlockActor::handle_input_update(const InputUpdateMessage &message, const Theron::Address) +{ + MESSAGE_TRACER(); + const size_t i = message.index; + + //update buffer queue configuration + if (i >= this->input_queues.size()) return; + const size_t preload_bytes = this->input_items_sizes[i]*this->input_configs[i].preload_items; + const size_t reserve_bytes = this->input_items_sizes[i]*this->input_configs[i].reserve_items; + const size_t maximum_bytes = this->input_items_sizes[i]*this->input_configs[i].maximum_items; + this->input_queues.update_config(i, this->input_items_sizes[i], preload_bytes, reserve_bytes, maximum_bytes); } diff --git a/lib/output_handlers.cpp b/lib/output_handlers.cpp index 5876f89..aaf5544 100644 --- a/lib/output_handlers.cpp +++ b/lib/output_handlers.cpp @@ -70,9 +70,16 @@ void BlockActor::handle_output_alloc(const OutputAllocMessage &message, const Th const size_t index = message.index; //return of a positive downstream allocation - //reset the token, and clear old output buffers - //the new token from the downstream is installed - this->output_buffer_tokens[index].reset(); - this->output_queues.flush(index); - this->output_buffer_tokens[index] = message.token; + this->output_queues.set_buffer_queue(index, message.queue); +} + +void BlockActor::handle_output_update(const OutputUpdateMessage &message, const Theron::Address) +{ + MESSAGE_TRACER(); + const size_t i = message.index; + + //update buffer queue configuration + if (i >= this->output_queues.size()) return; + const size_t reserve_bytes = this->output_items_sizes[i]*this->output_configs[i].reserve_items; + this->output_queues.set_reserve_bytes(i, reserve_bytes); } diff --git a/lib/register_messages.cpp b/lib/register_messages.cpp index d112157..caa8ed8 100644 --- a/lib/register_messages.cpp +++ b/lib/register_messages.cpp @@ -16,12 +16,13 @@ THERON_DEFINE_REGISTERED_MESSAGE(gras::InputBufferMessage); THERON_DEFINE_REGISTERED_MESSAGE(gras::InputTokenMessage); THERON_DEFINE_REGISTERED_MESSAGE(gras::InputCheckMessage); THERON_DEFINE_REGISTERED_MESSAGE(gras::InputAllocMessage); +THERON_DEFINE_REGISTERED_MESSAGE(gras::InputUpdateMessage); THERON_DEFINE_REGISTERED_MESSAGE(gras::OutputBufferMessage); THERON_DEFINE_REGISTERED_MESSAGE(gras::OutputTokenMessage); THERON_DEFINE_REGISTERED_MESSAGE(gras::OutputCheckMessage); THERON_DEFINE_REGISTERED_MESSAGE(gras::OutputHintMessage); THERON_DEFINE_REGISTERED_MESSAGE(gras::OutputAllocMessage); +THERON_DEFINE_REGISTERED_MESSAGE(gras::OutputUpdateMessage); THERON_DEFINE_REGISTERED_MESSAGE(gras::SelfKickMessage); -THERON_DEFINE_REGISTERED_MESSAGE(gras::UpdateInputsMessage); diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp index cf757d5..f813b57 100644 --- a/lib/topology_handler.cpp +++ b/lib/topology_handler.cpp @@ -79,25 +79,18 @@ void BlockActor::handle_topology( this->mark_done(); } - this->topology_init = true; - this->handle_update_inputs(UpdateInputsMessage(), Theron::Address()); - - this->Send(0, from); //ACK -} - -void BlockActor::handle_update_inputs( - const UpdateInputsMessage &, - const Theron::Address -){ - MESSAGE_TRACER(); - const size_t num_inputs = this->get_num_inputs(); - this->input_queues.resize(num_inputs); - for (size_t i = 0; i < num_inputs; i++) { - const size_t preload_bytes = this->input_items_sizes[i]*this->input_configs[i].preload_items; - const size_t reserve_bytes = this->input_items_sizes[i]*this->input_configs[i].reserve_items; - const size_t maximum_bytes = this->input_items_sizes[i]*this->input_configs[i].maximum_items; - this->input_queues.update_config(i, this->input_items_sizes[i], preload_bytes, reserve_bytes, maximum_bytes); + InputUpdateMessage message; + message.index = i; + this->handle_input_update(message, Theron::Address()); + } + for (size_t i = 0; i < num_outputs; i++) + { + OutputUpdateMessage message; + message.index = i; + this->handle_output_update(message, Theron::Address()); } + + this->Send(0, from); //ACK } |