diff options
-rw-r--r-- | lib/block.cpp | 12 | ||||
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 6 | ||||
-rw-r--r-- | lib/gras_impl/debug.hpp | 2 | ||||
-rw-r--r-- | lib/gras_impl/messages.hpp | 18 | ||||
-rw-r--r-- | lib/gras_impl/output_buffer_queues.hpp | 3 | ||||
-rw-r--r-- | lib/input_handlers.cpp | 12 | ||||
-rw-r--r-- | lib/output_handlers.cpp | 10 | ||||
-rw-r--r-- | lib/register_messages.cpp | 3 | ||||
-rw-r--r-- | lib/topology_handler.cpp | 28 |
9 files changed, 64 insertions, 30 deletions
diff --git a/lib/block.cpp b/lib/block.cpp index 4f7c7c9..c305d4c 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -73,7 +73,11 @@ void Block::set_input_config(const size_t which_input, const InputPortConfig &co { vector_set((*this)->block->input_configs, config, which_input); if ((*this)->block->topology_init) - (*this)->block->Push(UpdateInputsMessage(), Theron::Address()); + { + InputUpdateMessage message; + message.index = which_input; + (*this)->block->Push(message, Theron::Address()); + } } OutputPortConfig Block::get_output_config(const size_t which_output) const @@ -85,7 +89,11 @@ void Block::set_output_config(const size_t which_output, const OutputPortConfig { vector_set((*this)->block->output_configs, config, which_output); if ((*this)->block->topology_init) - (*this)->block->Push(UpdateInputsMessage(), Theron::Address()); + { + OutputUpdateMessage message; + message.index = which_output; + (*this)->block->Push(message, Theron::Address()); + } } void Block::consume(const size_t which_input, const size_t num_items) diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index 8717fa2..ab75367 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -46,15 +46,16 @@ struct BlockActor : Apology::Worker this->RegisterHandler(this, &BlockActor::handle_input_token); this->RegisterHandler(this, &BlockActor::handle_input_check); this->RegisterHandler(this, &BlockActor::handle_input_alloc); + this->RegisterHandler(this, &BlockActor::handle_input_update); this->RegisterHandler(this, &BlockActor::handle_output_buffer); this->RegisterHandler(this, &BlockActor::handle_output_token); this->RegisterHandler(this, &BlockActor::handle_output_check); this->RegisterHandler(this, &BlockActor::handle_output_hint); this->RegisterHandler(this, &BlockActor::handle_output_alloc); + this->RegisterHandler(this, &BlockActor::handle_output_update); this->RegisterHandler(this, &BlockActor::handle_self_kick); - this->RegisterHandler(this, &BlockActor::handle_update_inputs); } //handlers @@ -72,15 +73,16 @@ struct BlockActor : Apology::Worker void handle_input_token(const InputTokenMessage &, const Theron::Address); void handle_input_check(const InputCheckMessage &, const Theron::Address); void handle_input_alloc(const InputAllocMessage &, const Theron::Address); + void handle_input_update(const InputUpdateMessage &, const Theron::Address); void handle_output_buffer(const OutputBufferMessage &, const Theron::Address); void handle_output_token(const OutputTokenMessage &, const Theron::Address); void handle_output_check(const OutputCheckMessage &, const Theron::Address); void handle_output_hint(const OutputHintMessage &, const Theron::Address); void handle_output_alloc(const OutputAllocMessage &, const Theron::Address); + void handle_output_update(const OutputUpdateMessage &, const Theron::Address); void handle_self_kick(const SelfKickMessage &, const Theron::Address); - void handle_update_inputs(const UpdateInputsMessage &, const Theron::Address); //helpers void buffer_returner(const size_t index, SBuffer &buffer); diff --git a/lib/gras_impl/debug.hpp b/lib/gras_impl/debug.hpp index a90eb6c..c8c7dfe 100644 --- a/lib/gras_impl/debug.hpp +++ b/lib/gras_impl/debug.hpp @@ -25,7 +25,7 @@ extern void *operator new(std::size_t n) throw (std::bad_alloc); //-- define to enable these debugs: //---------------------------------------------------------------------- //#define WORK_DEBUG -//#define ASSERTING +#define ASSERTING //#define MESSAGE_TRACING //#define ITEM_CONSPROD diff --git a/lib/gras_impl/messages.hpp b/lib/gras_impl/messages.hpp index 62d48e6..b113e0d 100644 --- a/lib/gras_impl/messages.hpp +++ b/lib/gras_impl/messages.hpp @@ -73,6 +73,11 @@ struct InputCheckMessage size_t index; }; +struct InputUpdateMessage +{ + size_t index; +}; + //---------------------------------------------------------------------- //-- message to an output port //-- do not ack @@ -108,6 +113,11 @@ struct OutputAllocMessage BufferQueueSptr queue; }; +struct OutputUpdateMessage +{ + size_t index; +}; + //---------------------------------------------------------------------- //-- message to just the block //-- do not ack @@ -118,11 +128,6 @@ struct SelfKickMessage //empty }; -struct UpdateInputsMessage -{ - //empty -}; - } //namespace gras #include <Theron/Register.h> @@ -142,14 +147,15 @@ THERON_DECLARE_REGISTERED_MESSAGE(gras::InputBufferMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::InputTokenMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::InputCheckMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::InputAllocMessage); +THERON_DECLARE_REGISTERED_MESSAGE(gras::InputUpdateMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputBufferMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputTokenMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputCheckMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputHintMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputAllocMessage); +THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputUpdateMessage); THERON_DECLARE_REGISTERED_MESSAGE(gras::SelfKickMessage); -THERON_DECLARE_REGISTERED_MESSAGE(gras::UpdateInputsMessage); #endif /*INCLUDED_LIBGRAS_IMPL_MESSAGES_HPP*/ diff --git a/lib/gras_impl/output_buffer_queues.hpp b/lib/gras_impl/output_buffer_queues.hpp index 5674c3d..d9cd5e4 100644 --- a/lib/gras_impl/output_buffer_queues.hpp +++ b/lib/gras_impl/output_buffer_queues.hpp @@ -57,7 +57,7 @@ struct OutputBufferQueues GRAS_FORCE_INLINE SBuffer &front(const size_t i) { - ASSERT(not _queues[i]->empty()); + ASSERT(not this->empty(i)); return _queues[i]->front(); } @@ -93,6 +93,7 @@ struct OutputBufferQueues GRAS_FORCE_INLINE bool empty(const size_t i) const { + ASSERT(_queues[i]); return _queues[i]->empty(); } diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp index bef5234..66d2b68 100644 --- a/lib/input_handlers.cpp +++ b/lib/input_handlers.cpp @@ -71,3 +71,15 @@ void BlockActor::handle_input_alloc(const InputAllocMessage &message, const Ther ); if (new_msg.queue) this->post_upstream(index, new_msg); } + +void BlockActor::handle_input_update(const InputUpdateMessage &message, const Theron::Address) +{ + MESSAGE_TRACER(); + const size_t i = message.index; + + //update buffer queue configuration + const size_t preload_bytes = this->input_items_sizes[i]*this->input_configs[i].preload_items; + const size_t reserve_bytes = this->input_items_sizes[i]*this->input_configs[i].reserve_items; + const size_t maximum_bytes = this->input_items_sizes[i]*this->input_configs[i].maximum_items; + this->input_queues.update_config(i, this->input_items_sizes[i], preload_bytes, reserve_bytes, maximum_bytes); +} diff --git a/lib/output_handlers.cpp b/lib/output_handlers.cpp index a0bad75..41dd791 100644 --- a/lib/output_handlers.cpp +++ b/lib/output_handlers.cpp @@ -72,3 +72,13 @@ void BlockActor::handle_output_alloc(const OutputAllocMessage &message, const Th //return of a positive downstream allocation this->output_queues.set_buffer_queue(index, message.queue); } + +void BlockActor::handle_output_update(const OutputUpdateMessage &message, const Theron::Address) +{ + MESSAGE_TRACER(); + const size_t i = message.index; + + //update buffer queue configuration + const size_t reserve_bytes = this->output_items_sizes[i]*this->output_configs[i].reserve_items; + this->output_queues.set_reserve_bytes(i, reserve_bytes); +} diff --git a/lib/register_messages.cpp b/lib/register_messages.cpp index d112157..caa8ed8 100644 --- a/lib/register_messages.cpp +++ b/lib/register_messages.cpp @@ -16,12 +16,13 @@ THERON_DEFINE_REGISTERED_MESSAGE(gras::InputBufferMessage); THERON_DEFINE_REGISTERED_MESSAGE(gras::InputTokenMessage); THERON_DEFINE_REGISTERED_MESSAGE(gras::InputCheckMessage); THERON_DEFINE_REGISTERED_MESSAGE(gras::InputAllocMessage); +THERON_DEFINE_REGISTERED_MESSAGE(gras::InputUpdateMessage); THERON_DEFINE_REGISTERED_MESSAGE(gras::OutputBufferMessage); THERON_DEFINE_REGISTERED_MESSAGE(gras::OutputTokenMessage); THERON_DEFINE_REGISTERED_MESSAGE(gras::OutputCheckMessage); THERON_DEFINE_REGISTERED_MESSAGE(gras::OutputHintMessage); THERON_DEFINE_REGISTERED_MESSAGE(gras::OutputAllocMessage); +THERON_DEFINE_REGISTERED_MESSAGE(gras::OutputUpdateMessage); THERON_DEFINE_REGISTERED_MESSAGE(gras::SelfKickMessage); -THERON_DEFINE_REGISTERED_MESSAGE(gras::UpdateInputsMessage); diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp index cf757d5..0dcdbfb 100644 --- a/lib/topology_handler.cpp +++ b/lib/topology_handler.cpp @@ -80,24 +80,18 @@ void BlockActor::handle_topology( } this->topology_init = true; - this->handle_update_inputs(UpdateInputsMessage(), Theron::Address()); - - this->Send(0, from); //ACK -} - -void BlockActor::handle_update_inputs( - const UpdateInputsMessage &, - const Theron::Address -){ - MESSAGE_TRACER(); - const size_t num_inputs = this->get_num_inputs(); - this->input_queues.resize(num_inputs); - for (size_t i = 0; i < num_inputs; i++) { - const size_t preload_bytes = this->input_items_sizes[i]*this->input_configs[i].preload_items; - const size_t reserve_bytes = this->input_items_sizes[i]*this->input_configs[i].reserve_items; - const size_t maximum_bytes = this->input_items_sizes[i]*this->input_configs[i].maximum_items; - this->input_queues.update_config(i, this->input_items_sizes[i], preload_bytes, reserve_bytes, maximum_bytes); + InputUpdateMessage message; + message.index = i; + this->handle_input_update(message, Theron::Address()); + } + for (size_t i = 0; i < num_outputs; i++) + { + OutputUpdateMessage message; + message.index = i; + this->handle_output_update(message, Theron::Address()); } + + this->Send(0, from); //ACK } |