diff options
m--------- | Apology | 0 | ||||
m--------- | Theron | 0 | ||||
m--------- | gnuradio | 0 | ||||
-rw-r--r-- | include/gnuradio/block.hpp | 9 | ||||
-rw-r--r-- | include/gnuradio/gr_block.h | 12 | ||||
-rw-r--r-- | include/gnuradio/gr_top_block.h | 4 | ||||
-rw-r--r-- | lib/block.cpp | 6 | ||||
-rw-r--r-- | lib/block_handlers.cpp | 2 | ||||
-rw-r--r-- | lib/block_task.cpp | 9 | ||||
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 6 | ||||
-rw-r--r-- | lib/gras_impl/input_buffer_queues.hpp | 5 | ||||
-rw-r--r-- | lib/input_handlers.cpp | 8 |
12 files changed, 48 insertions, 13 deletions
diff --git a/Apology b/Apology -Subproject 39f22a50578e321e7fd9c5face7a375da243c24 +Subproject 132718e729bbb6e9d18a6f776eca4a3187978f0 diff --git a/Theron b/Theron -Subproject 87cbb28824ea255e3dedc93ababe5f7639de504 +Subproject 1d3b0b2dce36ae38981303b13aff705e20f0f07 diff --git a/gnuradio b/gnuradio -Subproject bb3e2754a0e28396fa722725978986c2cc0e57c +Subproject 517b057e732eee9504510f1171d44c98f1ceec9 diff --git a/include/gnuradio/block.hpp b/include/gnuradio/block.hpp index a7744fd..d06c49c 100644 --- a/include/gnuradio/block.hpp +++ b/include/gnuradio/block.hpp @@ -249,6 +249,15 @@ struct GRAS_API Block : Element //! scheduler calls when the topology is updated, can be overloaded virtual bool check_topology(int ninputs, int noutputs); + /*! + * Set if the work call should be interruptible by stop(). + * Some work implementations block with the expectation of + * getting a boost thread interrupt in a blocking call. + * Set set_interruptible_work(true) if this is the case. + * By default, work implementations are not interruptible. + */ + void set_interruptible_work(const bool enb); + /******************************************************************* * routines related to affinity and allocation ******************************************************************/ diff --git a/include/gnuradio/gr_block.h b/include/gnuradio/gr_block.h index d46e252..317c207 100644 --- a/include/gnuradio/gr_block.h +++ b/include/gnuradio/gr_block.h @@ -118,6 +118,18 @@ struct GRAS_API gr_block : gnuradio::Block void set_decimation(const size_t); + ///////////// TODO ////////////////////// + int max_noutput_items(){return 0;} + void set_max_noutput_items(int){} + void unset_max_noutput_items(){} + bool is_set_max_noutput_items(){return false;} + void set_max_output_buffer(long){} + void set_max_output_buffer(int, long){} + long max_output_buffer(size_t){return 0;} + void set_min_output_buffer(long){} + void set_min_output_buffer(int, long){} + long min_output_buffer(size_t){return 0;} + }; typedef boost::shared_ptr<gr_block> gr_block_sptr; diff --git a/include/gnuradio/gr_top_block.h b/include/gnuradio/gr_top_block.h index 65680f2..eb1a9aa 100644 --- a/include/gnuradio/gr_top_block.h +++ b/include/gnuradio/gr_top_block.h @@ -37,6 +37,10 @@ struct GRAS_API gr_top_block : gnuradio::TopBlock this->update(); } + ///////////// TODO ////////////////////// + int max_noutput_items(){return 0;} + void set_max_noutput_items(int){} + }; typedef boost::shared_ptr<gr_top_block> gr_top_block_sptr; diff --git a/lib/block.cpp b/lib/block.cpp index e4a0d0a..15e5807 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -54,6 +54,7 @@ Block::Block(const std::string &name): this->set_fixed_rate(true); this->set_relative_rate(1.0); this->set_tag_propagation_policy(TPP_ALL_TO_ALL); + this->set_interruptible_work(false); } template <typename V, typename T> @@ -239,3 +240,8 @@ void Block::set_buffer_affinity(const Affinity &affinity) { (*this)->block->buffer_affinity = affinity; } + +void Block::set_interruptible_work(const bool enb) +{ + (*this)->block->interruptible_work = enb; +} diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index 59feecf..2e71ac1 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -130,7 +130,7 @@ void BlockActor::handle_top_thread_group( //spawn a new thread if this block is a source this->thread_group = message; this->interruptible_thread.reset(); //erase old one - if (this->get_num_inputs() == 0) //its a source + if (this->interruptible_work) { this->interruptible_thread = boost::make_shared<InterruptibleThread>( this->thread_group, boost::bind(&BlockActor::task_work, this) diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 960b41e..a64b3d7 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -244,14 +244,11 @@ void BlockActor::handle_task(void) GRAS_FORCE_INLINE void BlockActor::conclusion(void) { - + //missing at least one upstream provider? //since nothing else is coming in, its safe to mark done - if (this->inputs_done.all()) //no upstream providers + if (this->any_inputs_done() or this->forecast_fail) { - if (not this->input_queues.all_ready() or this->forecast_fail) - { - this->mark_done(); - } + this->mark_done(); } //still have IO ready? kick off another task diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index 14bfacc..e3e8463 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -110,6 +110,11 @@ struct BlockActor : Apology::Worker void sort_tags(const size_t index); void trim_tags(const size_t index); void conclusion(void); + GRAS_FORCE_INLINE bool any_inputs_done(void) + { + if (this->inputs_done.none()) return false; + return ((~this->input_queues.ready_bitset()) & this->inputs_done).any(); + } //per port properties std::vector<size_t> input_items_sizes; @@ -157,6 +162,7 @@ struct BlockActor : Apology::Worker Block::tag_propagation_policy_t tag_prop_policy; //interruptible thread stuff + bool interruptible_work; SharedThreadGroup thread_group; boost::shared_ptr<InterruptibleThread> interruptible_thread; diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp index 5b610d1..d9d864c 100644 --- a/lib/gras_impl/input_buffer_queues.hpp +++ b/lib/gras_impl/input_buffer_queues.hpp @@ -53,6 +53,11 @@ struct InputBufferQueues void resize(const size_t size); + GRAS_FORCE_INLINE const BitSet &ready_bitset(void) const + { + return _bitset; + } + GRAS_FORCE_INLINE void push(const size_t i, const SBuffer &buffer) { ASSERT(not _queues[i].full()); diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp index 8d778f1..574aee4 100644 --- a/lib/input_handlers.cpp +++ b/lib/input_handlers.cpp @@ -56,13 +56,9 @@ void BlockActor::handle_input_check(const InputCheckMessage &message, const Ther //an upstream block declared itself done, recheck the token this->inputs_done.set(index, this->input_tokens[index].unique()); - - if (this->inputs_done.all()) //no upstream providers + if (this->any_inputs_done()) //missing an upstream provider { - if (not this->input_queues.all_ready()) - { - this->mark_done(); - } + this->mark_done(); } } |