summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorJosh Blum2012-11-01 21:15:03 -0700
committerJosh Blum2012-11-01 21:15:03 -0700
commitb22c63e756d50723188fdd25983868ea1f67bd05 (patch)
tree8c2f8672ca8e4284b9cd250d85b235011fc3d675 /lib
parent1581c3925a8d1e2057b9864f0bfe59e7f5fbedfb (diff)
downloadsandhi-b22c63e756d50723188fdd25983868ea1f67bd05.tar.gz
sandhi-b22c63e756d50723188fdd25983868ea1f67bd05.tar.bz2
sandhi-b22c63e756d50723188fdd25983868ea1f67bd05.zip
moved out a bunch of work, rate, fcast logic
Diffstat (limited to 'lib')
-rw-r--r--lib/block.cpp51
-rw-r--r--lib/block_handlers.cpp2
-rw-r--r--lib/block_task.cpp80
-rw-r--r--lib/gras_impl/block_actor.hpp17
-rw-r--r--lib/tag_handlers.hpp35
-rw-r--r--lib/topology_handler.cpp2
6 files changed, 9 insertions, 178 deletions
diff --git a/lib/block.cpp b/lib/block.cpp
index 0d50ef7..3732fdb 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -51,12 +51,8 @@ Block::Block(const std::string &name):
//call block methods to init stuff
this->set_input_config(InputPortConfig());
this->set_output_config(OutputPortConfig());
- this->set_fixed_rate(true);
- this->set_relative_rate(1.0);
- this->set_tag_propagation_policy(TPP_ALL_TO_ALL);
this->set_interruptible_work(false);
this->set_buffer_affinity(-1);
- (*this)->block->output_multiple_items = 1;
}
template <typename V, typename T>
@@ -106,7 +102,6 @@ void Block::set_output_config(const OutputPortConfig &config, const size_t which
void Block::consume(const size_t which_input, const size_t how_many_items)
{
(*this)->block->consume_items[which_input] += how_many_items;
- (*this)->block->consume_called[which_input] = true;
}
void Block::consume_each(const size_t how_many_items)
@@ -114,7 +109,6 @@ void Block::consume_each(const size_t how_many_items)
for (size_t i = 0; i < (*this)->block->consume_items.size(); i++)
{
(*this)->block->consume_items[i] += how_many_items;
- (*this)->block->consume_called[i] = true;
}
}
@@ -123,39 +117,6 @@ void Block::produce(const size_t which_output, const size_t how_many_items)
(*this)->block->produce_items[which_output] += how_many_items;
}
-void Block::set_fixed_rate(const bool fixed_rate)
-{
- (*this)->block->enable_fixed_rate = fixed_rate;
-}
-
-bool Block::fixed_rate(void) const
-{
- return (*this)->block->enable_fixed_rate;
-}
-
-void Block::set_output_multiple(const size_t multiple)
-{
- (*this)->block->output_multiple_items = multiple;
- gras::OutputPortConfig config = this->output_config();
- config.reserve_items = multiple;
- this->set_output_config(config);
-}
-
-size_t Block::output_multiple(void) const
-{
- return (*this)->block->output_multiple_items;
-}
-
-void Block::set_relative_rate(double relative_rate)
-{
- (*this)->block->relative_rate = relative_rate;
-}
-
-double Block::relative_rate(void) const
-{
- return (*this)->block->relative_rate;
-}
-
uint64_t Block::nitems_read(const size_t which_input)
{
return (*this)->block->items_consumed[which_input];
@@ -166,16 +127,6 @@ uint64_t Block::nitems_written(const size_t which_output)
return (*this)->block->items_produced[which_output];
}
-Block::tag_propagation_policy_t Block::tag_propagation_policy(void)
-{
- return (*this)->block->tag_prop_policy;
-}
-
-void Block::set_tag_propagation_policy(Block::tag_propagation_policy_t p)
-{
- (*this)->block->tag_prop_policy = p;
-}
-
void Block::post_output_tag(
const size_t which_output,
const Tag &tag
@@ -190,7 +141,7 @@ Block::TagIter Block::get_input_tags(
return boost::make_iterator_range(input_tags.begin(), input_tags.end());
}
-void Block::forecast(int, std::vector<int> &)
+void Block::propagate_tags(const size_t, const TagIter &)
{
//NOP
}
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index 6140d40..e35733b 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -68,7 +68,7 @@ void BlockActor::handle_top_token(
//tell the upstream about the input requirements
OutputHintMessage output_hints;
output_hints.history_bytes = this->input_configs[i].lookahead_items*this->input_items_sizes[i];
- output_hints.reserve_bytes = size_t(std::ceil(this->output_multiple_items/this->relative_rate));
+ output_hints.reserve_bytes = 1; //TODO what do we want here, and so we like these hints at all?
output_hints.token = this->input_tokens[i];
this->post_upstream(i, output_hints);
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index f3ddb04..01a9072 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -17,8 +17,6 @@
#include <gras_impl/block_actor.hpp>
#include "tag_handlers.hpp"
-#define REALLY_BIG size_t(1 << 30)
-
using namespace gras;
void BlockActor::mark_done(void)
@@ -110,7 +108,6 @@ void BlockActor::handle_task(void)
//------------------------------------------------------------------
//-- initialize input buffers before work
//------------------------------------------------------------------
- size_t num_input_items = REALLY_BIG; //so big that it must std::min
size_t output_inline_index = 0;
for (size_t i = 0; i < num_inputs; i++)
{
@@ -122,24 +119,10 @@ void BlockActor::handle_task(void)
void *mem = buff.get();
size_t items = buff.length/this->input_items_sizes[i];
- this->work_io_ptr_mask |= ptrdiff_t(mem);
this->input_items[i].get() = mem;
this->input_items[i].size() = items;
- this->work_input_items[i] = mem;
- this->work_ninput_items[i] = items;
- if (this->enable_fixed_rate)
- {
- if (items <= this->input_configs[i].lookahead_items)
- {
- this->input_fail(i); return;
- }
- items -= this->input_configs[i].lookahead_items;
- }
-
- num_input_items = std::min(num_input_items, items);
this->consume_items[i] = 0;
- this->consume_called[i] = false;
//inline dealings, how and when input buffers can be inlined into output buffers
//continue;
@@ -160,7 +143,6 @@ void BlockActor::handle_task(void)
//------------------------------------------------------------------
//-- initialize output buffers before work
//------------------------------------------------------------------
- size_t num_output_items = REALLY_BIG; //so big that it must std::min
for (size_t i = 0; i < num_outputs; i++)
{
ASSERT(this->output_queues.ready(i));
@@ -169,69 +151,15 @@ void BlockActor::handle_task(void)
const size_t bytes = buff.get_actual_length() - buff.length - buff.offset;
size_t items = bytes/this->output_items_sizes[i];
- this->work_io_ptr_mask |= ptrdiff_t(mem);
this->output_items[i].get() = mem;
this->output_items[i].size() = items;
- this->work_output_items[i] = mem;
- items /= this->output_multiple_items;
- items *= this->output_multiple_items;
- num_output_items = std::min(num_output_items, items);
this->produce_items[i] = 0;
}
//------------------------------------------------------------------
- //-- calculate the work_noutput_items given:
- //-- min of num_input_items
- //-- min of num_output_items
- //-- relative rate and output multiple items
- //------------------------------------------------------------------
- work_noutput_items = num_output_items;
- if (num_inputs and (this->enable_fixed_rate or not num_outputs))
- {
- size_t calc_output_items = size_t(num_input_items*this->relative_rate);
- calc_output_items += this->output_multiple_items-1;
- calc_output_items /= this->output_multiple_items;
- calc_output_items *= this->output_multiple_items;
- if (calc_output_items and calc_output_items < work_noutput_items)
- work_noutput_items = calc_output_items;
- }
-
- //------------------------------------------------------------------
- //-- forecast
- //------------------------------------------------------------------
- //VAR(work_noutput_items);
- if (this->forecast_enable)
- {
- forecast_again_you_jerk:
- fcast_ninput_items = work_ninput_items; //init for NOP case
- block_ptr->forecast(work_noutput_items, fcast_ninput_items);
- for (size_t i = 0; i < num_inputs; i++)
- {
- if (fcast_ninput_items[i] <= work_ninput_items[i]) continue;
-
- //handle the case of forecast failing
- if (work_noutput_items <= this->output_multiple_items)
- {
- this->input_fail(i); return;
- }
-
- work_noutput_items = work_noutput_items/2; //backoff regime
- work_noutput_items += this->output_multiple_items-1;
- work_noutput_items /= this->output_multiple_items;
- work_noutput_items *= this->output_multiple_items;
- goto forecast_again_you_jerk;
- }
- }
-
- //workaround:
- if (num_outputs) output_items[0].size() = work_noutput_items;
- else input_items[0].size() = work_noutput_items;
-
- //------------------------------------------------------------------
//-- the work
//------------------------------------------------------------------
- //VAR(work_noutput_items);
this->work_ret = -1;
if (this->interruptible_thread)
{
@@ -241,8 +169,6 @@ void BlockActor::handle_task(void)
{
this->task_work();
}
- const size_t noutput_items = size_t(work_ret);
- //VAR(work_ret);
if (work_ret == Block::WORK_DONE)
{
@@ -255,9 +181,7 @@ void BlockActor::handle_task(void)
//------------------------------------------------------------------
for (size_t i = 0; i < num_inputs; i++)
{
- ASSERT(enable_fixed_rate or work_ret != Block::WORK_CALLED_PRODUCE);
- const bool use_consume = (not this->enable_fixed_rate) or (this->consume_called[i]);
- const size_t items = (use_consume)? this->consume_items[i] : (myulround((noutput_items/this->relative_rate)));
+ const size_t items = this->consume_items[i];
this->items_consumed[i] += items;
const size_t bytes = items*this->input_items_sizes[i];
@@ -271,7 +195,7 @@ void BlockActor::handle_task(void)
//------------------------------------------------------------------
for (size_t i = 0; i < num_outputs; i++)
{
- const size_t items = (work_ret == Block::WORK_CALLED_PRODUCE)? this->produce_items[i] : noutput_items;
+ const size_t items = this->produce_items[i];
if (items == 0) continue;
SBuffer &buff = this->output_queues.front(i);
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index 4ef71e8..18c13e2 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -35,16 +35,6 @@
namespace gras
{
-static GRAS_FORCE_INLINE unsigned long myulround(const double x)
-{
- return (unsigned long)(x + 0.5);
-}
-
-static GRAS_FORCE_INLINE unsigned long long myullround(const double x)
-{
- return (unsigned long long)(x + 0.5);
-}
-
struct BlockActor : Apology::Worker
{
BlockActor(void);
@@ -131,7 +121,6 @@ struct BlockActor : Apology::Worker
std::vector<size_t> output_items_sizes;
std::vector<InputPortConfig> input_configs;
std::vector<OutputPortConfig> output_configs;
- size_t output_multiple_items;
//keeps track of production
std::vector<uint64_t> items_consumed;
@@ -152,7 +141,6 @@ struct BlockActor : Apology::Worker
//track work's calls to produce and consume
std::vector<size_t> produce_items;
std::vector<size_t> consume_items;
- std::vector<bool> consume_called;
//track the subscriber counts
std::vector<Token> input_tokens;
@@ -169,7 +157,6 @@ struct BlockActor : Apology::Worker
//tag tracking
std::vector<bool> input_tags_changed;
std::vector<std::vector<Tag> > input_tags;
- Block::tag_propagation_policy_t tag_prop_policy;
//interruptible thread stuff
bool interruptible_work;
@@ -194,10 +181,6 @@ struct BlockActor : Apology::Worker
std::vector<std::vector<OutputHintMessage> > output_allocation_hints;
- //rate settings
- bool enable_fixed_rate;
- double relative_rate;
- bool forecast_enable;
bool topology_init;
};
diff --git a/lib/tag_handlers.hpp b/lib/tag_handlers.hpp
index 408e28b..7848c70 100644
--- a/lib/tag_handlers.hpp
+++ b/lib/tag_handlers.hpp
@@ -33,8 +33,6 @@ GRAS_FORCE_INLINE void BlockActor::sort_tags(const size_t i)
GRAS_FORCE_INLINE void BlockActor::trim_tags(const size_t i)
{
- const size_t num_outputs = this->get_num_outputs();
-
//------------------------------------------------------------------
//-- trim the input tags that are past the consumption zone
//-- and post trimmed tags to the downstream based on policy
@@ -48,36 +46,13 @@ GRAS_FORCE_INLINE void BlockActor::trim_tags(const size_t i)
last++;
}
- //follow the tag propagation policy before erasure
- switch (this->tag_prop_policy)
- {
- case Block::TPP_DONT: break; //well that was ez
- case Block::TPP_ALL_TO_ALL:
- for (size_t out_i = 0; out_i < num_outputs; out_i++)
- {
- for (size_t tag_i = 0; tag_i < last; tag_i++)
- {
- Tag t = tags_i[tag_i];
- t.offset = myullround(t.offset * this->relative_rate);
- this->post_downstream(out_i, InputTagMessage(t));
- }
- }
- break;
- case Block::TPP_ONE_TO_ONE:
- if (i < num_outputs)
- {
- for (size_t tag_i = 0; tag_i < last; tag_i++)
- {
- Tag t = tags_i[tag_i];
- t.offset = myullround(t.offset * this->relative_rate);
- this->post_downstream(i, InputTagMessage(t));
- }
- }
- break;
- };
+ if (last == 0) return;
+
+ //call the overloaded propagate_tags to do the dirty work
+ this->block_ptr->propagate_tags(i, boost::make_iterator_range(tags_i.begin(), tags_i.begin()+last));
//now its safe to perform the erasure
- if (last != 0) tags_i.erase(tags_i.begin(), tags_i.begin()+last);
+ tags_i.erase(tags_i.begin(), tags_i.begin()+last);
}
} //namespace gras
diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp
index 27c2346..d1512e9 100644
--- a/lib/topology_handler.cpp
+++ b/lib/topology_handler.cpp
@@ -74,11 +74,9 @@ void BlockActor::handle_topology(
this->input_items.resize(num_inputs);
this->output_items.resize(num_outputs);
this->consume_items.resize(num_inputs, 0);
- this->consume_called.resize(num_inputs, false);
this->produce_items.resize(num_outputs, 0);
this->input_queues.resize(num_inputs);
this->output_queues.resize(num_outputs);
- this->forecast_enable = num_outputs != 0 and num_inputs != 0;
this->input_tokens.resize(num_inputs);
this->output_tokens.resize(num_outputs);