summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorJosh Blum2012-09-12 01:57:46 -0700
committerJosh Blum2012-09-12 01:57:46 -0700
commit79276543e665a53eb6f976c47f4c92e8faf844ba (patch)
treee1abdbeca3232567e231eb9897da72af65a111a5 /lib
parent8695f6127a447dbe6f5cf6eeba9d5cd110e33420 (diff)
downloadsandhi-79276543e665a53eb6f976c47f4c92e8faf844ba.tar.gz
sandhi-79276543e665a53eb6f976c47f4c92e8faf844ba.tar.bz2
sandhi-79276543e665a53eb6f976c47f4c92e8faf844ba.zip
communicate downstream buffer reqs to upstream output allocator
Diffstat (limited to 'lib')
-rw-r--r--lib/block_allocator.cpp51
-rw-r--r--lib/block_handlers.cpp13
-rw-r--r--lib/element_impl.hpp4
-rw-r--r--lib/gras_impl/interruptible_thread.hpp1
-rw-r--r--lib/gras_impl/messages.hpp6
-rw-r--r--lib/gras_impl/token.hpp3
-rw-r--r--lib/port_handlers.cpp29
-rw-r--r--lib/top_block.cpp1
8 files changed, 97 insertions, 11 deletions
diff --git a/lib/block_allocator.cpp b/lib/block_allocator.cpp
index 22f60af..5f45dd0 100644
--- a/lib/block_allocator.cpp
+++ b/lib/block_allocator.cpp
@@ -15,13 +15,18 @@
// along with io_sig program. If not, see <http://www.gnu.org/licenses/>.
#include "element_impl.hpp"
-#include <gras_impl/messages.hpp>
#include <boost/bind.hpp>
+#include <boost/foreach.hpp>
+#include <boost/math/common_factor.hpp>
using namespace gnuradio;
-//TODO will need more complicated later
+const size_t AT_LEAST_DEFAULT_ITEMS = 1024*2;
+const size_t AHH_TOO_MANY_BYTES = 1 << 16; //TODO
+const size_t THIS_MANY_BUFFERS = 8;
+const double EDGE_CASE_MITIGATION = 8.0; //edge case mitigation constant
+//TODO will need more complicated later
void ElementImpl::buffer_returner(const size_t index, SBuffer &buffer)
{
@@ -35,6 +40,35 @@ void ElementImpl::buffer_returner(const size_t index, SBuffer &buffer)
this->block.post_msg(message);
}
+static size_t recommend_length(
+ const std::vector<BufferHintMessage> &hints,
+ const size_t output_multiple_bytes,
+ const size_t at_least_bytes
+){
+ //step 1) find the LCM of all reserves to create a super-reserve
+ size_t lcm_bytes = output_multiple_bytes;
+ BOOST_FOREACH(const BufferHintMessage &hint, hints)
+ {
+ lcm_bytes = boost::math::lcm(lcm_bytes, hint.reserve_bytes);
+ }
+
+ //step 2) N x super reserve to minimize history edge case
+ size_t Nlcm_bytes = lcm_bytes;
+ BOOST_FOREACH(const BufferHintMessage &hint, hints)
+ {
+ while (hint.history_bytes*EDGE_CASE_MITIGATION > Nlcm_bytes)
+ {
+ Nlcm_bytes += lcm_bytes;
+ }
+ }
+ while (at_least_bytes > Nlcm_bytes)
+ {
+ Nlcm_bytes += lcm_bytes;
+ }
+
+ return std::min(Nlcm_bytes, AHH_TOO_MANY_BYTES);
+}
+
void ElementImpl::handle_allocation(const tsbe::TaskInterface &task_iface)
{
//allocate output buffers which will also wake up the task
@@ -42,16 +76,19 @@ void ElementImpl::handle_allocation(const tsbe::TaskInterface &task_iface)
this->output_buffer_tokens.resize(num_outputs);
for (size_t i = 0; i < num_outputs; i++)
{
- size_t items = this->hint;
- if (items == 0) items = 2048;
- items = std::max(items, this->output_multiple_items[i]);
+ size_t at_least_items = this->hint;
+ if (at_least_items == 0) at_least_items = AT_LEAST_DEFAULT_ITEMS;
- const size_t bytes = items * this->output_items_sizes[i];
+ const size_t bytes = recommend_length(
+ this->output_allocation_hints[i],
+ this->output_multiple_items[i]*this->output_items_sizes[i],
+ at_least_items*this->output_items_sizes[i]
+ );
SBufferDeleter deleter = boost::bind(&ElementImpl::buffer_returner, this, i, _1);
this->output_buffer_tokens[i] = SBufferToken(new SBufferDeleter(deleter));
- for (size_t j = 0; j < 8; j++)
+ for (size_t j = 0; j < THIS_MANY_BUFFERS; j++)
{
SBufferConfig config;
config.memory = NULL;
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index 4ecfa20..3715615 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -15,7 +15,6 @@
// along with io_sig program. If not, see <http://www.gnu.org/licenses/>.
#include "element_impl.hpp"
-#include <gras_impl/messages.hpp>
#include <gras_impl/vector_utils.hpp>
#include <boost/make_shared.hpp>
#include <boost/bind.hpp>
@@ -89,6 +88,15 @@ void ElementImpl::handle_block_msg(
{
this->input_tokens[i] = Token::make();
task_iface.post_upstream(i, this->input_tokens[i]);
+
+ //TODO, schedule this message as a pre-allocation message
+ //tell the upstream about the input requirements
+ BufferHintMessage message;
+ message.history_bytes = this->input_history_items[i]*this->input_items_sizes[i];
+ message.reserve_bytes = input_multiple_items[i];
+ message.token = this->input_tokens[i];
+ task_iface.post_upstream(i, message);
+
}
for (size_t i = 0; i < num_outputs; i++)
{
@@ -164,13 +172,14 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface)
this->input_tokens.resize(num_inputs);
this->output_tokens.resize(num_outputs);
+ this->output_allocation_hints.resize(num_outputs);
//resize tags vector to match sizes
this->input_tags_changed.resize(num_inputs);
this->input_tags.resize(num_inputs);
//impose input reserve requirements based on relative rate and output multiple
- std::vector<size_t> input_multiple_items(num_inputs, 1);
+ this->input_multiple_items.resize(num_inputs, 1);
for (size_t i = 0; i < num_inputs; i++)
{
//TODO, this is a little cheap, we only look at output multiple [0]
diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp
index 8cbf2be..44125b1 100644
--- a/lib/element_impl.hpp
+++ b/lib/element_impl.hpp
@@ -19,6 +19,7 @@
#include <gras_impl/debug.hpp>
#include <gras_impl/token.hpp>
+#include <gras_impl/messages.hpp>
#include <gras_impl/vector_of_queues.hpp>
#include <gras_impl/input_buffer_queues.hpp>
#include <gras_impl/interruptible_thread.hpp>
@@ -65,6 +66,7 @@ struct ElementImpl
IOSignature output_signature;
std::vector<size_t> input_history_items;
std::vector<size_t> output_multiple_items;
+ std::vector<size_t> input_multiple_items;
//keeps track of production
std::vector<uint64_t> items_consumed;
@@ -148,6 +150,8 @@ struct ElementImpl
Token token;
size_t hint; //some kind of allocation hint
+ std::vector<std::vector<BufferHintMessage> > output_allocation_hints;
+
//rate settings
bool enable_fixed_rate;
double relative_rate;
diff --git a/lib/gras_impl/interruptible_thread.hpp b/lib/gras_impl/interruptible_thread.hpp
index 1e99111..e7025bf 100644
--- a/lib/gras_impl/interruptible_thread.hpp
+++ b/lib/gras_impl/interruptible_thread.hpp
@@ -91,6 +91,7 @@ namespace gnuradio
{
while (_wait_msg) _cond.wait(lock);
_wait_msg = true;
+ if (not _callable) break;
_callable();
_wait_ack = false;
_notify(lock);
diff --git a/lib/gras_impl/messages.hpp b/lib/gras_impl/messages.hpp
index d5bfc8b..e1a3565 100644
--- a/lib/gras_impl/messages.hpp
+++ b/lib/gras_impl/messages.hpp
@@ -52,6 +52,12 @@ struct BufferReturnMessage
SBuffer buffer;
};
+struct BufferHintMessage
+{
+ size_t history_bytes;
+ size_t reserve_bytes;
+ WeakToken token;
+};
} //namespace gnuradio
diff --git a/lib/gras_impl/token.hpp b/lib/gras_impl/token.hpp
index 4923f74..d5b438a 100644
--- a/lib/gras_impl/token.hpp
+++ b/lib/gras_impl/token.hpp
@@ -18,10 +18,13 @@
#define INCLUDED_LIBGRAS_IMPL_TOKEN_HPP
#include <boost/shared_ptr.hpp>
+#include <boost/weak_ptr.hpp>
namespace gnuradio
{
+typedef boost::weak_ptr<int> WeakToken;
+
struct Token : boost::shared_ptr<int>
{
static Token make(void)
diff --git a/lib/port_handlers.cpp b/lib/port_handlers.cpp
index 0b5cfd0..80f8239 100644
--- a/lib/port_handlers.cpp
+++ b/lib/port_handlers.cpp
@@ -15,7 +15,7 @@
// along with io_sig program. If not, see <http://www.gnu.org/licenses/>.
#include "element_impl.hpp"
-#include <gras_impl/messages.hpp>
+#include <boost/foreach.hpp>
using namespace gnuradio;
@@ -59,6 +59,8 @@ void ElementImpl::handle_input_msg(
}
return;
}
+
+ ASSERT(false);
}
void ElementImpl::handle_output_msg(
@@ -85,4 +87,29 @@ void ElementImpl::handle_output_msg(
}
return;
}
+
+ //update the buffer allocation hint
+ if (msg.type() == typeid(BufferHintMessage))
+ {
+ //this->output_allocation_hints.resize(std::max(output_allocation_hints.size(), index+1));
+ const BufferHintMessage new_hint = msg.cast<BufferHintMessage>();
+
+ //remove any old hints with expired token
+ //remove any older hints with matching token
+ std::vector<BufferHintMessage> hints;
+ BOOST_FOREACH(const BufferHintMessage &hint, this->output_allocation_hints[index])
+ {
+ if (hint.token.expired()) continue;
+ if (hint.token.lock() == new_hint.token.lock()) continue;
+ hints.push_back(hint);
+ }
+
+ //store the new hint as well
+ hints.push_back(new_hint);
+
+ this->output_allocation_hints[index] = hints;
+ return;
+ }
+
+ ASSERT(false);
}
diff --git a/lib/top_block.cpp b/lib/top_block.cpp
index 0adcdb0..f3e1fc8 100644
--- a/lib/top_block.cpp
+++ b/lib/top_block.cpp
@@ -15,7 +15,6 @@
// along with io_sig program. If not, see <http://www.gnu.org/licenses/>.
#include "element_impl.hpp"
-#include <gras_impl/messages.hpp>
#include <gnuradio/top_block.hpp>
#include <boost/thread/thread.hpp> //sleep