summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorJosh Blum2012-09-11 01:27:19 -0700
committerJosh Blum2012-09-11 01:27:19 -0700
commit02f896e5a66575df4f16593584a7bc6892adf81c (patch)
tree80747cddeddfebfe4fa6f5db6af2625866b4ce63 /lib
parent053d2f5cdbb4027d65aa7cb8af851a9edeac1d0a (diff)
downloadsandhi-02f896e5a66575df4f16593584a7bc6892adf81c.tar.gz
sandhi-02f896e5a66575df4f16593584a7bc6892adf81c.tar.bz2
sandhi-02f896e5a66575df4f16593584a7bc6892adf81c.zip
work on interruptible_thread
Diffstat (limited to 'lib')
-rw-r--r--lib/block_handlers.cpp8
-rw-r--r--lib/block_task.cpp18
-rw-r--r--lib/element_impl.hpp5
-rw-r--r--lib/gras_impl/interruptible_thread.hpp130
-rw-r--r--lib/top_block.cpp6
5 files changed, 165 insertions, 2 deletions
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index 85520bd..14cc2c8 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -58,6 +58,14 @@ void ElementImpl::handle_block_msg(
return;
}
+ //store the topology's thread group
+ if (msg.type() == typeid(SharedThreadGroup))
+ {
+ this->thread_group = msg.cast<SharedThreadGroup>();
+ this->interruptible_thread = boost::shared_ptr<InterruptibleThread>(new InterruptibleThread(this->thread_group));
+ return;
+ }
+
ASSERT(msg.type() == typeid(TopBlockMessage));
const size_t num_inputs = task_iface.get_num_inputs();
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index 3843b28..33ffbc1 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -36,6 +36,8 @@ void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface)
this->output_queues.pop(i);
}
+ this->interruptible_thread.reset();
+
//mark down the new state
this->block_state = BLOCK_STATE_DONE;
@@ -84,7 +86,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
const size_t num_inputs = task_iface.get_num_inputs();
const size_t num_outputs = task_iface.get_num_outputs();
- //const bool is_source = (num_inputs == 0);
+ const bool is_source = (num_inputs == 0);
//const bool is_sink = (num_outputs == 0);
this->work_io_ptr_mask = 0; //reset
@@ -180,7 +182,19 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
if (this->enable_fixed_rate) work_noutput_items = std::min(
work_noutput_items, myulround((num_input_items)*this->relative_rate));
this->work_task_iface = task_iface;
- const int ret = block_ptr->Work(this->input_items, this->output_items);
+ int ret = 0;
+ if (is_source)
+ {
+ this->interruptible_thread->block = block_ptr;
+ this->interruptible_thread->ret = &ret;
+ this->interruptible_thread->input_items = &this->input_items;
+ this->interruptible_thread->output_items = &this->output_items;
+ this->interruptible_thread->call();
+ }
+ else
+ {
+ ret = block_ptr->Work(this->input_items, this->output_items);
+ }
this->work_task_iface.reset();
const size_t noutput_items = size_t(ret);
diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp
index 2119935..b4f2c1b 100644
--- a/lib/element_impl.hpp
+++ b/lib/element_impl.hpp
@@ -21,6 +21,7 @@
#include <gras_impl/misc.hpp>
#include <gras_impl/vector_of_queues.hpp>
#include <gras_impl/input_buffer_queues.hpp>
+#include <gras_impl/interruptible_thread.hpp>
#include <tsbe/block.hpp>
#include <tsbe/topology.hpp>
@@ -105,6 +106,10 @@ struct ElementImpl
Block *block_ptr;
tsbe::TaskInterface work_task_iface; //only valid during work
+ //interruptible thread stuff
+ SharedThreadGroup thread_group;
+ boost::shared_ptr<InterruptibleThread> interruptible_thread;
+
//handlers
void handle_input_msg(const tsbe::TaskInterface &, const size_t, const tsbe::Wax &);
void handle_output_msg(const tsbe::TaskInterface &, const size_t, const tsbe::Wax &);
diff --git a/lib/gras_impl/interruptible_thread.hpp b/lib/gras_impl/interruptible_thread.hpp
new file mode 100644
index 0000000..10b1286
--- /dev/null
+++ b/lib/gras_impl/interruptible_thread.hpp
@@ -0,0 +1,130 @@
+//
+// Copyright 2012 Josh Blum
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with io_sig program. If not, see <http://www.gnu.org/licenses/>.
+
+#ifndef INCLUDED_LIBGRAS_IMPL_INTERRUPTIBLE_THREAD_HPP
+#define INCLUDED_LIBGRAS_IMPL_INTERRUPTIBLE_THREAD_HPP
+
+#include <gnuradio/block.hpp>
+#include <gras_impl/debug.hpp>
+#include <boost/bind.hpp>
+#include <boost/thread/thread.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/condition_variable.hpp>
+
+/*!
+ * This is the only place you will find any threading stuff.
+ * The entire point here is that the source's in gnuradio
+ * are sometimed bad and block forever (the author is guilty too).
+ * This thread pool creates an interruptible thread to perform work.
+ * Everything is nice and synchronous with the block actor.
+ * The block actor will even block on the work function...
+ * However, this will be interrupted and not block forever,
+ * when the executor is told to stop/interrupt and wait/join.
+ */
+namespace gnuradio
+{
+
+ typedef boost::shared_ptr<boost::thread_group> SharedThreadGroup;
+
+ struct InterruptibleThread
+ {
+
+ InterruptibleThread(SharedThreadGroup thread_group):
+ _thread_group(thread_group)
+ {
+ _done = false;
+ _wait_msg = true;
+ _wait_ack = false;
+ _mutex.lock();
+ _thread = _thread_group->create_thread(boost::bind(&InterruptibleThread::run, this));
+ _mutex.lock();
+ _mutex.unlock();
+ }
+
+ ~InterruptibleThread(void)
+ {
+ {
+ boost::mutex::scoped_lock lock(_mutex);
+ _done = true;
+ }
+ _thread->interrupt();
+ _thread->join();
+ }
+
+ inline void call(void)
+ {
+ boost::mutex::scoped_lock lock(_mutex);
+ if (_done)
+ {
+ *ret = -1;
+ return;
+ }
+ _wait_msg = false;
+ _cond.notify_one();
+ while (_wait_ack) _cond.wait(lock);
+ _wait_ack = true;
+ }
+
+ void run(void)
+ {
+ _mutex.unlock(); //spawn barrier unlock
+ boost::mutex::scoped_lock lock(_mutex);
+ while (not _done and not boost::this_thread::interruption_requested())
+ {
+ try
+ {
+ while (_wait_msg) _cond.wait(lock);
+ _wait_msg = true;
+ *ret = block->Work(*input_items, *output_items);
+ }
+ catch(const std::exception &ex)
+ {
+ std::cerr << "InterruptibleThread threw " << ex.what() << std::endl;
+ _done = true;
+ }
+ catch(const boost::thread_interrupted &ex)
+ {
+ _done = true;
+ }
+ catch(...)
+ {
+ _done = true;
+ }
+ _wait_ack = false;
+ _cond.notify_one();
+ }
+ _done = true;
+ }
+
+ //shared work variables
+ Block *block;
+ int *ret;
+ Block::InputItems *input_items;
+ Block::OutputItems *output_items;
+
+ //thread locking mechanisms
+ bool _done;
+ bool _wait_msg;
+ bool _wait_ack;
+ boost::mutex _mutex;
+ boost::condition_variable _cond;
+ SharedThreadGroup _thread_group;
+ boost::thread *_thread;
+ };
+
+} //namespace gnuradio
+
+#endif /*INCLUDED_LIBGRAS_IMPL_INTERRUPTIBLE_THREAD_HPP*/
diff --git a/lib/top_block.cpp b/lib/top_block.cpp
index c681859..440b7fa 100644
--- a/lib/top_block.cpp
+++ b/lib/top_block.cpp
@@ -32,6 +32,7 @@ TopBlock::TopBlock(const std::string &name):
config.topology = (*this)->topology;
(*this)->executor = tsbe::Executor(config);
(*this)->token = Token::make();
+ (*this)->thread_group = SharedThreadGroup(new boost::thread_group());
if (GENESIS) std::cout
<< "===================================================\n"
<< "== Top Block Created: " << name << "\n"
@@ -68,6 +69,9 @@ void TopBlock::start(void)
{
(*this)->executor.commit();
{
+ (*this)->executor.post_msg((*this)->thread_group);
+ }
+ {
TopBlockMessage event;
event.what = TopBlockMessage::TOKEN_TIME;
event.token = (*this)->token;
@@ -87,6 +91,7 @@ void TopBlock::start(void)
void TopBlock::stop(void)
{
+ (*this)->thread_group->interrupt_all();
TopBlockMessage event;
event.what = TopBlockMessage::INERT;
(*this)->executor.post_msg(event);
@@ -100,6 +105,7 @@ void TopBlock::run(void)
void TopBlock::wait(void)
{
+ //(*this)->thread_group->join_all();
while (not (*this)->token.unique())
{
boost::this_thread::yield();