diff options
Diffstat (limited to 'lib/gras_impl')
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 13 | ||||
-rw-r--r-- | lib/gras_impl/debug.hpp | 2 | ||||
-rw-r--r-- | lib/gras_impl/input_buffer_queues.hpp | 49 | ||||
-rw-r--r-- | lib/gras_impl/messages.hpp | 21 | ||||
-rw-r--r-- | lib/gras_impl/output_buffer_queues.hpp | 84 | ||||
-rw-r--r-- | lib/gras_impl/simple_buffer_queue.hpp (renamed from lib/gras_impl/buffer_queue.hpp) | 14 |
6 files changed, 105 insertions, 78 deletions
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index d204ec4..afc248e 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -46,15 +46,16 @@ struct BlockActor : Apology::Worker this->RegisterHandler(this, &BlockActor::handle_input_token); this->RegisterHandler(this, &BlockActor::handle_input_check); this->RegisterHandler(this, &BlockActor::handle_input_alloc); + this->RegisterHandler(this, &BlockActor::handle_input_update); this->RegisterHandler(this, &BlockActor::handle_output_buffer); this->RegisterHandler(this, &BlockActor::handle_output_token); this->RegisterHandler(this, &BlockActor::handle_output_check); this->RegisterHandler(this, &BlockActor::handle_output_hint); this->RegisterHandler(this, &BlockActor::handle_output_alloc); + this->RegisterHandler(this, &BlockActor::handle_output_update); this->RegisterHandler(this, &BlockActor::handle_self_kick); - this->RegisterHandler(this, &BlockActor::handle_update_inputs); } //handlers @@ -72,15 +73,16 @@ struct BlockActor : Apology::Worker void handle_input_token(const InputTokenMessage &, const Theron::Address); void handle_input_check(const InputCheckMessage &, const Theron::Address); void handle_input_alloc(const InputAllocMessage &, const Theron::Address); + void handle_input_update(const InputUpdateMessage &, const Theron::Address); void handle_output_buffer(const OutputBufferMessage &, const Theron::Address); void handle_output_token(const OutputTokenMessage &, const Theron::Address); void handle_output_check(const OutputCheckMessage &, const Theron::Address); void handle_output_hint(const OutputHintMessage &, const Theron::Address); void handle_output_alloc(const OutputAllocMessage &, const Theron::Address); + void handle_output_update(const OutputUpdateMessage &, const Theron::Address); void handle_self_kick(const SelfKickMessage &, const Theron::Address); - void handle_update_inputs(const UpdateInputsMessage &, const Theron::Address); //helpers void buffer_returner(const size_t index, SBuffer &buffer); @@ -93,7 +95,7 @@ struct BlockActor : Apology::Worker void produce(const size_t index, const size_t items); void consume(const size_t index, const size_t items); void produce_buffer(const size_t index, const SBuffer &buffer); - void flush_output(const size_t index, const bool force_pop = false); + void flush_output(const size_t index); bool is_work_allowed(void); GRAS_FORCE_INLINE bool is_input_done(const size_t i) @@ -122,11 +124,10 @@ struct BlockActor : Apology::Worker BitSet inputs_done; BitSet outputs_done; std::set<Token> token_pool; - std::vector<SBufferToken> output_buffer_tokens; //buffer queues and ready conditions InputBufferQueues input_queues; - OutputBufferQueues<SBuffer> output_queues; + OutputBufferQueues output_queues; BitSet inputs_available; //tag tracking @@ -155,8 +156,6 @@ struct BlockActor : Apology::Worker long buffer_affinity; std::vector<std::vector<OutputHintMessage> > output_allocation_hints; - - bool topology_init; }; } //namespace gras diff --git a/lib/gras_impl/debug.hpp b/lib/gras_impl/debug.hpp index a90eb6c..c8c7dfe 100644 --- a/lib/gras_impl/debug.hpp +++ b/lib/gras_impl/debug.hpp @@ -25,7 +25,7 @@ extern void *operator new(std::size_t n) throw (std::bad_alloc); //-- define to enable these debugs: //---------------------------------------------------------------------- //#define WORK_DEBUG -//#define ASSERTING +#define ASSERTING //#define MESSAGE_TRACING //#define ITEM_CONSPROD diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp index 83ed44b..071f2ac 100644 --- a/lib/gras_impl/input_buffer_queues.hpp +++ b/lib/gras_impl/input_buffer_queues.hpp @@ -5,7 +5,7 @@ #include <gras_impl/debug.hpp> #include <gras_impl/bitset.hpp> -#include <gras_impl/buffer_queue.hpp> +#include <gras_impl/simple_buffer_queue.hpp> #include <gras/sbuffer.hpp> #include <vector> #include <queue> @@ -52,6 +52,8 @@ struct InputBufferQueues //special case when the null buffer is possible if (_queues[i].empty()) return get_null_buff(); + this->try_stitch(i); + //there are enough enqueued bytes, but not in the front buffer const bool must_accumulate = _queues[i].front().length < _reserve_bytes[i]; @@ -60,7 +62,7 @@ struct InputBufferQueues const bool light_front = _queues[i].front().length <= _maximum_bytes[i]/2; const bool should_accumulate = heavy_load and light_front; - if (must_accumulate or should_accumulate) this->accumulate(i); + if (must_accumulate/* or should_accumulate*/) this->accumulate(i); ASSERT(_queues[i].front().length >= _reserve_bytes[i]); @@ -69,6 +71,28 @@ struct InputBufferQueues return _queues[i].front(); } + GRAS_FORCE_INLINE void try_stitch(const size_t i) + { + if (_queues[i].size() < 2) return; + SBuffer &b0 = _queues[i][0]; + SBuffer &b1 = _queues[i][1]; + + if (b0.offset > b0.get_actual_length()) + { + const size_t xfer_bytes = b0.length; + ASSERT(b1.offset >= xfer_bytes); + b1.offset -= xfer_bytes; + b1.length += xfer_bytes; + _queues[i].pop_front(); + return; + } + + const size_t xfer_bytes = b1.length; + b0.length += xfer_bytes; + b1.length = 0; + b1.offset += xfer_bytes; + } + //! Call when input bytes consumed by work void consume(const size_t i, const size_t bytes_consumed); @@ -104,20 +128,7 @@ struct InputBufferQueues { ASSERT(not _queues[i].full()); if (buffer.length == 0) return; - - //does this buffer starts where the last one ends? - //perform buffer stitching into back of buffer - if ( - not _queues[i].empty() and _queues[i].back() == buffer and - (_queues[i].back().length + _queues[i].back().offset) == buffer.offset - ){ - _queues[i].back().length += buffer.length; - } - else - { - _queues[i].push_back(buffer); - } - + _queues[i].push_back(buffer); _enqueued_bytes[i] += buffer.length; __update(i); } @@ -172,7 +183,7 @@ struct InputBufferQueues std::vector<size_t> _maximum_bytes; std::vector<boost::circular_buffer<SBuffer> > _queues; std::vector<size_t> _preload_bytes; - std::vector<boost::shared_ptr<BufferQueue> > _aux_queues; + std::vector<boost::shared_ptr<SimpleBufferQueue> > _aux_queues; }; @@ -207,7 +218,7 @@ inline void InputBufferQueues::update_config( _aux_queues[i]->empty() or _aux_queues[i]->front().get_actual_length() != _maximum_bytes[i] ){ - _aux_queues[i] = boost::shared_ptr<BufferQueue>(new BufferQueue()); + _aux_queues[i].reset(new SimpleBufferQueue()); _aux_queues[i]->allocate_one(_maximum_bytes[i]); _aux_queues[i]->allocate_one(_maximum_bytes[i]); _aux_queues[i]->allocate_one(_maximum_bytes[i]); @@ -287,7 +298,7 @@ GRAS_FORCE_INLINE void InputBufferQueues::consume(const size_t i, const size_t b //update bounds on the current buffer front.offset += bytes_consumed; front.length -= bytes_consumed; - ASSERT(front.offset <= front.get_actual_length()); + //ASSERT(front.offset <= front.get_actual_length()); ASSERT((_queues[i].front().length % _items_sizes[i]) == 0); if (front.length == 0) this->pop(i); diff --git a/lib/gras_impl/messages.hpp b/lib/gras_impl/messages.hpp index eae3258..b113e0d 100644 --- a/lib/gras_impl/messages.hpp +++ b/lib/gras_impl/messages.hpp @@ -3,6 +3,7 @@ #ifndef INCLUDED_LIBGRAS_IMPL_MESSAGES_HPP #define INCLUDED_LIBGRAS_IMPL_MESSAGES_HPP +#include <gras/buffer_queue.hpp> #include <gras/sbuffer.hpp> #include <gras/tags.hpp> #include <gras/sbuffer.hpp> @@ -72,6 +73,11 @@ struct InputCheckMessage size_t index; }; +struct InputUpdateMessage +{ + size_t index; +}; + //---------------------------------------------------------------------- //-- message to an output port //-- do not ack @@ -104,7 +110,12 @@ struct OutputHintMessage struct OutputAllocMessage { size_t index; - SBufferToken token; + BufferQueueSptr queue; +}; + +struct OutputUpdateMessage +{ + size_t index; }; //---------------------------------------------------------------------- @@ -117,11 +128,6 @@ struct SelfKickMessage //empty }; -struct UpdateInputsMessage -{ - //empty -}; - } //namespace gras #include <Theron/Register.h> @@ -141,14 +147,15 @@ THERON_DECLARE_REGISTERED_MESSAGE(gras::InputBufferMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::InputTokenMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::InputCheckMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::InputAllocMessage); +THERON_DECLARE_REGISTERED_MESSAGE(gras::InputUpdateMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputBufferMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputTokenMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputCheckMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputHintMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputAllocMessage); +THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputUpdateMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::SelfKickMessage); -THERON_DECLARE_REGISTERED_MESSAGE(gras::UpdateInputsMessage); #endif /*INCLUDED_LIBGRAS_IMPL_MESSAGES_HPP*/ diff --git a/lib/gras_impl/output_buffer_queues.hpp b/lib/gras_impl/output_buffer_queues.hpp index 722a4c2..3618b1b 100644 --- a/lib/gras_impl/output_buffer_queues.hpp +++ b/lib/gras_impl/output_buffer_queues.hpp @@ -3,83 +3,76 @@ #ifndef INCLUDED_LIBGRAS_IMPL_OUTPUT_BUFFER_QUEUES_HPP #define INCLUDED_LIBGRAS_IMPL_OUTPUT_BUFFER_QUEUES_HPP +#include <gras/buffer_queue.hpp> #include <gras_impl/bitset.hpp> #include <vector> -#include <boost/circular_buffer.hpp> namespace gras { -template <typename T> struct OutputBufferQueues { - enum {MAX_QUEUE_SIZE = 128}; - BitSet _bitset; - std::vector<boost::circular_buffer<T> > _queues; - - GRAS_FORCE_INLINE void resize(const size_t size) + void set_buffer_queue(const size_t i, BufferQueueSptr queue) { - _bitset.resize(size); - _queues.resize(size, boost::circular_buffer<T>(MAX_QUEUE_SIZE)); + _queues[i] = queue; + _update(i); } - GRAS_FORCE_INLINE void push(const size_t i, const T &value) + void set_reserve_bytes(const size_t i, const size_t num_bytes) { - _queues[i].push_back(value); - _bitset.set(i); + _reserve_bytes[i] = num_bytes; + if (_queues[i]) _update(i); } - //! used for input buffer inlining - GRAS_FORCE_INLINE void push_front(const size_t i, const T &value) + GRAS_FORCE_INLINE void resize(const size_t size) { - ASSERT(not _queues[i].full()); - _queues[i].push_front(value); - _bitset.set(i); + _bitset.resize(size); + _queues.resize(size); + _reserve_bytes.resize(size, 1); } - GRAS_FORCE_INLINE const T &front(const size_t i) const + GRAS_FORCE_INLINE void push(const size_t i, const SBuffer &buff) { - return _queues[i].front(); + if (not _queues[i]) return; //block is likely done, throw out buffer + _queues[i]->push(buff); + _update(i); } - GRAS_FORCE_INLINE T &front(const size_t i) + GRAS_FORCE_INLINE void flush_all(void) { - ASSERT(not _queues[i].empty()); - return _queues[i].front(); + const size_t old_size = this->size(); + this->resize(0); + this->resize(old_size); } - GRAS_FORCE_INLINE void pop(const size_t i) + GRAS_FORCE_INLINE SBuffer &front(const size_t i) { - _queues[i].front() = T(); - _queues[i].pop_front(); - _bitset.set(i, not _queues[i].empty()); + ASSERT(not this->empty(i)); + return _queues[i]->front(); } - GRAS_FORCE_INLINE void fail(const size_t i) + GRAS_FORCE_INLINE void pop(const size_t i) { - _bitset.reset(i); + ASSERT(_queues[i]); + ASSERT(not _queues[i]->empty()); + _queues[i]->pop(); + _update(i); } - GRAS_FORCE_INLINE void flush(const size_t i) + GRAS_FORCE_INLINE void fail(const size_t i) { - _queues[i].clear(); _bitset.reset(i); } - GRAS_FORCE_INLINE void flush_all(void) - { - for (size_t i = 0; i < this->size(); i++) this->flush(i); - } - GRAS_FORCE_INLINE bool ready(const size_t i) const { - return not _queues[i].empty(); + return _bitset[i]; } GRAS_FORCE_INLINE bool empty(const size_t i) const { - return _queues[i].empty(); + return (not _queues[i] or _queues[i]->empty()); } GRAS_FORCE_INLINE bool all_ready(void) const @@ -91,6 +84,23 @@ struct OutputBufferQueues { return _queues.size(); } + + GRAS_FORCE_INLINE void _update(const size_t i) + { + if (not _queues[i] or _queues[i]->empty()) + { + _bitset.reset(i); + return; + } + const SBuffer &front = _queues[i]->front(); + const size_t avail = front.get_actual_length() - front.offset - front.length; + _bitset.set(i, avail >= _reserve_bytes[i]); + } + + BitSet _bitset; + std::vector<BufferQueueSptr> _queues; + std::vector<size_t> _reserve_bytes; + }; } //namespace gras diff --git a/lib/gras_impl/buffer_queue.hpp b/lib/gras_impl/simple_buffer_queue.hpp index 0e6be9e..db6ecc0 100644 --- a/lib/gras_impl/buffer_queue.hpp +++ b/lib/gras_impl/simple_buffer_queue.hpp @@ -1,7 +1,7 @@ // Copyright (C) by Josh Blum. See LICENSE.txt for licensing information. -#ifndef INCLUDED_LIBGRAS_IMPL_BUFFER_QUEUE_HPP -#define INCLUDED_LIBGRAS_IMPL_BUFFER_QUEUE_HPP +#ifndef INCLUDED_LIBGRAS_IMPL_SIMPLE_BUFFER_QUEUE_HPP +#define INCLUDED_LIBGRAS_IMPL_SIMPLE_BUFFER_QUEUE_HPP #include <gras/sbuffer.hpp> #include <boost/bind.hpp> @@ -10,15 +10,15 @@ namespace gras { -struct BufferQueue : std::queue<SBuffer> +struct SimpleBufferQueue : std::queue<SBuffer> { - BufferQueue(void) + SimpleBufferQueue(void) { - SBufferDeleter deleter = boost::bind(&BufferQueue::push_back, this, _1); + SBufferDeleter deleter = boost::bind(&SimpleBufferQueue::push_back, this, _1); _token = SBufferToken(new SBufferDeleter(deleter)); } - ~BufferQueue(void) + ~SimpleBufferQueue(void) { _token.reset(); while (not this->empty()) @@ -47,4 +47,4 @@ struct BufferQueue : std::queue<SBuffer> } //namespace gras -#endif /*INCLUDED_LIBGRAS_IMPL_BUFFER_QUEUE_HPP*/ +#endif /*INCLUDED_LIBGRAS_IMPL_SIMPLE_BUFFER_QUEUE_HPP*/ |