summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorJosh Blum2012-09-09 18:54:49 -0700
committerJosh Blum2012-09-09 18:54:49 -0700
commit920d654bcf22024fe08c35474fdfcf8df7c72a4d (patch)
tree46db7e638d8e65b643c0e62ea3b62b73b2127b39 /lib
parent203950da07d91482a05bd364367d116e0916e198 (diff)
downloadsandhi-920d654bcf22024fe08c35474fdfcf8df7c72a4d.tar.gz
sandhi-920d654bcf22024fe08c35474fdfcf8df7c72a4d.tar.bz2
sandhi-920d654bcf22024fe08c35474fdfcf8df7c72a4d.zip
work on implementing forecast logic and failure cases
* still a bit to clean up * commented out case of consume zero and 0 reserve for input queues
Diffstat (limited to 'lib')
-rw-r--r--lib/block.cpp2
-rw-r--r--lib/block_handlers.cpp7
-rw-r--r--lib/block_task.cpp39
-rw-r--r--lib/element.cpp3
-rw-r--r--lib/element_impl.hpp1
-rw-r--r--lib/gras_impl/debug.hpp1
-rw-r--r--lib/gras_impl/input_buffer_queues.hpp12
7 files changed, 56 insertions, 9 deletions
diff --git a/lib/block.cpp b/lib/block.cpp
index 4b1e07e..ea70f28 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -41,6 +41,7 @@ Block::Block(const std::string &name):
config.changed_callback = boost::bind(&ElementImpl::topology_update, this->get(), _1);
(*this)->block = tsbe::Block(config);
+ (*this)->forecast_fail = false;
(*this)->block_ptr = this;
(*this)->hint = 0;
(*this)->block_state = ElementImpl::BLOCK_STATE_INIT;
@@ -205,6 +206,7 @@ void Block::forecast(
int noutput_items,
std::vector<int> &ninput_items_required
){
+ if (not (*this)->enable_fixed_rate) return;
for (size_t i = 0; i < ninput_items_required.size(); i++)
{
ninput_items_required[i] =
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index 0741aa0..73dc20a 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -47,7 +47,7 @@ void ElementImpl::handle_block_msg(
//clearly, this block is near death, hang on sparky
if (msg.type() == typeid(CheckTokensMessage))
{
- if (this->input_queues.all_ready())
+ if (this->input_queues.all_ready() and not this->forecast_fail)
{
this->handle_task(task_iface);
}
@@ -154,10 +154,9 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface)
std::vector<size_t> input_multiple_items(num_inputs, 1);
for (size_t i = 0; i < num_inputs; i++)
{
- if (num_outputs == 0 or not this->enable_fixed_rate) continue;
//TODO, this is a little cheap, we only look at output multiple [0]
- const size_t multiple = this->output_multiple_items.front();
- input_multiple_items[i] = myulround(multiple/this->relative_rate);
+ const size_t multiple = (num_outputs)?this->output_multiple_items.front():1;
+ input_multiple_items[i] = size_t(std::ceil(multiple/this->relative_rate));
if (input_multiple_items[i] == 0) input_multiple_items[i] = 1;
}
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index 872e8f8..d3653b5 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -82,7 +82,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
this->input_queues.all_ready() and
this->output_queues.all_ready()
)) return;
- //std::cout << "=== calling work on " << name << " ===" << std::endl;
+ if (WORK) std::cout << "=== calling work on " << name << " ===" << std::endl;
const size_t num_inputs = task_iface.get_num_inputs();
const size_t num_outputs = task_iface.get_num_outputs();
@@ -154,15 +154,50 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
//------------------------------------------------------------------
//-- forecast
//------------------------------------------------------------------
+ forecast_again_you_jerk:
if (not this->enable_fixed_rate)
{
block_ptr->forecast(num_output_items, work_ninput_items);
+ for (size_t i = 0; i < num_inputs; i++)
+ {
+ if (size_t(work_ninput_items[i]) > this->input_items[i].size())
+ {
+ for (size_t j = 0; j < num_inputs; j++)
+ {
+ work_ninput_items[j] = this->input_items[j]._len;
+ }
+ num_output_items = num_output_items/2;
+ if (num_output_items == 0)
+ {
+ this->forecast_fail = true;
+ //TODO FIXME this logic is totally duplicated from the bottom!
+ //re-use some common code
+ {
+ if (num_inputs != 0 and input_tokens_count == num_inputs)
+ {
+ this->block.post_msg(CheckTokensMessage());
+ return;
+ }
+ if (this->input_queues.all_ready() and this->output_queues.all_ready())
+ {
+ this->block.post_msg(SelfKickMessage());
+ return;
+ }
+ }
+ return;
+ }
+ goto forecast_again_you_jerk;
+ }
+ }
}
+ this->forecast_fail = false;
//------------------------------------------------------------------
//-- the work
//------------------------------------------------------------------
- work_noutput_items = std::min(num_output_items, myulround((num_input_items)*this->relative_rate));
+ work_noutput_items = num_output_items;
+ if (this->enable_fixed_rate) work_noutput_items = std::min(
+ work_noutput_items, myulround((num_input_items)*this->relative_rate));
const int ret = block_ptr->Work(this->input_items, this->output_items);
const size_t noutput_items = size_t(ret);
diff --git a/lib/element.cpp b/lib/element.cpp
index 83b07b2..cdb856e 100644
--- a/lib/element.cpp
+++ b/lib/element.cpp
@@ -30,10 +30,11 @@ Element::Element(void)
Element::Element(const std::string &name)
{
this->reset(new ElementImpl());
- if (GENESIS) std::cout << "New element: " << name << std::endl;
(*this)->name = name;
(*this)->unique_id = ++unique_id_pool;
+ if (GENESIS) std::cout << "New element: " << name << std::endl;
+
//default io signature to something
IOSignature sig; sig.push_back(1);
this->set_input_signature(sig);
diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp
index 6ecbbbd..991ab4e 100644
--- a/lib/element_impl.hpp
+++ b/lib/element_impl.hpp
@@ -128,6 +128,7 @@ struct ElementImpl
//rate settings
bool enable_fixed_rate;
double relative_rate;
+ bool forecast_fail;
};
} //namespace gnuradio
diff --git a/lib/gras_impl/debug.hpp b/lib/gras_impl/debug.hpp
index 9f03efa..210ac7a 100644
--- a/lib/gras_impl/debug.hpp
+++ b/lib/gras_impl/debug.hpp
@@ -23,6 +23,7 @@
#define GENESIS 0
#define ARMAGEDDON 0
#define MESSAGE 0
+#define WORK 0
#define HERE() std::cerr << __FILE__ << ":" << __LINE__ << std::endl << std::flush;
#define VAR(x) std::cerr << #x << " = " << (x) << std::endl << std::flush;
diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp
index d1bb692..2066809 100644
--- a/lib/gras_impl/input_buffer_queues.hpp
+++ b/lib/gras_impl/input_buffer_queues.hpp
@@ -52,6 +52,7 @@ struct BufferWOffset
struct BuffInfo
{
+ BuffInfo(void): mem(NULL), len(0){}
void *mem;
size_t len;
};
@@ -148,6 +149,7 @@ struct InputBufferQueues
std::vector<std::deque<BufferWOffset> > _queues;
std::vector<size_t> _history_bytes;
std::vector<size_t> _reserve_bytes;
+ std::vector<size_t> _multiple_bytes;
std::vector<size_t> _post_bytes;
std::vector<boost::shared_ptr<BufferQueue> > _aux_queues;
};
@@ -160,6 +162,7 @@ inline void InputBufferQueues::resize(const size_t size)
_queues.resize(size);
_history_bytes.resize(size, 0);
_reserve_bytes.resize(size, 0);
+ _multiple_bytes.resize(size, 0);
_post_bytes.resize(size, 0);
_aux_queues.resize(size);
}
@@ -183,6 +186,7 @@ inline void InputBufferQueues::init(
//determine byte sizes for buffers and dealing with history
_history_bytes[i] = input_item_sizes[i]*input_history_items[i];
_reserve_bytes[i] = input_item_sizes[i]*input_multiple_items[i];
+ _multiple_bytes[i] = std::max(size_t(1), _reserve_bytes[i]);
_post_bytes[i] = input_item_sizes[i]*max_history_items;
_post_bytes[i] = std::max(_post_bytes[i], _reserve_bytes[i]);
@@ -209,6 +213,8 @@ inline void InputBufferQueues::init(
inline BuffInfo InputBufferQueues::front(const size_t i)
{
+ //if (_queues[i].empty()) return BuffInfo();
+
ASSERT(not _queues[i].empty());
ASSERT(this->ready(i));
__prepare(i);
@@ -217,8 +223,8 @@ inline BuffInfo InputBufferQueues::front(const size_t i)
BuffInfo info;
info.mem = front.mem_offset() - _history_bytes[i];
info.len = front.length;
- info.len /= _reserve_bytes[i];
- info.len *= _reserve_bytes[i];
+ info.len /= _multiple_bytes[i];
+ info.len *= _multiple_bytes[i];
return info;
}
@@ -273,6 +279,8 @@ inline void InputBufferQueues::__prepare(const size_t i)
inline bool InputBufferQueues::consume(const size_t i, const size_t bytes_consumed)
{
+ //if (bytes_consumed == 0) return true;
+
//assert that we dont consume past the bounds of the buffer
ASSERT(_queues[i].front().length >= bytes_consumed);