diff options
-rw-r--r-- | include/gras/block.hpp | 37 | ||||
-rw-r--r-- | lib/block.cpp | 2 | ||||
-rw-r--r-- | lib/block_task.cpp | 36 | ||||
-rw-r--r-- | lib/gras_impl/input_buffer_queues.hpp | 38 | ||||
-rw-r--r-- | lib/topology_handler.cpp | 5 |
5 files changed, 89 insertions, 29 deletions
diff --git a/include/gras/block.hpp b/include/gras/block.hpp index 76ccd61..25d2993 100644 --- a/include/gras/block.hpp +++ b/include/gras/block.hpp @@ -33,6 +33,25 @@ struct GRAS_API InputPortConfig InputPortConfig(void); /*! + * Set an input reserve requirement such that work is called + * with an input buffer at least reserve items in size. + * + * Default = 1. + */ + size_t reserve_items; + + /*! + * Constrain the input buffer allocation size: + * The scheduler may accumulate multiple buffers + * into a single larger buffer under failure conditions. + * The maximum size of this accumulated buffer + * is constrained by this maximum_items setting. + * + * Default = 0 aka disabled. + */ + size_t maximum_items; + + /*! * Set buffer inlining for this port config. * Inlining means that the input buffer can be used as an output buffer. * The goal is to make better use of cache and memory bandwidth. @@ -78,8 +97,10 @@ struct GRAS_API OutputPortConfig size_t reserve_items; /*! - * Constrain the maximum number of items that - * work can be called with for this port. + * Constrain the output buffer allocation size: + * The user might set a small maximum items + * to reduce the amount of buffered items + * waiting for processing in downstream queues. * * Default = 0 aka disabled. */ @@ -204,7 +225,8 @@ struct GRAS_API Block : Element * The next call to work will be with a full size output buffer. * * - If the output buffer was not partially filled, this call will throw. - * In this case, the user should set larger reserve_items on this port. + * In this case, the user should set larger maximum_items on this port. + * * \param which_output the output port index */ void mark_output_fail(const size_t which_output); @@ -220,14 +242,9 @@ struct GRAS_API Block : Element * is no longer producing, then the scheduler will mark this block done. * * - If the input buffer at the maximum size, this call will throw. - * In this case, the user should set larger reserve_items on this port. + * In this case, the user should set larger maximum_items on this port. * - * If the output buffer was partially filled (ie, not flushed downstream), - * this will cause the output buffer to flush to the downstream. - * The next call to work will be with a full size output buffer. - * If the output buffer was not partially filled, this call will throw. - * In this case, the user should set larger reserve_items on this port. - * \param which_output the output port index + * \param which_input the input port index */ void mark_input_fail(const size_t which_input); diff --git a/lib/block.cpp b/lib/block.cpp index 2235f72..578b03e 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -21,6 +21,8 @@ using namespace gras; InputPortConfig::InputPortConfig(void) { + reserve_items = 1; + maximum_items = 0; inline_buffer = false; lookahead_items = 0; } diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 8546731..cfe5bb0 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -73,6 +73,15 @@ void BlockActor::mark_done(void) void BlockActor::input_fail(const size_t i) { + SBuffer &buff = this->input_queues.front(i); + + //check that the input is not already maxed + const size_t front_items = buff.length/this->input_items_sizes[i]; + if (front_items >= this->input_configs[i].maximum_items) + { + //throw std::runtime_error("input_fail called on maximum_items buffer"); + } + //input failed, accumulate and try again if (not this->input_queues.is_accumulated(i)) { @@ -80,15 +89,29 @@ void BlockActor::input_fail(const size_t i) this->Push(SelfKickMessage(), Theron::Address()); return; } + //otherwise check for done, else wait for more if (this->inputs_done[i]) this->mark_done(); - - //TODO check if input buffer is max size and throw } void BlockActor::output_fail(const size_t i) { - //TODO + SBuffer &buff = this->output_queues.front(i); + + //check that the input is not already maxed + const size_t front_items = buff.length/this->output_items_sizes[i]; + if (front_items >= this->output_configs[i].maximum_items) + { + throw std::runtime_error("output_fail called on maximum_items buffer"); + } + + if (buff.length != 0) + { + InputBufferMessage buff_msg; + buff_msg.buffer = buff; + this->post_downstream(i, buff_msg); + this->output_queues.pop(i); + } } void BlockActor::handle_task(void) @@ -178,9 +201,14 @@ void BlockActor::handle_task(void) { if (not this->output_queues.ready(i)) continue; SBuffer &buff = this->output_queues.front(i); + const size_t reserve_bytes = this->output_configs[i].reserve_items/this->output_items_sizes[i]; //dont always pass output buffers downstream for the sake of efficiency - if (not this->input_queues.all_ready() or buff.length*2 > buff.get_actual_length()) + if ( + not this->input_queues.all_ready() or + buff.length*2 > buff.get_actual_length() or + (buff.get_actual_length() - buff.length) < reserve_bytes + ) { InputBufferMessage buff_msg; buff_msg.buffer = buff; diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp index 92bfb02..b851003 100644 --- a/lib/gras_impl/input_buffer_queues.hpp +++ b/lib/gras_impl/input_buffer_queues.hpp @@ -40,7 +40,7 @@ struct InputBufferQueues this->resize(0); } - void update_history_bytes(const size_t i, const size_t hist_bytes); + void update_config(const size_t i, const size_t, const size_t, const size_t); //! Call to get an input buffer for work GRAS_FORCE_INLINE SBuffer &front(const size_t i) @@ -114,11 +114,12 @@ struct InputBufferQueues GRAS_FORCE_INLINE void __update(const size_t i) { - _bitset.set(i, _enqueued_bytes[i] != 0); + _bitset.set(i, _enqueued_bytes[i] >= _reserve_bytes[i]); } BitSet _bitset; std::vector<size_t> _enqueued_bytes; + std::vector<size_t> _reserve_bytes; std::vector<boost::circular_buffer<SBuffer> > _queues; std::vector<size_t> _history_bytes; std::vector<boost::shared_ptr<BufferQueue> > _aux_queues; @@ -129,24 +130,33 @@ GRAS_FORCE_INLINE void InputBufferQueues::resize(const size_t size) { _bitset.resize(size); _enqueued_bytes.resize(size, 0); + _reserve_bytes.resize(size, 1); _queues.resize(size, boost::circular_buffer<SBuffer>(MAX_QUEUE_SIZE)); _history_bytes.resize(size, 0); _aux_queues.resize(size); - for (size_t i = 0; i < this->size(); i++) - { - if (_aux_queues[i]) continue; - _aux_queues[i] = boost::shared_ptr<BufferQueue>(new BufferQueue()); - _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES); - _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES); - _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES); - _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES); - } - } -inline void InputBufferQueues::update_history_bytes(const size_t i, const size_t hist_bytes) +inline void InputBufferQueues::update_config( + const size_t i, + const size_t hist_bytes, + const size_t reserve_bytes, + size_t maximum_bytes +) { + //first allocate the aux buffer + if (maximum_bytes == 0) maximum_bytes = MAX_AUX_BUFF_BYTES; + if ( + not _aux_queues[i] or + _aux_queues[i]->empty() or + _aux_queues[i]->front().get_actual_length() != maximum_bytes + ){ + _aux_queues[i] = boost::shared_ptr<BufferQueue>(new BufferQueue()); + _aux_queues[i]->allocate_one(maximum_bytes); + _aux_queues[i]->allocate_one(maximum_bytes); + _aux_queues[i]->allocate_one(maximum_bytes); + } + //there is history, so enqueue some initial history if (hist_bytes > _history_bytes[i]) { @@ -169,6 +179,8 @@ inline void InputBufferQueues::update_history_bytes(const size_t i, const size_t } _history_bytes[i] = hist_bytes; + _reserve_bytes[i] = reserve_bytes; + this->__update(i); } GRAS_FORCE_INLINE void InputBufferQueues::accumulate(const size_t i, const size_t item_size) diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp index 9261af1..9a1f243 100644 --- a/lib/topology_handler.cpp +++ b/lib/topology_handler.cpp @@ -102,10 +102,11 @@ void BlockActor::handle_update_inputs( const size_t num_inputs = this->get_num_inputs(); this->input_queues.resize(num_inputs); - //impose input reserve requirements based on relative rate and output multiple for (size_t i = 0; i < num_inputs; i++) { const size_t hist_bytes = this->input_items_sizes[i]*this->input_configs[i].lookahead_items; - this->input_queues.update_history_bytes(i, hist_bytes); + const size_t reserve_bytes = this->input_items_sizes[i]*this->input_configs[i].reserve_items; + const size_t maximum_bytes = this->input_items_sizes[i]*this->input_configs[i].maximum_items; + this->input_queues.update_config(i, hist_bytes, reserve_bytes, maximum_bytes); } } |