diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/CMakeLists.txt | 10 | ||||
-rw-r--r-- | lib/block.cpp | 74 | ||||
-rw-r--r-- | lib/block_actor.cpp | 28 | ||||
-rw-r--r-- | lib/element_impl.hpp | 135 | ||||
-rw-r--r-- | lib/gr_block.cpp | 14 | ||||
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 146 | ||||
-rw-r--r-- | lib/gras_impl/messages.hpp | 7 | ||||
-rw-r--r-- | lib/hier_block.cpp | 27 | ||||
-rw-r--r-- | lib/top_block.cpp | 20 |
9 files changed, 265 insertions, 196 deletions
diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index 73a919c..bc5115e 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -1,7 +1,6 @@ ######################################################################## # This file included, use CMake directory variables ######################################################################## - include_directories(${CMAKE_CURRENT_SOURCE_DIR}) set(GRAS_SOURCE_DIR ${CMAKE_SOURCE_DIR}/../) @@ -49,10 +48,11 @@ list(APPEND gnuradio_core_sources ${CMAKE_CURRENT_SOURCE_DIR}/element.cpp ${CMAKE_CURRENT_SOURCE_DIR}/sbuffer.cpp ${CMAKE_CURRENT_SOURCE_DIR}/block.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/block_task.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/block_allocator.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/block_handlers.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/port_handlers.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/block_actor.cpp + #${CMAKE_CURRENT_SOURCE_DIR}/block_task.cpp + #${CMAKE_CURRENT_SOURCE_DIR}/block_allocator.cpp + #${CMAKE_CURRENT_SOURCE_DIR}/block_handlers.cpp + #${CMAKE_CURRENT_SOURCE_DIR}/port_handlers.cpp ${CMAKE_CURRENT_SOURCE_DIR}/hier_block.cpp ${CMAKE_CURRENT_SOURCE_DIR}/top_block.cpp ${CMAKE_CURRENT_SOURCE_DIR}/gr_block.cpp diff --git a/lib/block.cpp b/lib/block.cpp index c11fb18..6bfaa9b 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -16,7 +16,6 @@ #include "element_impl.hpp" #include <gnuradio/block.hpp> -#include <boost/bind.hpp> using namespace gnuradio; @@ -28,20 +27,14 @@ Block::Block(void) Block::Block(const std::string &name): Element(name) { - //create internal block object - tsbe::BlockConfig config; - config.input_callback = boost::bind(&ElementImpl::handle_input_msg, this->get(), _1, _2, _3); - config.output_callback = boost::bind(&ElementImpl::handle_output_msg, this->get(), _1, _2, _3); - config.block_callback = boost::bind(&ElementImpl::handle_block_msg, this->get(), _1, _2); - config.changed_callback = boost::bind(&ElementImpl::topology_update, this->get(), _1); - (*this)->block = tsbe::Block(config); + (*this)->block = boost::shared_ptr<BlockActor>(new BlockActor()); //setup some state variables - (*this)->topology_init = false; - (*this)->forecast_fail = false; - (*this)->block_ptr = this; - (*this)->hint = 0; - (*this)->block_state = ElementImpl::BLOCK_STATE_INIT; + (*this)->block->topology_init = false; + (*this)->block->forecast_fail = false; + (*this)->block->block_ptr = this; + (*this)->block->hint = 0; + (*this)->block->block_state = BlockActor::BLOCK_STATE_INIT; //call block methods to init stuff this->set_input_history(0); @@ -73,97 +66,100 @@ typename V::value_type vector_get(const V &v, const size_t index) size_t Block::input_history(const size_t which_input) const { - return vector_get((*this)->input_history_items, which_input); + return vector_get((*this)->block->input_history_items, which_input); } void Block::set_input_history(const size_t history, const size_t which_input) { - vector_set((*this)->input_history_items, history, which_input); - if ((*this)->topology_init) (*this)->block.post_msg(UpdateInputsMessage()); + vector_set((*this)->block->input_history_items, history, which_input); + if ((*this)->block->topology_init) + (*this)->block->Push(UpdateInputsMessage(), Theron::Address()); } size_t Block::output_multiple(const size_t which_output) const { - return vector_get((*this)->output_multiple_items, which_output); + return vector_get((*this)->block->output_multiple_items, which_output); } void Block::set_output_multiple(const size_t multiple, const size_t which_output) { - vector_set((*this)->output_multiple_items, multiple, which_output); - if ((*this)->topology_init) (*this)->block.post_msg(UpdateInputsMessage()); + vector_set((*this)->block->output_multiple_items, multiple, which_output); + if ((*this)->block->topology_init) + (*this)->block->Push(UpdateInputsMessage(), Theron::Address()); } void Block::consume(const size_t which_input, const size_t how_many_items) { - (*this)->consume_items[which_input] += how_many_items; - (*this)->consume_called[which_input] = true; + (*this)->block->consume_items[which_input] += how_many_items; + (*this)->block->consume_called[which_input] = true; } void Block::consume_each(const size_t how_many_items) { - for (size_t i = 0; i < (*this)->consume_items.size(); i++) + for (size_t i = 0; i < (*this)->block->consume_items.size(); i++) { - (*this)->consume_items[i] += how_many_items; - (*this)->consume_called[i] = true; + (*this)->block->consume_items[i] += how_many_items; + (*this)->block->consume_called[i] = true; } } void Block::produce(const size_t which_output, const size_t how_many_items) { - (*this)->produce_items[which_output] += how_many_items; + (*this)->block->produce_items[which_output] += how_many_items; } void Block::set_input_inline(const size_t which_input, const bool enb) { - vector_set((*this)->input_inline_enables, enb, which_input); + vector_set((*this)->block->input_inline_enables, enb, which_input); } bool Block::input_inline(const size_t which_input) const { - return vector_get((*this)->input_inline_enables, which_input); + return vector_get((*this)->block->input_inline_enables, which_input); } void Block::set_fixed_rate(const bool fixed_rate) { - (*this)->enable_fixed_rate = fixed_rate; + (*this)->block->enable_fixed_rate = fixed_rate; } void Block::set_relative_rate(double relative_rate) { - (*this)->relative_rate = relative_rate; + (*this)->block->relative_rate = relative_rate; } double Block::relative_rate(void) const { - return (*this)->relative_rate; + return (*this)->block->relative_rate; } uint64_t Block::nitems_read(const size_t which_input) { - return (*this)->items_consumed[which_input]; + return (*this)->block->items_consumed[which_input]; } uint64_t Block::nitems_written(const size_t which_output) { - return (*this)->items_produced[which_output]; + return (*this)->block->items_produced[which_output]; } Block::tag_propagation_policy_t Block::tag_propagation_policy(void) { - return (*this)->tag_prop_policy; + return (*this)->block->tag_prop_policy; } void Block::set_tag_propagation_policy(Block::tag_propagation_policy_t p) { - (*this)->tag_prop_policy = p; + (*this)->block->tag_prop_policy = p; } void Block::add_item_tag( const size_t which_output, const Tag &tag ){ - ASSERT((*this)->work_task_iface); - (*this)->work_task_iface.post_downstream(which_output, tag); + BlockTagMessage message; + message.tag = tag; + (*this)->block->post_downstream(which_output, message); } void Block::add_item_tag( @@ -187,7 +183,7 @@ void Block::get_tags_in_range( uint64_t abs_start, uint64_t abs_end ){ - const std::vector<Tag> &input_tags = (*this)->input_tags[which_input]; + const std::vector<Tag> &input_tags = (*this)->block->input_tags[which_input]; tags.clear(); for (size_t i = 0; i < input_tags.size(); i++) { @@ -205,7 +201,7 @@ void Block::get_tags_in_range( uint64_t abs_end, const pmt::pmt_t &key ){ - const std::vector<Tag> &input_tags = (*this)->input_tags[which_input]; + const std::vector<Tag> &input_tags = (*this)->block->input_tags[which_input]; tags.clear(); for (size_t i = 0; i < input_tags.size(); i++) { @@ -241,5 +237,5 @@ bool Block::check_topology(int, int) void Block::set_buffer_affinity(const Affinity &affinity) { - (*this)->buffer_affinity = affinity; + (*this)->block->buffer_affinity = affinity; } diff --git a/lib/block_actor.cpp b/lib/block_actor.cpp new file mode 100644 index 0000000..3ba6c79 --- /dev/null +++ b/lib/block_actor.cpp @@ -0,0 +1,28 @@ +// +// Copyright 2012 Josh Blum +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with io_sig program. If not, see <http://www.gnu.org/licenses/>. + +#include <gras_impl/block_actor.hpp> +#include <Theron/Framework.h> + +using namespace gnuradio; + +static Theron::Framework global_framework(8); //TODO needs API config + +BlockActor::BlockActor(void): + Apology::Worker(global_framework) +{ + +} diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp index d0f7f5d..988bbbf 100644 --- a/lib/element_impl.hpp +++ b/lib/element_impl.hpp @@ -17,31 +17,13 @@ #ifndef INCLUDED_LIBGRAS_ELEMENT_IMPL_HPP #define INCLUDED_LIBGRAS_ELEMENT_IMPL_HPP -#include <gras_impl/debug.hpp> -#include <gras_impl/token.hpp> -#include <gras_impl/messages.hpp> -#include <gras_impl/output_buffer_queues.hpp> -#include <gras_impl/input_buffer_queues.hpp> -#include <gras_impl/interruptible_thread.hpp> - -#include <tsbe/block.hpp> -#include <tsbe/topology.hpp> -#include <tsbe/executor.hpp> +#include <gras_impl/block_actor.hpp> +#include <Apology/Topology.hpp> +#include <Apology/Executor.hpp> #include <gnuradio/element.hpp> #include <gnuradio/block.hpp> -#include <set> -#include <vector> -#include <queue> - -static GRAS_FORCE_INLINE unsigned long myulround(const double x) -{ - return (unsigned long)(x + 0.5); -} - -static GRAS_FORCE_INLINE unsigned long long myullround(const double x) -{ - return (unsigned long long)(x + 0.5); -} +#include <gras_impl/token.hpp> +#include <gras_impl/interruptible_thread.hpp> namespace gnuradio { @@ -54,114 +36,27 @@ struct ElementImpl void top_block_cleanup(void); void hier_block_cleanup(void); - //stuff for when its a block + //common element properties std::string name; long unique_id; - - //per port properties - std::vector<size_t> input_items_sizes; - std::vector<size_t> output_items_sizes; IOSignature input_signature; IOSignature output_signature; - std::vector<size_t> input_history_items; - std::vector<size_t> output_multiple_items; - std::vector<size_t> input_multiple_items; - std::vector<bool> input_inline_enables; - - //keeps track of production - std::vector<uint64_t> items_consumed; - std::vector<uint64_t> items_produced; - - //work buffers for the classic interface - size_t work_noutput_items; - std::vector<const void *> work_input_items; - std::vector<void *> work_output_items; - std::vector<int> work_ninput_items; - std::vector<int> fcast_ninput_items; - - //work buffers for the new work interface - Block::InputItems input_items; - Block::OutputItems output_items; - ptrdiff_t work_io_ptr_mask; - //track work's calls to produce and consume - std::vector<size_t> produce_items; - std::vector<size_t> consume_items; - std::vector<bool> consume_called; - - //track the subscriber counts - std::vector<Token> input_tokens; - std::vector<Token> output_tokens; - std::set<Token> token_pool; - - std::vector<SBufferToken> output_buffer_tokens; - - //buffer queues and ready conditions - InputBufferQueues input_queues; - OutputBufferQueues<SBuffer> output_queues; - - //tag tracking - std::vector<bool> input_tags_changed; - std::vector<std::vector<Tag> > input_tags; - Block::tag_propagation_policy_t tag_prop_policy; - - //topological things - tsbe::Block block; - tsbe::Topology topology; - tsbe::Executor executor; - const tsbe::Element &get_elem(void) const - { - if (block) return block; - return topology; - } - //gets the handlers access for forecast and work - Block *block_ptr; - tsbe::TaskInterface work_task_iface; //only valid during work - - //interruptible thread stuff + //top block stuff SharedThreadGroup thread_group; - boost::shared_ptr<InterruptibleThread> interruptible_thread; - - //handlers - void handle_input_msg(const tsbe::TaskInterface &, const size_t, const tsbe::Wax &); - void handle_output_msg(const tsbe::TaskInterface &, const size_t, const tsbe::Wax &); - void topology_update(const tsbe::TaskInterface &); - void handle_block_msg(const tsbe::TaskInterface &, const tsbe::Wax &); - void handle_allocation(const tsbe::TaskInterface &); - void handle_task(const tsbe::TaskInterface &); - void mark_done(const tsbe::TaskInterface &); - void conclusion(const tsbe::TaskInterface &task_iface, const bool); - void buffer_returner(const size_t index, SBuffer &buffer); - void input_update(const tsbe::TaskInterface &task_iface); - void sort_tags(const size_t index); - void trim_tags(const tsbe::TaskInterface &, const size_t index); + Token token; - //work helpers - int work_ret; - inline void task_work(void) + //things may be in this element + boost::shared_ptr<Apology::Topology> topology; + boost::shared_ptr<Apology::Executor> executor; + boost::shared_ptr<BlockActor> block; + Apology::Base *get_elem(void) const { - this->work_ret = block_ptr->Work(this->input_items, this->output_items); + if (block) return block.get(); + return topology.get(); } - //is the fg running? - enum - { - BLOCK_STATE_INIT, - BLOCK_STATE_LIVE, - BLOCK_STATE_DONE, - } block_state; - Token token; - size_t hint; //some kind of allocation hint - Affinity buffer_affinity; - - std::vector<std::vector<BufferHintMessage> > output_allocation_hints; - //rate settings - bool enable_fixed_rate; - double relative_rate; - bool forecast_fail; - bool forecast_enable; - bool topology_init; }; } //namespace gnuradio diff --git a/lib/gr_block.cpp b/lib/gr_block.cpp index 1f59bb1..821722b 100644 --- a/lib/gr_block.cpp +++ b/lib/gr_block.cpp @@ -39,10 +39,10 @@ int gr_block::Work( const OutputItems &output_items ){ return this->general_work( - (*this)->work_noutput_items, - (*this)->work_ninput_items, - (*this)->work_input_items, - (*this)->work_output_items + (*this)->block->work_noutput_items, + (*this)->block->work_ninput_items, + (*this)->block->work_input_items, + (*this)->block->work_output_items ); } @@ -67,13 +67,13 @@ bool gr_block::is_unaligned(void) //TODO //probably dont need this since volk dispatcher checks alignment //32 byte aligned is good enough for you - return ((*this)->work_io_ptr_mask & ptrdiff_t(GRAS_MAX_ALIGNMENT-1)) != 0; + return ((*this)->block->work_io_ptr_mask & ptrdiff_t(GRAS_MAX_ALIGNMENT-1)) != 0; } size_t gr_block::fixed_rate_noutput_to_ninput(const size_t noutput_items) { - return (*this)->input_history_items[0] + - myulround((noutput_items/(*this)->relative_rate)); + return (*this)->block->input_history_items[0] + + size_t((noutput_items/this->relative_rate())); } size_t gr_block::interpolation(void) const diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp new file mode 100644 index 0000000..98634b7 --- /dev/null +++ b/lib/gras_impl/block_actor.hpp @@ -0,0 +1,146 @@ +// +// Copyright 2012 Josh Blum +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with io_sig program. If not, see <http://www.gnu.org/licenses/>. + +#ifndef INCLUDED_LIBGRAS_IMPL_BLOCK_ACTOR_HPP +#define INCLUDED_LIBGRAS_IMPL_BLOCK_ACTOR_HPP + + +#include <gnuradio/gras.hpp> +#include <gnuradio/block.hpp> +#include <Apology/Worker.hpp> +#include <gras_impl/debug.hpp> +#include <gras_impl/token.hpp> +#include <gras_impl/messages.hpp> +#include <gras_impl/output_buffer_queues.hpp> +#include <gras_impl/input_buffer_queues.hpp> +#include <gras_impl/interruptible_thread.hpp> +#include <vector> +#include <set> + +namespace gnuradio +{ + +static GRAS_FORCE_INLINE unsigned long myulround(const double x) +{ + return (unsigned long)(x + 0.5); +} + +static GRAS_FORCE_INLINE unsigned long long myullround(const double x) +{ + return (unsigned long long)(x + 0.5); +} + +struct BlockActor : Apology::Worker +{ + BlockActor(void); + ~BlockActor(void); + + Block *block_ptr; + + //per port properties + std::vector<size_t> input_items_sizes; + std::vector<size_t> output_items_sizes; + std::vector<size_t> input_history_items; + std::vector<size_t> output_multiple_items; + std::vector<size_t> input_multiple_items; + std::vector<bool> input_inline_enables; + + //keeps track of production + std::vector<uint64_t> items_consumed; + std::vector<uint64_t> items_produced; + + //work buffers for the classic interface + size_t work_noutput_items; + std::vector<const void *> work_input_items; + std::vector<void *> work_output_items; + std::vector<int> work_ninput_items; + std::vector<int> fcast_ninput_items; + + //work buffers for the new work interface + Block::InputItems input_items; + Block::OutputItems output_items; + ptrdiff_t work_io_ptr_mask; + + //track work's calls to produce and consume + std::vector<size_t> produce_items; + std::vector<size_t> consume_items; + std::vector<bool> consume_called; + + //track the subscriber counts + std::vector<Token> input_tokens; + std::vector<Token> output_tokens; + std::set<Token> token_pool; + + std::vector<SBufferToken> output_buffer_tokens; + + //buffer queues and ready conditions + InputBufferQueues input_queues; + OutputBufferQueues<SBuffer> output_queues; + + //tag tracking + std::vector<bool> input_tags_changed; + std::vector<std::vector<Tag> > input_tags; + Block::tag_propagation_policy_t tag_prop_policy; + + //interruptible thread stuff + boost::shared_ptr<InterruptibleThread> interruptible_thread; + + //handlers + /* + void handle_input_msg(const tsbe::TaskInterface &, const size_t, const tsbe::Wax &); + void handle_output_msg(const tsbe::TaskInterface &, const size_t, const tsbe::Wax &); + void topology_update(const tsbe::TaskInterface &); + void handle_block_msg(const tsbe::TaskInterface &, const tsbe::Wax &); + void handle_allocation(const tsbe::TaskInterface &); + void handle_task(const tsbe::TaskInterface &); + void mark_done(const tsbe::TaskInterface &); + void conclusion(const tsbe::TaskInterface &task_iface, const bool); + void buffer_returner(const size_t index, SBuffer &buffer); + void input_update(const tsbe::TaskInterface &task_iface); + void sort_tags(const size_t index); + void trim_tags(const tsbe::TaskInterface &, const size_t index); + * */ + + //work helpers + int work_ret; + inline void task_work(void) + { + this->work_ret = block_ptr->Work(this->input_items, this->output_items); + } + + //is the fg running? + enum + { + BLOCK_STATE_INIT, + BLOCK_STATE_LIVE, + BLOCK_STATE_DONE, + } block_state; + size_t hint; //some kind of allocation hint + Affinity buffer_affinity; + + std::vector<std::vector<BufferHintMessage> > output_allocation_hints; + + //rate settings + bool enable_fixed_rate; + double relative_rate; + bool forecast_fail; + bool forecast_enable; + bool topology_init; +}; + +} //namespace gnuradio + +#endif /*INCLUDED_LIBGRAS_IMPL_BLOCK_ACTOR_HPP*/ diff --git a/lib/gras_impl/messages.hpp b/lib/gras_impl/messages.hpp index 912e3d1..0b1bed0 100644 --- a/lib/gras_impl/messages.hpp +++ b/lib/gras_impl/messages.hpp @@ -18,10 +18,17 @@ #define INCLUDED_LIBGRAS_IMPL_MESSAGES_HPP #include <gnuradio/sbuffer.hpp> +#include <gnuradio/tags.hpp> namespace gnuradio { +struct BlockTagMessage +{ + size_t index; + Tag tag; +}; + struct TopBlockMessage { enum diff --git a/lib/hier_block.cpp b/lib/hier_block.cpp index d4c9ee6..70b5df3 100644 --- a/lib/hier_block.cpp +++ b/lib/hier_block.cpp @@ -28,23 +28,22 @@ HierBlock::HierBlock(void) HierBlock::HierBlock(const std::string &name): Element(name) { - tsbe::TopologyConfig config; - (*this)->topology = tsbe::Topology(config); + (*this)->topology = boost::shared_ptr<Apology::Topology>(new Apology::Topology()); } void ElementImpl::hier_block_cleanup(void) { - this->topology.clear_all(); + this->topology->clear_all(); } void HierBlock::connect(const Element &elem) { - (*this)->topology.add_topology(elem->topology); + (*this)->topology->add_topology(elem->topology.get()); } void HierBlock::disconnect(const Element &elem) { - (*this)->topology.remove_topology(elem->topology); + (*this)->topology->remove_topology(elem->topology.get()); } void HierBlock::connect( @@ -54,11 +53,11 @@ void HierBlock::connect( const size_t sink_index ){ //TODO, this is the perfect place to validate IO sigs - const tsbe::Connection conn( - tsbe::Port(src->get_elem(), src_index, src.weak_self.lock()), - tsbe::Port(sink->get_elem(), sink_index, sink.weak_self.lock()) + const Apology::Flow flow( + Apology::Port(src->get_elem(), src_index, src.weak_self.lock()), + Apology::Port(sink->get_elem(), sink_index, sink.weak_self.lock()) ); - (*this)->topology.connect(conn); + (*this)->topology->add_flow(flow); } void HierBlock::disconnect( @@ -67,14 +66,14 @@ void HierBlock::disconnect( const Element &sink, const size_t sink_index ){ - const tsbe::Connection conn( - tsbe::Port(src->get_elem(), src_index), - tsbe::Port(sink->get_elem(), sink_index) + const Apology::Flow flow( + Apology::Port(src->get_elem(), src_index), + Apology::Port(sink->get_elem(), sink_index) ); - (*this)->topology.disconnect(conn); + (*this)->topology->remove_flow(flow); } void HierBlock::disconnect_all(void) { - (*this)->topology.clear_all(); + (*this)->topology->clear_all(); } diff --git a/lib/top_block.cpp b/lib/top_block.cpp index fdabad1..a8f01de 100644 --- a/lib/top_block.cpp +++ b/lib/top_block.cpp @@ -28,9 +28,7 @@ TopBlock::TopBlock(void) TopBlock::TopBlock(const std::string &name): HierBlock(name) { - tsbe::ExecutorConfig config; - config.topology = (*this)->topology; - (*this)->executor = tsbe::Executor(config); + (*this)->executor = boost::shared_ptr<Apology::Executor>(new Apology::Executor((*this)->topology.get())); (*this)->token = Token::make(); (*this)->thread_group = SharedThreadGroup(new boost::thread_group()); if (GENESIS) std::cerr @@ -44,7 +42,7 @@ void ElementImpl::top_block_cleanup(void) { TopBlockMessage event; event.what = TopBlockMessage::INERT; - this->executor.post_msg(event); + this->executor->post_all(event); if (ARMAGEDDON) std::cerr << "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n" << "xx Top Block Destroyed: " << name << "\n" @@ -62,30 +60,30 @@ void TopBlock::set_buffer_hint(const size_t hint) TopBlockMessage event; event.what = TopBlockMessage::HINT; event.hint = hint; - (*this)->executor.post_msg(event); + (*this)->executor->post_all(event); } void TopBlock::start(void) { - (*this)->executor.commit(); + (*this)->executor->commit(); { - (*this)->executor.post_msg((*this)->thread_group); + (*this)->executor->post_all((*this)->thread_group); } { TopBlockMessage event; event.what = TopBlockMessage::TOKEN_TIME; event.token = (*this)->token; - (*this)->executor.post_msg(event); + (*this)->executor->post_all(event); } { TopBlockMessage event; event.what = TopBlockMessage::ALLOCATE; - (*this)->executor.post_msg(event); + (*this)->executor->post_all(event); } { TopBlockMessage event; event.what = TopBlockMessage::ACTIVE; - (*this)->executor.post_msg(event); + (*this)->executor->post_all(event); } } @@ -97,7 +95,7 @@ void TopBlock::stop(void) //message all blocks to mark done TopBlockMessage event; event.what = TopBlockMessage::INERT; - (*this)->executor.post_msg(event); + (*this)->executor->post_all(event); } void TopBlock::run(void) |