diff options
author | Josh Blum | 2012-09-09 18:54:49 -0700 |
---|---|---|
committer | Josh Blum | 2012-09-09 18:54:49 -0700 |
commit | 920d654bcf22024fe08c35474fdfcf8df7c72a4d (patch) | |
tree | 46db7e638d8e65b643c0e62ea3b62b73b2127b39 /lib | |
parent | 203950da07d91482a05bd364367d116e0916e198 (diff) | |
download | sandhi-920d654bcf22024fe08c35474fdfcf8df7c72a4d.tar.gz sandhi-920d654bcf22024fe08c35474fdfcf8df7c72a4d.tar.bz2 sandhi-920d654bcf22024fe08c35474fdfcf8df7c72a4d.zip |
work on implementing forecast logic and failure cases
* still a bit to clean up
* commented out case of consume zero and 0 reserve for input queues
Diffstat (limited to 'lib')
-rw-r--r-- | lib/block.cpp | 2 | ||||
-rw-r--r-- | lib/block_handlers.cpp | 7 | ||||
-rw-r--r-- | lib/block_task.cpp | 39 | ||||
-rw-r--r-- | lib/element.cpp | 3 | ||||
-rw-r--r-- | lib/element_impl.hpp | 1 | ||||
-rw-r--r-- | lib/gras_impl/debug.hpp | 1 | ||||
-rw-r--r-- | lib/gras_impl/input_buffer_queues.hpp | 12 |
7 files changed, 56 insertions, 9 deletions
diff --git a/lib/block.cpp b/lib/block.cpp index 4b1e07e..ea70f28 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -41,6 +41,7 @@ Block::Block(const std::string &name): config.changed_callback = boost::bind(&ElementImpl::topology_update, this->get(), _1); (*this)->block = tsbe::Block(config); + (*this)->forecast_fail = false; (*this)->block_ptr = this; (*this)->hint = 0; (*this)->block_state = ElementImpl::BLOCK_STATE_INIT; @@ -205,6 +206,7 @@ void Block::forecast( int noutput_items, std::vector<int> &ninput_items_required ){ + if (not (*this)->enable_fixed_rate) return; for (size_t i = 0; i < ninput_items_required.size(); i++) { ninput_items_required[i] = diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index 0741aa0..73dc20a 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -47,7 +47,7 @@ void ElementImpl::handle_block_msg( //clearly, this block is near death, hang on sparky if (msg.type() == typeid(CheckTokensMessage)) { - if (this->input_queues.all_ready()) + if (this->input_queues.all_ready() and not this->forecast_fail) { this->handle_task(task_iface); } @@ -154,10 +154,9 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface) std::vector<size_t> input_multiple_items(num_inputs, 1); for (size_t i = 0; i < num_inputs; i++) { - if (num_outputs == 0 or not this->enable_fixed_rate) continue; //TODO, this is a little cheap, we only look at output multiple [0] - const size_t multiple = this->output_multiple_items.front(); - input_multiple_items[i] = myulround(multiple/this->relative_rate); + const size_t multiple = (num_outputs)?this->output_multiple_items.front():1; + input_multiple_items[i] = size_t(std::ceil(multiple/this->relative_rate)); if (input_multiple_items[i] == 0) input_multiple_items[i] = 1; } diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 872e8f8..d3653b5 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -82,7 +82,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) this->input_queues.all_ready() and this->output_queues.all_ready() )) return; - //std::cout << "=== calling work on " << name << " ===" << std::endl; + if (WORK) std::cout << "=== calling work on " << name << " ===" << std::endl; const size_t num_inputs = task_iface.get_num_inputs(); const size_t num_outputs = task_iface.get_num_outputs(); @@ -154,15 +154,50 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) //------------------------------------------------------------------ //-- forecast //------------------------------------------------------------------ + forecast_again_you_jerk: if (not this->enable_fixed_rate) { block_ptr->forecast(num_output_items, work_ninput_items); + for (size_t i = 0; i < num_inputs; i++) + { + if (size_t(work_ninput_items[i]) > this->input_items[i].size()) + { + for (size_t j = 0; j < num_inputs; j++) + { + work_ninput_items[j] = this->input_items[j]._len; + } + num_output_items = num_output_items/2; + if (num_output_items == 0) + { + this->forecast_fail = true; + //TODO FIXME this logic is totally duplicated from the bottom! + //re-use some common code + { + if (num_inputs != 0 and input_tokens_count == num_inputs) + { + this->block.post_msg(CheckTokensMessage()); + return; + } + if (this->input_queues.all_ready() and this->output_queues.all_ready()) + { + this->block.post_msg(SelfKickMessage()); + return; + } + } + return; + } + goto forecast_again_you_jerk; + } + } } + this->forecast_fail = false; //------------------------------------------------------------------ //-- the work //------------------------------------------------------------------ - work_noutput_items = std::min(num_output_items, myulround((num_input_items)*this->relative_rate)); + work_noutput_items = num_output_items; + if (this->enable_fixed_rate) work_noutput_items = std::min( + work_noutput_items, myulround((num_input_items)*this->relative_rate)); const int ret = block_ptr->Work(this->input_items, this->output_items); const size_t noutput_items = size_t(ret); diff --git a/lib/element.cpp b/lib/element.cpp index 83b07b2..cdb856e 100644 --- a/lib/element.cpp +++ b/lib/element.cpp @@ -30,10 +30,11 @@ Element::Element(void) Element::Element(const std::string &name) { this->reset(new ElementImpl()); - if (GENESIS) std::cout << "New element: " << name << std::endl; (*this)->name = name; (*this)->unique_id = ++unique_id_pool; + if (GENESIS) std::cout << "New element: " << name << std::endl; + //default io signature to something IOSignature sig; sig.push_back(1); this->set_input_signature(sig); diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp index 6ecbbbd..991ab4e 100644 --- a/lib/element_impl.hpp +++ b/lib/element_impl.hpp @@ -128,6 +128,7 @@ struct ElementImpl //rate settings bool enable_fixed_rate; double relative_rate; + bool forecast_fail; }; } //namespace gnuradio diff --git a/lib/gras_impl/debug.hpp b/lib/gras_impl/debug.hpp index 9f03efa..210ac7a 100644 --- a/lib/gras_impl/debug.hpp +++ b/lib/gras_impl/debug.hpp @@ -23,6 +23,7 @@ #define GENESIS 0 #define ARMAGEDDON 0 #define MESSAGE 0 +#define WORK 0 #define HERE() std::cerr << __FILE__ << ":" << __LINE__ << std::endl << std::flush; #define VAR(x) std::cerr << #x << " = " << (x) << std::endl << std::flush; diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp index d1bb692..2066809 100644 --- a/lib/gras_impl/input_buffer_queues.hpp +++ b/lib/gras_impl/input_buffer_queues.hpp @@ -52,6 +52,7 @@ struct BufferWOffset struct BuffInfo { + BuffInfo(void): mem(NULL), len(0){} void *mem; size_t len; }; @@ -148,6 +149,7 @@ struct InputBufferQueues std::vector<std::deque<BufferWOffset> > _queues; std::vector<size_t> _history_bytes; std::vector<size_t> _reserve_bytes; + std::vector<size_t> _multiple_bytes; std::vector<size_t> _post_bytes; std::vector<boost::shared_ptr<BufferQueue> > _aux_queues; }; @@ -160,6 +162,7 @@ inline void InputBufferQueues::resize(const size_t size) _queues.resize(size); _history_bytes.resize(size, 0); _reserve_bytes.resize(size, 0); + _multiple_bytes.resize(size, 0); _post_bytes.resize(size, 0); _aux_queues.resize(size); } @@ -183,6 +186,7 @@ inline void InputBufferQueues::init( //determine byte sizes for buffers and dealing with history _history_bytes[i] = input_item_sizes[i]*input_history_items[i]; _reserve_bytes[i] = input_item_sizes[i]*input_multiple_items[i]; + _multiple_bytes[i] = std::max(size_t(1), _reserve_bytes[i]); _post_bytes[i] = input_item_sizes[i]*max_history_items; _post_bytes[i] = std::max(_post_bytes[i], _reserve_bytes[i]); @@ -209,6 +213,8 @@ inline void InputBufferQueues::init( inline BuffInfo InputBufferQueues::front(const size_t i) { + //if (_queues[i].empty()) return BuffInfo(); + ASSERT(not _queues[i].empty()); ASSERT(this->ready(i)); __prepare(i); @@ -217,8 +223,8 @@ inline BuffInfo InputBufferQueues::front(const size_t i) BuffInfo info; info.mem = front.mem_offset() - _history_bytes[i]; info.len = front.length; - info.len /= _reserve_bytes[i]; - info.len *= _reserve_bytes[i]; + info.len /= _multiple_bytes[i]; + info.len *= _multiple_bytes[i]; return info; } @@ -273,6 +279,8 @@ inline void InputBufferQueues::__prepare(const size_t i) inline bool InputBufferQueues::consume(const size_t i, const size_t bytes_consumed) { + //if (bytes_consumed == 0) return true; + //assert that we dont consume past the bounds of the buffer ASSERT(_queues[i].front().length >= bytes_consumed); |