summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorJosh Blum2012-09-02 17:09:46 -0700
committerJosh Blum2012-09-02 17:09:46 -0700
commit591905b3a33982a46e8c956d4ca226bb4a241b54 (patch)
tree897a168667fa8048e816a131428f81c830892942 /lib
parent814be255443ef4ada223375263ee3d2ad43540c1 (diff)
downloadsandhi-591905b3a33982a46e8c956d4ca226bb4a241b54.tar.gz
sandhi-591905b3a33982a46e8c956d4ca226bb4a241b54.tar.bz2
sandhi-591905b3a33982a46e8c956d4ca226bb4a241b54.zip
created history implementation (untested)
Diffstat (limited to 'lib')
-rw-r--r--lib/block_handlers.cpp27
-rw-r--r--lib/block_task.cpp24
-rw-r--r--lib/buffer_ios.hpp201
-rw-r--r--lib/common_impl.hpp5
-rw-r--r--lib/debug_impl.hpp26
-rw-r--r--lib/element_impl.hpp17
6 files changed, 241 insertions, 59 deletions
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index 396416d..4e6b732 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -113,7 +113,6 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface)
this->output_items.resize(num_outputs);
this->consume_items.resize(num_inputs, 0);
this->produce_items.resize(num_outputs, 0);
- this->input_buff_offsets.resize(num_inputs, 0);
this->input_queues.resize(num_inputs);
this->output_queues.resize(num_outputs);
@@ -125,30 +124,8 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface)
this->input_tags.resize(num_inputs);
this->output_tags.resize(num_outputs);
- //determine max history length for allocation below
- this->max_history_items = *std::max_element(
- this->input_history_items.begin(),
- this->input_history_items.end()
- );
-
- //resize and clear that initial history
- this->history_buffs.resize(num_inputs);
- this->input_history_bytes.resize(num_inputs);
- for (size_t i = 0; i < num_inputs; i++)
- {
- tsbe::Buffer &buff = this->history_buffs[i];
- const size_t num_items = this->input_history_items[i] + this->max_history_items;
- const size_t num_bytes = this->input_items_sizes[i]*num_items;
- this->input_history_bytes[i] = num_bytes;
- if (not buff or buff.get_length() != num_bytes)
- {
- tsbe::BufferConfig config;
- config.memory = NULL;
- config.length = num_bytes;
- buff = tsbe::Buffer(config);
- std::memset(buff.get_memory(), 0, buff.get_length());
- }
- }
+ //init the history comprehension on input queues
+ this->input_queues.init(this->input_history_items, this->input_items_sizes);
//TODO: think more about this:
if (num_inputs == 0 and num_outputs == 0)
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index a84ef6a..88ca5cf 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -94,19 +94,14 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
{
input_tokens_count += this->input_tokens[i].use_count();
- ASSERT(this->input_history_items[i] == 0);
-
ASSERT(this->input_queues.ready(i));
- const tsbe::Buffer &buff = this->input_queues.front(i);
- ASSERT(this->input_buff_offsets[i] < buff.get_length());
- char *mem = ((char *)buff.get_memory()) + this->input_buff_offsets[i];
- const size_t bytes = buff.get_length() - this->input_buff_offsets[i];
- const size_t items = bytes/this->input_items_sizes[i];
+ const BuffInfo info = this->input_queues.front(i);
+ const size_t items = info.len/this->input_items_sizes[i];
- this->work_io_ptr_mask |= ptrdiff_t(mem);
- this->input_items[i]._mem = mem;
+ this->work_io_ptr_mask |= ptrdiff_t(info.mem);
+ this->input_items[i]._mem = info.mem;
this->input_items[i]._len = items;
- this->work_input_items[i] = mem;
+ this->work_input_items[i] = info.mem;
this->work_ninput_items[i] = items;
}
@@ -172,14 +167,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
this->items_consumed[i] += items;
const size_t bytes = items*this->input_items_sizes[i];
- this->input_buff_offsets[i] += bytes;
- tsbe::Buffer &buff = this->input_queues.front(i);
-
- if (buff.get_length() <= this->input_buff_offsets[i])
- {
- this->input_queues.pop(i);
- this->input_buff_offsets[i] = 0;
- }
+ this->input_queues.pop(i, bytes);
}
//------------------------------------------------------------------
diff --git a/lib/buffer_ios.hpp b/lib/buffer_ios.hpp
new file mode 100644
index 0000000..2a74ba7
--- /dev/null
+++ b/lib/buffer_ios.hpp
@@ -0,0 +1,201 @@
+//
+// Copyright 2012 Josh Blum
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with io_sig program. If not, see <http://www.gnu.org/licenses/>.
+
+#ifndef INCLUDED_LIBGRAS_BUFFER_IOS_HPP
+#define INCLUDED_LIBGRAS_BUFFER_IOS_HPP
+
+#include <debug_impl.hpp>
+#include <tsbe/buffer.hpp>
+#include <boost/dynamic_bitset.hpp>
+#include <vector>
+#include <queue>
+#include <cstring> //memcpy/memset
+
+namespace gnuradio
+{
+
+struct BuffInfo
+{
+ void *mem;
+ size_t len;
+};
+
+struct BufferIOs
+{
+ boost::dynamic_bitset<> _bitset;
+ std::vector<std::queue<tsbe::Buffer> > _queues;
+ std::vector<size_t> _offset_bytes;
+ std::vector<size_t> _history_bytes;
+ std::vector<size_t> _post_bytes;
+ std::vector<tsbe::Buffer> _history_buffs;
+
+ template <typename V>
+ inline void init(
+ const V &input_history_items,
+ const V &input_item_sizes
+ ){
+ if (this->size() == 0) return;
+
+ const size_t max_history_items = *std::max_element(input_history_items.begin(), input_history_items.end());
+
+ for (size_t i = 0; i < this->size(); i++)
+ {
+ //determine byte sizes for buffers and dealing with history
+ _history_bytes[i] = input_item_sizes[i]*input_history_items[i];
+ _post_bytes[i] = input_item_sizes[i]*max_history_items;
+
+ //allocate mini buffer for history edge conditions
+ const size_t num_bytes = _history_bytes[i] + _post_bytes[i];
+ tsbe::Buffer &buff = _history_buffs[i];
+ if (not buff or buff.get_length() != num_bytes)
+ {
+ tsbe::BufferConfig config;
+ config.memory = NULL;
+ config.length = num_bytes;
+ buff = tsbe::Buffer(config);
+ std::memset(buff.get_memory(), 0, buff.get_length());
+ }
+ }
+ }
+
+ /*!
+ * Rules for front:
+ *
+ * If we are within the mini history buffer,
+ * memcpy post bytes from the head of the input buffer.
+ * The caller must chew through the mini history buffer
+ * until offset bytes passes the history requirement.
+ *
+ * Otherwise, resolve pointers to the input buffer,
+ * moving the memory and length by num history bytes.
+ */
+ inline BuffInfo front(const size_t i) const
+ {
+ BuffInfo info;
+ const tsbe::Buffer &buff = _queues[i].front();
+
+ if (_history_bytes[i] != 0)
+ {
+ const tsbe::Buffer &hist_buff = _history_buffs[i];
+
+ if (_offset_bytes[i] == 0)
+ {
+ info.mem = (char *)hist_buff.get_memory();
+ info.len = hist_buff.get_length();
+ char *src_mem = ((char *)buff.get_memory());
+ char *dst_mem = ((char *)info.mem) + _history_bytes[i];
+ std::memcpy(dst_mem, src_mem, _post_bytes[i]);
+ }
+
+ else if (_offset_bytes[i] < _history_bytes[i]) //caller left us a partial
+ {
+ info.mem = (char *)hist_buff.get_memory() + _offset_bytes[i];
+ info.len = hist_buff.get_length() - _offset_bytes[i];
+ }
+ }
+
+ else
+ {
+ const size_t delta_bytes = _offset_bytes[i] - _history_bytes[i];
+ info.mem = ((char *)buff.get_memory()) + delta_bytes;
+ info.len = buff.get_length() - delta_bytes;
+ }
+
+ return info;
+ }
+
+ /*!
+ * Rules for popping:
+ *
+ * If we were operating in a mini history buffer, do nothing.
+ * Otherwise, check if the input buffer was entirely consumed.
+ * If so, pop the input buffer, copy the tail end of the buffer
+ * into the mini history buffer, nd reset the offset condition.
+ */
+ inline void pop(const size_t i, const size_t bytes_consumed)
+ {
+ const bool in_history = _offset_bytes[i] < _history_bytes[i];
+ _offset_bytes[i] += bytes_consumed;
+ if (in_history) return;
+
+ //if totally consumed, memcpy history and pop
+ const tsbe::Buffer &buff = _queues[i].front();
+ if (_offset_bytes[i] >= buff.get_length())
+ {
+ if (_history_bytes[i] != 0)
+ {
+ char *src_mem = ((char *)buff.get_memory()) + _offset_bytes[i] - _history_bytes[i];
+ std::memcpy(_history_buffs[i].get_memory(), src_mem, _history_bytes[i]);
+ }
+ _queues[i].pop();
+ _bitset.set(i, not _queues[i].empty());
+ _offset_bytes[i] = 0;
+ }
+ }
+
+ inline void resize(const size_t size)
+ {
+ _bitset.resize(size);
+ _queues.resize(size);
+ _post_bytes.resize(size, 0);
+ _offset_bytes.resize(size, 0);
+ _history_bytes.resize(size, 0);
+ _history_buffs.resize(size);
+ }
+
+ inline void push(const size_t i, const tsbe::Buffer &value)
+ {
+ _queues[i].push(value);
+ _bitset.set(i);
+ }
+
+ inline void flush(const size_t i)
+ {
+ _queues[i] = std::queue<tsbe::Buffer>();
+ _bitset.reset(i);
+ }
+
+ size_t size(void) const
+ {
+ return _queues.size();
+ }
+
+ inline void flush_all(void)
+ {
+ const size_t old_size = this->size();
+ this->resize(0);
+ this->resize(old_size);
+ }
+
+ inline bool ready(const size_t i) const
+ {
+ return not _queues[i].empty();
+ }
+
+ inline bool empty(const size_t i) const
+ {
+ return _queues[i].empty();
+ }
+
+ inline bool all_ready(void) const
+ {
+ return (~_bitset).none();
+ }
+};
+
+} //namespace gnuradio
+
+#endif /*INCLUDED_LIBGRAS_BUFFER_IOS_HPP*/
diff --git a/lib/common_impl.hpp b/lib/common_impl.hpp
index 61fe45f..40d9676 100644
--- a/lib/common_impl.hpp
+++ b/lib/common_impl.hpp
@@ -19,11 +19,6 @@
#include <tsbe/buffer.hpp>
#include <boost/shared_ptr.hpp>
-#include <iostream>
-
-#define HERE() std::cerr << __FILE__ << ":" << __LINE__ << std::endl << std::flush;
-#define VAR(x) std::cerr << #x << " = " << (x) << std::endl << std::flush;
-#define ASSERT(x) if(not (x)){std::cerr << "ASSERT FAIL " << __FILE__ << ":" << __LINE__ << "\n\t" << #x << std::endl << std::flush;}
static inline unsigned long myulround(const double x)
{
diff --git a/lib/debug_impl.hpp b/lib/debug_impl.hpp
new file mode 100644
index 0000000..80c01c7
--- /dev/null
+++ b/lib/debug_impl.hpp
@@ -0,0 +1,26 @@
+//
+// Copyright 2012 Josh Blum
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with io_sig program. If not, see <http://www.gnu.org/licenses/>.
+
+#ifndef INCLUDED_LIBGRAS_DEBUG_IMPL_HPP
+#define INCLUDED_LIBGRAS_DEBUG_IMPL_HPP
+
+#include <iostream>
+
+#define HERE() std::cerr << __FILE__ << ":" << __LINE__ << std::endl << std::flush;
+#define VAR(x) std::cerr << #x << " = " << (x) << std::endl << std::flush;
+#define ASSERT(x) if(not (x)){std::cerr << "ASSERT FAIL " << __FILE__ << ":" << __LINE__ << "\n\t" << #x << std::endl << std::flush;}
+
+#endif /*INCLUDED_LIBGRAS_DEBUG_IMPL_HPP*/
diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp
index 6d49033..9cd21f6 100644
--- a/lib/element_impl.hpp
+++ b/lib/element_impl.hpp
@@ -17,8 +17,11 @@
#ifndef INCLUDED_LIBGNURADIO_ELEMENT_IMPL_HPP
#define INCLUDED_LIBGNURADIO_ELEMENT_IMPL_HPP
-#include <common_impl.hpp>//#include "common_impl.hpp"
-#include <vector_of_queues.hpp>//#include "vector_of_queues.hpp"
+#include <debug_impl.hpp>
+#include <common_impl.hpp>
+#include <vector_of_queues.hpp>
+#include <buffer_ios.hpp>
+
#include <tsbe/block.hpp>
#include <tsbe/topology.hpp>
#include <tsbe/executor.hpp>
@@ -70,14 +73,6 @@ struct ElementImpl
std::vector<size_t> produce_items;
std::vector<size_t> consume_items;
- //state for partial input buffer consumption
- std::vector<size_t> input_buff_offsets;
-
- //special buffer for dealing with history
- std::vector<size_t> input_history_bytes;
- std::vector<tsbe::Buffer> history_buffs;
- size_t max_history_items;
-
//track the subscriber counts
std::vector<Token> input_tokens;
std::vector<Token> output_tokens;
@@ -86,7 +81,7 @@ struct ElementImpl
std::vector<tsbe::BufferToken> output_buffer_tokens;
//buffer queues and ready conditions
- VectorOfQueues<tsbe::Buffer> input_queues;
+ BufferIOs input_queues;
VectorOfQueues<tsbe::Buffer> output_queues;
//tag tracking