summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
m---------Apology0
m---------Theron0
m---------gnuradio0
-rw-r--r--include/gnuradio/block.hpp9
-rw-r--r--include/gnuradio/gr_block.h12
-rw-r--r--include/gnuradio/gr_top_block.h4
-rw-r--r--lib/block.cpp6
-rw-r--r--lib/block_handlers.cpp2
-rw-r--r--lib/block_task.cpp9
-rw-r--r--lib/gras_impl/block_actor.hpp6
-rw-r--r--lib/gras_impl/input_buffer_queues.hpp5
-rw-r--r--lib/input_handlers.cpp8
12 files changed, 48 insertions, 13 deletions
diff --git a/Apology b/Apology
-Subproject 39f22a50578e321e7fd9c5face7a375da243c24
+Subproject 132718e729bbb6e9d18a6f776eca4a3187978f0
diff --git a/Theron b/Theron
-Subproject 87cbb28824ea255e3dedc93ababe5f7639de504
+Subproject 1d3b0b2dce36ae38981303b13aff705e20f0f07
diff --git a/gnuradio b/gnuradio
-Subproject bb3e2754a0e28396fa722725978986c2cc0e57c
+Subproject 517b057e732eee9504510f1171d44c98f1ceec9
diff --git a/include/gnuradio/block.hpp b/include/gnuradio/block.hpp
index a7744fd..d06c49c 100644
--- a/include/gnuradio/block.hpp
+++ b/include/gnuradio/block.hpp
@@ -249,6 +249,15 @@ struct GRAS_API Block : Element
//! scheduler calls when the topology is updated, can be overloaded
virtual bool check_topology(int ninputs, int noutputs);
+ /*!
+ * Set if the work call should be interruptible by stop().
+ * Some work implementations block with the expectation of
+ * getting a boost thread interrupt in a blocking call.
+ * Set set_interruptible_work(true) if this is the case.
+ * By default, work implementations are not interruptible.
+ */
+ void set_interruptible_work(const bool enb);
+
/*******************************************************************
* routines related to affinity and allocation
******************************************************************/
diff --git a/include/gnuradio/gr_block.h b/include/gnuradio/gr_block.h
index d46e252..317c207 100644
--- a/include/gnuradio/gr_block.h
+++ b/include/gnuradio/gr_block.h
@@ -118,6 +118,18 @@ struct GRAS_API gr_block : gnuradio::Block
void set_decimation(const size_t);
+ ///////////// TODO //////////////////////
+ int max_noutput_items(){return 0;}
+ void set_max_noutput_items(int){}
+ void unset_max_noutput_items(){}
+ bool is_set_max_noutput_items(){return false;}
+ void set_max_output_buffer(long){}
+ void set_max_output_buffer(int, long){}
+ long max_output_buffer(size_t){return 0;}
+ void set_min_output_buffer(long){}
+ void set_min_output_buffer(int, long){}
+ long min_output_buffer(size_t){return 0;}
+
};
typedef boost::shared_ptr<gr_block> gr_block_sptr;
diff --git a/include/gnuradio/gr_top_block.h b/include/gnuradio/gr_top_block.h
index 65680f2..eb1a9aa 100644
--- a/include/gnuradio/gr_top_block.h
+++ b/include/gnuradio/gr_top_block.h
@@ -37,6 +37,10 @@ struct GRAS_API gr_top_block : gnuradio::TopBlock
this->update();
}
+ ///////////// TODO //////////////////////
+ int max_noutput_items(){return 0;}
+ void set_max_noutput_items(int){}
+
};
typedef boost::shared_ptr<gr_top_block> gr_top_block_sptr;
diff --git a/lib/block.cpp b/lib/block.cpp
index e4a0d0a..15e5807 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -54,6 +54,7 @@ Block::Block(const std::string &name):
this->set_fixed_rate(true);
this->set_relative_rate(1.0);
this->set_tag_propagation_policy(TPP_ALL_TO_ALL);
+ this->set_interruptible_work(false);
}
template <typename V, typename T>
@@ -239,3 +240,8 @@ void Block::set_buffer_affinity(const Affinity &affinity)
{
(*this)->block->buffer_affinity = affinity;
}
+
+void Block::set_interruptible_work(const bool enb)
+{
+ (*this)->block->interruptible_work = enb;
+}
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index 59feecf..2e71ac1 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -130,7 +130,7 @@ void BlockActor::handle_top_thread_group(
//spawn a new thread if this block is a source
this->thread_group = message;
this->interruptible_thread.reset(); //erase old one
- if (this->get_num_inputs() == 0) //its a source
+ if (this->interruptible_work)
{
this->interruptible_thread = boost::make_shared<InterruptibleThread>(
this->thread_group, boost::bind(&BlockActor::task_work, this)
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index 960b41e..a64b3d7 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -244,14 +244,11 @@ void BlockActor::handle_task(void)
GRAS_FORCE_INLINE void BlockActor::conclusion(void)
{
-
+ //missing at least one upstream provider?
//since nothing else is coming in, its safe to mark done
- if (this->inputs_done.all()) //no upstream providers
+ if (this->any_inputs_done() or this->forecast_fail)
{
- if (not this->input_queues.all_ready() or this->forecast_fail)
- {
- this->mark_done();
- }
+ this->mark_done();
}
//still have IO ready? kick off another task
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index 14bfacc..e3e8463 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -110,6 +110,11 @@ struct BlockActor : Apology::Worker
void sort_tags(const size_t index);
void trim_tags(const size_t index);
void conclusion(void);
+ GRAS_FORCE_INLINE bool any_inputs_done(void)
+ {
+ if (this->inputs_done.none()) return false;
+ return ((~this->input_queues.ready_bitset()) & this->inputs_done).any();
+ }
//per port properties
std::vector<size_t> input_items_sizes;
@@ -157,6 +162,7 @@ struct BlockActor : Apology::Worker
Block::tag_propagation_policy_t tag_prop_policy;
//interruptible thread stuff
+ bool interruptible_work;
SharedThreadGroup thread_group;
boost::shared_ptr<InterruptibleThread> interruptible_thread;
diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp
index 5b610d1..d9d864c 100644
--- a/lib/gras_impl/input_buffer_queues.hpp
+++ b/lib/gras_impl/input_buffer_queues.hpp
@@ -53,6 +53,11 @@ struct InputBufferQueues
void resize(const size_t size);
+ GRAS_FORCE_INLINE const BitSet &ready_bitset(void) const
+ {
+ return _bitset;
+ }
+
GRAS_FORCE_INLINE void push(const size_t i, const SBuffer &buffer)
{
ASSERT(not _queues[i].full());
diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp
index 8d778f1..574aee4 100644
--- a/lib/input_handlers.cpp
+++ b/lib/input_handlers.cpp
@@ -56,13 +56,9 @@ void BlockActor::handle_input_check(const InputCheckMessage &message, const Ther
//an upstream block declared itself done, recheck the token
this->inputs_done.set(index, this->input_tokens[index].unique());
-
- if (this->inputs_done.all()) //no upstream providers
+ if (this->any_inputs_done()) //missing an upstream provider
{
- if (not this->input_queues.all_ready())
- {
- this->mark_done();
- }
+ this->mark_done();
}
}