diff options
-rw-r--r-- | lib/CMakeLists.txt | 1 | ||||
-rw-r--r-- | lib/block.cpp | 1 | ||||
-rw-r--r-- | lib/block_allocator.cpp | 37 | ||||
-rw-r--r-- | lib/block_handlers.cpp | 75 | ||||
-rw-r--r-- | lib/block_ports.cpp | 73 | ||||
-rw-r--r-- | lib/block_task.cpp | 24 | ||||
-rw-r--r-- | lib/common_impl.hpp | 80 | ||||
-rw-r--r-- | lib/element_impl.hpp | 70 | ||||
-rw-r--r-- | lib/hier_block.cpp | 1 | ||||
-rw-r--r-- | lib/top_block.cpp | 6 | ||||
-rw-r--r-- | lib/vec_utils_impl.hpp | 59 |
11 files changed, 281 insertions, 146 deletions
diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index 43ea21a..6ec28c1 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -14,6 +14,7 @@ list(APPEND gnuradio_core_sources ${CMAKE_CURRENT_SOURCE_DIR}/block_task.cpp ${CMAKE_CURRENT_SOURCE_DIR}/block_allocator.cpp ${CMAKE_CURRENT_SOURCE_DIR}/block_handlers.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/block_ports.cpp ${CMAKE_CURRENT_SOURCE_DIR}/hier_block.cpp ${CMAKE_CURRENT_SOURCE_DIR}/top_block.cpp ${CMAKE_CURRENT_SOURCE_DIR}/gr_block.cpp diff --git a/lib/block.cpp b/lib/block.cpp index c7ff967..73c232f 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -38,7 +38,6 @@ Block::Block(const std::string &name): config.input_callback = boost::bind(&ElementImpl::handle_input_msg, this->get(), _1, _2, _3); config.output_callback = boost::bind(&ElementImpl::handle_output_msg, this->get(), _1, _2, _3); config.update_callback = boost::bind(&ElementImpl::topology_update, this->get(), _1, _2); - config.task_callback = boost::bind(&ElementImpl::handle_task, this->get(), _1); (*this)->block = tsbe::Block(config); (*this)->block_ptr = this; diff --git a/lib/block_allocator.cpp b/lib/block_allocator.cpp index b3f8c4d..516729e 100644 --- a/lib/block_allocator.cpp +++ b/lib/block_allocator.cpp @@ -20,26 +20,21 @@ using namespace gnuradio; //TODO will need more complicated later -static void simple_allocator( - const tsbe::BufferToken &tok, - const size_t num_bytes, - const size_t num_buffs -){ - for (size_t i = 0; i < num_buffs; i++) - { - tsbe::BufferConfig config; - config.memory = NULL; - config.length = num_bytes; - config.token = tok; - tsbe::Buffer buff(config); - //buffer derefs here and the token messages it back to the block - } + + +void ElementImpl::buffer_returner(const size_t index, tsbe::Buffer &buffer) +{ + BufferReturnMessage message; + message.index = index; + message.buffer = buffer; + this->block.post_msg(message); } void ElementImpl::handle_allocation(const tsbe::TaskInterface &task_iface) { //allocate output buffers which will also wake up the task const size_t num_outputs = task_iface.get_num_outputs(); + this->output_buffer_tokens.resize(num_outputs); for (size_t i = 0; i < num_outputs; i++) { size_t items = this->hint; @@ -47,6 +42,18 @@ void ElementImpl::handle_allocation(const tsbe::TaskInterface &task_iface) items = std::max(items, this->output_multiple_items[i]); const size_t bytes = items * this->output_items_sizes[i]; - this->block.set_output_port_allocator(i, boost::bind(&simple_allocator, _1, bytes, 2)); + + tsbe::BufferDeleter deleter = boost::bind(&ElementImpl::buffer_returner, this, i, _1); + this->output_buffer_tokens[i] = tsbe::BufferToken(new tsbe::BufferDeleter(deleter)); + + for (size_t j = 0; j < 2; j++) + { + tsbe::BufferConfig config; + config.memory = NULL; + config.length = bytes; + config.token = this->output_buffer_tokens[i]; + tsbe::Buffer buff(config); + //buffer derefs here and the token messages it back to the block + } } } diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index de0a8bf..0eca9ea 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -15,70 +15,19 @@ // along with io_sig program. If not, see <http://www.gnu.org/licenses/>. #include "element_impl.hpp" +#include "vec_utils_impl.hpp" using namespace gnuradio; -void ElementImpl::handle_input_msg(const tsbe::TaskInterface &handle, const size_t index, const tsbe::Wax &msg) -{ - if (msg.type() == typeid(Tag)) - { - this->input_tags[index].push_back(msg.cast<Tag>()); - this->input_tags_changed[index] = true; - } - if (msg.type() == typeid(Token)) - { - this->token_pool.push_back(msg.cast<Token>()); - } - if (msg.type() == typeid(CheckTokensMessage)) - { - if (this->input_tokens[index].unique()) - { - this->mark_done(handle); - } - } -} - -void ElementImpl::handle_output_msg(const tsbe::TaskInterface &handle, const size_t index, const tsbe::Wax &msg) -{ - if (msg.type() == typeid(Token)) - { - this->token_pool.push_back(msg.cast<Token>()); - } - if (msg.type() == typeid(CheckTokensMessage)) - { - if (this->output_tokens[index].unique()) - { - this->mark_done(handle); - } - } -} - -template <typename V, typename T> -void resize_fill(V &v, const size_t new_len, const T &fill) -{ - if (v.size() >= new_len) return; //dont ever shrink it - v.resize(new_len, fill); -} - -template <typename V> -void resize_fill_back(V &v, const size_t new_len) -{ - if (v.empty()) v.push_back(0); - resize_fill(v, new_len, v.back()); -} - -template <typename V, typename Sig> -void fill_item_sizes_from_sig(V &v, const Sig &s, const size_t size) +void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface, const tsbe::Wax &state) { - v.resize(size); - for (size_t i = 0; i < v.size(); i++) + if (state.type() == typeid(BufferReturnMessage)) { - v[i] = s->sizeof_stream_item(i); + const BufferReturnMessage &message = state.cast<BufferReturnMessage>(); + this->handle_output_msg(task_iface, message.index, message.buffer); + return; } -} -void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface, const tsbe::Wax &state) -{ const size_t num_inputs = task_iface.get_num_inputs(); const size_t num_outputs = task_iface.get_num_outputs(); @@ -91,8 +40,8 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface, const t resize_fill_back(this->output_multiple_items, num_outputs); //resize the bytes consumed/produced - resize_fill(this->items_consumed, num_inputs, 0); - resize_fill(this->items_produced, num_outputs, 0); + resize_fill_grow(this->items_consumed, num_inputs, 0); + resize_fill_grow(this->items_produced, num_outputs, 0); //resize all work buffers to match current connections this->work_input_items.resize(num_inputs); @@ -103,6 +52,10 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface, const t this->consume_items.resize(num_inputs, 0); this->produce_items.resize(num_outputs, 0); this->input_buff_offsets.resize(num_inputs, 0); + this->input_queues.resize(num_inputs); + this->output_queues.resize(num_outputs); + this->inputs_ready.resize(num_inputs); + this->outputs_ready.resize(num_outputs); //resize tags vector to match sizes this->input_tags_changed.resize(num_inputs); @@ -130,13 +83,13 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface, const t this->input_tokens.resize(num_inputs); for (size_t i = 0; i < num_inputs; i++) { - this->input_tokens[i] = make_token(); + this->input_tokens[i] = Token::make(); task_iface.post_upstream(i, this->input_tokens[i]); } this->output_tokens.resize(num_outputs); for (size_t i = 0; i < num_outputs; i++) { - this->output_tokens[i] = make_token(); + this->output_tokens[i] = Token::make(); task_iface.post_downstream(i, this->output_tokens[i]); } } diff --git a/lib/block_ports.cpp b/lib/block_ports.cpp new file mode 100644 index 0000000..702e0c4 --- /dev/null +++ b/lib/block_ports.cpp @@ -0,0 +1,73 @@ +// +// Copyright 2012 Josh Blum +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with io_sig program. If not, see <http://www.gnu.org/licenses/>. + +#include "element_impl.hpp" + +using namespace gnuradio; + +void ElementImpl::handle_input_msg(const tsbe::TaskInterface &handle, const size_t index, const tsbe::Wax &msg) +{ + if (msg.type() == typeid(tsbe::Buffer)) + { + this->input_queues[index].push(msg.cast<tsbe::Buffer>()); + this->inputs_ready.set(index, true); + this->handle_task(handle); + return; + } + if (msg.type() == typeid(Tag)) + { + this->input_tags[index].push_back(msg.cast<Tag>()); + this->input_tags_changed[index] = true; + return; + } + if (msg.type() == typeid(Token)) + { + this->token_pool.push_back(msg.cast<Token>()); + return; + } + if (msg.type() == typeid(CheckTokensMessage)) + { + if (this->input_tokens[index].unique()) + { + this->mark_done(handle); + } + return; + } +} + +void ElementImpl::handle_output_msg(const tsbe::TaskInterface &handle, const size_t index, const tsbe::Wax &msg) +{ + if (msg.type() == typeid(tsbe::Buffer)) + { + this->output_queues[index].push(msg.cast<tsbe::Buffer>()); + this->outputs_ready.set(index, true); + this->handle_task(handle); + return; + } + if (msg.type() == typeid(Token)) + { + this->token_pool.push_back(msg.cast<Token>()); + return; + } + if (msg.type() == typeid(CheckTokensMessage)) + { + if (this->output_tokens[index].unique()) + { + this->mark_done(handle); + } + return; + } +} diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 577fdb7..d871f12 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -24,10 +24,11 @@ void ElementImpl::free_inputs(const tsbe::TaskInterface &task_iface) { for (size_t i = 0; i < task_iface.get_num_inputs(); i++) { - while (task_iface.get_input_buffer(i)) + while (not this->input_queues[i].empty()) { - task_iface.pop_input_buffer(i); + this->input_queues[i].pop(); } + this->inputs_ready.set(i, false); } } @@ -38,6 +39,7 @@ void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface) this->token_pool.clear(); this->token.reset(); this->free_inputs(task_iface); + this->output_buffer_tokens.clear(); for (size_t i = 0; i < task_iface.get_num_inputs(); i++) { task_iface.post_upstream(i, CheckTokensMessage()); @@ -63,8 +65,8 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) //-- Handle task may get called for incoming buffers, //-- however, not all ports may have available buffers. //------------------------------------------------------------------ - const bool all_inputs_ready = (~task_iface.get_inputs_ready()).none(); - const bool all_outputs_ready = (~task_iface.get_outputs_ready()).none(); + const bool all_inputs_ready = (~this->inputs_ready).none(); + const bool all_outputs_ready = (~this->outputs_ready).none(); if (not (this->active and all_inputs_ready and all_outputs_ready)) return; const size_t num_inputs = task_iface.get_num_inputs(); @@ -97,7 +99,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) ASSERT(this->input_history_items[i] == 0); - const tsbe::Buffer &buff = task_iface.get_input_buffer(i); + const tsbe::Buffer &buff = this->input_queues[i].front(); char *mem = ((char *)buff.get_memory()) + this->input_buff_offsets[i]; const size_t bytes = buff.get_length() - this->input_buff_offsets[i]; const size_t items = bytes/this->input_items_sizes[i]; @@ -115,7 +117,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) output_tokens_count += this->output_tokens[i].use_count(); //this->produce_items[i] = 0; - const tsbe::Buffer &buff = task_iface.get_output_buffer(i); + const tsbe::Buffer &buff = this->output_queues[i].front(); char *mem = ((char *)buff.get_memory()); const size_t bytes = buff.get_length(); const size_t items = bytes/this->output_items_sizes[i]; @@ -156,10 +158,11 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) this->items_consumed[i] += items; const size_t bytes = items*this->input_items_sizes[i]; this->input_buff_offsets[i] += bytes; - tsbe::Buffer &buff = task_iface.get_input_buffer(i); + tsbe::Buffer &buff = this->input_queues[i].front(); if (buff.get_length() >= this->input_buff_offsets[i]) { - task_iface.pop_input_buffer(i); + this->input_queues[i].pop(); + this->inputs_ready.set(i, not this->input_queues[i].empty()); this->input_buff_offsets[i] = 0; } } @@ -170,10 +173,11 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) this->items_produced[i] += items; const size_t bytes = items*this->output_items_sizes[i]; - tsbe::Buffer &buff = task_iface.get_output_buffer(i); + tsbe::Buffer &buff = this->output_queues[i].front(); buff.get_length() = bytes; task_iface.post_downstream(i, buff); - task_iface.pop_output_buffer(i); + this->output_queues[i].pop(); + this->outputs_ready.set(i, not this->output_queues[i].empty()); } //0) figure out what we have for input data diff --git a/lib/common_impl.hpp b/lib/common_impl.hpp new file mode 100644 index 0000000..c6c3e93 --- /dev/null +++ b/lib/common_impl.hpp @@ -0,0 +1,80 @@ +// +// Copyright 2012 Josh Blum +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with io_sig program. If not, see <http://www.gnu.org/licenses/>. + +#ifndef INCLUDED_LIBGNURADIO_COMMON_IMPL_HPP +#define INCLUDED_LIBGNURADIO_COMMON_IMPL_HPP + +#include <tsbe/buffer.hpp> +#include <boost/shared_ptr.hpp> +#include <boost/dynamic_bitset.hpp> +#include <vector> +#include <iostream> + +#define HERE() std::cerr << __FILE__ << ":" << __LINE__ << std::endl << std::flush; +#define VAR(x) std::cerr << #x << " = " << (x) << std::endl << std::flush; +#define ASSERT(x) if(not (x)){HERE(); std::cerr << "assert failed: " << #x << std::endl << std::flush;} + +static inline unsigned long myulround(const double x) +{ + return (unsigned long)(x + 0.5); +} + +static inline unsigned long long myullround(const double x) +{ + return (unsigned long long)(x + 0.5); +} + +namespace gnuradio +{ + +typedef boost::dynamic_bitset<> BitSet; + +struct Token : boost::shared_ptr<int> +{ + static Token make(void) + { + Token tok; + tok.reset(new int(0)); + return tok; + } +}; + +struct TopBlockMessage +{ + enum + { + ACTIVE, + INERT, + HINT, + } what; + size_t hint; + Token token; +}; + +struct CheckTokensMessage +{ + //empty +}; + +struct BufferReturnMessage +{ + size_t index; + tsbe::Buffer buffer; +}; + +} //namespace gnuradio + +#endif /*INCLUDED_LIBGNURADIO_COMMON_IMPL_HPP*/ diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp index b92aee2..18c2fd3 100644 --- a/lib/element_impl.hpp +++ b/lib/element_impl.hpp @@ -17,6 +17,7 @@ #ifndef INCLUDED_LIBGNURADIO_ELEMENT_IMPL_HPP #define INCLUDED_LIBGNURADIO_ELEMENT_IMPL_HPP +#include <common_impl.hpp>//#include "common_impl.hpp" #include <tsbe/block.hpp> #include <tsbe/topology.hpp> #include <tsbe/executor.hpp> @@ -24,59 +25,7 @@ #include <gnuradio/block.hpp> #include <gr_types.h> #include <vector> -#include <iostream> - -#define HERE() std::cerr << __FILE__ << ":" << __LINE__ << std::endl << std::flush; -#define VAR(x) std::cerr << #x << " = " << (x) << std::endl << std::flush; -#define ASSERT(x) if(not (x)){HERE(); std::cerr << "assert failed: " << #x << std::endl << std::flush;} - -static inline unsigned long myulround(const double x) -{ - return (unsigned long)(x + 0.5); -} - -static inline unsigned long long myullround(const double x) -{ - return (unsigned long long)(x + 0.5); -} - -//! return true if an instance was found and removed -template <typename V, typename T> -bool remove_one(V &v, const T &t) -{ - for (size_t i = 0; i < v.size(); i++) - { - if (v[i] == t) - { - v.erase(v.begin() + i); - return true; - } - } - return false; -} - -typedef boost::shared_ptr<int> Token; -static inline Token make_token(void) -{ - return Token(new int(0)); -} - -struct TopBlockMessage -{ - enum - { - ACTIVE, - INERT, - HINT, - } what; - size_t hint; - Token token; -}; - -struct CheckTokensMessage -{ - //empty -}; +#include <queue> namespace gnuradio { @@ -112,9 +61,9 @@ struct ElementImpl std::vector<uint64_t> items_produced; //work buffers for the classic interface - gr_vector_const_void_star work_input_items; - gr_vector_void_star work_output_items; - gr_vector_int work_ninput_items; + std::vector<const void *> work_input_items; + std::vector<void *> work_output_items; + std::vector<int> work_ninput_items; //work buffers for the new work interface Block::InputItems input_items; @@ -135,6 +84,14 @@ struct ElementImpl std::vector<Token> output_tokens; std::vector<Token> token_pool; + std::vector<tsbe::BufferToken> output_buffer_tokens; + + //buffer queues and ready conditions + std::vector<std::queue<tsbe::Buffer> > input_queues; + std::vector<std::queue<tsbe::Buffer> > output_queues; + BitSet inputs_ready; + BitSet outputs_ready; + //tag tracking std::vector<bool> input_tags_changed; std::vector<std::vector<Tag> > input_tags; @@ -161,6 +118,7 @@ struct ElementImpl void handle_task(const tsbe::TaskInterface &); void mark_done(const tsbe::TaskInterface &); void free_inputs(const tsbe::TaskInterface &); + void buffer_returner(const size_t index, tsbe::Buffer &buffer); //is the fg running? bool active; diff --git a/lib/hier_block.cpp b/lib/hier_block.cpp index edd1f35..47be7d0 100644 --- a/lib/hier_block.cpp +++ b/lib/hier_block.cpp @@ -15,6 +15,7 @@ // along with io_sig program. If not, see <http://www.gnu.org/licenses/>. #include "element_impl.hpp" +#include "vec_utils_impl.hpp" #include <gnuradio/hier_block.hpp> using namespace gnuradio; diff --git a/lib/top_block.cpp b/lib/top_block.cpp index f9dde36..6f98449 100644 --- a/lib/top_block.cpp +++ b/lib/top_block.cpp @@ -16,6 +16,7 @@ #include "element_impl.hpp" #include <gnuradio/top_block.hpp> +#include <boost/thread/thread.hpp> //sleep using namespace gnuradio; @@ -30,7 +31,7 @@ TopBlock::TopBlock(const std::string &name): tsbe::ExecutorConfig config; config.topology = (*this)->topology; (*this)->executor = tsbe::Executor(config); - (*this)->token = make_token(); + (*this)->token = Token::make(); } void TopBlock::update(void) @@ -71,7 +72,6 @@ void TopBlock::wait(void) { while (not (*this)->token.unique()) { - sleep(1); - VAR((*this)->token.use_count()); + boost::this_thread::yield(); } } diff --git a/lib/vec_utils_impl.hpp b/lib/vec_utils_impl.hpp new file mode 100644 index 0000000..779d368 --- /dev/null +++ b/lib/vec_utils_impl.hpp @@ -0,0 +1,59 @@ +// +// Copyright 2012 Josh Blum +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with io_sig program. If not, see <http://www.gnu.org/licenses/>. + +#ifndef INCLUDED_LIBGNURADIO_VEC_UTILS_HPP_HPP +#define INCLUDED_LIBGNURADIO_VEC_UTILS_HPP_HPP + +//! return true if an instance was found and removed +template <typename V, typename T> +bool remove_one(V &v, const T &t) +{ + for (size_t i = 0; i < v.size(); i++) + { + if (v[i] == t) + { + v.erase(v.begin() + i); + return true; + } + } + return false; +} + +template <typename V, typename T> +void resize_fill_grow(V &v, const size_t new_len, const T &fill) +{ + if (v.size() >= new_len) return; //dont ever shrink it + v.resize(new_len, fill); +} + +template <typename V> +void resize_fill_back(V &v, const size_t new_len) +{ + if (v.empty()) v.push_back(0); + resize_fill_grow(v, new_len, v.back()); +} + +template <typename V, typename Sig> +void fill_item_sizes_from_sig(V &v, const Sig &s, const size_t size) +{ + v.resize(size); + for (size_t i = 0; i < v.size(); i++) + { + v[i] = s->sizeof_stream_item(i); + } +} + +#endif /*INCLUDED_LIBGNURADIO_VEC_UTILS_HPP_HPP*/ |