summaryrefslogtreecommitdiff
path: root/lib/block_task.cpp
diff options
context:
space:
mode:
authorJosh Blum2012-08-28 15:27:42 -0700
committerJosh Blum2012-08-28 15:27:42 -0700
commit4044977deba6d64124763836d875b4da2b70eeaf (patch)
tree45b785eca8769ab1e65fa4e84940c0f2ea60718c /lib/block_task.cpp
parent43a371895a821efde490db1df20a0c445b534ba8 (diff)
downloadsandhi-4044977deba6d64124763836d875b4da2b70eeaf.tar.gz
sandhi-4044977deba6d64124763836d875b4da2b70eeaf.tar.bz2
sandhi-4044977deba6d64124763836d875b4da2b70eeaf.zip
some work on work consume/produce buffers
Diffstat (limited to 'lib/block_task.cpp')
-rw-r--r--lib/block_task.cpp75
1 files changed, 74 insertions, 1 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index 0001b44..f156ae0 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -50,8 +50,81 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
//-- Processing time!
//------------------------------------------------------------------
- HERE();
+ std::cout << "calling work on " << name << std::endl;
+ //reset work trackers for production/consumption
+ for (size_t i = 0; i < num_inputs; i++)
+ {
+ //this->consume_items[i] = 0;
+
+ ASSERT(this->input_history_items[i] == 0);
+
+ const tsbe::Buffer &buff = task_iface.get_input_buffer(i);
+ char *mem = ((char *)buff.get_memory()) + this->input_buff_offsets[i];
+ const size_t bytes = buff.get_length() - this->input_buff_offsets[i];
+ const size_t items = bytes/this->input_items_sizes[i];
+
+ this->input_items[i]._mem = mem;
+ this->input_items[i]._len = items;
+ this->work_input_items[i] = mem;
+ this->work_ninput_items[i] = items;
+ }
+ size_t num_output_items = ~0; //so big that it must std::min
+ for (size_t i = 0; i < num_outputs; i++)
+ {
+ //this->produce_items[i] = 0;
+
+ const tsbe::Buffer &buff = task_iface.get_output_buffer(i);
+ char *mem = ((char *)buff.get_memory());
+ const size_t bytes = buff.get_length();
+ const size_t items = bytes/this->output_items_sizes[i];
+
+ this->output_items[i]._mem = mem;
+ this->output_items[i]._len = items;
+ this->work_output_items[i] = mem;
+ num_output_items = std::min(num_output_items, items);
+ }
+
+ //start with source, this should be EZ
+ int ret = 0;
+ ret = block_ptr->Work(this->input_items, this->output_items);
+ VAR(ret);
+ if (ret == Block::WORK_DONE)
+ {
+ this->active = false;
+ return;
+ }
+ const size_t noutput_items = size_t(ret);
+
+ //now to deal with consumption and production
+ for (size_t i = 0; i < num_inputs; i++)
+ {
+ ASSERT(enable_fixed_rate or ret != Block::WORK_CALLED_PRODUCE);
+ const size_t items = (enable_fixed_rate)? (myulround((noutput_items/this->relative_rate))) : this->consume_items[i];
+ this->consume_items[i] = 0;
+
+ this->items_consumed[i] += items;
+ const size_t bytes = items*this->input_items_sizes[i];
+ this->input_buff_offsets[i] += bytes;
+ tsbe::Buffer &buff = task_iface.get_input_buffer(i);
+ if (buff.get_length() >= this->input_buff_offsets[i])
+ {
+ task_iface.pop_input_buffer(i);
+ this->input_buff_offsets[i] = 0;
+ }
+ }
+ for (size_t i = 0; i < num_outputs; i++)
+ {
+ const size_t items = (ret == Block::WORK_CALLED_PRODUCE)? this->produce_items[i] : noutput_items;
+ this->produce_items[i] = 0;
+
+ this->items_produced[i] += items;
+ const size_t bytes = items*this->output_items_sizes[i];
+ tsbe::Buffer &buff = task_iface.get_output_buffer(i);
+ buff.get_length() = bytes;
+ task_iface.post_downstream(i, buff);
+ task_iface.pop_output_buffer(i);
+ }
//0) figure out what we have for input data
//1) calculate the possible num output items