summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosh Blum2012-08-30 01:05:53 -0700
committerJosh Blum2012-08-30 01:05:53 -0700
commite661028006d0f36ad10672f4d0fa034c157e882d (patch)
tree1a9449b20de7a7803f8ad38b57d9e912cb1b96a5
parent36f216977ff79a72b3c5498162659050bc7552ad (diff)
downloadsandhi-e661028006d0f36ad10672f4d0fa034c157e882d.tar.gz
sandhi-e661028006d0f36ad10672f4d0fa034c157e882d.tar.bz2
sandhi-e661028006d0f36ad10672f4d0fa034c157e882d.zip
cleanups from the previous commit
-rw-r--r--lib/block.cpp2
-rw-r--r--lib/block_handlers.cpp6
-rw-r--r--lib/block_ports.cpp6
-rw-r--r--lib/block_task.cpp41
-rw-r--r--lib/element_impl.hpp5
-rw-r--r--lib/top_block.cpp5
6 files changed, 35 insertions, 30 deletions
diff --git a/lib/block.cpp b/lib/block.cpp
index 73c232f..c38e362 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -42,6 +42,8 @@ Block::Block(const std::string &name):
(*this)->block_ptr = this;
(*this)->hint = 0;
+ (*this)->active = false;
+ (*this)->done = false;
}
template <typename V, typename T>
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index 0eca9ea..ab32300 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -97,14 +97,14 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface, const t
if (state.cast<TopBlockMessage>().what == TopBlockMessage::ACTIVE)
{
this->active = true;
- this->token = state.cast<TopBlockMessage>().token;
+ this->done = false;
+ this->token_pool.insert(state.cast<TopBlockMessage>().token);
//causes initial processing kick-off for source blocks
this->handle_allocation(task_iface);
}
if (state.cast<TopBlockMessage>().what == TopBlockMessage::INERT)
{
- this->active = false;
- this->token = state.cast<TopBlockMessage>().token;
+ this->mark_done(task_iface);
}
}
diff --git a/lib/block_ports.cpp b/lib/block_ports.cpp
index 702e0c4..3b3aec6 100644
--- a/lib/block_ports.cpp
+++ b/lib/block_ports.cpp
@@ -22,6 +22,7 @@ void ElementImpl::handle_input_msg(const tsbe::TaskInterface &handle, const size
{
if (msg.type() == typeid(tsbe::Buffer))
{
+ if (this->done) return;
this->input_queues[index].push(msg.cast<tsbe::Buffer>());
this->inputs_ready.set(index, true);
this->handle_task(handle);
@@ -35,7 +36,7 @@ void ElementImpl::handle_input_msg(const tsbe::TaskInterface &handle, const size
}
if (msg.type() == typeid(Token))
{
- this->token_pool.push_back(msg.cast<Token>());
+ this->token_pool.insert(msg.cast<Token>());
return;
}
if (msg.type() == typeid(CheckTokensMessage))
@@ -52,6 +53,7 @@ void ElementImpl::handle_output_msg(const tsbe::TaskInterface &handle, const siz
{
if (msg.type() == typeid(tsbe::Buffer))
{
+ if (this->done) return;
this->output_queues[index].push(msg.cast<tsbe::Buffer>());
this->outputs_ready.set(index, true);
this->handle_task(handle);
@@ -59,7 +61,7 @@ void ElementImpl::handle_output_msg(const tsbe::TaskInterface &handle, const siz
}
if (msg.type() == typeid(Token))
{
- this->token_pool.push_back(msg.cast<Token>());
+ this->token_pool.insert(msg.cast<Token>());
return;
}
if (msg.type() == typeid(CheckTokensMessage))
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index d871f12..d9b52f0 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -20,26 +20,27 @@
using namespace gnuradio;
-void ElementImpl::free_inputs(const tsbe::TaskInterface &task_iface)
-{
- for (size_t i = 0; i < task_iface.get_num_inputs(); i++)
- {
- while (not this->input_queues[i].empty())
- {
- this->input_queues[i].pop();
- }
- this->inputs_ready.set(i, false);
- }
-}
-
void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface)
{
- if (not this->active) return;
+ if (this->done) return; //can re-enter checking done first
+
+ //mark down the new state
this->active = false;
+ this->done = true;
+
+ //release upstream, downstream, and executor tokens
this->token_pool.clear();
- this->token.reset();
- this->free_inputs(task_iface);
+
+ //release allocator tokens, buffers can now call deleters
this->output_buffer_tokens.clear();
+
+ //release all buffers in queues
+ this->input_queues.clear();
+ this->output_queues.clear();
+
+ //tell the upstream and downstram to re-check their tokens
+ //this is how the other blocks know who is interested,
+ //and can decide based on interest to set done or not
for (size_t i = 0; i < task_iface.get_num_inputs(); i++)
{
task_iface.post_upstream(i, CheckTokensMessage());
@@ -48,18 +49,12 @@ void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface)
{
task_iface.post_downstream(i, CheckTokensMessage());
}
- HERE();
- VAR(name);
+
+ std::cout << "This one: " << name << " is done..." << std::endl;
}
void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
{
- //FIXME in case we get called in the inactive state, assuming done?
- if (not this->active)
- {
- this->free_inputs(task_iface);
- }
-
//------------------------------------------------------------------
//-- Decide if its possible to continue any processing:
//-- Handle task may get called for incoming buffers,
diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp
index 18c2fd3..4dda5ff 100644
--- a/lib/element_impl.hpp
+++ b/lib/element_impl.hpp
@@ -24,6 +24,7 @@
#include <gnuradio/element.hpp>
#include <gnuradio/block.hpp>
#include <gr_types.h>
+#include <set>
#include <vector>
#include <queue>
@@ -82,7 +83,7 @@ struct ElementImpl
//track the subscriber counts
std::vector<Token> input_tokens;
std::vector<Token> output_tokens;
- std::vector<Token> token_pool;
+ std::set<Token> token_pool;
std::vector<tsbe::BufferToken> output_buffer_tokens;
@@ -117,11 +118,11 @@ struct ElementImpl
void handle_allocation(const tsbe::TaskInterface &);
void handle_task(const tsbe::TaskInterface &);
void mark_done(const tsbe::TaskInterface &);
- void free_inputs(const tsbe::TaskInterface &);
void buffer_returner(const size_t index, tsbe::Buffer &buffer);
//is the fg running?
bool active;
+ bool done;
Token token;
size_t hint; //some kind of allocation hint
diff --git a/lib/top_block.cpp b/lib/top_block.cpp
index 6f98449..36ff723 100644
--- a/lib/top_block.cpp
+++ b/lib/top_block.cpp
@@ -32,6 +32,9 @@ TopBlock::TopBlock(const std::string &name):
config.topology = (*this)->topology;
(*this)->executor = tsbe::Executor(config);
(*this)->token = Token::make();
+ std::cout << "===================================================" << std::endl;
+ std::cout << "== Top Block Created: " << name << std::endl;
+ std::cout << "===================================================" << std::endl;
}
void TopBlock::update(void)
@@ -73,5 +76,7 @@ void TopBlock::wait(void)
while (not (*this)->token.unique())
{
boost::this_thread::yield();
+ sleep(1);
+ VAR((*this)->token.use_count());
}
}