summaryrefslogtreecommitdiff
path: root/gnuradio-core/src/lib/runtime/gr_block.h
diff options
context:
space:
mode:
Diffstat (limited to 'gnuradio-core/src/lib/runtime/gr_block.h')
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block.h437
1 files changed, 437 insertions, 0 deletions
diff --git a/gnuradio-core/src/lib/runtime/gr_block.h b/gnuradio-core/src/lib/runtime/gr_block.h
new file mode 100644
index 000000000..bd085100b
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_block.h
@@ -0,0 +1,437 @@
+/*
+ * Copyright 2012-2013 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_GNURADIO_GR_BLOCK_H
+#define INCLUDED_GNURADIO_GR_BLOCK_H
+
+#include <gr_core_api.h>
+#include <gras/block.hpp>
+#include <gr_io_signature.h>
+#include <gr_types.h>
+#include <gr_tags.h>
+#include <string>
+#include <deque>
+#include <map>
+#include <boost/foreach.hpp>
+#include <gruel/thread.h>
+#include <gr_sptr_magic.h>
+
+struct GR_CORE_API gr_block : gras::Block
+{
+
+ gr_block(void);
+
+ gr_block(
+ const std::string &name,
+ gr_io_signature_sptr input_signature,
+ gr_io_signature_sptr output_signature
+ );
+
+ long unique_id(void) const{return _unique_id;}
+ std::string name(void) const{return _name;}
+ long _unique_id;
+ std::string _name;
+
+ virtual ~gr_block(void);
+
+ gr_io_signature_sptr input_signature(void) const;
+ gr_io_signature_sptr output_signature(void) const;
+
+ void set_input_signature(gr_io_signature_sptr sig);
+ void set_output_signature(gr_io_signature_sptr sig);
+
+ virtual bool check_topology(int ninputs, int noutputs);
+
+ //! Overload me! I am the forecast
+ virtual void forecast(int, std::vector<int> &);
+
+ //! Return options for the work call
+ enum
+ {
+ WORK_CALLED_PRODUCE = -2,
+ WORK_DONE = -1
+ };
+
+ /*!
+ * \brief compute output items from input items
+ *
+ * \param noutput_items number of output items to write on each output stream
+ * \param ninput_items number of input items available on each input stream
+ * \param input_items vector of pointers to the input items, one entry per input stream
+ * \param output_items vector of pointers to the output items, one entry per output stream
+ *
+ * \returns number of items actually written to each output stream, or -1 on EOF.
+ * It is OK to return a value less than noutput_items. -1 <= return value <= noutput_items
+ *
+ * general_work must call consume or consume_each to indicate how many items
+ * were consumed on each input stream.
+ */
+ virtual int general_work(
+ int noutput_items,
+ gr_vector_int &ninput_items,
+ gr_vector_const_void_star &input_items,
+ gr_vector_void_star &output_items
+ );
+
+ virtual bool start(void);
+ virtual bool stop(void);
+
+ //! Call during work to consume items
+ void consume_each(const int how_many_items);
+
+ void consume(const size_t i, const int how_many_items);
+
+ void produce(const size_t o, const int how_many_items);
+
+ //! Get absolute count of all items consumed on the given input port
+ uint64_t nitems_read(const size_t which_input = 0);
+
+ //! Get absolute count of all items produced on the given output port
+ uint64_t nitems_written(const size_t which_output = 0);
+
+ void add_item_tag(
+ const size_t which_output, const gr_tag_t &tag
+ );
+
+ void add_item_tag(
+ const size_t which_output,
+ uint64_t abs_offset,
+ const pmt::pmt_t &key,
+ const pmt::pmt_t &value,
+ const pmt::pmt_t &srcid=pmt::PMT_F
+ );
+
+ void get_tags_in_range(
+ std::vector<gr_tag_t> &tags,
+ const size_t which_input,
+ uint64_t abs_start,
+ uint64_t abs_end,
+ const pmt::pmt_t &key = pmt::pmt_t()
+ );
+
+ void set_alignment(const size_t alignment);
+
+ bool is_unaligned(void);
+
+ size_t fixed_rate_noutput_to_ninput(const size_t noutput_items);
+
+ size_t interpolation(void) const;
+
+ void set_interpolation(const size_t);
+
+ size_t decimation(void) const;
+
+ void set_decimation(const size_t);
+
+ int max_noutput_items(void) const;
+
+ void set_max_noutput_items(int);
+
+ void unset_max_noutput_items(void);
+
+ bool is_set_max_noutput_items(void) const;
+
+ /*******************************************************************
+ * Deal with input and output port configuration
+ ******************************************************************/
+
+ unsigned history(void) const;
+
+ void set_history(unsigned history);
+
+ /*!
+ * Enable fixed rate logic.
+ * When enabled, relative rate is assumed to be set,
+ * and forecast is automatically called.
+ * Also, consume will be called automatically.
+ */
+ void set_fixed_rate(const bool fixed_rate);
+
+ //! Get the fixed rate setting
+ bool fixed_rate(void) const;
+
+ /*!
+ * The relative rate can be thought of as interpolation/decimation.
+ * In other words, relative rate is the ratio of output items to input items.
+ */
+ void set_relative_rate(const double relative_rate);
+
+ //! Get the relative rate setting
+ double relative_rate(void) const;
+
+ /*!
+ * The output multiple setting controls work output buffer sizes.
+ * Buffers will be number of items modulo rounted to the multiple.
+ */
+ void set_output_multiple(const size_t multiple);
+
+ //! Get the output multiple setting
+ size_t output_multiple(void) const;
+
+ /*******************************************************************
+ * Deal with tag handling and tag configuration
+ ******************************************************************/
+
+ enum tag_propagation_policy_t
+ {
+ TPP_DONT = 0,
+ TPP_ALL_TO_ALL = 1,
+ TPP_ONE_TO_ONE = 2
+ };
+
+ tag_propagation_policy_t tag_propagation_policy(void);
+
+ void set_tag_propagation_policy(tag_propagation_policy_t p);
+
+ ///////////// TODO //////////////////////
+ void set_max_output_buffer(long){}
+ void set_max_output_buffer(int, long){}
+ long max_output_buffer(size_t){return 0;}
+ void set_min_output_buffer(long){}
+ void set_min_output_buffer(int, long){}
+ long min_output_buffer(size_t){return 0;}
+
+ ///////////// ALIAS stuff - is it used? //////////////////////
+ std::string d_symbol_alias;
+ std::string d_symbol_name;
+ std::string symbol_name() const { return d_symbol_name; }
+ bool alias_set() { return !d_symbol_alias.empty(); }
+ std::string alias(){ return alias_set()?d_symbol_alias:symbol_name(); }
+ pmt::pmt_t alias_pmt(){ return pmt::pmt_intern(alias()); }
+ void set_block_alias(std::string name){d_symbol_alias = name;}
+
+ ///////////// MSG stuff not implemented //////////////////////
+ typedef std::deque<pmt::pmt_t> msg_queue_t;
+ typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator> msg_queue_map_t;
+ typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator>::iterator msg_queue_map_itr;
+ msg_queue_map_t msg_queue;
+ pmt::pmt_t message_subscribers;
+
+ typedef boost::function<void(pmt::pmt_t)> msg_handler_t;
+ typedef std::map<pmt::pmt_t , msg_handler_t, pmt::pmt_comperator> d_msg_handlers_t;
+ d_msg_handlers_t d_msg_handlers;
+
+ template <typename T> void set_msg_handler(pmt::pmt_t which_port, T msg_handler){}
+
+ void message_port_register_in(pmt::pmt_t /*port_id*/){}
+ void message_port_register_out(pmt::pmt_t /*port_id*/){}
+ void message_port_pub(pmt::pmt_t /*port_id*/, pmt::pmt_t /*msg*/){}
+ void message_port_sub(pmt::pmt_t /*port_id*/, pmt::pmt_t /*target*/){}
+ void message_port_unsub(pmt::pmt_t /*port_id*/, pmt::pmt_t /*target*/){}
+
+ virtual bool message_port_is_hier(pmt::pmt_t port_id) { (void) port_id; /*std::cout << "is_hier\n";*/ return false; }
+ virtual bool message_port_is_hier_in(pmt::pmt_t port_id) { (void) port_id; /*std::cout << "is_hier_in\n";*/ return false; }
+ virtual bool message_port_is_hier_out(pmt::pmt_t port_id) { (void) port_id; /*std::cout << "is_hier_out\n";*/ return false; }
+
+ /*!
+ * \brief Get input message port names.
+ *
+ * Returns the available input message ports for a block. The
+ * return object is a PMT vector that is filled with PMT symbols.
+ */
+ pmt::pmt_t message_ports_in(){return pmt::PMT_NIL;}
+
+ /*!
+ * \brief Get output message port names.
+ *
+ * Returns the available output message ports for a block. The
+ * return object is a PMT vector that is filled with PMT symbols.
+ */
+ pmt::pmt_t message_ports_out(){return pmt::PMT_NIL;}
+
+ //! is the queue empty?
+ bool empty_p(pmt::pmt_t which_port) {
+ if(msg_queue.find(which_port) == msg_queue.end())
+ throw std::runtime_error("port does not exist!");
+ return msg_queue[which_port].empty();
+ }
+ bool empty_p() {
+ bool rv = true;
+ BOOST_FOREACH(msg_queue_map_t::value_type &i, msg_queue){ rv &= msg_queue[i.first].empty(); }
+ return rv;
+ }
+
+ //| Acquires and release the mutex
+ void insert_tail( pmt::pmt_t /*which_port*/, pmt::pmt_t /*msg*/){}
+ /*!
+ * \returns returns pmt at head of queue or pmt_t() if empty.
+ */
+ pmt::pmt_t delete_head_nowait( pmt::pmt_t /*which_port*/){return pmt::PMT_NIL;}
+
+ /*!
+ * \returns returns pmt at head of queue or pmt_t() if empty.
+ */
+ pmt::pmt_t delete_head_blocking( pmt::pmt_t /*which_port*/){return pmt::PMT_NIL;}
+
+ msg_queue_t::iterator get_iterator(pmt::pmt_t which_port){
+ return msg_queue[which_port].begin();
+ }
+
+ void erase_msg(pmt::pmt_t which_port, msg_queue_t::iterator it){
+ msg_queue[which_port].erase(it);
+ }
+
+ virtual bool has_msg_port(pmt::pmt_t which_port){
+ if(msg_queue.find(which_port) != msg_queue.end()){
+ return true;
+ }
+ if(pmt::pmt_dict_has_key(message_subscribers, which_port)){
+ return true;
+ }
+ return false;
+ }
+
+ /*!
+ * \brief Tests if there is a handler attached to port \p which_port
+ */
+ bool has_msg_handler(pmt::pmt_t which_port) {
+ return (d_msg_handlers.find(which_port) != d_msg_handlers.end());
+ }
+
+ /*
+ * This function is called by the runtime system to dispatch messages.
+ *
+ * The thread-safety guarantees mentioned in set_msg_handler are implemented
+ * by the callers of this method.
+ */
+ virtual void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg)
+ {
+ // AA Update this
+ if(has_msg_handler(which_port)) { // Is there a handler?
+ d_msg_handlers[which_port](msg); // Yes, invoke it.
+ }
+ }
+
+ /*! Used by block's setters and work functions to make
+ * setting/resetting of parameters thread-safe.
+ *
+ * Used by calling gruel::scoped_lock l(d_setlock);
+ */
+ gruel::mutex d_setlock;
+
+ // ----------------------------------------------------------------------------
+ // Functions to handle thread affinity
+ std::vector<int> d_affinity; // thread affinity proc. mask
+
+ /*!
+ * \brief Set the thread's affinity to processor core \p n.
+ *
+ * \param mask a vector of unsigned ints of the core numbers available to this block.
+ */
+ void set_processor_affinity(const std::vector<int> &mask){d_affinity=mask;}
+
+ /*!
+ * \brief Remove processor affinity to a specific core.
+ */
+ void unset_processor_affinity(){}
+
+ /*!
+ * \brief Get the current processor affinity.
+ */
+ std::vector<int> processor_affinity() { return d_affinity; }
+
+ ///////////////// private vars //////////////////////
+
+ gr_vector_int _work_ninput_items;
+ gr_vector_int _fcast_ninput_items;
+ size_t _num_outputs;
+ ptrdiff_t _work_io_ptr_mask;
+ size_t _output_multiple_items;
+ double _relative_rate;
+ bool _enable_fixed_rate;
+ size_t _input_history_items;
+ tag_propagation_policy_t _tag_prop_policy;
+ size_t _interp, _decim;
+ gr_io_signature_sptr _in_sig, _out_sig;
+
+ ///////////////// the Block overloads //////////////////////
+
+ //! implements work -> calls general work
+ void work(const InputItems &, const OutputItems &);
+
+ //! notifications of new topological commits
+ void notify_topology(const size_t, const size_t);
+
+ //! start notification
+ void notify_active(void);
+
+ //! stop notification
+ void notify_inactive(void);
+
+ //! implements tag_propagation_policy()
+ virtual void propagate_tags(const size_t, const gras::TagIter &);
+
+ void _update_input_reserve(void);
+
+ gras::BufferQueueSptr input_buffer_allocator(const size_t, const gras::SBufferConfig &);
+ gras::BufferQueueSptr output_buffer_allocator(const size_t, const gras::SBufferConfig &);
+
+};
+
+typedef boost::shared_ptr<gr_block> gr_block_sptr;
+
+GRAS_FORCE_INLINE void gr_block::consume_each(const int how_many_items)
+{
+ if GRAS_UNLIKELY(how_many_items < 0) return;
+ gras::Block::consume(size_t(how_many_items));
+}
+
+GRAS_FORCE_INLINE void gr_block::consume(const size_t i, const int how_many_items)
+{
+ if GRAS_UNLIKELY(how_many_items < 0) return;
+ gras::Block::consume(i, size_t(how_many_items));
+}
+
+GRAS_FORCE_INLINE void gr_block::produce(const size_t o, const int how_many_items)
+{
+ if GRAS_UNLIKELY(how_many_items < 0) return;
+ gras::Block::produce(o, size_t(how_many_items));
+}
+
+GRAS_FORCE_INLINE uint64_t gr_block::nitems_read(const size_t which_input)
+{
+ return Block::get_consumed(which_input);
+}
+
+GRAS_FORCE_INLINE uint64_t gr_block::nitems_written(const size_t which_output)
+{
+ return Block::get_produced(which_output);
+}
+
+GRAS_FORCE_INLINE size_t gr_block::interpolation(void) const
+{
+ return _interp;
+}
+
+GRAS_FORCE_INLINE size_t gr_block::decimation(void) const
+{
+ return _decim;
+}
+
+GRAS_FORCE_INLINE bool gr_block::is_unaligned(void)
+{
+ //TODO
+ //probably dont need this since volk dispatcher checks alignment
+ //32 byte aligned is good enough for you
+ return (_work_io_ptr_mask & ptrdiff_t(GRAS_MAX_ALIGNMENT-1)) != 0;
+}
+
+#endif /*INCLUDED_GNURADIO_GR_BLOCK_H*/