diff options
-rw-r--r-- | TODO.txt | 6 | ||||
-rw-r--r-- | lib/block_allocator.cpp | 51 | ||||
-rw-r--r-- | lib/block_handlers.cpp | 13 | ||||
-rw-r--r-- | lib/element_impl.hpp | 4 | ||||
-rw-r--r-- | lib/gras_impl/interruptible_thread.hpp | 1 | ||||
-rw-r--r-- | lib/gras_impl/messages.hpp | 6 | ||||
-rw-r--r-- | lib/gras_impl/token.hpp | 3 | ||||
-rw-r--r-- | lib/port_handlers.cpp | 29 | ||||
-rw-r--r-- | lib/top_block.cpp | 1 |
9 files changed, 99 insertions, 15 deletions
@@ -10,16 +10,14 @@ * add hooks to specify input reserve ** automatically calculate from output multiple and rel rate * gr stream mux is on drugs -* inform upstream of multiple requirements inposed on input buffs * handle calculating noutputitems using ninputs as a constraint * allocation ** hooks for advanced allocation -** intelligent sizing of buffers -** communicate upstream requirements? -** communicate downstream requirements? * python wrapper for block will come from grextras * bring in numanuma ** thread prio ** thread affinity ** memory affinity * track memcpys for block usage stats +* runtime history, input/output multiple change? +** resend the hint to the allocator and possibly re-allocate diff --git a/lib/block_allocator.cpp b/lib/block_allocator.cpp index 22f60af..5f45dd0 100644 --- a/lib/block_allocator.cpp +++ b/lib/block_allocator.cpp @@ -15,13 +15,18 @@ // along with io_sig program. If not, see <http://www.gnu.org/licenses/>. #include "element_impl.hpp" -#include <gras_impl/messages.hpp> #include <boost/bind.hpp> +#include <boost/foreach.hpp> +#include <boost/math/common_factor.hpp> using namespace gnuradio; -//TODO will need more complicated later +const size_t AT_LEAST_DEFAULT_ITEMS = 1024*2; +const size_t AHH_TOO_MANY_BYTES = 1 << 16; //TODO +const size_t THIS_MANY_BUFFERS = 8; +const double EDGE_CASE_MITIGATION = 8.0; //edge case mitigation constant +//TODO will need more complicated later void ElementImpl::buffer_returner(const size_t index, SBuffer &buffer) { @@ -35,6 +40,35 @@ void ElementImpl::buffer_returner(const size_t index, SBuffer &buffer) this->block.post_msg(message); } +static size_t recommend_length( + const std::vector<BufferHintMessage> &hints, + const size_t output_multiple_bytes, + const size_t at_least_bytes +){ + //step 1) find the LCM of all reserves to create a super-reserve + size_t lcm_bytes = output_multiple_bytes; + BOOST_FOREACH(const BufferHintMessage &hint, hints) + { + lcm_bytes = boost::math::lcm(lcm_bytes, hint.reserve_bytes); + } + + //step 2) N x super reserve to minimize history edge case + size_t Nlcm_bytes = lcm_bytes; + BOOST_FOREACH(const BufferHintMessage &hint, hints) + { + while (hint.history_bytes*EDGE_CASE_MITIGATION > Nlcm_bytes) + { + Nlcm_bytes += lcm_bytes; + } + } + while (at_least_bytes > Nlcm_bytes) + { + Nlcm_bytes += lcm_bytes; + } + + return std::min(Nlcm_bytes, AHH_TOO_MANY_BYTES); +} + void ElementImpl::handle_allocation(const tsbe::TaskInterface &task_iface) { //allocate output buffers which will also wake up the task @@ -42,16 +76,19 @@ void ElementImpl::handle_allocation(const tsbe::TaskInterface &task_iface) this->output_buffer_tokens.resize(num_outputs); for (size_t i = 0; i < num_outputs; i++) { - size_t items = this->hint; - if (items == 0) items = 2048; - items = std::max(items, this->output_multiple_items[i]); + size_t at_least_items = this->hint; + if (at_least_items == 0) at_least_items = AT_LEAST_DEFAULT_ITEMS; - const size_t bytes = items * this->output_items_sizes[i]; + const size_t bytes = recommend_length( + this->output_allocation_hints[i], + this->output_multiple_items[i]*this->output_items_sizes[i], + at_least_items*this->output_items_sizes[i] + ); SBufferDeleter deleter = boost::bind(&ElementImpl::buffer_returner, this, i, _1); this->output_buffer_tokens[i] = SBufferToken(new SBufferDeleter(deleter)); - for (size_t j = 0; j < 8; j++) + for (size_t j = 0; j < THIS_MANY_BUFFERS; j++) { SBufferConfig config; config.memory = NULL; diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index 4ecfa20..3715615 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -15,7 +15,6 @@ // along with io_sig program. If not, see <http://www.gnu.org/licenses/>. #include "element_impl.hpp" -#include <gras_impl/messages.hpp> #include <gras_impl/vector_utils.hpp> #include <boost/make_shared.hpp> #include <boost/bind.hpp> @@ -89,6 +88,15 @@ void ElementImpl::handle_block_msg( { this->input_tokens[i] = Token::make(); task_iface.post_upstream(i, this->input_tokens[i]); + + //TODO, schedule this message as a pre-allocation message + //tell the upstream about the input requirements + BufferHintMessage message; + message.history_bytes = this->input_history_items[i]*this->input_items_sizes[i]; + message.reserve_bytes = input_multiple_items[i]; + message.token = this->input_tokens[i]; + task_iface.post_upstream(i, message); + } for (size_t i = 0; i < num_outputs; i++) { @@ -164,13 +172,14 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface) this->input_tokens.resize(num_inputs); this->output_tokens.resize(num_outputs); + this->output_allocation_hints.resize(num_outputs); //resize tags vector to match sizes this->input_tags_changed.resize(num_inputs); this->input_tags.resize(num_inputs); //impose input reserve requirements based on relative rate and output multiple - std::vector<size_t> input_multiple_items(num_inputs, 1); + this->input_multiple_items.resize(num_inputs, 1); for (size_t i = 0; i < num_inputs; i++) { //TODO, this is a little cheap, we only look at output multiple [0] diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp index 8cbf2be..44125b1 100644 --- a/lib/element_impl.hpp +++ b/lib/element_impl.hpp @@ -19,6 +19,7 @@ #include <gras_impl/debug.hpp> #include <gras_impl/token.hpp> +#include <gras_impl/messages.hpp> #include <gras_impl/vector_of_queues.hpp> #include <gras_impl/input_buffer_queues.hpp> #include <gras_impl/interruptible_thread.hpp> @@ -65,6 +66,7 @@ struct ElementImpl IOSignature output_signature; std::vector<size_t> input_history_items; std::vector<size_t> output_multiple_items; + std::vector<size_t> input_multiple_items; //keeps track of production std::vector<uint64_t> items_consumed; @@ -148,6 +150,8 @@ struct ElementImpl Token token; size_t hint; //some kind of allocation hint + std::vector<std::vector<BufferHintMessage> > output_allocation_hints; + //rate settings bool enable_fixed_rate; double relative_rate; diff --git a/lib/gras_impl/interruptible_thread.hpp b/lib/gras_impl/interruptible_thread.hpp index 1e99111..e7025bf 100644 --- a/lib/gras_impl/interruptible_thread.hpp +++ b/lib/gras_impl/interruptible_thread.hpp @@ -91,6 +91,7 @@ namespace gnuradio { while (_wait_msg) _cond.wait(lock); _wait_msg = true; + if (not _callable) break; _callable(); _wait_ack = false; _notify(lock); diff --git a/lib/gras_impl/messages.hpp b/lib/gras_impl/messages.hpp index d5bfc8b..e1a3565 100644 --- a/lib/gras_impl/messages.hpp +++ b/lib/gras_impl/messages.hpp @@ -52,6 +52,12 @@ struct BufferReturnMessage SBuffer buffer; }; +struct BufferHintMessage +{ + size_t history_bytes; + size_t reserve_bytes; + WeakToken token; +}; } //namespace gnuradio diff --git a/lib/gras_impl/token.hpp b/lib/gras_impl/token.hpp index 4923f74..d5b438a 100644 --- a/lib/gras_impl/token.hpp +++ b/lib/gras_impl/token.hpp @@ -18,10 +18,13 @@ #define INCLUDED_LIBGRAS_IMPL_TOKEN_HPP #include <boost/shared_ptr.hpp> +#include <boost/weak_ptr.hpp> namespace gnuradio { +typedef boost::weak_ptr<int> WeakToken; + struct Token : boost::shared_ptr<int> { static Token make(void) diff --git a/lib/port_handlers.cpp b/lib/port_handlers.cpp index 0b5cfd0..80f8239 100644 --- a/lib/port_handlers.cpp +++ b/lib/port_handlers.cpp @@ -15,7 +15,7 @@ // along with io_sig program. If not, see <http://www.gnu.org/licenses/>. #include "element_impl.hpp" -#include <gras_impl/messages.hpp> +#include <boost/foreach.hpp> using namespace gnuradio; @@ -59,6 +59,8 @@ void ElementImpl::handle_input_msg( } return; } + + ASSERT(false); } void ElementImpl::handle_output_msg( @@ -85,4 +87,29 @@ void ElementImpl::handle_output_msg( } return; } + + //update the buffer allocation hint + if (msg.type() == typeid(BufferHintMessage)) + { + //this->output_allocation_hints.resize(std::max(output_allocation_hints.size(), index+1)); + const BufferHintMessage new_hint = msg.cast<BufferHintMessage>(); + + //remove any old hints with expired token + //remove any older hints with matching token + std::vector<BufferHintMessage> hints; + BOOST_FOREACH(const BufferHintMessage &hint, this->output_allocation_hints[index]) + { + if (hint.token.expired()) continue; + if (hint.token.lock() == new_hint.token.lock()) continue; + hints.push_back(hint); + } + + //store the new hint as well + hints.push_back(new_hint); + + this->output_allocation_hints[index] = hints; + return; + } + + ASSERT(false); } diff --git a/lib/top_block.cpp b/lib/top_block.cpp index 0adcdb0..f3e1fc8 100644 --- a/lib/top_block.cpp +++ b/lib/top_block.cpp @@ -15,7 +15,6 @@ // along with io_sig program. If not, see <http://www.gnu.org/licenses/>. #include "element_impl.hpp" -#include <gras_impl/messages.hpp> #include <gnuradio/top_block.hpp> #include <boost/thread/thread.hpp> //sleep |