summaryrefslogtreecommitdiff
path: root/lib/gras_impl
diff options
context:
space:
mode:
authorJosh Blum2013-04-28 13:34:00 -0700
committerJosh Blum2013-04-28 13:34:00 -0700
commitf3c127cf6f650e9c5a655b6cb6ce75c2d39abbb4 (patch)
treeae5f88d51c56fdefb78c4d774faffd4b387f51a3 /lib/gras_impl
parent1a2d4cc189a0eae13ac8a7047327540b126f9267 (diff)
parentce75954a02b6ba386cb9677c9e492af9b8de328a (diff)
downloadsandhi-f3c127cf6f650e9c5a655b6cb6ce75c2d39abbb4.tar.gz
sandhi-f3c127cf6f650e9c5a655b6cb6ce75c2d39abbb4.tar.bz2
sandhi-f3c127cf6f650e9c5a655b6cb6ce75c2d39abbb4.zip
Merge branch 'master' into theron6_work
Diffstat (limited to 'lib/gras_impl')
-rw-r--r--lib/gras_impl/block_actor.hpp21
-rw-r--r--lib/gras_impl/output_buffer_queues.hpp36
2 files changed, 36 insertions, 21 deletions
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index 2799313..5444bb0 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -104,7 +104,6 @@ struct BlockActor : Apology::Worker
void produce(const size_t index, const size_t items);
void consume(const size_t index, const size_t items);
void produce_buffer(const size_t index, const SBuffer &buffer);
- void flush_output(const size_t index);
void task_kicker(void);
void update_input_avail(const size_t index);
bool is_input_done(const size_t index);
@@ -168,26 +167,6 @@ struct BlockActor : Apology::Worker
//-------------- common functions from this BlockActor class ---------//
-GRAS_FORCE_INLINE void BlockActor::flush_output(const size_t i)
-{
- if GRAS_UNLIKELY(this->output_queues.empty(i) or this->output_queues.front(i).length == 0) return;
- SBuffer &buff = this->output_queues.front(i);
- if GRAS_LIKELY(this->produce_outputs[i])
- {
- this->produce_outputs[i] = false;
- InputBufferMessage buff_msg;
- buff_msg.buffer = buff;
- this->post_downstream(i, buff_msg);
- }
-
- //increment buffer for next use
- buff.offset += buff.length;
- buff.length = 0;
-
- //release whatever has been used of the output buffer
- this->output_queues.pop(i);
-}
-
GRAS_FORCE_INLINE void BlockActor::task_kicker(void)
{
if (this->is_work_allowed()) this->Send(SelfKickMessage(), this->GetAddress());
diff --git a/lib/gras_impl/output_buffer_queues.hpp b/lib/gras_impl/output_buffer_queues.hpp
index deea0e1..f0859a3 100644
--- a/lib/gras_impl/output_buffer_queues.hpp
+++ b/lib/gras_impl/output_buffer_queues.hpp
@@ -31,6 +31,7 @@ struct OutputBufferQueues
_bitset.resize(size);
_queues.resize(size);
_reserve_bytes.resize(size, 1);
+ _inline_buffer.resize(size);
}
GRAS_FORCE_INLINE void push(const size_t i, const SBuffer &buff)
@@ -49,12 +50,38 @@ struct OutputBufferQueues
GRAS_FORCE_INLINE SBuffer &front(const size_t i)
{
+ if GRAS_UNLIKELY(_inline_buffer[i]) return _inline_buffer[i];
ASSERT(not this->empty(i));
return _queues[i]->front();
}
+ GRAS_FORCE_INLINE void consume(const size_t i)
+ {
+ if GRAS_UNLIKELY(_inline_buffer[i])
+ {
+ _inline_buffer[i].reset();
+ return;
+ }
+
+ ASSERT(not this->empty(i));
+ SBuffer &buff = this->front(i);
+ if GRAS_UNLIKELY(buff.length == 0) return;
+
+ //increment buffer for next use
+ buff.offset += buff.length;
+ buff.length = 0;
+
+ this->pop(i);
+ }
+
GRAS_FORCE_INLINE void pop(const size_t i)
{
+ if GRAS_UNLIKELY(_inline_buffer[i])
+ {
+ _inline_buffer[i].reset();
+ return;
+ }
+
ASSERT(_queues[i]);
ASSERT(not _queues[i]->empty());
_queues[i]->pop();
@@ -73,6 +100,7 @@ struct OutputBufferQueues
GRAS_FORCE_INLINE bool empty(const size_t i) const
{
+ if GRAS_UNLIKELY(_inline_buffer[i]) return false;
return (not _queues[i] or _queues[i]->empty());
}
@@ -98,9 +126,17 @@ struct OutputBufferQueues
_bitset.set(i, avail >= _reserve_bytes[i]);
}
+ GRAS_FORCE_INLINE void set_inline(const size_t i, const SBuffer &inline_buffer)
+ {
+ _inline_buffer[i] = inline_buffer;
+ _inline_buffer[i].length = 0;
+ _bitset.set(i);
+ }
+
BitSet _bitset;
std::vector<BufferQueueSptr> _queues;
std::vector<size_t> _reserve_bytes;
+ std::vector<SBuffer> _inline_buffer;
};