diff options
author | Josh Blum | 2012-09-11 01:27:19 -0700 |
---|---|---|
committer | Josh Blum | 2012-09-11 01:27:19 -0700 |
commit | 02f896e5a66575df4f16593584a7bc6892adf81c (patch) | |
tree | 80747cddeddfebfe4fa6f5db6af2625866b4ce63 /lib/gras_impl/interruptible_thread.hpp | |
parent | 053d2f5cdbb4027d65aa7cb8af851a9edeac1d0a (diff) | |
download | sandhi-02f896e5a66575df4f16593584a7bc6892adf81c.tar.gz sandhi-02f896e5a66575df4f16593584a7bc6892adf81c.tar.bz2 sandhi-02f896e5a66575df4f16593584a7bc6892adf81c.zip |
work on interruptible_thread
Diffstat (limited to 'lib/gras_impl/interruptible_thread.hpp')
-rw-r--r-- | lib/gras_impl/interruptible_thread.hpp | 130 |
1 files changed, 130 insertions, 0 deletions
diff --git a/lib/gras_impl/interruptible_thread.hpp b/lib/gras_impl/interruptible_thread.hpp new file mode 100644 index 0000000..10b1286 --- /dev/null +++ b/lib/gras_impl/interruptible_thread.hpp @@ -0,0 +1,130 @@ +// +// Copyright 2012 Josh Blum +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with io_sig program. If not, see <http://www.gnu.org/licenses/>. + +#ifndef INCLUDED_LIBGRAS_IMPL_INTERRUPTIBLE_THREAD_HPP +#define INCLUDED_LIBGRAS_IMPL_INTERRUPTIBLE_THREAD_HPP + +#include <gnuradio/block.hpp> +#include <gras_impl/debug.hpp> +#include <boost/bind.hpp> +#include <boost/thread/thread.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/thread/condition_variable.hpp> + +/*! + * This is the only place you will find any threading stuff. + * The entire point here is that the source's in gnuradio + * are sometimed bad and block forever (the author is guilty too). + * This thread pool creates an interruptible thread to perform work. + * Everything is nice and synchronous with the block actor. + * The block actor will even block on the work function... + * However, this will be interrupted and not block forever, + * when the executor is told to stop/interrupt and wait/join. + */ +namespace gnuradio +{ + + typedef boost::shared_ptr<boost::thread_group> SharedThreadGroup; + + struct InterruptibleThread + { + + InterruptibleThread(SharedThreadGroup thread_group): + _thread_group(thread_group) + { + _done = false; + _wait_msg = true; + _wait_ack = false; + _mutex.lock(); + _thread = _thread_group->create_thread(boost::bind(&InterruptibleThread::run, this)); + _mutex.lock(); + _mutex.unlock(); + } + + ~InterruptibleThread(void) + { + { + boost::mutex::scoped_lock lock(_mutex); + _done = true; + } + _thread->interrupt(); + _thread->join(); + } + + inline void call(void) + { + boost::mutex::scoped_lock lock(_mutex); + if (_done) + { + *ret = -1; + return; + } + _wait_msg = false; + _cond.notify_one(); + while (_wait_ack) _cond.wait(lock); + _wait_ack = true; + } + + void run(void) + { + _mutex.unlock(); //spawn barrier unlock + boost::mutex::scoped_lock lock(_mutex); + while (not _done and not boost::this_thread::interruption_requested()) + { + try + { + while (_wait_msg) _cond.wait(lock); + _wait_msg = true; + *ret = block->Work(*input_items, *output_items); + } + catch(const std::exception &ex) + { + std::cerr << "InterruptibleThread threw " << ex.what() << std::endl; + _done = true; + } + catch(const boost::thread_interrupted &ex) + { + _done = true; + } + catch(...) + { + _done = true; + } + _wait_ack = false; + _cond.notify_one(); + } + _done = true; + } + + //shared work variables + Block *block; + int *ret; + Block::InputItems *input_items; + Block::OutputItems *output_items; + + //thread locking mechanisms + bool _done; + bool _wait_msg; + bool _wait_ack; + boost::mutex _mutex; + boost::condition_variable _cond; + SharedThreadGroup _thread_group; + boost::thread *_thread; + }; + +} //namespace gnuradio + +#endif /*INCLUDED_LIBGRAS_IMPL_INTERRUPTIBLE_THREAD_HPP*/ |