summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/block_task.cpp18
-rw-r--r--lib/buffer_queue_circ.cpp3
-rw-r--r--lib/circular_buffer.cpp1
-rw-r--r--lib/gras_impl/block_actor.hpp2
-rw-r--r--lib/gras_impl/output_buffer_queues.hpp3
5 files changed, 7 insertions, 20 deletions
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index c84f702..4f44de3 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -92,9 +92,6 @@ void BlockActor::output_fail(const size_t i)
throw std::runtime_error("output_fail called on maximum_items buffer");
}
- //force pop so next work() gets a new buffer
- this->flush_output(i, true);
-
//mark fail: not ready until a new buffer appears
this->output_queues.fail(i);
}
@@ -265,7 +262,7 @@ void BlockActor::produce_buffer(const size_t i, const SBuffer &buffer)
this->post_downstream(i, buff_msg);
}
-GRAS_FORCE_INLINE void BlockActor::flush_output(const size_t i, const bool force_pop)
+GRAS_FORCE_INLINE void BlockActor::flush_output(const size_t i)
{
if (this->output_queues.empty(i) or this->output_queues.front(i).length == 0) return;
SBuffer &buff = this->output_queues.front(i);
@@ -277,17 +274,6 @@ GRAS_FORCE_INLINE void BlockActor::flush_output(const size_t i, const bool force
buff.offset += buff.length;
buff.length = 0;
- //simply just pop
+ //release whatever has been used of the output buffer
this->output_queues.pop(i);
-
- /*
- //when to pop the buffer and give next work a new one
- const size_t reserve_bytes = this->output_configs[i].reserve_items/this->output_items_sizes[i];
- if (
- force_pop or (buff.offset*2 > buff.get_actual_length()) or
- (buff.get_actual_length() - buff.offset) < reserve_bytes
- )
- {
- this->output_queues.pop(i);
- }*/
}
diff --git a/lib/buffer_queue_circ.cpp b/lib/buffer_queue_circ.cpp
index 5e69fe0..a75e0b0 100644
--- a/lib/buffer_queue_circ.cpp
+++ b/lib/buffer_queue_circ.cpp
@@ -53,7 +53,7 @@ BufferQueueCirc::BufferQueueCirc(const SBufferConfig &config, const size_t num_b
_bytes_avail = _circ_buff.get_actual_length();
//allocate pool of sbuffers
- _available_buffers.resize(num_buffs);
+ _available_buffers.set_capacity(num_buffs);
_returned_buffers.resize(num_buffs);
_outgone_bytes.resize(num_buffs, 0);
SBufferConfig sconfig = config;
@@ -70,6 +70,7 @@ SBuffer &BufferQueueCirc::front(void)
{
ASSERT(not this->empty());
SBuffer &front = _available_buffers.front();
+ ASSERT(front);
front->config.memory = _write_ptr;
return front;
}
diff --git a/lib/circular_buffer.cpp b/lib/circular_buffer.cpp
index 7f482a7..179e650 100644
--- a/lib/circular_buffer.cpp
+++ b/lib/circular_buffer.cpp
@@ -1,6 +1,7 @@
// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
#include <gras/buffer_queue.hpp>
+#include <gras_impl/debug.hpp>
#include <boost/format.hpp>
#include <boost/bind.hpp>
#include <boost/interprocess/shared_memory_object.hpp>
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index a490583..afc248e 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -95,7 +95,7 @@ struct BlockActor : Apology::Worker
void produce(const size_t index, const size_t items);
void consume(const size_t index, const size_t items);
void produce_buffer(const size_t index, const SBuffer &buffer);
- void flush_output(const size_t index, const bool force_pop = false);
+ void flush_output(const size_t index);
bool is_work_allowed(void);
GRAS_FORCE_INLINE bool is_input_done(const size_t i)
diff --git a/lib/gras_impl/output_buffer_queues.hpp b/lib/gras_impl/output_buffer_queues.hpp
index 9ff0807..3618b1b 100644
--- a/lib/gras_impl/output_buffer_queues.hpp
+++ b/lib/gras_impl/output_buffer_queues.hpp
@@ -72,8 +72,7 @@ struct OutputBufferQueues
GRAS_FORCE_INLINE bool empty(const size_t i) const
{
- ASSERT(_queues[i]);
- return _queues[i]->empty();
+ return (not _queues[i] or _queues[i]->empty());
}
GRAS_FORCE_INLINE bool all_ready(void) const