diff options
Diffstat (limited to 'gnuradio-core/src/python')
-rwxr-xr-x | gnuradio-core/src/python/gnuradio/gr/qa_pdu.py | 49 |
1 files changed, 33 insertions, 16 deletions
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_pdu.py b/gnuradio-core/src/python/gnuradio/gr/qa_pdu.py index 83c7748af..da1331d96 100755 --- a/gnuradio-core/src/python/gnuradio/gr/qa_pdu.py +++ b/gnuradio-core/src/python/gnuradio/gr/qa_pdu.py @@ -36,36 +36,53 @@ class test_pdu(gr_unittest.TestCase): # Just run some data through and make sure it doesn't puke. src_data = (1, 2, 3, 4, 5, 6, 7, 8, 9, 10) - src = gr.pdu_to_tagged_stream(gr.BYTE); - snk3 = gr.tagged_stream_to_pdu(gr.BYTE); - snk2 = gr.vector_sink_b(); - snk = gr.tag_debug(1, "test"); + src = gr.pdu_to_tagged_stream(gr.BYTE) + snk3 = gr.tagged_stream_to_pdu(gr.BYTE) + snk2 = gr.vector_sink_b() + snk = gr.tag_debug(1, "test") - dbg = gr.message_debug(); + dbg = gr.message_debug() self.tb.connect(src, snk) self.tb.connect(src, snk2) self.tb.connect(src, snk3) - self.tb.msg_connect(snk3, "pdus", dbg, "print"); + self.tb.msg_connect(snk3, "pdus", dbg, "store") self.tb.start() # make our reference and message pmts - port = pmt.pmt_intern("pdus"); - msg = pmt.pmt_cons( pmt.PMT_NIL, pmt.pmt_make_u8vector(16, 0xFF) ); + port = pmt.pmt_intern("pdus") + msg = pmt.pmt_cons( pmt.PMT_NIL, pmt.pmt_make_u8vector(16, 0xFF) ) - print "printing port & msg" - pmt.pmt_print(port); - pmt.pmt_print(msg); + #print "printing port & msg" + #pmt.pmt_print(port) + #pmt.pmt_print(msg) # post the message - src.to_basic_block()._post( port, msg ); + src.to_basic_block()._post( port, msg ) - time.sleep(1); - self.tb.stop(); - self.tb.wait(); + while(dbg.num_messages() < 1): + time.sleep(0.5) + self.tb.stop() + self.tb.wait() - print snk2.data(); + # Get the vector of data from the vector sink + result_data = snk2.data() + + # Get the vector of data from the message sink + # Convert the message PMT as a pair into its vector + result_msg = dbg.get_message(0) + msg_vec = pmt.pmt_cdr(result_msg) + pmt.pmt_print(msg_vec) + + # Convert the PMT vector into a Python list + msg_data = [] + for i in xrange(16): + msg_data.append(pmt.pmt_u8vector_ref(msg_vec, i)) + + actual_data = 16*[0xFF,] + self.assertEqual(actual_data, list(result_data)) + self.assertEqual(actual_data, msg_data) if __name__ == '__main__': gr_unittest.run(test_pdu, "test_pdu.xml") |