summaryrefslogtreecommitdiff
path: root/tests/block_test.py
blob: 2f8d91bddf002ca1640a1efe600d2b77fd5f6a2c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.

import unittest
import gras
import numpy
from demo_blocks import *

class BlockTest(unittest.TestCase):

    def setUp(self):
        self.tb = gras.TopBlock()

    def tearDown(self):
        self.tb = None

    def test_vector_blocks(self):
        vec_source = VectorSource(numpy.uint32, [0, 9, 8, 7, 6])
        vec_sink = VectorSink(numpy.uint32)

        self.tb.connect(vec_source, vec_sink)
        self.tb.run()

        self.assertEqual(vec_sink.get_vector(), (0, 9, 8, 7, 6))

    def test_add_f32(self):
        src0 = VectorSource(numpy.float32, [1, 3, 5, 7, 9])
        src1 = VectorSource(numpy.float32, [0, 2, 4, 6, 8])
        adder = Add2X(numpy.float32)
        sink = VectorSink(numpy.float32)
        self.tb.connect((src0, 0), (adder, 0))
        self.tb.connect((src1, 0), (adder, 1))
        self.tb.connect(adder, sink)
        self.tb.run()
        self.assertEqual(sink.get_vector(), (1, 5, 9, 13, 17))

    def test_add_fc32(self):
        src0 = VectorSource(numpy.complex64, [1, 3j, 5, 7j, 9])
        src1 = VectorSource(numpy.complex64, [0, 2j, 4, 6j, 8])
        adder = Add2X(numpy.complex64)
        sink = VectorSink(numpy.complex64)
        self.tb.connect((src0, 0), (adder, 0))
        self.tb.connect((src1, 0), (adder, 1))
        self.tb.connect(adder, sink)
        self.tb.run()
        self.assertEqual(sink.get_vector(), (1, 5j, 9, 13j, 17))

    def test_add_f32_feedback(self):
        """
        Feedback adder output to input1 of the adder.
        Preload input1 of the adder to set the delay.
        """
        src0 = VectorSource(numpy.float32, [1, 3, 5, 7, 9])
        adder = Add2X(numpy.float32)
        adder.input_config(1).preload_items = 1 #make this a feedback delay of 1
        sink = VectorSink(numpy.float32)
        self.tb.connect((src0, 0), (adder, 0))
        self.tb.connect((adder, 0), (adder, 1))
        self.tb.connect(adder, sink)
        self.tb.run()
        self.assertEqual(sink.get_vector(), (1, 4, 9, 16, 25))

    def test_tag_source_sink(self):
        values = (0, 'hello', 4.2, True, None, [2, 3, 4], (9, 8, 7), 1j, {2:'d'})
        src = TagSource(values)
        sink = TagSink()
        self.tb.connect(src, sink)
        self.tb.run()
        self.assertEqual(sink.get_values(), values)

    def test_ro_buffers(self):
        class BadTouch(gras.Block):
            def __init__(self, in_sig):
                gras.Block.__init__(self, 'BadTouch', in_sig=[in_sig])

            def work(self, ins, outs):
                try:
                    ins[0][0] = 0 #assign to ro buffer should fail
                    raise Exception, 'ins should not be writeable!'
                except: pass
                self.consume(0, len(ins[0]))

        source = VectorSource(numpy.uint32, [0, 9, 8, 7, 6])
        sink = BadTouch(numpy.uint32)

        self.tb.connect(source, sink)
        self.tb.run()

if __name__ == '__main__':
    unittest.main()