summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/block.cpp15
-rw-r--r--lib/block_allocator.cpp12
-rw-r--r--lib/block_consume.cpp10
-rw-r--r--lib/block_handlers.cpp70
-rw-r--r--lib/block_message.cpp12
-rw-r--r--lib/block_produce.cpp24
-rw-r--r--lib/block_props.cpp6
-rw-r--r--lib/element_impl.hpp1
-rw-r--r--lib/gras_impl/block_actor.hpp34
-rw-r--r--lib/gras_impl/block_data.hpp15
-rw-r--r--lib/input_handlers.cpp42
-rw-r--r--lib/output_handlers.cpp34
-rw-r--r--lib/tag_handlers.hpp18
-rw-r--r--lib/task_done.cpp26
-rw-r--r--lib/task_fail.cpp18
-rw-r--r--lib/task_main.cpp74
-rw-r--r--lib/top_block_query.cpp4
-rw-r--r--lib/topology_handler.cpp52
18 files changed, 233 insertions, 234 deletions
diff --git a/lib/block.cpp b/lib/block.cpp
index 835b979..ac01f72 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -38,9 +38,10 @@ Block::Block(const std::string &name):
(*this)->block->name = name; //for debug purposes
(*this)->block->block_ptr = this;
(*this)->block->data.reset(new BlockData());
+ (*this)->block_data = (*this)->block->data;
//setup some state variables
- (*this)->block->data = BlockActor::BLOCK_STATE_INIT;
+ (*this)->block_data->block_state = BLOCK_STATE_INIT;
//call block methods to init stuff
this->set_interruptible_work(false);
@@ -133,22 +134,22 @@ typename V::value_type &vector_get_resize(V &v, const size_t index)
InputPortConfig &Block::input_config(const size_t which_input)
{
- return vector_get_resize((*this)->block->input_configs, which_input);
+ return vector_get_resize((*this)->block_data->input_configs, which_input);
}
const InputPortConfig &Block::input_config(const size_t which_input) const
{
- return vector_get_const((*this)->block->input_configs, which_input);
+ return vector_get_const((*this)->block_data->input_configs, which_input);
}
OutputPortConfig &Block::output_config(const size_t which_output)
{
- return vector_get_resize((*this)->block->output_configs, which_output);
+ return vector_get_resize((*this)->block_data->output_configs, which_output);
}
const OutputPortConfig &Block::output_config(const size_t which_output) const
{
- return vector_get_const((*this)->block->output_configs, which_output);
+ return vector_get_const((*this)->block_data->output_configs, which_output);
}
void Block::commit_config(void)
@@ -191,10 +192,10 @@ void Block::set_thread_pool(const ThreadPool &thread_pool)
void Block::set_buffer_affinity(const long affinity)
{
- (*this)->block->buffer_affinity = affinity;
+ (*this)->block_data->buffer_affinity = affinity;
}
void Block::set_interruptible_work(const bool enb)
{
- (*this)->block->interruptible_work = enb;
+ (*this)->block_data->interruptible_work = enb;
}
diff --git a/lib/block_allocator.cpp b/lib/block_allocator.cpp
index 0dda99a..86732e2 100644
--- a/lib/block_allocator.cpp
+++ b/lib/block_allocator.cpp
@@ -62,10 +62,10 @@ void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address
for (size_t i = 0; i < num_outputs; i++)
{
const size_t bytes = recommend_length(
- this->output_allocation_hints[i],
- my_round_up_mult(AT_LEAST_BYTES, this->output_configs[i].item_size),
- this->output_configs[i].reserve_items*this->output_configs[i].item_size,
- this->output_configs[i].maximum_items*this->output_configs[i].item_size
+ data->output_allocation_hints[i],
+ my_round_up_mult(AT_LEAST_BYTES, data->output_configs[i].item_size),
+ data->output_configs[i].reserve_items*data->output_configs[i].item_size,
+ data->output_configs[i].maximum_items*data->output_configs[i].item_size
);
SBufferDeleter deleter = boost::bind(&buffer_returner, this->thread_pool, this->GetAddress(), i, _1);
@@ -74,11 +74,11 @@ void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address
SBufferConfig config;
config.memory = NULL;
config.length = bytes;
- config.affinity = this->buffer_affinity;
+ config.affinity = data->buffer_affinity;
config.token = token;
BufferQueueSptr queue = block_ptr->output_buffer_allocator(i, config);
- this->output_queues.set_buffer_queue(i, queue);
+ data->output_queues.set_buffer_queue(i, queue);
InputAllocMessage message;
//new token for the downstream allocator
diff --git a/lib/block_consume.cpp b/lib/block_consume.cpp
index efdb07e..04529f4 100644
--- a/lib/block_consume.cpp
+++ b/lib/block_consume.cpp
@@ -24,12 +24,12 @@ void Block::consume(const size_t num_items)
item_index_t Block::get_consumed(const size_t which_input)
{
- return (*this)->block->stats.items_consumed[which_input];
+ return (*this)->block_data->stats.items_consumed[which_input];
}
SBuffer Block::get_input_buffer(const size_t which_input) const
{
- return (*this)->block->input_queues.front(which_input);
+ return (*this)->block_data->input_queues.front(which_input);
}
GRAS_FORCE_INLINE void BlockActor::consume(const size_t i, const size_t items)
@@ -37,8 +37,8 @@ GRAS_FORCE_INLINE void BlockActor::consume(const size_t i, const size_t items)
#ifdef ITEM_CONSPROD
std::cerr << name << " consume " << items << std::endl;
#endif
- this->stats.items_consumed[i] += items;
- const size_t bytes = items*this->input_configs[i].item_size;
- this->input_queues.consume(i, bytes);
+ data->stats.items_consumed[i] += items;
+ const size_t bytes = items*data->input_configs[i].item_size;
+ data->input_queues.consume(i, bytes);
this->trim_tags(i);
}
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index e0ee67a..0cf4b1f 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -13,12 +13,12 @@ void BlockActor::handle_top_active(
){
MESSAGE_TRACER();
- if (this->block_state != BLOCK_STATE_LIVE)
+ if (data->block_state != BLOCK_STATE_LIVE)
{
- this->block_ptr->notify_active();
- this->stats.start_time = time_now();
+ block_ptr->notify_active();
+ data->stats.start_time = time_now();
}
- this->block_state = BLOCK_STATE_LIVE;
+ data->block_state = BLOCK_STATE_LIVE;
this->Send(0, from); //ACK
@@ -45,17 +45,17 @@ void BlockActor::handle_top_token(
//create input tokens and send allocation hints
for (size_t i = 0; i < this->get_num_inputs(); i++)
{
- this->input_tokens[i] = Token::make();
- this->inputs_done.reset(i);
+ data->input_tokens[i] = Token::make();
+ data->inputs_done.reset(i);
OutputTokenMessage token_msg;
- token_msg.token = this->input_tokens[i];
+ token_msg.token = data->input_tokens[i];
this->post_upstream(i, token_msg);
//TODO, schedule this message as a pre-allocation message
//tell the upstream about the input requirements
OutputHintMessage output_hints;
- output_hints.reserve_bytes = this->input_configs[i].reserve_items*this->input_configs[i].item_size;
- output_hints.token = this->input_tokens[i];
+ output_hints.reserve_bytes = data->input_configs[i].reserve_items*data->input_configs[i].item_size;
+ output_hints.token = data->input_tokens[i];
this->post_upstream(i, output_hints);
}
@@ -63,15 +63,15 @@ void BlockActor::handle_top_token(
//create output token
for (size_t i = 0; i < this->get_num_outputs(); i++)
{
- this->output_tokens[i] = Token::make();
- this->outputs_done.reset(i);
+ data->output_tokens[i] = Token::make();
+ data->outputs_done.reset(i);
InputTokenMessage token_msg;
- token_msg.token = this->output_tokens[i];
+ token_msg.token = data->output_tokens[i];
this->post_downstream(i, token_msg);
}
//store a token to the top level topology
- this->token_pool.insert(message.token);
+ data->token_pool.insert(message.token);
this->Send(0, from); //ACK
}
@@ -83,18 +83,18 @@ void BlockActor::handle_top_config(
MESSAGE_TRACER();
//overwrite with global config only if maxium_items is not set (zero)
- for (size_t i = 0; i < this->output_configs.size(); i++)
+ for (size_t i = 0; i < data->output_configs.size(); i++)
{
- if (this->output_configs[i].maximum_items == 0)
+ if (data->output_configs[i].maximum_items == 0)
{
- this->output_configs[i].maximum_items = message.maximum_output_items;
+ data->output_configs[i].maximum_items = message.maximum_output_items;
}
}
//overwrite with global node affinity setting for buffers if not set
- if (this->buffer_affinity == -1)
+ if (data->buffer_affinity == -1)
{
- this->buffer_affinity = message.buffer_affinity;
+ data->buffer_affinity = message.buffer_affinity;
}
this->Send(0, from); //ACK
@@ -109,12 +109,12 @@ void BlockActor::handle_top_thread_group(
//store the topology's thread group
//erase any potentially old lingering threads
//spawn a new thread if this block is a source
- this->thread_group = message;
- this->interruptible_thread.reset(); //erase old one
- if (this->interruptible_work)
+ data->thread_group = message;
+ data->interruptible_thread.reset(); //erase old one
+ if (data->interruptible_work)
{
- this->interruptible_thread = boost::make_shared<InterruptibleThread>(
- this->thread_group, boost::bind(&BlockActor::task_work, this)
+ data->interruptible_thread = boost::make_shared<InterruptibleThread>(
+ data->thread_group, boost::bind(&BlockActor::task_work, this)
);
}
@@ -138,24 +138,24 @@ void BlockActor::handle_get_stats(
//instantaneous states we update here,
//and not interleaved with the rest of the code
const size_t num_inputs = this->get_num_inputs();
- this->stats.items_enqueued.resize(num_inputs);
- this->stats.tags_enqueued.resize(num_inputs);
- this->stats.msgs_enqueued.resize(num_inputs);
+ data->stats.items_enqueued.resize(num_inputs);
+ data->stats.tags_enqueued.resize(num_inputs);
+ data->stats.msgs_enqueued.resize(num_inputs);
for (size_t i = 0; i < num_inputs; i++)
{
- this->stats.items_enqueued[i] = this->input_queues.get_items_enqueued(i);
- this->stats.tags_enqueued[i] = this->input_tags[i].size();
- this->stats.msgs_enqueued[i] = this->input_msgs[i].size();
+ data->stats.items_enqueued[i] = data->input_queues.get_items_enqueued(i);
+ data->stats.tags_enqueued[i] = data->input_tags[i].size();
+ data->stats.msgs_enqueued[i] = data->input_msgs[i].size();
}
- this->stats.actor_queue_depth = this->GetNumQueuedMessages();
- this->stats.bytes_copied = this->input_queues.bytes_copied;
- this->stats.inputs_idle = this->input_queues.total_idle_times;
- this->stats.outputs_idle = this->output_queues.total_idle_times;
+ data->stats.actor_queue_depth = this->GetNumQueuedMessages();
+ data->stats.bytes_copied = data->input_queues.bytes_copied;
+ data->stats.inputs_idle = data->input_queues.total_idle_times;
+ data->stats.outputs_idle = data->output_queues.total_idle_times;
//create the message reply object
GetStatsMessage message;
- message.block_id = this->block_ptr->get_uid();
- message.stats = this->stats;
+ message.block_id = block_ptr->get_uid();
+ message.stats = data->stats;
message.stats_time = time_now();
this->Send(message, from); //ACK
diff --git a/lib/block_message.cpp b/lib/block_message.cpp
index ef7dda6..84078ce 100644
--- a/lib/block_message.cpp
+++ b/lib/block_message.cpp
@@ -8,29 +8,29 @@ using namespace gras;
void Block::post_output_tag(const size_t which_output, const Tag &tag)
{
- (*this)->block->stats.tags_produced[which_output]++;
+ (*this)->block_data->stats.tags_produced[which_output]++;
(*this)->block->post_downstream(which_output, InputTagMessage(tag));
}
void Block::_post_output_msg(const size_t which_output, const PMCC &msg)
{
- (*this)->block->stats.msgs_produced[which_output]++;
+ (*this)->block_data->stats.msgs_produced[which_output]++;
(*this)->block->post_downstream(which_output, InputMsgMessage(msg));
}
TagIter Block::get_input_tags(const size_t which_input)
{
- const std::vector<Tag> &input_tags = (*this)->block->input_tags[which_input];
+ const std::vector<Tag> &input_tags = (*this)->block_data->input_tags[which_input];
return TagIter(input_tags.begin(), input_tags.end());
}
PMCC Block::pop_input_msg(const size_t which_input)
{
- std::vector<PMCC> &input_msgs = (*this)->block->input_msgs[which_input];
- size_t &num_read = (*this)->block->num_input_msgs_read[which_input];
+ std::vector<PMCC> &input_msgs = (*this)->block_data->input_msgs[which_input];
+ size_t &num_read = (*this)->block_data->num_input_msgs_read[which_input];
if (num_read >= input_msgs.size()) return PMCC();
PMCC p = input_msgs[num_read++];
- (*this)->block->stats.msgs_consumed[which_input]++;
+ (*this)->block_data->stats.msgs_consumed[which_input]++;
return p;
}
diff --git a/lib/block_produce.cpp b/lib/block_produce.cpp
index 584d3a3..31a91f7 100644
--- a/lib/block_produce.cpp
+++ b/lib/block_produce.cpp
@@ -23,12 +23,12 @@ void Block::produce(const size_t num_items)
item_index_t Block::get_produced(const size_t which_output)
{
- return (*this)->block->stats.items_produced[which_output];
+ return (*this)->block_data->stats.items_produced[which_output];
}
SBuffer Block::get_output_buffer(const size_t which_output) const
{
- SBuffer &buff = (*this)->block->output_queues.front(which_output);
+ SBuffer &buff = (*this)->block_data->output_queues.front(which_output);
//increment length to auto pop full buffer size,
//when user doesnt call pop_output_buffer()
buff.length = buff.get_actual_length();
@@ -37,7 +37,7 @@ SBuffer Block::get_output_buffer(const size_t which_output) const
void Block::pop_output_buffer(const size_t which_output, const size_t num_bytes)
{
- (*this)->block->output_queues.front(which_output).length = num_bytes;
+ (*this)->block_data->output_queues.front(which_output).length = num_bytes;
}
void Block::post_output_buffer(const size_t which_output, const SBuffer &buffer)
@@ -50,20 +50,20 @@ GRAS_FORCE_INLINE void BlockActor::produce(const size_t i, const size_t items)
#ifdef ITEM_CONSPROD
std::cerr << name << " produce " << items << std::endl;
#endif
- SBuffer &buff = this->output_queues.front(i);
- ASSERT((buff.length % output_configs[i].item_size) == 0);
- this->stats.items_produced[i] += items;
- const size_t bytes = items*this->output_configs[i].item_size;
+ SBuffer &buff = data->output_queues.front(i);
+ ASSERT((buff.length % data->output_configs[i].item_size) == 0);
+ data->stats.items_produced[i] += items;
+ const size_t bytes = items*data->output_configs[i].item_size;
buff.length += bytes;
- this->produce_outputs[i] = true;
+ data->produce_outputs[i] = true;
}
GRAS_FORCE_INLINE void BlockActor::produce_buffer(const size_t i, const SBuffer &buffer)
{
- this->output_queues.consume(i);
- ASSERT((buffer.length % output_configs[i].item_size) == 0);
- const size_t items = buffer.length/output_configs[i].item_size;
- this->stats.items_produced[i] += items;
+ data->output_queues.consume(i);
+ ASSERT((buffer.length % data->output_configs[i].item_size) == 0);
+ const size_t items = buffer.length/data->output_configs[i].item_size;
+ data->stats.items_produced[i] += items;
InputBufferMessage buff_msg;
buff_msg.buffer = buffer;
this->post_downstream(i, buff_msg);
diff --git a/lib/block_props.cpp b/lib/block_props.cpp
index b3456ac..cc42846 100644
--- a/lib/block_props.cpp
+++ b/lib/block_props.cpp
@@ -45,7 +45,7 @@ void BlockActor::handle_prop_access(
PMCC Block::_handle_prop_access(const std::string &key, const PMCC &value, const bool set)
{
- const PropertyRegistryPair &pair = (*this)->block->property_registry[key];
+ const PropertyRegistryPair &pair = (*this)->block_data->property_registry[key];
PropertyRegistrySptr pr = (set)? pair.setter : pair.getter;
if (not pr) throw std::invalid_argument("no property registered for key: " + key);
if (set)
@@ -96,12 +96,12 @@ PMCC BlockActor::prop_access_dispatcher(const std::string &key, const PMCC &valu
void Block::_register_getter(const std::string &key, void *pr)
{
- (*this)->block->property_registry[key].getter.reset(reinterpret_cast<PropertyRegistry *>(pr));
+ (*this)->block_data->property_registry[key].getter.reset(reinterpret_cast<PropertyRegistry *>(pr));
}
void Block::_register_setter(const std::string &key, void *pr)
{
- (*this)->block->property_registry[key].setter.reset(reinterpret_cast<PropertyRegistry *>(pr));
+ (*this)->block_data->property_registry[key].setter.reset(reinterpret_cast<PropertyRegistry *>(pr));
}
void Block::_set_property(const std::string &key, const PMCC &value)
diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp
index 477b478..4e75375 100644
--- a/lib/element_impl.hpp
+++ b/lib/element_impl.hpp
@@ -45,6 +45,7 @@ struct ElementImpl
boost::shared_ptr<Apology::Topology> topology;
boost::shared_ptr<Apology::Executor> executor;
boost::shared_ptr<BlockActor> block;
+ boost::shared_ptr<BlockData> block_data;
ThreadPool thread_pool;
Apology::Base *get_elem(void) const
{
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index 005855e..7ee9e9b 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -14,13 +14,6 @@
namespace gras
{
-typedef boost::shared_ptr<PropertyRegistry> PropertyRegistrySptr;
-struct PropertyRegistryPair
-{
- PropertyRegistrySptr setter;
- PropertyRegistrySptr getter;
-};
-
struct BlockActor : Apology::Worker
{
BlockActor(const ThreadPool &tp = ThreadPool());
@@ -111,8 +104,11 @@ struct BlockActor : Apology::Worker
//work helpers
inline void task_work(void)
{
- block_ptr->work(this->input_items, this->output_items);
+ block_ptr->work(data->input_items, data->output_items);
}
+
+ //property stuff
+ PMCC prop_access_dispatcher(const std::string &key, const PMCC &value, const bool set);
};
//-------------- common functions from this BlockActor class ---------//
@@ -124,27 +120,27 @@ GRAS_FORCE_INLINE void BlockActor::task_kicker(void)
GRAS_FORCE_INLINE void BlockActor::update_input_avail(const size_t i)
{
- const bool has_input_bufs = not this->input_queues.empty(i) and this->input_queues.ready(i);
- const bool has_input_msgs = not this->input_msgs[i].empty();
- this->inputs_available.set(i, has_input_bufs or has_input_msgs);
- this->input_queues.update_has_msg(i, has_input_msgs);
+ const bool has_input_bufs = not data->input_queues.empty(i) and data->input_queues.ready(i);
+ const bool has_input_msgs = not data->input_msgs[i].empty();
+ data->inputs_available.set(i, has_input_bufs or has_input_msgs);
+ data->input_queues.update_has_msg(i, has_input_msgs);
}
GRAS_FORCE_INLINE bool BlockActor::is_input_done(const size_t i)
{
- const bool force_done = this->input_configs[i].force_done;
- if GRAS_LIKELY(force_done) return this->inputs_done[i] and not this->inputs_available[i];
- return this->inputs_done.all() and this->inputs_available.none();
+ const bool force_done = data->input_configs[i].force_done;
+ if GRAS_LIKELY(force_done) return data->inputs_done[i] and not data->inputs_available[i];
+ return data->inputs_done.all() and data->inputs_available.none();
}
GRAS_FORCE_INLINE bool BlockActor::is_work_allowed(void)
{
return (
this->prio_token.unique() and
- this->block_state == BLOCK_STATE_LIVE and
- this->inputs_available.any() and
- this->input_queues.all_ready() and
- this->output_queues.all_ready()
+ data->block_state == BLOCK_STATE_LIVE and
+ data->inputs_available.any() and
+ data->input_queues.all_ready() and
+ data->output_queues.all_ready()
);
}
diff --git a/lib/gras_impl/block_data.hpp b/lib/gras_impl/block_data.hpp
index 2dcf3c6..9fbf3eb 100644
--- a/lib/gras_impl/block_data.hpp
+++ b/lib/gras_impl/block_data.hpp
@@ -24,6 +24,13 @@ struct PropertyRegistryPair
PropertyRegistrySptr getter;
};
+enum BlockState
+{
+ BLOCK_STATE_INIT,
+ BLOCK_STATE_LIVE,
+ BLOCK_STATE_DONE,
+};
+
struct BlockData
{
//per port properties
@@ -61,19 +68,13 @@ struct BlockData
boost::shared_ptr<InterruptibleThread> interruptible_thread;
//is the fg running?
- enum
- {
- BLOCK_STATE_INIT,
- BLOCK_STATE_LIVE,
- BLOCK_STATE_DONE,
- } block_state;
+ BlockState block_state;
long buffer_affinity;
std::vector<std::vector<OutputHintMessage> > output_allocation_hints;
//property stuff
std::map<std::string, PropertyRegistryPair> property_registry;
- PMCC prop_access_dispatcher(const std::string &key, const PMCC &value, const bool set);
BlockStats stats;
};
diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp
index faddda8..1210688 100644
--- a/lib/input_handlers.cpp
+++ b/lib/input_handlers.cpp
@@ -7,25 +7,25 @@ using namespace gras;
void BlockActor::handle_input_tag(const InputTagMessage &message, const Theron::Address)
{
- TimerAccumulate ta(this->stats.total_time_input);
+ TimerAccumulate ta(data->stats.total_time_input);
MESSAGE_TRACER();
const size_t index = message.index;
//handle incoming stream tag, push into the tag storage
- if GRAS_UNLIKELY(this->block_state == BLOCK_STATE_DONE) return;
- this->input_tags[index].push_back(message.tag);
- this->input_tags_changed[index] = true;
+ if GRAS_UNLIKELY(data->block_state == BLOCK_STATE_DONE) return;
+ data->input_tags[index].push_back(message.tag);
+ data->input_tags_changed[index] = true;
}
void BlockActor::handle_input_msg(const InputMsgMessage &message, const Theron::Address)
{
- TimerAccumulate ta(this->stats.total_time_input);
+ TimerAccumulate ta(data->stats.total_time_input);
MESSAGE_TRACER();
const size_t index = message.index;
//handle incoming async message, push into the msg storage
- if GRAS_UNLIKELY(this->block_state == BLOCK_STATE_DONE) return;
- this->input_msgs[index].push_back(message.msg);
+ if GRAS_UNLIKELY(data->block_state == BLOCK_STATE_DONE) return;
+ data->input_msgs[index].push_back(message.msg);
this->update_input_avail(index);
ta.done();
@@ -34,13 +34,13 @@ void BlockActor::handle_input_msg(const InputMsgMessage &message, const Theron::
void BlockActor::handle_input_buffer(const InputBufferMessage &message, const Theron::Address)
{
- TimerAccumulate ta(this->stats.total_time_input);
+ TimerAccumulate ta(data->stats.total_time_input);
MESSAGE_TRACER();
const size_t index = message.index;
//handle incoming stream buffer, push into the queue
- if GRAS_UNLIKELY(this->block_state == BLOCK_STATE_DONE) return;
- this->input_queues.push(index, message.buffer);
+ if GRAS_UNLIKELY(data->block_state == BLOCK_STATE_DONE) return;
+ data->input_queues.push(index, message.buffer);
this->update_input_avail(index);
ta.done();
@@ -49,22 +49,22 @@ void BlockActor::handle_input_buffer(const InputBufferMessage &message, const Th
void BlockActor::handle_input_token(const InputTokenMessage &message, const Theron::Address)
{
- TimerAccumulate ta(this->stats.total_time_input);
+ TimerAccumulate ta(data->stats.total_time_input);
MESSAGE_TRACER();
ASSERT(message.index < this->get_num_inputs());
//store the token of the upstream producer
- this->token_pool.insert(message.token);
+ data->token_pool.insert(message.token);
}
void BlockActor::handle_input_check(const InputCheckMessage &message, const Theron::Address)
{
- TimerAccumulate ta(this->stats.total_time_input);
+ TimerAccumulate ta(data->stats.total_time_input);
MESSAGE_TRACER();
const size_t index = message.index;
//an upstream block declared itself done, recheck the token
- this->inputs_done.set(index, this->input_tokens[index].unique());
+ data->inputs_done.set(index, data->input_tokens[index].unique());
//upstream done, give it one more attempt at task handling
ta.done();
@@ -76,7 +76,7 @@ void BlockActor::handle_input_check(const InputCheckMessage &message, const Ther
void BlockActor::handle_input_alloc(const InputAllocMessage &message, const Theron::Address)
{
- TimerAccumulate ta(this->stats.total_time_input);
+ TimerAccumulate ta(data->stats.total_time_input);
MESSAGE_TRACER();
const size_t index = message.index;
@@ -88,14 +88,14 @@ void BlockActor::handle_input_alloc(const InputAllocMessage &message, const Ther
void BlockActor::handle_input_update(const InputUpdateMessage &message, const Theron::Address)
{
- TimerAccumulate ta(this->stats.total_time_input);
+ TimerAccumulate ta(data->stats.total_time_input);
MESSAGE_TRACER();
const size_t i = message.index;
//update buffer queue configuration
- if (i >= this->input_queues.size()) return;
- const size_t preload_bytes = this->input_configs[i].item_size*this->input_configs[i].preload_items;
- const size_t reserve_bytes = this->input_configs[i].item_size*this->input_configs[i].reserve_items;
- const size_t maximum_bytes = this->input_configs[i].item_size*this->input_configs[i].maximum_items;
- this->input_queues.update_config(i, this->input_configs[i].item_size, preload_bytes, reserve_bytes, maximum_bytes);
+ if (i >= data->input_queues.size()) return;
+ const size_t preload_bytes = data->input_configs[i].item_size*data->input_configs[i].preload_items;
+ const size_t reserve_bytes = data->input_configs[i].item_size*data->input_configs[i].reserve_items;
+ const size_t maximum_bytes = data->input_configs[i].item_size*data->input_configs[i].maximum_items;
+ data->input_queues.update_config(i, data->input_configs[i].item_size, preload_bytes, reserve_bytes, maximum_bytes);
}
diff --git a/lib/output_handlers.cpp b/lib/output_handlers.cpp
index d937bbf..22a86ef 100644
--- a/lib/output_handlers.cpp
+++ b/lib/output_handlers.cpp
@@ -8,14 +8,14 @@ using namespace gras;
void BlockActor::handle_output_buffer(const OutputBufferMessage &message, const Theron::Address)
{
- TimerAccumulate ta(this->stats.total_time_output);
+ TimerAccumulate ta(data->stats.total_time_output);
MESSAGE_TRACER();
const size_t index = message.index;
//a buffer has returned from the downstream
//(all interested consumers have finished with it)
- if GRAS_UNLIKELY(this->block_state == BLOCK_STATE_DONE) return;
- this->output_queues.push(index, message.buffer);
+ if GRAS_UNLIKELY(data->block_state == BLOCK_STATE_DONE) return;
+ data->output_queues.push(index, message.buffer);
ta.done();
this->task_main();
@@ -23,23 +23,23 @@ void BlockActor::handle_output_buffer(const OutputBufferMessage &message, const
void BlockActor::handle_output_token(const OutputTokenMessage &message, const Theron::Address)
{
- TimerAccumulate ta(this->stats.total_time_output);
+ TimerAccumulate ta(data->stats.total_time_output);
MESSAGE_TRACER();
ASSERT(message.index < this->get_num_outputs());
//store the token of the downstream consumer
- this->token_pool.insert(message.token);
+ data->token_pool.insert(message.token);
}
void BlockActor::handle_output_check(const OutputCheckMessage &message, const Theron::Address)
{
- TimerAccumulate ta(this->stats.total_time_output);
+ TimerAccumulate ta(data->stats.total_time_output);
MESSAGE_TRACER();
const size_t index = message.index;
//a downstream block has declared itself done, recheck the token
- this->outputs_done.set(index, this->output_tokens[index].unique());
- if (this->outputs_done.all()) //no downstream subscribers?
+ data->outputs_done.set(index, data->output_tokens[index].unique());
+ if (data->outputs_done.all()) //no downstream subscribers?
{
this->mark_done();
}
@@ -47,7 +47,7 @@ void BlockActor::handle_output_check(const OutputCheckMessage &message, const Th
void BlockActor::handle_output_hint(const OutputHintMessage &message, const Theron::Address)
{
- TimerAccumulate ta(this->stats.total_time_output);
+ TimerAccumulate ta(data->stats.total_time_output);
MESSAGE_TRACER();
const size_t index = message.index;
@@ -57,7 +57,7 @@ void BlockActor::handle_output_hint(const OutputHintMessage &message, const Ther
//remove any old hints with expired token
//remove any older hints with matching token
std::vector<OutputHintMessage> hints;
- BOOST_FOREACH(const OutputHintMessage &hint, this->output_allocation_hints[index])
+ BOOST_FOREACH(const OutputHintMessage &hint, data->output_allocation_hints[index])
{
if (hint.token.expired()) continue;
if (hint.token.lock() == message.token.lock()) continue;
@@ -67,27 +67,27 @@ void BlockActor::handle_output_hint(const OutputHintMessage &message, const Ther
//store the new hint as well
hints.push_back(message);
- this->output_allocation_hints[index] = hints;
+ data->output_allocation_hints[index] = hints;
}
void BlockActor::handle_output_alloc(const OutputAllocMessage &message, const Theron::Address)
{
- TimerAccumulate ta(this->stats.total_time_output);
+ TimerAccumulate ta(data->stats.total_time_output);
MESSAGE_TRACER();
const size_t index = message.index;
//return of a positive downstream allocation
- this->output_queues.set_buffer_queue(index, message.queue);
+ data->output_queues.set_buffer_queue(index, message.queue);
}
void BlockActor::handle_output_update(const OutputUpdateMessage &message, const Theron::Address)
{
- TimerAccumulate ta(this->stats.total_time_output);
+ TimerAccumulate ta(data->stats.total_time_output);
MESSAGE_TRACER();
const size_t i = message.index;
//update buffer queue configuration
- if (i >= this->output_queues.size()) return;
- const size_t reserve_bytes = this->output_configs[i].item_size*this->output_configs[i].reserve_items;
- this->output_queues.set_reserve_bytes(i, reserve_bytes);
+ if (i >= data->output_queues.size()) return;
+ const size_t reserve_bytes = data->output_configs[i].item_size*data->output_configs[i].reserve_items;
+ data->output_queues.set_reserve_bytes(i, reserve_bytes);
}
diff --git a/lib/tag_handlers.hpp b/lib/tag_handlers.hpp
index 8d504dd..cc03a9b 100644
--- a/lib/tag_handlers.hpp
+++ b/lib/tag_handlers.hpp
@@ -11,10 +11,10 @@ namespace gras
GRAS_FORCE_INLINE void BlockActor::sort_tags(const size_t i)
{
- if GRAS_LIKELY(not this->input_tags_changed[i]) return;
- std::vector<Tag> &tags_i = this->input_tags[i];
+ if GRAS_LIKELY(not data->input_tags_changed[i]) return;
+ std::vector<Tag> &tags_i = data->input_tags[i];
std::sort(tags_i.begin(), tags_i.end());
- this->input_tags_changed[i] = false;
+ data->input_tags_changed[i] = false;
}
GRAS_FORCE_INLINE void BlockActor::trim_tags(const size_t i)
@@ -24,8 +24,8 @@ GRAS_FORCE_INLINE void BlockActor::trim_tags(const size_t i)
//-- and post trimmed tags to the downstream based on policy
//------------------------------------------------------------------
- std::vector<Tag> &tags_i = this->input_tags[i];
- const item_index_t items_consumed_i = this->stats.items_consumed[i];
+ std::vector<Tag> &tags_i = data->input_tags[i];
+ const item_index_t items_consumed_i = data->stats.items_consumed[i];
size_t last = 0;
while (last < tags_i.size() and tags_i[last].offset < items_consumed_i)
{
@@ -35,19 +35,19 @@ GRAS_FORCE_INLINE void BlockActor::trim_tags(const size_t i)
if GRAS_LIKELY(last == 0) return;
//call the overloaded propagate_tags to do the dirty work
- this->block_ptr->propagate_tags(i, TagIter(tags_i.begin(), tags_i.begin()+last));
+ block_ptr->propagate_tags(i, TagIter(tags_i.begin(), tags_i.begin()+last));
//now its safe to perform the erasure
tags_i.erase(tags_i.begin(), tags_i.begin()+last);
- this->stats.tags_consumed[i] += last;
+ data->stats.tags_consumed[i] += last;
}
GRAS_FORCE_INLINE void BlockActor::trim_msgs(const size_t i)
{
- const size_t num_read = this->num_input_msgs_read[i];
+ const size_t num_read = data->num_input_msgs_read[i];
if GRAS_UNLIKELY(num_read > 0)
{
- std::vector<PMCC> &input_msgs = this->input_msgs[i];
+ std::vector<PMCC> &input_msgs = data->input_msgs[i];
input_msgs.erase(input_msgs.begin(), input_msgs.begin()+num_read);
}
}
diff --git a/lib/task_done.cpp b/lib/task_done.cpp
index d116202..b6e61a5 100644
--- a/lib/task_done.cpp
+++ b/lib/task_done.cpp
@@ -12,40 +12,40 @@ void Block::mark_done(void)
void BlockActor::mark_done(void)
{
- if (this->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first
+ if (data->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first
- this->stats.stop_time = time_now();
- this->block_ptr->notify_inactive();
+ data->stats.stop_time = time_now();
+ block_ptr->notify_inactive();
//flush partial output buffers to the downstream
for (size_t i = 0; i < this->get_num_outputs(); i++)
{
- if (not this->output_queues.ready(i)) continue;
- SBuffer &buff = this->output_queues.front(i);
+ if (not data->output_queues.ready(i)) continue;
+ SBuffer &buff = data->output_queues.front(i);
if (buff.length == 0) continue;
InputBufferMessage buff_msg;
buff_msg.buffer = buff;
this->post_downstream(i, buff_msg);
- this->output_queues.pop(i);
+ data->output_queues.pop(i);
}
- this->interruptible_thread.reset();
+ data->interruptible_thread.reset();
//mark down the new state
- this->block_state = BLOCK_STATE_DONE;
+ data->block_state = BLOCK_STATE_DONE;
//release upstream, downstream, and executor tokens
- this->token_pool.clear();
+ data->token_pool.clear();
//release all buffers in queues
- this->input_queues.flush_all();
- this->output_queues.flush_all();
+ data->input_queues.flush_all();
+ data->output_queues.flush_all();
//release all tags and msgs
for (size_t i = 0; i < this->get_num_inputs(); i++)
{
- this->input_msgs[i].clear();
- this->input_tags[i].clear();
+ data->input_msgs[i].clear();
+ data->input_tags[i].clear();
}
//tell the upstream and downstram to re-check their tokens
diff --git a/lib/task_fail.cpp b/lib/task_fail.cpp
index 720e2e1..1168698 100644
--- a/lib/task_fail.cpp
+++ b/lib/task_fail.cpp
@@ -18,41 +18,41 @@ void Block::mark_input_fail(const size_t which_input)
void BlockActor::input_fail(const size_t i)
{
//input failed, accumulate and try again
- if (not this->input_queues.is_accumulated(i))
+ if (not data->input_queues.is_accumulated(i))
{
- this->input_queues.accumulate(i);
+ data->input_queues.accumulate(i);
this->task_kicker();
return;
}
//otherwise check for done, else wait for more
- if (this->inputs_done[i])
+ if (data->inputs_done[i])
{
this->mark_done();
return;
}
//check that the input is not already maxed
- if (this->input_queues.is_front_maximal(i))
+ if (data->input_queues.is_front_maximal(i))
{
throw std::runtime_error("input_fail called on maximum_items buffer in " + name);
}
//mark fail: not ready until a new buffer appears
- this->input_queues.fail(i);
+ data->input_queues.fail(i);
}
void BlockActor::output_fail(const size_t i)
{
- SBuffer &buff = this->output_queues.front(i);
+ SBuffer &buff = data->output_queues.front(i);
//check that the input is not already maxed
- const size_t front_items = buff.length/this->output_configs[i].item_size;
- if (front_items >= this->output_configs[i].maximum_items)
+ const size_t front_items = buff.length/data->output_configs[i].item_size;
+ if (front_items >= data->output_configs[i].maximum_items)
{
throw std::runtime_error("output_fail called on maximum_items buffer in " + name);
}
//mark fail: not ready until a new buffer appears
- this->output_queues.fail(i);
+ data->output_queues.fail(i);
}
diff --git a/lib/task_main.cpp b/lib/task_main.cpp
index 01e38ce..2178b73 100644
--- a/lib/task_main.cpp
+++ b/lib/task_main.cpp
@@ -7,7 +7,7 @@ using namespace gras;
void BlockActor::task_main(void)
{
- TimerAccumulate ta_prep(this->stats.total_time_prep);
+ TimerAccumulate ta_prep(data->stats.total_time_prep);
//------------------------------------------------------------------
//-- Decide if its possible to continue any processing:
@@ -23,33 +23,33 @@ void BlockActor::task_main(void)
//-- initialize input buffers before work
//------------------------------------------------------------------
size_t output_inline_index = 0;
- this->input_items.min() = ~0;
- this->input_items.max() = 0;
+ data->input_items.min() = ~0;
+ data->input_items.max() = 0;
for (size_t i = 0; i < num_inputs; i++)
{
this->sort_tags(i);
- this->num_input_msgs_read[i] = 0;
+ data->num_input_msgs_read[i] = 0;
- ASSERT(this->input_queues.ready(i));
- const SBuffer &buff = this->input_queues.front(i);
+ ASSERT(data->input_queues.ready(i));
+ const SBuffer &buff = data->input_queues.front(i);
const void *mem = buff.get();
- size_t items = buff.length/this->input_configs[i].item_size;
+ size_t items = buff.length/data->input_configs[i].item_size;
- this->input_items.vec()[i] = mem;
- this->input_items[i].get() = mem;
- this->input_items[i].size() = items;
- this->input_items.min() = std::min(this->input_items.min(), items);
- this->input_items.max() = std::max(this->input_items.max(), items);
+ data->input_items.vec()[i] = mem;
+ data->input_items[i].get() = mem;
+ data->input_items[i].size() = items;
+ data->input_items.min() = std::min(data->input_items.min(), items);
+ data->input_items.max() = std::max(data->input_items.max(), items);
//inline dealings, how and when input buffers can be inlined into output buffers
//*
if GRAS_UNLIKELY(
buff.unique() and
- input_configs[i].inline_buffer and
+ data->input_configs[i].inline_buffer and
output_inline_index < num_outputs and
- buff.get_affinity() == this->buffer_affinity
+ buff.get_affinity() == data->buffer_affinity
){
- this->output_queues.set_inline(output_inline_index++, buff);
+ data->output_queues.set_inline(output_inline_index++, buff);
}
//*/
}
@@ -57,41 +57,41 @@ void BlockActor::task_main(void)
//------------------------------------------------------------------
//-- initialize output buffers before work
//------------------------------------------------------------------
- this->output_items.min() = ~0;
- this->output_items.max() = 0;
+ data->output_items.min() = ~0;
+ data->output_items.max() = 0;
for (size_t i = 0; i < num_outputs; i++)
{
- ASSERT(this->output_queues.ready(i));
- SBuffer &buff = this->output_queues.front(i);
+ ASSERT(data->output_queues.ready(i));
+ SBuffer &buff = data->output_queues.front(i);
ASSERT(buff.length == 0); //assumes it was flushed last call
void *mem = buff.get();
const size_t bytes = buff.get_actual_length() - buff.offset;
- size_t items = bytes/this->output_configs[i].item_size;
+ size_t items = bytes/data->output_configs[i].item_size;
- this->output_items.vec()[i] = mem;
- this->output_items[i].get() = mem;
- this->output_items[i].size() = items;
- this->output_items.min() = std::min(this->output_items.min(), items);
- this->output_items.max() = std::max(this->output_items.max(), items);
+ data->output_items.vec()[i] = mem;
+ data->output_items[i].get() = mem;
+ data->output_items[i].size() = items;
+ data->output_items.min() = std::min(data->output_items.min(), items);
+ data->output_items.max() = std::max(data->output_items.max(), items);
}
//------------------------------------------------------------------
//-- the work
//------------------------------------------------------------------
ta_prep.done();
- this->stats.work_count++;
- if GRAS_UNLIKELY(this->interruptible_thread)
+ data->stats.work_count++;
+ if GRAS_UNLIKELY(data->interruptible_thread)
{
- TimerAccumulate ta_work(this->stats.total_time_work);
- this->interruptible_thread->call();
+ TimerAccumulate ta_work(data->stats.total_time_work);
+ data->interruptible_thread->call();
}
else
{
- TimerAccumulate ta_work(this->stats.total_time_work);
+ TimerAccumulate ta_work(data->stats.total_time_work);
this->task_work();
}
- this->stats.time_last_work = time_now();
- TimerAccumulate ta_post(this->stats.total_time_post);
+ data->stats.time_last_work = time_now();
+ TimerAccumulate ta_post(data->stats.total_time_post);
//------------------------------------------------------------------
//-- Post-work output tasks
@@ -99,18 +99,18 @@ void BlockActor::task_main(void)
for (size_t i = 0; i < num_outputs; i++)
{
//buffer may be popped by one of the special buffer api hooks
- if GRAS_UNLIKELY(this->output_queues.empty(i)) continue;
+ if GRAS_UNLIKELY(data->output_queues.empty(i)) continue;
//grab a copy of the front buffer then consume from the queue
InputBufferMessage buff_msg;
- buff_msg.buffer = this->output_queues.front(i);
- this->output_queues.consume(i);
+ buff_msg.buffer = data->output_queues.front(i);
+ data->output_queues.consume(i);
//Post a buffer message downstream only if the produce flag was marked.
//So this explicitly after consuming the output queues so pop is called.
//This is because pop may have special hooks in it to prepare the buffer.
- if GRAS_LIKELY(this->produce_outputs[i]) this->post_downstream(i, buff_msg);
- this->produce_outputs[i] = false;
+ if GRAS_LIKELY(data->produce_outputs[i]) this->post_downstream(i, buff_msg);
+ data->produce_outputs[i] = false;
}
//------------------------------------------------------------------
diff --git a/lib/top_block_query.cpp b/lib/top_block_query.cpp
index c38fbd9..0918461 100644
--- a/lib/top_block_query.cpp
+++ b/lib/top_block_query.cpp
@@ -37,7 +37,7 @@ static ptree query_blocks(ElementImpl *self, const ptree &)
BlockActor *block = dynamic_cast<BlockActor *>(worker);
ptree prop_e;
typedef std::pair<std::string, PropertyRegistryPair> PropRegistryKVP;
- BOOST_FOREACH(const PropRegistryKVP &p, block->property_registry)
+ BOOST_FOREACH(const PropRegistryKVP &p, block->data->property_registry)
{
ptree prop_attrs;
if (p.second.setter)
@@ -175,7 +175,7 @@ static ptree query_props(ElementImpl *self, const ptree &query)
if (block->block_ptr->get_uid() != block_id) continue;
if (set)
{
- const std::type_info &t = block->property_registry[prop_key].setter->type();
+ const std::type_info &t = block->data->property_registry[prop_key].setter->type();
const PMCC p = ptree_to_pmc(query.get_child("value"), t);
block->prop_access_dispatcher(prop_key, p, true);
}
diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp
index 38ea138..e93089a 100644
--- a/lib/topology_handler.cpp
+++ b/lib/topology_handler.cpp
@@ -31,42 +31,42 @@ void BlockActor::handle_topology(
this->block_ptr->notify_topology(num_inputs, num_outputs);
//resize and fill port properties
- resize_fill_back(this->input_configs, num_inputs);
- resize_fill_back(this->output_configs, num_outputs);
+ resize_fill_back(data->input_configs, num_inputs);
+ resize_fill_back(data->output_configs, num_outputs);
//resize the bytes consumed/produced
- resize_fill_grow(this->stats.items_consumed, num_inputs, 0);
- resize_fill_grow(this->stats.tags_consumed, num_inputs, 0);
- resize_fill_grow(this->stats.msgs_consumed, num_inputs, 0);
- resize_fill_grow(this->stats.items_produced, num_outputs, 0);
- resize_fill_grow(this->stats.tags_produced, num_outputs, 0);
- resize_fill_grow(this->stats.msgs_produced, num_outputs, 0);
+ resize_fill_grow(data->stats.items_consumed, num_inputs, 0);
+ resize_fill_grow(data->stats.tags_consumed, num_inputs, 0);
+ resize_fill_grow(data->stats.msgs_consumed, num_inputs, 0);
+ resize_fill_grow(data->stats.items_produced, num_outputs, 0);
+ resize_fill_grow(data->stats.tags_produced, num_outputs, 0);
+ resize_fill_grow(data->stats.msgs_produced, num_outputs, 0);
//resize all work buffers to match current connections
- this->input_items.resize(num_inputs);
- this->output_items.resize(num_outputs);
- this->input_queues.resize(num_inputs);
- this->output_queues.resize(num_outputs);
- this->produce_outputs.resize(num_outputs, false);
- this->inputs_available.resize(num_inputs);
- if (num_inputs == 0) this->inputs_available.resize(1, true); //so its always "available"
+ data->input_items.resize(num_inputs);
+ data->output_items.resize(num_outputs);
+ data->input_queues.resize(num_inputs);
+ data->output_queues.resize(num_outputs);
+ data->produce_outputs.resize(num_outputs, false);
+ data->inputs_available.resize(num_inputs);
+ if (num_inputs == 0) data->inputs_available.resize(1, true); //so its always "available"
//copy the name into the queues for debug purposes
- this->input_queues.name = this->name;
- this->output_queues.name = this->name;
+ data->input_queues.name = this->name;
+ data->output_queues.name = this->name;
//resize the token trackers
- this->input_tokens.resize(num_inputs);
- this->output_tokens.resize(num_outputs);
- this->inputs_done.resize(num_inputs);
- this->outputs_done.resize(num_outputs);
- this->output_allocation_hints.resize(num_outputs);
+ data->input_tokens.resize(num_inputs);
+ data->output_tokens.resize(num_outputs);
+ data->inputs_done.resize(num_inputs);
+ data->outputs_done.resize(num_outputs);
+ data->output_allocation_hints.resize(num_outputs);
//resize tags vector to match sizes
- this->input_tags_changed.resize(num_inputs);
- this->input_tags.resize(num_inputs);
- this->num_input_msgs_read.resize(num_inputs);
- this->input_msgs.resize(num_inputs);
+ data->input_tags_changed.resize(num_inputs);
+ data->input_tags.resize(num_inputs);
+ data->num_input_msgs_read.resize(num_inputs);
+ data->input_msgs.resize(num_inputs);
//a block looses all connections, allow it to free
if (num_inputs == 0 and num_outputs == 0)