diff options
author | Josh Blum | 2013-06-06 13:45:50 -0700 |
---|---|---|
committer | Josh Blum | 2013-06-06 13:45:50 -0700 |
commit | b7c8f27d47ca78d2b07e7a4cb53d1c8df6eb32d6 (patch) | |
tree | 6ce12ebd668d120823c652f8b09d055a149d70dc /lib/block_produce.cpp | |
parent | 7889847eed1e8bc003b88b0d6ad4f7904873d2ac (diff) | |
parent | 7350e18b8d5090349390f54b76a0e251b66ce619 (diff) | |
download | sandhi-b7c8f27d47ca78d2b07e7a4cb53d1c8df6eb32d6.tar.gz sandhi-b7c8f27d47ca78d2b07e7a4cb53d1c8df6eb32d6.tar.bz2 sandhi-b7c8f27d47ca78d2b07e7a4cb53d1c8df6eb32d6.zip |
Merge branch 'actor_migration'
Diffstat (limited to 'lib/block_produce.cpp')
-rw-r--r-- | lib/block_produce.cpp | 42 |
1 files changed, 19 insertions, 23 deletions
diff --git a/lib/block_produce.cpp b/lib/block_produce.cpp index 584d3a3..7133321 100644 --- a/lib/block_produce.cpp +++ b/lib/block_produce.cpp @@ -9,26 +9,26 @@ using namespace gras; void Block::produce(const size_t which_output, const size_t num_items) { ASSERT(long(num_items) >= 0); //sign bit set? you dont want a negative - (*this)->block->produce(which_output, num_items); + (*this)->block_actor->produce(which_output, num_items); } void Block::produce(const size_t num_items) { - const size_t num_outputs = (*this)->block->get_num_outputs(); + const size_t num_outputs = (*this)->worker->get_num_outputs(); for (size_t o = 0; o < num_outputs; o++) { - (*this)->block->produce(o, num_items); + (*this)->block_actor->produce(o, num_items); } } item_index_t Block::get_produced(const size_t which_output) { - return (*this)->block->stats.items_produced[which_output]; + return (*this)->block_data->stats.items_produced[which_output]; } SBuffer Block::get_output_buffer(const size_t which_output) const { - SBuffer &buff = (*this)->block->output_queues.front(which_output); + SBuffer &buff = (*this)->block_data->output_queues.front(which_output); //increment length to auto pop full buffer size, //when user doesnt call pop_output_buffer() buff.length = buff.get_actual_length(); @@ -37,12 +37,19 @@ SBuffer Block::get_output_buffer(const size_t which_output) const void Block::pop_output_buffer(const size_t which_output, const size_t num_bytes) { - (*this)->block->output_queues.front(which_output).length = num_bytes; + (*this)->block_data->output_queues.front(which_output).length = num_bytes; } void Block::post_output_buffer(const size_t which_output, const SBuffer &buffer) { - (*this)->block->produce_buffer(which_output, buffer); + boost::shared_ptr<BlockData> &data = (*this)->block_data; + data->output_queues.consume(which_output); + ASSERT((buffer.length % data->output_configs[which_output].item_size) == 0); + const size_t items = buffer.length/data->output_configs[which_output].item_size; + data->stats.items_produced[which_output] += items; + InputBufferMessage buff_msg; + buff_msg.buffer = buffer; + (*this)->worker->post_downstream(which_output, buff_msg); } GRAS_FORCE_INLINE void BlockActor::produce(const size_t i, const size_t items) @@ -50,21 +57,10 @@ GRAS_FORCE_INLINE void BlockActor::produce(const size_t i, const size_t items) #ifdef ITEM_CONSPROD std::cerr << name << " produce " << items << std::endl; #endif - SBuffer &buff = this->output_queues.front(i); - ASSERT((buff.length % output_configs[i].item_size) == 0); - this->stats.items_produced[i] += items; - const size_t bytes = items*this->output_configs[i].item_size; + SBuffer &buff = data->output_queues.front(i); + ASSERT((buff.length % data->output_configs[i].item_size) == 0); + data->stats.items_produced[i] += items; + const size_t bytes = items*data->output_configs[i].item_size; buff.length += bytes; - this->produce_outputs[i] = true; -} - -GRAS_FORCE_INLINE void BlockActor::produce_buffer(const size_t i, const SBuffer &buffer) -{ - this->output_queues.consume(i); - ASSERT((buffer.length % output_configs[i].item_size) == 0); - const size_t items = buffer.length/output_configs[i].item_size; - this->stats.items_produced[i] += items; - InputBufferMessage buff_msg; - buff_msg.buffer = buffer; - this->post_downstream(i, buff_msg); + data->produce_outputs[i] = true; } |