summaryrefslogtreecommitdiff
path: root/lib/gras_impl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/gras_impl')
-rw-r--r--lib/gras_impl/block_actor.hpp13
-rw-r--r--lib/gras_impl/debug.hpp2
-rw-r--r--lib/gras_impl/input_buffer_queues.hpp49
-rw-r--r--lib/gras_impl/messages.hpp21
-rw-r--r--lib/gras_impl/output_buffer_queues.hpp84
-rw-r--r--lib/gras_impl/simple_buffer_queue.hpp (renamed from lib/gras_impl/buffer_queue.hpp)14
6 files changed, 105 insertions, 78 deletions
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index d204ec4..afc248e 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -46,15 +46,16 @@ struct BlockActor : Apology::Worker
this->RegisterHandler(this, &BlockActor::handle_input_token);
this->RegisterHandler(this, &BlockActor::handle_input_check);
this->RegisterHandler(this, &BlockActor::handle_input_alloc);
+ this->RegisterHandler(this, &BlockActor::handle_input_update);
this->RegisterHandler(this, &BlockActor::handle_output_buffer);
this->RegisterHandler(this, &BlockActor::handle_output_token);
this->RegisterHandler(this, &BlockActor::handle_output_check);
this->RegisterHandler(this, &BlockActor::handle_output_hint);
this->RegisterHandler(this, &BlockActor::handle_output_alloc);
+ this->RegisterHandler(this, &BlockActor::handle_output_update);
this->RegisterHandler(this, &BlockActor::handle_self_kick);
- this->RegisterHandler(this, &BlockActor::handle_update_inputs);
}
//handlers
@@ -72,15 +73,16 @@ struct BlockActor : Apology::Worker
void handle_input_token(const InputTokenMessage &, const Theron::Address);
void handle_input_check(const InputCheckMessage &, const Theron::Address);
void handle_input_alloc(const InputAllocMessage &, const Theron::Address);
+ void handle_input_update(const InputUpdateMessage &, const Theron::Address);
void handle_output_buffer(const OutputBufferMessage &, const Theron::Address);
void handle_output_token(const OutputTokenMessage &, const Theron::Address);
void handle_output_check(const OutputCheckMessage &, const Theron::Address);
void handle_output_hint(const OutputHintMessage &, const Theron::Address);
void handle_output_alloc(const OutputAllocMessage &, const Theron::Address);
+ void handle_output_update(const OutputUpdateMessage &, const Theron::Address);
void handle_self_kick(const SelfKickMessage &, const Theron::Address);
- void handle_update_inputs(const UpdateInputsMessage &, const Theron::Address);
//helpers
void buffer_returner(const size_t index, SBuffer &buffer);
@@ -93,7 +95,7 @@ struct BlockActor : Apology::Worker
void produce(const size_t index, const size_t items);
void consume(const size_t index, const size_t items);
void produce_buffer(const size_t index, const SBuffer &buffer);
- void flush_output(const size_t index, const bool force_pop = false);
+ void flush_output(const size_t index);
bool is_work_allowed(void);
GRAS_FORCE_INLINE bool is_input_done(const size_t i)
@@ -122,11 +124,10 @@ struct BlockActor : Apology::Worker
BitSet inputs_done;
BitSet outputs_done;
std::set<Token> token_pool;
- std::vector<SBufferToken> output_buffer_tokens;
//buffer queues and ready conditions
InputBufferQueues input_queues;
- OutputBufferQueues<SBuffer> output_queues;
+ OutputBufferQueues output_queues;
BitSet inputs_available;
//tag tracking
@@ -155,8 +156,6 @@ struct BlockActor : Apology::Worker
long buffer_affinity;
std::vector<std::vector<OutputHintMessage> > output_allocation_hints;
-
- bool topology_init;
};
} //namespace gras
diff --git a/lib/gras_impl/debug.hpp b/lib/gras_impl/debug.hpp
index a90eb6c..c8c7dfe 100644
--- a/lib/gras_impl/debug.hpp
+++ b/lib/gras_impl/debug.hpp
@@ -25,7 +25,7 @@ extern void *operator new(std::size_t n) throw (std::bad_alloc);
//-- define to enable these debugs:
//----------------------------------------------------------------------
//#define WORK_DEBUG
-//#define ASSERTING
+#define ASSERTING
//#define MESSAGE_TRACING
//#define ITEM_CONSPROD
diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp
index 83ed44b..071f2ac 100644
--- a/lib/gras_impl/input_buffer_queues.hpp
+++ b/lib/gras_impl/input_buffer_queues.hpp
@@ -5,7 +5,7 @@
#include <gras_impl/debug.hpp>
#include <gras_impl/bitset.hpp>
-#include <gras_impl/buffer_queue.hpp>
+#include <gras_impl/simple_buffer_queue.hpp>
#include <gras/sbuffer.hpp>
#include <vector>
#include <queue>
@@ -52,6 +52,8 @@ struct InputBufferQueues
//special case when the null buffer is possible
if (_queues[i].empty()) return get_null_buff();
+ this->try_stitch(i);
+
//there are enough enqueued bytes, but not in the front buffer
const bool must_accumulate = _queues[i].front().length < _reserve_bytes[i];
@@ -60,7 +62,7 @@ struct InputBufferQueues
const bool light_front = _queues[i].front().length <= _maximum_bytes[i]/2;
const bool should_accumulate = heavy_load and light_front;
- if (must_accumulate or should_accumulate) this->accumulate(i);
+ if (must_accumulate/* or should_accumulate*/) this->accumulate(i);
ASSERT(_queues[i].front().length >= _reserve_bytes[i]);
@@ -69,6 +71,28 @@ struct InputBufferQueues
return _queues[i].front();
}
+ GRAS_FORCE_INLINE void try_stitch(const size_t i)
+ {
+ if (_queues[i].size() < 2) return;
+ SBuffer &b0 = _queues[i][0];
+ SBuffer &b1 = _queues[i][1];
+
+ if (b0.offset > b0.get_actual_length())
+ {
+ const size_t xfer_bytes = b0.length;
+ ASSERT(b1.offset >= xfer_bytes);
+ b1.offset -= xfer_bytes;
+ b1.length += xfer_bytes;
+ _queues[i].pop_front();
+ return;
+ }
+
+ const size_t xfer_bytes = b1.length;
+ b0.length += xfer_bytes;
+ b1.length = 0;
+ b1.offset += xfer_bytes;
+ }
+
//! Call when input bytes consumed by work
void consume(const size_t i, const size_t bytes_consumed);
@@ -104,20 +128,7 @@ struct InputBufferQueues
{
ASSERT(not _queues[i].full());
if (buffer.length == 0) return;
-
- //does this buffer starts where the last one ends?
- //perform buffer stitching into back of buffer
- if (
- not _queues[i].empty() and _queues[i].back() == buffer and
- (_queues[i].back().length + _queues[i].back().offset) == buffer.offset
- ){
- _queues[i].back().length += buffer.length;
- }
- else
- {
- _queues[i].push_back(buffer);
- }
-
+ _queues[i].push_back(buffer);
_enqueued_bytes[i] += buffer.length;
__update(i);
}
@@ -172,7 +183,7 @@ struct InputBufferQueues
std::vector<size_t> _maximum_bytes;
std::vector<boost::circular_buffer<SBuffer> > _queues;
std::vector<size_t> _preload_bytes;
- std::vector<boost::shared_ptr<BufferQueue> > _aux_queues;
+ std::vector<boost::shared_ptr<SimpleBufferQueue> > _aux_queues;
};
@@ -207,7 +218,7 @@ inline void InputBufferQueues::update_config(
_aux_queues[i]->empty() or
_aux_queues[i]->front().get_actual_length() != _maximum_bytes[i]
){
- _aux_queues[i] = boost::shared_ptr<BufferQueue>(new BufferQueue());
+ _aux_queues[i].reset(new SimpleBufferQueue());
_aux_queues[i]->allocate_one(_maximum_bytes[i]);
_aux_queues[i]->allocate_one(_maximum_bytes[i]);
_aux_queues[i]->allocate_one(_maximum_bytes[i]);
@@ -287,7 +298,7 @@ GRAS_FORCE_INLINE void InputBufferQueues::consume(const size_t i, const size_t b
//update bounds on the current buffer
front.offset += bytes_consumed;
front.length -= bytes_consumed;
- ASSERT(front.offset <= front.get_actual_length());
+ //ASSERT(front.offset <= front.get_actual_length());
ASSERT((_queues[i].front().length % _items_sizes[i]) == 0);
if (front.length == 0) this->pop(i);
diff --git a/lib/gras_impl/messages.hpp b/lib/gras_impl/messages.hpp
index eae3258..b113e0d 100644
--- a/lib/gras_impl/messages.hpp
+++ b/lib/gras_impl/messages.hpp
@@ -3,6 +3,7 @@
#ifndef INCLUDED_LIBGRAS_IMPL_MESSAGES_HPP
#define INCLUDED_LIBGRAS_IMPL_MESSAGES_HPP
+#include <gras/buffer_queue.hpp>
#include <gras/sbuffer.hpp>
#include <gras/tags.hpp>
#include <gras/sbuffer.hpp>
@@ -72,6 +73,11 @@ struct InputCheckMessage
size_t index;
};
+struct InputUpdateMessage
+{
+ size_t index;
+};
+
//----------------------------------------------------------------------
//-- message to an output port
//-- do not ack
@@ -104,7 +110,12 @@ struct OutputHintMessage
struct OutputAllocMessage
{
size_t index;
- SBufferToken token;
+ BufferQueueSptr queue;
+};
+
+struct OutputUpdateMessage
+{
+ size_t index;
};
//----------------------------------------------------------------------
@@ -117,11 +128,6 @@ struct SelfKickMessage
//empty
};
-struct UpdateInputsMessage
-{
- //empty
-};
-
} //namespace gras
#include <Theron/Register.h>
@@ -141,14 +147,15 @@ THERON_DECLARE_REGISTERED_MESSAGE(gras::InputBufferMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::InputTokenMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::InputCheckMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::InputAllocMessage);
+THERON_DECLARE_REGISTERED_MESSAGE(gras::InputUpdateMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputBufferMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputTokenMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputCheckMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputHintMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputAllocMessage);
+THERON_DECLARE_REGISTERED_MESSAGE(gras::OutputUpdateMessage);
THERON_DECLARE_REGISTERED_MESSAGE(gras::SelfKickMessage);
-THERON_DECLARE_REGISTERED_MESSAGE(gras::UpdateInputsMessage);
#endif /*INCLUDED_LIBGRAS_IMPL_MESSAGES_HPP*/
diff --git a/lib/gras_impl/output_buffer_queues.hpp b/lib/gras_impl/output_buffer_queues.hpp
index 722a4c2..3618b1b 100644
--- a/lib/gras_impl/output_buffer_queues.hpp
+++ b/lib/gras_impl/output_buffer_queues.hpp
@@ -3,83 +3,76 @@
#ifndef INCLUDED_LIBGRAS_IMPL_OUTPUT_BUFFER_QUEUES_HPP
#define INCLUDED_LIBGRAS_IMPL_OUTPUT_BUFFER_QUEUES_HPP
+#include <gras/buffer_queue.hpp>
#include <gras_impl/bitset.hpp>
#include <vector>
-#include <boost/circular_buffer.hpp>
namespace gras
{
-template <typename T>
struct OutputBufferQueues
{
- enum {MAX_QUEUE_SIZE = 128};
- BitSet _bitset;
- std::vector<boost::circular_buffer<T> > _queues;
-
- GRAS_FORCE_INLINE void resize(const size_t size)
+ void set_buffer_queue(const size_t i, BufferQueueSptr queue)
{
- _bitset.resize(size);
- _queues.resize(size, boost::circular_buffer<T>(MAX_QUEUE_SIZE));
+ _queues[i] = queue;
+ _update(i);
}
- GRAS_FORCE_INLINE void push(const size_t i, const T &value)
+ void set_reserve_bytes(const size_t i, const size_t num_bytes)
{
- _queues[i].push_back(value);
- _bitset.set(i);
+ _reserve_bytes[i] = num_bytes;
+ if (_queues[i]) _update(i);
}
- //! used for input buffer inlining
- GRAS_FORCE_INLINE void push_front(const size_t i, const T &value)
+ GRAS_FORCE_INLINE void resize(const size_t size)
{
- ASSERT(not _queues[i].full());
- _queues[i].push_front(value);
- _bitset.set(i);
+ _bitset.resize(size);
+ _queues.resize(size);
+ _reserve_bytes.resize(size, 1);
}
- GRAS_FORCE_INLINE const T &front(const size_t i) const
+ GRAS_FORCE_INLINE void push(const size_t i, const SBuffer &buff)
{
- return _queues[i].front();
+ if (not _queues[i]) return; //block is likely done, throw out buffer
+ _queues[i]->push(buff);
+ _update(i);
}
- GRAS_FORCE_INLINE T &front(const size_t i)
+ GRAS_FORCE_INLINE void flush_all(void)
{
- ASSERT(not _queues[i].empty());
- return _queues[i].front();
+ const size_t old_size = this->size();
+ this->resize(0);
+ this->resize(old_size);
}
- GRAS_FORCE_INLINE void pop(const size_t i)
+ GRAS_FORCE_INLINE SBuffer &front(const size_t i)
{
- _queues[i].front() = T();
- _queues[i].pop_front();
- _bitset.set(i, not _queues[i].empty());
+ ASSERT(not this->empty(i));
+ return _queues[i]->front();
}
- GRAS_FORCE_INLINE void fail(const size_t i)
+ GRAS_FORCE_INLINE void pop(const size_t i)
{
- _bitset.reset(i);
+ ASSERT(_queues[i]);
+ ASSERT(not _queues[i]->empty());
+ _queues[i]->pop();
+ _update(i);
}
- GRAS_FORCE_INLINE void flush(const size_t i)
+ GRAS_FORCE_INLINE void fail(const size_t i)
{
- _queues[i].clear();
_bitset.reset(i);
}
- GRAS_FORCE_INLINE void flush_all(void)
- {
- for (size_t i = 0; i < this->size(); i++) this->flush(i);
- }
-
GRAS_FORCE_INLINE bool ready(const size_t i) const
{
- return not _queues[i].empty();
+ return _bitset[i];
}
GRAS_FORCE_INLINE bool empty(const size_t i) const
{
- return _queues[i].empty();
+ return (not _queues[i] or _queues[i]->empty());
}
GRAS_FORCE_INLINE bool all_ready(void) const
@@ -91,6 +84,23 @@ struct OutputBufferQueues
{
return _queues.size();
}
+
+ GRAS_FORCE_INLINE void _update(const size_t i)
+ {
+ if (not _queues[i] or _queues[i]->empty())
+ {
+ _bitset.reset(i);
+ return;
+ }
+ const SBuffer &front = _queues[i]->front();
+ const size_t avail = front.get_actual_length() - front.offset - front.length;
+ _bitset.set(i, avail >= _reserve_bytes[i]);
+ }
+
+ BitSet _bitset;
+ std::vector<BufferQueueSptr> _queues;
+ std::vector<size_t> _reserve_bytes;
+
};
} //namespace gras
diff --git a/lib/gras_impl/buffer_queue.hpp b/lib/gras_impl/simple_buffer_queue.hpp
index 0e6be9e..db6ecc0 100644
--- a/lib/gras_impl/buffer_queue.hpp
+++ b/lib/gras_impl/simple_buffer_queue.hpp
@@ -1,7 +1,7 @@
// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
-#ifndef INCLUDED_LIBGRAS_IMPL_BUFFER_QUEUE_HPP
-#define INCLUDED_LIBGRAS_IMPL_BUFFER_QUEUE_HPP
+#ifndef INCLUDED_LIBGRAS_IMPL_SIMPLE_BUFFER_QUEUE_HPP
+#define INCLUDED_LIBGRAS_IMPL_SIMPLE_BUFFER_QUEUE_HPP
#include <gras/sbuffer.hpp>
#include <boost/bind.hpp>
@@ -10,15 +10,15 @@
namespace gras
{
-struct BufferQueue : std::queue<SBuffer>
+struct SimpleBufferQueue : std::queue<SBuffer>
{
- BufferQueue(void)
+ SimpleBufferQueue(void)
{
- SBufferDeleter deleter = boost::bind(&BufferQueue::push_back, this, _1);
+ SBufferDeleter deleter = boost::bind(&SimpleBufferQueue::push_back, this, _1);
_token = SBufferToken(new SBufferDeleter(deleter));
}
- ~BufferQueue(void)
+ ~SimpleBufferQueue(void)
{
_token.reset();
while (not this->empty())
@@ -47,4 +47,4 @@ struct BufferQueue : std::queue<SBuffer>
} //namespace gras
-#endif /*INCLUDED_LIBGRAS_IMPL_BUFFER_QUEUE_HPP*/
+#endif /*INCLUDED_LIBGRAS_IMPL_SIMPLE_BUFFER_QUEUE_HPP*/