summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosh Blum2012-12-15 09:53:25 -0800
committerJosh Blum2012-12-15 09:53:25 -0800
commitb7c0a59c0a86f289f55935b19efaf448e892eefb (patch)
tree1e641f9816237481124835ac2fbf2792a8b1d50f
parent6a03c661ede88203ff90eb01bf1e678b87cb6056 (diff)
downloadsandhi-b7c0a59c0a86f289f55935b19efaf448e892eefb.tar.gz
sandhi-b7c0a59c0a86f289f55935b19efaf448e892eefb.tar.bz2
sandhi-b7c0a59c0a86f289f55935b19efaf448e892eefb.zip
work on the buffer queue api
-rw-r--r--include/gras/CMakeLists.txt1
-rw-r--r--include/gras/block.hpp9
-rw-r--r--include/gras/block.i1
-rw-r--r--include/gras/buffer_queue.hpp43
-rw-r--r--lib/CMakeLists.txt2
-rw-r--r--lib/block_allocator.cpp28
-rw-r--r--lib/block_task.cpp7
-rw-r--r--lib/buffer_queue_circ.cpp3
-rw-r--r--lib/buffer_queue_pool.cpp54
-rw-r--r--lib/circular_buffer.cpp4
-rw-r--r--lib/gras_impl/block_actor.hpp3
-rw-r--r--lib/gras_impl/input_buffer_queues.hpp6
-rw-r--r--lib/gras_impl/messages.hpp3
-rw-r--r--lib/gras_impl/output_buffer_queues.hpp72
-rw-r--r--lib/gras_impl/simple_buffer_queue.hpp (renamed from lib/gras_impl/buffer_queue.hpp)14
-rw-r--r--lib/input_handlers.cpp4
-rw-r--r--lib/output_handlers.cpp6
17 files changed, 190 insertions, 70 deletions
diff --git a/include/gras/CMakeLists.txt b/include/gras/CMakeLists.txt
index e8fb85e..f0b0d3a 100644
--- a/include/gras/CMakeLists.txt
+++ b/include/gras/CMakeLists.txt
@@ -21,6 +21,7 @@ install(FILES
thread_pool.hpp
top_block.hpp
work_buffer.hpp
+ buffer_queue.hpp
DESTINATION include/gras
COMPONENT ${GRAS_COMP_DEVEL}
diff --git a/include/gras/block.hpp b/include/gras/block.hpp
index 3899561..524ff00 100644
--- a/include/gras/block.hpp
+++ b/include/gras/block.hpp
@@ -8,6 +8,7 @@
#include <gras/tag_iter.hpp>
#include <gras/tags.hpp>
#include <gras/work_buffer.hpp>
+#include <gras/buffer_queue.hpp>
#include <vector>
#include <string>
@@ -337,9 +338,9 @@ struct GRAS_API Block : Element
* \param which_output the output port index number
* \param token the token for the buffer's returner
* \param recommend_length the schedulers recommended length in bytes
- * \return the token used for the buffer allocation (may be the same)
+ * \return a shared ptr to a new buffer queue object
*/
- virtual SBufferToken output_buffer_allocator(
+ virtual BufferQueueSptr output_buffer_allocator(
const size_t which_output,
const SBufferToken &token,
const size_t recommend_length
@@ -356,9 +357,9 @@ struct GRAS_API Block : Element
* \param which_input the input port index number
* \param token the token for the buffer's returner
* \param recommend_length the schedulers recommended length in bytes
- * \return the token used for the buffer allocation (may be the same)
+ * \return a shared ptr to a new buffer queue object
*/
- virtual SBufferToken input_buffer_allocator(
+ virtual BufferQueueSptr input_buffer_allocator(
const size_t which_input,
const SBufferToken &token,
const size_t recommend_length
diff --git a/include/gras/block.i b/include/gras/block.i
index 80c3905..ef2738c 100644
--- a/include/gras/block.i
+++ b/include/gras/block.i
@@ -11,6 +11,7 @@
%import <gras/tags.i>
%include <gras/tag_iter.i>
%import <gras/sbuffer.i>
+%include <gras/buffer_queue.hpp>
%include <gras/block.hpp>
#endif /*INCLUDED_GRAS_BLOCK_I*/
diff --git a/include/gras/buffer_queue.hpp b/include/gras/buffer_queue.hpp
new file mode 100644
index 0000000..5125555
--- /dev/null
+++ b/include/gras/buffer_queue.hpp
@@ -0,0 +1,43 @@
+// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
+
+#ifndef INCLUDED_GRAS_BUFFER_QUEUE_HPP
+#define INCLUDED_GRAS_BUFFER_QUEUE_HPP
+
+#include <gras/gras.hpp>
+#include <gras/sbuffer.hpp>
+#include <boost/shared_ptr.hpp>
+
+namespace gras
+{
+
+struct BufferQueue;
+
+typedef boost::shared_ptr<BufferQueue> BufferQueueSptr;
+
+//! Buffer Queue is an interface enabling us to create custom buffer allocators.
+struct BufferQueue
+{
+
+ //! Create a buffer queue using the pool allocator
+ GRAS_API static BufferQueueSptr make_pool(const SBufferConfig &config, const size_t num_buffs);
+
+ //! Create a buffer queue using the circular buffer allocator
+ GRAS_API static BufferQueueSptr make_circ(const SBufferConfig &config);
+
+ //! Get a reference to the buffer at the front of the queue
+ virtual SBuffer &front(void) = 0;
+
+ //! Pop off the used portion of the queue
+ virtual void pop(void) = 0;
+
+ //! Push a used buffer back into the queue
+ virtual void push(const SBuffer &buff) = 0;
+
+ //! Is the queue empty?
+ virtual bool empty(void) const = 0;
+
+};
+
+} //namespace gras
+
+#endif /*INCLUDED_GRAS_BUFFER_QUEUE_HPP*/
diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt
index 84c8f4e..bc38d06 100644
--- a/lib/CMakeLists.txt
+++ b/lib/CMakeLists.txt
@@ -41,6 +41,8 @@ list(APPEND GRAS_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/element.cpp
${CMAKE_CURRENT_SOURCE_DIR}/sbuffer.cpp
${CMAKE_CURRENT_SOURCE_DIR}/circular_buffer.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/buffer_queue_circ.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/buffer_queue_pool.cpp
${CMAKE_CURRENT_SOURCE_DIR}/tags.cpp
${CMAKE_CURRENT_SOURCE_DIR}/block.cpp
${CMAKE_CURRENT_SOURCE_DIR}/block_actor.cpp
diff --git a/lib/block_allocator.cpp b/lib/block_allocator.cpp
index 64b3aed..54654ba 100644
--- a/lib/block_allocator.cpp
+++ b/lib/block_allocator.cpp
@@ -58,7 +58,6 @@ void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address
//allocate output buffers which will also wake up the task
const size_t num_outputs = this->get_num_outputs();
- this->output_buffer_tokens.resize(num_outputs);
for (size_t i = 0; i < num_outputs; i++)
{
const size_t bytes = recommend_length(
@@ -71,7 +70,8 @@ void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address
SBufferDeleter deleter = boost::bind(&BlockActor::buffer_returner, this, i, _1);
SBufferToken token = SBufferToken(new SBufferDeleter(deleter));
- this->output_buffer_tokens[i] = block_ptr->output_buffer_allocator(i, token, bytes);
+ BufferQueueSptr queue = block_ptr->output_buffer_allocator(i, token, bytes);
+ this->output_queues.set_buffer_queue(i, queue);
InputAllocMessage message;
message.token = SBufferToken(new SBufferDeleter(deleter));
@@ -82,29 +82,23 @@ void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address
this->Send(0, from); //ACK
}
-SBufferToken Block::output_buffer_allocator(
+BufferQueueSptr Block::output_buffer_allocator(
const size_t,
const SBufferToken &token,
const size_t recommend_length
){
- for (size_t j = 0; j < THIS_MANY_BUFFERS; j++)
- {
- SBufferConfig config;
- config.memory = NULL;
- config.length = recommend_length;
- config.affinity = (*this)->block->buffer_affinity;
- config.token = token;
- SBuffer buff(config);
- std::memset(buff.get_actual_memory(), 0, buff.get_actual_length());
- //buffer derefs here and the token messages it back to the block
- }
- return token;
+ SBufferConfig config;
+ config.memory = NULL;
+ config.length = recommend_length;
+ config.affinity = (*this)->block->buffer_affinity;
+ config.token = token;
+ return BufferQueue::make_pool(config, THIS_MANY_BUFFERS);
}
-SBufferToken Block::input_buffer_allocator(
+BufferQueueSptr Block::input_buffer_allocator(
const size_t,
const SBufferToken &,
const size_t
){
- return SBufferToken(); //null
+ return BufferQueueSptr(); //null
}
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index 9e5917d..6b18798 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -31,9 +31,6 @@ void BlockActor::mark_done(void)
//release upstream, downstream, and executor tokens
this->token_pool.clear();
- //release allocator tokens, buffers can now call deleters
- this->output_buffer_tokens.clear();
-
//release all buffers in queues
this->input_queues.flush_all();
this->output_queues.flush_all();
@@ -156,7 +153,8 @@ void BlockActor::handle_task(void)
this->input_items.max() = std::max(this->input_items.max(), items);
//inline dealings, how and when input buffers can be inlined into output buffers
- //continue;
+ continue; //FIXME to implement needs change
+ /*
if (
buff.unique() and
input_configs[i].inline_buffer and
@@ -170,6 +168,7 @@ void BlockActor::handle_task(void)
this->output_queues.push_front(output_inline_index, new_obuff); //you got inlined!
output_inline_index++; //done do this output port again
}
+ */
}
//------------------------------------------------------------------
diff --git a/lib/buffer_queue_circ.cpp b/lib/buffer_queue_circ.cpp
new file mode 100644
index 0000000..9d0dbcf
--- /dev/null
+++ b/lib/buffer_queue_circ.cpp
@@ -0,0 +1,3 @@
+// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
+
+#include <gras/buffer_queue.hpp>
diff --git a/lib/buffer_queue_pool.cpp b/lib/buffer_queue_pool.cpp
new file mode 100644
index 0000000..a30f4a6
--- /dev/null
+++ b/lib/buffer_queue_pool.cpp
@@ -0,0 +1,54 @@
+// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
+
+#include <gras/buffer_queue.hpp>
+#include <gras_impl/debug.hpp>
+#include <boost/circular_buffer.hpp>
+
+using namespace gras;
+
+struct BufferQueuePool : BufferQueue
+{
+ BufferQueuePool(const size_t num)
+ {
+ queue.resize(num);
+ }
+
+ SBuffer &front(void)
+ {
+ return queue.front();
+ }
+
+ void pop(void)
+ {
+ ASSERT(not queue.empty());
+ queue.front() = SBuffer(); //dont hold ref
+ queue.pop_front();
+ }
+
+ void push(const SBuffer &buff)
+ {
+ queue.push_back(buff);
+ }
+
+ bool empty(void) const
+ {
+ return queue.empty();
+ }
+
+ boost::circular_buffer<SBuffer> queue;
+
+};
+
+BufferQueueSptr BufferQueue::make_pool(
+ const SBufferConfig &config,
+ const size_t num_buffs
+){
+ BufferQueueSptr bq(new BufferQueuePool(num_buffs));
+ for (size_t i = 0; i < num_buffs; i++)
+ {
+ SBuffer buff(config);
+ std::memset(buff.get_actual_memory(), 0, buff.get_actual_length());
+ bq->push(buff);
+ }
+ return bq;
+}
diff --git a/lib/circular_buffer.cpp b/lib/circular_buffer.cpp
index 820129d..1aa745b 100644
--- a/lib/circular_buffer.cpp
+++ b/lib/circular_buffer.cpp
@@ -1,6 +1,6 @@
// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
-#include <gras_impl/endless_buffer_queue.hpp>
+#include <gras/buffer_queue.hpp>
#include <boost/format.hpp>
#include <boost/bind.hpp>
#include <boost/interprocess/shared_memory_object.hpp>
@@ -137,6 +137,7 @@ static void circular_buffer_delete(SBuffer &buff, CircularBuffer *circ_buff)
delete circ_buff;
}
+/*
SBuffer EndlessBufferQueue::make_circular_buffer(const size_t num_bytes)
{
CircularBuffer *circ_buff = new CircularBuffer(num_bytes);
@@ -147,3 +148,4 @@ SBuffer EndlessBufferQueue::make_circular_buffer(const size_t num_bytes)
config.deleter = deleter;
return SBuffer(config);
}
+*/
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index d204ec4..8717fa2 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -122,11 +122,10 @@ struct BlockActor : Apology::Worker
BitSet inputs_done;
BitSet outputs_done;
std::set<Token> token_pool;
- std::vector<SBufferToken> output_buffer_tokens;
//buffer queues and ready conditions
InputBufferQueues input_queues;
- OutputBufferQueues<SBuffer> output_queues;
+ OutputBufferQueues output_queues;
BitSet inputs_available;
//tag tracking
diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp
index 83ed44b..7f70d39 100644
--- a/lib/gras_impl/input_buffer_queues.hpp
+++ b/lib/gras_impl/input_buffer_queues.hpp
@@ -5,7 +5,7 @@
#include <gras_impl/debug.hpp>
#include <gras_impl/bitset.hpp>
-#include <gras_impl/buffer_queue.hpp>
+#include <gras_impl/simple_buffer_queue.hpp>
#include <gras/sbuffer.hpp>
#include <vector>
#include <queue>
@@ -172,7 +172,7 @@ struct InputBufferQueues
std::vector<size_t> _maximum_bytes;
std::vector<boost::circular_buffer<SBuffer> > _queues;
std::vector<size_t> _preload_bytes;
- std::vector<boost::shared_ptr<BufferQueue> > _aux_queues;
+ std::vector<boost::shared_ptr<SimpleBufferQueue> > _aux_queues;
};
@@ -207,7 +207,7 @@ inline void InputBufferQueues::update_config(
_aux_queues[i]->empty() or
_aux_queues[i]->front().get_actual_length() != _maximum_bytes[i]
){
- _aux_queues[i] = boost::shared_ptr<BufferQueue>(new BufferQueue());
+ _aux_queues[i].reset(new SimpleBufferQueue());
_aux_queues[i]->allocate_one(_maximum_bytes[i]);
_aux_queues[i]->allocate_one(_maximum_bytes[i]);
_aux_queues[i]->allocate_one(_maximum_bytes[i]);
diff --git a/lib/gras_impl/messages.hpp b/lib/gras_impl/messages.hpp
index eae3258..62d48e6 100644
--- a/lib/gras_impl/messages.hpp
+++ b/lib/gras_impl/messages.hpp
@@ -3,6 +3,7 @@
#ifndef INCLUDED_LIBGRAS_IMPL_MESSAGES_HPP
#define INCLUDED_LIBGRAS_IMPL_MESSAGES_HPP
+#include <gras/buffer_queue.hpp>
#include <gras/sbuffer.hpp>
#include <gras/tags.hpp>
#include <gras/sbuffer.hpp>
@@ -104,7 +105,7 @@ struct OutputHintMessage
struct OutputAllocMessage
{
size_t index;
- SBufferToken token;
+ BufferQueueSptr queue;
};
//----------------------------------------------------------------------
diff --git a/lib/gras_impl/output_buffer_queues.hpp b/lib/gras_impl/output_buffer_queues.hpp
index 722a4c2..5674c3d 100644
--- a/lib/gras_impl/output_buffer_queues.hpp
+++ b/lib/gras_impl/output_buffer_queues.hpp
@@ -3,64 +3,75 @@
#ifndef INCLUDED_LIBGRAS_IMPL_OUTPUT_BUFFER_QUEUES_HPP
#define INCLUDED_LIBGRAS_IMPL_OUTPUT_BUFFER_QUEUES_HPP
+#include <gras/buffer_queue.hpp>
#include <gras_impl/bitset.hpp>
#include <vector>
-#include <boost/circular_buffer.hpp>
namespace gras
{
-template <typename T>
struct OutputBufferQueues
{
- enum {MAX_QUEUE_SIZE = 128};
- BitSet _bitset;
- std::vector<boost::circular_buffer<T> > _queues;
+ void set_buffer_queue(const size_t i, BufferQueueSptr queue)
+ {
+ _queues[i] = queue;
+ _update(i);
+ }
+
+ void set_reserve_bytes(const size_t i, const size_t num_bytes)
+ {
+ _reserve_bytes[i] = num_bytes;
+ _update(i);
+ }
GRAS_FORCE_INLINE void resize(const size_t size)
{
_bitset.resize(size);
- _queues.resize(size, boost::circular_buffer<T>(MAX_QUEUE_SIZE));
+ _queues.resize(size);
+ _reserve_bytes.resize(size, 1);
}
- GRAS_FORCE_INLINE void push(const size_t i, const T &value)
+ GRAS_FORCE_INLINE void push(const size_t i, const SBuffer &value)
{
- _queues[i].push_back(value);
- _bitset.set(i);
+ _queues[i]->push(value);
+ _update(i);
+ }
+
+ GRAS_FORCE_INLINE void flush_all(void)
+ {
+ const size_t old_size = this->size();
+ this->resize(0);
+ this->resize(old_size);
}
//! used for input buffer inlining
+ /*
GRAS_FORCE_INLINE void push_front(const size_t i, const T &value)
{
ASSERT(not _queues[i].full());
_queues[i].push_front(value);
_bitset.set(i);
}
+ */
- GRAS_FORCE_INLINE const T &front(const size_t i) const
+ GRAS_FORCE_INLINE SBuffer &front(const size_t i)
{
- return _queues[i].front();
- }
-
- GRAS_FORCE_INLINE T &front(const size_t i)
- {
- ASSERT(not _queues[i].empty());
- return _queues[i].front();
+ ASSERT(not _queues[i]->empty());
+ return _queues[i]->front();
}
GRAS_FORCE_INLINE void pop(const size_t i)
{
- _queues[i].front() = T();
- _queues[i].pop_front();
- _bitset.set(i, not _queues[i].empty());
+ _queues[i]->pop();
+ _update(i);
}
GRAS_FORCE_INLINE void fail(const size_t i)
{
_bitset.reset(i);
}
-
+/*
GRAS_FORCE_INLINE void flush(const size_t i)
{
_queues[i].clear();
@@ -71,15 +82,18 @@ struct OutputBufferQueues
{
for (size_t i = 0; i < this->size(); i++) this->flush(i);
}
-
+*/
GRAS_FORCE_INLINE bool ready(const size_t i) const
{
- return not _queues[i].empty();
+ if (_queues[i]->empty()) return false;
+ const SBuffer &front = _queues[i]->front();
+ const size_t avail = front.get_actual_length() - front.offset - front.length;
+ return avail >= _reserve_bytes[i];
}
GRAS_FORCE_INLINE bool empty(const size_t i) const
{
- return _queues[i].empty();
+ return _queues[i]->empty();
}
GRAS_FORCE_INLINE bool all_ready(void) const
@@ -91,6 +105,16 @@ struct OutputBufferQueues
{
return _queues.size();
}
+
+ GRAS_FORCE_INLINE void _update(const size_t i)
+ {
+ _bitset.set(i, this->ready(i));
+ }
+
+ BitSet _bitset;
+ std::vector<BufferQueueSptr> _queues;
+ std::vector<size_t> _reserve_bytes;
+
};
} //namespace gras
diff --git a/lib/gras_impl/buffer_queue.hpp b/lib/gras_impl/simple_buffer_queue.hpp
index 0e6be9e..db6ecc0 100644
--- a/lib/gras_impl/buffer_queue.hpp
+++ b/lib/gras_impl/simple_buffer_queue.hpp
@@ -1,7 +1,7 @@
// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
-#ifndef INCLUDED_LIBGRAS_IMPL_BUFFER_QUEUE_HPP
-#define INCLUDED_LIBGRAS_IMPL_BUFFER_QUEUE_HPP
+#ifndef INCLUDED_LIBGRAS_IMPL_SIMPLE_BUFFER_QUEUE_HPP
+#define INCLUDED_LIBGRAS_IMPL_SIMPLE_BUFFER_QUEUE_HPP
#include <gras/sbuffer.hpp>
#include <boost/bind.hpp>
@@ -10,15 +10,15 @@
namespace gras
{
-struct BufferQueue : std::queue<SBuffer>
+struct SimpleBufferQueue : std::queue<SBuffer>
{
- BufferQueue(void)
+ SimpleBufferQueue(void)
{
- SBufferDeleter deleter = boost::bind(&BufferQueue::push_back, this, _1);
+ SBufferDeleter deleter = boost::bind(&SimpleBufferQueue::push_back, this, _1);
_token = SBufferToken(new SBufferDeleter(deleter));
}
- ~BufferQueue(void)
+ ~SimpleBufferQueue(void)
{
_token.reset();
while (not this->empty())
@@ -47,4 +47,4 @@ struct BufferQueue : std::queue<SBuffer>
} //namespace gras
-#endif /*INCLUDED_LIBGRAS_IMPL_BUFFER_QUEUE_HPP*/
+#endif /*INCLUDED_LIBGRAS_IMPL_SIMPLE_BUFFER_QUEUE_HPP*/
diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp
index 02f62f3..bef5234 100644
--- a/lib/input_handlers.cpp
+++ b/lib/input_handlers.cpp
@@ -66,8 +66,8 @@ void BlockActor::handle_input_alloc(const InputAllocMessage &message, const Ther
//handle the upstream block allocation request
OutputAllocMessage new_msg;
- new_msg.token = block_ptr->input_buffer_allocator(
+ new_msg.queue = block_ptr->input_buffer_allocator(
index, message.token, message.recommend_length
);
- if (new_msg.token) this->post_upstream(index, new_msg);
+ if (new_msg.queue) this->post_upstream(index, new_msg);
}
diff --git a/lib/output_handlers.cpp b/lib/output_handlers.cpp
index 5876f89..a0bad75 100644
--- a/lib/output_handlers.cpp
+++ b/lib/output_handlers.cpp
@@ -70,9 +70,5 @@ void BlockActor::handle_output_alloc(const OutputAllocMessage &message, const Th
const size_t index = message.index;
//return of a positive downstream allocation
- //reset the token, and clear old output buffers
- //the new token from the downstream is installed
- this->output_buffer_tokens[index].reset();
- this->output_queues.flush(index);
- this->output_buffer_tokens[index] = message.token;
+ this->output_queues.set_buffer_queue(index, message.queue);
}