diff options
author | Josh Blum | 2012-08-28 23:05:41 -0700 |
---|---|---|
committer | Josh Blum | 2012-08-28 23:05:41 -0700 |
commit | ac3857575c4c762f9a18ee18889740d4360a9aa8 (patch) | |
tree | 4526f5647f2e2d93c21d12ae3c524fb7991745b3 | |
parent | 4044977deba6d64124763836d875b4da2b70eeaf (diff) | |
download | sandhi-ac3857575c4c762f9a18ee18889740d4360a9aa8.tar.gz sandhi-ac3857575c4c762f9a18ee18889740d4360a9aa8.tar.bz2 sandhi-ac3857575c4c762f9a18ee18889740d4360a9aa8.zip |
token work w/ messages to implement finite runs
-rw-r--r-- | include/gnuradio/element.hpp | 7 | ||||
-rw-r--r-- | include/gnuradio/runtime_api.h | 1 | ||||
-rw-r--r-- | include/gnuradio/top_block.hpp | 7 | ||||
-rw-r--r-- | lib/block.cpp | 3 | ||||
-rw-r--r-- | lib/block_handlers.cpp | 47 | ||||
-rw-r--r-- | lib/block_task.cpp | 52 | ||||
-rw-r--r-- | lib/element_impl.hpp | 53 | ||||
-rw-r--r-- | lib/hier_block.cpp | 6 | ||||
-rw-r--r-- | lib/top_block.cpp | 18 |
9 files changed, 178 insertions, 16 deletions
diff --git a/include/gnuradio/element.hpp b/include/gnuradio/element.hpp index 71dc359..ad2e1e8 100644 --- a/include/gnuradio/element.hpp +++ b/include/gnuradio/element.hpp @@ -30,6 +30,7 @@ struct GR_RUNTIME_API Element : boost::shared_ptr<ElementImpl> //! Create an empty element Element(void); + //! Creates a new element given the name Element(const std::string &name); /*! @@ -40,10 +41,16 @@ struct GR_RUNTIME_API Element : boost::shared_ptr<ElementImpl> Element(const boost::shared_ptr<T> &elem) { *this = *elem; + weak_self = elem; } + //! for internal use only + boost::weak_ptr<Element> weak_self; + + //! An integer ID that is unique across the process long unique_id(void) const; + //! Get the name of this element std::string name(void) const; void set_output_signature(const gnuradio::IOSignature &sig); diff --git a/include/gnuradio/runtime_api.h b/include/gnuradio/runtime_api.h index 1f0cc44..bc359cc 100644 --- a/include/gnuradio/runtime_api.h +++ b/include/gnuradio/runtime_api.h @@ -27,6 +27,7 @@ #define GR_RUNTIME_API GR_CORE_API #include <boost/shared_ptr.hpp> +#include <boost/weak_ptr.hpp> namespace gnuradio { diff --git a/include/gnuradio/top_block.hpp b/include/gnuradio/top_block.hpp index 2981b04..09da0a5 100644 --- a/include/gnuradio/top_block.hpp +++ b/include/gnuradio/top_block.hpp @@ -60,12 +60,7 @@ struct GR_RUNTIME_API TopBlock : HierBlock * Run is for finite flow graph executions. * Mostly for testing purposes only. */ - void run(void) - { - this->start(); - this->stop(); - this->wait(); - } + void run(void); //! Start a flow graph execution (does not block) void start(void); diff --git a/lib/block.cpp b/lib/block.cpp index 324f9af..c7ff967 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -35,7 +35,8 @@ Block::Block(const std::string &name): this->set_tag_propagation_policy(TPP_ALL_TO_ALL); tsbe::BlockConfig config; - config.port_callback = boost::bind(&ElementImpl::handle_port_msg, this->get(), _1, _2); + config.input_callback = boost::bind(&ElementImpl::handle_input_msg, this->get(), _1, _2, _3); + config.output_callback = boost::bind(&ElementImpl::handle_output_msg, this->get(), _1, _2, _3); config.update_callback = boost::bind(&ElementImpl::topology_update, this->get(), _1, _2); config.task_callback = boost::bind(&ElementImpl::handle_task, this->get(), _1); (*this)->block = tsbe::Block(config); diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index 7825868..de0a8bf 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -18,13 +18,39 @@ using namespace gnuradio; -void ElementImpl::handle_port_msg(const size_t index, const tsbe::Wax &msg) +void ElementImpl::handle_input_msg(const tsbe::TaskInterface &handle, const size_t index, const tsbe::Wax &msg) { if (msg.type() == typeid(Tag)) { this->input_tags[index].push_back(msg.cast<Tag>()); this->input_tags_changed[index] = true; } + if (msg.type() == typeid(Token)) + { + this->token_pool.push_back(msg.cast<Token>()); + } + if (msg.type() == typeid(CheckTokensMessage)) + { + if (this->input_tokens[index].unique()) + { + this->mark_done(handle); + } + } +} + +void ElementImpl::handle_output_msg(const tsbe::TaskInterface &handle, const size_t index, const tsbe::Wax &msg) +{ + if (msg.type() == typeid(Token)) + { + this->token_pool.push_back(msg.cast<Token>()); + } + if (msg.type() == typeid(CheckTokensMessage)) + { + if (this->output_tokens[index].unique()) + { + this->mark_done(handle); + } + } } template <typename V, typename T> @@ -98,9 +124,27 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface, const t } } + //allocate output tokens and send them downstream + if (state.cast<TopBlockMessage>().what == TopBlockMessage::ACTIVE) + { + this->input_tokens.resize(num_inputs); + for (size_t i = 0; i < num_inputs; i++) + { + this->input_tokens[i] = make_token(); + task_iface.post_upstream(i, this->input_tokens[i]); + } + this->output_tokens.resize(num_outputs); + for (size_t i = 0; i < num_outputs; i++) + { + this->output_tokens[i] = make_token(); + task_iface.post_downstream(i, this->output_tokens[i]); + } + } + if (state.cast<TopBlockMessage>().what == TopBlockMessage::ACTIVE) { this->active = true; + this->token = state.cast<TopBlockMessage>().token; //causes initial processing kick-off for source blocks this->handle_allocation(task_iface); @@ -108,5 +152,6 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface, const t if (state.cast<TopBlockMessage>().what == TopBlockMessage::INERT) { this->active = false; + this->token = state.cast<TopBlockMessage>().token; } } diff --git a/lib/block_task.cpp b/lib/block_task.cpp index f156ae0..577fdb7 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -20,8 +20,44 @@ using namespace gnuradio; +void ElementImpl::free_inputs(const tsbe::TaskInterface &task_iface) +{ + for (size_t i = 0; i < task_iface.get_num_inputs(); i++) + { + while (task_iface.get_input_buffer(i)) + { + task_iface.pop_input_buffer(i); + } + } +} + +void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface) +{ + if (not this->active) return; + this->active = false; + this->token_pool.clear(); + this->token.reset(); + this->free_inputs(task_iface); + for (size_t i = 0; i < task_iface.get_num_inputs(); i++) + { + task_iface.post_upstream(i, CheckTokensMessage()); + } + for (size_t i = 0; i < task_iface.get_num_outputs(); i++) + { + task_iface.post_downstream(i, CheckTokensMessage()); + } + HERE(); + VAR(name); +} + void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) { + //FIXME in case we get called in the inactive state, assuming done? + if (not this->active) + { + this->free_inputs(task_iface); + } + //------------------------------------------------------------------ //-- Decide if its possible to continue any processing: //-- Handle task may get called for incoming buffers, @@ -53,8 +89,10 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) std::cout << "calling work on " << name << std::endl; //reset work trackers for production/consumption + size_t input_tokens_count = 0; for (size_t i = 0; i < num_inputs; i++) { + input_tokens_count += this->input_tokens[i].use_count(); //this->consume_items[i] = 0; ASSERT(this->input_history_items[i] == 0); @@ -69,9 +107,12 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) this->work_input_items[i] = mem; this->work_ninput_items[i] = items; } + size_t num_output_items = ~0; //so big that it must std::min + size_t output_tokens_count = 0; for (size_t i = 0; i < num_outputs; i++) { + output_tokens_count += this->output_tokens[i].use_count(); //this->produce_items[i] = 0; const tsbe::Buffer &buff = task_iface.get_output_buffer(i); @@ -85,13 +126,22 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) num_output_items = std::min(num_output_items, items); } + //someone upstream or downstream holds no tokens, we are done! + if ( + (num_inputs != 0 and input_tokens_count == num_inputs) or + (num_outputs != 0 and output_tokens_count == num_outputs) + ){ + this->mark_done(task_iface); + return; + } + //start with source, this should be EZ int ret = 0; ret = block_ptr->Work(this->input_items, this->output_items); VAR(ret); if (ret == Block::WORK_DONE) { - this->active = false; + this->mark_done(task_iface); return; } const size_t noutput_items = size_t(ret); diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp index d54e5bb..b92aee2 100644 --- a/lib/element_impl.hpp +++ b/lib/element_impl.hpp @@ -40,16 +40,42 @@ static inline unsigned long long myullround(const double x) return (unsigned long long)(x + 0.5); } +//! return true if an instance was found and removed +template <typename V, typename T> +bool remove_one(V &v, const T &t) +{ + for (size_t i = 0; i < v.size(); i++) + { + if (v[i] == t) + { + v.erase(v.begin() + i); + return true; + } + } + return false; +} + +typedef boost::shared_ptr<int> Token; +static inline Token make_token(void) +{ + return Token(new int(0)); +} + struct TopBlockMessage { enum { - UPDATE, ACTIVE, INERT, HINT, } what; size_t hint; + Token token; +}; + +struct CheckTokensMessage +{ + //empty }; namespace gnuradio @@ -57,6 +83,18 @@ namespace gnuradio struct ElementImpl { + ElementImpl(void) + { + //NOP + } + + ~ElementImpl(void) + { + children.clear(); + } + + std::vector<boost::shared_ptr<Element> > children; + //stuff for when its a block std::string name; long unique_id; @@ -92,6 +130,11 @@ struct ElementImpl //special buffer for dealing with history std::vector<tsbe::Buffer> history_buffs; + //track the subscriber counts + std::vector<Token> input_tokens; + std::vector<Token> output_tokens; + std::vector<Token> token_pool; + //tag tracking std::vector<bool> input_tags_changed; std::vector<std::vector<Tag> > input_tags; @@ -109,16 +152,20 @@ struct ElementImpl } //gets the handlers access for forecast and work Block *block_ptr; - size_t hint; //some kind of allocation hint //handlers - void handle_port_msg(const size_t, const tsbe::Wax &); + void handle_input_msg(const tsbe::TaskInterface &, const size_t, const tsbe::Wax &); + void handle_output_msg(const tsbe::TaskInterface &, const size_t, const tsbe::Wax &); void topology_update(const tsbe::TaskInterface &, const tsbe::Wax &); void handle_allocation(const tsbe::TaskInterface &); void handle_task(const tsbe::TaskInterface &); + void mark_done(const tsbe::TaskInterface &); + void free_inputs(const tsbe::TaskInterface &); //is the fg running? bool active; + Token token; + size_t hint; //some kind of allocation hint //rate settings bool enable_fixed_rate; diff --git a/lib/hier_block.cpp b/lib/hier_block.cpp index 3184595..edd1f35 100644 --- a/lib/hier_block.cpp +++ b/lib/hier_block.cpp @@ -34,11 +34,13 @@ HierBlock::HierBlock(const std::string &name): void HierBlock::connect(const Element &elem) { (*this)->topology.add_topology(elem->topology); + (*this)->children.push_back(elem.weak_self.lock()); } void HierBlock::disconnect(const Element &elem) { (*this)->topology.remove_topology(elem->topology); + remove_one((*this)->children, elem.weak_self.lock()); } void HierBlock::connect( @@ -53,6 +55,8 @@ void HierBlock::connect( tsbe::Port(sink->get_elem(), sink_index) ); (*this)->topology.connect(conn); + (*this)->children.push_back(src.weak_self.lock()); + (*this)->children.push_back(sink.weak_self.lock()); } void HierBlock::disconnect( @@ -66,4 +70,6 @@ void HierBlock::disconnect( tsbe::Port(sink->get_elem(), sink_index) ); (*this)->topology.disconnect(conn); + remove_one((*this)->children, src.weak_self.lock()); + remove_one((*this)->children, sink.weak_self.lock()); } diff --git a/lib/top_block.cpp b/lib/top_block.cpp index 0ae4ce9..f9dde36 100644 --- a/lib/top_block.cpp +++ b/lib/top_block.cpp @@ -30,13 +30,12 @@ TopBlock::TopBlock(const std::string &name): tsbe::ExecutorConfig config; config.topology = (*this)->topology; (*this)->executor = tsbe::Executor(config); + (*this)->token = make_token(); } void TopBlock::update(void) { - TopBlockMessage event; - event.what = TopBlockMessage::UPDATE; - (*this)->executor.update(event); + this->start(); //ok to re-start, means update } void TopBlock::set_buffer_hint(const size_t hint) @@ -51,6 +50,7 @@ void TopBlock::start(void) { TopBlockMessage event; event.what = TopBlockMessage::ACTIVE; + event.token = (*this)->token; (*this)->executor.update(event); } @@ -61,7 +61,17 @@ void TopBlock::stop(void) (*this)->executor.update(event); } +void TopBlock::run(void) +{ + this->start(); + this->wait(); +} + void TopBlock::wait(void) { - //NOP/TODO? + while (not (*this)->token.unique()) + { + sleep(1); + VAR((*this)->token.use_count()); + } } |