summaryrefslogtreecommitdiff
path: root/lib/block_produce.cpp
diff options
context:
space:
mode:
authorJosh Blum2013-06-06 13:45:50 -0700
committerJosh Blum2013-06-06 13:45:50 -0700
commitb7c8f27d47ca78d2b07e7a4cb53d1c8df6eb32d6 (patch)
tree6ce12ebd668d120823c652f8b09d055a149d70dc /lib/block_produce.cpp
parent7889847eed1e8bc003b88b0d6ad4f7904873d2ac (diff)
parent7350e18b8d5090349390f54b76a0e251b66ce619 (diff)
downloadsandhi-b7c8f27d47ca78d2b07e7a4cb53d1c8df6eb32d6.tar.gz
sandhi-b7c8f27d47ca78d2b07e7a4cb53d1c8df6eb32d6.tar.bz2
sandhi-b7c8f27d47ca78d2b07e7a4cb53d1c8df6eb32d6.zip
Merge branch 'actor_migration'
Diffstat (limited to 'lib/block_produce.cpp')
-rw-r--r--lib/block_produce.cpp42
1 files changed, 19 insertions, 23 deletions
diff --git a/lib/block_produce.cpp b/lib/block_produce.cpp
index 584d3a3..7133321 100644
--- a/lib/block_produce.cpp
+++ b/lib/block_produce.cpp
@@ -9,26 +9,26 @@ using namespace gras;
void Block::produce(const size_t which_output, const size_t num_items)
{
ASSERT(long(num_items) >= 0); //sign bit set? you dont want a negative
- (*this)->block->produce(which_output, num_items);
+ (*this)->block_actor->produce(which_output, num_items);
}
void Block::produce(const size_t num_items)
{
- const size_t num_outputs = (*this)->block->get_num_outputs();
+ const size_t num_outputs = (*this)->worker->get_num_outputs();
for (size_t o = 0; o < num_outputs; o++)
{
- (*this)->block->produce(o, num_items);
+ (*this)->block_actor->produce(o, num_items);
}
}
item_index_t Block::get_produced(const size_t which_output)
{
- return (*this)->block->stats.items_produced[which_output];
+ return (*this)->block_data->stats.items_produced[which_output];
}
SBuffer Block::get_output_buffer(const size_t which_output) const
{
- SBuffer &buff = (*this)->block->output_queues.front(which_output);
+ SBuffer &buff = (*this)->block_data->output_queues.front(which_output);
//increment length to auto pop full buffer size,
//when user doesnt call pop_output_buffer()
buff.length = buff.get_actual_length();
@@ -37,12 +37,19 @@ SBuffer Block::get_output_buffer(const size_t which_output) const
void Block::pop_output_buffer(const size_t which_output, const size_t num_bytes)
{
- (*this)->block->output_queues.front(which_output).length = num_bytes;
+ (*this)->block_data->output_queues.front(which_output).length = num_bytes;
}
void Block::post_output_buffer(const size_t which_output, const SBuffer &buffer)
{
- (*this)->block->produce_buffer(which_output, buffer);
+ boost::shared_ptr<BlockData> &data = (*this)->block_data;
+ data->output_queues.consume(which_output);
+ ASSERT((buffer.length % data->output_configs[which_output].item_size) == 0);
+ const size_t items = buffer.length/data->output_configs[which_output].item_size;
+ data->stats.items_produced[which_output] += items;
+ InputBufferMessage buff_msg;
+ buff_msg.buffer = buffer;
+ (*this)->worker->post_downstream(which_output, buff_msg);
}
GRAS_FORCE_INLINE void BlockActor::produce(const size_t i, const size_t items)
@@ -50,21 +57,10 @@ GRAS_FORCE_INLINE void BlockActor::produce(const size_t i, const size_t items)
#ifdef ITEM_CONSPROD
std::cerr << name << " produce " << items << std::endl;
#endif
- SBuffer &buff = this->output_queues.front(i);
- ASSERT((buff.length % output_configs[i].item_size) == 0);
- this->stats.items_produced[i] += items;
- const size_t bytes = items*this->output_configs[i].item_size;
+ SBuffer &buff = data->output_queues.front(i);
+ ASSERT((buff.length % data->output_configs[i].item_size) == 0);
+ data->stats.items_produced[i] += items;
+ const size_t bytes = items*data->output_configs[i].item_size;
buff.length += bytes;
- this->produce_outputs[i] = true;
-}
-
-GRAS_FORCE_INLINE void BlockActor::produce_buffer(const size_t i, const SBuffer &buffer)
-{
- this->output_queues.consume(i);
- ASSERT((buffer.length % output_configs[i].item_size) == 0);
- const size_t items = buffer.length/output_configs[i].item_size;
- this->stats.items_produced[i] += items;
- InputBufferMessage buff_msg;
- buff_msg.buffer = buffer;
- this->post_downstream(i, buff_msg);
+ data->produce_outputs[i] = true;
}