diff options
author | Josh Blum | 2012-08-28 15:27:42 -0700 |
---|---|---|
committer | Josh Blum | 2012-08-28 15:27:42 -0700 |
commit | 4044977deba6d64124763836d875b4da2b70eeaf (patch) | |
tree | 45b785eca8769ab1e65fa4e84940c0f2ea60718c | |
parent | 43a371895a821efde490db1df20a0c445b534ba8 (diff) | |
download | sandhi-4044977deba6d64124763836d875b4da2b70eeaf.tar.gz sandhi-4044977deba6d64124763836d875b4da2b70eeaf.tar.bz2 sandhi-4044977deba6d64124763836d875b4da2b70eeaf.zip |
some work on work consume/produce buffers
-rw-r--r-- | lib/block.cpp | 2 | ||||
-rw-r--r-- | lib/block_handlers.cpp | 14 | ||||
-rw-r--r-- | lib/block_task.cpp | 75 | ||||
-rw-r--r-- | lib/element_impl.hpp | 14 |
4 files changed, 94 insertions, 11 deletions
diff --git a/lib/block.cpp b/lib/block.cpp index 79ac3c6..324f9af 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -104,7 +104,7 @@ void Block::produce(const size_t which_output, const size_t how_many_items) void Block::set_fixed_rate(const bool fixed_rate) { - (*this)->enble_fixed_rate = fixed_rate; + (*this)->enable_fixed_rate = fixed_rate; } void Block::set_relative_rate(double relative_rate) diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index e5a117f..7825868 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -35,9 +35,10 @@ void resize_fill(V &v, const size_t new_len, const T &fill) } template <typename V> -void resize_fill_front(V &v, const size_t new_len) +void resize_fill_back(V &v, const size_t new_len) { - resize_fill(v, new_len, v.front()); + if (v.empty()) v.push_back(0); + resize_fill(v, new_len, v.back()); } template <typename V, typename Sig> @@ -60,8 +61,8 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface, const t fill_item_sizes_from_sig(this->output_items_sizes, this->output_signature, num_outputs); //resize and fill port properties - resize_fill_front(this->input_history_items, num_inputs); - resize_fill_front(this->output_multiple_items, num_outputs); + resize_fill_back(this->input_history_items, num_inputs); + resize_fill_back(this->output_multiple_items, num_outputs); //resize the bytes consumed/produced resize_fill(this->items_consumed, num_inputs, 0); @@ -73,8 +74,9 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface, const t this->work_ninput_items.resize(num_inputs); this->input_items.resize(num_inputs); this->output_items.resize(num_outputs); - this->consume_items.resize(num_inputs); - this->produce_items.resize(num_outputs); + this->consume_items.resize(num_inputs, 0); + this->produce_items.resize(num_outputs, 0); + this->input_buff_offsets.resize(num_inputs, 0); //resize tags vector to match sizes this->input_tags_changed.resize(num_inputs); diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 0001b44..f156ae0 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -50,8 +50,81 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) //-- Processing time! //------------------------------------------------------------------ - HERE(); + std::cout << "calling work on " << name << std::endl; + //reset work trackers for production/consumption + for (size_t i = 0; i < num_inputs; i++) + { + //this->consume_items[i] = 0; + + ASSERT(this->input_history_items[i] == 0); + + const tsbe::Buffer &buff = task_iface.get_input_buffer(i); + char *mem = ((char *)buff.get_memory()) + this->input_buff_offsets[i]; + const size_t bytes = buff.get_length() - this->input_buff_offsets[i]; + const size_t items = bytes/this->input_items_sizes[i]; + + this->input_items[i]._mem = mem; + this->input_items[i]._len = items; + this->work_input_items[i] = mem; + this->work_ninput_items[i] = items; + } + size_t num_output_items = ~0; //so big that it must std::min + for (size_t i = 0; i < num_outputs; i++) + { + //this->produce_items[i] = 0; + + const tsbe::Buffer &buff = task_iface.get_output_buffer(i); + char *mem = ((char *)buff.get_memory()); + const size_t bytes = buff.get_length(); + const size_t items = bytes/this->output_items_sizes[i]; + + this->output_items[i]._mem = mem; + this->output_items[i]._len = items; + this->work_output_items[i] = mem; + num_output_items = std::min(num_output_items, items); + } + + //start with source, this should be EZ + int ret = 0; + ret = block_ptr->Work(this->input_items, this->output_items); + VAR(ret); + if (ret == Block::WORK_DONE) + { + this->active = false; + return; + } + const size_t noutput_items = size_t(ret); + + //now to deal with consumption and production + for (size_t i = 0; i < num_inputs; i++) + { + ASSERT(enable_fixed_rate or ret != Block::WORK_CALLED_PRODUCE); + const size_t items = (enable_fixed_rate)? (myulround((noutput_items/this->relative_rate))) : this->consume_items[i]; + this->consume_items[i] = 0; + + this->items_consumed[i] += items; + const size_t bytes = items*this->input_items_sizes[i]; + this->input_buff_offsets[i] += bytes; + tsbe::Buffer &buff = task_iface.get_input_buffer(i); + if (buff.get_length() >= this->input_buff_offsets[i]) + { + task_iface.pop_input_buffer(i); + this->input_buff_offsets[i] = 0; + } + } + for (size_t i = 0; i < num_outputs; i++) + { + const size_t items = (ret == Block::WORK_CALLED_PRODUCE)? this->produce_items[i] : noutput_items; + this->produce_items[i] = 0; + + this->items_produced[i] += items; + const size_t bytes = items*this->output_items_sizes[i]; + tsbe::Buffer &buff = task_iface.get_output_buffer(i); + buff.get_length() = bytes; + task_iface.post_downstream(i, buff); + task_iface.pop_output_buffer(i); + } //0) figure out what we have for input data //1) calculate the possible num output items diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp index e87da70..d54e5bb 100644 --- a/lib/element_impl.hpp +++ b/lib/element_impl.hpp @@ -27,7 +27,8 @@ #include <iostream> #define HERE() std::cerr << __FILE__ << ":" << __LINE__ << std::endl << std::flush; -#define VAR(x) std::cout << #x << " = " << (x) << std::endl << std::flush; +#define VAR(x) std::cerr << #x << " = " << (x) << std::endl << std::flush; +#define ASSERT(x) if(not (x)){HERE(); std::cerr << "assert failed: " << #x << std::endl << std::flush;} static inline unsigned long myulround(const double x) { @@ -72,15 +73,22 @@ struct ElementImpl std::vector<uint64_t> items_consumed; std::vector<uint64_t> items_produced; - //work buffers + //work buffers for the classic interface gr_vector_const_void_star work_input_items; gr_vector_void_star work_output_items; gr_vector_int work_ninput_items; + + //work buffers for the new work interface Block::InputItems input_items; Block::OutputItems output_items; + + //track work's calls to produce and consume std::vector<size_t> produce_items; std::vector<size_t> consume_items; + //state for partial input buffer consumption + std::vector<size_t> input_buff_offsets; + //special buffer for dealing with history std::vector<tsbe::Buffer> history_buffs; @@ -113,7 +121,7 @@ struct ElementImpl bool active; //rate settings - bool enble_fixed_rate; + bool enable_fixed_rate; double relative_rate; }; |