summaryrefslogtreecommitdiff
path: root/gnuradio-core/src/lib/runtime
diff options
context:
space:
mode:
authorJohnathan Corgan2012-11-29 19:46:29 -0800
committerJohnathan Corgan2012-11-29 19:46:29 -0800
commit4478cb86a5dc9fccf66c2cdd5806419b70c3837e (patch)
tree8ba82695b249012761aff88c32c83731393a302c /gnuradio-core/src/lib/runtime
parent92cfb0240005675f4e7a55a81552f4c7a5128cd8 (diff)
downloadgnuradio-4478cb86a5dc9fccf66c2cdd5806419b70c3837e.tar.gz
gnuradio-4478cb86a5dc9fccf66c2cdd5806419b70c3837e.tar.bz2
gnuradio-4478cb86a5dc9fccf66c2cdd5806419b70c3837e.zip
Adding PDU to tagged stream and tagged stream to PDU blocks along with QA python
non-stream-connected blocks still need a new thread context
Diffstat (limited to 'gnuradio-core/src/lib/runtime')
-rw-r--r--gnuradio-core/src/lib/runtime/gr_basic_block.cc22
-rw-r--r--gnuradio-core/src/lib/runtime/gr_basic_block.h21
-rw-r--r--gnuradio-core/src/lib/runtime/gr_basic_block.i3
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block.cc3
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block_registry.cc18
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block_registry.h10
-rw-r--r--gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc4
7 files changed, 48 insertions, 33 deletions
diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.cc b/gnuradio-core/src/lib/runtime/gr_basic_block.cc
index 3d08b63d1..7d2f275e8 100644
--- a/gnuradio-core/src/lib/runtime/gr_basic_block.cc
+++ b/gnuradio-core/src/lib/runtime/gr_basic_block.cc
@@ -52,6 +52,7 @@ gr_basic_block::gr_basic_block(const std::string &name,
d_color(WHITE),
message_subscribers(pmt::pmt_make_dict())
{
+ mutex.unlock();
s_ncurrently_allocated++;
}
@@ -130,7 +131,7 @@ void
gr_basic_block::_post(pmt_t which_port, pmt_t msg)
{
insert_tail(which_port, msg);
- //notify_msg();
+ global_block_registry.notify_blk(alias());
}
void
@@ -151,8 +152,9 @@ gr_basic_block::delete_head_nowait(pmt::pmt_t which_port)
{
gruel::scoped_lock guard(mutex);
- if (empty_p(which_port))
- return pmt_t();
+ if (empty_p(which_port)){
+ return pmt::pmt_t();
+ }
pmt_t m(msg_queue[which_port].front());
msg_queue[which_port].pop_front();
@@ -160,18 +162,4 @@ gr_basic_block::delete_head_nowait(pmt::pmt_t which_port)
return m;
}
-/*
- * Caller must already be holding the mutex
- */
-pmt_t
-gr_basic_block::delete_head_nowait_already_holding_mutex(pmt::pmt_t which_port)
-{
- if (empty_p(which_port))
- return pmt_t();
-
- pmt_t m(msg_queue[which_port].front());
- msg_queue[which_port].pop_front();
-
- return m;
-}
diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.h b/gnuradio-core/src/lib/runtime/gr_basic_block.h
index 31026a2e4..2ee8161c1 100644
--- a/gnuradio-core/src/lib/runtime/gr_basic_block.h
+++ b/gnuradio-core/src/lib/runtime/gr_basic_block.h
@@ -115,7 +115,6 @@ protected:
vcolor color() const { return d_color; }
// Message passing interface
- std::vector<pmt::pmt_t> message_inputs;
pmt::pmt_t message_subscribers;
public:
@@ -139,15 +138,18 @@ public:
void message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target);
void message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target);
- /*!
- * Accept msg, place in queue, arrange for thread to be awakened if it's not already.
- */
- void _post(pmt::pmt_t which_port, pmt::pmt_t msg);
-
+ /*!
+ * Accept msg, place in queue, arrange for thread to be awakened if it's not already.
+ */
+ void _post(pmt::pmt_t which_port, pmt::pmt_t msg);
//! is the queue empty?
//bool empty_p(const pmt::pmt_t &which_port) const { return msg_queue[which_port].empty(); }
- bool empty_p(pmt::pmt_t which_port) { return msg_queue[which_port].empty(); }
+ bool empty_p(pmt::pmt_t which_port) {
+ if(msg_queue.find(which_port) == msg_queue.end())
+ throw std::runtime_error("port does not exist!");
+ return msg_queue[which_port].empty();
+ }
bool empty_p() {
bool rv = true;
BOOST_FOREACH(msg_queue_map_t::value_type &i, msg_queue){ rv &= msg_queue[i.first].empty(); }
@@ -160,11 +162,6 @@ public:
* \returns returns pmt at head of queue or pmt_t() if empty.
*/
pmt::pmt_t delete_head_nowait( pmt::pmt_t which_port);
- /*!
- * \returns returns pmt at head of queue or pmt_t() if empty.
- * Caller must already be holding the mutex
- */
- pmt::pmt_t delete_head_nowait_already_holding_mutex( pmt::pmt_t which_port);
msg_queue_t::iterator get_iterator(pmt::pmt_t which_port){
return msg_queue[which_port].begin();
diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.i b/gnuradio-core/src/lib/runtime/gr_basic_block.i
index 0a8473ba2..d6d6c3d16 100644
--- a/gnuradio-core/src/lib/runtime/gr_basic_block.i
+++ b/gnuradio-core/src/lib/runtime/gr_basic_block.i
@@ -23,6 +23,8 @@
class gr_basic_block;
typedef boost::shared_ptr<gr_basic_block> gr_basic_block_sptr;
%template(gr_basic_block_sptr) boost::shared_ptr<gr_basic_block>;
+%include "pmt_swig.i"
+using namespace pmt;
// support vectors of these...
namespace std {
@@ -45,6 +47,7 @@ public:
bool check_topology (int ninputs, int noutputs);
std::string alias();
void set_block_alias(std::string name);
+ void _post(pmt_t which_port, pmt_t msg);
};
%rename(block_ncurrently_allocated) gr_basic_block_ncurrently_allocated;
diff --git a/gnuradio-core/src/lib/runtime/gr_block.cc b/gnuradio-core/src/lib/runtime/gr_block.cc
index a88052ee0..dc77128a3 100644
--- a/gnuradio-core/src/lib/runtime/gr_block.cc
+++ b/gnuradio-core/src/lib/runtime/gr_block.cc
@@ -28,6 +28,7 @@
#include <gr_block_detail.h>
#include <stdexcept>
#include <iostream>
+#include <gr_block_registry.h>
gr_block::gr_block (const std::string &name,
gr_io_signature_sptr input_signature,
@@ -46,10 +47,12 @@ gr_block::gr_block (const std::string &name,
d_max_output_buffer(std::max(output_signature->max_streams(),1), -1),
d_min_output_buffer(std::max(output_signature->max_streams(),1), -1)
{
+ global_block_registry.register_primitive(alias(), this);
}
gr_block::~gr_block ()
{
+ global_block_registry.unregister_primitive(alias());
}
// stub implementation: 1:1
diff --git a/gnuradio-core/src/lib/runtime/gr_block_registry.cc b/gnuradio-core/src/lib/runtime/gr_block_registry.cc
index 2478e0019..ff23d97eb 100644
--- a/gnuradio-core/src/lib/runtime/gr_block_registry.cc
+++ b/gnuradio-core/src/lib/runtime/gr_block_registry.cc
@@ -1,5 +1,9 @@
#include <gr_basic_block.h>
#include <gr_block_registry.h>
+#include <gr_tpb_detail.h>
+#include <gr_block_detail.h>
+#include <gr_block.h>
+#include <stdio.h>
gr_block_registry global_block_registry;
@@ -56,3 +60,17 @@ gr_basic_block_sptr gr_block_registry::block_lookup(pmt::pmt_t symbol){
}
+void gr_block_registry::register_primitive(std::string blk, gr_block* ref){
+ primitive_map[blk] = ref;
+}
+
+void gr_block_registry::unregister_primitive(std::string blk){
+ primitive_map.erase(primitive_map.find(blk));
+}
+
+void gr_block_registry::notify_blk(std::string blk){
+ if(primitive_map.find(blk) == primitive_map.end()){ return; }
+ if(primitive_map[blk]->detail().get())
+ primitive_map[blk]->detail()->d_tpb.notify_msg();
+}
+
diff --git a/gnuradio-core/src/lib/runtime/gr_block_registry.h b/gnuradio-core/src/lib/runtime/gr_block_registry.h
index 8f1982984..6a2d939e5 100644
--- a/gnuradio-core/src/lib/runtime/gr_block_registry.h
+++ b/gnuradio-core/src/lib/runtime/gr_block_registry.h
@@ -5,6 +5,7 @@
#ifndef GR_BASIC_BLOCK_H
class gr_basic_block;
+class gr_block;
#endif
class gr_block_registry {
@@ -18,7 +19,11 @@ class gr_block_registry {
void register_symbolic_name(gr_basic_block* block, std::string name);
gr_basic_block_sptr block_lookup(pmt::pmt_t symbol);
-
+
+ void register_primitive(std::string blk, gr_block* ref);
+ void unregister_primitive(std::string blk);
+ void notify_blk(std::string blk);
+
private:
//typedef std::map< long, gr_basic_block_sptr > blocksubmap_t;
@@ -27,7 +32,8 @@ class gr_block_registry {
blockmap_t d_map;
pmt::pmt_t d_ref_map;
-
+ std::map< std::string, gr_block*> primitive_map;
+
};
extern gr_block_registry global_block_registry;
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
index 1bd3014ad..ff2afca10 100644
--- a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
@@ -80,7 +80,7 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_item
// handle all pending messages
BOOST_FOREACH( gr_basic_block::msg_queue_map_t::value_type &i, block->msg_queue )
{
- while ((msg = block->delete_head_nowait_already_holding_mutex(i.first))){
+ while ((msg = block->delete_head_nowait(i.first))){
guard.unlock(); // release lock while processing msg
block->dispatch_msg(i.first, msg);
guard.lock();
@@ -103,7 +103,7 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_item
// handle all pending messages
BOOST_FOREACH( gr_basic_block::msg_queue_map_t::value_type &i, block->msg_queue )
{
- while ((msg = block->delete_head_nowait_already_holding_mutex(i.first))){
+ while ((msg = block->delete_head_nowait(i.first))){
guard.unlock(); // release lock while processing msg
block->dispatch_msg(i.first,msg);
guard.lock();