summaryrefslogtreecommitdiff
path: root/lib/block_handlers.cpp
diff options
context:
space:
mode:
authorJosh Blum2012-08-29 23:15:21 -0700
committerJosh Blum2012-08-29 23:15:21 -0700
commit36f216977ff79a72b3c5498162659050bc7552ad (patch)
tree78d13d782f86d895138aefa690b0f056093ece72 /lib/block_handlers.cpp
parent483a44a31f6e02ae0cadfc41710f052d9e48fc6c (diff)
downloadsandhi-36f216977ff79a72b3c5498162659050bc7552ad.tar.gz
sandhi-36f216977ff79a72b3c5498162659050bc7552ad.tar.bz2
sandhi-36f216977ff79a72b3c5498162659050bc7552ad.zip
using port messages and implement buffer queues in house
Diffstat (limited to 'lib/block_handlers.cpp')
-rw-r--r--lib/block_handlers.cpp75
1 files changed, 14 insertions, 61 deletions
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index de0a8bf..0eca9ea 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -15,70 +15,19 @@
// along with io_sig program. If not, see <http://www.gnu.org/licenses/>.
#include "element_impl.hpp"
+#include "vec_utils_impl.hpp"
using namespace gnuradio;
-void ElementImpl::handle_input_msg(const tsbe::TaskInterface &handle, const size_t index, const tsbe::Wax &msg)
-{
- if (msg.type() == typeid(Tag))
- {
- this->input_tags[index].push_back(msg.cast<Tag>());
- this->input_tags_changed[index] = true;
- }
- if (msg.type() == typeid(Token))
- {
- this->token_pool.push_back(msg.cast<Token>());
- }
- if (msg.type() == typeid(CheckTokensMessage))
- {
- if (this->input_tokens[index].unique())
- {
- this->mark_done(handle);
- }
- }
-}
-
-void ElementImpl::handle_output_msg(const tsbe::TaskInterface &handle, const size_t index, const tsbe::Wax &msg)
-{
- if (msg.type() == typeid(Token))
- {
- this->token_pool.push_back(msg.cast<Token>());
- }
- if (msg.type() == typeid(CheckTokensMessage))
- {
- if (this->output_tokens[index].unique())
- {
- this->mark_done(handle);
- }
- }
-}
-
-template <typename V, typename T>
-void resize_fill(V &v, const size_t new_len, const T &fill)
-{
- if (v.size() >= new_len) return; //dont ever shrink it
- v.resize(new_len, fill);
-}
-
-template <typename V>
-void resize_fill_back(V &v, const size_t new_len)
-{
- if (v.empty()) v.push_back(0);
- resize_fill(v, new_len, v.back());
-}
-
-template <typename V, typename Sig>
-void fill_item_sizes_from_sig(V &v, const Sig &s, const size_t size)
+void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface, const tsbe::Wax &state)
{
- v.resize(size);
- for (size_t i = 0; i < v.size(); i++)
+ if (state.type() == typeid(BufferReturnMessage))
{
- v[i] = s->sizeof_stream_item(i);
+ const BufferReturnMessage &message = state.cast<BufferReturnMessage>();
+ this->handle_output_msg(task_iface, message.index, message.buffer);
+ return;
}
-}
-void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface, const tsbe::Wax &state)
-{
const size_t num_inputs = task_iface.get_num_inputs();
const size_t num_outputs = task_iface.get_num_outputs();
@@ -91,8 +40,8 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface, const t
resize_fill_back(this->output_multiple_items, num_outputs);
//resize the bytes consumed/produced
- resize_fill(this->items_consumed, num_inputs, 0);
- resize_fill(this->items_produced, num_outputs, 0);
+ resize_fill_grow(this->items_consumed, num_inputs, 0);
+ resize_fill_grow(this->items_produced, num_outputs, 0);
//resize all work buffers to match current connections
this->work_input_items.resize(num_inputs);
@@ -103,6 +52,10 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface, const t
this->consume_items.resize(num_inputs, 0);
this->produce_items.resize(num_outputs, 0);
this->input_buff_offsets.resize(num_inputs, 0);
+ this->input_queues.resize(num_inputs);
+ this->output_queues.resize(num_outputs);
+ this->inputs_ready.resize(num_inputs);
+ this->outputs_ready.resize(num_outputs);
//resize tags vector to match sizes
this->input_tags_changed.resize(num_inputs);
@@ -130,13 +83,13 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface, const t
this->input_tokens.resize(num_inputs);
for (size_t i = 0; i < num_inputs; i++)
{
- this->input_tokens[i] = make_token();
+ this->input_tokens[i] = Token::make();
task_iface.post_upstream(i, this->input_tokens[i]);
}
this->output_tokens.resize(num_outputs);
for (size_t i = 0; i < num_outputs; i++)
{
- this->output_tokens[i] = make_token();
+ this->output_tokens[i] = Token::make();
task_iface.post_downstream(i, this->output_tokens[i]);
}
}