summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorJosh Blum2012-09-11 19:24:52 -0700
committerJosh Blum2012-09-11 19:24:52 -0700
commit3e820aba9c4ef292508c00b686d54eaca040c200 (patch)
tree1ab8bee0b59d22a1f267f9c7e6bba69bf0d7bd65 /lib
parent02f896e5a66575df4f16593584a7bc6892adf81c (diff)
downloadsandhi-3e820aba9c4ef292508c00b686d54eaca040c200.tar.gz
sandhi-3e820aba9c4ef292508c00b686d54eaca040c200.tar.bz2
sandhi-3e820aba9c4ef292508c00b686d54eaca040c200.zip
interruptible_thread working on tests including udp source
Diffstat (limited to 'lib')
-rw-r--r--lib/block_handlers.cpp10
-rw-r--r--lib/block_task.cpp6
-rw-r--r--lib/gras_impl/interruptible_thread.hpp59
-rw-r--r--lib/top_block.cpp9
4 files changed, 54 insertions, 30 deletions
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index 14cc2c8..3e861be 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -16,6 +16,7 @@
#include "element_impl.hpp"
#include <gras_impl/vector_utils.hpp>
+#include <boost/make_shared.hpp>
using namespace gnuradio;
@@ -59,10 +60,17 @@ void ElementImpl::handle_block_msg(
}
//store the topology's thread group
+ //erase any potentially old lingering threads
+ //spawn a new thread if this block is a source
if (msg.type() == typeid(SharedThreadGroup))
{
this->thread_group = msg.cast<SharedThreadGroup>();
- this->interruptible_thread = boost::shared_ptr<InterruptibleThread>(new InterruptibleThread(this->thread_group));
+ this->interruptible_thread.reset(); //erase old one
+ if (task_iface.get_num_inputs() == 0) //its a source
+ {
+ this->interruptible_thread =
+ boost::make_shared<InterruptibleThread>(this->thread_group);
+ }
return;
}
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index 33ffbc1..3556bac 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -86,7 +86,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
const size_t num_inputs = task_iface.get_num_inputs();
const size_t num_outputs = task_iface.get_num_outputs();
- const bool is_source = (num_inputs == 0);
+ //const bool is_source = (num_inputs == 0);
//const bool is_sink = (num_outputs == 0);
this->work_io_ptr_mask = 0; //reset
@@ -182,8 +182,8 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
if (this->enable_fixed_rate) work_noutput_items = std::min(
work_noutput_items, myulround((num_input_items)*this->relative_rate));
this->work_task_iface = task_iface;
- int ret = 0;
- if (is_source)
+ int ret = -1;
+ if (this->interruptible_thread)
{
this->interruptible_thread->block = block_ptr;
this->interruptible_thread->ret = &ret;
diff --git a/lib/gras_impl/interruptible_thread.hpp b/lib/gras_impl/interruptible_thread.hpp
index 10b1286..a480e78 100644
--- a/lib/gras_impl/interruptible_thread.hpp
+++ b/lib/gras_impl/interruptible_thread.hpp
@@ -24,6 +24,12 @@
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
+//--------------------------- ATTENTION !!! --------------------------//
+//-- The author does not intend to have 2 threading platforms.
+//-- This file and its invocations should be removed when source blocks
+//-- in the tree can yield the thread context without producing.
+//--------------------------------------------------------------------//
+
/*!
* This is the only place you will find any threading stuff.
* The entire point here is that the source's in gnuradio
@@ -47,7 +53,7 @@ namespace gnuradio
{
_done = false;
_wait_msg = true;
- _wait_ack = false;
+ _wait_ack = true;
_mutex.lock();
_thread = _thread_group->create_thread(boost::bind(&InterruptibleThread::run, this));
_mutex.lock();
@@ -67,13 +73,9 @@ namespace gnuradio
inline void call(void)
{
boost::mutex::scoped_lock lock(_mutex);
- if (_done)
- {
- *ret = -1;
- return;
- }
+ if (_done) return;
_wait_msg = false;
- _cond.notify_one();
+ _notify(lock);
while (_wait_ack) _cond.wait(lock);
_wait_ack = true;
}
@@ -82,31 +84,40 @@ namespace gnuradio
{
_mutex.unlock(); //spawn barrier unlock
boost::mutex::scoped_lock lock(_mutex);
- while (not _done and not boost::this_thread::interruption_requested())
+ try
{
- try
+ while (not boost::this_thread::interruption_requested())
{
while (_wait_msg) _cond.wait(lock);
_wait_msg = true;
*ret = block->Work(*input_items, *output_items);
+ _wait_ack = false;
+ _notify(lock);
}
- catch(const std::exception &ex)
- {
- std::cerr << "InterruptibleThread threw " << ex.what() << std::endl;
- _done = true;
- }
- catch(const boost::thread_interrupted &ex)
- {
- _done = true;
- }
- catch(...)
- {
- _done = true;
- }
- _wait_ack = false;
- _cond.notify_one();
+ }
+ catch(const std::exception &ex)
+ {
+ std::cerr << "InterruptibleThread threw " << ex.what() << std::endl;
+ }
+ catch(const boost::thread_interrupted &)
+ {
+ //normal exit is thread_interrupted
+ }
+ catch(...)
+ {
+ std::cerr << "InterruptibleThread threw unknown exception" << std::endl;
}
_done = true;
+ _wait_ack = false;
+ _notify(lock);
+ }
+
+ template <typename Lock>
+ void _notify(Lock &lock)
+ {
+ lock.unlock();
+ _cond.notify_one();
+ lock.lock();
}
//shared work variables
diff --git a/lib/top_block.cpp b/lib/top_block.cpp
index 440b7fa..306e152 100644
--- a/lib/top_block.cpp
+++ b/lib/top_block.cpp
@@ -91,7 +91,10 @@ void TopBlock::start(void)
void TopBlock::stop(void)
{
+ //interrupt these "special" threads
(*this)->thread_group->interrupt_all();
+
+ //message all blocks to mark done
TopBlockMessage event;
event.what = TopBlockMessage::INERT;
(*this)->executor.post_msg(event);
@@ -105,11 +108,13 @@ void TopBlock::run(void)
void TopBlock::wait(void)
{
+ //We do not need to join "special" threads;
+ //the token mechainism will do just fine.
//(*this)->thread_group->join_all();
+
+ //wait for all blocks to release the token
while (not (*this)->token.unique())
{
boost::this_thread::yield();
- //sleep(1);
- //VAR((*this)->token.use_count());
}
}