summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorJosh Blum2012-09-13 22:25:59 -0700
committerJosh Blum2012-09-13 22:25:59 -0700
commitc1f4b98d88eebf1fdf408f87e96868fdf71222b9 (patch)
tree60f047318fe57301e12d94904cbc59f557ed5a76 /lib
parent59e5dd66dfa80709eefc31fbd938095cbc0ded3a (diff)
parentcae524fdf1da052cd70f3a872a8db4e80f202504 (diff)
downloadsandhi-c1f4b98d88eebf1fdf408f87e96868fdf71222b9.tar.gz
sandhi-c1f4b98d88eebf1fdf408f87e96868fdf71222b9.tar.bz2
sandhi-c1f4b98d88eebf1fdf408f87e96868fdf71222b9.zip
Merge branch 'filter_work'
Diffstat (limited to 'lib')
-rw-r--r--lib/block.cpp12
-rw-r--r--lib/block_handlers.cpp1
-rw-r--r--lib/block_task.cpp8
-rw-r--r--lib/element_impl.hpp1
-rw-r--r--lib/gras_impl/debug.hpp46
-rw-r--r--lib/gras_impl/input_buffer_queues.hpp28
6 files changed, 82 insertions, 14 deletions
diff --git a/lib/block.cpp b/lib/block.cpp
index 39939cb..406e16b 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -28,7 +28,7 @@ Block::Block(void)
Block::Block(const std::string &name):
Element(name)
{
- this->set_history(0);
+ this->set_input_history(0);
this->set_output_multiple(1);
this->set_fixed_rate(true);
this->set_relative_rate(1.0);
@@ -67,15 +67,13 @@ typename V::value_type vector_get(const V &v, const size_t index)
return v[index];
}
-size_t Block::history(const size_t which_input) const
+size_t Block::input_history(const size_t which_input) const
{
return vector_get((*this)->input_history_items, which_input);
}
-void Block::set_history(const size_t history_, const size_t which_input)
+void Block::set_input_history(const size_t history, const size_t which_input)
{
- //FIXME why is history - 1 (gnuradio loves this)
- const size_t history = (history_ == 0)? 0 : history_-1;
vector_set((*this)->input_history_items, history, which_input);
}
@@ -92,6 +90,7 @@ void Block::set_output_multiple(const size_t multiple, const size_t which_output
void Block::consume(const size_t which_input, const size_t how_many_items)
{
(*this)->consume_items[which_input] = how_many_items;
+ (*this)->consume_called[which_input] = true;
}
void Block::consume_each(const size_t how_many_items)
@@ -99,6 +98,7 @@ void Block::consume_each(const size_t how_many_items)
for (size_t i = 0; i < (*this)->consume_items.size(); i++)
{
(*this)->consume_items[i] = how_many_items;
+ (*this)->consume_called[i] = true;
}
}
@@ -114,7 +114,7 @@ void Block::set_input_inline(const size_t which_input, const bool enb)
bool Block::input_inline(const size_t which_input) const
{
- return (*this)->input_inline_enables[which_input];
+ return vector_get((*this)->input_inline_enables, which_input);
}
void Block::set_fixed_rate(const bool fixed_rate)
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index 2cb167f..243d158 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -166,6 +166,7 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface)
this->input_items.resize(num_inputs);
this->output_items.resize(num_outputs);
this->consume_items.resize(num_inputs, 0);
+ this->consume_called.resize(num_inputs, false);
this->produce_items.resize(num_outputs, 0);
this->input_queues.resize(num_inputs);
this->output_queues.resize(num_outputs);
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index 114c073..a9b88bd 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -73,6 +73,10 @@ void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface)
void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
{
+ #ifdef WORK_DEBUG
+ WorkDebugPrinter(this->name);
+ #endif
+
//------------------------------------------------------------------
//-- Decide if its possible to continue any processing:
//-- Handle task may get called for incoming buffers,
@@ -83,7 +87,6 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
this->input_queues.all_ready() and
this->output_queues.all_ready()
)) return;
- if (WORK) std::cout << "=== calling work on " << name << " ===" << std::endl;
const size_t num_inputs = task_iface.get_num_inputs();
const size_t num_outputs = task_iface.get_num_outputs();
@@ -124,6 +127,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
this->work_input_items[i] = mem;
this->work_ninput_items[i] = items;
num_input_items = std::min(num_input_items, items);
+ this->consume_called[i] = false;
//inline dealings, how and when input buffers can be inlined into output buffers
//TODO, check that the buff.get_affinity() matches this block or we dont inline
@@ -223,7 +227,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
for (size_t i = 0; i < num_inputs; i++)
{
ASSERT(enable_fixed_rate or work_ret != Block::WORK_CALLED_PRODUCE);
- const size_t items = (enable_fixed_rate)? (myulround((noutput_items/this->relative_rate))) : this->consume_items[i];
+ const size_t items = (this->consume_called[i])? this->consume_items[i] : (myulround((noutput_items/this->relative_rate)));
this->consume_items[i] = 0;
this->items_consumed[i] += items;
diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp
index 556fbd1..0ec4650 100644
--- a/lib/element_impl.hpp
+++ b/lib/element_impl.hpp
@@ -87,6 +87,7 @@ struct ElementImpl
//track work's calls to produce and consume
std::vector<size_t> produce_items;
std::vector<size_t> consume_items;
+ std::vector<bool> consume_called;
//track the subscriber counts
std::vector<Token> input_tokens;
diff --git a/lib/gras_impl/debug.hpp b/lib/gras_impl/debug.hpp
index c235c1f..69f7b4a 100644
--- a/lib/gras_impl/debug.hpp
+++ b/lib/gras_impl/debug.hpp
@@ -20,14 +20,28 @@
#include <iostream>
#include <stdexcept>
+//----------------------------------------------------------------------
+//-- set to 1 to enable these debugs:
+//----------------------------------------------------------------------
#define GENESIS 0
#define ARMAGEDDON 0
#define MESSAGE 0
-#define WORK 0
-#define ASSERTING 0
+//----------------------------------------------------------------------
+//-- define to enable these debugs:
+//----------------------------------------------------------------------
+//#define WORK_DEBUG
+#define ASSERTING
+
+//----------------------------------------------------------------------
+//-- various debug prints
+//----------------------------------------------------------------------
#define HERE() std::cerr << __FILE__ << ":" << __LINE__ << std::endl << std::flush;
#define VAR(x) std::cerr << #x << " = " << (x) << std::endl << std::flush;
+
+//----------------------------------------------------------------------
+//-- implementation for assert debug
+//----------------------------------------------------------------------
#ifdef ASSERTING
#define ASSERT(x) {if(not (x)) \
{ \
@@ -38,4 +52,32 @@
#define ASSERT(x)
#endif
+//----------------------------------------------------------------------
+//-- implementation for work debug
+//----------------------------------------------------------------------
+#ifdef WORK_DEBUG
+
+#include <boost/thread/mutex.hpp>
+
+static boost::mutex work_debug_mutex;
+
+struct WorkDebugPrinter
+{
+ WorkDebugPrinter(const std::string &name):
+ lock(work_debug_mutex), name(name)
+ {
+ std::cout << "-----> begin work on " << name << std::endl;
+ }
+
+ ~WorkDebugPrinter(void)
+ {
+ std::cout << "<----- end work on " << name << std::endl;
+ }
+
+ boost::mutex::scoped_lock lock;
+ std::string name;
+};
+
+#endif
+
#endif /*INCLUDED_LIBGRAS_IMPL_DEBUG_HPP*/
diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp
index 2188509..9e7bf63 100644
--- a/lib/gras_impl/input_buffer_queues.hpp
+++ b/lib/gras_impl/input_buffer_queues.hpp
@@ -124,6 +124,7 @@ struct InputBufferQueues
std::vector<size_t> _multiple_bytes;
std::vector<size_t> _post_bytes;
std::vector<boost::shared_ptr<BufferQueue> > _aux_queues;
+ std::vector<bool> _in_hist_buff;
};
@@ -137,6 +138,7 @@ inline void InputBufferQueues::resize(const size_t size)
_multiple_bytes.resize(size, 0);
_post_bytes.resize(size, 0);
_aux_queues.resize(size);
+ _in_hist_buff.resize(size, false);
}
@@ -157,10 +159,15 @@ inline void InputBufferQueues::init(
//determine byte sizes for buffers and dealing with history
_history_bytes[i] = input_item_sizes[i]*input_history_items[i];
+
+ //calculate the input multiple aka reserve size
_reserve_bytes[i] = input_item_sizes[i]*input_multiple_items[i];
_multiple_bytes[i] = std::max(size_t(1), _reserve_bytes[i]);
+
+ //post bytes are the desired buffer size to escape the edge case
_post_bytes[i] = input_item_sizes[i]*max_history_items;
_post_bytes[i] = std::max(_post_bytes[i], _reserve_bytes[i]);
+ _post_bytes[i] += input_item_sizes[i]; //pad for round down issues
//allocate mini buffers for history edge conditions
size_t num_bytes = _history_bytes[i] + _post_bytes[i];
@@ -179,6 +186,7 @@ inline void InputBufferQueues::init(
buff.length = 0;
_queues[i].push_front(buff);
+ _in_hist_buff[i] = true;
}
}
}
@@ -191,6 +199,7 @@ inline SBuffer InputBufferQueues::front(const size_t i, bool &potential_inline)
ASSERT(not _queues[i].empty());
ASSERT(this->ready(i));
__prepare(i);
+ ASSERT(_queues[i].front().offset >= _history_bytes[i]);
SBuffer &front = _queues[i].front();
const bool unique = front.unique();
@@ -233,6 +242,7 @@ inline void InputBufferQueues::__prepare(const size_t i)
hist_bytes = _history_bytes[i];
dst.offset = hist_bytes;
dst.length = 0;
+ _in_hist_buff[i] = true;
}
SBuffer src = _queues[i].front();
@@ -263,9 +273,6 @@ inline bool InputBufferQueues::consume(const size_t i, const size_t bytes_consum
//assert that we dont consume past the bounds of the buffer
ASSERT(_queues[i].front().length >= bytes_consumed);
- //this is an optimization
- const bool minibuff = (_history_bytes[i] != 0) and (_queues[i].front().offset == _history_bytes[i]) and (bytes_consumed == _post_bytes[i]);
-
//update bounds on the current buffer
_queues[i].front().offset += bytes_consumed;
_queues[i].front().length -= bytes_consumed;
@@ -276,12 +283,25 @@ inline bool InputBufferQueues::consume(const size_t i, const size_t bytes_consum
_queues[i].pop_front();
}
+ //otherwise, see if this is a mini history buff we can pop
+ else if (_in_hist_buff[i] and _queues[i].front().offset >= 2*_history_bytes[i])
+ {
+ const size_t residual = _queues[i].front().length;
+ _queues[i].pop_front();
+ _in_hist_buff[i] = false;
+ ASSERT(not _queues[i].empty());
+ ASSERT(_queues[i].front().offset > residual);
+ _queues[i].front().offset -= residual;
+ _queues[i].front().length += residual;
+ ASSERT(_queues[i].front().offset >= _history_bytes[i]);
+ }
+
//update the number of bytes in this queue
ASSERT(_enqueued_bytes[i] >= bytes_consumed);
_enqueued_bytes[i] -= bytes_consumed;
__update(i);
- return not minibuff; //not true on minibuff
+ return not _in_hist_buff[i];
}
} //namespace gnuradio