summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--TODO.txt3
-rw-r--r--include/gnuradio/element.hpp3
-rw-r--r--lib/CMakeLists.txt2
-rw-r--r--lib/block.cpp4
-rw-r--r--lib/block_task.cpp49
-rw-r--r--lib/element.cpp5
-rw-r--r--lib/gras_impl/block_actor.hpp19
-rw-r--r--lib/gras_impl/messages.hpp1
-rw-r--r--lib/tag_handlers.hpp12
9 files changed, 48 insertions, 50 deletions
diff --git a/TODO.txt b/TODO.txt
index 0a5fc06..1564b59 100644
--- a/TODO.txt
+++ b/TODO.txt
@@ -34,3 +34,6 @@ and not directly from block::<method>
* create an input and output config struct (per port)
** idea is, we dont need so many API calls for a port
** set/get input/output config
+
+* want per-port token_pool for inputs and outputs
+** if ports get removed, this will release subscribers
diff --git a/include/gnuradio/element.hpp b/include/gnuradio/element.hpp
index 510a383..6c2790d 100644
--- a/include/gnuradio/element.hpp
+++ b/include/gnuradio/element.hpp
@@ -56,6 +56,9 @@ struct GRAS_API Element : boost::shared_ptr<ElementImpl>, boost::enable_shared_f
//! Get the name of this element
std::string name(void) const;
+ //! get a canonical name for this element
+ std::string to_string(void) const;
+
void set_output_signature(const gnuradio::IOSignature &sig);
void set_input_signature(const gnuradio::IOSignature &sig);
diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt
index 1a3e156..bc8749f 100644
--- a/lib/CMakeLists.txt
+++ b/lib/CMakeLists.txt
@@ -49,7 +49,7 @@ list(APPEND gnuradio_core_sources
${CMAKE_CURRENT_SOURCE_DIR}/sbuffer.cpp
${CMAKE_CURRENT_SOURCE_DIR}/block.cpp
${CMAKE_CURRENT_SOURCE_DIR}/block_actor.cpp
- #${CMAKE_CURRENT_SOURCE_DIR}/block_task.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/block_task.cpp
${CMAKE_CURRENT_SOURCE_DIR}/block_allocator.cpp
${CMAKE_CURRENT_SOURCE_DIR}/block_handlers.cpp
${CMAKE_CURRENT_SOURCE_DIR}/input_handlers.cpp
diff --git a/lib/block.cpp b/lib/block.cpp
index 2ca4e8b..21165f2 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -157,9 +157,7 @@ void Block::add_item_tag(
const size_t which_output,
const Tag &tag
){
- InputTagMessage message;
- message.tag = tag;
- (*this)->block->post_downstream(which_output, message);
+ (*this)->block->post_downstream(which_output, InputTagMessage(tag));
}
void Block::add_item_tag(
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index dda7dcc..176b161 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -14,25 +14,26 @@
// You should have received a copy of the GNU Lesser General Public License
// along with io_sig program. If not, see <http://www.gnu.org/licenses/>.
-#include "element_impl.hpp"
+#include <gras_impl/block_actor.hpp>
#include "tag_handlers.hpp"
-#include <gras_impl/messages.hpp>
#define REALLY_BIG size_t(1 << 30)
using namespace gnuradio;
-void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface)
+void BlockActor::mark_done(void)
{
if (this->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first
//flush partial output buffers to the downstream
- for (size_t i = 0; i < task_iface.get_num_outputs(); i++)
+ for (size_t i = 0; i < this->get_num_outputs(); i++)
{
if (not this->output_queues.ready(i)) continue;
SBuffer &buff = this->output_queues.front(i);
if (buff.length == 0) continue;
- task_iface.post_downstream(i, buff);
+ InputBufferMessage buff_msg;
+ buff_msg.buffer = buff;
+ this->post_downstream(i, buff_msg);
this->output_queues.pop(i);
}
@@ -54,23 +55,23 @@ void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface)
//tell the upstream and downstram to re-check their tokens
//this is how the other blocks know who is interested,
//and can decide based on interest to set done or not
- for (size_t i = 0; i < task_iface.get_num_inputs(); i++)
+ for (size_t i = 0; i < this->get_num_inputs(); i++)
{
- task_iface.post_upstream(i, CheckTokensMessage());
+ this->post_upstream(i, OutputCheckMessage());
}
- for (size_t i = 0; i < task_iface.get_num_outputs(); i++)
+ for (size_t i = 0; i < this->get_num_outputs(); i++)
{
- task_iface.post_downstream(i, CheckTokensMessage());
+ this->post_downstream(i, InputCheckMessage());
}
if (ARMAGEDDON) std::cerr
<< "==================================================\n"
- << "== The " << name << " " << unique_id << " is done...\n"
+ << "== The " << block_ptr->to_string() << " is done...\n"
<< "==================================================\n"
<< std::flush;
}
-void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
+void BlockActor::handle_task(void)
{
#ifdef WORK_DEBUG
WorkDebugPrinter WDP(this->name);
@@ -87,8 +88,8 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
this->output_queues.all_ready()
)) return;
- const size_t num_inputs = task_iface.get_num_inputs();
- const size_t num_outputs = task_iface.get_num_outputs();
+ const size_t num_inputs = this->get_num_inputs();
+ const size_t num_outputs = this->get_num_outputs();
//const bool is_source = (num_inputs == 0);
//const bool is_sink = (num_outputs == 0);
this->work_io_ptr_mask = 0; //reset
@@ -162,7 +163,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
//if we have outputs and at least one port has no downstream subscibers, mark done
if (outputs_done)
{
- this->mark_done(task_iface);
+ this->mark_done();
return;
}
@@ -186,7 +187,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
if (num_output_items) goto forecast_again_you_jerk;
this->forecast_fail = true;
- this->conclusion(task_iface, inputs_done);
+ this->conclusion(inputs_done);
return;
}
}
@@ -198,7 +199,6 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
work_noutput_items = num_output_items;
if (this->enable_fixed_rate) work_noutput_items = std::min(
work_noutput_items, myulround((num_input_items)*this->relative_rate));
- this->work_task_iface = task_iface;
this->work_ret = -1;
if (this->interruptible_thread)
{
@@ -208,12 +208,11 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
{
this->task_work();
}
- this->work_task_iface.reset();
const size_t noutput_items = size_t(work_ret);
if (work_ret == Block::WORK_DONE)
{
- this->mark_done(task_iface);
+ this->mark_done();
return;
}
@@ -230,7 +229,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
const size_t bytes = items*this->input_items_sizes[i];
this->input_queues.consume(i, bytes);
- this->trim_tags(task_iface, i);
+ this->trim_tags(i);
}
//------------------------------------------------------------------
@@ -249,7 +248,9 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
//dont always pass output buffers downstream for the sake of efficiency
if (not this->input_queues.all_ready() or buff.length*2 > buff.get_actual_length())
{
- task_iface.post_downstream(i, buff);
+ InputBufferMessage buff_msg;
+ buff_msg.buffer = buff;
+ this->post_downstream(i, buff_msg);
this->output_queues.pop(i);
}
}
@@ -257,23 +258,23 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
//------------------------------------------------------------------
//-- Message self based on post-work conditions
//------------------------------------------------------------------
- this->conclusion(task_iface, inputs_done);
+ this->conclusion(inputs_done);
}
-GRAS_FORCE_INLINE void ElementImpl::conclusion(const tsbe::TaskInterface &task_iface, const bool inputs_done)
+GRAS_FORCE_INLINE void BlockActor::conclusion(const bool inputs_done)
{
//if there are inputs, and not all are provided for,
//tell the block to check input queues and handle done
if (inputs_done)
{
- this->block.post_msg(CheckTokensMessage());
+ this->Push(CheckTokensMessage(), Theron::Address());
return;
}
//still have IO ready? kick off another task
if (this->input_queues.all_ready() and this->output_queues.all_ready())
{
- this->block.post_msg(SelfKickMessage());
+ this->Push(SelfKickMessage(), Theron::Address());
return;
}
}
diff --git a/lib/element.cpp b/lib/element.cpp
index b416261..a733d31 100644
--- a/lib/element.cpp
+++ b/lib/element.cpp
@@ -16,6 +16,7 @@
#include "element_impl.hpp"
#include <gnuradio/element.hpp>
+#include <boost/format.hpp>
#include <boost/detail/atomic_count.hpp>
static boost::detail::atomic_count unique_id_pool(0);
@@ -63,6 +64,10 @@ std::string Element::name(void) const
return (*this)->name;
}
+std::string Element::to_string(void) const
+{
+ return str(boost::format("%s(%d)") % this->name() % this->unique_id());
+}
void Element::set_output_signature(const IOSignature &sig)
{
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index abb3d6d..8932ded 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -108,6 +108,9 @@ struct BlockActor : Apology::Worker
void buffer_returner(const size_t index, SBuffer &buffer);
void mark_done(void);
void handle_task(void);
+ void sort_tags(const size_t index);
+ void trim_tags(const size_t index);
+ void conclusion(const bool);
//per port properties
std::vector<size_t> input_items_sizes;
@@ -158,22 +161,6 @@ struct BlockActor : Apology::Worker
SharedThreadGroup thread_group;
boost::shared_ptr<InterruptibleThread> interruptible_thread;
- //handlers
- /*
- void handle_input_msg(const tsbe::TaskInterface &, const size_t, const tsbe::Wax &);
- void handle_output_msg(const tsbe::TaskInterface &, const size_t, const tsbe::Wax &);
- void topology_update(const tsbe::TaskInterface &);
- void handle_block_msg(const tsbe::TaskInterface &, const tsbe::Wax &);
- void handle_allocation(const tsbe::TaskInterface &);
- void handle_task(const tsbe::TaskInterface &);
- void mark_done(const tsbe::TaskInterface &);
- void conclusion(const tsbe::TaskInterface &task_iface, const bool);
- void buffer_returner(const size_t index, SBuffer &buffer);
- void input_update(const tsbe::TaskInterface &task_iface);
- void sort_tags(const size_t index);
- void trim_tags(const tsbe::TaskInterface &, const size_t index);
- * */
-
//work helpers
int work_ret;
inline void task_work(void)
diff --git a/lib/gras_impl/messages.hpp b/lib/gras_impl/messages.hpp
index f3e3a37..183befd 100644
--- a/lib/gras_impl/messages.hpp
+++ b/lib/gras_impl/messages.hpp
@@ -62,6 +62,7 @@ struct TopHintMessage
struct InputTagMessage
{
+ InputTagMessage(const Tag &tag):tag(tag){}
size_t index;
Tag tag;
};
diff --git a/lib/tag_handlers.hpp b/lib/tag_handlers.hpp
index b49be96..0a89ef1 100644
--- a/lib/tag_handlers.hpp
+++ b/lib/tag_handlers.hpp
@@ -17,13 +17,13 @@
#ifndef INCLUDED_LIBGRAS_TAG_HANDLERS_HPP
#define INCLUDED_LIBGRAS_TAG_HANDLERS_HPP
-#include "element_impl.hpp"
+#include <gras_impl/block_actor.hpp>
#include <algorithm>
namespace gnuradio
{
-GRAS_FORCE_INLINE void ElementImpl::sort_tags(const size_t i)
+GRAS_FORCE_INLINE void BlockActor::sort_tags(const size_t i)
{
if (not this->input_tags_changed[i]) return;
std::vector<Tag> &tags_i = this->input_tags[i];
@@ -31,9 +31,9 @@ GRAS_FORCE_INLINE void ElementImpl::sort_tags(const size_t i)
this->input_tags_changed[i] = false;
}
-GRAS_FORCE_INLINE void ElementImpl::trim_tags(const tsbe::TaskInterface &task_iface, const size_t i)
+GRAS_FORCE_INLINE void BlockActor::trim_tags(const size_t i)
{
- const size_t num_outputs = task_iface.get_num_outputs();
+ const size_t num_outputs = this->get_num_outputs();
//------------------------------------------------------------------
//-- trim the input tags that are past the consumption zone
@@ -59,7 +59,7 @@ GRAS_FORCE_INLINE void ElementImpl::trim_tags(const tsbe::TaskInterface &task_if
{
Tag t = tags_i[tag_i];
t.offset = myullround(t.offset * this->relative_rate);
- task_iface.post_downstream(out_i, t);
+ this->post_downstream(out_i, InputTagMessage(t));
}
}
break;
@@ -70,7 +70,7 @@ GRAS_FORCE_INLINE void ElementImpl::trim_tags(const tsbe::TaskInterface &task_if
{
Tag t = tags_i[tag_i];
t.offset = myullround(t.offset * this->relative_rate);
- task_iface.post_downstream(i, t);
+ this->post_downstream(i, InputTagMessage(t));
}
}
break;