diff options
-rw-r--r-- | lib/block.cpp | 15 | ||||
-rw-r--r-- | lib/block_allocator.cpp | 12 | ||||
-rw-r--r-- | lib/block_consume.cpp | 10 | ||||
-rw-r--r-- | lib/block_handlers.cpp | 70 | ||||
-rw-r--r-- | lib/block_message.cpp | 12 | ||||
-rw-r--r-- | lib/block_produce.cpp | 24 | ||||
-rw-r--r-- | lib/block_props.cpp | 6 | ||||
-rw-r--r-- | lib/element_impl.hpp | 1 | ||||
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 34 | ||||
-rw-r--r-- | lib/gras_impl/block_data.hpp | 15 | ||||
-rw-r--r-- | lib/input_handlers.cpp | 42 | ||||
-rw-r--r-- | lib/output_handlers.cpp | 34 | ||||
-rw-r--r-- | lib/tag_handlers.hpp | 18 | ||||
-rw-r--r-- | lib/task_done.cpp | 26 | ||||
-rw-r--r-- | lib/task_fail.cpp | 18 | ||||
-rw-r--r-- | lib/task_main.cpp | 74 | ||||
-rw-r--r-- | lib/top_block_query.cpp | 4 | ||||
-rw-r--r-- | lib/topology_handler.cpp | 52 |
18 files changed, 233 insertions, 234 deletions
diff --git a/lib/block.cpp b/lib/block.cpp index 835b979..ac01f72 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -38,9 +38,10 @@ Block::Block(const std::string &name): (*this)->block->name = name; //for debug purposes (*this)->block->block_ptr = this; (*this)->block->data.reset(new BlockData()); + (*this)->block_data = (*this)->block->data; //setup some state variables - (*this)->block->data = BlockActor::BLOCK_STATE_INIT; + (*this)->block_data->block_state = BLOCK_STATE_INIT; //call block methods to init stuff this->set_interruptible_work(false); @@ -133,22 +134,22 @@ typename V::value_type &vector_get_resize(V &v, const size_t index) InputPortConfig &Block::input_config(const size_t which_input) { - return vector_get_resize((*this)->block->input_configs, which_input); + return vector_get_resize((*this)->block_data->input_configs, which_input); } const InputPortConfig &Block::input_config(const size_t which_input) const { - return vector_get_const((*this)->block->input_configs, which_input); + return vector_get_const((*this)->block_data->input_configs, which_input); } OutputPortConfig &Block::output_config(const size_t which_output) { - return vector_get_resize((*this)->block->output_configs, which_output); + return vector_get_resize((*this)->block_data->output_configs, which_output); } const OutputPortConfig &Block::output_config(const size_t which_output) const { - return vector_get_const((*this)->block->output_configs, which_output); + return vector_get_const((*this)->block_data->output_configs, which_output); } void Block::commit_config(void) @@ -191,10 +192,10 @@ void Block::set_thread_pool(const ThreadPool &thread_pool) void Block::set_buffer_affinity(const long affinity) { - (*this)->block->buffer_affinity = affinity; + (*this)->block_data->buffer_affinity = affinity; } void Block::set_interruptible_work(const bool enb) { - (*this)->block->interruptible_work = enb; + (*this)->block_data->interruptible_work = enb; } diff --git a/lib/block_allocator.cpp b/lib/block_allocator.cpp index 0dda99a..86732e2 100644 --- a/lib/block_allocator.cpp +++ b/lib/block_allocator.cpp @@ -62,10 +62,10 @@ void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address for (size_t i = 0; i < num_outputs; i++) { const size_t bytes = recommend_length( - this->output_allocation_hints[i], - my_round_up_mult(AT_LEAST_BYTES, this->output_configs[i].item_size), - this->output_configs[i].reserve_items*this->output_configs[i].item_size, - this->output_configs[i].maximum_items*this->output_configs[i].item_size + data->output_allocation_hints[i], + my_round_up_mult(AT_LEAST_BYTES, data->output_configs[i].item_size), + data->output_configs[i].reserve_items*data->output_configs[i].item_size, + data->output_configs[i].maximum_items*data->output_configs[i].item_size ); SBufferDeleter deleter = boost::bind(&buffer_returner, this->thread_pool, this->GetAddress(), i, _1); @@ -74,11 +74,11 @@ void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address SBufferConfig config; config.memory = NULL; config.length = bytes; - config.affinity = this->buffer_affinity; + config.affinity = data->buffer_affinity; config.token = token; BufferQueueSptr queue = block_ptr->output_buffer_allocator(i, config); - this->output_queues.set_buffer_queue(i, queue); + data->output_queues.set_buffer_queue(i, queue); InputAllocMessage message; //new token for the downstream allocator diff --git a/lib/block_consume.cpp b/lib/block_consume.cpp index efdb07e..04529f4 100644 --- a/lib/block_consume.cpp +++ b/lib/block_consume.cpp @@ -24,12 +24,12 @@ void Block::consume(const size_t num_items) item_index_t Block::get_consumed(const size_t which_input) { - return (*this)->block->stats.items_consumed[which_input]; + return (*this)->block_data->stats.items_consumed[which_input]; } SBuffer Block::get_input_buffer(const size_t which_input) const { - return (*this)->block->input_queues.front(which_input); + return (*this)->block_data->input_queues.front(which_input); } GRAS_FORCE_INLINE void BlockActor::consume(const size_t i, const size_t items) @@ -37,8 +37,8 @@ GRAS_FORCE_INLINE void BlockActor::consume(const size_t i, const size_t items) #ifdef ITEM_CONSPROD std::cerr << name << " consume " << items << std::endl; #endif - this->stats.items_consumed[i] += items; - const size_t bytes = items*this->input_configs[i].item_size; - this->input_queues.consume(i, bytes); + data->stats.items_consumed[i] += items; + const size_t bytes = items*data->input_configs[i].item_size; + data->input_queues.consume(i, bytes); this->trim_tags(i); } diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index e0ee67a..0cf4b1f 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -13,12 +13,12 @@ void BlockActor::handle_top_active( ){ MESSAGE_TRACER(); - if (this->block_state != BLOCK_STATE_LIVE) + if (data->block_state != BLOCK_STATE_LIVE) { - this->block_ptr->notify_active(); - this->stats.start_time = time_now(); + block_ptr->notify_active(); + data->stats.start_time = time_now(); } - this->block_state = BLOCK_STATE_LIVE; + data->block_state = BLOCK_STATE_LIVE; this->Send(0, from); //ACK @@ -45,17 +45,17 @@ void BlockActor::handle_top_token( //create input tokens and send allocation hints for (size_t i = 0; i < this->get_num_inputs(); i++) { - this->input_tokens[i] = Token::make(); - this->inputs_done.reset(i); + data->input_tokens[i] = Token::make(); + data->inputs_done.reset(i); OutputTokenMessage token_msg; - token_msg.token = this->input_tokens[i]; + token_msg.token = data->input_tokens[i]; this->post_upstream(i, token_msg); //TODO, schedule this message as a pre-allocation message //tell the upstream about the input requirements OutputHintMessage output_hints; - output_hints.reserve_bytes = this->input_configs[i].reserve_items*this->input_configs[i].item_size; - output_hints.token = this->input_tokens[i]; + output_hints.reserve_bytes = data->input_configs[i].reserve_items*data->input_configs[i].item_size; + output_hints.token = data->input_tokens[i]; this->post_upstream(i, output_hints); } @@ -63,15 +63,15 @@ void BlockActor::handle_top_token( //create output token for (size_t i = 0; i < this->get_num_outputs(); i++) { - this->output_tokens[i] = Token::make(); - this->outputs_done.reset(i); + data->output_tokens[i] = Token::make(); + data->outputs_done.reset(i); InputTokenMessage token_msg; - token_msg.token = this->output_tokens[i]; + token_msg.token = data->output_tokens[i]; this->post_downstream(i, token_msg); } //store a token to the top level topology - this->token_pool.insert(message.token); + data->token_pool.insert(message.token); this->Send(0, from); //ACK } @@ -83,18 +83,18 @@ void BlockActor::handle_top_config( MESSAGE_TRACER(); //overwrite with global config only if maxium_items is not set (zero) - for (size_t i = 0; i < this->output_configs.size(); i++) + for (size_t i = 0; i < data->output_configs.size(); i++) { - if (this->output_configs[i].maximum_items == 0) + if (data->output_configs[i].maximum_items == 0) { - this->output_configs[i].maximum_items = message.maximum_output_items; + data->output_configs[i].maximum_items = message.maximum_output_items; } } //overwrite with global node affinity setting for buffers if not set - if (this->buffer_affinity == -1) + if (data->buffer_affinity == -1) { - this->buffer_affinity = message.buffer_affinity; + data->buffer_affinity = message.buffer_affinity; } this->Send(0, from); //ACK @@ -109,12 +109,12 @@ void BlockActor::handle_top_thread_group( //store the topology's thread group //erase any potentially old lingering threads //spawn a new thread if this block is a source - this->thread_group = message; - this->interruptible_thread.reset(); //erase old one - if (this->interruptible_work) + data->thread_group = message; + data->interruptible_thread.reset(); //erase old one + if (data->interruptible_work) { - this->interruptible_thread = boost::make_shared<InterruptibleThread>( - this->thread_group, boost::bind(&BlockActor::task_work, this) + data->interruptible_thread = boost::make_shared<InterruptibleThread>( + data->thread_group, boost::bind(&BlockActor::task_work, this) ); } @@ -138,24 +138,24 @@ void BlockActor::handle_get_stats( //instantaneous states we update here, //and not interleaved with the rest of the code const size_t num_inputs = this->get_num_inputs(); - this->stats.items_enqueued.resize(num_inputs); - this->stats.tags_enqueued.resize(num_inputs); - this->stats.msgs_enqueued.resize(num_inputs); + data->stats.items_enqueued.resize(num_inputs); + data->stats.tags_enqueued.resize(num_inputs); + data->stats.msgs_enqueued.resize(num_inputs); for (size_t i = 0; i < num_inputs; i++) { - this->stats.items_enqueued[i] = this->input_queues.get_items_enqueued(i); - this->stats.tags_enqueued[i] = this->input_tags[i].size(); - this->stats.msgs_enqueued[i] = this->input_msgs[i].size(); + data->stats.items_enqueued[i] = data->input_queues.get_items_enqueued(i); + data->stats.tags_enqueued[i] = data->input_tags[i].size(); + data->stats.msgs_enqueued[i] = data->input_msgs[i].size(); } - this->stats.actor_queue_depth = this->GetNumQueuedMessages(); - this->stats.bytes_copied = this->input_queues.bytes_copied; - this->stats.inputs_idle = this->input_queues.total_idle_times; - this->stats.outputs_idle = this->output_queues.total_idle_times; + data->stats.actor_queue_depth = this->GetNumQueuedMessages(); + data->stats.bytes_copied = data->input_queues.bytes_copied; + data->stats.inputs_idle = data->input_queues.total_idle_times; + data->stats.outputs_idle = data->output_queues.total_idle_times; //create the message reply object GetStatsMessage message; - message.block_id = this->block_ptr->get_uid(); - message.stats = this->stats; + message.block_id = block_ptr->get_uid(); + message.stats = data->stats; message.stats_time = time_now(); this->Send(message, from); //ACK diff --git a/lib/block_message.cpp b/lib/block_message.cpp index ef7dda6..84078ce 100644 --- a/lib/block_message.cpp +++ b/lib/block_message.cpp @@ -8,29 +8,29 @@ using namespace gras; void Block::post_output_tag(const size_t which_output, const Tag &tag) { - (*this)->block->stats.tags_produced[which_output]++; + (*this)->block_data->stats.tags_produced[which_output]++; (*this)->block->post_downstream(which_output, InputTagMessage(tag)); } void Block::_post_output_msg(const size_t which_output, const PMCC &msg) { - (*this)->block->stats.msgs_produced[which_output]++; + (*this)->block_data->stats.msgs_produced[which_output]++; (*this)->block->post_downstream(which_output, InputMsgMessage(msg)); } TagIter Block::get_input_tags(const size_t which_input) { - const std::vector<Tag> &input_tags = (*this)->block->input_tags[which_input]; + const std::vector<Tag> &input_tags = (*this)->block_data->input_tags[which_input]; return TagIter(input_tags.begin(), input_tags.end()); } PMCC Block::pop_input_msg(const size_t which_input) { - std::vector<PMCC> &input_msgs = (*this)->block->input_msgs[which_input]; - size_t &num_read = (*this)->block->num_input_msgs_read[which_input]; + std::vector<PMCC> &input_msgs = (*this)->block_data->input_msgs[which_input]; + size_t &num_read = (*this)->block_data->num_input_msgs_read[which_input]; if (num_read >= input_msgs.size()) return PMCC(); PMCC p = input_msgs[num_read++]; - (*this)->block->stats.msgs_consumed[which_input]++; + (*this)->block_data->stats.msgs_consumed[which_input]++; return p; } diff --git a/lib/block_produce.cpp b/lib/block_produce.cpp index 584d3a3..31a91f7 100644 --- a/lib/block_produce.cpp +++ b/lib/block_produce.cpp @@ -23,12 +23,12 @@ void Block::produce(const size_t num_items) item_index_t Block::get_produced(const size_t which_output) { - return (*this)->block->stats.items_produced[which_output]; + return (*this)->block_data->stats.items_produced[which_output]; } SBuffer Block::get_output_buffer(const size_t which_output) const { - SBuffer &buff = (*this)->block->output_queues.front(which_output); + SBuffer &buff = (*this)->block_data->output_queues.front(which_output); //increment length to auto pop full buffer size, //when user doesnt call pop_output_buffer() buff.length = buff.get_actual_length(); @@ -37,7 +37,7 @@ SBuffer Block::get_output_buffer(const size_t which_output) const void Block::pop_output_buffer(const size_t which_output, const size_t num_bytes) { - (*this)->block->output_queues.front(which_output).length = num_bytes; + (*this)->block_data->output_queues.front(which_output).length = num_bytes; } void Block::post_output_buffer(const size_t which_output, const SBuffer &buffer) @@ -50,20 +50,20 @@ GRAS_FORCE_INLINE void BlockActor::produce(const size_t i, const size_t items) #ifdef ITEM_CONSPROD std::cerr << name << " produce " << items << std::endl; #endif - SBuffer &buff = this->output_queues.front(i); - ASSERT((buff.length % output_configs[i].item_size) == 0); - this->stats.items_produced[i] += items; - const size_t bytes = items*this->output_configs[i].item_size; + SBuffer &buff = data->output_queues.front(i); + ASSERT((buff.length % data->output_configs[i].item_size) == 0); + data->stats.items_produced[i] += items; + const size_t bytes = items*data->output_configs[i].item_size; buff.length += bytes; - this->produce_outputs[i] = true; + data->produce_outputs[i] = true; } GRAS_FORCE_INLINE void BlockActor::produce_buffer(const size_t i, const SBuffer &buffer) { - this->output_queues.consume(i); - ASSERT((buffer.length % output_configs[i].item_size) == 0); - const size_t items = buffer.length/output_configs[i].item_size; - this->stats.items_produced[i] += items; + data->output_queues.consume(i); + ASSERT((buffer.length % data->output_configs[i].item_size) == 0); + const size_t items = buffer.length/data->output_configs[i].item_size; + data->stats.items_produced[i] += items; InputBufferMessage buff_msg; buff_msg.buffer = buffer; this->post_downstream(i, buff_msg); diff --git a/lib/block_props.cpp b/lib/block_props.cpp index b3456ac..cc42846 100644 --- a/lib/block_props.cpp +++ b/lib/block_props.cpp @@ -45,7 +45,7 @@ void BlockActor::handle_prop_access( PMCC Block::_handle_prop_access(const std::string &key, const PMCC &value, const bool set) { - const PropertyRegistryPair &pair = (*this)->block->property_registry[key]; + const PropertyRegistryPair &pair = (*this)->block_data->property_registry[key]; PropertyRegistrySptr pr = (set)? pair.setter : pair.getter; if (not pr) throw std::invalid_argument("no property registered for key: " + key); if (set) @@ -96,12 +96,12 @@ PMCC BlockActor::prop_access_dispatcher(const std::string &key, const PMCC &valu void Block::_register_getter(const std::string &key, void *pr) { - (*this)->block->property_registry[key].getter.reset(reinterpret_cast<PropertyRegistry *>(pr)); + (*this)->block_data->property_registry[key].getter.reset(reinterpret_cast<PropertyRegistry *>(pr)); } void Block::_register_setter(const std::string &key, void *pr) { - (*this)->block->property_registry[key].setter.reset(reinterpret_cast<PropertyRegistry *>(pr)); + (*this)->block_data->property_registry[key].setter.reset(reinterpret_cast<PropertyRegistry *>(pr)); } void Block::_set_property(const std::string &key, const PMCC &value) diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp index 477b478..4e75375 100644 --- a/lib/element_impl.hpp +++ b/lib/element_impl.hpp @@ -45,6 +45,7 @@ struct ElementImpl boost::shared_ptr<Apology::Topology> topology; boost::shared_ptr<Apology::Executor> executor; boost::shared_ptr<BlockActor> block; + boost::shared_ptr<BlockData> block_data; ThreadPool thread_pool; Apology::Base *get_elem(void) const { diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index 005855e..7ee9e9b 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -14,13 +14,6 @@ namespace gras { -typedef boost::shared_ptr<PropertyRegistry> PropertyRegistrySptr; -struct PropertyRegistryPair -{ - PropertyRegistrySptr setter; - PropertyRegistrySptr getter; -}; - struct BlockActor : Apology::Worker { BlockActor(const ThreadPool &tp = ThreadPool()); @@ -111,8 +104,11 @@ struct BlockActor : Apology::Worker //work helpers inline void task_work(void) { - block_ptr->work(this->input_items, this->output_items); + block_ptr->work(data->input_items, data->output_items); } + + //property stuff + PMCC prop_access_dispatcher(const std::string &key, const PMCC &value, const bool set); }; //-------------- common functions from this BlockActor class ---------// @@ -124,27 +120,27 @@ GRAS_FORCE_INLINE void BlockActor::task_kicker(void) GRAS_FORCE_INLINE void BlockActor::update_input_avail(const size_t i) { - const bool has_input_bufs = not this->input_queues.empty(i) and this->input_queues.ready(i); - const bool has_input_msgs = not this->input_msgs[i].empty(); - this->inputs_available.set(i, has_input_bufs or has_input_msgs); - this->input_queues.update_has_msg(i, has_input_msgs); + const bool has_input_bufs = not data->input_queues.empty(i) and data->input_queues.ready(i); + const bool has_input_msgs = not data->input_msgs[i].empty(); + data->inputs_available.set(i, has_input_bufs or has_input_msgs); + data->input_queues.update_has_msg(i, has_input_msgs); } GRAS_FORCE_INLINE bool BlockActor::is_input_done(const size_t i) { - const bool force_done = this->input_configs[i].force_done; - if GRAS_LIKELY(force_done) return this->inputs_done[i] and not this->inputs_available[i]; - return this->inputs_done.all() and this->inputs_available.none(); + const bool force_done = data->input_configs[i].force_done; + if GRAS_LIKELY(force_done) return data->inputs_done[i] and not data->inputs_available[i]; + return data->inputs_done.all() and data->inputs_available.none(); } GRAS_FORCE_INLINE bool BlockActor::is_work_allowed(void) { return ( this->prio_token.unique() and - this->block_state == BLOCK_STATE_LIVE and - this->inputs_available.any() and - this->input_queues.all_ready() and - this->output_queues.all_ready() + data->block_state == BLOCK_STATE_LIVE and + data->inputs_available.any() and + data->input_queues.all_ready() and + data->output_queues.all_ready() ); } diff --git a/lib/gras_impl/block_data.hpp b/lib/gras_impl/block_data.hpp index 2dcf3c6..9fbf3eb 100644 --- a/lib/gras_impl/block_data.hpp +++ b/lib/gras_impl/block_data.hpp @@ -24,6 +24,13 @@ struct PropertyRegistryPair PropertyRegistrySptr getter; }; +enum BlockState +{ + BLOCK_STATE_INIT, + BLOCK_STATE_LIVE, + BLOCK_STATE_DONE, +}; + struct BlockData { //per port properties @@ -61,19 +68,13 @@ struct BlockData boost::shared_ptr<InterruptibleThread> interruptible_thread; //is the fg running? - enum - { - BLOCK_STATE_INIT, - BLOCK_STATE_LIVE, - BLOCK_STATE_DONE, - } block_state; + BlockState block_state; long buffer_affinity; std::vector<std::vector<OutputHintMessage> > output_allocation_hints; //property stuff std::map<std::string, PropertyRegistryPair> property_registry; - PMCC prop_access_dispatcher(const std::string &key, const PMCC &value, const bool set); BlockStats stats; }; diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp index faddda8..1210688 100644 --- a/lib/input_handlers.cpp +++ b/lib/input_handlers.cpp @@ -7,25 +7,25 @@ using namespace gras; void BlockActor::handle_input_tag(const InputTagMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_input); + TimerAccumulate ta(data->stats.total_time_input); MESSAGE_TRACER(); const size_t index = message.index; //handle incoming stream tag, push into the tag storage - if GRAS_UNLIKELY(this->block_state == BLOCK_STATE_DONE) return; - this->input_tags[index].push_back(message.tag); - this->input_tags_changed[index] = true; + if GRAS_UNLIKELY(data->block_state == BLOCK_STATE_DONE) return; + data->input_tags[index].push_back(message.tag); + data->input_tags_changed[index] = true; } void BlockActor::handle_input_msg(const InputMsgMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_input); + TimerAccumulate ta(data->stats.total_time_input); MESSAGE_TRACER(); const size_t index = message.index; //handle incoming async message, push into the msg storage - if GRAS_UNLIKELY(this->block_state == BLOCK_STATE_DONE) return; - this->input_msgs[index].push_back(message.msg); + if GRAS_UNLIKELY(data->block_state == BLOCK_STATE_DONE) return; + data->input_msgs[index].push_back(message.msg); this->update_input_avail(index); ta.done(); @@ -34,13 +34,13 @@ void BlockActor::handle_input_msg(const InputMsgMessage &message, const Theron:: void BlockActor::handle_input_buffer(const InputBufferMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_input); + TimerAccumulate ta(data->stats.total_time_input); MESSAGE_TRACER(); const size_t index = message.index; //handle incoming stream buffer, push into the queue - if GRAS_UNLIKELY(this->block_state == BLOCK_STATE_DONE) return; - this->input_queues.push(index, message.buffer); + if GRAS_UNLIKELY(data->block_state == BLOCK_STATE_DONE) return; + data->input_queues.push(index, message.buffer); this->update_input_avail(index); ta.done(); @@ -49,22 +49,22 @@ void BlockActor::handle_input_buffer(const InputBufferMessage &message, const Th void BlockActor::handle_input_token(const InputTokenMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_input); + TimerAccumulate ta(data->stats.total_time_input); MESSAGE_TRACER(); ASSERT(message.index < this->get_num_inputs()); //store the token of the upstream producer - this->token_pool.insert(message.token); + data->token_pool.insert(message.token); } void BlockActor::handle_input_check(const InputCheckMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_input); + TimerAccumulate ta(data->stats.total_time_input); MESSAGE_TRACER(); const size_t index = message.index; //an upstream block declared itself done, recheck the token - this->inputs_done.set(index, this->input_tokens[index].unique()); + data->inputs_done.set(index, data->input_tokens[index].unique()); //upstream done, give it one more attempt at task handling ta.done(); @@ -76,7 +76,7 @@ void BlockActor::handle_input_check(const InputCheckMessage &message, const Ther void BlockActor::handle_input_alloc(const InputAllocMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_input); + TimerAccumulate ta(data->stats.total_time_input); MESSAGE_TRACER(); const size_t index = message.index; @@ -88,14 +88,14 @@ void BlockActor::handle_input_alloc(const InputAllocMessage &message, const Ther void BlockActor::handle_input_update(const InputUpdateMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_input); + TimerAccumulate ta(data->stats.total_time_input); MESSAGE_TRACER(); const size_t i = message.index; //update buffer queue configuration - if (i >= this->input_queues.size()) return; - const size_t preload_bytes = this->input_configs[i].item_size*this->input_configs[i].preload_items; - const size_t reserve_bytes = this->input_configs[i].item_size*this->input_configs[i].reserve_items; - const size_t maximum_bytes = this->input_configs[i].item_size*this->input_configs[i].maximum_items; - this->input_queues.update_config(i, this->input_configs[i].item_size, preload_bytes, reserve_bytes, maximum_bytes); + if (i >= data->input_queues.size()) return; + const size_t preload_bytes = data->input_configs[i].item_size*data->input_configs[i].preload_items; + const size_t reserve_bytes = data->input_configs[i].item_size*data->input_configs[i].reserve_items; + const size_t maximum_bytes = data->input_configs[i].item_size*data->input_configs[i].maximum_items; + data->input_queues.update_config(i, data->input_configs[i].item_size, preload_bytes, reserve_bytes, maximum_bytes); } diff --git a/lib/output_handlers.cpp b/lib/output_handlers.cpp index d937bbf..22a86ef 100644 --- a/lib/output_handlers.cpp +++ b/lib/output_handlers.cpp @@ -8,14 +8,14 @@ using namespace gras; void BlockActor::handle_output_buffer(const OutputBufferMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_output); + TimerAccumulate ta(data->stats.total_time_output); MESSAGE_TRACER(); const size_t index = message.index; //a buffer has returned from the downstream //(all interested consumers have finished with it) - if GRAS_UNLIKELY(this->block_state == BLOCK_STATE_DONE) return; - this->output_queues.push(index, message.buffer); + if GRAS_UNLIKELY(data->block_state == BLOCK_STATE_DONE) return; + data->output_queues.push(index, message.buffer); ta.done(); this->task_main(); @@ -23,23 +23,23 @@ void BlockActor::handle_output_buffer(const OutputBufferMessage &message, const void BlockActor::handle_output_token(const OutputTokenMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_output); + TimerAccumulate ta(data->stats.total_time_output); MESSAGE_TRACER(); ASSERT(message.index < this->get_num_outputs()); //store the token of the downstream consumer - this->token_pool.insert(message.token); + data->token_pool.insert(message.token); } void BlockActor::handle_output_check(const OutputCheckMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_output); + TimerAccumulate ta(data->stats.total_time_output); MESSAGE_TRACER(); const size_t index = message.index; //a downstream block has declared itself done, recheck the token - this->outputs_done.set(index, this->output_tokens[index].unique()); - if (this->outputs_done.all()) //no downstream subscribers? + data->outputs_done.set(index, data->output_tokens[index].unique()); + if (data->outputs_done.all()) //no downstream subscribers? { this->mark_done(); } @@ -47,7 +47,7 @@ void BlockActor::handle_output_check(const OutputCheckMessage &message, const Th void BlockActor::handle_output_hint(const OutputHintMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_output); + TimerAccumulate ta(data->stats.total_time_output); MESSAGE_TRACER(); const size_t index = message.index; @@ -57,7 +57,7 @@ void BlockActor::handle_output_hint(const OutputHintMessage &message, const Ther //remove any old hints with expired token //remove any older hints with matching token std::vector<OutputHintMessage> hints; - BOOST_FOREACH(const OutputHintMessage &hint, this->output_allocation_hints[index]) + BOOST_FOREACH(const OutputHintMessage &hint, data->output_allocation_hints[index]) { if (hint.token.expired()) continue; if (hint.token.lock() == message.token.lock()) continue; @@ -67,27 +67,27 @@ void BlockActor::handle_output_hint(const OutputHintMessage &message, const Ther //store the new hint as well hints.push_back(message); - this->output_allocation_hints[index] = hints; + data->output_allocation_hints[index] = hints; } void BlockActor::handle_output_alloc(const OutputAllocMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_output); + TimerAccumulate ta(data->stats.total_time_output); MESSAGE_TRACER(); const size_t index = message.index; //return of a positive downstream allocation - this->output_queues.set_buffer_queue(index, message.queue); + data->output_queues.set_buffer_queue(index, message.queue); } void BlockActor::handle_output_update(const OutputUpdateMessage &message, const Theron::Address) { - TimerAccumulate ta(this->stats.total_time_output); + TimerAccumulate ta(data->stats.total_time_output); MESSAGE_TRACER(); const size_t i = message.index; //update buffer queue configuration - if (i >= this->output_queues.size()) return; - const size_t reserve_bytes = this->output_configs[i].item_size*this->output_configs[i].reserve_items; - this->output_queues.set_reserve_bytes(i, reserve_bytes); + if (i >= data->output_queues.size()) return; + const size_t reserve_bytes = data->output_configs[i].item_size*data->output_configs[i].reserve_items; + data->output_queues.set_reserve_bytes(i, reserve_bytes); } diff --git a/lib/tag_handlers.hpp b/lib/tag_handlers.hpp index 8d504dd..cc03a9b 100644 --- a/lib/tag_handlers.hpp +++ b/lib/tag_handlers.hpp @@ -11,10 +11,10 @@ namespace gras GRAS_FORCE_INLINE void BlockActor::sort_tags(const size_t i) { - if GRAS_LIKELY(not this->input_tags_changed[i]) return; - std::vector<Tag> &tags_i = this->input_tags[i]; + if GRAS_LIKELY(not data->input_tags_changed[i]) return; + std::vector<Tag> &tags_i = data->input_tags[i]; std::sort(tags_i.begin(), tags_i.end()); - this->input_tags_changed[i] = false; + data->input_tags_changed[i] = false; } GRAS_FORCE_INLINE void BlockActor::trim_tags(const size_t i) @@ -24,8 +24,8 @@ GRAS_FORCE_INLINE void BlockActor::trim_tags(const size_t i) //-- and post trimmed tags to the downstream based on policy //------------------------------------------------------------------ - std::vector<Tag> &tags_i = this->input_tags[i]; - const item_index_t items_consumed_i = this->stats.items_consumed[i]; + std::vector<Tag> &tags_i = data->input_tags[i]; + const item_index_t items_consumed_i = data->stats.items_consumed[i]; size_t last = 0; while (last < tags_i.size() and tags_i[last].offset < items_consumed_i) { @@ -35,19 +35,19 @@ GRAS_FORCE_INLINE void BlockActor::trim_tags(const size_t i) if GRAS_LIKELY(last == 0) return; //call the overloaded propagate_tags to do the dirty work - this->block_ptr->propagate_tags(i, TagIter(tags_i.begin(), tags_i.begin()+last)); + block_ptr->propagate_tags(i, TagIter(tags_i.begin(), tags_i.begin()+last)); //now its safe to perform the erasure tags_i.erase(tags_i.begin(), tags_i.begin()+last); - this->stats.tags_consumed[i] += last; + data->stats.tags_consumed[i] += last; } GRAS_FORCE_INLINE void BlockActor::trim_msgs(const size_t i) { - const size_t num_read = this->num_input_msgs_read[i]; + const size_t num_read = data->num_input_msgs_read[i]; if GRAS_UNLIKELY(num_read > 0) { - std::vector<PMCC> &input_msgs = this->input_msgs[i]; + std::vector<PMCC> &input_msgs = data->input_msgs[i]; input_msgs.erase(input_msgs.begin(), input_msgs.begin()+num_read); } } diff --git a/lib/task_done.cpp b/lib/task_done.cpp index d116202..b6e61a5 100644 --- a/lib/task_done.cpp +++ b/lib/task_done.cpp @@ -12,40 +12,40 @@ void Block::mark_done(void) void BlockActor::mark_done(void) { - if (this->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first + if (data->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first - this->stats.stop_time = time_now(); - this->block_ptr->notify_inactive(); + data->stats.stop_time = time_now(); + block_ptr->notify_inactive(); //flush partial output buffers to the downstream for (size_t i = 0; i < this->get_num_outputs(); i++) { - if (not this->output_queues.ready(i)) continue; - SBuffer &buff = this->output_queues.front(i); + if (not data->output_queues.ready(i)) continue; + SBuffer &buff = data->output_queues.front(i); if (buff.length == 0) continue; InputBufferMessage buff_msg; buff_msg.buffer = buff; this->post_downstream(i, buff_msg); - this->output_queues.pop(i); + data->output_queues.pop(i); } - this->interruptible_thread.reset(); + data->interruptible_thread.reset(); //mark down the new state - this->block_state = BLOCK_STATE_DONE; + data->block_state = BLOCK_STATE_DONE; //release upstream, downstream, and executor tokens - this->token_pool.clear(); + data->token_pool.clear(); //release all buffers in queues - this->input_queues.flush_all(); - this->output_queues.flush_all(); + data->input_queues.flush_all(); + data->output_queues.flush_all(); //release all tags and msgs for (size_t i = 0; i < this->get_num_inputs(); i++) { - this->input_msgs[i].clear(); - this->input_tags[i].clear(); + data->input_msgs[i].clear(); + data->input_tags[i].clear(); } //tell the upstream and downstram to re-check their tokens diff --git a/lib/task_fail.cpp b/lib/task_fail.cpp index 720e2e1..1168698 100644 --- a/lib/task_fail.cpp +++ b/lib/task_fail.cpp @@ -18,41 +18,41 @@ void Block::mark_input_fail(const size_t which_input) void BlockActor::input_fail(const size_t i) { //input failed, accumulate and try again - if (not this->input_queues.is_accumulated(i)) + if (not data->input_queues.is_accumulated(i)) { - this->input_queues.accumulate(i); + data->input_queues.accumulate(i); this->task_kicker(); return; } //otherwise check for done, else wait for more - if (this->inputs_done[i]) + if (data->inputs_done[i]) { this->mark_done(); return; } //check that the input is not already maxed - if (this->input_queues.is_front_maximal(i)) + if (data->input_queues.is_front_maximal(i)) { throw std::runtime_error("input_fail called on maximum_items buffer in " + name); } //mark fail: not ready until a new buffer appears - this->input_queues.fail(i); + data->input_queues.fail(i); } void BlockActor::output_fail(const size_t i) { - SBuffer &buff = this->output_queues.front(i); + SBuffer &buff = data->output_queues.front(i); //check that the input is not already maxed - const size_t front_items = buff.length/this->output_configs[i].item_size; - if (front_items >= this->output_configs[i].maximum_items) + const size_t front_items = buff.length/data->output_configs[i].item_size; + if (front_items >= data->output_configs[i].maximum_items) { throw std::runtime_error("output_fail called on maximum_items buffer in " + name); } //mark fail: not ready until a new buffer appears - this->output_queues.fail(i); + data->output_queues.fail(i); } diff --git a/lib/task_main.cpp b/lib/task_main.cpp index 01e38ce..2178b73 100644 --- a/lib/task_main.cpp +++ b/lib/task_main.cpp @@ -7,7 +7,7 @@ using namespace gras; void BlockActor::task_main(void) { - TimerAccumulate ta_prep(this->stats.total_time_prep); + TimerAccumulate ta_prep(data->stats.total_time_prep); //------------------------------------------------------------------ //-- Decide if its possible to continue any processing: @@ -23,33 +23,33 @@ void BlockActor::task_main(void) //-- initialize input buffers before work //------------------------------------------------------------------ size_t output_inline_index = 0; - this->input_items.min() = ~0; - this->input_items.max() = 0; + data->input_items.min() = ~0; + data->input_items.max() = 0; for (size_t i = 0; i < num_inputs; i++) { this->sort_tags(i); - this->num_input_msgs_read[i] = 0; + data->num_input_msgs_read[i] = 0; - ASSERT(this->input_queues.ready(i)); - const SBuffer &buff = this->input_queues.front(i); + ASSERT(data->input_queues.ready(i)); + const SBuffer &buff = data->input_queues.front(i); const void *mem = buff.get(); - size_t items = buff.length/this->input_configs[i].item_size; + size_t items = buff.length/data->input_configs[i].item_size; - this->input_items.vec()[i] = mem; - this->input_items[i].get() = mem; - this->input_items[i].size() = items; - this->input_items.min() = std::min(this->input_items.min(), items); - this->input_items.max() = std::max(this->input_items.max(), items); + data->input_items.vec()[i] = mem; + data->input_items[i].get() = mem; + data->input_items[i].size() = items; + data->input_items.min() = std::min(data->input_items.min(), items); + data->input_items.max() = std::max(data->input_items.max(), items); //inline dealings, how and when input buffers can be inlined into output buffers //* if GRAS_UNLIKELY( buff.unique() and - input_configs[i].inline_buffer and + data->input_configs[i].inline_buffer and output_inline_index < num_outputs and - buff.get_affinity() == this->buffer_affinity + buff.get_affinity() == data->buffer_affinity ){ - this->output_queues.set_inline(output_inline_index++, buff); + data->output_queues.set_inline(output_inline_index++, buff); } //*/ } @@ -57,41 +57,41 @@ void BlockActor::task_main(void) //------------------------------------------------------------------ //-- initialize output buffers before work //------------------------------------------------------------------ - this->output_items.min() = ~0; - this->output_items.max() = 0; + data->output_items.min() = ~0; + data->output_items.max() = 0; for (size_t i = 0; i < num_outputs; i++) { - ASSERT(this->output_queues.ready(i)); - SBuffer &buff = this->output_queues.front(i); + ASSERT(data->output_queues.ready(i)); + SBuffer &buff = data->output_queues.front(i); ASSERT(buff.length == 0); //assumes it was flushed last call void *mem = buff.get(); const size_t bytes = buff.get_actual_length() - buff.offset; - size_t items = bytes/this->output_configs[i].item_size; + size_t items = bytes/data->output_configs[i].item_size; - this->output_items.vec()[i] = mem; - this->output_items[i].get() = mem; - this->output_items[i].size() = items; - this->output_items.min() = std::min(this->output_items.min(), items); - this->output_items.max() = std::max(this->output_items.max(), items); + data->output_items.vec()[i] = mem; + data->output_items[i].get() = mem; + data->output_items[i].size() = items; + data->output_items.min() = std::min(data->output_items.min(), items); + data->output_items.max() = std::max(data->output_items.max(), items); } //------------------------------------------------------------------ //-- the work //------------------------------------------------------------------ ta_prep.done(); - this->stats.work_count++; - if GRAS_UNLIKELY(this->interruptible_thread) + data->stats.work_count++; + if GRAS_UNLIKELY(data->interruptible_thread) { - TimerAccumulate ta_work(this->stats.total_time_work); - this->interruptible_thread->call(); + TimerAccumulate ta_work(data->stats.total_time_work); + data->interruptible_thread->call(); } else { - TimerAccumulate ta_work(this->stats.total_time_work); + TimerAccumulate ta_work(data->stats.total_time_work); this->task_work(); } - this->stats.time_last_work = time_now(); - TimerAccumulate ta_post(this->stats.total_time_post); + data->stats.time_last_work = time_now(); + TimerAccumulate ta_post(data->stats.total_time_post); //------------------------------------------------------------------ //-- Post-work output tasks @@ -99,18 +99,18 @@ void BlockActor::task_main(void) for (size_t i = 0; i < num_outputs; i++) { //buffer may be popped by one of the special buffer api hooks - if GRAS_UNLIKELY(this->output_queues.empty(i)) continue; + if GRAS_UNLIKELY(data->output_queues.empty(i)) continue; //grab a copy of the front buffer then consume from the queue InputBufferMessage buff_msg; - buff_msg.buffer = this->output_queues.front(i); - this->output_queues.consume(i); + buff_msg.buffer = data->output_queues.front(i); + data->output_queues.consume(i); //Post a buffer message downstream only if the produce flag was marked. //So this explicitly after consuming the output queues so pop is called. //This is because pop may have special hooks in it to prepare the buffer. - if GRAS_LIKELY(this->produce_outputs[i]) this->post_downstream(i, buff_msg); - this->produce_outputs[i] = false; + if GRAS_LIKELY(data->produce_outputs[i]) this->post_downstream(i, buff_msg); + data->produce_outputs[i] = false; } //------------------------------------------------------------------ diff --git a/lib/top_block_query.cpp b/lib/top_block_query.cpp index c38fbd9..0918461 100644 --- a/lib/top_block_query.cpp +++ b/lib/top_block_query.cpp @@ -37,7 +37,7 @@ static ptree query_blocks(ElementImpl *self, const ptree &) BlockActor *block = dynamic_cast<BlockActor *>(worker); ptree prop_e; typedef std::pair<std::string, PropertyRegistryPair> PropRegistryKVP; - BOOST_FOREACH(const PropRegistryKVP &p, block->property_registry) + BOOST_FOREACH(const PropRegistryKVP &p, block->data->property_registry) { ptree prop_attrs; if (p.second.setter) @@ -175,7 +175,7 @@ static ptree query_props(ElementImpl *self, const ptree &query) if (block->block_ptr->get_uid() != block_id) continue; if (set) { - const std::type_info &t = block->property_registry[prop_key].setter->type(); + const std::type_info &t = block->data->property_registry[prop_key].setter->type(); const PMCC p = ptree_to_pmc(query.get_child("value"), t); block->prop_access_dispatcher(prop_key, p, true); } diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp index 38ea138..e93089a 100644 --- a/lib/topology_handler.cpp +++ b/lib/topology_handler.cpp @@ -31,42 +31,42 @@ void BlockActor::handle_topology( this->block_ptr->notify_topology(num_inputs, num_outputs); //resize and fill port properties - resize_fill_back(this->input_configs, num_inputs); - resize_fill_back(this->output_configs, num_outputs); + resize_fill_back(data->input_configs, num_inputs); + resize_fill_back(data->output_configs, num_outputs); //resize the bytes consumed/produced - resize_fill_grow(this->stats.items_consumed, num_inputs, 0); - resize_fill_grow(this->stats.tags_consumed, num_inputs, 0); - resize_fill_grow(this->stats.msgs_consumed, num_inputs, 0); - resize_fill_grow(this->stats.items_produced, num_outputs, 0); - resize_fill_grow(this->stats.tags_produced, num_outputs, 0); - resize_fill_grow(this->stats.msgs_produced, num_outputs, 0); + resize_fill_grow(data->stats.items_consumed, num_inputs, 0); + resize_fill_grow(data->stats.tags_consumed, num_inputs, 0); + resize_fill_grow(data->stats.msgs_consumed, num_inputs, 0); + resize_fill_grow(data->stats.items_produced, num_outputs, 0); + resize_fill_grow(data->stats.tags_produced, num_outputs, 0); + resize_fill_grow(data->stats.msgs_produced, num_outputs, 0); //resize all work buffers to match current connections - this->input_items.resize(num_inputs); - this->output_items.resize(num_outputs); - this->input_queues.resize(num_inputs); - this->output_queues.resize(num_outputs); - this->produce_outputs.resize(num_outputs, false); - this->inputs_available.resize(num_inputs); - if (num_inputs == 0) this->inputs_available.resize(1, true); //so its always "available" + data->input_items.resize(num_inputs); + data->output_items.resize(num_outputs); + data->input_queues.resize(num_inputs); + data->output_queues.resize(num_outputs); + data->produce_outputs.resize(num_outputs, false); + data->inputs_available.resize(num_inputs); + if (num_inputs == 0) data->inputs_available.resize(1, true); //so its always "available" //copy the name into the queues for debug purposes - this->input_queues.name = this->name; - this->output_queues.name = this->name; + data->input_queues.name = this->name; + data->output_queues.name = this->name; //resize the token trackers - this->input_tokens.resize(num_inputs); - this->output_tokens.resize(num_outputs); - this->inputs_done.resize(num_inputs); - this->outputs_done.resize(num_outputs); - this->output_allocation_hints.resize(num_outputs); + data->input_tokens.resize(num_inputs); + data->output_tokens.resize(num_outputs); + data->inputs_done.resize(num_inputs); + data->outputs_done.resize(num_outputs); + data->output_allocation_hints.resize(num_outputs); //resize tags vector to match sizes - this->input_tags_changed.resize(num_inputs); - this->input_tags.resize(num_inputs); - this->num_input_msgs_read.resize(num_inputs); - this->input_msgs.resize(num_inputs); + data->input_tags_changed.resize(num_inputs); + data->input_tags.resize(num_inputs); + data->num_input_msgs_read.resize(num_inputs); + data->input_msgs.resize(num_inputs); //a block looses all connections, allow it to free if (num_inputs == 0 and num_outputs == 0) |