blob: 584d3a3a4434775ecf470323215b506689df85f3 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
#include "element_impl.hpp"
#include <gras_impl/block_actor.hpp>
#include <gras/block.hpp>
using namespace gras;
void Block::produce(const size_t which_output, const size_t num_items)
{
ASSERT(long(num_items) >= 0); //sign bit set? you dont want a negative
(*this)->block->produce(which_output, num_items);
}
void Block::produce(const size_t num_items)
{
const size_t num_outputs = (*this)->block->get_num_outputs();
for (size_t o = 0; o < num_outputs; o++)
{
(*this)->block->produce(o, num_items);
}
}
//Read the produced-items counter kept in the block actor's stats.
//  which_output - index of the output port
//  returns the value of stats.items_produced for that port
item_index_t Block::get_produced(const size_t which_output)
{
    BlockActor &actor = *(*this)->block;
    return actor.stats.items_produced[which_output];
}
//Get the buffer at the front of the output queue for a port.
//  which_output - index of the output port
//  returns a copy of the front SBuffer handle, with its length
//  set to the value reported by get_actual_length()
SBuffer Block::get_output_buffer(const size_t which_output) const
{
    BlockActor &actor = *(*this)->block;
    SBuffer &front_buff = actor.output_queues.front(which_output);

    //preset the length to the full buffer size, so the whole buffer
    //is popped automatically when the user never calls pop_output_buffer()
    front_buff.length = front_buff.get_actual_length();

    return front_buff;
}
void Block::pop_output_buffer(const size_t which_output, const size_t num_bytes)
{
(*this)->block->output_queues.front(which_output).length = num_bytes;
}
void Block::post_output_buffer(const size_t which_output, const SBuffer &buffer)
{
(*this)->block->produce_buffer(which_output, buffer);
}
//Account for items produced into the front buffer of output port i.
//Grows the front buffer's length by the byte size of the items,
//bumps the produced-items counter, and flags the port as having produced.
//  i     - index of the output port
//  items - number of items produced
GRAS_FORCE_INLINE void BlockActor::produce(const size_t i, const size_t items)
{
    #ifdef ITEM_CONSPROD
    std::cerr << name << " produce " << items << std::endl;
    #endif

    SBuffer &buff = this->output_queues.front(i);

    //the length already in the buffer must be a whole number of items
    ASSERT((buff.length % output_configs[i].item_size) == 0);

    //advance the buffer length by the produced items, measured in bytes
    const size_t num_bytes = items*this->output_configs[i].item_size;
    buff.length += num_bytes;

    //bookkeeping for this port
    this->produce_outputs[i] = true;
    this->stats.items_produced[i] += items;
}
//Post a caller-supplied buffer downstream from output port i.
//Calls consume() on the output queue for the port, counts the items
//held by the buffer, then wraps the buffer in a message and posts it.
//  i      - index of the output port
//  buffer - the buffer to send; its length must be a multiple of the item size
GRAS_FORCE_INLINE void BlockActor::produce_buffer(const size_t i, const SBuffer &buffer)
{
    //this happens first, before any accounting or posting
    this->output_queues.consume(i);

    //count the items carried by the buffer
    ASSERT((buffer.length % output_configs[i].item_size) == 0);
    const size_t num_items = buffer.length/output_configs[i].item_size;
    this->stats.items_produced[i] += num_items;

    //send the buffer to whatever is connected downstream of port i
    InputBufferMessage message;
    message.buffer = buffer;
    this->post_downstream(i, message);
}
|