summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/block.cpp3
-rw-r--r--lib/block_handlers.cpp47
-rw-r--r--lib/block_task.cpp52
-rw-r--r--lib/element_impl.hpp53
-rw-r--r--lib/hier_block.cpp6
-rw-r--r--lib/top_block.cpp18
6 files changed, 169 insertions, 10 deletions
diff --git a/lib/block.cpp b/lib/block.cpp
index 324f9af..c7ff967 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -35,7 +35,8 @@ Block::Block(const std::string &name):
this->set_tag_propagation_policy(TPP_ALL_TO_ALL);
tsbe::BlockConfig config;
- config.port_callback = boost::bind(&ElementImpl::handle_port_msg, this->get(), _1, _2);
+ config.input_callback = boost::bind(&ElementImpl::handle_input_msg, this->get(), _1, _2, _3);
+ config.output_callback = boost::bind(&ElementImpl::handle_output_msg, this->get(), _1, _2, _3);
config.update_callback = boost::bind(&ElementImpl::topology_update, this->get(), _1, _2);
config.task_callback = boost::bind(&ElementImpl::handle_task, this->get(), _1);
(*this)->block = tsbe::Block(config);
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index 7825868..de0a8bf 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -18,13 +18,39 @@
using namespace gnuradio;
-void ElementImpl::handle_port_msg(const size_t index, const tsbe::Wax &msg)
+void ElementImpl::handle_input_msg(const tsbe::TaskInterface &handle, const size_t index, const tsbe::Wax &msg)
{
if (msg.type() == typeid(Tag))
{
this->input_tags[index].push_back(msg.cast<Tag>());
this->input_tags_changed[index] = true;
}
+ if (msg.type() == typeid(Token))
+ {
+ this->token_pool.push_back(msg.cast<Token>());
+ }
+ if (msg.type() == typeid(CheckTokensMessage))
+ {
+ if (this->input_tokens[index].unique())
+ {
+ this->mark_done(handle);
+ }
+ }
+}
+
+void ElementImpl::handle_output_msg(const tsbe::TaskInterface &handle, const size_t index, const tsbe::Wax &msg)
+{
+ if (msg.type() == typeid(Token))
+ {
+ this->token_pool.push_back(msg.cast<Token>());
+ }
+ if (msg.type() == typeid(CheckTokensMessage))
+ {
+ if (this->output_tokens[index].unique())
+ {
+ this->mark_done(handle);
+ }
+ }
}
template <typename V, typename T>
@@ -98,9 +124,27 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface, const t
}
}
+ //allocate output tokens and send them downstream
+ if (state.cast<TopBlockMessage>().what == TopBlockMessage::ACTIVE)
+ {
+ this->input_tokens.resize(num_inputs);
+ for (size_t i = 0; i < num_inputs; i++)
+ {
+ this->input_tokens[i] = make_token();
+ task_iface.post_upstream(i, this->input_tokens[i]);
+ }
+ this->output_tokens.resize(num_outputs);
+ for (size_t i = 0; i < num_outputs; i++)
+ {
+ this->output_tokens[i] = make_token();
+ task_iface.post_downstream(i, this->output_tokens[i]);
+ }
+ }
+
if (state.cast<TopBlockMessage>().what == TopBlockMessage::ACTIVE)
{
this->active = true;
+ this->token = state.cast<TopBlockMessage>().token;
//causes initial processing kick-off for source blocks
this->handle_allocation(task_iface);
@@ -108,5 +152,6 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface, const t
if (state.cast<TopBlockMessage>().what == TopBlockMessage::INERT)
{
this->active = false;
+ this->token = state.cast<TopBlockMessage>().token;
}
}
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index f156ae0..577fdb7 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -20,8 +20,44 @@
using namespace gnuradio;
+void ElementImpl::free_inputs(const tsbe::TaskInterface &task_iface)
+{
+ for (size_t i = 0; i < task_iface.get_num_inputs(); i++)
+ {
+ while (task_iface.get_input_buffer(i))
+ {
+ task_iface.pop_input_buffer(i);
+ }
+ }
+}
+
+void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface)
+{
+ if (not this->active) return;
+ this->active = false;
+ this->token_pool.clear();
+ this->token.reset();
+ this->free_inputs(task_iface);
+ for (size_t i = 0; i < task_iface.get_num_inputs(); i++)
+ {
+ task_iface.post_upstream(i, CheckTokensMessage());
+ }
+ for (size_t i = 0; i < task_iface.get_num_outputs(); i++)
+ {
+ task_iface.post_downstream(i, CheckTokensMessage());
+ }
+ HERE();
+ VAR(name);
+}
+
void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
{
+ //FIXME in case we get called in the inactive state, assuming done?
+ if (not this->active)
+ {
+ this->free_inputs(task_iface);
+ }
+
//------------------------------------------------------------------
//-- Decide if its possible to continue any processing:
//-- Handle task may get called for incoming buffers,
@@ -53,8 +89,10 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
std::cout << "calling work on " << name << std::endl;
//reset work trackers for production/consumption
+ size_t input_tokens_count = 0;
for (size_t i = 0; i < num_inputs; i++)
{
+ input_tokens_count += this->input_tokens[i].use_count();
//this->consume_items[i] = 0;
ASSERT(this->input_history_items[i] == 0);
@@ -69,9 +107,12 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
this->work_input_items[i] = mem;
this->work_ninput_items[i] = items;
}
+
size_t num_output_items = ~0; //so big that it must std::min
+ size_t output_tokens_count = 0;
for (size_t i = 0; i < num_outputs; i++)
{
+ output_tokens_count += this->output_tokens[i].use_count();
//this->produce_items[i] = 0;
const tsbe::Buffer &buff = task_iface.get_output_buffer(i);
@@ -85,13 +126,22 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
num_output_items = std::min(num_output_items, items);
}
+ //someone upstream or downstream holds no tokens, we are done!
+ if (
+ (num_inputs != 0 and input_tokens_count == num_inputs) or
+ (num_outputs != 0 and output_tokens_count == num_outputs)
+ ){
+ this->mark_done(task_iface);
+ return;
+ }
+
//start with source, this should be EZ
int ret = 0;
ret = block_ptr->Work(this->input_items, this->output_items);
VAR(ret);
if (ret == Block::WORK_DONE)
{
- this->active = false;
+ this->mark_done(task_iface);
return;
}
const size_t noutput_items = size_t(ret);
diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp
index d54e5bb..b92aee2 100644
--- a/lib/element_impl.hpp
+++ b/lib/element_impl.hpp
@@ -40,16 +40,42 @@ static inline unsigned long long myullround(const double x)
return (unsigned long long)(x + 0.5);
}
+//! return true if an instance was found and removed
+template <typename V, typename T>
+bool remove_one(V &v, const T &t)
+{
+ for (size_t i = 0; i < v.size(); i++)
+ {
+ if (v[i] == t)
+ {
+ v.erase(v.begin() + i);
+ return true;
+ }
+ }
+ return false;
+}
+
+typedef boost::shared_ptr<int> Token;
+static inline Token make_token(void)
+{
+ return Token(new int(0));
+}
+
struct TopBlockMessage
{
enum
{
- UPDATE,
ACTIVE,
INERT,
HINT,
} what;
size_t hint;
+ Token token;
+};
+
+struct CheckTokensMessage
+{
+ //empty
};
namespace gnuradio
@@ -57,6 +83,18 @@ namespace gnuradio
struct ElementImpl
{
+ ElementImpl(void)
+ {
+ //NOP
+ }
+
+ ~ElementImpl(void)
+ {
+ children.clear();
+ }
+
+ std::vector<boost::shared_ptr<Element> > children;
+
//stuff for when its a block
std::string name;
long unique_id;
@@ -92,6 +130,11 @@ struct ElementImpl
//special buffer for dealing with history
std::vector<tsbe::Buffer> history_buffs;
+ //track the subscriber counts
+ std::vector<Token> input_tokens;
+ std::vector<Token> output_tokens;
+ std::vector<Token> token_pool;
+
//tag tracking
std::vector<bool> input_tags_changed;
std::vector<std::vector<Tag> > input_tags;
@@ -109,16 +152,20 @@ struct ElementImpl
}
//gets the handlers access for forecast and work
Block *block_ptr;
- size_t hint; //some kind of allocation hint
//handlers
- void handle_port_msg(const size_t, const tsbe::Wax &);
+ void handle_input_msg(const tsbe::TaskInterface &, const size_t, const tsbe::Wax &);
+ void handle_output_msg(const tsbe::TaskInterface &, const size_t, const tsbe::Wax &);
void topology_update(const tsbe::TaskInterface &, const tsbe::Wax &);
void handle_allocation(const tsbe::TaskInterface &);
void handle_task(const tsbe::TaskInterface &);
+ void mark_done(const tsbe::TaskInterface &);
+ void free_inputs(const tsbe::TaskInterface &);
//is the fg running?
bool active;
+ Token token;
+ size_t hint; //some kind of allocation hint
//rate settings
bool enable_fixed_rate;
diff --git a/lib/hier_block.cpp b/lib/hier_block.cpp
index 3184595..edd1f35 100644
--- a/lib/hier_block.cpp
+++ b/lib/hier_block.cpp
@@ -34,11 +34,13 @@ HierBlock::HierBlock(const std::string &name):
void HierBlock::connect(const Element &elem)
{
(*this)->topology.add_topology(elem->topology);
+ (*this)->children.push_back(elem.weak_self.lock());
}
void HierBlock::disconnect(const Element &elem)
{
(*this)->topology.remove_topology(elem->topology);
+ remove_one((*this)->children, elem.weak_self.lock());
}
void HierBlock::connect(
@@ -53,6 +55,8 @@ void HierBlock::connect(
tsbe::Port(sink->get_elem(), sink_index)
);
(*this)->topology.connect(conn);
+ (*this)->children.push_back(src.weak_self.lock());
+ (*this)->children.push_back(sink.weak_self.lock());
}
void HierBlock::disconnect(
@@ -66,4 +70,6 @@ void HierBlock::disconnect(
tsbe::Port(sink->get_elem(), sink_index)
);
(*this)->topology.disconnect(conn);
+ remove_one((*this)->children, src.weak_self.lock());
+ remove_one((*this)->children, sink.weak_self.lock());
}
diff --git a/lib/top_block.cpp b/lib/top_block.cpp
index 0ae4ce9..f9dde36 100644
--- a/lib/top_block.cpp
+++ b/lib/top_block.cpp
@@ -30,13 +30,12 @@ TopBlock::TopBlock(const std::string &name):
tsbe::ExecutorConfig config;
config.topology = (*this)->topology;
(*this)->executor = tsbe::Executor(config);
+ (*this)->token = make_token();
}
void TopBlock::update(void)
{
- TopBlockMessage event;
- event.what = TopBlockMessage::UPDATE;
- (*this)->executor.update(event);
+ this->start(); //ok to re-start, means update
}
void TopBlock::set_buffer_hint(const size_t hint)
@@ -51,6 +50,7 @@ void TopBlock::start(void)
{
TopBlockMessage event;
event.what = TopBlockMessage::ACTIVE;
+ event.token = (*this)->token;
(*this)->executor.update(event);
}
@@ -61,7 +61,17 @@ void TopBlock::stop(void)
(*this)->executor.update(event);
}
+void TopBlock::run(void)
+{
+ this->start();
+ this->wait();
+}
+
void TopBlock::wait(void)
{
- //NOP/TODO?
+ while (not (*this)->token.unique())
+ {
+ sleep(1);
+ VAR((*this)->token.use_count());
+ }
}