blob: c4c84f91882e3852de27a749b6ed0bd813672aaf (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
#ifndef INCLUDED_LIBGRAS_IMPL_INTERRUPTIBLE_THREAD_HPP
#define INCLUDED_LIBGRAS_IMPL_INTERRUPTIBLE_THREAD_HPP
#include <gras_impl/debug.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
//--------------------------- ATTENTION !!! --------------------------//
//-- The author does not intend to have 2 threading platforms.
//-- This file and its invocations should be removed when source blocks
//-- in the tree can yield the thread context without producing.
//--------------------------------------------------------------------//
/*!
* This is the only place you will find any threading stuff.
* The entire point here is that the source's in gras
* are sometimed bad and block forever (the author is guilty too).
* This thread pool creates an interruptible thread to perform work.
* Everything is nice and synchronous with the block actor.
* The block actor will even block on the work function...
* However, this will be interrupted and not block forever,
* when the executor is told to stop/interrupt and wait/join.
*/
namespace gras
{
typedef boost::shared_ptr<boost::thread_group> SharedThreadGroup;
struct InterruptibleThread
{
typedef boost::function<void(void)> Callable;
InterruptibleThread(SharedThreadGroup thread_group, Callable callable):
_thread_group(thread_group),
_callable(callable)
{
_wait_msg = true;
_wait_ack = true;
_mutex.lock();
_thread = _thread_group->create_thread(boost::bind(&InterruptibleThread::run, this));
_mutex.lock();
_mutex.unlock();
}
~InterruptibleThread(void)
{
{
boost::mutex::scoped_lock lock(_mutex);
_callable = Callable();
}
_thread->interrupt();
_thread->join();
//We dont need to manually remove and delete the thread,
//but I thought it was nicer than thread group accumulating
//dead threads run after run.
_thread_group->remove_thread(_thread);
delete _thread;
}
GRAS_FORCE_INLINE void call(void)
{
boost::mutex::scoped_lock lock(_mutex);
if (not _callable) return;
_wait_msg = false;
_notify(lock);
while (_wait_ack) _cond.wait(lock);
_wait_ack = true;
}
void run(void)
{
_mutex.unlock(); //spawn barrier unlock
boost::mutex::scoped_lock lock(_mutex);
try
{
while (not boost::this_thread::interruption_requested())
{
while (_wait_msg) _cond.wait(lock);
_wait_msg = true;
if (not _callable) break;
_callable();
_wait_ack = false;
_notify(lock);
}
}
catch(const std::exception &ex)
{
std::cerr << "InterruptibleThread threw " << ex.what() << std::endl;
}
catch(const boost::thread_interrupted &)
{
//normal exit is thread_interrupted
}
catch(...)
{
std::cerr << "InterruptibleThread threw unknown exception" << std::endl;
}
_callable = Callable();
_wait_ack = false;
_notify(lock);
}
template <typename Lock>
void _notify(Lock &lock)
{
lock.unlock();
_cond.notify_one();
lock.lock();
}
//thread locking mechanisms
bool _wait_msg;
bool _wait_ack;
boost::mutex _mutex;
boost::condition_variable _cond;
SharedThreadGroup _thread_group;
Callable _callable;
boost::thread *_thread;
};
} //namespace gras
#endif /*INCLUDED_LIBGRAS_IMPL_INTERRUPTIBLE_THREAD_HPP*/
|