diff options
author | Josh Blum | 2012-11-01 21:15:03 -0700 |
---|---|---|
committer | Josh Blum | 2012-11-01 21:15:03 -0700 |
commit | b22c63e756d50723188fdd25983868ea1f67bd05 (patch) | |
tree | 8c2f8672ca8e4284b9cd250d85b235011fc3d675 /lib | |
parent | 1581c3925a8d1e2057b9864f0bfe59e7f5fbedfb (diff) | |
download | sandhi-b22c63e756d50723188fdd25983868ea1f67bd05.tar.gz sandhi-b22c63e756d50723188fdd25983868ea1f67bd05.tar.bz2 sandhi-b22c63e756d50723188fdd25983868ea1f67bd05.zip |
moved out a bunch of work, rate, fcast logic
Diffstat (limited to 'lib')
-rw-r--r-- | lib/block.cpp | 51 | ||||
-rw-r--r-- | lib/block_handlers.cpp | 2 | ||||
-rw-r--r-- | lib/block_task.cpp | 80 | ||||
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 17 | ||||
-rw-r--r-- | lib/tag_handlers.hpp | 35 | ||||
-rw-r--r-- | lib/topology_handler.cpp | 2 |
6 files changed, 9 insertions, 178 deletions
diff --git a/lib/block.cpp b/lib/block.cpp index 0d50ef7..3732fdb 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -51,12 +51,8 @@ Block::Block(const std::string &name): //call block methods to init stuff this->set_input_config(InputPortConfig()); this->set_output_config(OutputPortConfig()); - this->set_fixed_rate(true); - this->set_relative_rate(1.0); - this->set_tag_propagation_policy(TPP_ALL_TO_ALL); this->set_interruptible_work(false); this->set_buffer_affinity(-1); - (*this)->block->output_multiple_items = 1; } template <typename V, typename T> @@ -106,7 +102,6 @@ void Block::set_output_config(const OutputPortConfig &config, const size_t which void Block::consume(const size_t which_input, const size_t how_many_items) { (*this)->block->consume_items[which_input] += how_many_items; - (*this)->block->consume_called[which_input] = true; } void Block::consume_each(const size_t how_many_items) @@ -114,7 +109,6 @@ void Block::consume_each(const size_t how_many_items) for (size_t i = 0; i < (*this)->block->consume_items.size(); i++) { (*this)->block->consume_items[i] += how_many_items; - (*this)->block->consume_called[i] = true; } } @@ -123,39 +117,6 @@ void Block::produce(const size_t which_output, const size_t how_many_items) (*this)->block->produce_items[which_output] += how_many_items; } -void Block::set_fixed_rate(const bool fixed_rate) -{ - (*this)->block->enable_fixed_rate = fixed_rate; -} - -bool Block::fixed_rate(void) const -{ - return (*this)->block->enable_fixed_rate; -} - -void Block::set_output_multiple(const size_t multiple) -{ - (*this)->block->output_multiple_items = multiple; - gras::OutputPortConfig config = this->output_config(); - config.reserve_items = multiple; - this->set_output_config(config); -} - -size_t Block::output_multiple(void) const -{ - return (*this)->block->output_multiple_items; -} - -void Block::set_relative_rate(double relative_rate) -{ - (*this)->block->relative_rate = relative_rate; -} - -double Block::relative_rate(void) const -{ - return (*this)->block->relative_rate; -} - uint64_t Block::nitems_read(const size_t which_input) { return (*this)->block->items_consumed[which_input]; @@ -166,16 +127,6 @@ uint64_t Block::nitems_written(const size_t which_output) return (*this)->block->items_produced[which_output]; } -Block::tag_propagation_policy_t Block::tag_propagation_policy(void) -{ - return (*this)->block->tag_prop_policy; -} - -void Block::set_tag_propagation_policy(Block::tag_propagation_policy_t p) -{ - (*this)->block->tag_prop_policy = p; -} - void Block::post_output_tag( const size_t which_output, const Tag &tag @@ -190,7 +141,7 @@ Block::TagIter Block::get_input_tags( return boost::make_iterator_range(input_tags.begin(), input_tags.end()); } -void Block::forecast(int, std::vector<int> &) +void Block::propagate_tags(const size_t, const TagIter &) { //NOP } diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index 6140d40..e35733b 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -68,7 +68,7 @@ void BlockActor::handle_top_token( //tell the upstream about the input requirements OutputHintMessage output_hints; output_hints.history_bytes = this->input_configs[i].lookahead_items*this->input_items_sizes[i]; - output_hints.reserve_bytes = size_t(std::ceil(this->output_multiple_items/this->relative_rate)); + output_hints.reserve_bytes = 1; //TODO what do we want here, and so we like these hints at all? output_hints.token = this->input_tokens[i]; this->post_upstream(i, output_hints); diff --git a/lib/block_task.cpp b/lib/block_task.cpp index f3ddb04..01a9072 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -17,8 +17,6 @@ #include <gras_impl/block_actor.hpp> #include "tag_handlers.hpp" -#define REALLY_BIG size_t(1 << 30) - using namespace gras; void BlockActor::mark_done(void) @@ -110,7 +108,6 @@ void BlockActor::handle_task(void) //------------------------------------------------------------------ //-- initialize input buffers before work //------------------------------------------------------------------ - size_t num_input_items = REALLY_BIG; //so big that it must std::min size_t output_inline_index = 0; for (size_t i = 0; i < num_inputs; i++) { @@ -122,24 +119,10 @@ void BlockActor::handle_task(void) void *mem = buff.get(); size_t items = buff.length/this->input_items_sizes[i]; - this->work_io_ptr_mask |= ptrdiff_t(mem); this->input_items[i].get() = mem; this->input_items[i].size() = items; - this->work_input_items[i] = mem; - this->work_ninput_items[i] = items; - if (this->enable_fixed_rate) - { - if (items <= this->input_configs[i].lookahead_items) - { - this->input_fail(i); return; - } - items -= this->input_configs[i].lookahead_items; - } - - num_input_items = std::min(num_input_items, items); this->consume_items[i] = 0; - this->consume_called[i] = false; //inline dealings, how and when input buffers can be inlined into output buffers //continue; @@ -160,7 +143,6 @@ void BlockActor::handle_task(void) //------------------------------------------------------------------ //-- initialize output buffers before work //------------------------------------------------------------------ - size_t num_output_items = REALLY_BIG; //so big that it must std::min for (size_t i = 0; i < num_outputs; i++) { ASSERT(this->output_queues.ready(i)); @@ -169,69 +151,15 @@ void BlockActor::handle_task(void) const size_t bytes = buff.get_actual_length() - buff.length - buff.offset; size_t items = bytes/this->output_items_sizes[i]; - this->work_io_ptr_mask |= ptrdiff_t(mem); this->output_items[i].get() = mem; this->output_items[i].size() = items; - this->work_output_items[i] = mem; - items /= this->output_multiple_items; - items *= this->output_multiple_items; - num_output_items = std::min(num_output_items, items); this->produce_items[i] = 0; } //------------------------------------------------------------------ - //-- calculate the work_noutput_items given: - //-- min of num_input_items - //-- min of num_output_items - //-- relative rate and output multiple items - //------------------------------------------------------------------ - work_noutput_items = num_output_items; - if (num_inputs and (this->enable_fixed_rate or not num_outputs)) - { - size_t calc_output_items = size_t(num_input_items*this->relative_rate); - calc_output_items += this->output_multiple_items-1; - calc_output_items /= this->output_multiple_items; - calc_output_items *= this->output_multiple_items; - if (calc_output_items and calc_output_items < work_noutput_items) - work_noutput_items = calc_output_items; - } - - //------------------------------------------------------------------ - //-- forecast - //------------------------------------------------------------------ - //VAR(work_noutput_items); - if (this->forecast_enable) - { - forecast_again_you_jerk: - fcast_ninput_items = work_ninput_items; //init for NOP case - block_ptr->forecast(work_noutput_items, fcast_ninput_items); - for (size_t i = 0; i < num_inputs; i++) - { - if (fcast_ninput_items[i] <= work_ninput_items[i]) continue; - - //handle the case of forecast failing - if (work_noutput_items <= this->output_multiple_items) - { - this->input_fail(i); return; - } - - work_noutput_items = work_noutput_items/2; //backoff regime - work_noutput_items += this->output_multiple_items-1; - work_noutput_items /= this->output_multiple_items; - work_noutput_items *= this->output_multiple_items; - goto forecast_again_you_jerk; - } - } - - //workaround: - if (num_outputs) output_items[0].size() = work_noutput_items; - else input_items[0].size() = work_noutput_items; - - //------------------------------------------------------------------ //-- the work //------------------------------------------------------------------ - //VAR(work_noutput_items); this->work_ret = -1; if (this->interruptible_thread) { @@ -241,8 +169,6 @@ void BlockActor::handle_task(void) { this->task_work(); } - const size_t noutput_items = size_t(work_ret); - //VAR(work_ret); if (work_ret == Block::WORK_DONE) { @@ -255,9 +181,7 @@ void BlockActor::handle_task(void) //------------------------------------------------------------------ for (size_t i = 0; i < num_inputs; i++) { - ASSERT(enable_fixed_rate or work_ret != Block::WORK_CALLED_PRODUCE); - const bool use_consume = (not this->enable_fixed_rate) or (this->consume_called[i]); - const size_t items = (use_consume)? this->consume_items[i] : (myulround((noutput_items/this->relative_rate))); + const size_t items = this->consume_items[i]; this->items_consumed[i] += items; const size_t bytes = items*this->input_items_sizes[i]; @@ -271,7 +195,7 @@ void BlockActor::handle_task(void) //------------------------------------------------------------------ for (size_t i = 0; i < num_outputs; i++) { - const size_t items = (work_ret == Block::WORK_CALLED_PRODUCE)? this->produce_items[i] : noutput_items; + const size_t items = this->produce_items[i]; if (items == 0) continue; SBuffer &buff = this->output_queues.front(i); diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index 4ef71e8..18c13e2 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -35,16 +35,6 @@ namespace gras { -static GRAS_FORCE_INLINE unsigned long myulround(const double x) -{ - return (unsigned long)(x + 0.5); -} - -static GRAS_FORCE_INLINE unsigned long long myullround(const double x) -{ - return (unsigned long long)(x + 0.5); -} - struct BlockActor : Apology::Worker { BlockActor(void); @@ -131,7 +121,6 @@ struct BlockActor : Apology::Worker std::vector<size_t> output_items_sizes; std::vector<InputPortConfig> input_configs; std::vector<OutputPortConfig> output_configs; - size_t output_multiple_items; //keeps track of production std::vector<uint64_t> items_consumed; @@ -152,7 +141,6 @@ struct BlockActor : Apology::Worker //track work's calls to produce and consume std::vector<size_t> produce_items; std::vector<size_t> consume_items; - std::vector<bool> consume_called; //track the subscriber counts std::vector<Token> input_tokens; @@ -169,7 +157,6 @@ struct BlockActor : Apology::Worker //tag tracking std::vector<bool> input_tags_changed; std::vector<std::vector<Tag> > input_tags; - Block::tag_propagation_policy_t tag_prop_policy; //interruptible thread stuff bool interruptible_work; @@ -194,10 +181,6 @@ struct BlockActor : Apology::Worker std::vector<std::vector<OutputHintMessage> > output_allocation_hints; - //rate settings - bool enable_fixed_rate; - double relative_rate; - bool forecast_enable; bool topology_init; }; diff --git a/lib/tag_handlers.hpp b/lib/tag_handlers.hpp index 408e28b..7848c70 100644 --- a/lib/tag_handlers.hpp +++ b/lib/tag_handlers.hpp @@ -33,8 +33,6 @@ GRAS_FORCE_INLINE void BlockActor::sort_tags(const size_t i) GRAS_FORCE_INLINE void BlockActor::trim_tags(const size_t i) { - const size_t num_outputs = this->get_num_outputs(); - //------------------------------------------------------------------ //-- trim the input tags that are past the consumption zone //-- and post trimmed tags to the downstream based on policy @@ -48,36 +46,13 @@ GRAS_FORCE_INLINE void BlockActor::trim_tags(const size_t i) last++; } - //follow the tag propagation policy before erasure - switch (this->tag_prop_policy) - { - case Block::TPP_DONT: break; //well that was ez - case Block::TPP_ALL_TO_ALL: - for (size_t out_i = 0; out_i < num_outputs; out_i++) - { - for (size_t tag_i = 0; tag_i < last; tag_i++) - { - Tag t = tags_i[tag_i]; - t.offset = myullround(t.offset * this->relative_rate); - this->post_downstream(out_i, InputTagMessage(t)); - } - } - break; - case Block::TPP_ONE_TO_ONE: - if (i < num_outputs) - { - for (size_t tag_i = 0; tag_i < last; tag_i++) - { - Tag t = tags_i[tag_i]; - t.offset = myullround(t.offset * this->relative_rate); - this->post_downstream(i, InputTagMessage(t)); - } - } - break; - }; + if (last == 0) return; + + //call the overloaded propagate_tags to do the dirty work + this->block_ptr->propagate_tags(i, boost::make_iterator_range(tags_i.begin(), tags_i.begin()+last)); //now its safe to perform the erasure - if (last != 0) tags_i.erase(tags_i.begin(), tags_i.begin()+last); + tags_i.erase(tags_i.begin(), tags_i.begin()+last); } } //namespace gras diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp index 27c2346..d1512e9 100644 --- a/lib/topology_handler.cpp +++ b/lib/topology_handler.cpp @@ -74,11 +74,9 @@ void BlockActor::handle_topology( this->input_items.resize(num_inputs); this->output_items.resize(num_outputs); this->consume_items.resize(num_inputs, 0); - this->consume_called.resize(num_inputs, false); this->produce_items.resize(num_outputs, 0); this->input_queues.resize(num_inputs); this->output_queues.resize(num_outputs); - this->forecast_enable = num_outputs != 0 and num_inputs != 0; this->input_tokens.resize(num_inputs); this->output_tokens.resize(num_outputs); |