summaryrefslogtreecommitdiff
path: root/mblock/src/lib/mb_mblock_impl.cc
blob: e11b0089834a6ddec9d5dd5ad24640aff5a35d45 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
/* -*- c++ -*- */
/*
 * Copyright 2006,2008 Free Software Foundation, Inc.
 * 
 * This file is part of GNU Radio
 * 
 * GNU Radio is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3, or (at your option)
 * any later version.
 * 
 * GNU Radio is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */

#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
#include <mb_mblock_impl.h>
#include <mblock/mblock.h>
#include <mblock/protocol_class.h>
#include <mblock/port.h>
#include <mb_port_simple.h>
#include <mblock/exception.h>
#include <mb_util.h>
#include <mb_msg_accepter_smp.h>
#include <mbi_runtime_lock.h>
#include <iostream>


// Result of pmt_intern("self"), created once at static-initialization time.
// NOTE(review): nothing in the visible part of this file refers to s_self
// (the code below compares against the string literal "self" instead) --
// confirm whether it is still needed.
static pmt_t s_self = pmt_intern("self");

////////////////////////////////////////////////////////////////////////

/*
 * Return true iff this mblock's port map has an entry keyed by \p name.
 */
bool
mb_mblock_impl::port_is_defined(const std::string &name)
{
  return d_port_map.find(name) != d_port_map.end();
}

/*
 * Return true iff \p name designates a component of this mblock.
 * The name "self" is always considered defined; any other name must
 * have an entry in the component map.
 */
bool
mb_mblock_impl::comp_is_defined(const std::string &name)
{
  if (name == "self")
    return true;

  return d_comp_map.find(name) != d_comp_map.end();
}

////////////////////////////////////////////////////////////////////////

/*
 * Construct the implementation object for mblock \p mb.
 *
 * \param runtime	runtime this mblock belongs to (stored, not owned here)
 * \param mb		the public mblock this object implements (not owned,
 *			see the destructor)
 * \param instance_name	initial instance name
 *
 * The parent link starts out null and the class name defaults to "mblock".
 */
mb_mblock_impl::mb_mblock_impl(mb_runtime_base *runtime, mb_mblock *mb,
			       const std::string &instance_name)
  : d_runtime(runtime),
    d_mb(mb),
    d_mb_parent(0),			// filled in by the parent's define_component
    d_instance_name(instance_name),
    d_class_name("mblock")		// default; replaced via set_class_name
{
}

/*
 * Destructor.  Only clears our back-pointer to the public mblock; the
 * pointee is deliberately not deleted here.
 */
mb_mblock_impl::~mb_mblock_impl()
{
  d_mb = 0;	// we don't own it
}


/*
 * Create a new port on this mblock and register it under \p port_name.
 *
 * An mbi_runtime_lock on this object is held for the whole call.
 *
 * \returns the newly created port (an mb_port_simple).
 * \throws mbe_duplicate_port if a port called \p port_name already exists.
 */
mb_port_sptr
mb_mblock_impl::define_port(const std::string &port_name,
			    const std::string &protocol_class_name,
			    bool conjugated,
			    mb_port::port_type_t port_type)
{
  mbi_runtime_lock	guard(this);

  // Port names must be unique within one mblock.
  if (port_is_defined(port_name)){
    throw mbe_duplicate_port(d_mb, port_name);
  }

  mb_port_sptr new_port(new mb_port_simple(d_mb,
					   port_name, protocol_class_name,
					   conjugated, port_type));

  d_port_map[port_name] = new_port;	// register before handing it back
  return new_port;
}

void
mb_mblock_impl::define_component(const std::string &name,
				 const std::string &class_name,
				 pmt_t user_arg)
{
  {
    mbi_runtime_lock	l(this);

    if (comp_is_defined(name))	// check for duplicate name
      throw mbe_duplicate_component(d_mb, name);
  }

  // We ask the runtime to create the component so that it can worry about
  // mblock placement on a NUMA machine or on a distributed multicomputer

  mb_mblock_sptr component =
    d_runtime->create_component(instance_name() + "/" + name,
				class_name, user_arg);
  {
    mbi_runtime_lock	l(this);

    component->d_impl->d_mb_parent = d_mb;     // set component's parent link
    d_comp_map[name] = component;
  }
}

void
mb_mblock_impl::connect(const std::string &comp_name1,
			const std::string &port_name1,
			const std::string &comp_name2,
			const std::string &port_name2)
{
  mbi_runtime_lock	l(this);

  mb_endpoint	ep0 = check_and_resolve_endpoint(comp_name1, port_name1);
  mb_endpoint	ep1 = check_and_resolve_endpoint(comp_name2, port_name2);

  if (!endpoints_are_compatible(ep0, ep1))
    throw mbe_incompatible_ports(d_mb,
				 comp_name1, port_name1,
				 comp_name2, port_name2);
  // FIXME more checks?

  d_conn_table.create_conn(ep0, ep1);
}

/*
 * Remove the connection between the two named endpoints from the
 * connection table, then invalidate every port cache in the mblock tree.
 */
void
mb_mblock_impl::disconnect(const std::string &comp_name1,
			   const std::string &port_name1,
			   const std::string &comp_name2,
			   const std::string &port_name2)
{
  mbi_runtime_lock	guard(this);

  d_conn_table.disconnect(comp_name1, port_name1,
			  comp_name2, port_name2);

  // Cached lookups may refer to the connection we just removed.
  invalidate_all_port_caches();
}

/*
 * Ask the connection table to drop the connections of component
 * \p component_name, then invalidate every port cache in the mblock tree.
 *
 * NOTE(review): \p component_name is taken by value; switching to a
 * const reference would also require changing the declaration in the
 * header, so it is left as is.
 */
void
mb_mblock_impl::disconnect_component(const std::string component_name)
{
  mbi_runtime_lock	guard(this);

  d_conn_table.disconnect_component(component_name);

  // Cached lookups may refer to connections we just removed.
  invalidate_all_port_caches();
}

/*
 * Ask this mblock's connection table to drop all of its connections,
 * then invalidate every port cache in the mblock tree.
 */
void
mb_mblock_impl::disconnect_all()
{
  mbi_runtime_lock	guard(this);

  d_conn_table.disconnect_all();

  // Cached lookups may refer to connections we just removed.
  invalidate_all_port_caches();
}

/*
 * Return the number of connections reported by this mblock's connection
 * table.  The count is read while holding an mbi_runtime_lock.
 */
int
mb_mblock_impl::nconnections()
{
  mbi_runtime_lock	guard(this);

  const int count = d_conn_table.nconnections();
  return count;
}

////////////////////////////////////////////////////////////////////////

/*
 * Validate the (component, port) pair and turn it into an mb_endpoint
 * suitable for connect().
 *
 * \throws mbe_invalid_port_type if the pair names one of our own EXTERNAL
 *         ports via "self".
 * \throws mbe_already_connected if the connection table already has a
 *         connection for this endpoint.
 * Also propagates whatever resolve_port throws.
 */
mb_endpoint
mb_mblock_impl::check_and_resolve_endpoint(const std::string &comp_name,
					   const std::string &port_name)
{
  mb_port_sptr	resolved = resolve_port(comp_name, port_name);

  // Confirm that we're not trying to connect to the inside of one of
  // our EXTERNAL ports.  Connections that include "self" as the
  // component name must be either INTERNAL or RELAY.
  const bool names_self = (comp_name == "self");
  if (names_self && resolved->port_type() == mb_port::EXTERNAL){
    throw mbe_invalid_port_type(d_mb, comp_name, port_name);
  }

  // Is this endpoint already connected?  The iterator and endpoint index
  // are required out-parameters of the lookup; their values are not used.
  mb_conn_iter	existing_conn;
  int		endpoint_index;
  if (d_conn_table.lookup_conn_by_name(comp_name, port_name,
				       &existing_conn, &endpoint_index)){
    throw mbe_already_connected(d_mb, comp_name, port_name);
  }

  return mb_endpoint(comp_name, port_name, resolved);
}

/*
 * Map a (component name, port name) pair to the port object it denotes.
 *
 * With \p comp_name == "self" the port is looked up among our own ports.
 * Otherwise it is looked up among the ports of the named child component,
 * and a child's INTERNAL ports are treated as if they did not exist.
 *
 * \throws mbe_no_such_component if \p comp_name is not "self" and is not
 *         in the component map.
 * \throws mbe_no_such_port if the port does not exist, or belongs to a
 *         child and is INTERNAL.
 */
mb_port_sptr
mb_mblock_impl::resolve_port(const std::string &comp_name,
			     const std::string &port_name)
{
  if (comp_name == "self"){
    // Look through our ports.
    mb_port_map_t::iterator own = d_port_map.find(port_name);
    if (own == d_port_map.end())
      throw mbe_no_such_port(d_mb, mb_util::join_names("self", port_name));

    return own->second;
  }

  // Look through the specified child's ports.
  mb_comp_map_t::iterator child = d_comp_map.find(comp_name);
  if (child == d_comp_map.end())
    throw mbe_no_such_component(d_mb, comp_name);

  mb_mblock_impl_sptr child_impl = child->second->d_impl;  // child's impl pointer
  mb_port_map_t::iterator found = child_impl->d_port_map.find(port_name);

  // A missing port and an INTERNAL port are reported the same way:
  // we can't "see" a child's internal ports.
  if (found == child_impl->d_port_map.end()
      || found->second->port_type() == mb_port::INTERNAL)
    throw mbe_no_such_port(d_mb, mb_util::join_names(comp_name, port_name));

  return found->second;
}



bool
mb_mblock_impl::endpoints_are_compatible(const mb_endpoint &ep0,
					 const mb_endpoint &ep1)
{
  pmt_t p0_outgoing = ep0.outgoing_message_set();
  pmt_t p0_incoming = ep0.incoming_message_set();

  pmt_t p1_outgoing = ep1.outgoing_message_set();
  pmt_t p1_incoming = ep1.incoming_message_set();

  return (pmt_subsetp(p0_outgoing, p1_incoming)
	  && pmt_subsetp(p1_outgoing, p0_incoming));
}

/*
 * Pre-order traversal: apply \p visitor to our own mblock first, then
 * recurse into every entry of the component map.
 *
 * \returns false as soon as the visitor (or a nested walk) returns false,
 *          abandoning the rest of the traversal; true otherwise.
 */
bool
mb_mblock_impl::walk_tree(mb_visitor *visitor)
{
  const bool visit_ok = (*visitor)(d_mb);
  if (!visit_ok)
    return false;

  for (mb_comp_map_t::iterator child = d_comp_map.begin();
       child != d_comp_map.end();
       ++child){
    if (!(child->second->walk_tree(visitor)))
      return false;			// stop at the first failure
  }

  return true;
}

/*
 * Build a message accepter for the port identified by \p port_name.
 * The accepter is an mb_msg_accepter_smp constructed from a shared
 * pointer to our mblock and the port name.
 */
mb_msg_accepter_sptr
mb_mblock_impl::make_accepter(pmt_t port_name)
{
  // FIXME this should probably use some kind of configurable factory
  mb_mblock_sptr self = d_mb->shared_from_this();

  // Hand the object to the smart pointer through the base-class pointer.
  mb_msg_accepter *raw_accepter = new mb_msg_accepter_smp(self, port_name);
  mb_msg_accepter_sptr accepter(raw_accepter);

  return accepter;
}

bool
mb_mblock_impl::lookup_other_endpoint(const mb_port *port, mb_endpoint *ep)
{
  mb_conn_iter	it;
  int		which_ep = 0;

  if (!d_conn_table.lookup_conn_by_port(port, &it, &which_ep))
    return false;
  
  *ep = it->d_ep[which_ep^1];
  return true;
}

/*
 * Return the component called \p comp_name.
 *
 * "self" yields a shared pointer to our own mblock.  An unknown name
 * yields a null (default-constructed) pointer rather than an exception.
 */
mb_mblock_sptr
mb_mblock_impl::component(const std::string &comp_name)
{
  if (comp_name == "self")
    return d_mb->shared_from_this();

  mb_comp_map_t::iterator found = d_comp_map.find(comp_name);
  if (found == d_comp_map.end())
    return mb_mblock_sptr();	// null pointer

  return found->second;
}

/*
 * Replace this mblock's instance name with \p name.
 * Only the stored string is changed; no lock is taken here.
 */
void
mb_mblock_impl::set_instance_name(const std::string &name)
{
  d_instance_name = name;
}

/*
 * Replace this mblock's class name (initially "mblock", see the
 * constructor) with \p name.  No lock is taken here.
 */
void
mb_mblock_impl::set_class_name(const std::string &name)
{
  d_class_name = name;
}

/*
 * This is the "Big Hammer" port cache invalidator.
 * It invalidates _all_ of the port caches in the entire mblock tree.
 * It's overkill, but was simple to code.
 */
void
mb_mblock_impl::invalidate_all_port_caches()
{
  class invalidator : public mb_visitor
  {
  public:
    bool operator()(mb_mblock *mblock)
    {
      mb_mblock_impl_sptr impl = mblock->impl();
      mb_port_map_t::iterator it = impl->d_port_map.begin();
      mb_port_map_t::iterator end = impl->d_port_map.end();
      for (; it != end; ++it)
	it->second->invalidate_cache();
      return true;
    }
  };

  invalidator visitor;

  // Always true, except in early QA code
  if (runtime()->top())
    runtime()->top()->walk_tree(&visitor);
}