diff options
author | Josh Blum | 2012-08-30 01:05:53 -0700 |
---|---|---|
committer | Josh Blum | 2012-08-30 01:05:53 -0700 |
commit | e661028006d0f36ad10672f4d0fa034c157e882d (patch) | |
tree | 1a9449b20de7a7803f8ad38b57d9e912cb1b96a5 | |
parent | 36f216977ff79a72b3c5498162659050bc7552ad (diff) | |
download | sandhi-e661028006d0f36ad10672f4d0fa034c157e882d.tar.gz sandhi-e661028006d0f36ad10672f4d0fa034c157e882d.tar.bz2 sandhi-e661028006d0f36ad10672f4d0fa034c157e882d.zip |
cleanups from the previous commit
-rw-r--r-- | lib/block.cpp | 2 | ||||
-rw-r--r-- | lib/block_handlers.cpp | 6 | ||||
-rw-r--r-- | lib/block_ports.cpp | 6 | ||||
-rw-r--r-- | lib/block_task.cpp | 41 | ||||
-rw-r--r-- | lib/element_impl.hpp | 5 | ||||
-rw-r--r-- | lib/top_block.cpp | 5 |
6 files changed, 35 insertions, 30 deletions
diff --git a/lib/block.cpp b/lib/block.cpp index 73c232f..c38e362 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -42,6 +42,8 @@ Block::Block(const std::string &name): (*this)->block_ptr = this; (*this)->hint = 0; + (*this)->active = false; + (*this)->done = false; } template <typename V, typename T> diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index 0eca9ea..ab32300 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -97,14 +97,14 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface, const t if (state.cast<TopBlockMessage>().what == TopBlockMessage::ACTIVE) { this->active = true; - this->token = state.cast<TopBlockMessage>().token; + this->done = false; + this->token_pool.insert(state.cast<TopBlockMessage>().token); //causes initial processing kick-off for source blocks this->handle_allocation(task_iface); } if (state.cast<TopBlockMessage>().what == TopBlockMessage::INERT) { - this->active = false; - this->token = state.cast<TopBlockMessage>().token; + this->mark_done(task_iface); } } diff --git a/lib/block_ports.cpp b/lib/block_ports.cpp index 702e0c4..3b3aec6 100644 --- a/lib/block_ports.cpp +++ b/lib/block_ports.cpp @@ -22,6 +22,7 @@ void ElementImpl::handle_input_msg(const tsbe::TaskInterface &handle, const size { if (msg.type() == typeid(tsbe::Buffer)) { + if (this->done) return; this->input_queues[index].push(msg.cast<tsbe::Buffer>()); this->inputs_ready.set(index, true); this->handle_task(handle); @@ -35,7 +36,7 @@ void ElementImpl::handle_input_msg(const tsbe::TaskInterface &handle, const size } if (msg.type() == typeid(Token)) { - this->token_pool.push_back(msg.cast<Token>()); + this->token_pool.insert(msg.cast<Token>()); return; } if (msg.type() == typeid(CheckTokensMessage)) @@ -52,6 +53,7 @@ void ElementImpl::handle_output_msg(const tsbe::TaskInterface &handle, const siz { if (msg.type() == typeid(tsbe::Buffer)) { + if (this->done) return; this->output_queues[index].push(msg.cast<tsbe::Buffer>()); this->outputs_ready.set(index, true); this->handle_task(handle); @@ -59,7 +61,7 @@ void ElementImpl::handle_output_msg(const tsbe::TaskInterface &handle, const siz } if (msg.type() == typeid(Token)) { - this->token_pool.push_back(msg.cast<Token>()); + this->token_pool.insert(msg.cast<Token>()); return; } if (msg.type() == typeid(CheckTokensMessage)) diff --git a/lib/block_task.cpp b/lib/block_task.cpp index d871f12..d9b52f0 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -20,26 +20,27 @@ using namespace gnuradio; -void ElementImpl::free_inputs(const tsbe::TaskInterface &task_iface) -{ - for (size_t i = 0; i < task_iface.get_num_inputs(); i++) - { - while (not this->input_queues[i].empty()) - { - this->input_queues[i].pop(); - } - this->inputs_ready.set(i, false); - } -} - void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface) { - if (not this->active) return; + if (this->done) return; //can re-enter checking done first + + //mark down the new state this->active = false; + this->done = true; + + //release upstream, downstream, and executor tokens this->token_pool.clear(); - this->token.reset(); - this->free_inputs(task_iface); + + //release allocator tokens, buffers can now call deleters this->output_buffer_tokens.clear(); + + //release all buffers in queues + this->input_queues.clear(); + this->output_queues.clear(); + + //tell the upstream and downstram to re-check their tokens + //this is how the other blocks know who is interested, + //and can decide based on interest to set done or not for (size_t i = 0; i < task_iface.get_num_inputs(); i++) { task_iface.post_upstream(i, CheckTokensMessage()); @@ -48,18 +49,12 @@ void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface) { task_iface.post_downstream(i, CheckTokensMessage()); } - HERE(); - VAR(name); + + std::cout << "This one: " << name << " is done..." << std::endl; } void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) { - //FIXME in case we get called in the inactive state, assuming done? - if (not this->active) - { - this->free_inputs(task_iface); - } - //------------------------------------------------------------------ //-- Decide if its possible to continue any processing: //-- Handle task may get called for incoming buffers, diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp index 18c2fd3..4dda5ff 100644 --- a/lib/element_impl.hpp +++ b/lib/element_impl.hpp @@ -24,6 +24,7 @@ #include <gnuradio/element.hpp> #include <gnuradio/block.hpp> #include <gr_types.h> +#include <set> #include <vector> #include <queue> @@ -82,7 +83,7 @@ struct ElementImpl //track the subscriber counts std::vector<Token> input_tokens; std::vector<Token> output_tokens; - std::vector<Token> token_pool; + std::set<Token> token_pool; std::vector<tsbe::BufferToken> output_buffer_tokens; @@ -117,11 +118,11 @@ struct ElementImpl void handle_allocation(const tsbe::TaskInterface &); void handle_task(const tsbe::TaskInterface &); void mark_done(const tsbe::TaskInterface &); - void free_inputs(const tsbe::TaskInterface &); void buffer_returner(const size_t index, tsbe::Buffer &buffer); //is the fg running? bool active; + bool done; Token token; size_t hint; //some kind of allocation hint diff --git a/lib/top_block.cpp b/lib/top_block.cpp index 6f98449..36ff723 100644 --- a/lib/top_block.cpp +++ b/lib/top_block.cpp @@ -32,6 +32,9 @@ TopBlock::TopBlock(const std::string &name): config.topology = (*this)->topology; (*this)->executor = tsbe::Executor(config); (*this)->token = Token::make(); + std::cout << "===================================================" << std::endl; + std::cout << "== Top Block Created: " << name << std::endl; + std::cout << "===================================================" << std::endl; } void TopBlock::update(void) @@ -73,5 +76,7 @@ void TopBlock::wait(void) while (not (*this)->token.unique()) { boost::this_thread::yield(); + sleep(1); + VAR((*this)->token.use_count()); } } |