diff options
author | Josh Blum | 2013-03-18 20:38:03 -0700 |
---|---|---|
committer | Josh Blum | 2013-03-18 20:38:03 -0700 |
commit | 71a5eed84e050e111ff60006312936f93efdae91 (patch) | |
tree | 5e658042c7b7a4b5c5480f25b118e2c7ec5b6a99 /lib | |
parent | 84effc390649937ab2f79bbfdf56a00dba38569e (diff) | |
download | sandhi-71a5eed84e050e111ff60006312936f93efdae91.tar.gz sandhi-71a5eed84e050e111ff60006312936f93efdae91.tar.bz2 sandhi-71a5eed84e050e111ff60006312936f93efdae91.zip |
gras: msg port reserve tweak convenience
Diffstat (limited to 'lib')
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 6 | ||||
-rw-r--r-- | lib/input_handlers.cpp | 12 | ||||
-rw-r--r-- | lib/output_handlers.cpp | 1 | ||||
-rw-r--r-- | lib/topology_handler.cpp | 3 |
4 files changed, 18 insertions, 4 deletions
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index 89b8230..25c83f6 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -141,8 +141,8 @@ struct BlockActor : Apology::Worker return ( not this->hasHighPrioMsg() and this->block_state == BLOCK_STATE_LIVE and - this->input_queues.all_ready() and this->inputs_available.any() and + this->input_queues.all_ready() and this->output_queues.all_ready() ); } @@ -165,10 +165,10 @@ struct BlockActor : Apology::Worker //buffer queues and ready conditions InputBufferQueues input_queues; OutputBufferQueues output_queues; - BitSet inputs_available; std::vector<bool> produce_outputs; + BitSet inputs_available; - //tag tracking + //tag and msg tracking std::vector<bool> input_tags_changed; std::vector<std::vector<Tag> > input_tags; std::vector<size_t> num_input_msgs_read; diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp index c15885d..e3b5203 100644 --- a/lib/input_handlers.cpp +++ b/lib/input_handlers.cpp @@ -22,7 +22,19 @@ void BlockActor::handle_input_msg(const InputMsgMessage &message, const Theron:: MESSAGE_TRACER(); const size_t index = message.index; + //got an input message? remove the item reserve + //This is for user convenience so msg ports + //dont need any special configuration to work. + if (this->input_configs[index].reserve_items) + { + this->input_configs[index].reserve_items = 0; + InputUpdateMessage message; + message.index = index; + this->handle_input_update(message, Theron::Address()); + } + //handle incoming async message, push into the msg storage + if (this->block_state == BLOCK_STATE_DONE) return; this->input_msgs[index].push_back(message.msg); this->inputs_available.set(index); diff --git a/lib/output_handlers.cpp b/lib/output_handlers.cpp index d8064f6..17244ee 100644 --- a/lib/output_handlers.cpp +++ b/lib/output_handlers.cpp @@ -16,6 +16,7 @@ void BlockActor::handle_output_buffer(const OutputBufferMessage &message, const //(all interested consumers have finished with it) if (this->block_state == BLOCK_STATE_DONE) return; this->output_queues.push(index, message.buffer); + ta.done(); this->handle_task(); } diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp index aec8f68..38ea138 100644 --- a/lib/topology_handler.cpp +++ b/lib/topology_handler.cpp @@ -47,14 +47,15 @@ void BlockActor::handle_topology( this->output_items.resize(num_outputs); this->input_queues.resize(num_inputs); this->output_queues.resize(num_outputs); - this->inputs_available.resize(num_inputs); this->produce_outputs.resize(num_outputs, false); + this->inputs_available.resize(num_inputs); if (num_inputs == 0) this->inputs_available.resize(1, true); //so its always "available" //copy the name into the queues for debug purposes this->input_queues.name = this->name; this->output_queues.name = this->name; + //resize the token trackers this->input_tokens.resize(num_inputs); this->output_tokens.resize(num_outputs); this->inputs_done.resize(num_inputs); |