diff options
-rw-r--r-- | lib/block.cpp | 3 | ||||
-rw-r--r-- | lib/block_handlers.cpp | 82 | ||||
-rw-r--r-- | lib/block_task.cpp | 4 | ||||
-rw-r--r-- | lib/element_impl.hpp | 3 | ||||
-rw-r--r-- | lib/top_block.cpp | 11 |
5 files changed, 56 insertions, 47 deletions
diff --git a/lib/block.cpp b/lib/block.cpp index c38e362..4c7f113 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -37,7 +37,8 @@ Block::Block(const std::string &name): tsbe::BlockConfig config; config.input_callback = boost::bind(&ElementImpl::handle_input_msg, this->get(), _1, _2, _3); config.output_callback = boost::bind(&ElementImpl::handle_output_msg, this->get(), _1, _2, _3); - config.update_callback = boost::bind(&ElementImpl::topology_update, this->get(), _1, _2); + config.block_callback = boost::bind(&ElementImpl::handle_block_msg, this->get(), _1, _2); + config.changed_callback = boost::bind(&ElementImpl::topology_update, this->get(), _1); (*this)->block = tsbe::Block(config); (*this)->block_ptr = this; diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index 95d4b8b..c3a6aff 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -19,7 +19,7 @@ using namespace gnuradio; -void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface, const tsbe::Wax &state) +void ElementImpl::handle_block_msg(const tsbe::TaskInterface &task_iface, const tsbe::Wax &state) { if (state.type() == typeid(BufferReturnMessage)) { @@ -31,6 +31,49 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface, const t const size_t num_inputs = task_iface.get_num_inputs(); const size_t num_outputs = task_iface.get_num_outputs(); + //allocate output tokens and send them downstream + if (state.cast<TopBlockMessage>().what == TopBlockMessage::ACTIVE) + { + this->input_tokens.resize(num_inputs); + for (size_t i = 0; i < num_inputs; i++) + { + this->input_tokens[i] = Token::make(); + task_iface.post_upstream(i, this->input_tokens[i]); + } + this->output_tokens.resize(num_outputs); + for (size_t i = 0; i < num_outputs; i++) + { + this->output_tokens[i] = Token::make(); + task_iface.post_downstream(i, this->output_tokens[i]); + } + } + + if (state.cast<TopBlockMessage>().what == TopBlockMessage::ACTIVE) + { + this->active = true; + this->done = false; + this->token_pool.insert(state.cast<TopBlockMessage>().token); + + //causes initial processing kick-off for source blocks + this->handle_allocation(task_iface); + } + if (state.cast<TopBlockMessage>().what == TopBlockMessage::INERT) + { + this->mark_done(task_iface); + } + + //TODO: generate a message to handle task in a loop for a while + //we may need to call it into exhaustion to be correct + //but dont call it from update, let the settings above sink in + this->handle_task(task_iface); + this->handle_task(task_iface); +} + +void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface) +{ + const size_t num_inputs = task_iface.get_num_inputs(); + const size_t num_outputs = task_iface.get_num_outputs(); + //fill the item sizes from the IO signatures fill_item_sizes_from_sig(this->input_items_sizes, this->input_signature, num_inputs); fill_item_sizes_from_sig(this->output_items_sizes, this->output_signature, num_outputs); @@ -77,46 +120,9 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface, const t } } - //allocate output tokens and send them downstream - if (state.cast<TopBlockMessage>().what == TopBlockMessage::ACTIVE) - { - this->input_tokens.resize(num_inputs); - for (size_t i = 0; i < num_inputs; i++) - { - this->input_tokens[i] = Token::make(); - task_iface.post_upstream(i, this->input_tokens[i]); - } - this->output_tokens.resize(num_outputs); - for (size_t i = 0; i < num_outputs; i++) - { - this->output_tokens[i] = Token::make(); - task_iface.post_downstream(i, this->output_tokens[i]); - } - } - - if (state.cast<TopBlockMessage>().what == TopBlockMessage::ACTIVE) - { - this->active = true; - this->done = false; - this->token_pool.insert(state.cast<TopBlockMessage>().token); - - //causes initial processing kick-off for source blocks - this->handle_allocation(task_iface); - } - if (state.cast<TopBlockMessage>().what == TopBlockMessage::INERT) - { - this->mark_done(task_iface); - } - //TODO: think more about this: if (num_inputs == 0 and num_outputs == 0) { this->mark_done(task_iface); } - - //TODO: generate a message to handle task in a loop for a while - //we may need to call it into exhaustion to be correct - //but dont call it from update, let the settings above sink in - this->handle_task(task_iface); - this->handle_task(task_iface); } diff --git a/lib/block_task.cpp b/lib/block_task.cpp index d9b52f0..faf8098 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -83,7 +83,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) //-- Processing time! //------------------------------------------------------------------ - std::cout << "calling work on " << name << std::endl; + //std::cout << "calling work on " << name << std::endl; //reset work trackers for production/consumption size_t input_tokens_count = 0; @@ -135,7 +135,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) //start with source, this should be EZ int ret = 0; ret = block_ptr->Work(this->input_items, this->output_items); - VAR(ret); + //VAR(ret); if (ret == Block::WORK_DONE) { this->mark_done(task_iface); diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp index 4dda5ff..e24e1d5 100644 --- a/lib/element_impl.hpp +++ b/lib/element_impl.hpp @@ -114,7 +114,8 @@ struct ElementImpl //handlers void handle_input_msg(const tsbe::TaskInterface &, const size_t, const tsbe::Wax &); void handle_output_msg(const tsbe::TaskInterface &, const size_t, const tsbe::Wax &); - void topology_update(const tsbe::TaskInterface &, const tsbe::Wax &); + void topology_update(const tsbe::TaskInterface &); + void handle_block_msg(const tsbe::TaskInterface &, const tsbe::Wax &); void handle_allocation(const tsbe::TaskInterface &); void handle_task(const tsbe::TaskInterface &); void mark_done(const tsbe::TaskInterface &); diff --git a/lib/top_block.cpp b/lib/top_block.cpp index 36ff723..9d7a2f0 100644 --- a/lib/top_block.cpp +++ b/lib/top_block.cpp @@ -47,22 +47,23 @@ void TopBlock::set_buffer_hint(const size_t hint) TopBlockMessage event; event.what = TopBlockMessage::HINT; event.hint = hint; - (*this)->executor.update(event); + (*this)->executor.post_msg(event); } void TopBlock::start(void) { + (*this)->executor.commit(); TopBlockMessage event; event.what = TopBlockMessage::ACTIVE; event.token = (*this)->token; - (*this)->executor.update(event); + (*this)->executor.post_msg(event); } void TopBlock::stop(void) { TopBlockMessage event; event.what = TopBlockMessage::INERT; - (*this)->executor.update(event); + (*this)->executor.post_msg(event); } void TopBlock::run(void) @@ -76,7 +77,7 @@ void TopBlock::wait(void) while (not (*this)->token.unique()) { boost::this_thread::yield(); - sleep(1); - VAR((*this)->token.use_count()); + //sleep(1); + //VAR((*this)->token.use_count()); } } |