diff options
-rw-r--r-- | TODO.txt | 14 | ||||
-rw-r--r-- | lib/block.cpp | 2 | ||||
-rw-r--r-- | lib/block_handlers.cpp | 7 | ||||
-rw-r--r-- | lib/block_task.cpp | 39 | ||||
-rw-r--r-- | lib/element.cpp | 3 | ||||
-rw-r--r-- | lib/element_impl.hpp | 1 | ||||
-rw-r--r-- | lib/gras_impl/debug.hpp | 1 | ||||
-rw-r--r-- | lib/gras_impl/input_buffer_queues.hpp | 12 |
8 files changed, 64 insertions, 15 deletions
@@ -3,19 +3,21 @@ ######################################################################## * handle forecast -* handle items multiple -** inform upstream of multiple requirements inposed on input buffs +** not 100% sure here +* add hooks to specify input reserve +** automatically calculate from output multiple and rel rate +* sources need their own interruptible thread +** udp source +** uhd source +* gr stream mux is on drugs +* inform upstream of multiple requirements inposed on input buffs * handle calculating noutputitems using ninputs as a constraint -* handle history -** test implementation * allocation ** hooks for advanced allocation ** intelligent sizing of buffers ** communicate upstream requirements? ** communicate downstream requirements? -* python wrappers for hier_block and top_block * python wrapper for block will come from grextras -* python wrapper should set weak_self somehow * bring in numanuma ** thread prio ** thread affinity diff --git a/lib/block.cpp b/lib/block.cpp index 4b1e07e..ea70f28 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -41,6 +41,7 @@ Block::Block(const std::string &name): config.changed_callback = boost::bind(&ElementImpl::topology_update, this->get(), _1); (*this)->block = tsbe::Block(config); + (*this)->forecast_fail = false; (*this)->block_ptr = this; (*this)->hint = 0; (*this)->block_state = ElementImpl::BLOCK_STATE_INIT; @@ -205,6 +206,7 @@ void Block::forecast( int noutput_items, std::vector<int> &ninput_items_required ){ + if (not (*this)->enable_fixed_rate) return; for (size_t i = 0; i < ninput_items_required.size(); i++) { ninput_items_required[i] = diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index 0741aa0..73dc20a 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -47,7 +47,7 @@ void ElementImpl::handle_block_msg( //clearly, this block is near death, hang on sparky if (msg.type() == typeid(CheckTokensMessage)) { - if (this->input_queues.all_ready()) + if (this->input_queues.all_ready() and not this->forecast_fail) { this->handle_task(task_iface); } @@ -154,10 +154,9 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface) std::vector<size_t> input_multiple_items(num_inputs, 1); for (size_t i = 0; i < num_inputs; i++) { - if (num_outputs == 0 or not this->enable_fixed_rate) continue; //TODO, this is a little cheap, we only look at output multiple [0] - const size_t multiple = this->output_multiple_items.front(); - input_multiple_items[i] = myulround(multiple/this->relative_rate); + const size_t multiple = (num_outputs)?this->output_multiple_items.front():1; + input_multiple_items[i] = size_t(std::ceil(multiple/this->relative_rate)); if (input_multiple_items[i] == 0) input_multiple_items[i] = 1; } diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 872e8f8..d3653b5 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -82,7 +82,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) this->input_queues.all_ready() and this->output_queues.all_ready() )) return; - //std::cout << "=== calling work on " << name << " ===" << std::endl; + if (WORK) std::cout << "=== calling work on " << name << " ===" << std::endl; const size_t num_inputs = task_iface.get_num_inputs(); const size_t num_outputs = task_iface.get_num_outputs(); @@ -154,15 +154,50 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) //------------------------------------------------------------------ //-- forecast //------------------------------------------------------------------ + forecast_again_you_jerk: if (not this->enable_fixed_rate) { block_ptr->forecast(num_output_items, work_ninput_items); + for (size_t i = 0; i < num_inputs; i++) + { + if (size_t(work_ninput_items[i]) > this->input_items[i].size()) + { + for (size_t j = 0; j < num_inputs; j++) + { + work_ninput_items[j] = this->input_items[j]._len; + } + num_output_items = num_output_items/2; + if (num_output_items == 0) + { + this->forecast_fail = true; + //TODO FIXME this logic is totally duplicated from the bottom! + //re-use some common code + { + if (num_inputs != 0 and input_tokens_count == num_inputs) + { + this->block.post_msg(CheckTokensMessage()); + return; + } + if (this->input_queues.all_ready() and this->output_queues.all_ready()) + { + this->block.post_msg(SelfKickMessage()); + return; + } + } + return; + } + goto forecast_again_you_jerk; + } + } } + this->forecast_fail = false; //------------------------------------------------------------------ //-- the work //------------------------------------------------------------------ - work_noutput_items = std::min(num_output_items, myulround((num_input_items)*this->relative_rate)); + work_noutput_items = num_output_items; + if (this->enable_fixed_rate) work_noutput_items = std::min( + work_noutput_items, myulround((num_input_items)*this->relative_rate)); const int ret = block_ptr->Work(this->input_items, this->output_items); const size_t noutput_items = size_t(ret); diff --git a/lib/element.cpp b/lib/element.cpp index 83b07b2..cdb856e 100644 --- a/lib/element.cpp +++ b/lib/element.cpp @@ -30,10 +30,11 @@ Element::Element(void) Element::Element(const std::string &name) { this->reset(new ElementImpl()); - if (GENESIS) std::cout << "New element: " << name << std::endl; (*this)->name = name; (*this)->unique_id = ++unique_id_pool; + if (GENESIS) std::cout << "New element: " << name << std::endl; + //default io signature to something IOSignature sig; sig.push_back(1); this->set_input_signature(sig); diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp index 6ecbbbd..991ab4e 100644 --- a/lib/element_impl.hpp +++ b/lib/element_impl.hpp @@ -128,6 +128,7 @@ struct ElementImpl //rate settings bool enable_fixed_rate; double relative_rate; + bool forecast_fail; }; } //namespace gnuradio diff --git a/lib/gras_impl/debug.hpp b/lib/gras_impl/debug.hpp index 9f03efa..210ac7a 100644 --- a/lib/gras_impl/debug.hpp +++ b/lib/gras_impl/debug.hpp @@ -23,6 +23,7 @@ #define GENESIS 0 #define ARMAGEDDON 0 #define MESSAGE 0 +#define WORK 0 #define HERE() std::cerr << __FILE__ << ":" << __LINE__ << std::endl << std::flush; #define VAR(x) std::cerr << #x << " = " << (x) << std::endl << std::flush; diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp index d1bb692..2066809 100644 --- a/lib/gras_impl/input_buffer_queues.hpp +++ b/lib/gras_impl/input_buffer_queues.hpp @@ -52,6 +52,7 @@ struct BufferWOffset struct BuffInfo { + BuffInfo(void): mem(NULL), len(0){} void *mem; size_t len; }; @@ -148,6 +149,7 @@ struct InputBufferQueues std::vector<std::deque<BufferWOffset> > _queues; std::vector<size_t> _history_bytes; std::vector<size_t> _reserve_bytes; + std::vector<size_t> _multiple_bytes; std::vector<size_t> _post_bytes; std::vector<boost::shared_ptr<BufferQueue> > _aux_queues; }; @@ -160,6 +162,7 @@ inline void InputBufferQueues::resize(const size_t size) _queues.resize(size); _history_bytes.resize(size, 0); _reserve_bytes.resize(size, 0); + _multiple_bytes.resize(size, 0); _post_bytes.resize(size, 0); _aux_queues.resize(size); } @@ -183,6 +186,7 @@ inline void InputBufferQueues::init( //determine byte sizes for buffers and dealing with history _history_bytes[i] = input_item_sizes[i]*input_history_items[i]; _reserve_bytes[i] = input_item_sizes[i]*input_multiple_items[i]; + _multiple_bytes[i] = std::max(size_t(1), _reserve_bytes[i]); _post_bytes[i] = input_item_sizes[i]*max_history_items; _post_bytes[i] = std::max(_post_bytes[i], _reserve_bytes[i]); @@ -209,6 +213,8 @@ inline void InputBufferQueues::init( inline BuffInfo InputBufferQueues::front(const size_t i) { + //if (_queues[i].empty()) return BuffInfo(); + ASSERT(not _queues[i].empty()); ASSERT(this->ready(i)); __prepare(i); @@ -217,8 +223,8 @@ inline BuffInfo InputBufferQueues::front(const size_t i) BuffInfo info; info.mem = front.mem_offset() - _history_bytes[i]; info.len = front.length; - info.len /= _reserve_bytes[i]; - info.len *= _reserve_bytes[i]; + info.len /= _multiple_bytes[i]; + info.len *= _multiple_bytes[i]; return info; } @@ -273,6 +279,8 @@ inline void InputBufferQueues::__prepare(const size_t i) inline bool InputBufferQueues::consume(const size_t i, const size_t bytes_consumed) { + //if (bytes_consumed == 0) return true; + //assert that we dont consume past the bounds of the buffer ASSERT(_queues[i].front().length >= bytes_consumed); |