diff options
author | Josh Blum | 2012-09-29 10:30:52 -0700 |
---|---|---|
committer | Josh Blum | 2012-09-29 10:30:52 -0700 |
commit | f3194e33de2a6496ce9ea838681e62b64fa689f4 (patch) | |
tree | 37b80081a4978826b6bf56603ed0bd35520e4af4 | |
parent | 17e39ddbb0940d9d5e687713531e9a18d18e29f1 (diff) | |
download | sandhi-f3194e33de2a6496ce9ea838681e62b64fa689f4.tar.gz sandhi-f3194e33de2a6496ce9ea838681e62b64fa689f4.tar.bz2 sandhi-f3194e33de2a6496ce9ea838681e62b64fa689f4.zip |
ported block task and tag handlers to apology
-rw-r--r-- | TODO.txt | 3 | ||||
-rw-r--r-- | include/gnuradio/element.hpp | 3 | ||||
-rw-r--r-- | lib/CMakeLists.txt | 2 | ||||
-rw-r--r-- | lib/block.cpp | 4 | ||||
-rw-r--r-- | lib/block_task.cpp | 49 | ||||
-rw-r--r-- | lib/element.cpp | 5 | ||||
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 19 | ||||
-rw-r--r-- | lib/gras_impl/messages.hpp | 1 | ||||
-rw-r--r-- | lib/tag_handlers.hpp | 12 |
9 files changed, 48 insertions, 50 deletions
@@ -34,3 +34,6 @@ and not directly from block::<method> * create an input and output config struct (per port) ** idea is, we dont need so many API calls for a port ** set/get input/output config + +* want per-port token_pool for inputs and outputs +** if ports get removed, this will release subscribers diff --git a/include/gnuradio/element.hpp b/include/gnuradio/element.hpp index 510a383..6c2790d 100644 --- a/include/gnuradio/element.hpp +++ b/include/gnuradio/element.hpp @@ -56,6 +56,9 @@ struct GRAS_API Element : boost::shared_ptr<ElementImpl>, boost::enable_shared_f //! Get the name of this element std::string name(void) const; + //! get a canonical name for this element + std::string to_string(void) const; + void set_output_signature(const gnuradio::IOSignature &sig); void set_input_signature(const gnuradio::IOSignature &sig); diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index 1a3e156..bc8749f 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -49,7 +49,7 @@ list(APPEND gnuradio_core_sources ${CMAKE_CURRENT_SOURCE_DIR}/sbuffer.cpp ${CMAKE_CURRENT_SOURCE_DIR}/block.cpp ${CMAKE_CURRENT_SOURCE_DIR}/block_actor.cpp - #${CMAKE_CURRENT_SOURCE_DIR}/block_task.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/block_task.cpp ${CMAKE_CURRENT_SOURCE_DIR}/block_allocator.cpp ${CMAKE_CURRENT_SOURCE_DIR}/block_handlers.cpp ${CMAKE_CURRENT_SOURCE_DIR}/input_handlers.cpp diff --git a/lib/block.cpp b/lib/block.cpp index 2ca4e8b..21165f2 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -157,9 +157,7 @@ void Block::add_item_tag( const size_t which_output, const Tag &tag ){ - InputTagMessage message; - message.tag = tag; - (*this)->block->post_downstream(which_output, message); + (*this)->block->post_downstream(which_output, InputTagMessage(tag)); } void Block::add_item_tag( diff --git a/lib/block_task.cpp b/lib/block_task.cpp index dda7dcc..176b161 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -14,25 +14,26 @@ // You should have received a copy of the GNU Lesser General Public License // along with io_sig program. If not, see <http://www.gnu.org/licenses/>. -#include "element_impl.hpp" +#include <gras_impl/block_actor.hpp> #include "tag_handlers.hpp" -#include <gras_impl/messages.hpp> #define REALLY_BIG size_t(1 << 30) using namespace gnuradio; -void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface) +void BlockActor::mark_done(void) { if (this->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first //flush partial output buffers to the downstream - for (size_t i = 0; i < task_iface.get_num_outputs(); i++) + for (size_t i = 0; i < this->get_num_outputs(); i++) { if (not this->output_queues.ready(i)) continue; SBuffer &buff = this->output_queues.front(i); if (buff.length == 0) continue; - task_iface.post_downstream(i, buff); + InputBufferMessage buff_msg; + buff_msg.buffer = buff; + this->post_downstream(i, buff_msg); this->output_queues.pop(i); } @@ -54,23 +55,23 @@ void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface) //tell the upstream and downstram to re-check their tokens //this is how the other blocks know who is interested, //and can decide based on interest to set done or not - for (size_t i = 0; i < task_iface.get_num_inputs(); i++) + for (size_t i = 0; i < this->get_num_inputs(); i++) { - task_iface.post_upstream(i, CheckTokensMessage()); + this->post_upstream(i, OutputCheckMessage()); } - for (size_t i = 0; i < task_iface.get_num_outputs(); i++) + for (size_t i = 0; i < this->get_num_outputs(); i++) { - task_iface.post_downstream(i, CheckTokensMessage()); + this->post_downstream(i, InputCheckMessage()); } if (ARMAGEDDON) std::cerr << "==================================================\n" - << "== The " << name << " " << unique_id << " is done...\n" + << "== The " << block_ptr->to_string() << " is done...\n" << "==================================================\n" << std::flush; } -void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) +void BlockActor::handle_task(void) { #ifdef WORK_DEBUG WorkDebugPrinter WDP(this->name); @@ -87,8 +88,8 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) this->output_queues.all_ready() )) return; - const size_t num_inputs = task_iface.get_num_inputs(); - const size_t num_outputs = task_iface.get_num_outputs(); + const size_t num_inputs = this->get_num_inputs(); + const size_t num_outputs = this->get_num_outputs(); //const bool is_source = (num_inputs == 0); //const bool is_sink = (num_outputs == 0); this->work_io_ptr_mask = 0; //reset @@ -162,7 +163,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) //if we have outputs and at least one port has no downstream subscibers, mark done if (outputs_done) { - this->mark_done(task_iface); + this->mark_done(); return; } @@ -186,7 +187,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) if (num_output_items) goto forecast_again_you_jerk; this->forecast_fail = true; - this->conclusion(task_iface, inputs_done); + this->conclusion(inputs_done); return; } } @@ -198,7 +199,6 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) work_noutput_items = num_output_items; if (this->enable_fixed_rate) work_noutput_items = std::min( work_noutput_items, myulround((num_input_items)*this->relative_rate)); - this->work_task_iface = task_iface; this->work_ret = -1; if (this->interruptible_thread) { @@ -208,12 +208,11 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) { this->task_work(); } - this->work_task_iface.reset(); const size_t noutput_items = size_t(work_ret); if (work_ret == Block::WORK_DONE) { - this->mark_done(task_iface); + this->mark_done(); return; } @@ -230,7 +229,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) const size_t bytes = items*this->input_items_sizes[i]; this->input_queues.consume(i, bytes); - this->trim_tags(task_iface, i); + this->trim_tags(i); } //------------------------------------------------------------------ @@ -249,7 +248,9 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) //dont always pass output buffers downstream for the sake of efficiency if (not this->input_queues.all_ready() or buff.length*2 > buff.get_actual_length()) { - task_iface.post_downstream(i, buff); + InputBufferMessage buff_msg; + buff_msg.buffer = buff; + this->post_downstream(i, buff_msg); this->output_queues.pop(i); } } @@ -257,23 +258,23 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) //------------------------------------------------------------------ //-- Message self based on post-work conditions //------------------------------------------------------------------ - this->conclusion(task_iface, inputs_done); + this->conclusion(inputs_done); } -GRAS_FORCE_INLINE void ElementImpl::conclusion(const tsbe::TaskInterface &task_iface, const bool inputs_done) +GRAS_FORCE_INLINE void BlockActor::conclusion(const bool inputs_done) { //if there are inputs, and not all are provided for, //tell the block to check input queues and handle done if (inputs_done) { - this->block.post_msg(CheckTokensMessage()); + this->Push(CheckTokensMessage(), Theron::Address()); return; } //still have IO ready? kick off another task if (this->input_queues.all_ready() and this->output_queues.all_ready()) { - this->block.post_msg(SelfKickMessage()); + this->Push(SelfKickMessage(), Theron::Address()); return; } } diff --git a/lib/element.cpp b/lib/element.cpp index b416261..a733d31 100644 --- a/lib/element.cpp +++ b/lib/element.cpp @@ -16,6 +16,7 @@ #include "element_impl.hpp" #include <gnuradio/element.hpp> +#include <boost/format.hpp> #include <boost/detail/atomic_count.hpp> static boost::detail::atomic_count unique_id_pool(0); @@ -63,6 +64,10 @@ std::string Element::name(void) const return (*this)->name; } +std::string Element::to_string(void) const +{ + return str(boost::format("%s(%d)") % this->name() % this->unique_id()); +} void Element::set_output_signature(const IOSignature &sig) { diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index abb3d6d..8932ded 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -108,6 +108,9 @@ struct BlockActor : Apology::Worker void buffer_returner(const size_t index, SBuffer &buffer); void mark_done(void); void handle_task(void); + void sort_tags(const size_t index); + void trim_tags(const size_t index); + void conclusion(const bool); //per port properties std::vector<size_t> input_items_sizes; @@ -158,22 +161,6 @@ struct BlockActor : Apology::Worker SharedThreadGroup thread_group; boost::shared_ptr<InterruptibleThread> interruptible_thread; - //handlers - /* - void handle_input_msg(const tsbe::TaskInterface &, const size_t, const tsbe::Wax &); - void handle_output_msg(const tsbe::TaskInterface &, const size_t, const tsbe::Wax &); - void topology_update(const tsbe::TaskInterface &); - void handle_block_msg(const tsbe::TaskInterface &, const tsbe::Wax &); - void handle_allocation(const tsbe::TaskInterface &); - void handle_task(const tsbe::TaskInterface &); - void mark_done(const tsbe::TaskInterface &); - void conclusion(const tsbe::TaskInterface &task_iface, const bool); - void buffer_returner(const size_t index, SBuffer &buffer); - void input_update(const tsbe::TaskInterface &task_iface); - void sort_tags(const size_t index); - void trim_tags(const tsbe::TaskInterface &, const size_t index); - * */ - //work helpers int work_ret; inline void task_work(void) diff --git a/lib/gras_impl/messages.hpp b/lib/gras_impl/messages.hpp index f3e3a37..183befd 100644 --- a/lib/gras_impl/messages.hpp +++ b/lib/gras_impl/messages.hpp @@ -62,6 +62,7 @@ struct TopHintMessage struct InputTagMessage { + InputTagMessage(const Tag &tag):tag(tag){} size_t index; Tag tag; }; diff --git a/lib/tag_handlers.hpp b/lib/tag_handlers.hpp index b49be96..0a89ef1 100644 --- a/lib/tag_handlers.hpp +++ b/lib/tag_handlers.hpp @@ -17,13 +17,13 @@ #ifndef INCLUDED_LIBGRAS_TAG_HANDLERS_HPP #define INCLUDED_LIBGRAS_TAG_HANDLERS_HPP -#include "element_impl.hpp" +#include <gras_impl/block_actor.hpp> #include <algorithm> namespace gnuradio { -GRAS_FORCE_INLINE void ElementImpl::sort_tags(const size_t i) +GRAS_FORCE_INLINE void BlockActor::sort_tags(const size_t i) { if (not this->input_tags_changed[i]) return; std::vector<Tag> &tags_i = this->input_tags[i]; @@ -31,9 +31,9 @@ GRAS_FORCE_INLINE void ElementImpl::sort_tags(const size_t i) this->input_tags_changed[i] = false; } -GRAS_FORCE_INLINE void ElementImpl::trim_tags(const tsbe::TaskInterface &task_iface, const size_t i) +GRAS_FORCE_INLINE void BlockActor::trim_tags(const size_t i) { - const size_t num_outputs = task_iface.get_num_outputs(); + const size_t num_outputs = this->get_num_outputs(); //------------------------------------------------------------------ //-- trim the input tags that are past the consumption zone @@ -59,7 +59,7 @@ GRAS_FORCE_INLINE void ElementImpl::trim_tags(const tsbe::TaskInterface &task_if { Tag t = tags_i[tag_i]; t.offset = myullround(t.offset * this->relative_rate); - task_iface.post_downstream(out_i, t); + this->post_downstream(out_i, InputTagMessage(t)); } } break; @@ -70,7 +70,7 @@ GRAS_FORCE_INLINE void ElementImpl::trim_tags(const tsbe::TaskInterface &task_if { Tag t = tags_i[tag_i]; t.offset = myullround(t.offset * this->relative_rate); - task_iface.post_downstream(i, t); + this->post_downstream(i, InputTagMessage(t)); } } break; |