summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/block.cpp12
-rw-r--r--lib/gras_impl/block_actor.hpp6
-rw-r--r--lib/gras_impl/debug.hpp2
-rw-r--r--lib/gras_impl/messages.hpp18
-rw-r--r--lib/gras_impl/output_buffer_queues.hpp3
-rw-r--r--lib/input_handlers.cpp12
-rw-r--r--lib/output_handlers.cpp10
-rw-r--r--lib/register_messages.cpp3
-rw-r--r--lib/topology_handler.cpp28
9 files changed, 64 insertions, 30 deletions
diff --git a/lib/block.cpp b/lib/block.cpp
index 4f7c7c9..c305d4c 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -73,7 +73,11 @@ void Block::set_input_config(const size_t which_input, const InputPortConfig &co
{
vector_set((*this)->block->input_configs, config, which_input);
if ((*this)->block->topology_init)
- (*this)->block->Push(UpdateInputsMessage(), Theron::Address());
+ {
+ InputUpdateMessage message;
+ message.index = which_input;
+ (*this)->block->Push(message, Theron::Address());
+ }
}
OutputPortConfig Block::get_output_config(const size_t which_output) const
@@ -85,7 +89,11 @@ void Block::set_output_config(const size_t which_output, const OutputPortConfig
{
vector_set((*this)->block->output_configs, config, which_output);
if ((*this)->block->topology_init)
- (*this)->block->Push(UpdateInputsMessage(), Theron::Address());
+ {
+ OutputUpdateMessage message;
+ message.index = which_output;
+ (*this)->block->Push(message, Theron::Address());
+ }
}
void Block::consume(const size_t which_input, const size_t num_items)
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index 8717fa2..ab75367 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -46,15 +46,16 @@ struct BlockActor : Apology::Worker
this->RegisterHandler(this, &BlockActor::handle_input_token);
this->RegisterHandler(this, &BlockActor::handle_input_check);
this->RegisterHandler(this, &BlockActor::handle_input_alloc);
+ this->RegisterHandler(this, &BlockActor::handle_input_update);
this->RegisterHandler(this, &BlockActor::handle_output_buffer);
this->RegisterHandler(this, &BlockActor::handle_output_token);
this->RegisterHandler(this, &BlockActor::handle_output_check);
this->RegisterHandler(this, &BlockActor::handle_output_hint);
this->RegisterHandler(this, &BlockActor::handle_output_alloc);
+ this->RegisterHandler(this, &BlockActor::handle_output_update);
this->RegisterHandler(this, &BlockActor::handle_self_kick);
- this->RegisterHandler(this, &BlockActor::handle_update_inputs);
}
//handlers
@@ -72,15 +73,16 @@ struct BlockActor : Apology::Worker
void handle_input_token(const InputTokenMessage &, const Theron::Address);
void handle_input_check(const InputCheckMessage &, const Theron::Address);
void handle_input_alloc(const InputAllocMessage &, const Theron::Address);
+ void handle_input_update(const InputUpdateMessage &, const Theron::Address);
void handle_output_buffer(const OutputBufferMessage &, const Theron::Address);
void handle_output_token(const OutputTokenMessage &, const Theron::Address);
void handle_output_check(const OutputCheckMessage &, const Theron::Address);
void handle_output_hint(const OutputHintMessage &, const Theron::Address);
void handle_output_alloc(const OutputAllocMessage &, const Theron::Address);
+ void handle_output_update(const OutputUpdateMessage &, const Theron::Address);
void handle_self_kick(const SelfKickMessage &, const Theron::Address);
- void handle_update_inputs(const UpdateInputsMessage &, const Theron::Address);
//helpers
void buffer_returner(const size_t index, SBuffer &buffer);
diff --git a/lib/gras_impl/debug.hpp b/lib/gras_impl/debug.hpp
index a90eb6c..c8c7dfe 100644
--- a/lib/gras_impl/debug.hpp
+++ b/lib/gras_impl/debug.hpp
@@ -25,7 +25,7 @@ extern void *operator new(std::size_t n) throw (std::bad_alloc);
//-- define to enable these debugs:
//----------------------------------------------------------------------
//#define WORK_DEBUG
-//#define ASSERTING
+#define ASSERTING
//#define MESSAGE_TRACING
//#define ITEM_CONSPROD
diff --git a/lib/gras_impl/messages.hpp b/lib/gras_impl/messages.hpp
index 62d48e6..b113e0d 100644
--- a/lib/gras_impl/messages.hpp
+++ b/lib/gras_impl/messages.hpp
@@ -73,6 +73,11 @@ struct InputCheckMessage
size_t index;
};
+struct InputUpdateMessage
+{
+ size_t index;
+};
+
//----------------------------------------------------------------------
//-- message to an output port
//-- do not ack
@@ -108,6 +113,11 @@ struct OutputAllocMessage
BufferQueueSptr queue;
};
+struct OutputUpdateMessage
+{
+ size_t index;
+};
+
//----------------------------------------------------------------------
//-- message to just the block
//-- do not ack
@@ -118,11 +128,6 @@ struct SelfKickMessage
//empty
};
-struct UpdateInputsMessage
-{
- //empty
-};
-
} //namespace gras
#include <Theron/Register.h>
@@ -142,14 +147,15 @@ THERON_DECLARE_REGISTERED_MESSAGE(gras::InputBufferMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::InputTokenMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::InputCheckMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::InputAllocMessage);
+THERON_DECLARE_REGISTERED_MESSAGE(gras::InputUpdateMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputBufferMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputTokenMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputCheckMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputHintMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputAllocMessage);
+THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputUpdateMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::SelfKickMessage);
-THERON_DECLARE_REGISTERED_MESSAGE(gras::UpdateInputsMessage);
#endif /*INCLUDED_LIBGRAS_IMPL_MESSAGES_HPP*/
diff --git a/lib/gras_impl/output_buffer_queues.hpp b/lib/gras_impl/output_buffer_queues.hpp
index 5674c3d..d9cd5e4 100644
--- a/lib/gras_impl/output_buffer_queues.hpp
+++ b/lib/gras_impl/output_buffer_queues.hpp
@@ -57,7 +57,7 @@ struct OutputBufferQueues
GRAS_FORCE_INLINE SBuffer &front(const size_t i)
{
- ASSERT(not _queues[i]->empty());
+ ASSERT(not this->empty(i));
return _queues[i]->front();
}
@@ -93,6 +93,7 @@ struct OutputBufferQueues
GRAS_FORCE_INLINE bool empty(const size_t i) const
{
+ ASSERT(_queues[i]);
return _queues[i]->empty();
}
diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp
index bef5234..66d2b68 100644
--- a/lib/input_handlers.cpp
+++ b/lib/input_handlers.cpp
@@ -71,3 +71,15 @@ void BlockActor::handle_input_alloc(const InputAllocMessage &message, const Ther
);
if (new_msg.queue) this->post_upstream(index, new_msg);
}
+
+void BlockActor::handle_input_update(const InputUpdateMessage &message, const Theron::Address)
+{
+ MESSAGE_TRACER();
+ const size_t i = message.index;
+
+ //update buffer queue configuration
+ const size_t preload_bytes = this->input_items_sizes[i]*this->input_configs[i].preload_items;
+ const size_t reserve_bytes = this->input_items_sizes[i]*this->input_configs[i].reserve_items;
+ const size_t maximum_bytes = this->input_items_sizes[i]*this->input_configs[i].maximum_items;
+ this->input_queues.update_config(i, this->input_items_sizes[i], preload_bytes, reserve_bytes, maximum_bytes);
+}
diff --git a/lib/output_handlers.cpp b/lib/output_handlers.cpp
index a0bad75..41dd791 100644
--- a/lib/output_handlers.cpp
+++ b/lib/output_handlers.cpp
@@ -72,3 +72,13 @@ void BlockActor::handle_output_alloc(const OutputAllocMessage &message, const Th
//return of a positive downstream allocation
this->output_queues.set_buffer_queue(index, message.queue);
}
+
+void BlockActor::handle_output_update(const OutputUpdateMessage &message, const Theron::Address)
+{
+ MESSAGE_TRACER();
+ const size_t i = message.index;
+
+ //update buffer queue configuration
+ const size_t reserve_bytes = this->output_items_sizes[i]*this->output_configs[i].reserve_items;
+ this->output_queues.set_reserve_bytes(i, reserve_bytes);
+}
diff --git a/lib/register_messages.cpp b/lib/register_messages.cpp
index d112157..caa8ed8 100644
--- a/lib/register_messages.cpp
+++ b/lib/register_messages.cpp
@@ -16,12 +16,13 @@ THERON_DEFINE_REGISTERED_MESSAGE(gras::InputBufferMessage);
THERON_DEFINE_REGISTERED_MESSAGE(gras::InputTokenMessage);
THERON_DEFINE_REGISTERED_MESSAGE(gras::InputCheckMessage);
THERON_DEFINE_REGISTERED_MESSAGE(gras::InputAllocMessage);
+THERON_DEFINE_REGISTERED_MESSAGE(gras::InputUpdateMessage);
THERON_DEFINE_REGISTERED_MESSAGE(gras::OutputBufferMessage);
THERON_DEFINE_REGISTERED_MESSAGE(gras::OutputTokenMessage);
THERON_DEFINE_REGISTERED_MESSAGE(gras::OutputCheckMessage);
THERON_DEFINE_REGISTERED_MESSAGE(gras::OutputHintMessage);
THERON_DEFINE_REGISTERED_MESSAGE(gras::OutputAllocMessage);
+THERON_DEFINE_REGISTERED_MESSAGE(gras::OutputUpdateMessage);
THERON_DEFINE_REGISTERED_MESSAGE(gras::SelfKickMessage);
-THERON_DEFINE_REGISTERED_MESSAGE(gras::UpdateInputsMessage);
diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp
index cf757d5..0dcdbfb 100644
--- a/lib/topology_handler.cpp
+++ b/lib/topology_handler.cpp
@@ -80,24 +80,18 @@ void BlockActor::handle_topology(
}
this->topology_init = true;
- this->handle_update_inputs(UpdateInputsMessage(), Theron::Address());
-
- this->Send(0, from); //ACK
-}
-
-void BlockActor::handle_update_inputs(
- const UpdateInputsMessage &,
- const Theron::Address
-){
- MESSAGE_TRACER();
- const size_t num_inputs = this->get_num_inputs();
- this->input_queues.resize(num_inputs);
-
for (size_t i = 0; i < num_inputs; i++)
{
- const size_t preload_bytes = this->input_items_sizes[i]*this->input_configs[i].preload_items;
- const size_t reserve_bytes = this->input_items_sizes[i]*this->input_configs[i].reserve_items;
- const size_t maximum_bytes = this->input_items_sizes[i]*this->input_configs[i].maximum_items;
- this->input_queues.update_config(i, this->input_items_sizes[i], preload_bytes, reserve_bytes, maximum_bytes);
+ InputUpdateMessage message;
+ message.index = i;
+ this->handle_input_update(message, Theron::Address());
+ }
+ for (size_t i = 0; i < num_outputs; i++)
+ {
+ OutputUpdateMessage message;
+ message.index = i;
+ this->handle_output_update(message, Theron::Address());
}
+
+ this->Send(0, from); //ACK
}