summaryrefslogtreecommitdiff
path: root/gnuradio-core/src/lib/runtime
diff options
context:
space:
mode:
Diffstat (limited to 'gnuradio-core/src/lib/runtime')
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block_executor.cc7
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block_executor.h3
-rw-r--r--gnuradio-core/src/lib/runtime/gr_scheduler.cc2
-rw-r--r--gnuradio-core/src/lib/runtime/gr_scheduler.h2
-rw-r--r--gnuradio-core/src/lib/runtime/gr_scheduler_sts.cc8
-rw-r--r--gnuradio-core/src/lib/runtime/gr_scheduler_sts.h4
-rw-r--r--gnuradio-core/src/lib/runtime/gr_scheduler_tpb.cc17
-rw-r--r--gnuradio-core/src/lib/runtime/gr_scheduler_tpb.h4
-rw-r--r--gnuradio-core/src/lib/runtime/gr_top_block.cc16
-rw-r--r--gnuradio-core/src/lib/runtime/gr_top_block.h13
-rw-r--r--gnuradio-core/src/lib/runtime/gr_top_block.i5
-rw-r--r--gnuradio-core/src/lib/runtime/gr_top_block_impl.cc27
-rw-r--r--gnuradio-core/src/lib/runtime/gr_top_block_impl.h9
-rw-r--r--gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc4
-rw-r--r--gnuradio-core/src/lib/runtime/gr_tpb_thread_body.h2
15 files changed, 89 insertions, 34 deletions
diff --git a/gnuradio-core/src/lib/runtime/gr_block_executor.cc b/gnuradio-core/src/lib/runtime/gr_block_executor.cc
index 737b26f67..0403ce138 100644
--- a/gnuradio-core/src/lib/runtime/gr_block_executor.cc
+++ b/gnuradio-core/src/lib/runtime/gr_block_executor.cc
@@ -155,8 +155,8 @@ propagate_tags(gr_block::tag_propagation_policy_t policy, gr_block_detail *d,
return true;
}
-gr_block_executor::gr_block_executor (gr_block_sptr block)
- : d_block(block), d_log(0)
+gr_block_executor::gr_block_executor (gr_block_sptr block, int max_noutput_items)
+ : d_block(block), d_log(0), d_max_noutput_items(max_noutput_items)
{
if (ENABLE_LOGGING){
std::string name = str(boost::format("sst-%03d.log") % which_scheduler++);
@@ -203,6 +203,7 @@ gr_block_executor::run_one_iteration()
// determine the minimum available output space
noutput_items = min_available_space (d, m->output_multiple ());
+ noutput_items = std::min(noutput_items, d_max_noutput_items);
LOG(*d_log << " source\n noutput_items = " << noutput_items << std::endl);
if (noutput_items == -1) // we're done
goto were_done;
@@ -247,6 +248,7 @@ gr_block_executor::run_one_iteration()
// take a swag at how much output we can sink
noutput_items = (int) (max_items_avail * m->relative_rate ());
noutput_items = round_down (noutput_items, m->output_multiple ());
+ noutput_items = std::min(noutput_items, d_max_noutput_items);
LOG(*d_log << " max_items_avail = " << max_items_avail << std::endl);
LOG(*d_log << " noutput_items = " << noutput_items << std::endl);
@@ -308,6 +310,7 @@ gr_block_executor::run_one_iteration()
if (reqd_noutput_items > 0 && reqd_noutput_items <= noutput_items)
noutput_items = reqd_noutput_items;
}
+ noutput_items = std::min(noutput_items, d_max_noutput_items);
// ask the block how much input they need to produce noutput_items
m->forecast (noutput_items, d_ninput_items_required);
diff --git a/gnuradio-core/src/lib/runtime/gr_block_executor.h b/gnuradio-core/src/lib/runtime/gr_block_executor.h
index 15279f273..e022d8273 100644
--- a/gnuradio-core/src/lib/runtime/gr_block_executor.h
+++ b/gnuradio-core/src/lib/runtime/gr_block_executor.h
@@ -51,9 +51,10 @@ protected:
gr_vector_void_star d_output_items;
std::vector<uint64_t> d_start_nitems_read; //stores where tag counts are before work
std::vector<gr_tag_t> d_returned_tags;
+ int d_max_noutput_items;
public:
- gr_block_executor(gr_block_sptr block);
+ gr_block_executor(gr_block_sptr block, int max_noutput_items=100000);
~gr_block_executor ();
enum state {
diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler.cc b/gnuradio-core/src/lib/runtime/gr_scheduler.cc
index e4d8b3dd9..3ae08a7a3 100644
--- a/gnuradio-core/src/lib/runtime/gr_scheduler.cc
+++ b/gnuradio-core/src/lib/runtime/gr_scheduler.cc
@@ -24,7 +24,7 @@
#endif
#include <gr_scheduler.h>
-gr_scheduler::gr_scheduler(gr_flat_flowgraph_sptr ffg)
+gr_scheduler::gr_scheduler(gr_flat_flowgraph_sptr ffg, int max_noutput_items)
{
}
diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler.h b/gnuradio-core/src/lib/runtime/gr_scheduler.h
index 4e97b5881..92af8d1cb 100644
--- a/gnuradio-core/src/lib/runtime/gr_scheduler.h
+++ b/gnuradio-core/src/lib/runtime/gr_scheduler.h
@@ -47,7 +47,7 @@ public:
* The scheduler will continue running until all blocks until they
* report that they are done or the stop method is called.
*/
- gr_scheduler(gr_flat_flowgraph_sptr ffg);
+ gr_scheduler(gr_flat_flowgraph_sptr ffg, int max_noutput_items);
virtual ~gr_scheduler();
diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler_sts.cc b/gnuradio-core/src/lib/runtime/gr_scheduler_sts.cc
index fefc0dc70..3cc1d4d45 100644
--- a/gnuradio-core/src/lib/runtime/gr_scheduler_sts.cc
+++ b/gnuradio-core/src/lib/runtime/gr_scheduler_sts.cc
@@ -43,13 +43,13 @@ public:
gr_scheduler_sptr
-gr_scheduler_sts::make(gr_flat_flowgraph_sptr ffg)
+gr_scheduler_sts::make(gr_flat_flowgraph_sptr ffg, int max_noutput_items)
{
- return gr_scheduler_sptr(new gr_scheduler_sts(ffg));
+ return gr_scheduler_sptr(new gr_scheduler_sts(ffg, max_noutput_items));
}
-gr_scheduler_sts::gr_scheduler_sts(gr_flat_flowgraph_sptr ffg)
- : gr_scheduler(ffg)
+gr_scheduler_sts::gr_scheduler_sts(gr_flat_flowgraph_sptr ffg, int max_noutput_items)
+ : gr_scheduler(ffg, max_noutput_items)
{
// Split the flattened flow graph into discrete partitions, each
// of which is topologically sorted.
diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler_sts.h b/gnuradio-core/src/lib/runtime/gr_scheduler_sts.h
index 9b73b68c1..08c68d88a 100644
--- a/gnuradio-core/src/lib/runtime/gr_scheduler_sts.h
+++ b/gnuradio-core/src/lib/runtime/gr_scheduler_sts.h
@@ -39,10 +39,10 @@ protected:
* The scheduler will continue running until all blocks until they
* report that they are done or the stop method is called.
*/
- gr_scheduler_sts(gr_flat_flowgraph_sptr ffg);
+ gr_scheduler_sts(gr_flat_flowgraph_sptr ffg, int max_noutput_items);
public:
- static gr_scheduler_sptr make(gr_flat_flowgraph_sptr ffg);
+ static gr_scheduler_sptr make(gr_flat_flowgraph_sptr ffg, int max_noutput_items);
~gr_scheduler_sts();
diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.cc b/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.cc
index af0338570..0a7ff4556 100644
--- a/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.cc
+++ b/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.cc
@@ -33,25 +33,27 @@
class tpb_container
{
gr_block_sptr d_block;
+ int d_max_noutput_items;
public:
- tpb_container(gr_block_sptr block) : d_block(block) {}
+ tpb_container(gr_block_sptr block, int max_noutput_items)
+ : d_block(block), d_max_noutput_items(max_noutput_items) {}
void operator()()
{
- gr_tpb_thread_body body(d_block);
+ gr_tpb_thread_body body(d_block, d_max_noutput_items);
}
};
gr_scheduler_sptr
-gr_scheduler_tpb::make(gr_flat_flowgraph_sptr ffg)
+gr_scheduler_tpb::make(gr_flat_flowgraph_sptr ffg, int max_noutput_items)
{
- return gr_scheduler_sptr(new gr_scheduler_tpb(ffg));
+ return gr_scheduler_sptr(new gr_scheduler_tpb(ffg, max_noutput_items));
}
-gr_scheduler_tpb::gr_scheduler_tpb(gr_flat_flowgraph_sptr ffg)
- : gr_scheduler(ffg)
+gr_scheduler_tpb::gr_scheduler_tpb(gr_flat_flowgraph_sptr ffg, int max_noutput_items)
+ : gr_scheduler(ffg, max_noutput_items)
{
// Get a topologically sorted vector of all the blocks in use.
// Being topologically sorted probably isn't going to matter, but
@@ -73,7 +75,8 @@ gr_scheduler_tpb::gr_scheduler_tpb(gr_flat_flowgraph_sptr ffg)
std::stringstream name;
name << "thread-per-block[" << i << "]: " << blocks[i];
d_threads.create_thread(
- gruel::thread_body_wrapper<tpb_container>(tpb_container(blocks[i]), name.str()));
+ gruel::thread_body_wrapper<tpb_container>(tpb_container(blocks[i], max_noutput_items),
+ name.str()));
}
}
diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.h b/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.h
index f97ab2e7f..ab74fa84d 100644
--- a/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.h
+++ b/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.h
@@ -39,10 +39,10 @@ protected:
* The scheduler will continue running until all blocks until they
* report that they are done or the stop method is called.
*/
- gr_scheduler_tpb(gr_flat_flowgraph_sptr ffg);
+ gr_scheduler_tpb(gr_flat_flowgraph_sptr ffg, int max_noutput_items);
public:
- static gr_scheduler_sptr make(gr_flat_flowgraph_sptr ffg);
+ static gr_scheduler_sptr make(gr_flat_flowgraph_sptr ffg, int max_noutput_items=100000);
~gr_scheduler_tpb();
diff --git a/gnuradio-core/src/lib/runtime/gr_top_block.cc b/gnuradio-core/src/lib/runtime/gr_top_block.cc
index f341525c0..55e4bb895 100644
--- a/gnuradio-core/src/lib/runtime/gr_top_block.cc
+++ b/gnuradio-core/src/lib/runtime/gr_top_block.cc
@@ -54,9 +54,9 @@ gr_top_block::~gr_top_block()
}
void
-gr_top_block::start()
+gr_top_block::start(int max_noutput_items)
{
- d_impl->start();
+ d_impl->start(max_noutput_items);
}
void
@@ -96,6 +96,18 @@ gr_top_block::dump()
d_impl->dump();
}
+int
+gr_top_block::max_noutput_items()
+{
+ return d_impl->max_noutput_items();
+}
+
+void
+gr_top_block::set_max_noutput_items(int nmax)
+{
+ d_impl->set_max_noutput_items(nmax);
+}
+
gr_top_block_sptr
gr_top_block::to_top_block()
{
diff --git a/gnuradio-core/src/lib/runtime/gr_top_block.h b/gnuradio-core/src/lib/runtime/gr_top_block.h
index fca68ae71..f01372fc2 100644
--- a/gnuradio-core/src/lib/runtime/gr_top_block.h
+++ b/gnuradio-core/src/lib/runtime/gr_top_block.h
@@ -61,8 +61,13 @@ public:
* execute the flow graph. Returns to the caller once the threads
* are created. Calling start() on a top_block that is already
* started IS an error.
+ *
+ * \param max_noutput_items the maximum number of output items
+ * allowed for any block in the flowgraph; the noutput_items can
+ * always be less than this, but this will cap it as a maximum. Use
+ * this to adjust the maximum latency a flowgraph can exhibit.
*/
- void start();
+ void start(int max_noutput_items=100000);
/*!
* Stop the running flowgraph. Notifies each thread created by the
@@ -107,6 +112,12 @@ public:
*/
void dump();
+ //! Get the number of max noutput_items in the flowgraph
+ int max_noutput_items();
+
+ //! Set the maximum number of noutput_items in the flowgraph
+ void set_max_noutput_items(int nmax);
+
gr_top_block_sptr to_top_block(); // Needed for Python/Guile type coercion
};
diff --git a/gnuradio-core/src/lib/runtime/gr_top_block.i b/gnuradio-core/src/lib/runtime/gr_top_block.i
index 90fa18b94..70c627ffd 100644
--- a/gnuradio-core/src/lib/runtime/gr_top_block.i
+++ b/gnuradio-core/src/lib/runtime/gr_top_block.i
@@ -40,7 +40,7 @@ private:
public:
~gr_top_block();
- void start() throw (std::runtime_error);
+ void start(int max_noutput_items=100000) throw (std::runtime_error);
void stop();
//void wait();
//void run() throw (std::runtime_error);
@@ -48,6 +48,9 @@ public:
void unlock() throw (std::runtime_error);
void dump();
+ int max_noutput_items();
+ void set_max_noutput_items(int nmax);
+
gr_top_block_sptr to_top_block(); // Needed for Python/Guile type coercion
};
diff --git a/gnuradio-core/src/lib/runtime/gr_top_block_impl.cc b/gnuradio-core/src/lib/runtime/gr_top_block_impl.cc
index 9cad687fb..0227d789c 100644
--- a/gnuradio-core/src/lib/runtime/gr_top_block_impl.cc
+++ b/gnuradio-core/src/lib/runtime/gr_top_block_impl.cc
@@ -39,7 +39,8 @@
#define GR_TOP_BLOCK_IMPL_DEBUG 0
-typedef gr_scheduler_sptr (*scheduler_maker)(gr_flat_flowgraph_sptr ffg);
+typedef gr_scheduler_sptr (*scheduler_maker)(gr_flat_flowgraph_sptr ffg,
+ int max_noutput_items);
static struct scheduler_table {
const char *name;
@@ -50,7 +51,7 @@ static struct scheduler_table {
};
static gr_scheduler_sptr
-make_scheduler(gr_flat_flowgraph_sptr ffg)
+make_scheduler(gr_flat_flowgraph_sptr ffg, int max_noutput_items)
{
static scheduler_maker factory = 0;
@@ -72,7 +73,7 @@ make_scheduler(gr_flat_flowgraph_sptr ffg)
}
}
}
- return factory(ffg);
+ return factory(ffg, max_noutput_items);
}
@@ -88,10 +89,12 @@ gr_top_block_impl::~gr_top_block_impl()
}
void
-gr_top_block_impl::start()
+gr_top_block_impl::start(int max_noutput_items)
{
gruel::scoped_lock l(d_mutex);
+ d_max_noutput_items = max_noutput_items;
+
if (d_state != IDLE)
throw std::runtime_error("top_block::start: top block already running or wait() not called after previous stop()");
@@ -105,7 +108,7 @@ gr_top_block_impl::start()
d_ffg->validate();
d_ffg->setup_connections();
- d_scheduler = make_scheduler(d_ffg);
+ d_scheduler = make_scheduler(d_ffg, d_max_noutput_items);
d_state = RUNNING;
}
@@ -168,7 +171,7 @@ gr_top_block_impl::restart()
d_ffg = new_ffg;
// Create a new scheduler to execute it
- d_scheduler = make_scheduler(d_ffg);
+ d_scheduler = make_scheduler(d_ffg, d_max_noutput_items);
d_state = RUNNING;
}
@@ -178,3 +181,15 @@ gr_top_block_impl::dump()
if (d_ffg)
d_ffg->dump();
}
+
+int
+gr_top_block_impl::max_noutput_items()
+{
+ return d_max_noutput_items;
+}
+
+void
+gr_top_block_impl::set_max_noutput_items(int nmax)
+{
+ d_max_noutput_items = nmax;
+}
diff --git a/gnuradio-core/src/lib/runtime/gr_top_block_impl.h b/gnuradio-core/src/lib/runtime/gr_top_block_impl.h
index 904443be5..d804e3f30 100644
--- a/gnuradio-core/src/lib/runtime/gr_top_block_impl.h
+++ b/gnuradio-core/src/lib/runtime/gr_top_block_impl.h
@@ -42,7 +42,7 @@ public:
~gr_top_block_impl();
// Create and start scheduler threads
- void start();
+ void start(int max_noutput_items=100000);
// Signal scheduler threads to stop
void stop();
@@ -58,6 +58,12 @@ public:
// Dump the flowgraph to stdout
void dump();
+
+ // Get the number of max noutput_items in the flowgraph
+ int max_noutput_items();
+
+ // Set the maximum number of noutput_items in the flowgraph
+ void set_max_noutput_items(int nmax);
protected:
@@ -70,6 +76,7 @@ protected:
gruel::mutex d_mutex; // protects d_state and d_lock_count
tb_state d_state;
int d_lock_count;
+ int d_max_noutput_items;
private:
void restart();
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
index faa888697..d44c09aa6 100644
--- a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
@@ -28,8 +28,8 @@
using namespace pmt;
-gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block)
- : d_exec(block)
+gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_items)
+ : d_exec(block, max_noutput_items)
{
// std::cerr << "gr_tpb_thread_body: " << block << std::endl;
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.h b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.h
index 548cfedfb..3170b402e 100644
--- a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.h
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.h
@@ -38,7 +38,7 @@ class GR_CORE_API gr_tpb_thread_body {
gr_block_executor d_exec;
public:
- gr_tpb_thread_body(gr_block_sptr block);
+ gr_tpb_thread_body(gr_block_sptr block, int max_noutput_items=100000);
~gr_tpb_thread_body();
};