diff options
author | Josh Blum | 2012-09-29 10:09:13 -0700 |
---|---|---|
committer | Josh Blum | 2012-09-29 10:09:13 -0700 |
commit | 17e39ddbb0940d9d5e687713531e9a18d18e29f1 (patch) | |
tree | f4872dc865e42a2584aa31d96055dfb0bac7cf86 | |
parent | ba9ed63e59c1fc92bc823d11d779fe162df0aca1 (diff) | |
download | sandhi-17e39ddbb0940d9d5e687713531e9a18d18e29f1.tar.gz sandhi-17e39ddbb0940d9d5e687713531e9a18d18e29f1.tar.bz2 sandhi-17e39ddbb0940d9d5e687713531e9a18d18e29f1.zip |
ported input and output port handlers to apology
-rw-r--r-- | lib/CMakeLists.txt | 3 | ||||
-rw-r--r-- | lib/block_allocator.cpp | 3 | ||||
-rw-r--r-- | lib/block_handlers.cpp | 1 | ||||
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 4 | ||||
-rw-r--r-- | lib/gras_impl/messages.hpp | 11 | ||||
-rw-r--r-- | lib/input_handlers.cpp | 75 | ||||
-rw-r--r-- | lib/output_handlers.cpp | 91 | ||||
-rw-r--r-- | lib/port_handlers.cpp | 149 |
8 files changed, 185 insertions, 152 deletions
diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index ffc43f1..1a3e156 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -52,7 +52,8 @@ list(APPEND gnuradio_core_sources #${CMAKE_CURRENT_SOURCE_DIR}/block_task.cpp ${CMAKE_CURRENT_SOURCE_DIR}/block_allocator.cpp ${CMAKE_CURRENT_SOURCE_DIR}/block_handlers.cpp - #${CMAKE_CURRENT_SOURCE_DIR}/port_handlers.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/input_handlers.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/output_handlers.cpp ${CMAKE_CURRENT_SOURCE_DIR}/hier_block.cpp ${CMAKE_CURRENT_SOURCE_DIR}/top_block.cpp ${CMAKE_CURRENT_SOURCE_DIR}/gr_block.cpp diff --git a/lib/block_allocator.cpp b/lib/block_allocator.cpp index 826aed9..b2da9ea 100644 --- a/lib/block_allocator.cpp +++ b/lib/block_allocator.cpp @@ -15,6 +15,7 @@ // along with io_sig program. If not, see <http://www.gnu.org/licenses/>. #include "element_impl.hpp" +#include <gras_impl/block_actor.hpp> #include <boost/bind.hpp> #include <boost/foreach.hpp> #include <boost/math/common_factor.hpp> @@ -92,7 +93,7 @@ void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address this->output_buffer_tokens[i] = block_ptr->output_buffer_allocator(i, token, bytes); - InputAllocatorMessage message; + InputAllocMessage message; message.token = SBufferToken(new SBufferDeleter(deleter)); message.recommend_length = bytes; this->post_downstream(i, message); diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index c12820d..bf081a1 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -15,6 +15,7 @@ // along with io_sig program. If not, see <http://www.gnu.org/licenses/>. #include "element_impl.hpp" +#include <gras_impl/block_actor.hpp> #include <gras_impl/vector_utils.hpp> #include <boost/make_shared.hpp> #include <boost/bind.hpp> diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index 63d5ff5..abb3d6d 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -65,11 +65,13 @@ struct BlockActor : Apology::Worker this->RegisterHandler(this, &BlockActor::handle_input_buffer); this->RegisterHandler(this, &BlockActor::handle_input_token); this->RegisterHandler(this, &BlockActor::handle_input_check); + this->RegisterHandler(this, &BlockActor::handle_input_alloc); this->RegisterHandler(this, &BlockActor::handle_output_buffer); this->RegisterHandler(this, &BlockActor::handle_output_token); this->RegisterHandler(this, &BlockActor::handle_output_check); this->RegisterHandler(this, &BlockActor::handle_output_hint); + this->RegisterHandler(this, &BlockActor::handle_output_alloc); this->RegisterHandler(this, &BlockActor::handle_self_kick); this->RegisterHandler(this, &BlockActor::handle_check_tokens); @@ -90,11 +92,13 @@ struct BlockActor : Apology::Worker void handle_input_buffer(const InputBufferMessage &, const Theron::Address); void handle_input_token(const InputTokenMessage &, const Theron::Address); void handle_input_check(const InputCheckMessage &, const Theron::Address); + void handle_input_alloc(const InputAllocMessage &, const Theron::Address); void handle_output_buffer(const OutputBufferMessage &, const Theron::Address); void handle_output_token(const OutputTokenMessage &, const Theron::Address); void handle_output_check(const OutputCheckMessage &, const Theron::Address); void handle_output_hint(const OutputHintMessage &, const Theron::Address); + void handle_output_alloc(const OutputAllocMessage &, const Theron::Address); void handle_self_kick(const SelfKickMessage &, const Theron::Address); void handle_check_tokens(const CheckTokensMessage &, const Theron::Address); diff --git a/lib/gras_impl/messages.hpp b/lib/gras_impl/messages.hpp index 3b57fef..f3e3a37 100644 --- a/lib/gras_impl/messages.hpp +++ b/lib/gras_impl/messages.hpp @@ -57,6 +57,7 @@ struct TopHintMessage //---------------------------------------------------------------------- //-- message to an input port +//-- do not ack //---------------------------------------------------------------------- struct InputTagMessage @@ -77,7 +78,7 @@ struct InputTokenMessage Token token; }; -struct InputAllocatorMessage +struct InputAllocMessage { size_t index; SBufferToken token; @@ -91,6 +92,7 @@ struct InputCheckMessage //---------------------------------------------------------------------- //-- message to an output port +//-- do not ack //---------------------------------------------------------------------- struct OutputBufferMessage @@ -118,8 +120,15 @@ struct OutputHintMessage WeakToken token; }; +struct OutputAllocMessage +{ + size_t index; + SBufferToken token; +}; + //---------------------------------------------------------------------- //-- message to just the block +//-- do not ack //---------------------------------------------------------------------- struct SelfKickMessage diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp new file mode 100644 index 0000000..d2d12d5 --- /dev/null +++ b/lib/input_handlers.cpp @@ -0,0 +1,75 @@ +// +// Copyright 2012 Josh Blum +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with io_sig program. If not, see <http://www.gnu.org/licenses/>. + +#include <gras_impl/block_actor.hpp> +#include <boost/foreach.hpp> + +using namespace gnuradio; + +void BlockActor::handle_input_tag(const InputTagMessage &message, const Theron::Address) +{ + MESSAGE_TRACER(); + const size_t index = message.index; + + //handle incoming stream tag, push into the tag storage + this->input_tags[index].push_back(message.tag); + this->input_tags_changed[index] = true; +} + +void BlockActor::handle_input_buffer(const InputBufferMessage &message, const Theron::Address) +{ + MESSAGE_TRACER(); + const size_t index = message.index; + + //handle incoming stream buffer, push into the queue + if (this->block_state == BLOCK_STATE_DONE) return; + this->input_queues.push(index, message.buffer); + this->handle_task(); +} + +void BlockActor::handle_input_token(const InputTokenMessage &message, const Theron::Address) +{ + MESSAGE_TRACER(); + //const size_t index = message.index; + + //store the token of the upstream producer + this->token_pool.insert(message.token); +} + +void BlockActor::handle_input_check(const InputCheckMessage &message, const Theron::Address) +{ + MESSAGE_TRACER(); + const size_t index = message.index; + + //an upstream block declared itself done, recheck the token + if (this->input_queues.empty(index) and this->input_tokens[index].unique()) + { + this->mark_done(); + } +} + +void BlockActor::handle_input_alloc(const InputAllocMessage &message, const Theron::Address) +{ + MESSAGE_TRACER(); + const size_t index = message.index; + + //handle the upstream block allocation request + OutputAllocMessage new_msg; + new_msg.token = block_ptr->input_buffer_allocator( + index, message.token, message.recommend_length + ); + if (new_msg.token) this->post_upstream(index, new_msg); +} diff --git a/lib/output_handlers.cpp b/lib/output_handlers.cpp new file mode 100644 index 0000000..e8099bb --- /dev/null +++ b/lib/output_handlers.cpp @@ -0,0 +1,91 @@ +// +// Copyright 2012 Josh Blum +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with io_sig program. If not, see <http://www.gnu.org/licenses/>. + +#include <gras_impl/block_actor.hpp> +#include <boost/foreach.hpp> + +using namespace gnuradio; + + +void BlockActor::handle_output_buffer(const OutputBufferMessage &message, const Theron::Address) +{ + MESSAGE_TRACER(); + const size_t index = message.index; + + //a buffer has returned from the downstream + //(all interested consumers have finished with it) + if (this->block_state == BLOCK_STATE_DONE) return; + this->output_queues.push(index, message.buffer); + this->handle_task(); +} + +void BlockActor::handle_output_token(const OutputTokenMessage &message, const Theron::Address) +{ + MESSAGE_TRACER(); + //const size_t index = message.index; + + //store the token of the downstream consumer + this->token_pool.insert(message.token); +} + +void BlockActor::handle_output_check(const OutputCheckMessage &message, const Theron::Address) +{ + MESSAGE_TRACER(); + const size_t index = message.index; + + //a downstream block has declared itself done, recheck the token + if (this->output_tokens[index].unique()) + { + this->mark_done(); + } +} + +void BlockActor::handle_output_hint(const OutputHintMessage &message, const Theron::Address) +{ + MESSAGE_TRACER(); + const size_t index = message.index; + + //update the buffer allocation hint + //this->output_allocation_hints.resize(std::max(output_allocation_hints.size(), index+1)); + + //remove any old hints with expired token + //remove any older hints with matching token + std::vector<OutputHintMessage> hints; + BOOST_FOREACH(const OutputHintMessage &hint, this->output_allocation_hints[index]) + { + if (hint.token.expired()) continue; + if (hint.token.lock() == message.token.lock()) continue; + hints.push_back(hint); + } + + //store the new hint as well + hints.push_back(message); + + this->output_allocation_hints[index] = hints; +} + +void BlockActor::handle_output_alloc(const OutputAllocMessage &message, const Theron::Address) +{ + MESSAGE_TRACER(); + const size_t index = message.index; + + //return of a positive downstream allocation + //reset the token, and clear old output buffers + //the new token from the downstream is installed + this->output_buffer_tokens[index].reset(); + this->output_queues.flush(index); + this->output_buffer_tokens[index] = message.token; +} diff --git a/lib/port_handlers.cpp b/lib/port_handlers.cpp deleted file mode 100644 index a134ff4..0000000 --- a/lib/port_handlers.cpp +++ /dev/null @@ -1,149 +0,0 @@ -// -// Copyright 2012 Josh Blum -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with io_sig program. If not, see <http://www.gnu.org/licenses/>. - -#include "element_impl.hpp" -#include <boost/foreach.hpp> - -using namespace gnuradio; - -//a buffer has returned from the downstream - //(all interested consumers have finished with it) - if (msg.type() == typeid(BufferReturnMessage)) - { - const BufferReturnMessage &message = msg.cast<BufferReturnMessage>(); - const size_t index = message.index; - if (this->block_state == BLOCK_STATE_DONE) return; - this->output_queues.push(index, message.buffer); - this->handle_task(task_iface); - return; - } - -void ElementImpl::handle_input_msg( - const tsbe::TaskInterface &handle, - const size_t index, - const tsbe::Wax &msg -){ - if (MESSAGE) std::cerr << "handle_input_msg (" << msg.type().name() << ") " << name << std::endl; - - //handle incoming stream buffer, push into the queue - if (msg.type() == typeid(SBuffer)) - { - if (this->block_state == BLOCK_STATE_DONE) return; - this->input_queues.push(index, msg.cast<SBuffer>()); - this->handle_task(handle); - return; - } - - //handle incoming stream tag, push into the tag storage - if (msg.type() == typeid(Tag)) - { - this->input_tags[index].push_back(msg.cast<Tag>()); - this->input_tags_changed[index] = true; - return; - } - - //store the token of the upstream producer - if (msg.type() == typeid(Token)) - { - this->token_pool.insert(msg.cast<Token>()); - return; - } - - //an upstream block declared itself done, recheck the token - if (msg.type() == typeid(CheckTokensMessage)) - { - if (this->input_queues.empty(index) and this->input_tokens[index].unique()) - { - this->mark_done(handle); - } - return; - } - - //handle the upstream block allocation request - if (msg.type() == typeid(InputAllocatorMessage)) - { - InputAllocatorMessage message; - message.token = block_ptr->input_buffer_allocator( - index, - msg.cast<InputAllocatorMessage>().token, - msg.cast<InputAllocatorMessage>().recommend_length - ); - if (message.token) handle.post_upstream(index, message); - return; - } - - ASSERT(false); -} - -void ElementImpl::handle_output_msg( - const tsbe::TaskInterface &handle, - const size_t index, - const tsbe::Wax &msg -){ - if (MESSAGE) std::cerr << "handle_output_msg (" << msg.type().name() << ") " << name << std::endl; - - //store the token of the downstream consumer - if (msg.type() == typeid(Token)) - { - this->token_pool.insert(msg.cast<Token>()); - return; - } - - //a downstream block has declared itself done, recheck the token - if (msg.type() == typeid(CheckTokensMessage)) - { - if (this->output_tokens[index].unique()) - { - this->mark_done(handle); - } - return; - } - - //update the buffer allocation hint - if (msg.type() == typeid(BufferHintMessage)) - { - //this->output_allocation_hints.resize(std::max(output_allocation_hints.size(), index+1)); - const BufferHintMessage new_hint = msg.cast<BufferHintMessage>(); - - //remove any old hints with expired token - //remove any older hints with matching token - std::vector<BufferHintMessage> hints; - BOOST_FOREACH(const BufferHintMessage &hint, this->output_allocation_hints[index]) - { - if (hint.token.expired()) continue; - if (hint.token.lock() == new_hint.token.lock()) continue; - hints.push_back(hint); - } - - //store the new hint as well - hints.push_back(new_hint); - - this->output_allocation_hints[index] = hints; - return; - } - - //return of a positive downstream allocation - //reset the token, and clear old output buffers - //the new token from the downstream is installed - if (msg.type() == typeid(InputAllocatorMessage)) - { - this->output_buffer_tokens[index].reset(); - this->output_queues.flush(index); - this->output_buffer_tokens[index] = msg.cast<InputAllocatorMessage>().token; - } - - ASSERT(false); -} |