summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
m---------gnuradio0
m---------grextras0
-rw-r--r--include/gras/block.hpp41
-rw-r--r--lib/block.cpp63
-rw-r--r--lib/block_allocator.cpp6
-rw-r--r--lib/block_handlers.cpp2
-rw-r--r--lib/block_task.cpp12
-rw-r--r--lib/gras_impl/block_actor.hpp2
-rw-r--r--lib/input_handlers.cpp8
-rw-r--r--lib/output_handlers.cpp2
-rw-r--r--lib/topology_handler.cpp4
-rw-r--r--python/gras/GRAS_Block.i6
12 files changed, 62 insertions, 84 deletions
diff --git a/gnuradio b/gnuradio
-Subproject 7233e7c2dd2c0cba1a998cdd5b3ad7f76816e9d
+Subproject c60a6dc8b61c26c37874b27e4440c02e573dca2
diff --git a/grextras b/grextras
-Subproject f274e53f3a74f7fbac6f241504be3eafee30e58
+Subproject 1ebef1dfa208ea00dd67376cafde89988e0a845
diff --git a/include/gras/block.hpp b/include/gras/block.hpp
index f00a6f7..1c6516d 100644
--- a/include/gras/block.hpp
+++ b/include/gras/block.hpp
@@ -20,6 +20,9 @@ struct GRAS_API InputPortConfig
{
InputPortConfig(void);
+ //! The size of an item in bytes
+ size_t item_size;
+
/*!
* Set an input reserve requirement such that work is called
* with an input buffer at least reserve items in size.
@@ -80,6 +83,9 @@ struct GRAS_API OutputPortConfig
{
OutputPortConfig(void);
+ //! The size of an item in bytes
+ size_t item_size;
+
/*!
* Set an output reserve requirement such that work is called
* with an output buffer at least reserve items in size.
@@ -109,36 +115,27 @@ struct GRAS_API Block : Element
Block(const std::string &name);
/*******************************************************************
- * Item sizes for ports
- ******************************************************************/
-
- //! Get the input item size for this port in bytes
- size_t get_input_size(const size_t which_input) const;
-
- //! Set the input item size for this port in bytes
- void set_input_size(const size_t which_input, const size_t bytes);
-
- //! Get the output item size for this port in bytes
- size_t get_output_size(const size_t which_output) const;
-
- //! Set the output item size for this port in bytes
- void set_output_size(const size_t which_output, const size_t bytes);
-
- /*******************************************************************
* Deal with input and output port configuration
******************************************************************/
//! Get the configuration rules of an input port
- InputPortConfig get_input_config(const size_t which_input) const;
+ const InputPortConfig &input_config(const size_t which_input) const;
- //! Set the configuration rules for an input port
- void set_input_config(const size_t which_input, const InputPortConfig &config);
+ //! Get the configuration rules of an input port
+ InputPortConfig &input_config(const size_t which_input);
//! Get the configuration rules of an output port
- OutputPortConfig get_output_config(const size_t which_output) const;
+ const OutputPortConfig &output_config(const size_t which_output) const;
- //! Set the configuration rules for an output port
- void set_output_config(const size_t which_output, const OutputPortConfig &config);
+ //! Get the configuration rules of an output port
+ OutputPortConfig &output_config(const size_t which_output);
+
+ /*!
+ * Commit changes to the port configuration.
+ * Changes are commited automatically when the block becomes active.
+ * However, once active, changes may not effect until commit_config().
+ */
+ void commit_config(void);
/*******************************************************************
* Deal with data production and consumption
diff --git a/lib/block.cpp b/lib/block.cpp
index 424df83..e9e170b 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -9,6 +9,7 @@ using namespace gras;
InputPortConfig::InputPortConfig(void)
{
+ item_size = 1;
reserve_items = 1;
maximum_items = 0;
inline_buffer = false;
@@ -17,6 +18,7 @@ InputPortConfig::InputPortConfig(void)
OutputPortConfig::OutputPortConfig(void)
{
+ item_size = 1;
reserve_items = 1;
maximum_items = 0;
}
@@ -38,10 +40,8 @@ Block::Block(const std::string &name):
(*this)->block->block_state = BlockActor::BLOCK_STATE_INIT;
//call block methods to init stuff
- this->set_input_size(0, 1);
- this->set_output_size(0, 1);
- this->set_input_config(0, InputPortConfig());
- this->set_output_config(0, OutputPortConfig());
+ this->input_config(0) = InputPortConfig();
+ this->output_config(0) = OutputPortConfig();
this->set_interruptible_work(false);
this->set_buffer_affinity(-1);
}
@@ -62,74 +62,62 @@ void ElementImpl::block_cleanup(void)
this->thread_pool.reset(); //must be deleted after actor
}
-template <typename V, typename T>
-void vector_set(V &v, const T &t, const size_t index)
+template <typename V>
+const typename V::value_type &vector_get_const(const V &v, const size_t index)
{
if (v.size() <= index)
{
- v.resize(index+1, t);
+ return v.back();
}
- v[index] = t;
+ return v[index];
}
template <typename V>
-typename V::value_type vector_get(const V &v, const size_t index)
+typename V::value_type &vector_get_resize(V &v, const size_t index)
{
if (v.size() <= index)
{
- return v.front();
+ if (v.empty()) v.resize(1);
+ v.resize(index+1, v.back());
}
return v[index];
}
-size_t Block::get_input_size(const size_t which_input) const
-{
- return vector_get((*this)->block->input_items_sizes, which_input);
-}
-
-void Block::set_input_size(const size_t which_input, const size_t bytes)
+InputPortConfig &Block::input_config(const size_t which_input)
{
- vector_set((*this)->block->input_items_sizes, bytes, which_input);
+ return vector_get_resize((*this)->block->input_configs, which_input);
}
-size_t Block::get_output_size(const size_t which_output) const
+const InputPortConfig &Block::input_config(const size_t which_input) const
{
- return vector_get((*this)->block->output_items_sizes, which_output);
+ return vector_get_const((*this)->block->input_configs, which_input);
}
-void Block::set_output_size(const size_t which_output, const size_t bytes)
+OutputPortConfig &Block::output_config(const size_t which_output)
{
- vector_set((*this)->block->output_items_sizes, bytes, which_output);
+ return vector_get_resize((*this)->block->output_configs, which_output);
}
-InputPortConfig Block::get_input_config(const size_t which_input) const
+const OutputPortConfig &Block::output_config(const size_t which_output) const
{
- return vector_get((*this)->block->input_configs, which_input);
+ return vector_get_const((*this)->block->output_configs, which_output);
}
-void Block::set_input_config(const size_t which_input, const InputPortConfig &config)
+void Block::commit_config(void)
{
- vector_set((*this)->block->input_configs, config, which_input);
+ for (size_t i = 0; i < (*this)->block->get_num_inputs(); i++)
{
InputUpdateMessage message;
- message.index = which_input;
+ message.index = i;
(*this)->block->Push(message, Theron::Address());
}
-}
-
-OutputPortConfig Block::get_output_config(const size_t which_output) const
-{
- return vector_get((*this)->block->output_configs, which_output);
-}
-
-void Block::set_output_config(const size_t which_output, const OutputPortConfig &config)
-{
- vector_set((*this)->block->output_configs, config, which_output);
+ for (size_t i = 0; i < (*this)->block->get_num_outputs(); i++)
{
OutputUpdateMessage message;
- message.index = which_output;
+ message.index = i;
(*this)->block->Push(message, Theron::Address());
}
+
}
void Block::consume(const size_t which_input, const size_t num_items)
@@ -195,7 +183,6 @@ PMCC Block::pop_input_msg(const size_t which_input)
if (input_tags.empty()) return PMCC();
PMCC p = input_tags.front().object;
input_tags.erase(input_tags.begin());
- (*this)->block->stats.items_consumed[which_input]++;
return p;
}
diff --git a/lib/block_allocator.cpp b/lib/block_allocator.cpp
index c1ef827..3161653 100644
--- a/lib/block_allocator.cpp
+++ b/lib/block_allocator.cpp
@@ -62,9 +62,9 @@ void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address
{
const size_t bytes = recommend_length(
this->output_allocation_hints[i],
- my_round_up_mult(AT_LEAST_BYTES, this->output_items_sizes[i]),
- this->output_configs[i].reserve_items*this->output_items_sizes[i],
- this->output_configs[i].maximum_items*this->output_items_sizes[i]
+ my_round_up_mult(AT_LEAST_BYTES, this->output_configs[i].item_size),
+ this->output_configs[i].reserve_items*this->output_configs[i].item_size,
+ this->output_configs[i].maximum_items*this->output_configs[i].item_size
);
SBufferDeleter deleter = boost::bind(&BlockActor::buffer_returner, this, i, _1);
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index 73a22da..a6d49bb 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -55,7 +55,7 @@ void BlockActor::handle_top_token(
//TODO, schedule this message as a pre-allocation message
//tell the upstream about the input requirements
OutputHintMessage output_hints;
- output_hints.reserve_bytes = this->input_configs[i].reserve_items*this->input_items_sizes[i];
+ output_hints.reserve_bytes = this->input_configs[i].reserve_items*this->input_configs[i].item_size;
output_hints.token = this->input_tokens[i];
this->post_upstream(i, output_hints);
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index 52cf505..56d89fb 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -87,7 +87,7 @@ void BlockActor::output_fail(const size_t i)
SBuffer &buff = this->output_queues.front(i);
//check that the input is not already maxed
- const size_t front_items = buff.length/this->output_items_sizes[i];
+ const size_t front_items = buff.length/this->output_configs[i].item_size;
if (front_items >= this->output_configs[i].maximum_items)
{
throw std::runtime_error("output_fail called on maximum_items buffer");
@@ -133,7 +133,7 @@ void BlockActor::handle_task(void)
ASSERT(this->input_queues.ready(i));
const SBuffer &buff = this->input_queues.front(i);
const void *mem = buff.get();
- size_t items = buff.length/this->input_items_sizes[i];
+ size_t items = buff.length/this->input_configs[i].item_size;
this->input_items[i].get() = mem;
this->input_items[i].size() = items;
@@ -170,7 +170,7 @@ void BlockActor::handle_task(void)
ASSERT(buff.length == 0); //assumes it was flushed last call
void *mem = buff.get();
const size_t bytes = buff.get_actual_length() - buff.offset;
- size_t items = bytes/this->output_items_sizes[i];
+ size_t items = bytes/this->output_configs[i].item_size;
this->output_items[i].get() = mem;
this->output_items[i].size() = items;
@@ -230,7 +230,7 @@ void BlockActor::consume(const size_t i, const size_t items)
std::cerr << name << " consume " << items << std::endl;
#endif
this->stats.items_consumed[i] += items;
- const size_t bytes = items*this->input_items_sizes[i];
+ const size_t bytes = items*this->input_configs[i].item_size;
this->input_queues.consume(i, bytes);
this->trim_tags(i);
}
@@ -242,14 +242,14 @@ void BlockActor::produce(const size_t i, const size_t items)
#endif
SBuffer &buff = this->output_queues.front(i);
this->stats.items_produced[i] += items;
- const size_t bytes = items*this->output_items_sizes[i];
+ const size_t bytes = items*this->output_configs[i].item_size;
buff.length += bytes;
}
void BlockActor::produce_buffer(const size_t i, const SBuffer &buffer)
{
this->flush_output(i);
- const size_t items = buffer.length/output_items_sizes[i];
+ const size_t items = buffer.length/output_configs[i].item_size;
this->stats.items_produced[i] += items;
InputBufferMessage buff_msg;
buff_msg.buffer = buffer;
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index ba69567..8bea03c 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -122,8 +122,6 @@ struct BlockActor : Apology::Worker
}
//per port properties
- std::vector<size_t> input_items_sizes;
- std::vector<size_t> output_items_sizes;
std::vector<InputPortConfig> input_configs;
std::vector<OutputPortConfig> output_configs;
diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp
index 4ca0bdf..e51808b 100644
--- a/lib/input_handlers.cpp
+++ b/lib/input_handlers.cpp
@@ -77,8 +77,8 @@ void BlockActor::handle_input_update(const InputUpdateMessage &message, const Th
//update buffer queue configuration
if (i >= this->input_queues.size()) return;
- const size_t preload_bytes = this->input_items_sizes[i]*this->input_configs[i].preload_items;
- const size_t reserve_bytes = this->input_items_sizes[i]*this->input_configs[i].reserve_items;
- const size_t maximum_bytes = this->input_items_sizes[i]*this->input_configs[i].maximum_items;
- this->input_queues.update_config(i, this->input_items_sizes[i], preload_bytes, reserve_bytes, maximum_bytes);
+ const size_t preload_bytes = this->input_configs[i].item_size*this->input_configs[i].preload_items;
+ const size_t reserve_bytes = this->input_configs[i].item_size*this->input_configs[i].reserve_items;
+ const size_t maximum_bytes = this->input_configs[i].item_size*this->input_configs[i].maximum_items;
+ this->input_queues.update_config(i, this->input_configs[i].item_size, preload_bytes, reserve_bytes, maximum_bytes);
}
diff --git a/lib/output_handlers.cpp b/lib/output_handlers.cpp
index aaf5544..3ab78fa 100644
--- a/lib/output_handlers.cpp
+++ b/lib/output_handlers.cpp
@@ -80,6 +80,6 @@ void BlockActor::handle_output_update(const OutputUpdateMessage &message, const
//update buffer queue configuration
if (i >= this->output_queues.size()) return;
- const size_t reserve_bytes = this->output_items_sizes[i]*this->output_configs[i].reserve_items;
+ const size_t reserve_bytes = this->output_configs[i].item_size*this->output_configs[i].reserve_items;
this->output_queues.set_reserve_bytes(i, reserve_bytes);
}
diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp
index 4f12619..f0a8b21 100644
--- a/lib/topology_handler.cpp
+++ b/lib/topology_handler.cpp
@@ -30,10 +30,6 @@ void BlockActor::handle_topology(
//call notify_topology on block before committing settings
this->block_ptr->notify_topology(num_inputs, num_outputs);
- //fill the item sizes per port
- resize_fill_back(this->input_items_sizes, num_inputs);
- resize_fill_back(this->output_items_sizes, num_outputs);
-
//resize and fill port properties
resize_fill_back(this->input_configs, num_inputs);
resize_fill_back(this->output_configs, num_outputs);
diff --git a/python/gras/GRAS_Block.i b/python/gras/GRAS_Block.i
index 9e7299c..26cf8ea 100644
--- a/python/gras/GRAS_Block.i
+++ b/python/gras/GRAS_Block.i
@@ -204,11 +204,11 @@ class Block(BlockPython):
def set_input_signature(self, sig):
self.__in_sig = sig_to_dtype_sig(sig)
- for i, n in enumerate(self.__in_sig): self.set_input_size(i, n.itemsize)
+ for i, n in enumerate(self.__in_sig): self.input_config(i).item_size = n.itemsize
def set_output_signature(self, sig):
self.__out_sig = sig_to_dtype_sig(sig)
- for i, n in enumerate(self.__out_sig): self.set_output_size(i, n.itemsize)
+ for i, n in enumerate(self.__out_sig): self.output_config(i).item_size = n.itemsize
def input_signature(self): return self.__in_sig
def output_signature(self): return self.__out_sig
@@ -275,8 +275,8 @@ class Block(BlockPython):
def propagate_tags(self, i, iter):
for o in self.__out_indexes:
for t in iter:
- t.offset -= self.get_consumed(i)
t.offset += self.get_produced(o)
+ t.offset -= self.get_consumed(i)
self.post_output_tag(o, t)
%}