diff options
author | Josh Blum | 2012-09-02 17:09:46 -0700 |
---|---|---|
committer | Josh Blum | 2012-09-02 17:09:46 -0700 |
commit | 591905b3a33982a46e8c956d4ca226bb4a241b54 (patch) | |
tree | 897a168667fa8048e816a131428f81c830892942 /lib | |
parent | 814be255443ef4ada223375263ee3d2ad43540c1 (diff) | |
download | sandhi-591905b3a33982a46e8c956d4ca226bb4a241b54.tar.gz sandhi-591905b3a33982a46e8c956d4ca226bb4a241b54.tar.bz2 sandhi-591905b3a33982a46e8c956d4ca226bb4a241b54.zip |
created history implementation (untested)
Diffstat (limited to 'lib')
-rw-r--r-- | lib/block_handlers.cpp | 27 | ||||
-rw-r--r-- | lib/block_task.cpp | 24 | ||||
-rw-r--r-- | lib/buffer_ios.hpp | 201 | ||||
-rw-r--r-- | lib/common_impl.hpp | 5 | ||||
-rw-r--r-- | lib/debug_impl.hpp | 26 | ||||
-rw-r--r-- | lib/element_impl.hpp | 17 |
6 files changed, 241 insertions, 59 deletions
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index 396416d..4e6b732 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -113,7 +113,6 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface) this->output_items.resize(num_outputs); this->consume_items.resize(num_inputs, 0); this->produce_items.resize(num_outputs, 0); - this->input_buff_offsets.resize(num_inputs, 0); this->input_queues.resize(num_inputs); this->output_queues.resize(num_outputs); @@ -125,30 +124,8 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface) this->input_tags.resize(num_inputs); this->output_tags.resize(num_outputs); - //determine max history length for allocation below - this->max_history_items = *std::max_element( - this->input_history_items.begin(), - this->input_history_items.end() - ); - - //resize and clear that initial history - this->history_buffs.resize(num_inputs); - this->input_history_bytes.resize(num_inputs); - for (size_t i = 0; i < num_inputs; i++) - { - tsbe::Buffer &buff = this->history_buffs[i]; - const size_t num_items = this->input_history_items[i] + this->max_history_items; - const size_t num_bytes = this->input_items_sizes[i]*num_items; - this->input_history_bytes[i] = num_bytes; - if (not buff or buff.get_length() != num_bytes) - { - tsbe::BufferConfig config; - config.memory = NULL; - config.length = num_bytes; - buff = tsbe::Buffer(config); - std::memset(buff.get_memory(), 0, buff.get_length()); - } - } + //init the history comprehension on input queues + this->input_queues.init(this->input_history_items, this->input_items_sizes); //TODO: think more about this: if (num_inputs == 0 and num_outputs == 0) diff --git a/lib/block_task.cpp b/lib/block_task.cpp index a84ef6a..88ca5cf 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -94,19 +94,14 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) { input_tokens_count += this->input_tokens[i].use_count(); - ASSERT(this->input_history_items[i] == 0); - ASSERT(this->input_queues.ready(i)); - const tsbe::Buffer &buff = this->input_queues.front(i); - ASSERT(this->input_buff_offsets[i] < buff.get_length()); - char *mem = ((char *)buff.get_memory()) + this->input_buff_offsets[i]; - const size_t bytes = buff.get_length() - this->input_buff_offsets[i]; - const size_t items = bytes/this->input_items_sizes[i]; + const BuffInfo info = this->input_queues.front(i); + const size_t items = info.len/this->input_items_sizes[i]; - this->work_io_ptr_mask |= ptrdiff_t(mem); - this->input_items[i]._mem = mem; + this->work_io_ptr_mask |= ptrdiff_t(info.mem); + this->input_items[i]._mem = info.mem; this->input_items[i]._len = items; - this->work_input_items[i] = mem; + this->work_input_items[i] = info.mem; this->work_ninput_items[i] = items; } @@ -172,14 +167,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) this->items_consumed[i] += items; const size_t bytes = items*this->input_items_sizes[i]; - this->input_buff_offsets[i] += bytes; - tsbe::Buffer &buff = this->input_queues.front(i); - - if (buff.get_length() <= this->input_buff_offsets[i]) - { - this->input_queues.pop(i); - this->input_buff_offsets[i] = 0; - } + this->input_queues.pop(i, bytes); } //------------------------------------------------------------------ diff --git a/lib/buffer_ios.hpp b/lib/buffer_ios.hpp new file mode 100644 index 0000000..2a74ba7 --- /dev/null +++ b/lib/buffer_ios.hpp @@ -0,0 +1,201 @@ +// +// Copyright 2012 Josh Blum +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with io_sig program. If not, see <http://www.gnu.org/licenses/>. + +#ifndef INCLUDED_LIBGRAS_BUFFER_IOS_HPP +#define INCLUDED_LIBGRAS_BUFFER_IOS_HPP + +#include <debug_impl.hpp> +#include <tsbe/buffer.hpp> +#include <boost/dynamic_bitset.hpp> +#include <vector> +#include <queue> +#include <cstring> //memcpy/memset + +namespace gnuradio +{ + +struct BuffInfo +{ + void *mem; + size_t len; +}; + +struct BufferIOs +{ + boost::dynamic_bitset<> _bitset; + std::vector<std::queue<tsbe::Buffer> > _queues; + std::vector<size_t> _offset_bytes; + std::vector<size_t> _history_bytes; + std::vector<size_t> _post_bytes; + std::vector<tsbe::Buffer> _history_buffs; + + template <typename V> + inline void init( + const V &input_history_items, + const V &input_item_sizes + ){ + if (this->size() == 0) return; + + const size_t max_history_items = *std::max_element(input_history_items.begin(), input_history_items.end()); + + for (size_t i = 0; i < this->size(); i++) + { + //determine byte sizes for buffers and dealing with history + _history_bytes[i] = input_item_sizes[i]*input_history_items[i]; + _post_bytes[i] = input_item_sizes[i]*max_history_items; + + //allocate mini buffer for history edge conditions + const size_t num_bytes = _history_bytes[i] + _post_bytes[i]; + tsbe::Buffer &buff = _history_buffs[i]; + if (not buff or buff.get_length() != num_bytes) + { + tsbe::BufferConfig config; + config.memory = NULL; + config.length = num_bytes; + buff = tsbe::Buffer(config); + std::memset(buff.get_memory(), 0, buff.get_length()); + } + } + } + + /*! + * Rules for front: + * + * If we are within the mini history buffer, + * memcpy post bytes from the head of the input buffer. + * The caller must chew through the mini history buffer + * until offset bytes passes the history requirement. + * + * Otherwise, resolve pointers to the input buffer, + * moving the memory and length by num history bytes. + */ + inline BuffInfo front(const size_t i) const + { + BuffInfo info; + const tsbe::Buffer &buff = _queues[i].front(); + + if (_history_bytes[i] != 0) + { + const tsbe::Buffer &hist_buff = _history_buffs[i]; + + if (_offset_bytes[i] == 0) + { + info.mem = (char *)hist_buff.get_memory(); + info.len = hist_buff.get_length(); + char *src_mem = ((char *)buff.get_memory()); + char *dst_mem = ((char *)info.mem) + _history_bytes[i]; + std::memcpy(dst_mem, src_mem, _post_bytes[i]); + } + + else if (_offset_bytes[i] < _history_bytes[i]) //caller left us a partial + { + info.mem = (char *)hist_buff.get_memory() + _offset_bytes[i]; + info.len = hist_buff.get_length() - _offset_bytes[i]; + } + } + + else + { + const size_t delta_bytes = _offset_bytes[i] - _history_bytes[i]; + info.mem = ((char *)buff.get_memory()) + delta_bytes; + info.len = buff.get_length() - delta_bytes; + } + + return info; + } + + /*! + * Rules for popping: + * + * If we were operating in a mini history buffer, do nothing. + * Otherwise, check if the input buffer was entirely consumed. + * If so, pop the input buffer, copy the tail end of the buffer + * into the mini history buffer, nd reset the offset condition. + */ + inline void pop(const size_t i, const size_t bytes_consumed) + { + const bool in_history = _offset_bytes[i] < _history_bytes[i]; + _offset_bytes[i] += bytes_consumed; + if (in_history) return; + + //if totally consumed, memcpy history and pop + const tsbe::Buffer &buff = _queues[i].front(); + if (_offset_bytes[i] >= buff.get_length()) + { + if (_history_bytes[i] != 0) + { + char *src_mem = ((char *)buff.get_memory()) + _offset_bytes[i] - _history_bytes[i]; + std::memcpy(_history_buffs[i].get_memory(), src_mem, _history_bytes[i]); + } + _queues[i].pop(); + _bitset.set(i, not _queues[i].empty()); + _offset_bytes[i] = 0; + } + } + + inline void resize(const size_t size) + { + _bitset.resize(size); + _queues.resize(size); + _post_bytes.resize(size, 0); + _offset_bytes.resize(size, 0); + _history_bytes.resize(size, 0); + _history_buffs.resize(size); + } + + inline void push(const size_t i, const tsbe::Buffer &value) + { + _queues[i].push(value); + _bitset.set(i); + } + + inline void flush(const size_t i) + { + _queues[i] = std::queue<tsbe::Buffer>(); + _bitset.reset(i); + } + + size_t size(void) const + { + return _queues.size(); + } + + inline void flush_all(void) + { + const size_t old_size = this->size(); + this->resize(0); + this->resize(old_size); + } + + inline bool ready(const size_t i) const + { + return not _queues[i].empty(); + } + + inline bool empty(const size_t i) const + { + return _queues[i].empty(); + } + + inline bool all_ready(void) const + { + return (~_bitset).none(); + } +}; + +} //namespace gnuradio + +#endif /*INCLUDED_LIBGRAS_BUFFER_IOS_HPP*/ diff --git a/lib/common_impl.hpp b/lib/common_impl.hpp index 61fe45f..40d9676 100644 --- a/lib/common_impl.hpp +++ b/lib/common_impl.hpp @@ -19,11 +19,6 @@ #include <tsbe/buffer.hpp> #include <boost/shared_ptr.hpp> -#include <iostream> - -#define HERE() std::cerr << __FILE__ << ":" << __LINE__ << std::endl << std::flush; -#define VAR(x) std::cerr << #x << " = " << (x) << std::endl << std::flush; -#define ASSERT(x) if(not (x)){std::cerr << "ASSERT FAIL " << __FILE__ << ":" << __LINE__ << "\n\t" << #x << std::endl << std::flush;} static inline unsigned long myulround(const double x) { diff --git a/lib/debug_impl.hpp b/lib/debug_impl.hpp new file mode 100644 index 0000000..80c01c7 --- /dev/null +++ b/lib/debug_impl.hpp @@ -0,0 +1,26 @@ +// +// Copyright 2012 Josh Blum +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with io_sig program. If not, see <http://www.gnu.org/licenses/>. + +#ifndef INCLUDED_LIBGRAS_DEBUG_IMPL_HPP +#define INCLUDED_LIBGRAS_DEBUG_IMPL_HPP + +#include <iostream> + +#define HERE() std::cerr << __FILE__ << ":" << __LINE__ << std::endl << std::flush; +#define VAR(x) std::cerr << #x << " = " << (x) << std::endl << std::flush; +#define ASSERT(x) if(not (x)){std::cerr << "ASSERT FAIL " << __FILE__ << ":" << __LINE__ << "\n\t" << #x << std::endl << std::flush;} + +#endif /*INCLUDED_LIBGRAS_DEBUG_IMPL_HPP*/ diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp index 6d49033..9cd21f6 100644 --- a/lib/element_impl.hpp +++ b/lib/element_impl.hpp @@ -17,8 +17,11 @@ #ifndef INCLUDED_LIBGNURADIO_ELEMENT_IMPL_HPP #define INCLUDED_LIBGNURADIO_ELEMENT_IMPL_HPP -#include <common_impl.hpp>//#include "common_impl.hpp" -#include <vector_of_queues.hpp>//#include "vector_of_queues.hpp" +#include <debug_impl.hpp> +#include <common_impl.hpp> +#include <vector_of_queues.hpp> +#include <buffer_ios.hpp> + #include <tsbe/block.hpp> #include <tsbe/topology.hpp> #include <tsbe/executor.hpp> @@ -70,14 +73,6 @@ struct ElementImpl std::vector<size_t> produce_items; std::vector<size_t> consume_items; - //state for partial input buffer consumption - std::vector<size_t> input_buff_offsets; - - //special buffer for dealing with history - std::vector<size_t> input_history_bytes; - std::vector<tsbe::Buffer> history_buffs; - size_t max_history_items; - //track the subscriber counts std::vector<Token> input_tokens; std::vector<Token> output_tokens; @@ -86,7 +81,7 @@ struct ElementImpl std::vector<tsbe::BufferToken> output_buffer_tokens; //buffer queues and ready conditions - VectorOfQueues<tsbe::Buffer> input_queues; + BufferIOs input_queues; VectorOfQueues<tsbe::Buffer> output_queues; //tag tracking |