summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/CMakeLists.txt10
-rw-r--r--lib/block.cpp74
-rw-r--r--lib/block_actor.cpp28
-rw-r--r--lib/element_impl.hpp135
-rw-r--r--lib/gr_block.cpp14
-rw-r--r--lib/gras_impl/block_actor.hpp146
-rw-r--r--lib/gras_impl/messages.hpp7
-rw-r--r--lib/hier_block.cpp27
-rw-r--r--lib/top_block.cpp20
9 files changed, 265 insertions, 196 deletions
diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt
index 73a919c..bc5115e 100644
--- a/lib/CMakeLists.txt
+++ b/lib/CMakeLists.txt
@@ -1,7 +1,6 @@
########################################################################
# This file included, use CMake directory variables
########################################################################
-
include_directories(${CMAKE_CURRENT_SOURCE_DIR})
set(GRAS_SOURCE_DIR ${CMAKE_SOURCE_DIR}/../)
@@ -49,10 +48,11 @@ list(APPEND gnuradio_core_sources
${CMAKE_CURRENT_SOURCE_DIR}/element.cpp
${CMAKE_CURRENT_SOURCE_DIR}/sbuffer.cpp
${CMAKE_CURRENT_SOURCE_DIR}/block.cpp
- ${CMAKE_CURRENT_SOURCE_DIR}/block_task.cpp
- ${CMAKE_CURRENT_SOURCE_DIR}/block_allocator.cpp
- ${CMAKE_CURRENT_SOURCE_DIR}/block_handlers.cpp
- ${CMAKE_CURRENT_SOURCE_DIR}/port_handlers.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/block_actor.cpp
+ #${CMAKE_CURRENT_SOURCE_DIR}/block_task.cpp
+ #${CMAKE_CURRENT_SOURCE_DIR}/block_allocator.cpp
+ #${CMAKE_CURRENT_SOURCE_DIR}/block_handlers.cpp
+ #${CMAKE_CURRENT_SOURCE_DIR}/port_handlers.cpp
${CMAKE_CURRENT_SOURCE_DIR}/hier_block.cpp
${CMAKE_CURRENT_SOURCE_DIR}/top_block.cpp
${CMAKE_CURRENT_SOURCE_DIR}/gr_block.cpp
diff --git a/lib/block.cpp b/lib/block.cpp
index c11fb18..6bfaa9b 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -16,7 +16,6 @@
#include "element_impl.hpp"
#include <gnuradio/block.hpp>
-#include <boost/bind.hpp>
using namespace gnuradio;
@@ -28,20 +27,14 @@ Block::Block(void)
Block::Block(const std::string &name):
Element(name)
{
- //create internal block object
- tsbe::BlockConfig config;
- config.input_callback = boost::bind(&ElementImpl::handle_input_msg, this->get(), _1, _2, _3);
- config.output_callback = boost::bind(&ElementImpl::handle_output_msg, this->get(), _1, _2, _3);
- config.block_callback = boost::bind(&ElementImpl::handle_block_msg, this->get(), _1, _2);
- config.changed_callback = boost::bind(&ElementImpl::topology_update, this->get(), _1);
- (*this)->block = tsbe::Block(config);
+ (*this)->block = boost::shared_ptr<BlockActor>(new BlockActor());
//setup some state variables
- (*this)->topology_init = false;
- (*this)->forecast_fail = false;
- (*this)->block_ptr = this;
- (*this)->hint = 0;
- (*this)->block_state = ElementImpl::BLOCK_STATE_INIT;
+ (*this)->block->topology_init = false;
+ (*this)->block->forecast_fail = false;
+ (*this)->block->block_ptr = this;
+ (*this)->block->hint = 0;
+ (*this)->block->block_state = BlockActor::BLOCK_STATE_INIT;
//call block methods to init stuff
this->set_input_history(0);
@@ -73,97 +66,100 @@ typename V::value_type vector_get(const V &v, const size_t index)
size_t Block::input_history(const size_t which_input) const
{
- return vector_get((*this)->input_history_items, which_input);
+ return vector_get((*this)->block->input_history_items, which_input);
}
void Block::set_input_history(const size_t history, const size_t which_input)
{
- vector_set((*this)->input_history_items, history, which_input);
- if ((*this)->topology_init) (*this)->block.post_msg(UpdateInputsMessage());
+ vector_set((*this)->block->input_history_items, history, which_input);
+ if ((*this)->block->topology_init)
+ (*this)->block->Push(UpdateInputsMessage(), Theron::Address());
}
size_t Block::output_multiple(const size_t which_output) const
{
- return vector_get((*this)->output_multiple_items, which_output);
+ return vector_get((*this)->block->output_multiple_items, which_output);
}
void Block::set_output_multiple(const size_t multiple, const size_t which_output)
{
- vector_set((*this)->output_multiple_items, multiple, which_output);
- if ((*this)->topology_init) (*this)->block.post_msg(UpdateInputsMessage());
+ vector_set((*this)->block->output_multiple_items, multiple, which_output);
+ if ((*this)->block->topology_init)
+ (*this)->block->Push(UpdateInputsMessage(), Theron::Address());
}
void Block::consume(const size_t which_input, const size_t how_many_items)
{
- (*this)->consume_items[which_input] += how_many_items;
- (*this)->consume_called[which_input] = true;
+ (*this)->block->consume_items[which_input] += how_many_items;
+ (*this)->block->consume_called[which_input] = true;
}
void Block::consume_each(const size_t how_many_items)
{
- for (size_t i = 0; i < (*this)->consume_items.size(); i++)
+ for (size_t i = 0; i < (*this)->block->consume_items.size(); i++)
{
- (*this)->consume_items[i] += how_many_items;
- (*this)->consume_called[i] = true;
+ (*this)->block->consume_items[i] += how_many_items;
+ (*this)->block->consume_called[i] = true;
}
}
void Block::produce(const size_t which_output, const size_t how_many_items)
{
- (*this)->produce_items[which_output] += how_many_items;
+ (*this)->block->produce_items[which_output] += how_many_items;
}
void Block::set_input_inline(const size_t which_input, const bool enb)
{
- vector_set((*this)->input_inline_enables, enb, which_input);
+ vector_set((*this)->block->input_inline_enables, enb, which_input);
}
bool Block::input_inline(const size_t which_input) const
{
- return vector_get((*this)->input_inline_enables, which_input);
+ return vector_get((*this)->block->input_inline_enables, which_input);
}
void Block::set_fixed_rate(const bool fixed_rate)
{
- (*this)->enable_fixed_rate = fixed_rate;
+ (*this)->block->enable_fixed_rate = fixed_rate;
}
void Block::set_relative_rate(double relative_rate)
{
- (*this)->relative_rate = relative_rate;
+ (*this)->block->relative_rate = relative_rate;
}
double Block::relative_rate(void) const
{
- return (*this)->relative_rate;
+ return (*this)->block->relative_rate;
}
uint64_t Block::nitems_read(const size_t which_input)
{
- return (*this)->items_consumed[which_input];
+ return (*this)->block->items_consumed[which_input];
}
uint64_t Block::nitems_written(const size_t which_output)
{
- return (*this)->items_produced[which_output];
+ return (*this)->block->items_produced[which_output];
}
Block::tag_propagation_policy_t Block::tag_propagation_policy(void)
{
- return (*this)->tag_prop_policy;
+ return (*this)->block->tag_prop_policy;
}
void Block::set_tag_propagation_policy(Block::tag_propagation_policy_t p)
{
- (*this)->tag_prop_policy = p;
+ (*this)->block->tag_prop_policy = p;
}
void Block::add_item_tag(
const size_t which_output,
const Tag &tag
){
- ASSERT((*this)->work_task_iface);
- (*this)->work_task_iface.post_downstream(which_output, tag);
+ BlockTagMessage message;
+ message.tag = tag;
+ (*this)->block->post_downstream(which_output, message);
}
void Block::add_item_tag(
@@ -187,7 +183,7 @@ void Block::get_tags_in_range(
uint64_t abs_start,
uint64_t abs_end
){
- const std::vector<Tag> &input_tags = (*this)->input_tags[which_input];
+ const std::vector<Tag> &input_tags = (*this)->block->input_tags[which_input];
tags.clear();
for (size_t i = 0; i < input_tags.size(); i++)
{
@@ -205,7 +201,7 @@ void Block::get_tags_in_range(
uint64_t abs_end,
const pmt::pmt_t &key
){
- const std::vector<Tag> &input_tags = (*this)->input_tags[which_input];
+ const std::vector<Tag> &input_tags = (*this)->block->input_tags[which_input];
tags.clear();
for (size_t i = 0; i < input_tags.size(); i++)
{
@@ -241,5 +237,5 @@ bool Block::check_topology(int, int)
void Block::set_buffer_affinity(const Affinity &affinity)
{
- (*this)->buffer_affinity = affinity;
+ (*this)->block->buffer_affinity = affinity;
}
diff --git a/lib/block_actor.cpp b/lib/block_actor.cpp
new file mode 100644
index 0000000..3ba6c79
--- /dev/null
+++ b/lib/block_actor.cpp
@@ -0,0 +1,28 @@
+//
+// Copyright 2012 Josh Blum
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with io_sig program. If not, see <http://www.gnu.org/licenses/>.
+
+#include <gras_impl/block_actor.hpp>
+#include <Theron/Framework.h>
+
+using namespace gnuradio;
+
+static Theron::Framework global_framework(8); //TODO needs API config
+
+BlockActor::BlockActor(void):
+ Apology::Worker(global_framework)
+{
+
+}
diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp
index d0f7f5d..988bbbf 100644
--- a/lib/element_impl.hpp
+++ b/lib/element_impl.hpp
@@ -17,31 +17,13 @@
#ifndef INCLUDED_LIBGRAS_ELEMENT_IMPL_HPP
#define INCLUDED_LIBGRAS_ELEMENT_IMPL_HPP
-#include <gras_impl/debug.hpp>
-#include <gras_impl/token.hpp>
-#include <gras_impl/messages.hpp>
-#include <gras_impl/output_buffer_queues.hpp>
-#include <gras_impl/input_buffer_queues.hpp>
-#include <gras_impl/interruptible_thread.hpp>
-
-#include <tsbe/block.hpp>
-#include <tsbe/topology.hpp>
-#include <tsbe/executor.hpp>
+#include <gras_impl/block_actor.hpp>
+#include <Apology/Topology.hpp>
+#include <Apology/Executor.hpp>
#include <gnuradio/element.hpp>
#include <gnuradio/block.hpp>
-#include <set>
-#include <vector>
-#include <queue>
-
-static GRAS_FORCE_INLINE unsigned long myulround(const double x)
-{
- return (unsigned long)(x + 0.5);
-}
-
-static GRAS_FORCE_INLINE unsigned long long myullround(const double x)
-{
- return (unsigned long long)(x + 0.5);
-}
+#include <gras_impl/token.hpp>
+#include <gras_impl/interruptible_thread.hpp>
namespace gnuradio
{
@@ -54,114 +36,27 @@ struct ElementImpl
void top_block_cleanup(void);
void hier_block_cleanup(void);
- //stuff for when its a block
+ //common element properties
std::string name;
long unique_id;
-
- //per port properties
- std::vector<size_t> input_items_sizes;
- std::vector<size_t> output_items_sizes;
IOSignature input_signature;
IOSignature output_signature;
- std::vector<size_t> input_history_items;
- std::vector<size_t> output_multiple_items;
- std::vector<size_t> input_multiple_items;
- std::vector<bool> input_inline_enables;
-
- //keeps track of production
- std::vector<uint64_t> items_consumed;
- std::vector<uint64_t> items_produced;
-
- //work buffers for the classic interface
- size_t work_noutput_items;
- std::vector<const void *> work_input_items;
- std::vector<void *> work_output_items;
- std::vector<int> work_ninput_items;
- std::vector<int> fcast_ninput_items;
-
- //work buffers for the new work interface
- Block::InputItems input_items;
- Block::OutputItems output_items;
- ptrdiff_t work_io_ptr_mask;
- //track work's calls to produce and consume
- std::vector<size_t> produce_items;
- std::vector<size_t> consume_items;
- std::vector<bool> consume_called;
-
- //track the subscriber counts
- std::vector<Token> input_tokens;
- std::vector<Token> output_tokens;
- std::set<Token> token_pool;
-
- std::vector<SBufferToken> output_buffer_tokens;
-
- //buffer queues and ready conditions
- InputBufferQueues input_queues;
- OutputBufferQueues<SBuffer> output_queues;
-
- //tag tracking
- std::vector<bool> input_tags_changed;
- std::vector<std::vector<Tag> > input_tags;
- Block::tag_propagation_policy_t tag_prop_policy;
-
- //topological things
- tsbe::Block block;
- tsbe::Topology topology;
- tsbe::Executor executor;
- const tsbe::Element &get_elem(void) const
- {
- if (block) return block;
- return topology;
- }
- //gets the handlers access for forecast and work
- Block *block_ptr;
- tsbe::TaskInterface work_task_iface; //only valid during work
-
- //interruptible thread stuff
+ //top block stuff
SharedThreadGroup thread_group;
- boost::shared_ptr<InterruptibleThread> interruptible_thread;
-
- //handlers
- void handle_input_msg(const tsbe::TaskInterface &, const size_t, const tsbe::Wax &);
- void handle_output_msg(const tsbe::TaskInterface &, const size_t, const tsbe::Wax &);
- void topology_update(const tsbe::TaskInterface &);
- void handle_block_msg(const tsbe::TaskInterface &, const tsbe::Wax &);
- void handle_allocation(const tsbe::TaskInterface &);
- void handle_task(const tsbe::TaskInterface &);
- void mark_done(const tsbe::TaskInterface &);
- void conclusion(const tsbe::TaskInterface &task_iface, const bool);
- void buffer_returner(const size_t index, SBuffer &buffer);
- void input_update(const tsbe::TaskInterface &task_iface);
- void sort_tags(const size_t index);
- void trim_tags(const tsbe::TaskInterface &, const size_t index);
+ Token token;
- //work helpers
- int work_ret;
- inline void task_work(void)
+ //things may be in this element
+ boost::shared_ptr<Apology::Topology> topology;
+ boost::shared_ptr<Apology::Executor> executor;
+ boost::shared_ptr<BlockActor> block;
+ Apology::Base *get_elem(void) const
{
- this->work_ret = block_ptr->Work(this->input_items, this->output_items);
+ if (block) return block.get();
+ return topology.get();
}
- //is the fg running?
- enum
- {
- BLOCK_STATE_INIT,
- BLOCK_STATE_LIVE,
- BLOCK_STATE_DONE,
- } block_state;
- Token token;
- size_t hint; //some kind of allocation hint
- Affinity buffer_affinity;
-
- std::vector<std::vector<BufferHintMessage> > output_allocation_hints;
- //rate settings
- bool enable_fixed_rate;
- double relative_rate;
- bool forecast_fail;
- bool forecast_enable;
- bool topology_init;
};
} //namespace gnuradio
diff --git a/lib/gr_block.cpp b/lib/gr_block.cpp
index 1f59bb1..821722b 100644
--- a/lib/gr_block.cpp
+++ b/lib/gr_block.cpp
@@ -39,10 +39,10 @@ int gr_block::Work(
const OutputItems &output_items
){
return this->general_work(
- (*this)->work_noutput_items,
- (*this)->work_ninput_items,
- (*this)->work_input_items,
- (*this)->work_output_items
+ (*this)->block->work_noutput_items,
+ (*this)->block->work_ninput_items,
+ (*this)->block->work_input_items,
+ (*this)->block->work_output_items
);
}
@@ -67,13 +67,13 @@ bool gr_block::is_unaligned(void)
//TODO
//probably dont need this since volk dispatcher checks alignment
//32 byte aligned is good enough for you
- return ((*this)->work_io_ptr_mask & ptrdiff_t(GRAS_MAX_ALIGNMENT-1)) != 0;
+ return ((*this)->block->work_io_ptr_mask & ptrdiff_t(GRAS_MAX_ALIGNMENT-1)) != 0;
}
size_t gr_block::fixed_rate_noutput_to_ninput(const size_t noutput_items)
{
- return (*this)->input_history_items[0] +
- myulround((noutput_items/(*this)->relative_rate));
+ return (*this)->block->input_history_items[0] +
+ size_t((noutput_items/this->relative_rate()));
}
size_t gr_block::interpolation(void) const
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
new file mode 100644
index 0000000..98634b7
--- /dev/null
+++ b/lib/gras_impl/block_actor.hpp
@@ -0,0 +1,146 @@
+//
+// Copyright 2012 Josh Blum
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with io_sig program. If not, see <http://www.gnu.org/licenses/>.
+
+#ifndef INCLUDED_LIBGRAS_IMPL_BLOCK_ACTOR_HPP
+#define INCLUDED_LIBGRAS_IMPL_BLOCK_ACTOR_HPP
+
+
+#include <gnuradio/gras.hpp>
+#include <gnuradio/block.hpp>
+#include <Apology/Worker.hpp>
+#include <gras_impl/debug.hpp>
+#include <gras_impl/token.hpp>
+#include <gras_impl/messages.hpp>
+#include <gras_impl/output_buffer_queues.hpp>
+#include <gras_impl/input_buffer_queues.hpp>
+#include <gras_impl/interruptible_thread.hpp>
+#include <vector>
+#include <set>
+
+namespace gnuradio
+{
+
+static GRAS_FORCE_INLINE unsigned long myulround(const double x)
+{
+ return (unsigned long)(x + 0.5);
+}
+
+static GRAS_FORCE_INLINE unsigned long long myullround(const double x)
+{
+ return (unsigned long long)(x + 0.5);
+}
+
+struct BlockActor : Apology::Worker
+{
+ BlockActor(void);
+ ~BlockActor(void);
+
+ Block *block_ptr;
+
+ //per port properties
+ std::vector<size_t> input_items_sizes;
+ std::vector<size_t> output_items_sizes;
+ std::vector<size_t> input_history_items;
+ std::vector<size_t> output_multiple_items;
+ std::vector<size_t> input_multiple_items;
+ std::vector<bool> input_inline_enables;
+
+ //keeps track of production
+ std::vector<uint64_t> items_consumed;
+ std::vector<uint64_t> items_produced;
+
+ //work buffers for the classic interface
+ size_t work_noutput_items;
+ std::vector<const void *> work_input_items;
+ std::vector<void *> work_output_items;
+ std::vector<int> work_ninput_items;
+ std::vector<int> fcast_ninput_items;
+
+ //work buffers for the new work interface
+ Block::InputItems input_items;
+ Block::OutputItems output_items;
+ ptrdiff_t work_io_ptr_mask;
+
+ //track work's calls to produce and consume
+ std::vector<size_t> produce_items;
+ std::vector<size_t> consume_items;
+ std::vector<bool> consume_called;
+
+ //track the subscriber counts
+ std::vector<Token> input_tokens;
+ std::vector<Token> output_tokens;
+ std::set<Token> token_pool;
+
+ std::vector<SBufferToken> output_buffer_tokens;
+
+ //buffer queues and ready conditions
+ InputBufferQueues input_queues;
+ OutputBufferQueues<SBuffer> output_queues;
+
+ //tag tracking
+ std::vector<bool> input_tags_changed;
+ std::vector<std::vector<Tag> > input_tags;
+ Block::tag_propagation_policy_t tag_prop_policy;
+
+ //interruptible thread stuff
+ boost::shared_ptr<InterruptibleThread> interruptible_thread;
+
+ //handlers
+ /*
+ void handle_input_msg(const tsbe::TaskInterface &, const size_t, const tsbe::Wax &);
+ void handle_output_msg(const tsbe::TaskInterface &, const size_t, const tsbe::Wax &);
+ void topology_update(const tsbe::TaskInterface &);
+ void handle_block_msg(const tsbe::TaskInterface &, const tsbe::Wax &);
+ void handle_allocation(const tsbe::TaskInterface &);
+ void handle_task(const tsbe::TaskInterface &);
+ void mark_done(const tsbe::TaskInterface &);
+ void conclusion(const tsbe::TaskInterface &task_iface, const bool);
+ void buffer_returner(const size_t index, SBuffer &buffer);
+ void input_update(const tsbe::TaskInterface &task_iface);
+ void sort_tags(const size_t index);
+ void trim_tags(const tsbe::TaskInterface &, const size_t index);
+ * */
+
+ //work helpers
+ int work_ret;
+ inline void task_work(void)
+ {
+ this->work_ret = block_ptr->Work(this->input_items, this->output_items);
+ }
+
+ //is the fg running?
+ enum
+ {
+ BLOCK_STATE_INIT,
+ BLOCK_STATE_LIVE,
+ BLOCK_STATE_DONE,
+ } block_state;
+ size_t hint; //some kind of allocation hint
+ Affinity buffer_affinity;
+
+ std::vector<std::vector<BufferHintMessage> > output_allocation_hints;
+
+ //rate settings
+ bool enable_fixed_rate;
+ double relative_rate;
+ bool forecast_fail;
+ bool forecast_enable;
+ bool topology_init;
+};
+
+} //namespace gnuradio
+
+#endif /*INCLUDED_LIBGRAS_IMPL_BLOCK_ACTOR_HPP*/
diff --git a/lib/gras_impl/messages.hpp b/lib/gras_impl/messages.hpp
index 912e3d1..0b1bed0 100644
--- a/lib/gras_impl/messages.hpp
+++ b/lib/gras_impl/messages.hpp
@@ -18,10 +18,17 @@
#define INCLUDED_LIBGRAS_IMPL_MESSAGES_HPP
#include <gnuradio/sbuffer.hpp>
+#include <gnuradio/tags.hpp>
namespace gnuradio
{
+struct BlockTagMessage
+{
+ size_t index;
+ Tag tag;
+};
+
struct TopBlockMessage
{
enum
diff --git a/lib/hier_block.cpp b/lib/hier_block.cpp
index d4c9ee6..70b5df3 100644
--- a/lib/hier_block.cpp
+++ b/lib/hier_block.cpp
@@ -28,23 +28,22 @@ HierBlock::HierBlock(void)
HierBlock::HierBlock(const std::string &name):
Element(name)
{
- tsbe::TopologyConfig config;
- (*this)->topology = tsbe::Topology(config);
+ (*this)->topology = boost::shared_ptr<Apology::Topology>(new Apology::Topology());
}
void ElementImpl::hier_block_cleanup(void)
{
- this->topology.clear_all();
+ this->topology->clear_all();
}
void HierBlock::connect(const Element &elem)
{
- (*this)->topology.add_topology(elem->topology);
+ (*this)->topology->add_topology(elem->topology.get());
}
void HierBlock::disconnect(const Element &elem)
{
- (*this)->topology.remove_topology(elem->topology);
+ (*this)->topology->remove_topology(elem->topology.get());
}
void HierBlock::connect(
@@ -54,11 +53,11 @@ void HierBlock::connect(
const size_t sink_index
){
//TODO, this is the perfect place to validate IO sigs
- const tsbe::Connection conn(
- tsbe::Port(src->get_elem(), src_index, src.weak_self.lock()),
- tsbe::Port(sink->get_elem(), sink_index, sink.weak_self.lock())
+ const Apology::Flow flow(
+ Apology::Port(src->get_elem(), src_index, src.weak_self.lock()),
+ Apology::Port(sink->get_elem(), sink_index, sink.weak_self.lock())
);
- (*this)->topology.connect(conn);
+ (*this)->topology->add_flow(flow);
}
void HierBlock::disconnect(
@@ -67,14 +66,14 @@ void HierBlock::disconnect(
const Element &sink,
const size_t sink_index
){
- const tsbe::Connection conn(
- tsbe::Port(src->get_elem(), src_index),
- tsbe::Port(sink->get_elem(), sink_index)
+ const Apology::Flow flow(
+ Apology::Port(src->get_elem(), src_index),
+ Apology::Port(sink->get_elem(), sink_index)
);
- (*this)->topology.disconnect(conn);
+ (*this)->topology->remove_flow(flow);
}
void HierBlock::disconnect_all(void)
{
- (*this)->topology.clear_all();
+ (*this)->topology->clear_all();
}
diff --git a/lib/top_block.cpp b/lib/top_block.cpp
index fdabad1..a8f01de 100644
--- a/lib/top_block.cpp
+++ b/lib/top_block.cpp
@@ -28,9 +28,7 @@ TopBlock::TopBlock(void)
TopBlock::TopBlock(const std::string &name):
HierBlock(name)
{
- tsbe::ExecutorConfig config;
- config.topology = (*this)->topology;
- (*this)->executor = tsbe::Executor(config);
+ (*this)->executor = boost::shared_ptr<Apology::Executor>(new Apology::Executor((*this)->topology.get()));
(*this)->token = Token::make();
(*this)->thread_group = SharedThreadGroup(new boost::thread_group());
if (GENESIS) std::cerr
@@ -44,7 +42,7 @@ void ElementImpl::top_block_cleanup(void)
{
TopBlockMessage event;
event.what = TopBlockMessage::INERT;
- this->executor.post_msg(event);
+ this->executor->post_all(event);
if (ARMAGEDDON) std::cerr
<< "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n"
<< "xx Top Block Destroyed: " << name << "\n"
@@ -62,30 +60,30 @@ void TopBlock::set_buffer_hint(const size_t hint)
TopBlockMessage event;
event.what = TopBlockMessage::HINT;
event.hint = hint;
- (*this)->executor.post_msg(event);
+ (*this)->executor->post_all(event);
}
void TopBlock::start(void)
{
- (*this)->executor.commit();
+ (*this)->executor->commit();
{
- (*this)->executor.post_msg((*this)->thread_group);
+ (*this)->executor->post_all((*this)->thread_group);
}
{
TopBlockMessage event;
event.what = TopBlockMessage::TOKEN_TIME;
event.token = (*this)->token;
- (*this)->executor.post_msg(event);
+ (*this)->executor->post_all(event);
}
{
TopBlockMessage event;
event.what = TopBlockMessage::ALLOCATE;
- (*this)->executor.post_msg(event);
+ (*this)->executor->post_all(event);
}
{
TopBlockMessage event;
event.what = TopBlockMessage::ACTIVE;
- (*this)->executor.post_msg(event);
+ (*this)->executor->post_all(event);
}
}
@@ -97,7 +95,7 @@ void TopBlock::stop(void)
//message all blocks to mark done
TopBlockMessage event;
event.what = TopBlockMessage::INERT;
- (*this)->executor.post_msg(event);
+ (*this)->executor->post_all(event);
}
void TopBlock::run(void)