summaryrefslogtreecommitdiff
path: root/lib/top_block.cpp
blob: 4892e772f9c7cd0c79829190431287ddd8492d78 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.

#include "element_impl.hpp"
#include <gras/top_block.hpp>
#include <boost/thread/thread.hpp> //sleep

using namespace gras;

TopBlock::TopBlock(void)
{
    //NOP
}

TopBlock::TopBlock(const std::string &name):
    HierBlock(name)
{
    (*this)->executor.reset(new Apology::Executor((*this)->topology.get()));
    (*this)->token = Token::make();
    (*this)->thread_group = SharedThreadGroup(new boost::thread_group());
}

TopBlock::~TopBlock(void)
{
    //NOP
}

void ElementImpl::top_block_cleanup(void)
{
    this->bcast_prio_msg(TopInertMessage());
    this->topology->clear_all();
    this->executor->commit();
}

void TopBlock::commit_config(void)
{
    HierBlock::commit_config();
}

void TopBlock::commit(void)
{
    this->commit_config();
    this->start(); //ok to re-start, means update
}

void TopBlock::start(void)
{
    (*this)->executor->commit();
    {
        TopThreadMessage message;
        message.thread_group = (*this)->thread_group;
        (*this)->bcast_prio_msg(message);
    }
    {
        TopTokenMessage message;
        message.token = (*this)->token;
        (*this)->bcast_prio_msg(message);
    }
    {
        //send the global block config before alloc
        TopConfigMessage message;
        message.config = (*this)->global_config;
        (*this)->bcast_prio_msg(message);
    }
    {
        (*this)->bcast_prio_msg(TopAllocMessage());
    }
    {
        (*this)->bcast_prio_msg(TopActiveMessage());
    }
}

void TopBlock::stop(void)
{
    //interrupt these "special" threads
    (*this)->thread_group->interrupt_all();

    //message all blocks to mark done
    (*this)->bcast_prio_msg(TopInertMessage());
}

void TopBlock::run(void)
{
    this->start();
    this->wait();
}

GRAS_FORCE_INLINE void wait_thread_yield(void)
{
    //boost::this_thread::yield();
    boost::this_thread::sleep(boost::posix_time::milliseconds(1));
}

static const boost::posix_time::time_duration CHECK_DONE_INTERVAL = boost::posix_time::milliseconds(100);
static const boost::posix_time::time_duration INPUT_DONE_GRACE_PERIOD = boost::posix_time::milliseconds(100);

void TopBlock::wait(void)
{
    //We do not need to join "special" threads;
    //the token mechainism will do just fine.
    //FIXME it might be nice to uncomment this for some nice quiet wait(),
    //however, thread group cant be joined twice and this breaks some qa code
    //(*this)->thread_group->join_all();

    //QA lockup detection setup
    bool lockup_debug = getenv("GRAS_LOCKUP_DEBUG") != NULL;
    boost::system_time check_done_time = boost::get_system_time();
    bool has_a_done = false;

    //wait for all blocks to release the token
    while (not (*this)->token.unique())
    {
        wait_thread_yield();

        //determine if we should check on the done status
        if (boost::get_system_time() < check_done_time) continue;
        check_done_time += CHECK_DONE_INTERVAL;

        //optional dot print to understand lockup condition
        if (has_a_done and lockup_debug)
        {
            std::cerr << TopBlock::query("{\"path\":\"/topology.dot\"}") << std::endl;
            lockup_debug = false;
        }

        //loop through blocks looking for non-done blocks with done inputs
        BOOST_FOREACH(Apology::Worker *w, (*this)->topology->get_workers())
        {
            BlockActor *actor = dynamic_cast<BlockActor *>(w->get_actor());
            if (actor->data->block_state == BLOCK_STATE_DONE) has_a_done = true;
            if (actor->data->inputs_done.size() and actor->data->inputs_done.any())
            {
                const boost::system_time grace_over = actor->data->first_input_done_time + INPUT_DONE_GRACE_PERIOD;
                if ((not lockup_debug) and (boost::get_system_time() > grace_over))
                {
                    actor->GetFramework().Send(TopInertMessage(), Theron::Address::Null(), actor->GetAddress());
                }
            }
        }
    }
}

bool TopBlock::wait(const double timeout)
{
    const boost::system_time exit_time = boost::get_system_time() +
        boost::posix_time::microseconds(long(timeout*1e6));

    //wait for all blocks to release the token
    while (not (*this)->token.unique() or boost::get_system_time() < exit_time)
    {
        wait_thread_yield();
    }

    return (*this)->token.unique();
}