summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
m---------gnuradio0
-rw-r--r--lib/block.cpp5
-rw-r--r--lib/block_allocator.cpp2
-rw-r--r--lib/block_handlers.cpp25
-rw-r--r--lib/block_task.cpp19
-rw-r--r--lib/element.cpp2
-rw-r--r--lib/gras_impl/debug.hpp4
-rw-r--r--lib/gras_impl/input_buffer_queues.hpp52
-rw-r--r--lib/port_handlers.cpp4
-rw-r--r--lib/top_block.cpp20
10 files changed, 84 insertions, 49 deletions
diff --git a/gnuradio b/gnuradio
-Subproject 5b64019e6c1640edd13ae04fea251b144c0d2fb
+Subproject 00eca71506a89b19fff2081d158a3da5a36a18e
diff --git a/lib/block.cpp b/lib/block.cpp
index 7ac2144..7d48531 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -73,9 +73,8 @@ size_t Block::history(const size_t which_input) const
void Block::set_history(const size_t history_, const size_t which_input)
{
- //FIXME history of 1 actually means no history, why is this?
- //odd because I'm fairly certain history of N means N
- const size_t history = (history_ == 1)? 0 : history_;
+ //FIXME why is history - 1 (gnuradio loves this)
+ const size_t history = (history_ == 0)? 0 : history_-1;
vector_set((*this)->input_history_items, history, which_input);
}
diff --git a/lib/block_allocator.cpp b/lib/block_allocator.cpp
index f98a0bd..f5e632f 100644
--- a/lib/block_allocator.cpp
+++ b/lib/block_allocator.cpp
@@ -46,7 +46,7 @@ void ElementImpl::handle_allocation(const tsbe::TaskInterface &task_iface)
tsbe::BufferDeleter deleter = boost::bind(&ElementImpl::buffer_returner, this, i, _1);
this->output_buffer_tokens[i] = tsbe::BufferToken(new tsbe::BufferDeleter(deleter));
- for (size_t j = 0; j < 2; j++)
+ for (size_t j = 0; j < 8; j++)
{
tsbe::BufferConfig config;
config.memory = NULL;
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index 5eab5d3..8980791 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -23,7 +23,7 @@ void ElementImpl::handle_block_msg(
const tsbe::TaskInterface &task_iface,
const tsbe::Wax &msg
){
- //std::cout << "handle_block_msg in " << name << std::endl;
+ if (MESSAGE) std::cout << "handle_block_msg (" << msg.type().name() << ") " << name << std::endl;
//a buffer has returned from the downstream
//(all interested consumers have finished with it)
@@ -44,6 +44,20 @@ void ElementImpl::handle_block_msg(
return;
}
+ //clearly, this block is near death, hang on sparky
+ if (msg.type() == typeid(CheckTokensMessage))
+ {
+ if (this->input_queues.all_ready())
+ {
+ this->handle_task(task_iface);
+ }
+ else
+ {
+ this->mark_done(task_iface);
+ }
+ return;
+ }
+
ASSERT(msg.type() == typeid(TopBlockMessage));
const size_t num_inputs = task_iface.get_num_inputs();
@@ -131,11 +145,14 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface)
//impose input reserve requirements based on relative rate and output multiple
for (size_t i = 0; i < num_inputs; i++)
{
- if (num_outputs == 0) continue;
- if (this->enable_fixed_rate) continue;
+ if (num_outputs == 0 or not this->enable_fixed_rate)
+ {
+ this->input_queues.set_reserve(i, 0);
+ continue;
+ }
//TODO, this is a little cheap, we only look at output multiple [0]
size_t multiple = this->output_multiple_items.front();
- if (multiple == 1) multiple = 0; //1 is meaningless, so we use 0 to disable the reserve
+ //if (multiple == 1) multiple = 0; //1 is meaningless, so we use 0 to disable the reserve
const size_t reserve_items = myulround(multiple/this->relative_rate);
const size_t reserve_bytes = reserve_items * this->input_items_sizes[i];
this->input_queues.set_reserve(i, reserve_bytes);
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index a5dccbe..318b7b0 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -63,13 +63,11 @@ void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface)
task_iface.post_downstream(i, CheckTokensMessage());
}
- /*
- std::cout
+ if (ARMAGEDDON) std::cout
<< "==================================================\n"
<< "== The " << name << " is done...\n"
<< "==================================================\n"
<< std::flush;
- //*/
}
void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
@@ -84,7 +82,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
this->input_queues.all_ready() and
this->output_queues.all_ready()
)) return;
- std::cout << "=== calling work on " << name << " ===" << std::endl;
+ //std::cout << "=== calling work on " << name << " ===" << std::endl;
const size_t num_inputs = task_iface.get_num_inputs();
const size_t num_outputs = task_iface.get_num_outputs();
@@ -136,9 +134,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
const tsbe::Buffer &buff = this->output_queues.front(i);
char *mem = ((char *)buff.get_memory()) + this->output_bytes_offset[i];
const size_t bytes = buff.get_length() - this->output_bytes_offset[i];
- size_t items = bytes/this->output_items_sizes[i];
- ASSERT(items >= this->output_multiple_items[i]);
- items = this->output_multiple_items[i]*(items/this->output_multiple_items[i]);
+ const size_t items = bytes/this->output_items_sizes[i];
this->work_io_ptr_mask |= ptrdiff_t(mem);
this->output_items[i]._mem = mem;
@@ -187,7 +183,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
this->items_consumed[i] += items;
const size_t bytes = items*this->input_items_sizes[i];
- input_allows_flush = input_allows_flush and this->input_queues.pop(i, bytes);
+ input_allows_flush = input_allows_flush and this->input_queues.consume(i, bytes);
}
//------------------------------------------------------------------
@@ -273,10 +269,11 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
this->output_tags[i].clear();
}
- //if there are inputs, and not all are provided for, and we have an empty queue, mark done
- if (num_inputs != 0 and input_tokens_count == num_inputs and not this->input_queues.all_ready())
+ //if there are inputs, and not all are provided for,
+ //tell the block to check input queues and handle done
+ if (num_inputs != 0 and input_tokens_count == num_inputs)
{
- this->mark_done(task_iface);
+ this->block.post_msg(CheckTokensMessage());
return;
}
diff --git a/lib/element.cpp b/lib/element.cpp
index db0bbac..83b07b2 100644
--- a/lib/element.cpp
+++ b/lib/element.cpp
@@ -30,7 +30,7 @@ Element::Element(void)
Element::Element(const std::string &name)
{
this->reset(new ElementImpl());
- VAR(name);
+ if (GENESIS) std::cout << "New element: " << name << std::endl;
(*this)->name = name;
(*this)->unique_id = ++unique_id_pool;
diff --git a/lib/gras_impl/debug.hpp b/lib/gras_impl/debug.hpp
index ea0d7bf..bdf1388 100644
--- a/lib/gras_impl/debug.hpp
+++ b/lib/gras_impl/debug.hpp
@@ -19,6 +19,10 @@
#include <iostream>
+#define GENESIS 0
+#define ARMAGEDDON 0
+#define MESSAGE 1
+
#define HERE() std::cerr << __FILE__ << ":" << __LINE__ << std::endl << std::flush;
#define VAR(x) std::cerr << #x << " = " << (x) << std::endl << std::flush;
#define ASSERT(x) if(not (x)){std::cerr << "ASSERT FAIL " << __FILE__ << ":" << __LINE__ << "\n\t" << #x << std::endl << std::flush;}
diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp
index 2cac19e..8c2c41a 100644
--- a/lib/gras_impl/input_buffer_queues.hpp
+++ b/lib/gras_impl/input_buffer_queues.hpp
@@ -46,6 +46,9 @@ inline void my_buff_alloc(tsbe::Buffer &buff, const size_t num_bytes)
struct BufferWOffset
{
+ BufferWOffset(void): offset(0){}
+ BufferWOffset(const tsbe::Buffer &buffer):
+ offset(0), buffer(buffer){}
size_t offset;
tsbe::Buffer buffer;
};
@@ -100,8 +103,10 @@ struct InputBufferQueues
* Otherwise, resolve pointers to the input buffer,
* moving the memory and length by num history bytes.
*/
- inline BuffInfo front(const size_t i) const
+ inline BuffInfo front(const size_t i)
{
+ __prepare_front(i);
+
BuffInfo info;
const BufferWOffset &bo = _queues[i].front();
const tsbe::Buffer &buff = bo.buffer;
@@ -128,11 +133,17 @@ struct InputBufferQueues
info.len = buff.get_length() - bo.offset;
}
+ if (_reserve_bytes[i] != 0)
+ {
+ info.len /= _reserve_bytes[i];
+ info.len *= _reserve_bytes[i];
+ }
+
return info;
}
/*!
- * Rules for popping:
+ * Rules for consume:
*
* If we were operating in a mini history buffer, do nothing.
* Otherwise, check if the input buffer was entirely consumed.
@@ -141,10 +152,8 @@ struct InputBufferQueues
*
* \return true if the input allows output flushing
*/
- inline bool pop(const size_t i, const size_t bytes_consumed)
+ inline bool consume(const size_t i, const size_t bytes_consumed)
{
- __prepare_front(i);
-
BufferWOffset &bo = _queues[i].front();
const tsbe::Buffer &buff = bo.buffer;
@@ -188,22 +197,26 @@ struct InputBufferQueues
{
//TODO, we may call this dynamically, so 1) call __update
//and 2) safely resize the buffer and preserve its data
+ /*if (num_bytes)
+ {
+ std::cout << "set_reserve " << i << " " << num_bytes << " bytes\n";
+ }//*/
_reserve_bytes[i] = num_bytes;
- my_buff_alloc(_reserve_buffs[i], num_bytes);
+ my_buff_alloc(_reserve_buffs[i], _reserve_bytes[i]);
+ if (_reserve_buffs[i]) _reserve_buffs[i].get_length() = 0;
}
- inline void push(const size_t i, const tsbe::Buffer &value)
+ inline void push(const size_t i, const tsbe::Buffer &buffer)
{
- BufferWOffset bo;
- bo.offset = 0;
- bo.buffer = value;
- _queues[i].push_back(bo);
- _enqueued_bytes[i] += value.get_length();
+ _queues[i].push_back(buffer);
+ _enqueued_bytes[i] += buffer.get_length();
__update(i);
}
inline void __prepare_front(const size_t i)
{
+ VAR(_reserve_bytes[i]);
+ VAR(_history_bytes[i]);
ASSERT(_history_bytes[i] == 0 or _reserve_bytes[i] == 0); //FIXME dont mix history and reserve for now
tsbe::Buffer &reserve_buff = _reserve_buffs[i];
@@ -235,7 +248,10 @@ struct InputBufferQueues
while (reserve_buff.get_length() < _reserve_bytes[i])
{
BufferWOffset &bo = _queues[i].front();
- const size_t bytes_to_copy = std::min(bo.buffer.get_length()-bo.offset, _reserve_bytes[i]-reserve_buff.get_length());
+ const size_t bytes_to_copy = std::min(
+ bo.buffer.get_length()-bo.offset,
+ _reserve_bytes[i]-reserve_buff.get_length()
+ );
std::memcpy(
((char *)reserve_buff.get_memory())+reserve_buff.get_length(),
((char *)bo.buffer.get_memory())+bo.offset,
@@ -245,10 +261,7 @@ struct InputBufferQueues
bo.offset += bytes_to_copy;
if (bo.offset >= bo.buffer.get_length()) __pop(i);
}
- BufferWOffset new_bo;
- new_bo.offset = 0;
- new_bo.buffer = reserve_buff;
- _queues[i].push_front(new_bo);
+ _queues[i].push_front(reserve_buff);
}
inline void __consume(const size_t i, const size_t num_bytes)
@@ -260,6 +273,11 @@ struct InputBufferQueues
inline void __pop(const size_t i)
{
+ //TODO FIXME quick hack
+ if (_queues[i].front().buffer.get() == _reserve_buffs[i].get())
+ {
+ _reserve_buffs[i].get_length() = 0;
+ }
_queues[i].pop_front();
}
diff --git a/lib/port_handlers.cpp b/lib/port_handlers.cpp
index 36d3e27..9ec805d 100644
--- a/lib/port_handlers.cpp
+++ b/lib/port_handlers.cpp
@@ -23,7 +23,7 @@ void ElementImpl::handle_input_msg(
const size_t index,
const tsbe::Wax &msg
){
- //std::cout << "handle_input_msg in " << name << std::endl;
+ if (MESSAGE) std::cout << "handle_input_msg (" << msg.type().name() << ") " << name << std::endl;
//handle incoming stream buffer, push into the queue
if (msg.type() == typeid(tsbe::Buffer))
@@ -65,7 +65,7 @@ void ElementImpl::handle_output_msg(
const size_t index,
const tsbe::Wax &msg
){
- //std::cout << "handle_output_msg in " << name << std::endl;
+ if (MESSAGE) std::cout << "handle_output_msg (" << msg.type().name() << ") " << name << std::endl;
//store the token of the downstream consumer
if (msg.type() == typeid(Token))
diff --git a/lib/top_block.cpp b/lib/top_block.cpp
index ea86470..c681859 100644
--- a/lib/top_block.cpp
+++ b/lib/top_block.cpp
@@ -32,11 +32,11 @@ TopBlock::TopBlock(const std::string &name):
config.topology = (*this)->topology;
(*this)->executor = tsbe::Executor(config);
(*this)->token = Token::make();
- /*
- std::cout << "===================================================" << std::endl;
- std::cout << "== Top Block Created: " << name << std::endl;
- std::cout << "===================================================" << std::endl;
- //*/
+ if (GENESIS) std::cout
+ << "===================================================\n"
+ << "== Top Block Created: " << name << "\n"
+ << "===================================================\n"
+ << std::flush;
}
void ElementImpl::top_block_cleanup(void)
@@ -44,11 +44,11 @@ void ElementImpl::top_block_cleanup(void)
TopBlockMessage event;
event.what = TopBlockMessage::INERT;
this->executor.post_msg(event);
- /*
- std::cout << "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" << std::endl;
- std::cout << "xx Top Block Deleted: " << name << std::endl;
- std::cout << "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" << std::endl;
- //*/
+ if (ARMAGEDDON) std::cout
+ << "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n"
+ << "xx Top Block Destroyed: " << name << "\n"
+ << "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n"
+ << std::flush;
}
void TopBlock::update(void)