diff options
author | Josh Blum | 2012-08-29 23:15:21 -0700 |
---|---|---|
committer | Josh Blum | 2012-08-29 23:15:21 -0700 |
commit | 36f216977ff79a72b3c5498162659050bc7552ad (patch) | |
tree | 78d13d782f86d895138aefa690b0f056093ece72 /lib/block_handlers.cpp | |
parent | 483a44a31f6e02ae0cadfc41710f052d9e48fc6c (diff) | |
download | sandhi-36f216977ff79a72b3c5498162659050bc7552ad.tar.gz sandhi-36f216977ff79a72b3c5498162659050bc7552ad.tar.bz2 sandhi-36f216977ff79a72b3c5498162659050bc7552ad.zip |
using port messages and implement buffer queues in house
Diffstat (limited to 'lib/block_handlers.cpp')
-rw-r--r-- | lib/block_handlers.cpp | 75 |
1 files changed, 14 insertions, 61 deletions
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index de0a8bf..0eca9ea 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -15,70 +15,19 @@ // along with io_sig program. If not, see <http://www.gnu.org/licenses/>. #include "element_impl.hpp" +#include "vec_utils_impl.hpp" using namespace gnuradio; -void ElementImpl::handle_input_msg(const tsbe::TaskInterface &handle, const size_t index, const tsbe::Wax &msg) -{ - if (msg.type() == typeid(Tag)) - { - this->input_tags[index].push_back(msg.cast<Tag>()); - this->input_tags_changed[index] = true; - } - if (msg.type() == typeid(Token)) - { - this->token_pool.push_back(msg.cast<Token>()); - } - if (msg.type() == typeid(CheckTokensMessage)) - { - if (this->input_tokens[index].unique()) - { - this->mark_done(handle); - } - } -} - -void ElementImpl::handle_output_msg(const tsbe::TaskInterface &handle, const size_t index, const tsbe::Wax &msg) -{ - if (msg.type() == typeid(Token)) - { - this->token_pool.push_back(msg.cast<Token>()); - } - if (msg.type() == typeid(CheckTokensMessage)) - { - if (this->output_tokens[index].unique()) - { - this->mark_done(handle); - } - } -} - -template <typename V, typename T> -void resize_fill(V &v, const size_t new_len, const T &fill) -{ - if (v.size() >= new_len) return; //dont ever shrink it - v.resize(new_len, fill); -} - -template <typename V> -void resize_fill_back(V &v, const size_t new_len) -{ - if (v.empty()) v.push_back(0); - resize_fill(v, new_len, v.back()); -} - -template <typename V, typename Sig> -void fill_item_sizes_from_sig(V &v, const Sig &s, const size_t size) +void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface, const tsbe::Wax &state) { - v.resize(size); - for (size_t i = 0; i < v.size(); i++) + if (state.type() == typeid(BufferReturnMessage)) { - v[i] = s->sizeof_stream_item(i); + const BufferReturnMessage &message = state.cast<BufferReturnMessage>(); + this->handle_output_msg(task_iface, message.index, message.buffer); + return; } -} -void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface, const tsbe::Wax &state) -{ const size_t num_inputs = task_iface.get_num_inputs(); const size_t num_outputs = task_iface.get_num_outputs(); @@ -91,8 +40,8 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface, const t resize_fill_back(this->output_multiple_items, num_outputs); //resize the bytes consumed/produced - resize_fill(this->items_consumed, num_inputs, 0); - resize_fill(this->items_produced, num_outputs, 0); + resize_fill_grow(this->items_consumed, num_inputs, 0); + resize_fill_grow(this->items_produced, num_outputs, 0); //resize all work buffers to match current connections this->work_input_items.resize(num_inputs); @@ -103,6 +52,10 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface, const t this->consume_items.resize(num_inputs, 0); this->produce_items.resize(num_outputs, 0); this->input_buff_offsets.resize(num_inputs, 0); + this->input_queues.resize(num_inputs); + this->output_queues.resize(num_outputs); + this->inputs_ready.resize(num_inputs); + this->outputs_ready.resize(num_outputs); //resize tags vector to match sizes this->input_tags_changed.resize(num_inputs); @@ -130,13 +83,13 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface, const t this->input_tokens.resize(num_inputs); for (size_t i = 0; i < num_inputs; i++) { - this->input_tokens[i] = make_token(); + this->input_tokens[i] = Token::make(); task_iface.post_upstream(i, this->input_tokens[i]); } this->output_tokens.resize(num_outputs); for (size_t i = 0; i < num_outputs; i++) { - this->output_tokens[i] = make_token(); + this->output_tokens[i] = Token::make(); task_iface.post_downstream(i, this->output_tokens[i]); } } |