summaryrefslogtreecommitdiff
path: root/lib/block_task.cpp
diff options
context:
space:
mode:
authorJosh Blum2012-09-29 10:30:52 -0700
committerJosh Blum2012-09-29 10:30:52 -0700
commitf3194e33de2a6496ce9ea838681e62b64fa689f4 (patch)
tree37b80081a4978826b6bf56603ed0bd35520e4af4 /lib/block_task.cpp
parent17e39ddbb0940d9d5e687713531e9a18d18e29f1 (diff)
downloadsandhi-f3194e33de2a6496ce9ea838681e62b64fa689f4.tar.gz
sandhi-f3194e33de2a6496ce9ea838681e62b64fa689f4.tar.bz2
sandhi-f3194e33de2a6496ce9ea838681e62b64fa689f4.zip
ported block task and tag handlers to apology
Diffstat (limited to 'lib/block_task.cpp')
-rw-r--r--lib/block_task.cpp49
1 files changed, 25 insertions, 24 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index dda7dcc..176b161 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -14,25 +14,26 @@
// You should have received a copy of the GNU Lesser General Public License
// along with io_sig program. If not, see <http://www.gnu.org/licenses/>.
-#include "element_impl.hpp"
+#include <gras_impl/block_actor.hpp>
#include "tag_handlers.hpp"
-#include <gras_impl/messages.hpp>
#define REALLY_BIG size_t(1 << 30)
using namespace gnuradio;
-void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface)
+void BlockActor::mark_done(void)
{
if (this->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first
//flush partial output buffers to the downstream
- for (size_t i = 0; i < task_iface.get_num_outputs(); i++)
+ for (size_t i = 0; i < this->get_num_outputs(); i++)
{
if (not this->output_queues.ready(i)) continue;
SBuffer &buff = this->output_queues.front(i);
if (buff.length == 0) continue;
- task_iface.post_downstream(i, buff);
+ InputBufferMessage buff_msg;
+ buff_msg.buffer = buff;
+ this->post_downstream(i, buff_msg);
this->output_queues.pop(i);
}
@@ -54,23 +55,23 @@ void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface)
//tell the upstream and downstram to re-check their tokens
//this is how the other blocks know who is interested,
//and can decide based on interest to set done or not
- for (size_t i = 0; i < task_iface.get_num_inputs(); i++)
+ for (size_t i = 0; i < this->get_num_inputs(); i++)
{
- task_iface.post_upstream(i, CheckTokensMessage());
+ this->post_upstream(i, OutputCheckMessage());
}
- for (size_t i = 0; i < task_iface.get_num_outputs(); i++)
+ for (size_t i = 0; i < this->get_num_outputs(); i++)
{
- task_iface.post_downstream(i, CheckTokensMessage());
+ this->post_downstream(i, InputCheckMessage());
}
if (ARMAGEDDON) std::cerr
<< "==================================================\n"
- << "== The " << name << " " << unique_id << " is done...\n"
+ << "== The " << block_ptr->to_string() << " is done...\n"
<< "==================================================\n"
<< std::flush;
}
-void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
+void BlockActor::handle_task(void)
{
#ifdef WORK_DEBUG
WorkDebugPrinter WDP(this->name);
@@ -87,8 +88,8 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
this->output_queues.all_ready()
)) return;
- const size_t num_inputs = task_iface.get_num_inputs();
- const size_t num_outputs = task_iface.get_num_outputs();
+ const size_t num_inputs = this->get_num_inputs();
+ const size_t num_outputs = this->get_num_outputs();
//const bool is_source = (num_inputs == 0);
//const bool is_sink = (num_outputs == 0);
this->work_io_ptr_mask = 0; //reset
@@ -162,7 +163,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
//if we have outputs and at least one port has no downstream subscibers, mark done
if (outputs_done)
{
- this->mark_done(task_iface);
+ this->mark_done();
return;
}
@@ -186,7 +187,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
if (num_output_items) goto forecast_again_you_jerk;
this->forecast_fail = true;
- this->conclusion(task_iface, inputs_done);
+ this->conclusion(inputs_done);
return;
}
}
@@ -198,7 +199,6 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
work_noutput_items = num_output_items;
if (this->enable_fixed_rate) work_noutput_items = std::min(
work_noutput_items, myulround((num_input_items)*this->relative_rate));
- this->work_task_iface = task_iface;
this->work_ret = -1;
if (this->interruptible_thread)
{
@@ -208,12 +208,11 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
{
this->task_work();
}
- this->work_task_iface.reset();
const size_t noutput_items = size_t(work_ret);
if (work_ret == Block::WORK_DONE)
{
- this->mark_done(task_iface);
+ this->mark_done();
return;
}
@@ -230,7 +229,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
const size_t bytes = items*this->input_items_sizes[i];
this->input_queues.consume(i, bytes);
- this->trim_tags(task_iface, i);
+ this->trim_tags(i);
}
//------------------------------------------------------------------
@@ -249,7 +248,9 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
//dont always pass output buffers downstream for the sake of efficiency
if (not this->input_queues.all_ready() or buff.length*2 > buff.get_actual_length())
{
- task_iface.post_downstream(i, buff);
+ InputBufferMessage buff_msg;
+ buff_msg.buffer = buff;
+ this->post_downstream(i, buff_msg);
this->output_queues.pop(i);
}
}
@@ -257,23 +258,23 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
//------------------------------------------------------------------
//-- Message self based on post-work conditions
//------------------------------------------------------------------
- this->conclusion(task_iface, inputs_done);
+ this->conclusion(inputs_done);
}
-GRAS_FORCE_INLINE void ElementImpl::conclusion(const tsbe::TaskInterface &task_iface, const bool inputs_done)
+GRAS_FORCE_INLINE void BlockActor::conclusion(const bool inputs_done)
{
//if there are inputs, and not all are provided for,
//tell the block to check input queues and handle done
if (inputs_done)
{
- this->block.post_msg(CheckTokensMessage());
+ this->Push(CheckTokensMessage(), Theron::Address());
return;
}
//still have IO ready? kick off another task
if (this->input_queues.all_ready() and this->output_queues.all_ready())
{
- this->block.post_msg(SelfKickMessage());
+ this->Push(SelfKickMessage(), Theron::Address());
return;
}
}