summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosh Blum2012-08-28 23:05:41 -0700
committerJosh Blum2012-08-28 23:05:41 -0700
commitac3857575c4c762f9a18ee18889740d4360a9aa8 (patch)
tree4526f5647f2e2d93c21d12ae3c524fb7991745b3
parent4044977deba6d64124763836d875b4da2b70eeaf (diff)
downloadsandhi-ac3857575c4c762f9a18ee18889740d4360a9aa8.tar.gz
sandhi-ac3857575c4c762f9a18ee18889740d4360a9aa8.tar.bz2
sandhi-ac3857575c4c762f9a18ee18889740d4360a9aa8.zip
token work w/ messages to implement finite runs
-rw-r--r--include/gnuradio/element.hpp7
-rw-r--r--include/gnuradio/runtime_api.h1
-rw-r--r--include/gnuradio/top_block.hpp7
-rw-r--r--lib/block.cpp3
-rw-r--r--lib/block_handlers.cpp47
-rw-r--r--lib/block_task.cpp52
-rw-r--r--lib/element_impl.hpp53
-rw-r--r--lib/hier_block.cpp6
-rw-r--r--lib/top_block.cpp18
9 files changed, 178 insertions, 16 deletions
diff --git a/include/gnuradio/element.hpp b/include/gnuradio/element.hpp
index 71dc359..ad2e1e8 100644
--- a/include/gnuradio/element.hpp
+++ b/include/gnuradio/element.hpp
@@ -30,6 +30,7 @@ struct GR_RUNTIME_API Element : boost::shared_ptr<ElementImpl>
//! Create an empty element
Element(void);
+ //! Creates a new element given the name
Element(const std::string &name);
/*!
@@ -40,10 +41,16 @@ struct GR_RUNTIME_API Element : boost::shared_ptr<ElementImpl>
Element(const boost::shared_ptr<T> &elem)
{
*this = *elem;
+ weak_self = elem;
}
+ //! for internal use only
+ boost::weak_ptr<Element> weak_self;
+
+ //! An integer ID that is unique across the process
long unique_id(void) const;
+ //! Get the name of this element
std::string name(void) const;
void set_output_signature(const gnuradio::IOSignature &sig);
diff --git a/include/gnuradio/runtime_api.h b/include/gnuradio/runtime_api.h
index 1f0cc44..bc359cc 100644
--- a/include/gnuradio/runtime_api.h
+++ b/include/gnuradio/runtime_api.h
@@ -27,6 +27,7 @@
#define GR_RUNTIME_API GR_CORE_API
#include <boost/shared_ptr.hpp>
+#include <boost/weak_ptr.hpp>
namespace gnuradio
{
diff --git a/include/gnuradio/top_block.hpp b/include/gnuradio/top_block.hpp
index 2981b04..09da0a5 100644
--- a/include/gnuradio/top_block.hpp
+++ b/include/gnuradio/top_block.hpp
@@ -60,12 +60,7 @@ struct GR_RUNTIME_API TopBlock : HierBlock
* Run is for finite flow graph executions.
* Mostly for testing purposes only.
*/
- void run(void)
- {
- this->start();
- this->stop();
- this->wait();
- }
+ void run(void);
//! Start a flow graph execution (does not block)
void start(void);
diff --git a/lib/block.cpp b/lib/block.cpp
index 324f9af..c7ff967 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -35,7 +35,8 @@ Block::Block(const std::string &name):
this->set_tag_propagation_policy(TPP_ALL_TO_ALL);
tsbe::BlockConfig config;
- config.port_callback = boost::bind(&ElementImpl::handle_port_msg, this->get(), _1, _2);
+ config.input_callback = boost::bind(&ElementImpl::handle_input_msg, this->get(), _1, _2, _3);
+ config.output_callback = boost::bind(&ElementImpl::handle_output_msg, this->get(), _1, _2, _3);
config.update_callback = boost::bind(&ElementImpl::topology_update, this->get(), _1, _2);
config.task_callback = boost::bind(&ElementImpl::handle_task, this->get(), _1);
(*this)->block = tsbe::Block(config);
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index 7825868..de0a8bf 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -18,13 +18,39 @@
using namespace gnuradio;
-void ElementImpl::handle_port_msg(const size_t index, const tsbe::Wax &msg)
+void ElementImpl::handle_input_msg(const tsbe::TaskInterface &handle, const size_t index, const tsbe::Wax &msg)
{
if (msg.type() == typeid(Tag))
{
this->input_tags[index].push_back(msg.cast<Tag>());
this->input_tags_changed[index] = true;
}
+ if (msg.type() == typeid(Token))
+ {
+ this->token_pool.push_back(msg.cast<Token>());
+ }
+ if (msg.type() == typeid(CheckTokensMessage))
+ {
+ if (this->input_tokens[index].unique())
+ {
+ this->mark_done(handle);
+ }
+ }
+}
+
+void ElementImpl::handle_output_msg(const tsbe::TaskInterface &handle, const size_t index, const tsbe::Wax &msg)
+{
+ if (msg.type() == typeid(Token))
+ {
+ this->token_pool.push_back(msg.cast<Token>());
+ }
+ if (msg.type() == typeid(CheckTokensMessage))
+ {
+ if (this->output_tokens[index].unique())
+ {
+ this->mark_done(handle);
+ }
+ }
}
template <typename V, typename T>
@@ -98,9 +124,27 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface, const t
}
}
+ //allocate output tokens and send them downstream
+ if (state.cast<TopBlockMessage>().what == TopBlockMessage::ACTIVE)
+ {
+ this->input_tokens.resize(num_inputs);
+ for (size_t i = 0; i < num_inputs; i++)
+ {
+ this->input_tokens[i] = make_token();
+ task_iface.post_upstream(i, this->input_tokens[i]);
+ }
+ this->output_tokens.resize(num_outputs);
+ for (size_t i = 0; i < num_outputs; i++)
+ {
+ this->output_tokens[i] = make_token();
+ task_iface.post_downstream(i, this->output_tokens[i]);
+ }
+ }
+
if (state.cast<TopBlockMessage>().what == TopBlockMessage::ACTIVE)
{
this->active = true;
+ this->token = state.cast<TopBlockMessage>().token;
//causes initial processing kick-off for source blocks
this->handle_allocation(task_iface);
@@ -108,5 +152,6 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface, const t
if (state.cast<TopBlockMessage>().what == TopBlockMessage::INERT)
{
this->active = false;
+ this->token = state.cast<TopBlockMessage>().token;
}
}
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index f156ae0..577fdb7 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -20,8 +20,44 @@
using namespace gnuradio;
+void ElementImpl::free_inputs(const tsbe::TaskInterface &task_iface)
+{
+ for (size_t i = 0; i < task_iface.get_num_inputs(); i++)
+ {
+ while (task_iface.get_input_buffer(i))
+ {
+ task_iface.pop_input_buffer(i);
+ }
+ }
+}
+
+void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface)
+{
+ if (not this->active) return;
+ this->active = false;
+ this->token_pool.clear();
+ this->token.reset();
+ this->free_inputs(task_iface);
+ for (size_t i = 0; i < task_iface.get_num_inputs(); i++)
+ {
+ task_iface.post_upstream(i, CheckTokensMessage());
+ }
+ for (size_t i = 0; i < task_iface.get_num_outputs(); i++)
+ {
+ task_iface.post_downstream(i, CheckTokensMessage());
+ }
+ HERE();
+ VAR(name);
+}
+
void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
{
+ //FIXME in case we get called in the inactive state, assuming done?
+ if (not this->active)
+ {
+ this->free_inputs(task_iface);
+ }
+
//------------------------------------------------------------------
//-- Decide if its possible to continue any processing:
//-- Handle task may get called for incoming buffers,
@@ -53,8 +89,10 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
std::cout << "calling work on " << name << std::endl;
//reset work trackers for production/consumption
+ size_t input_tokens_count = 0;
for (size_t i = 0; i < num_inputs; i++)
{
+ input_tokens_count += this->input_tokens[i].use_count();
//this->consume_items[i] = 0;
ASSERT(this->input_history_items[i] == 0);
@@ -69,9 +107,12 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
this->work_input_items[i] = mem;
this->work_ninput_items[i] = items;
}
+
size_t num_output_items = ~0; //so big that it must std::min
+ size_t output_tokens_count = 0;
for (size_t i = 0; i < num_outputs; i++)
{
+ output_tokens_count += this->output_tokens[i].use_count();
//this->produce_items[i] = 0;
const tsbe::Buffer &buff = task_iface.get_output_buffer(i);
@@ -85,13 +126,22 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
num_output_items = std::min(num_output_items, items);
}
+ //someone upstream or downstream holds no tokens, we are done!
+ if (
+ (num_inputs != 0 and input_tokens_count == num_inputs) or
+ (num_outputs != 0 and output_tokens_count == num_outputs)
+ ){
+ this->mark_done(task_iface);
+ return;
+ }
+
//start with source, this should be EZ
int ret = 0;
ret = block_ptr->Work(this->input_items, this->output_items);
VAR(ret);
if (ret == Block::WORK_DONE)
{
- this->active = false;
+ this->mark_done(task_iface);
return;
}
const size_t noutput_items = size_t(ret);
diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp
index d54e5bb..b92aee2 100644
--- a/lib/element_impl.hpp
+++ b/lib/element_impl.hpp
@@ -40,16 +40,42 @@ static inline unsigned long long myullround(const double x)
return (unsigned long long)(x + 0.5);
}
+//! return true if an instance was found and removed
+template <typename V, typename T>
+bool remove_one(V &v, const T &t)
+{
+ for (size_t i = 0; i < v.size(); i++)
+ {
+ if (v[i] == t)
+ {
+ v.erase(v.begin() + i);
+ return true;
+ }
+ }
+ return false;
+}
+
+typedef boost::shared_ptr<int> Token;
+static inline Token make_token(void)
+{
+ return Token(new int(0));
+}
+
struct TopBlockMessage
{
enum
{
- UPDATE,
ACTIVE,
INERT,
HINT,
} what;
size_t hint;
+ Token token;
+};
+
+struct CheckTokensMessage
+{
+ //empty
};
namespace gnuradio
@@ -57,6 +83,18 @@ namespace gnuradio
struct ElementImpl
{
+ ElementImpl(void)
+ {
+ //NOP
+ }
+
+ ~ElementImpl(void)
+ {
+ children.clear();
+ }
+
+ std::vector<boost::shared_ptr<Element> > children;
+
//stuff for when its a block
std::string name;
long unique_id;
@@ -92,6 +130,11 @@ struct ElementImpl
//special buffer for dealing with history
std::vector<tsbe::Buffer> history_buffs;
+ //track the subscriber counts
+ std::vector<Token> input_tokens;
+ std::vector<Token> output_tokens;
+ std::vector<Token> token_pool;
+
//tag tracking
std::vector<bool> input_tags_changed;
std::vector<std::vector<Tag> > input_tags;
@@ -109,16 +152,20 @@ struct ElementImpl
}
//gets the handlers access for forecast and work
Block *block_ptr;
- size_t hint; //some kind of allocation hint
//handlers
- void handle_port_msg(const size_t, const tsbe::Wax &);
+ void handle_input_msg(const tsbe::TaskInterface &, const size_t, const tsbe::Wax &);
+ void handle_output_msg(const tsbe::TaskInterface &, const size_t, const tsbe::Wax &);
void topology_update(const tsbe::TaskInterface &, const tsbe::Wax &);
void handle_allocation(const tsbe::TaskInterface &);
void handle_task(const tsbe::TaskInterface &);
+ void mark_done(const tsbe::TaskInterface &);
+ void free_inputs(const tsbe::TaskInterface &);
//is the fg running?
bool active;
+ Token token;
+ size_t hint; //some kind of allocation hint
//rate settings
bool enable_fixed_rate;
diff --git a/lib/hier_block.cpp b/lib/hier_block.cpp
index 3184595..edd1f35 100644
--- a/lib/hier_block.cpp
+++ b/lib/hier_block.cpp
@@ -34,11 +34,13 @@ HierBlock::HierBlock(const std::string &name):
void HierBlock::connect(const Element &elem)
{
(*this)->topology.add_topology(elem->topology);
+ (*this)->children.push_back(elem.weak_self.lock());
}
void HierBlock::disconnect(const Element &elem)
{
(*this)->topology.remove_topology(elem->topology);
+ remove_one((*this)->children, elem.weak_self.lock());
}
void HierBlock::connect(
@@ -53,6 +55,8 @@ void HierBlock::connect(
tsbe::Port(sink->get_elem(), sink_index)
);
(*this)->topology.connect(conn);
+ (*this)->children.push_back(src.weak_self.lock());
+ (*this)->children.push_back(sink.weak_self.lock());
}
void HierBlock::disconnect(
@@ -66,4 +70,6 @@ void HierBlock::disconnect(
tsbe::Port(sink->get_elem(), sink_index)
);
(*this)->topology.disconnect(conn);
+ remove_one((*this)->children, src.weak_self.lock());
+ remove_one((*this)->children, sink.weak_self.lock());
}
diff --git a/lib/top_block.cpp b/lib/top_block.cpp
index 0ae4ce9..f9dde36 100644
--- a/lib/top_block.cpp
+++ b/lib/top_block.cpp
@@ -30,13 +30,12 @@ TopBlock::TopBlock(const std::string &name):
tsbe::ExecutorConfig config;
config.topology = (*this)->topology;
(*this)->executor = tsbe::Executor(config);
+ (*this)->token = make_token();
}
void TopBlock::update(void)
{
- TopBlockMessage event;
- event.what = TopBlockMessage::UPDATE;
- (*this)->executor.update(event);
+ this->start(); //ok to re-start, means update
}
void TopBlock::set_buffer_hint(const size_t hint)
@@ -51,6 +50,7 @@ void TopBlock::start(void)
{
TopBlockMessage event;
event.what = TopBlockMessage::ACTIVE;
+ event.token = (*this)->token;
(*this)->executor.update(event);
}
@@ -61,7 +61,17 @@ void TopBlock::stop(void)
(*this)->executor.update(event);
}
+void TopBlock::run(void)
+{
+ this->start();
+ this->wait();
+}
+
void TopBlock::wait(void)
{
- //NOP/TODO?
+ while (not (*this)->token.unique())
+ {
+ sleep(1);
+ VAR((*this)->token.use_count());
+ }
}