// // Copyright 2012 Josh Blum // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU Lesser General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with this program. If not, see . #include "pmx_helper.hpp" #include #include #include gr_block::gr_block(void) { //NOP } gr_block::gr_block( const std::string &name, gr_io_signature_sptr input_signature, gr_io_signature_sptr output_signature ): gras::Block(name) { //this initializes private vars, order matters this->set_fixed_rate(false); this->set_output_multiple(1); this->set_history(1); this->set_relative_rate(1.0); this->set_decimation(1); this->set_interpolation(1); this->set_tag_propagation_policy(TPP_ALL_TO_ALL); this->set_input_signature(input_signature); this->set_output_signature(output_signature); } void gr_block::notify_topology(const size_t num_inputs, const size_t num_outputs) { _fcast_ninput_items.resize(num_inputs); _work_ninput_items.resize(num_inputs); _work_input_items.resize(num_inputs); _work_output_items.resize(num_outputs); this->check_topology(num_inputs, num_outputs); } bool gr_block::check_topology(int, int) { return true; } void gr_block::work( const InputItems &input_items, const OutputItems &output_items ){ _work_io_ptr_mask = 0; #define REALLY_BIG size_t(1 << 30) const size_t num_inputs = input_items.size(); const size_t num_outputs = output_items.size(); //------------------------------------------------------------------ //-- initialize input buffers before work //------------------------------------------------------------------ size_t num_input_items = REALLY_BIG; //so big that it must std::min for (size_t i = 0; i < num_inputs; i++) { _work_ninput_items[i] = input_items[i].size(); _work_input_items[i] = input_items[i].get(); _work_io_ptr_mask |= ptrdiff_t(_work_input_items[i]); size_t items = input_items[i].size(); if (_enable_fixed_rate) { if (items <= _input_history_items) { return this->mark_input_fail(i); } items -= _input_history_items; } num_input_items = std::min(num_input_items, items); } //------------------------------------------------------------------ //-- initialize output buffers before work //------------------------------------------------------------------ size_t num_output_items = REALLY_BIG; //so big that it must std::min for (size_t i = 0; i < num_outputs; i++) { _work_output_items[i] = output_items[i].get(); _work_io_ptr_mask |= ptrdiff_t(_work_output_items[i]); size_t items = output_items[i].size(); items /= _output_multiple_items; items *= _output_multiple_items; num_output_items = std::min(num_output_items, items); } //------------------------------------------------------------------ //-- calculate the work_noutput_items given: //-- min of num_input_items //-- min of num_output_items //-- relative rate and output multiple items //------------------------------------------------------------------ size_t work_noutput_items = num_output_items; if (num_inputs and (_enable_fixed_rate or not num_outputs)) { size_t calc_output_items = size_t(num_input_items*_relative_rate); calc_output_items += _output_multiple_items-1; calc_output_items /= _output_multiple_items; calc_output_items *= _output_multiple_items; if (calc_output_items and calc_output_items < work_noutput_items) work_noutput_items = calc_output_items; } //------------------------------------------------------------------ //-- forecast //------------------------------------------------------------------ if (num_inputs or num_outputs) { forecast_again_you_jerk: _fcast_ninput_items = _work_ninput_items; //init for NOP case this->forecast(work_noutput_items, _fcast_ninput_items); for (size_t i = 0; i < input_items.size(); i++) { if (_fcast_ninput_items[i] <= _work_ninput_items[i]) continue; //handle the case of forecast failing if (work_noutput_items <= _output_multiple_items) { return this->mark_input_fail(i); } work_noutput_items = work_noutput_items/2; //backoff regime work_noutput_items += _output_multiple_items-1; work_noutput_items /= _output_multiple_items; work_noutput_items *= _output_multiple_items; goto forecast_again_you_jerk; } } const int work_ret = this->general_work( work_noutput_items, _work_ninput_items, _work_input_items, _work_output_items ); if (work_ret > 0) for (size_t i = 0; i < num_outputs; i++) { this->produce(i, work_ret); } if (work_ret == -1) this->mark_done(); } static inline unsigned long long myullround(const double x) { return (unsigned long long)(x + 0.5); } void gr_block::propagate_tags(const size_t which_input, const gras::TagIter &iter) { const size_t num_outputs = _work_output_items.size(); switch (_tag_prop_policy) { case TPP_DONT: break; //well that was ez case TPP_ALL_TO_ALL: for (size_t out_i = 0; out_i < num_outputs; out_i++) { BOOST_FOREACH(gras::Tag t, iter) { t.offset = myullround(t.offset * _relative_rate); this->post_output_tag(out_i, t); } } break; case TPP_ONE_TO_ONE: if (which_input < num_outputs) { BOOST_FOREACH(gras::Tag t, iter) { t.offset = myullround(t.offset * _relative_rate); this->post_output_tag(which_input, t); } } break; }; } void gr_block::forecast(int noutput_items, std::vector &ninputs_req) { for (size_t i = 0; i < ninputs_req.size(); i++) { ninputs_req[i] = fixed_rate_noutput_to_ninput(noutput_items); } } int gr_block::general_work( int noutput_items, gr_vector_int &ninput_items, gr_vector_const_void_star &input_items, gr_vector_void_star &output_items ){ throw std::runtime_error("gr_block subclasses must overload general_work!"); } void gr_block::consume_each(const int how_many_items) { if (how_many_items < 0) return; gras::Block::consume(size_t(how_many_items)); } void gr_block::consume(const size_t i, const int how_many_items) { if (how_many_items < 0) return; gras::Block::consume(i, size_t(how_many_items)); } void gr_block::produce(const size_t o, const int how_many_items) { if (how_many_items < 0) return; gras::Block::produce(o, size_t(how_many_items)); } uint64_t gr_block::nitems_read(const size_t which_input) { return Block::get_consumed(which_input); } uint64_t gr_block::nitems_written(const size_t which_output) { return Block::get_produced(which_output); } void gr_block::set_alignment(const size_t) { //TODO //probably dont need this since buffers always start aligned //and therefore alignment is always re-acheived } bool gr_block::is_unaligned(void) { //TODO //probably dont need this since volk dispatcher checks alignment //32 byte aligned is good enough for you return (_work_io_ptr_mask & ptrdiff_t(GRAS_MAX_ALIGNMENT-1)) != 0; } size_t gr_block::fixed_rate_noutput_to_ninput(const size_t noutput_items) { return ((decimation()*noutput_items)/interpolation()) + _input_history_items; } size_t gr_block::interpolation(void) const { return _interp; } void gr_block::set_interpolation(const size_t interp) { _interp = interp; this->set_relative_rate(1.0*interp); this->set_output_multiple(interp); } size_t gr_block::decimation(void) const { return _decim; } void gr_block::set_decimation(const size_t decim) { _decim = decim; this->set_relative_rate(1.0/decim); } unsigned gr_block::history(void) const { //implement off-by-one history compat return _input_history_items+1; } void gr_block::set_history(unsigned history) { gras::InputPortConfig config = this->get_input_config(0); //implement off-by-one history compat if (history == 0) history++; _input_history_items = history-1; config.preload_items = _input_history_items; this->set_input_config(0, config); } void gr_block::set_fixed_rate(const bool fixed_rate) { _enable_fixed_rate = fixed_rate; } bool gr_block::fixed_rate(void) const { return _enable_fixed_rate; } void gr_block::_update_input_reserve(void) { /*! * Set an input reserve for fixed rate blocks. * * FIXME: Also do this when output multiple is large, * This makes gr-trellis pass under conditions where not fixed rate set, * but the output multiple is so large that default input isnt sufficient. */ if (_enable_fixed_rate or _output_multiple_items > 1024) { gras::InputPortConfig config = this->get_input_config(0); config.reserve_items = size_t(0.5 + _output_multiple_items/_relative_rate); if (config.reserve_items) this->set_input_config(0, config); } } void gr_block::set_output_multiple(const size_t multiple) { _output_multiple_items = multiple; gras::OutputPortConfig config = this->get_output_config(0); config.reserve_items = multiple; this->set_output_config(0, config); this->_update_input_reserve(); } size_t gr_block::output_multiple(void) const { return _output_multiple_items; } void gr_block::set_relative_rate(double relative_rate) { _relative_rate = relative_rate; this->_update_input_reserve(); } double gr_block::relative_rate(void) const { return _relative_rate; } int gr_block::max_noutput_items(void) const { return this->get_output_config(0).maximum_items; } void gr_block::set_max_noutput_items(int max_items) { gras::OutputPortConfig config = this->get_output_config(0); config.maximum_items = max_items; this->set_output_config(0, config); } void gr_block::unset_max_noutput_items(void) { this->set_max_noutput_items(0); } bool gr_block::is_set_max_noutput_items(void) const { return this->max_noutput_items() != 0; } static gr_tag_t Tag2gr_tag(const gras::Tag &tag) { gr_tag_t t; t.offset = tag.offset; const gras::StreamTag &st = tag.object.as(); t.key = pmt::pmc_to_pmt(st.key); t.value = pmt::pmc_to_pmt(st.val); t.srcid = pmt::pmc_to_pmt(st.src); return t; } static gras::Tag gr_tag2Tag(const gr_tag_t &tag) { return gras::Tag ( tag.offset, PMC_M(gras::StreamTag( pmt::pmt_to_pmc(tag.key), pmt::pmt_to_pmc(tag.value), pmt::pmt_to_pmc(tag.srcid) )) ); } void gr_block::add_item_tag( const size_t which_output, const gr_tag_t &tag ){ this->post_output_tag(which_output, gr_tag2Tag(tag)); } void gr_block::add_item_tag( const size_t which_output, uint64_t abs_offset, const pmt::pmt_t &key, const pmt::pmt_t &value, const pmt::pmt_t &srcid ){ gr_tag_t t; t.offset = abs_offset; t.key = key; t.value = value; t.srcid = srcid; this->add_item_tag(which_output, t); } void gr_block::get_tags_in_range( std::vector &tags, const size_t which_input, uint64_t abs_start, uint64_t abs_end, const pmt::pmt_t &key ){ tags.clear(); BOOST_FOREACH(const gras::Tag &tag, this->get_input_tags(which_input)) { if (tag.offset >= abs_start and tag.offset <= abs_end) { gr_tag_t t = Tag2gr_tag(tag); if (not key or pmt::pmt_equal(t.key, key)) tags.push_back(t); } } } gr_block::tag_propagation_policy_t gr_block::tag_propagation_policy(void) { return _tag_prop_policy; } void gr_block::set_tag_propagation_policy(gr_block::tag_propagation_policy_t p) { _tag_prop_policy = p; } gras::BufferQueueSptr gr_block::input_buffer_allocator(const size_t, const gras::SBufferConfig &config) { if (_input_history_items) { return gras::BufferQueue::make_circ(config, 32/*many*/); } return gras::BufferQueueSptr(); }