summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosh Blum2012-09-29 10:09:13 -0700
committerJosh Blum2012-09-29 10:09:13 -0700
commit17e39ddbb0940d9d5e687713531e9a18d18e29f1 (patch)
treef4872dc865e42a2584aa31d96055dfb0bac7cf86
parentba9ed63e59c1fc92bc823d11d779fe162df0aca1 (diff)
downloadsandhi-17e39ddbb0940d9d5e687713531e9a18d18e29f1.tar.gz
sandhi-17e39ddbb0940d9d5e687713531e9a18d18e29f1.tar.bz2
sandhi-17e39ddbb0940d9d5e687713531e9a18d18e29f1.zip
ported input and output port handlers to apology
-rw-r--r--lib/CMakeLists.txt3
-rw-r--r--lib/block_allocator.cpp3
-rw-r--r--lib/block_handlers.cpp1
-rw-r--r--lib/gras_impl/block_actor.hpp4
-rw-r--r--lib/gras_impl/messages.hpp11
-rw-r--r--lib/input_handlers.cpp75
-rw-r--r--lib/output_handlers.cpp91
-rw-r--r--lib/port_handlers.cpp149
8 files changed, 185 insertions, 152 deletions
diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt
index ffc43f1..1a3e156 100644
--- a/lib/CMakeLists.txt
+++ b/lib/CMakeLists.txt
@@ -52,7 +52,8 @@ list(APPEND gnuradio_core_sources
#${CMAKE_CURRENT_SOURCE_DIR}/block_task.cpp
${CMAKE_CURRENT_SOURCE_DIR}/block_allocator.cpp
${CMAKE_CURRENT_SOURCE_DIR}/block_handlers.cpp
- #${CMAKE_CURRENT_SOURCE_DIR}/port_handlers.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/input_handlers.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/output_handlers.cpp
${CMAKE_CURRENT_SOURCE_DIR}/hier_block.cpp
${CMAKE_CURRENT_SOURCE_DIR}/top_block.cpp
${CMAKE_CURRENT_SOURCE_DIR}/gr_block.cpp
diff --git a/lib/block_allocator.cpp b/lib/block_allocator.cpp
index 826aed9..b2da9ea 100644
--- a/lib/block_allocator.cpp
+++ b/lib/block_allocator.cpp
@@ -15,6 +15,7 @@
// along with io_sig program. If not, see <http://www.gnu.org/licenses/>.
#include "element_impl.hpp"
+#include <gras_impl/block_actor.hpp>
#include <boost/bind.hpp>
#include <boost/foreach.hpp>
#include <boost/math/common_factor.hpp>
@@ -92,7 +93,7 @@ void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address
this->output_buffer_tokens[i] = block_ptr->output_buffer_allocator(i, token, bytes);
- InputAllocatorMessage message;
+ InputAllocMessage message;
message.token = SBufferToken(new SBufferDeleter(deleter));
message.recommend_length = bytes;
this->post_downstream(i, message);
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index c12820d..bf081a1 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -15,6 +15,7 @@
// along with io_sig program. If not, see <http://www.gnu.org/licenses/>.
#include "element_impl.hpp"
+#include <gras_impl/block_actor.hpp>
#include <gras_impl/vector_utils.hpp>
#include <boost/make_shared.hpp>
#include <boost/bind.hpp>
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index 63d5ff5..abb3d6d 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -65,11 +65,13 @@ struct BlockActor : Apology::Worker
this->RegisterHandler(this, &BlockActor::handle_input_buffer);
this->RegisterHandler(this, &BlockActor::handle_input_token);
this->RegisterHandler(this, &BlockActor::handle_input_check);
+ this->RegisterHandler(this, &BlockActor::handle_input_alloc);
this->RegisterHandler(this, &BlockActor::handle_output_buffer);
this->RegisterHandler(this, &BlockActor::handle_output_token);
this->RegisterHandler(this, &BlockActor::handle_output_check);
this->RegisterHandler(this, &BlockActor::handle_output_hint);
+ this->RegisterHandler(this, &BlockActor::handle_output_alloc);
this->RegisterHandler(this, &BlockActor::handle_self_kick);
this->RegisterHandler(this, &BlockActor::handle_check_tokens);
@@ -90,11 +92,13 @@ struct BlockActor : Apology::Worker
void handle_input_buffer(const InputBufferMessage &, const Theron::Address);
void handle_input_token(const InputTokenMessage &, const Theron::Address);
void handle_input_check(const InputCheckMessage &, const Theron::Address);
+ void handle_input_alloc(const InputAllocMessage &, const Theron::Address);
void handle_output_buffer(const OutputBufferMessage &, const Theron::Address);
void handle_output_token(const OutputTokenMessage &, const Theron::Address);
void handle_output_check(const OutputCheckMessage &, const Theron::Address);
void handle_output_hint(const OutputHintMessage &, const Theron::Address);
+ void handle_output_alloc(const OutputAllocMessage &, const Theron::Address);
void handle_self_kick(const SelfKickMessage &, const Theron::Address);
void handle_check_tokens(const CheckTokensMessage &, const Theron::Address);
diff --git a/lib/gras_impl/messages.hpp b/lib/gras_impl/messages.hpp
index 3b57fef..f3e3a37 100644
--- a/lib/gras_impl/messages.hpp
+++ b/lib/gras_impl/messages.hpp
@@ -57,6 +57,7 @@ struct TopHintMessage
//----------------------------------------------------------------------
//-- message to an input port
+//-- do not ack
//----------------------------------------------------------------------
struct InputTagMessage
@@ -77,7 +78,7 @@ struct InputTokenMessage
Token token;
};
-struct InputAllocatorMessage
+struct InputAllocMessage
{
size_t index;
SBufferToken token;
@@ -91,6 +92,7 @@ struct InputCheckMessage
//----------------------------------------------------------------------
//-- message to an output port
+//-- do not ack
//----------------------------------------------------------------------
struct OutputBufferMessage
@@ -118,8 +120,15 @@ struct OutputHintMessage
WeakToken token;
};
+struct OutputAllocMessage
+{
+ size_t index;
+ SBufferToken token;
+};
+
//----------------------------------------------------------------------
//-- message to just the block
+//-- do not ack
//----------------------------------------------------------------------
struct SelfKickMessage
diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp
new file mode 100644
index 0000000..d2d12d5
--- /dev/null
+++ b/lib/input_handlers.cpp
@@ -0,0 +1,75 @@
+//
+// Copyright 2012 Josh Blum
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with io_sig program. If not, see <http://www.gnu.org/licenses/>.
+
+#include <gras_impl/block_actor.hpp>
+#include <boost/foreach.hpp>
+
+using namespace gnuradio;
+
+void BlockActor::handle_input_tag(const InputTagMessage &message, const Theron::Address)
+{
+ MESSAGE_TRACER();
+ const size_t index = message.index;
+
+ //handle incoming stream tag, push into the tag storage
+ this->input_tags[index].push_back(message.tag);
+ this->input_tags_changed[index] = true;
+}
+
+void BlockActor::handle_input_buffer(const InputBufferMessage &message, const Theron::Address)
+{
+ MESSAGE_TRACER();
+ const size_t index = message.index;
+
+ //handle incoming stream buffer, push into the queue
+ if (this->block_state == BLOCK_STATE_DONE) return;
+ this->input_queues.push(index, message.buffer);
+ this->handle_task();
+}
+
+void BlockActor::handle_input_token(const InputTokenMessage &message, const Theron::Address)
+{
+ MESSAGE_TRACER();
+ //const size_t index = message.index;
+
+ //store the token of the upstream producer
+ this->token_pool.insert(message.token);
+}
+
+void BlockActor::handle_input_check(const InputCheckMessage &message, const Theron::Address)
+{
+ MESSAGE_TRACER();
+ const size_t index = message.index;
+
+ //an upstream block declared itself done, recheck the token
+ if (this->input_queues.empty(index) and this->input_tokens[index].unique())
+ {
+ this->mark_done();
+ }
+}
+
+void BlockActor::handle_input_alloc(const InputAllocMessage &message, const Theron::Address)
+{
+ MESSAGE_TRACER();
+ const size_t index = message.index;
+
+ //handle the upstream block allocation request
+ OutputAllocMessage new_msg;
+ new_msg.token = block_ptr->input_buffer_allocator(
+ index, message.token, message.recommend_length
+ );
+ if (new_msg.token) this->post_upstream(index, new_msg);
+}
diff --git a/lib/output_handlers.cpp b/lib/output_handlers.cpp
new file mode 100644
index 0000000..e8099bb
--- /dev/null
+++ b/lib/output_handlers.cpp
@@ -0,0 +1,91 @@
+//
+// Copyright 2012 Josh Blum
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with io_sig program. If not, see <http://www.gnu.org/licenses/>.
+
+#include <gras_impl/block_actor.hpp>
+#include <boost/foreach.hpp>
+
+using namespace gnuradio;
+
+
+void BlockActor::handle_output_buffer(const OutputBufferMessage &message, const Theron::Address)
+{
+ MESSAGE_TRACER();
+ const size_t index = message.index;
+
+ //a buffer has returned from the downstream
+ //(all interested consumers have finished with it)
+ if (this->block_state == BLOCK_STATE_DONE) return;
+ this->output_queues.push(index, message.buffer);
+ this->handle_task();
+}
+
+void BlockActor::handle_output_token(const OutputTokenMessage &message, const Theron::Address)
+{
+ MESSAGE_TRACER();
+ //const size_t index = message.index;
+
+ //store the token of the downstream consumer
+ this->token_pool.insert(message.token);
+}
+
+void BlockActor::handle_output_check(const OutputCheckMessage &message, const Theron::Address)
+{
+ MESSAGE_TRACER();
+ const size_t index = message.index;
+
+ //a downstream block has declared itself done, recheck the token
+ if (this->output_tokens[index].unique())
+ {
+ this->mark_done();
+ }
+}
+
+void BlockActor::handle_output_hint(const OutputHintMessage &message, const Theron::Address)
+{
+ MESSAGE_TRACER();
+ const size_t index = message.index;
+
+ //update the buffer allocation hint
+ //this->output_allocation_hints.resize(std::max(output_allocation_hints.size(), index+1));
+
+ //remove any old hints with expired token
+ //remove any older hints with matching token
+ std::vector<OutputHintMessage> hints;
+ BOOST_FOREACH(const OutputHintMessage &hint, this->output_allocation_hints[index])
+ {
+ if (hint.token.expired()) continue;
+ if (hint.token.lock() == message.token.lock()) continue;
+ hints.push_back(hint);
+ }
+
+ //store the new hint as well
+ hints.push_back(message);
+
+ this->output_allocation_hints[index] = hints;
+}
+
+void BlockActor::handle_output_alloc(const OutputAllocMessage &message, const Theron::Address)
+{
+ MESSAGE_TRACER();
+ const size_t index = message.index;
+
+ //return of a positive downstream allocation
+ //reset the token, and clear old output buffers
+ //the new token from the downstream is installed
+ this->output_buffer_tokens[index].reset();
+ this->output_queues.flush(index);
+ this->output_buffer_tokens[index] = message.token;
+}
diff --git a/lib/port_handlers.cpp b/lib/port_handlers.cpp
deleted file mode 100644
index a134ff4..0000000
--- a/lib/port_handlers.cpp
+++ /dev/null
@@ -1,149 +0,0 @@
-//
-// Copyright 2012 Josh Blum
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with io_sig program. If not, see <http://www.gnu.org/licenses/>.
-
-#include "element_impl.hpp"
-#include <boost/foreach.hpp>
-
-using namespace gnuradio;
-
-//a buffer has returned from the downstream
- //(all interested consumers have finished with it)
- if (msg.type() == typeid(BufferReturnMessage))
- {
- const BufferReturnMessage &message = msg.cast<BufferReturnMessage>();
- const size_t index = message.index;
- if (this->block_state == BLOCK_STATE_DONE) return;
- this->output_queues.push(index, message.buffer);
- this->handle_task(task_iface);
- return;
- }
-
-void ElementImpl::handle_input_msg(
- const tsbe::TaskInterface &handle,
- const size_t index,
- const tsbe::Wax &msg
-){
- if (MESSAGE) std::cerr << "handle_input_msg (" << msg.type().name() << ") " << name << std::endl;
-
- //handle incoming stream buffer, push into the queue
- if (msg.type() == typeid(SBuffer))
- {
- if (this->block_state == BLOCK_STATE_DONE) return;
- this->input_queues.push(index, msg.cast<SBuffer>());
- this->handle_task(handle);
- return;
- }
-
- //handle incoming stream tag, push into the tag storage
- if (msg.type() == typeid(Tag))
- {
- this->input_tags[index].push_back(msg.cast<Tag>());
- this->input_tags_changed[index] = true;
- return;
- }
-
- //store the token of the upstream producer
- if (msg.type() == typeid(Token))
- {
- this->token_pool.insert(msg.cast<Token>());
- return;
- }
-
- //an upstream block declared itself done, recheck the token
- if (msg.type() == typeid(CheckTokensMessage))
- {
- if (this->input_queues.empty(index) and this->input_tokens[index].unique())
- {
- this->mark_done(handle);
- }
- return;
- }
-
- //handle the upstream block allocation request
- if (msg.type() == typeid(InputAllocatorMessage))
- {
- InputAllocatorMessage message;
- message.token = block_ptr->input_buffer_allocator(
- index,
- msg.cast<InputAllocatorMessage>().token,
- msg.cast<InputAllocatorMessage>().recommend_length
- );
- if (message.token) handle.post_upstream(index, message);
- return;
- }
-
- ASSERT(false);
-}
-
-void ElementImpl::handle_output_msg(
- const tsbe::TaskInterface &handle,
- const size_t index,
- const tsbe::Wax &msg
-){
- if (MESSAGE) std::cerr << "handle_output_msg (" << msg.type().name() << ") " << name << std::endl;
-
- //store the token of the downstream consumer
- if (msg.type() == typeid(Token))
- {
- this->token_pool.insert(msg.cast<Token>());
- return;
- }
-
- //a downstream block has declared itself done, recheck the token
- if (msg.type() == typeid(CheckTokensMessage))
- {
- if (this->output_tokens[index].unique())
- {
- this->mark_done(handle);
- }
- return;
- }
-
- //update the buffer allocation hint
- if (msg.type() == typeid(BufferHintMessage))
- {
- //this->output_allocation_hints.resize(std::max(output_allocation_hints.size(), index+1));
- const BufferHintMessage new_hint = msg.cast<BufferHintMessage>();
-
- //remove any old hints with expired token
- //remove any older hints with matching token
- std::vector<BufferHintMessage> hints;
- BOOST_FOREACH(const BufferHintMessage &hint, this->output_allocation_hints[index])
- {
- if (hint.token.expired()) continue;
- if (hint.token.lock() == new_hint.token.lock()) continue;
- hints.push_back(hint);
- }
-
- //store the new hint as well
- hints.push_back(new_hint);
-
- this->output_allocation_hints[index] = hints;
- return;
- }
-
- //return of a positive downstream allocation
- //reset the token, and clear old output buffers
- //the new token from the downstream is installed
- if (msg.type() == typeid(InputAllocatorMessage))
- {
- this->output_buffer_tokens[index].reset();
- this->output_queues.flush(index);
- this->output_buffer_tokens[index] = msg.cast<InputAllocatorMessage>().token;
- }
-
- ASSERT(false);
-}