diff options
author | Josh Blum | 2012-09-11 01:27:19 -0700 |
---|---|---|
committer | Josh Blum | 2012-09-11 01:27:19 -0700 |
commit | 02f896e5a66575df4f16593584a7bc6892adf81c (patch) | |
tree | 80747cddeddfebfe4fa6f5db6af2625866b4ce63 /lib | |
parent | 053d2f5cdbb4027d65aa7cb8af851a9edeac1d0a (diff) | |
download | sandhi-02f896e5a66575df4f16593584a7bc6892adf81c.tar.gz sandhi-02f896e5a66575df4f16593584a7bc6892adf81c.tar.bz2 sandhi-02f896e5a66575df4f16593584a7bc6892adf81c.zip |
work on interruptible_thread
Diffstat (limited to 'lib')
-rw-r--r-- | lib/block_handlers.cpp | 8 | ||||
-rw-r--r-- | lib/block_task.cpp | 18 | ||||
-rw-r--r-- | lib/element_impl.hpp | 5 | ||||
-rw-r--r-- | lib/gras_impl/interruptible_thread.hpp | 130 | ||||
-rw-r--r-- | lib/top_block.cpp | 6 |
5 files changed, 165 insertions, 2 deletions
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index 85520bd..14cc2c8 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -58,6 +58,14 @@ void ElementImpl::handle_block_msg( return; } + //store the topology's thread group + if (msg.type() == typeid(SharedThreadGroup)) + { + this->thread_group = msg.cast<SharedThreadGroup>(); + this->interruptible_thread = boost::shared_ptr<InterruptibleThread>(new InterruptibleThread(this->thread_group)); + return; + } + ASSERT(msg.type() == typeid(TopBlockMessage)); const size_t num_inputs = task_iface.get_num_inputs(); diff --git a/lib/block_task.cpp b/lib/block_task.cpp index 3843b28..33ffbc1 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -36,6 +36,8 @@ void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface) this->output_queues.pop(i); } + this->interruptible_thread.reset(); + //mark down the new state this->block_state = BLOCK_STATE_DONE; @@ -84,7 +86,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) const size_t num_inputs = task_iface.get_num_inputs(); const size_t num_outputs = task_iface.get_num_outputs(); - //const bool is_source = (num_inputs == 0); + const bool is_source = (num_inputs == 0); //const bool is_sink = (num_outputs == 0); this->work_io_ptr_mask = 0; //reset @@ -180,7 +182,19 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) if (this->enable_fixed_rate) work_noutput_items = std::min( work_noutput_items, myulround((num_input_items)*this->relative_rate)); this->work_task_iface = task_iface; - const int ret = block_ptr->Work(this->input_items, this->output_items); + int ret = 0; + if (is_source) + { + this->interruptible_thread->block = block_ptr; + this->interruptible_thread->ret = &ret; + this->interruptible_thread->input_items = &this->input_items; + this->interruptible_thread->output_items = &this->output_items; + this->interruptible_thread->call(); + } + else + { + ret = block_ptr->Work(this->input_items, this->output_items); + } this->work_task_iface.reset(); const size_t noutput_items = size_t(ret); diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp index 2119935..b4f2c1b 100644 --- a/lib/element_impl.hpp +++ b/lib/element_impl.hpp @@ -21,6 +21,7 @@ #include <gras_impl/misc.hpp> #include <gras_impl/vector_of_queues.hpp> #include <gras_impl/input_buffer_queues.hpp> +#include <gras_impl/interruptible_thread.hpp> #include <tsbe/block.hpp> #include <tsbe/topology.hpp> @@ -105,6 +106,10 @@ struct ElementImpl Block *block_ptr; tsbe::TaskInterface work_task_iface; //only valid during work + //interruptible thread stuff + SharedThreadGroup thread_group; + boost::shared_ptr<InterruptibleThread> interruptible_thread; + //handlers void handle_input_msg(const tsbe::TaskInterface &, const size_t, const tsbe::Wax &); void handle_output_msg(const tsbe::TaskInterface &, const size_t, const tsbe::Wax &); diff --git a/lib/gras_impl/interruptible_thread.hpp b/lib/gras_impl/interruptible_thread.hpp new file mode 100644 index 0000000..10b1286 --- /dev/null +++ b/lib/gras_impl/interruptible_thread.hpp @@ -0,0 +1,130 @@ +// +// Copyright 2012 Josh Blum +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with io_sig program. If not, see <http://www.gnu.org/licenses/>. + +#ifndef INCLUDED_LIBGRAS_IMPL_INTERRUPTIBLE_THREAD_HPP +#define INCLUDED_LIBGRAS_IMPL_INTERRUPTIBLE_THREAD_HPP + +#include <gnuradio/block.hpp> +#include <gras_impl/debug.hpp> +#include <boost/bind.hpp> +#include <boost/thread/thread.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/thread/condition_variable.hpp> + +/*! + * This is the only place you will find any threading stuff. + * The entire point here is that the source's in gnuradio + * are sometimed bad and block forever (the author is guilty too). + * This thread pool creates an interruptible thread to perform work. + * Everything is nice and synchronous with the block actor. + * The block actor will even block on the work function... + * However, this will be interrupted and not block forever, + * when the executor is told to stop/interrupt and wait/join. + */ +namespace gnuradio +{ + + typedef boost::shared_ptr<boost::thread_group> SharedThreadGroup; + + struct InterruptibleThread + { + + InterruptibleThread(SharedThreadGroup thread_group): + _thread_group(thread_group) + { + _done = false; + _wait_msg = true; + _wait_ack = false; + _mutex.lock(); + _thread = _thread_group->create_thread(boost::bind(&InterruptibleThread::run, this)); + _mutex.lock(); + _mutex.unlock(); + } + + ~InterruptibleThread(void) + { + { + boost::mutex::scoped_lock lock(_mutex); + _done = true; + } + _thread->interrupt(); + _thread->join(); + } + + inline void call(void) + { + boost::mutex::scoped_lock lock(_mutex); + if (_done) + { + *ret = -1; + return; + } + _wait_msg = false; + _cond.notify_one(); + while (_wait_ack) _cond.wait(lock); + _wait_ack = true; + } + + void run(void) + { + _mutex.unlock(); //spawn barrier unlock + boost::mutex::scoped_lock lock(_mutex); + while (not _done and not boost::this_thread::interruption_requested()) + { + try + { + while (_wait_msg) _cond.wait(lock); + _wait_msg = true; + *ret = block->Work(*input_items, *output_items); + } + catch(const std::exception &ex) + { + std::cerr << "InterruptibleThread threw " << ex.what() << std::endl; + _done = true; + } + catch(const boost::thread_interrupted &ex) + { + _done = true; + } + catch(...) + { + _done = true; + } + _wait_ack = false; + _cond.notify_one(); + } + _done = true; + } + + //shared work variables + Block *block; + int *ret; + Block::InputItems *input_items; + Block::OutputItems *output_items; + + //thread locking mechanisms + bool _done; + bool _wait_msg; + bool _wait_ack; + boost::mutex _mutex; + boost::condition_variable _cond; + SharedThreadGroup _thread_group; + boost::thread *_thread; + }; + +} //namespace gnuradio + +#endif /*INCLUDED_LIBGRAS_IMPL_INTERRUPTIBLE_THREAD_HPP*/ diff --git a/lib/top_block.cpp b/lib/top_block.cpp index c681859..440b7fa 100644 --- a/lib/top_block.cpp +++ b/lib/top_block.cpp @@ -32,6 +32,7 @@ TopBlock::TopBlock(const std::string &name): config.topology = (*this)->topology; (*this)->executor = tsbe::Executor(config); (*this)->token = Token::make(); + (*this)->thread_group = SharedThreadGroup(new boost::thread_group()); if (GENESIS) std::cout << "===================================================\n" << "== Top Block Created: " << name << "\n" @@ -68,6 +69,9 @@ void TopBlock::start(void) { (*this)->executor.commit(); { + (*this)->executor.post_msg((*this)->thread_group); + } + { TopBlockMessage event; event.what = TopBlockMessage::TOKEN_TIME; event.token = (*this)->token; @@ -87,6 +91,7 @@ void TopBlock::start(void) void TopBlock::stop(void) { + (*this)->thread_group->interrupt_all(); TopBlockMessage event; event.what = TopBlockMessage::INERT; (*this)->executor.post_msg(event); @@ -100,6 +105,7 @@ void TopBlock::run(void) void TopBlock::wait(void) { + //(*this)->thread_group->join_all(); while (not (*this)->token.unique()) { boost::this_thread::yield(); |