summaryrefslogtreecommitdiff
path: root/lib/gras_impl/interruptible_thread.hpp
blob: 1e99111e0a4f76bb8cdaa8ba259ff18ab8dd3a0a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
//
// Copyright 2012 Josh Blum
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with io_sig program.  If not, see <http://www.gnu.org/licenses/>.

#ifndef INCLUDED_LIBGRAS_IMPL_INTERRUPTIBLE_THREAD_HPP
#define INCLUDED_LIBGRAS_IMPL_INTERRUPTIBLE_THREAD_HPP

#include <gras_impl/debug.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>

//--------------------------- ATTENTION !!! --------------------------//
//-- The author does not intend to have 2 threading platforms.
//-- This file and its invocations should be removed when source blocks
//-- in the tree can yield the thread context without producing.
//--------------------------------------------------------------------//

/*!
 * This is the only place you will find any threading stuff.
 * The entire point here is that the source's in gnuradio
 * are sometimed bad and block forever (the author is guilty too).
 * This thread pool creates an interruptible thread to perform work.
 * Everything is nice and synchronous with the block actor.
 * The block actor will even block on the work function...
 * However, this will be interrupted and not block forever,
 * when the executor is told to stop/interrupt and wait/join.
 */
namespace gnuradio
{

    typedef boost::shared_ptr<boost::thread_group> SharedThreadGroup;

    struct InterruptibleThread
    {
        typedef boost::function<void(void)> Callable;

        InterruptibleThread(SharedThreadGroup thread_group, Callable callable):
            _thread_group(thread_group),
            _callable(callable)
        {
            _wait_msg = true;
            _wait_ack = true;
            _mutex.lock();
            _thread = _thread_group->create_thread(boost::bind(&InterruptibleThread::run, this));
            _mutex.lock();
            _mutex.unlock();
        }

        ~InterruptibleThread(void)
        {
            {
                boost::mutex::scoped_lock lock(_mutex);
                _callable = Callable();
            }
            _thread->interrupt();
            _thread->join();
        }

        inline void call(void)
        {
            boost::mutex::scoped_lock lock(_mutex);
            if (not _callable) return;
            _wait_msg = false;
            _notify(lock);
            while (_wait_ack) _cond.wait(lock);
            _wait_ack = true;
        }

        void run(void)
        {
            _mutex.unlock(); //spawn barrier unlock
            boost::mutex::scoped_lock lock(_mutex);
            try
            {
                while (not boost::this_thread::interruption_requested())
                {
                    while (_wait_msg) _cond.wait(lock);
                    _wait_msg = true;
                    _callable();
                    _wait_ack = false;
                    _notify(lock);
                }
            }
            catch(const std::exception &ex)
            {
                std::cerr << "InterruptibleThread threw " << ex.what() << std::endl;
            }
            catch(const boost::thread_interrupted &)
            {
                //normal exit is thread_interrupted
            }
            catch(...)
            {
                std::cerr << "InterruptibleThread threw unknown exception" << std::endl;
            }
            _callable = Callable();
            _wait_ack = false;
            _notify(lock);
        }

        template <typename Lock>
        void _notify(Lock &lock)
        {
            lock.unlock();
            _cond.notify_one();
            lock.lock();
        }

        //thread locking mechanisms
        bool _wait_msg;
        bool _wait_ack;
        boost::mutex _mutex;
        boost::condition_variable _cond;
        SharedThreadGroup _thread_group;
        Callable _callable;
        boost::thread *_thread;
    };

} //namespace gnuradio

#endif /*INCLUDED_LIBGRAS_IMPL_INTERRUPTIBLE_THREAD_HPP*/