From f3194e33de2a6496ce9ea838681e62b64fa689f4 Mon Sep 17 00:00:00 2001
From: Josh Blum
Date: Sat, 29 Sep 2012 10:30:52 -0700
Subject: ported block task and tag handlers to apology

---
 TODO.txt                      |  3 +++
 include/gnuradio/element.hpp  |  3 +++
 lib/CMakeLists.txt            |  2 +-
 lib/block.cpp                 |  4 +---
 lib/block_task.cpp            | 49 ++++++++++++++++++++++---------------------
 lib/element.cpp               |  5 +++++
 lib/gras_impl/block_actor.hpp | 19 +++--------------
 lib/gras_impl/messages.hpp    |  1 +
 lib/tag_handlers.hpp          | 12 +++++------
 9 files changed, 48 insertions(+), 50 deletions(-)

diff --git a/TODO.txt b/TODO.txt
index 0a5fc06..1564b59 100644
--- a/TODO.txt
+++ b/TODO.txt
@@ -34,3 +34,6 @@ and not directly from block::<method>
 * create an input and output config struct (per port)
 ** idea is, we dont need so many API calls for a port
 ** set/get input/output config
+
+* want per-port token_pool for inputs and outputs
+** if ports get removed, this will release subscribers
diff --git a/include/gnuradio/element.hpp b/include/gnuradio/element.hpp
index 510a383..6c2790d 100644
--- a/include/gnuradio/element.hpp
+++ b/include/gnuradio/element.hpp
@@ -56,6 +56,9 @@ struct GRAS_API Element : boost::shared_ptr<ElementImpl>, boost::enable_shared_f
     //! Get the name of this element
     std::string name(void) const;
 
+    //! get a canonical name for this element
+    std::string to_string(void) const;
+
     void set_output_signature(const gnuradio::IOSignature &sig);
 
     void set_input_signature(const gnuradio::IOSignature &sig);
diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt
index 1a3e156..bc8749f 100644
--- a/lib/CMakeLists.txt
+++ b/lib/CMakeLists.txt
@@ -49,7 +49,7 @@ list(APPEND gnuradio_core_sources
     ${CMAKE_CURRENT_SOURCE_DIR}/sbuffer.cpp
     ${CMAKE_CURRENT_SOURCE_DIR}/block.cpp
     ${CMAKE_CURRENT_SOURCE_DIR}/block_actor.cpp
-    #${CMAKE_CURRENT_SOURCE_DIR}/block_task.cpp
+    ${CMAKE_CURRENT_SOURCE_DIR}/block_task.cpp
     ${CMAKE_CURRENT_SOURCE_DIR}/block_allocator.cpp
     ${CMAKE_CURRENT_SOURCE_DIR}/block_handlers.cpp
     ${CMAKE_CURRENT_SOURCE_DIR}/input_handlers.cpp
diff --git a/lib/block.cpp b/lib/block.cpp
index 2ca4e8b..21165f2 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -157,9 +157,7 @@ void Block::add_item_tag(
     const size_t which_output,
     const Tag &tag
 ){
-    InputTagMessage message;
-    message.tag = tag;
-    (*this)->block->post_downstream(which_output, message);
+    (*this)->block->post_downstream(which_output, InputTagMessage(tag));
 }
 
 void Block::add_item_tag(
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index dda7dcc..176b161 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -14,25 +14,26 @@
 // You should have received a copy of the GNU Lesser General Public License
 // along with io_sig program.  If not, see <http://www.gnu.org/licenses/>.
 
-#include "element_impl.hpp"
+#include <gras_impl/block_actor.hpp>
 #include "tag_handlers.hpp"
-#include <gras_impl/messages.hpp>
 
 #define REALLY_BIG size_t(1 << 30)
 
 using namespace gnuradio;
 
-void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface)
+void BlockActor::mark_done(void)
 {
     if (this->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first
 
     //flush partial output buffers to the downstream
-    for (size_t i = 0; i < task_iface.get_num_outputs(); i++)
+    for (size_t i = 0; i < this->get_num_outputs(); i++)
     {
         if (not this->output_queues.ready(i)) continue;
         SBuffer &buff = this->output_queues.front(i);
         if (buff.length == 0) continue;
-        task_iface.post_downstream(i, buff);
+        InputBufferMessage buff_msg;
+        buff_msg.buffer = buff;
+        this->post_downstream(i, buff_msg);
         this->output_queues.pop(i);
     }
 
@@ -54,23 +55,23 @@ void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface)
     //tell the upstream and downstram to re-check their tokens
     //this is how the other blocks know who is interested,
     //and can decide based on interest to set done or not
-    for (size_t i = 0; i < task_iface.get_num_inputs(); i++)
+    for (size_t i = 0; i < this->get_num_inputs(); i++)
     {
-        task_iface.post_upstream(i, CheckTokensMessage());
+        this->post_upstream(i, OutputCheckMessage());
     }
-    for (size_t i = 0; i < task_iface.get_num_outputs(); i++)
+    for (size_t i = 0; i < this->get_num_outputs(); i++)
     {
-        task_iface.post_downstream(i, CheckTokensMessage());
+        this->post_downstream(i, InputCheckMessage());
     }
 
     if (ARMAGEDDON) std::cerr
         << "==================================================\n"
-        << "== The " << name << " " << unique_id << " is done...\n"
+        << "== The " << block_ptr->to_string() << " is done...\n"
         << "==================================================\n"
         << std::flush;
 }
 
-void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
+void BlockActor::handle_task(void)
 {
     #ifdef WORK_DEBUG
     WorkDebugPrinter WDP(this->name);
@@ -87,8 +88,8 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
         this->output_queues.all_ready()
     )) return;
 
-    const size_t num_inputs = task_iface.get_num_inputs();
-    const size_t num_outputs = task_iface.get_num_outputs();
+    const size_t num_inputs = this->get_num_inputs();
+    const size_t num_outputs = this->get_num_outputs();
     //const bool is_source = (num_inputs == 0);
     //const bool is_sink = (num_outputs == 0);
     this->work_io_ptr_mask = 0; //reset
@@ -162,7 +163,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
     //if we have outputs and at least one port has no downstream subscibers, mark done
     if (outputs_done)
     {
-        this->mark_done(task_iface);
+        this->mark_done();
         return;
     }
 
@@ -186,7 +187,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
             if (num_output_items) goto forecast_again_you_jerk;
 
             this->forecast_fail = true;
-            this->conclusion(task_iface, inputs_done);
+            this->conclusion(inputs_done);
             return;
         }
     }
@@ -198,7 +199,6 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
     work_noutput_items = num_output_items;
     if (this->enable_fixed_rate) work_noutput_items = std::min(
         work_noutput_items, myulround((num_input_items)*this->relative_rate));
-    this->work_task_iface = task_iface;
     this->work_ret = -1;
     if (this->interruptible_thread)
     {
@@ -208,12 +208,11 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
     {
         this->task_work();
     }
-    this->work_task_iface.reset();
     const size_t noutput_items = size_t(work_ret);
 
     if (work_ret == Block::WORK_DONE)
     {
-        this->mark_done(task_iface);
+        this->mark_done();
         return;
     }
 
@@ -230,7 +229,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
         const size_t bytes = items*this->input_items_sizes[i];
         this->input_queues.consume(i, bytes);
 
-        this->trim_tags(task_iface, i);
+        this->trim_tags(i);
     }
 
     //------------------------------------------------------------------
@@ -249,7 +248,9 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
         //dont always pass output buffers downstream for the sake of efficiency
         if (not this->input_queues.all_ready() or buff.length*2 > buff.get_actual_length())
         {
-            task_iface.post_downstream(i, buff);
+            InputBufferMessage buff_msg;
+            buff_msg.buffer = buff;
+            this->post_downstream(i, buff_msg);
             this->output_queues.pop(i);
         }
     }
@@ -257,23 +258,23 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
     //------------------------------------------------------------------
     //-- Message self based on post-work conditions
     //------------------------------------------------------------------
-    this->conclusion(task_iface, inputs_done);
+    this->conclusion(inputs_done);
 }
 
-GRAS_FORCE_INLINE void ElementImpl::conclusion(const tsbe::TaskInterface &task_iface, const bool inputs_done)
+GRAS_FORCE_INLINE void BlockActor::conclusion(const bool inputs_done)
 {
     //if there are inputs, and not all are provided for,
     //tell the block to check input queues and handle done
     if (inputs_done)
     {
-        this->block.post_msg(CheckTokensMessage());
+        this->Push(CheckTokensMessage(), Theron::Address());
         return;
     }
 
     //still have IO ready? kick off another task
     if (this->input_queues.all_ready() and this->output_queues.all_ready())
     {
-        this->block.post_msg(SelfKickMessage());
+        this->Push(SelfKickMessage(), Theron::Address());
         return;
     }
 }
diff --git a/lib/element.cpp b/lib/element.cpp
index b416261..a733d31 100644
--- a/lib/element.cpp
+++ b/lib/element.cpp
@@ -16,6 +16,7 @@
 
 #include "element_impl.hpp"
 #include <gnuradio/element.hpp>
+#include <boost/format.hpp>
 #include <boost/detail/atomic_count.hpp>
 
 static boost::detail::atomic_count unique_id_pool(0);
@@ -63,6 +64,10 @@ std::string Element::name(void) const
     return (*this)->name;
 }
 
+std::string Element::to_string(void) const
+{
+    return str(boost::format("%s(%d)") % this->name() % this->unique_id());
+}
 
 void Element::set_output_signature(const IOSignature &sig)
 {
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index abb3d6d..8932ded 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -108,6 +108,9 @@ struct BlockActor : Apology::Worker
     void buffer_returner(const size_t index, SBuffer &buffer);
     void mark_done(void);
     void handle_task(void);
+    void sort_tags(const size_t index);
+    void trim_tags(const size_t index);
+    void conclusion(const bool);
 
     //per port properties
     std::vector<size_t> input_items_sizes;
@@ -158,22 +161,6 @@ struct BlockActor : Apology::Worker
     SharedThreadGroup thread_group;
     boost::shared_ptr<InterruptibleThread> interruptible_thread;
 
-    //handlers
-    /*
-    void handle_input_msg(const tsbe::TaskInterface &, const size_t, const tsbe::Wax &);
-    void handle_output_msg(const tsbe::TaskInterface &, const size_t, const tsbe::Wax &);
-    void topology_update(const tsbe::TaskInterface &);
-    void handle_block_msg(const tsbe::TaskInterface &, const tsbe::Wax &);
-    void handle_allocation(const tsbe::TaskInterface &);
-    void handle_task(const tsbe::TaskInterface &);
-    void mark_done(const tsbe::TaskInterface &);
-    void conclusion(const tsbe::TaskInterface &task_iface, const bool);
-    void buffer_returner(const size_t index, SBuffer &buffer);
-    void input_update(const tsbe::TaskInterface &task_iface);
-    void sort_tags(const size_t index);
-    void trim_tags(const tsbe::TaskInterface &, const size_t index);
-    * */
-
     //work helpers
     int work_ret;
     inline void task_work(void)
diff --git a/lib/gras_impl/messages.hpp b/lib/gras_impl/messages.hpp
index f3e3a37..183befd 100644
--- a/lib/gras_impl/messages.hpp
+++ b/lib/gras_impl/messages.hpp
@@ -62,6 +62,7 @@ struct TopHintMessage
 
 struct InputTagMessage
 {
+    InputTagMessage(const Tag &tag):tag(tag){}
     size_t index;
     Tag tag;
 };
diff --git a/lib/tag_handlers.hpp b/lib/tag_handlers.hpp
index b49be96..0a89ef1 100644
--- a/lib/tag_handlers.hpp
+++ b/lib/tag_handlers.hpp
@@ -17,13 +17,13 @@
 #ifndef INCLUDED_LIBGRAS_TAG_HANDLERS_HPP
 #define INCLUDED_LIBGRAS_TAG_HANDLERS_HPP
 
-#include "element_impl.hpp"
+#include <gras_impl/block_actor.hpp>
 #include <algorithm>
 
 namespace gnuradio
 {
 
-GRAS_FORCE_INLINE void ElementImpl::sort_tags(const size_t i)
+GRAS_FORCE_INLINE void BlockActor::sort_tags(const size_t i)
 {
     if (not this->input_tags_changed[i]) return;
     std::vector<Tag> &tags_i = this->input_tags[i];
@@ -31,9 +31,9 @@ GRAS_FORCE_INLINE void ElementImpl::sort_tags(const size_t i)
     this->input_tags_changed[i] = false;
 }
 
-GRAS_FORCE_INLINE void ElementImpl::trim_tags(const tsbe::TaskInterface &task_iface, const size_t i)
+GRAS_FORCE_INLINE void BlockActor::trim_tags(const size_t i)
 {
-    const size_t num_outputs = task_iface.get_num_outputs();
+    const size_t num_outputs = this->get_num_outputs();
 
     //------------------------------------------------------------------
     //-- trim the input tags that are past the consumption zone
@@ -59,7 +59,7 @@ GRAS_FORCE_INLINE void ElementImpl::trim_tags(const tsbe::TaskInterface &task_if
             {
                 Tag t = tags_i[tag_i];
                 t.offset = myullround(t.offset * this->relative_rate);
-                task_iface.post_downstream(out_i, t);
+                this->post_downstream(out_i, InputTagMessage(t));
             }
         }
         break;
@@ -70,7 +70,7 @@ GRAS_FORCE_INLINE void ElementImpl::trim_tags(const tsbe::TaskInterface &task_if
             {
                 Tag t = tags_i[tag_i];
                 t.offset = myullround(t.offset * this->relative_rate);
-                task_iface.post_downstream(i, t);
+                this->post_downstream(i, InputTagMessage(t));
             }
         }
         break;
-- 
cgit