summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorJosh Blum2013-03-18 20:38:03 -0700
committerJosh Blum2013-03-18 20:38:03 -0700
commit71a5eed84e050e111ff60006312936f93efdae91 (patch)
tree5e658042c7b7a4b5c5480f25b118e2c7ec5b6a99 /lib
parent84effc390649937ab2f79bbfdf56a00dba38569e (diff)
downloadsandhi-71a5eed84e050e111ff60006312936f93efdae91.tar.gz
sandhi-71a5eed84e050e111ff60006312936f93efdae91.tar.bz2
sandhi-71a5eed84e050e111ff60006312936f93efdae91.zip
gras: msg port reserve tweak convenience
Diffstat (limited to 'lib')
-rw-r--r--lib/gras_impl/block_actor.hpp6
-rw-r--r--lib/input_handlers.cpp12
-rw-r--r--lib/output_handlers.cpp1
-rw-r--r--lib/topology_handler.cpp3
4 files changed, 18 insertions, 4 deletions
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index 89b8230..25c83f6 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -141,8 +141,8 @@ struct BlockActor : Apology::Worker
return (
not this->hasHighPrioMsg() and
this->block_state == BLOCK_STATE_LIVE and
- this->input_queues.all_ready() and
this->inputs_available.any() and
+ this->input_queues.all_ready() and
this->output_queues.all_ready()
);
}
@@ -165,10 +165,10 @@ struct BlockActor : Apology::Worker
//buffer queues and ready conditions
InputBufferQueues input_queues;
OutputBufferQueues output_queues;
- BitSet inputs_available;
std::vector<bool> produce_outputs;
+ BitSet inputs_available;
- //tag tracking
+ //tag and msg tracking
std::vector<bool> input_tags_changed;
std::vector<std::vector<Tag> > input_tags;
std::vector<size_t> num_input_msgs_read;
diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp
index c15885d..e3b5203 100644
--- a/lib/input_handlers.cpp
+++ b/lib/input_handlers.cpp
@@ -22,7 +22,19 @@ void BlockActor::handle_input_msg(const InputMsgMessage &message, const Theron::
MESSAGE_TRACER();
const size_t index = message.index;
+ //got an input message? remove the item reserve
+ //This is for user convenience so msg ports
+ //dont need any special configuration to work.
+ if (this->input_configs[index].reserve_items)
+ {
+ this->input_configs[index].reserve_items = 0;
+ InputUpdateMessage message;
+ message.index = index;
+ this->handle_input_update(message, Theron::Address());
+ }
+
//handle incoming async message, push into the msg storage
+ if (this->block_state == BLOCK_STATE_DONE) return;
this->input_msgs[index].push_back(message.msg);
this->inputs_available.set(index);
diff --git a/lib/output_handlers.cpp b/lib/output_handlers.cpp
index d8064f6..17244ee 100644
--- a/lib/output_handlers.cpp
+++ b/lib/output_handlers.cpp
@@ -16,6 +16,7 @@ void BlockActor::handle_output_buffer(const OutputBufferMessage &message, const
//(all interested consumers have finished with it)
if (this->block_state == BLOCK_STATE_DONE) return;
this->output_queues.push(index, message.buffer);
+
ta.done();
this->handle_task();
}
diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp
index aec8f68..38ea138 100644
--- a/lib/topology_handler.cpp
+++ b/lib/topology_handler.cpp
@@ -47,14 +47,15 @@ void BlockActor::handle_topology(
this->output_items.resize(num_outputs);
this->input_queues.resize(num_inputs);
this->output_queues.resize(num_outputs);
- this->inputs_available.resize(num_inputs);
this->produce_outputs.resize(num_outputs, false);
+ this->inputs_available.resize(num_inputs);
if (num_inputs == 0) this->inputs_available.resize(1, true); //so its always "available"
//copy the name into the queues for debug purposes
this->input_queues.name = this->name;
this->output_queues.name = this->name;
+ //resize the token trackers
this->input_tokens.resize(num_inputs);
this->output_tokens.resize(num_outputs);
this->inputs_done.resize(num_inputs);