diff options
m--------- | gnuradio | 0 | ||||
m--------- | grextras | 0 | ||||
-rw-r--r-- | include/gras/block.hpp | 41 | ||||
-rw-r--r-- | lib/block.cpp | 63 | ||||
-rw-r--r-- | lib/block_allocator.cpp | 6 | ||||
-rw-r--r-- | lib/block_handlers.cpp | 2 | ||||
-rw-r--r-- | lib/block_task.cpp | 12 | ||||
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 2 | ||||
-rw-r--r-- | lib/input_handlers.cpp | 8 | ||||
-rw-r--r-- | lib/output_handlers.cpp | 2 | ||||
-rw-r--r-- | lib/topology_handler.cpp | 4 | ||||
-rw-r--r-- | python/gras/GRAS_Block.i | 6 |
12 files changed, 62 insertions, 84 deletions
diff --git a/gnuradio b/gnuradio -Subproject 7233e7c2dd2c0cba1a998cdd5b3ad7f76816e9d +Subproject c60a6dc8b61c26c37874b27e4440c02e573dca2 diff --git a/grextras b/grextras -Subproject f274e53f3a74f7fbac6f241504be3eafee30e58 +Subproject 1ebef1dfa208ea00dd67376cafde89988e0a845 diff --git a/include/gras/block.hpp b/include/gras/block.hpp index f00a6f7..1c6516d 100644 --- a/include/gras/block.hpp +++ b/include/gras/block.hpp @@ -20,6 +20,9 @@ struct GRAS_API InputPortConfig { InputPortConfig(void); + //! The size of an item in bytes + size_t item_size; + /*! * Set an input reserve requirement such that work is called * with an input buffer at least reserve items in size. @@ -80,6 +83,9 @@ struct GRAS_API OutputPortConfig { OutputPortConfig(void); + //! The size of an item in bytes + size_t item_size; + /*! * Set an output reserve requirement such that work is called * with an output buffer at least reserve items in size. @@ -109,36 +115,27 @@ struct GRAS_API Block : Element Block(const std::string &name); /******************************************************************* - * Item sizes for ports - ******************************************************************/ - - //! Get the input item size for this port in bytes - size_t get_input_size(const size_t which_input) const; - - //! Set the input item size for this port in bytes - void set_input_size(const size_t which_input, const size_t bytes); - - //! Get the output item size for this port in bytes - size_t get_output_size(const size_t which_output) const; - - //! Set the output item size for this port in bytes - void set_output_size(const size_t which_output, const size_t bytes); - - /******************************************************************* * Deal with input and output port configuration ******************************************************************/ //! Get the configuration rules of an input port - InputPortConfig get_input_config(const size_t which_input) const; + const InputPortConfig &input_config(const size_t which_input) const; - //! Set the configuration rules for an input port - void set_input_config(const size_t which_input, const InputPortConfig &config); + //! Get the configuration rules of an input port + InputPortConfig &input_config(const size_t which_input); //! Get the configuration rules of an output port - OutputPortConfig get_output_config(const size_t which_output) const; + const OutputPortConfig &output_config(const size_t which_output) const; - //! Set the configuration rules for an output port - void set_output_config(const size_t which_output, const OutputPortConfig &config); + //! Get the configuration rules of an output port + OutputPortConfig &output_config(const size_t which_output); + + /*! + * Commit changes to the port configuration. + * Changes are commited automatically when the block becomes active. + * However, once active, changes may not effect until commit_config(). + */ + void commit_config(void); /******************************************************************* * Deal with data production and consumption diff --git a/lib/block.cpp b/lib/block.cpp index 424df83..e9e170b 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -9,6 +9,7 @@ using namespace gras; InputPortConfig::InputPortConfig(void) { + item_size = 1; reserve_items = 1; maximum_items = 0; inline_buffer = false; @@ -17,6 +18,7 @@ InputPortConfig::InputPortConfig(void) OutputPortConfig::OutputPortConfig(void) { + item_size = 1; reserve_items = 1; maximum_items = 0; } @@ -38,10 +40,8 @@ Block::Block(const std::string &name): (*this)->block->block_state = BlockActor::BLOCK_STATE_INIT; //call block methods to init stuff - this->set_input_size(0, 1); - this->set_output_size(0, 1); - this->set_input_config(0, InputPortConfig()); - this->set_output_config(0, OutputPortConfig()); + this->input_config(0) = InputPortConfig(); + this->output_config(0) = OutputPortConfig(); this->set_interruptible_work(false); this->set_buffer_affinity(-1); } @@ -62,74 +62,62 @@ void ElementImpl::block_cleanup(void) this->thread_pool.reset(); //must be deleted after actor } -template <typename V, typename T> -void vector_set(V &v, const T &t, const size_t index) +template <typename V> +const typename V::value_type &vector_get_const(const V &v, const size_t index) { if (v.size() <= index) { - v.resize(index+1, t); + return v.back(); } - v[index] = t; + return v[index]; } template <typename V> -typename V::value_type vector_get(const V &v, const size_t index) +typename V::value_type &vector_get_resize(V &v, const size_t index) { if (v.size() <= index) { - return v.front(); + if (v.empty()) v.resize(1); + v.resize(index+1, v.back()); } return v[index]; } -size_t Block::get_input_size(const size_t which_input) const -{ - return vector_get((*this)->block->input_items_sizes, which_input); -} - -void Block::set_input_size(const size_t which_input, const size_t bytes) +InputPortConfig &Block::input_config(const size_t which_input) { - vector_set((*this)->block->input_items_sizes, bytes, which_input); + return vector_get_resize((*this)->block->input_configs, which_input); } -size_t Block::get_output_size(const size_t which_output) const +const InputPortConfig &Block::input_config(const size_t which_input) const { - return vector_get((*this)->block->output_items_sizes, which_output); + return vector_get_const((*this)->block->input_configs, which_input); } -void Block::set_output_size(const size_t which_output, const size_t bytes) +OutputPortConfig &Block::output_config(const size_t which_output) { - vector_set((*this)->block->output_items_sizes, bytes, which_output); + return vector_get_resize((*this)->block->output_configs, which_output); } -InputPortConfig Block::get_input_config(const size_t which_input) const +const OutputPortConfig &Block::output_config(const size_t which_output) const { - return vector_get((*this)->block->input_configs, which_input); + return vector_get_const((*this)->block->output_configs, which_output); } -void Block::set_input_config(const size_t which_input, const InputPortConfig &config) +void Block::commit_config(void) { - vector_set((*this)->block->input_configs, config, which_input); + for (size_t i = 0; i < (*this)->block->get_num_inputs(); i++) { InputUpdateMessage message; - message.index = which_input; + message.index = i; (*this)->block->Push(message, Theron::Address()); } -} - -OutputPortConfig Block::get_output_config(const size_t which_output) const -{ - return vector_get((*this)->block->output_configs, which_output); -} - -void Block::set_output_config(const size_t which_output, const OutputPortConfig &config) -{ - vector_set((*this)->block->output_configs, config, which_output); + for (size_t i = 0; i < (*this)->block->get_num_outputs(); i++) { OutputUpdateMessage message; - message.index = which_output; + message.index = i; (*this)->block->Push(message, Theron::Address()); } + } void Block::consume(const size_t which_input, const size_t num_items) @@ -195,7 +183,6 @@ PMCC Block::pop_input_msg(const size_t which_input) if (input_tags.empty()) return PMCC(); PMCC p = input_tags.front().object; input_tags.erase(input_tags.begin()); - (*this)->block->stats.items_consumed[which_input]++; return p; } diff --git a/lib/block_allocator.cpp b/lib/block_allocator.cpp index c1ef827..3161653 100644 --- a/lib/block_allocator.cpp +++ b/lib/block_allocator.cpp @@ -62,9 +62,9 @@ void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address { const size_t bytes = recommend_length( this->output_allocation_hints[i], - my_round_up_mult(AT_LEAST_BYTES, this->output_items_sizes[i]), - this->output_configs[i].reserve_items*this->output_items_sizes[i], - this->output_configs[i].maximum_items*this->output_items_sizes[i] + my_round_up_mult(AT_LEAST_BYTES, this->output_configs[i].item_size), + this->output_configs[i].reserve_items*this->output_configs[i].item_size, + this->output_configs[i].maximum_items*this->output_configs[i].item_size ); SBufferDeleter deleter = boost::bind(&BlockActor::buffer_returner, this, i, _1); diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index 73a22da..a6d49bb 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -55,7 +55,7 @@ void BlockActor::handle_top_token( //TODO, schedule this message as a pre-allocation message //tell the upstream about the input requirements OutputHintMessage output_hints; - output_hints.reserve_bytes = this->input_configs[i].reserve_items*this->input_items_sizes[i]; + output_hints.reserve_bytes = this->input_configs[i].reserve_items*this->input_configs[i].item_size; output_hints.token = this->input_tokens[i]; this->post_upstream(i, output_hints); diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 52cf505..56d89fb 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -87,7 +87,7 @@ void BlockActor::output_fail(const size_t i) SBuffer &buff = this->output_queues.front(i); //check that the input is not already maxed - const size_t front_items = buff.length/this->output_items_sizes[i]; + const size_t front_items = buff.length/this->output_configs[i].item_size; if (front_items >= this->output_configs[i].maximum_items) { throw std::runtime_error("output_fail called on maximum_items buffer"); @@ -133,7 +133,7 @@ void BlockActor::handle_task(void) ASSERT(this->input_queues.ready(i)); const SBuffer &buff = this->input_queues.front(i); const void *mem = buff.get(); - size_t items = buff.length/this->input_items_sizes[i]; + size_t items = buff.length/this->input_configs[i].item_size; this->input_items[i].get() = mem; this->input_items[i].size() = items; @@ -170,7 +170,7 @@ void BlockActor::handle_task(void) ASSERT(buff.length == 0); //assumes it was flushed last call void *mem = buff.get(); const size_t bytes = buff.get_actual_length() - buff.offset; - size_t items = bytes/this->output_items_sizes[i]; + size_t items = bytes/this->output_configs[i].item_size; this->output_items[i].get() = mem; this->output_items[i].size() = items; @@ -230,7 +230,7 @@ void BlockActor::consume(const size_t i, const size_t items) std::cerr << name << " consume " << items << std::endl; #endif this->stats.items_consumed[i] += items; - const size_t bytes = items*this->input_items_sizes[i]; + const size_t bytes = items*this->input_configs[i].item_size; this->input_queues.consume(i, bytes); this->trim_tags(i); } @@ -242,14 +242,14 @@ void BlockActor::produce(const size_t i, const size_t items) #endif SBuffer &buff = this->output_queues.front(i); this->stats.items_produced[i] += items; - const size_t bytes = items*this->output_items_sizes[i]; + const size_t bytes = items*this->output_configs[i].item_size; buff.length += bytes; } void BlockActor::produce_buffer(const size_t i, const SBuffer &buffer) { this->flush_output(i); - const size_t items = buffer.length/output_items_sizes[i]; + const size_t items = buffer.length/output_configs[i].item_size; this->stats.items_produced[i] += items; InputBufferMessage buff_msg; buff_msg.buffer = buffer; diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index ba69567..8bea03c 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -122,8 +122,6 @@ struct BlockActor : Apology::Worker } //per port properties - std::vector<size_t> input_items_sizes; - std::vector<size_t> output_items_sizes; std::vector<InputPortConfig> input_configs; std::vector<OutputPortConfig> output_configs; diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp index 4ca0bdf..e51808b 100644 --- a/lib/input_handlers.cpp +++ b/lib/input_handlers.cpp @@ -77,8 +77,8 @@ void BlockActor::handle_input_update(const InputUpdateMessage &message, const Th //update buffer queue configuration if (i >= this->input_queues.size()) return; - const size_t preload_bytes = this->input_items_sizes[i]*this->input_configs[i].preload_items; - const size_t reserve_bytes = this->input_items_sizes[i]*this->input_configs[i].reserve_items; - const size_t maximum_bytes = this->input_items_sizes[i]*this->input_configs[i].maximum_items; - this->input_queues.update_config(i, this->input_items_sizes[i], preload_bytes, reserve_bytes, maximum_bytes); + const size_t preload_bytes = this->input_configs[i].item_size*this->input_configs[i].preload_items; + const size_t reserve_bytes = this->input_configs[i].item_size*this->input_configs[i].reserve_items; + const size_t maximum_bytes = this->input_configs[i].item_size*this->input_configs[i].maximum_items; + this->input_queues.update_config(i, this->input_configs[i].item_size, preload_bytes, reserve_bytes, maximum_bytes); } diff --git a/lib/output_handlers.cpp b/lib/output_handlers.cpp index aaf5544..3ab78fa 100644 --- a/lib/output_handlers.cpp +++ b/lib/output_handlers.cpp @@ -80,6 +80,6 @@ void BlockActor::handle_output_update(const OutputUpdateMessage &message, const //update buffer queue configuration if (i >= this->output_queues.size()) return; - const size_t reserve_bytes = this->output_items_sizes[i]*this->output_configs[i].reserve_items; + const size_t reserve_bytes = this->output_configs[i].item_size*this->output_configs[i].reserve_items; this->output_queues.set_reserve_bytes(i, reserve_bytes); } diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp index 4f12619..f0a8b21 100644 --- a/lib/topology_handler.cpp +++ b/lib/topology_handler.cpp @@ -30,10 +30,6 @@ void BlockActor::handle_topology( //call notify_topology on block before committing settings this->block_ptr->notify_topology(num_inputs, num_outputs); - //fill the item sizes per port - resize_fill_back(this->input_items_sizes, num_inputs); - resize_fill_back(this->output_items_sizes, num_outputs); - //resize and fill port properties resize_fill_back(this->input_configs, num_inputs); resize_fill_back(this->output_configs, num_outputs); diff --git a/python/gras/GRAS_Block.i b/python/gras/GRAS_Block.i index 9e7299c..26cf8ea 100644 --- a/python/gras/GRAS_Block.i +++ b/python/gras/GRAS_Block.i @@ -204,11 +204,11 @@ class Block(BlockPython): def set_input_signature(self, sig): self.__in_sig = sig_to_dtype_sig(sig) - for i, n in enumerate(self.__in_sig): self.set_input_size(i, n.itemsize) + for i, n in enumerate(self.__in_sig): self.input_config(i).item_size = n.itemsize def set_output_signature(self, sig): self.__out_sig = sig_to_dtype_sig(sig) - for i, n in enumerate(self.__out_sig): self.set_output_size(i, n.itemsize) + for i, n in enumerate(self.__out_sig): self.output_config(i).item_size = n.itemsize def input_signature(self): return self.__in_sig def output_signature(self): return self.__out_sig @@ -275,8 +275,8 @@ class Block(BlockPython): def propagate_tags(self, i, iter): for o in self.__out_indexes: for t in iter: - t.offset -= self.get_consumed(i) t.offset += self.get_produced(o) + t.offset -= self.get_consumed(i) self.post_output_tag(o, t) %} |