diff options
author | Josh Blum | 2012-11-07 00:40:41 -0800 |
---|---|---|
committer | Josh Blum | 2012-11-07 00:40:41 -0800 |
commit | 0f0c68c4a8c32c43eee8f1589878abe376e2b1da (patch) | |
tree | 165014e95b5fa5fda8bcaf95bb7af6b2b99d7f02 /lib/block_task.cpp | |
parent | 12c56a0845a7b8e5cbff6ed20eae8b3fa3e26b2d (diff) | |
download | sandhi-0f0c68c4a8c32c43eee8f1589878abe376e2b1da.tar.gz sandhi-0f0c68c4a8c32c43eee8f1589878abe376e2b1da.tar.bz2 sandhi-0f0c68c4a8c32c43eee8f1589878abe376e2b1da.zip |
io fails and io config work
Diffstat (limited to 'lib/block_task.cpp')
-rw-r--r-- | lib/block_task.cpp | 36 |
1 files changed, 32 insertions, 4 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 8546731..cfe5bb0 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -73,6 +73,15 @@ void BlockActor::mark_done(void) void BlockActor::input_fail(const size_t i) { + SBuffer &buff = this->input_queues.front(i); + + //check that the input is not already maxed + const size_t front_items = buff.length/this->input_items_sizes[i]; + if (front_items >= this->input_configs[i].maximum_items) + { + //throw std::runtime_error("input_fail called on maximum_items buffer"); + } + //input failed, accumulate and try again if (not this->input_queues.is_accumulated(i)) { @@ -80,15 +89,29 @@ void BlockActor::input_fail(const size_t i) this->Push(SelfKickMessage(), Theron::Address()); return; } + //otherwise check for done, else wait for more if (this->inputs_done[i]) this->mark_done(); - - //TODO check if input buffer is max size and throw } void BlockActor::output_fail(const size_t i) { - //TODO + SBuffer &buff = this->output_queues.front(i); + + //check that the input is not already maxed + const size_t front_items = buff.length/this->output_items_sizes[i]; + if (front_items >= this->output_configs[i].maximum_items) + { + throw std::runtime_error("output_fail called on maximum_items buffer"); + } + + if (buff.length != 0) + { + InputBufferMessage buff_msg; + buff_msg.buffer = buff; + this->post_downstream(i, buff_msg); + this->output_queues.pop(i); + } } void BlockActor::handle_task(void) @@ -178,9 +201,14 @@ void BlockActor::handle_task(void) { if (not this->output_queues.ready(i)) continue; SBuffer &buff = this->output_queues.front(i); + const size_t reserve_bytes = this->output_configs[i].reserve_items/this->output_items_sizes[i]; //dont always pass output buffers downstream for the sake of efficiency - if (not this->input_queues.all_ready() or buff.length*2 > buff.get_actual_length()) + if ( + not this->input_queues.all_ready() or + buff.length*2 > buff.get_actual_length() or + (buff.get_actual_length() - buff.length) < reserve_bytes + ) { InputBufferMessage buff_msg; buff_msg.buffer = buff; |