summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gnuradio-core/src/lib/runtime/gr_basic_block.cc3
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block_detail.cc7
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block_detail.h6
-rw-r--r--gnuradio-core/src/lib/runtime/gr_msg_accepter.cc17
-rw-r--r--gnuradio-core/src/lib/runtime/gr_msg_accepter.h10
-rw-r--r--gnuradio-core/src/lib/runtime/gr_tpb_detail.cc45
-rw-r--r--gnuradio-core/src/lib/runtime/gr_tpb_detail.h30
-rw-r--r--gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc51
-rw-r--r--gnuradio-examples/python/pfb/.gitignore2
-rw-r--r--gruel/src/include/gruel/Makefile.am1
-rw-r--r--gruel/src/include/gruel/msg_accepter.h13
-rw-r--r--gruel/src/include/gruel/send.h49
12 files changed, 187 insertions, 47 deletions
diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.cc b/gnuradio-core/src/lib/runtime/gr_basic_block.cc
index 8efa8267a..2fa1066cb 100644
--- a/gnuradio-core/src/lib/runtime/gr_basic_block.cc
+++ b/gnuradio-core/src/lib/runtime/gr_basic_block.cc
@@ -41,8 +41,7 @@ gr_basic_block_ncurrently_allocated()
gr_basic_block::gr_basic_block(const std::string &name,
gr_io_signature_sptr input_signature,
gr_io_signature_sptr output_signature)
- : gr_msg_accepter(gruel::make_msg_queue(0)), // Non-blocking insert
- d_name(name),
+ : d_name(name),
d_input_signature(input_signature),
d_output_signature(output_signature),
d_unique_id(s_next_id++),
diff --git a/gnuradio-core/src/lib/runtime/gr_block_detail.cc b/gnuradio-core/src/lib/runtime/gr_block_detail.cc
index ae1ea2562..d33dfed84 100644
--- a/gnuradio-core/src/lib/runtime/gr_block_detail.cc
+++ b/gnuradio-core/src/lib/runtime/gr_block_detail.cc
@@ -106,3 +106,10 @@ gr_block_detail::produce_each (int how_many_items)
for (int i = 0; i < noutputs (); i++)
d_output[i]->update_write_pointer (how_many_items);
}
+
+
+void
+gr_block_detail::_post(pmt::pmt_t msg)
+{
+ d_tpb.insert_tail(msg);
+}
diff --git a/gnuradio-core/src/lib/runtime/gr_block_detail.h b/gnuradio-core/src/lib/runtime/gr_block_detail.h
index 2856c402c..9d6358602 100644
--- a/gnuradio-core/src/lib/runtime/gr_block_detail.h
+++ b/gnuradio-core/src/lib/runtime/gr_block_detail.h
@@ -79,6 +79,12 @@ class gr_block_detail {
void produce_each (int how_many_items);
+ /*!
+ * Accept msg, place in queue, arrange for thread to be awakened if it's not already.
+ */
+ void _post(pmt::pmt_t msg);
+
+
gr_tpb_detail d_tpb; // used by thread-per-block scheduler
// ----------------------------------------------------------------------------
diff --git a/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc b/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc
index 50b41df88..89876ae29 100644
--- a/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc
+++ b/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc
@@ -26,11 +26,12 @@
#include <gr_msg_accepter.h>
#include <gr_block.h>
#include <gr_block_detail.h>
+#include <gr_hier_block2.h>
+#include <stdexcept>
using namespace pmt;
-gr_msg_accepter::gr_msg_accepter(gruel::msg_queue_sptr msgq)
- : gruel::msg_accepter_msgq(msgq)
+gr_msg_accepter::gr_msg_accepter()
{
}
@@ -42,15 +43,17 @@ gr_msg_accepter::~gr_msg_accepter()
void
gr_msg_accepter::post(pmt_t msg)
{
- // Let parent class do whatever it would have
- gruel::msg_accepter_msgq::post(msg);
-
// Notify derived class, handled case by case
gr_block *p = dynamic_cast<gr_block *>(this);
if (p) {
- p->detail()->d_tpb.notify_msg();
+ p->detail()->_post(msg);
+ return;
+ }
+ gr_hier_block2 *p2 = dynamic_cast<gr_hier_block2 *>(this);
+ if (p2){
+ // FIXME do the right thing
return;
}
- // Test for other derived classes and handle
+ throw std::runtime_error("unknown derived class");
}
diff --git a/gnuradio-core/src/lib/runtime/gr_msg_accepter.h b/gnuradio-core/src/lib/runtime/gr_msg_accepter.h
index 2073e7ff1..79a631f3a 100644
--- a/gnuradio-core/src/lib/runtime/gr_msg_accepter.h
+++ b/gnuradio-core/src/lib/runtime/gr_msg_accepter.h
@@ -22,19 +22,21 @@
#ifndef INCLUDED_GR_MSG_ACCEPTER_H
#define INCLUDED_GR_MSG_ACCEPTER_H
-#include <gruel/msg_accepter_msgq.h>
+#include <gruel/msg_accepter.h>
+#include <gruel/pmt.h>
/*!
* \brief Accepts messages and inserts them into a message queue, then notifies
* subclass gr_basic_block there is a message pending.
*/
-class gr_msg_accepter : public gruel::msg_accepter_msgq
+class gr_msg_accepter : public gruel::msg_accepter
{
public:
- gr_msg_accepter(gruel::msg_queue_sptr msgq);
+ gr_msg_accepter();
~gr_msg_accepter();
-
+
void post(pmt::pmt_t msg);
+
};
#endif /* INCLUDED_GR_MSG_ACCEPTER_H */
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc b/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
index 02e8deed8..c6311ccaa 100644
--- a/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
@@ -1,6 +1,6 @@
/* -*- c++ -*- */
/*
- * Copyright 2008 Free Software Foundation, Inc.
+ * Copyright 2008,2009 Free Software Foundation, Inc.
*
* This file is part of GNU Radio
*
@@ -27,6 +27,8 @@
#include <gr_block_detail.h>
#include <gr_buffer.h>
+using namespace pmt;
+
/*
* We assume that no worker threads are ever running when the
* graph structure is being manipulated, thus it's safe for us to poke
@@ -65,3 +67,44 @@ gr_tpb_detail::notify_neighbors(gr_block_detail *d)
notify_downstream(d);
notify_upstream(d);
}
+
+void
+gr_tpb_detail::insert_tail(pmt::pmt_t msg)
+{
+ gruel::scoped_lock guard(mutex);
+
+ msg_queue.push_back(msg);
+
+ // wake up thread if BLKD_IN or BLKD_OUT
+ input_cond.notify_one();
+ output_cond.notify_one();
+}
+
+pmt_t
+gr_tpb_detail::delete_head_nowait()
+{
+ gruel::scoped_lock guard(mutex);
+
+ if (empty_p())
+ return pmt_t();
+
+ pmt_t m(msg_queue.front());
+ msg_queue.pop_front();
+
+ return m;
+}
+
+/*
+ * Caller must already be holding the mutex
+ */
+pmt_t
+gr_tpb_detail::delete_head_nowait_already_holding_mutex()
+{
+ if (empty_p())
+ return pmt_t();
+
+ pmt_t m(msg_queue.front());
+ msg_queue.pop_front();
+
+ return m;
+}
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_detail.h b/gnuradio-core/src/lib/runtime/gr_tpb_detail.h
index a1df55806..acfa264c7 100644
--- a/gnuradio-core/src/lib/runtime/gr_tpb_detail.h
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_detail.h
@@ -22,6 +22,8 @@
#define INCLUDED_GR_TPB_DETAIL_H
#include <gruel/thread.h>
+#include <deque>
+#include <gruel/pmt.h>
class gr_block_detail;
@@ -36,6 +38,10 @@ struct gr_tpb_detail {
bool output_changed;
gruel::condition_variable output_cond;
+private:
+ std::deque<pmt::pmt_t> msg_queue;
+
+public:
gr_tpb_detail()
: input_changed(false), output_changed(false) { }
@@ -55,16 +61,23 @@ struct gr_tpb_detail {
input_changed = false;
output_changed = false;
}
+
+ //! is the queue empty?
+ bool empty_p() const { return msg_queue.empty(); }
- //! Called to notify us that a message is pending in the queue
- void notify_msg()
- {
- gruel::scoped_lock guard(mutex);
+ //| Acquires and release the mutex
+ void insert_tail(pmt::pmt_t msg);
- // Just wake up thread if BLKD_IN or BLKD_OUT
- input_cond.notify_one();
- output_cond.notify_one();
- }
+ /*!
+ * \returns returns pmt at head of queue or pmt_t() if empty.
+ */
+ pmt::pmt_t delete_head_nowait();
+
+ /*!
+ * \returns returns pmt at head of queue or pmt_t() if empty.
+ * Caller must already be holding the mutex
+ */
+ pmt::pmt_t delete_head_nowait_already_holding_mutex();
private:
@@ -83,6 +96,7 @@ private:
output_changed = true;
output_cond.notify_one();
}
+
};
#endif /* INCLUDED_GR_TPB_DETAIL_H */
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
index c601b588c..03eef17d9 100644
--- a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
@@ -35,12 +35,15 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block)
gr_block_detail *d = block->detail().get();
gr_block_executor::state s;
+ pmt_t msg;
+
while (1){
boost::this_thread::interruption_point();
- while (!block->msg_queue()->empty_p())
- block->handle_msg(block->msg_queue()->delete_head_nowait());
+ // handle any queued up messages
+ while ((msg = d->d_tpb.delete_head_nowait()))
+ block->handle_msg(msg);
d->d_tpb.clear_changed();
s = d_exec.run_one_iteration();
@@ -59,37 +62,41 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block)
return;
case gr_block_executor::BLKD_IN: // Wait for input.
- while (!d->d_tpb.input_changed)
{
- boost::this_thread::interruption_point();
gruel::scoped_lock guard(d->d_tpb.mutex);
-
- // Block then wake on input_changed or msg arrived
- while(!d->d_tpb.input_changed && !block->msg_queue()->empty_p())
- d->d_tpb.input_cond.wait(guard);
+ while (!d->d_tpb.input_changed){
+
+ // wait for input or message
+ while(!d->d_tpb.input_changed && d->d_tpb.empty_p())
+ d->d_tpb.input_cond.wait(guard);
- // Run msgq while unlocked
- guard.unlock();
- while (!block->msg_queue()->empty_p())
- block->handle_msg(block->msg_queue()->delete_head_nowait());
+ // handle all pending messages
+ while ((msg = d->d_tpb.delete_head_nowait_already_holding_mutex())){
+ guard.unlock(); // release lock while processing msg
+ block->handle_msg(msg);
+ guard.lock();
+ }
+ }
}
break;
case gr_block_executor::BLKD_OUT: // Wait for output buffer space.
- while (!d->d_tpb.output_changed)
{
- boost::this_thread::interruption_point();
gruel::scoped_lock guard(d->d_tpb.mutex);
+ while (!d->d_tpb.output_changed){
+
+ // wait for output room or message
+ while(!d->d_tpb.output_changed && d->d_tpb.empty_p())
+ d->d_tpb.output_cond.wait(guard);
- // Block then wake on output_changed or msg arrived
- while(!d->d_tpb.output_changed && !block->msg_queue()->empty_p())
- d->d_tpb.output_cond.wait(guard);
-
- // Run msgq while unlocked
- guard.unlock();
- while (!block->msg_queue()->empty_p())
- block->handle_msg(block->msg_queue()->delete_head_nowait());
+ // handle all pending messages
+ while ((msg = d->d_tpb.delete_head_nowait_already_holding_mutex())){
+ guard.unlock(); // release lock while processing msg
+ block->handle_msg(msg);
+ guard.lock();
+ }
+ }
}
break;
diff --git a/gnuradio-examples/python/pfb/.gitignore b/gnuradio-examples/python/pfb/.gitignore
new file mode 100644
index 000000000..282522db0
--- /dev/null
+++ b/gnuradio-examples/python/pfb/.gitignore
@@ -0,0 +1,2 @@
+Makefile
+Makefile.in
diff --git a/gruel/src/include/gruel/Makefile.am b/gruel/src/include/gruel/Makefile.am
index c38c7fa38..9f50cb619 100644
--- a/gruel/src/include/gruel/Makefile.am
+++ b/gruel/src/include/gruel/Makefile.am
@@ -35,6 +35,7 @@ gruelinclude_HEADERS = \
pmt_pool.h \
pmt_serial_tags.h \
realtime.h \
+ send.h \
sys_pri.h \
thread_body_wrapper.h \
thread_group.h \
diff --git a/gruel/src/include/gruel/msg_accepter.h b/gruel/src/include/gruel/msg_accepter.h
index bc287afae..3afd6dde0 100644
--- a/gruel/src/include/gruel/msg_accepter.h
+++ b/gruel/src/include/gruel/msg_accepter.h
@@ -18,8 +18,8 @@
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#ifndef INCLUDED_MSG_ACCEPTER_H
-#define INCLUDED_MSG_ACCEPTER_H
+#ifndef INCLUDED_GRUEL_MSG_ACCEPTER_H
+#define INCLUDED_GRUEL_MSG_ACCEPTER_H
#include <gruel/pmt.h>
@@ -34,9 +34,16 @@ namespace gruel {
msg_accepter() {};
virtual ~msg_accepter();
+ /*!
+ * \brief send \p msg to \p msg_accepter
+ *
+ * Sending a message is an asynchronous operation. The \p post
+ * call will not wait for the message either to arrive at the
+ * destination or to be received.
+ */
virtual void post(pmt::pmt_t msg) = 0;
};
} /* namespace gruel */
-#endif /* INCLUDED_MSG_ACCEPTER_H */
+#endif /* INCLUDED_GRUEL_MSG_ACCEPTER_H */
diff --git a/gruel/src/include/gruel/send.h b/gruel/src/include/gruel/send.h
new file mode 100644
index 000000000..292017d45
--- /dev/null
+++ b/gruel/src/include/gruel/send.h
@@ -0,0 +1,49 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2009 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#ifndef INCLUDED_GRUEL_SEND_H
+#define INCLUDED_GRUEL_SEND_H
+
+#include <gruel/msg_accepter.h>
+
+namespace gruel {
+
+
+ /*!
+ * \brief send \p msg to \p msg_accepter
+ *
+ * Sending a message is an asynchronous operation. The \p send
+ * call will not wait for the message either to arrive at the
+ * destination or to be received.
+ *
+ * \returns msg
+ */
+ static inline pmt::pmt_t
+ send(msg_accepter &acc, pmt::pmt_t msg)
+ {
+ return acc.post(msg);
+ }
+
+
+
+} /* namespace gruel */
+
+
+#endif /* INCLUDED_SEND_H */