diff options
m--------- | gnuradio | 0 | ||||
-rw-r--r-- | lib/block.cpp | 5 | ||||
-rw-r--r-- | lib/block_allocator.cpp | 2 | ||||
-rw-r--r-- | lib/block_handlers.cpp | 25 | ||||
-rw-r--r-- | lib/block_task.cpp | 19 | ||||
-rw-r--r-- | lib/element.cpp | 2 | ||||
-rw-r--r-- | lib/gras_impl/debug.hpp | 4 | ||||
-rw-r--r-- | lib/gras_impl/input_buffer_queues.hpp | 52 | ||||
-rw-r--r-- | lib/port_handlers.cpp | 4 | ||||
-rw-r--r-- | lib/top_block.cpp | 20 |
10 files changed, 84 insertions, 49 deletions
diff --git a/gnuradio b/gnuradio -Subproject 5b64019e6c1640edd13ae04fea251b144c0d2fb +Subproject 00eca71506a89b19fff2081d158a3da5a36a18e diff --git a/lib/block.cpp b/lib/block.cpp index 7ac2144..7d48531 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -73,9 +73,8 @@ size_t Block::history(const size_t which_input) const void Block::set_history(const size_t history_, const size_t which_input) { - //FIXME history of 1 actually means no history, why is this? - //odd because I'm fairly certain history of N means N - const size_t history = (history_ == 1)? 0 : history_; + //FIXME why is history - 1 (gnuradio loves this) + const size_t history = (history_ == 0)? 0 : history_-1; vector_set((*this)->input_history_items, history, which_input); } diff --git a/lib/block_allocator.cpp b/lib/block_allocator.cpp index f98a0bd..f5e632f 100644 --- a/lib/block_allocator.cpp +++ b/lib/block_allocator.cpp @@ -46,7 +46,7 @@ void ElementImpl::handle_allocation(const tsbe::TaskInterface &task_iface) tsbe::BufferDeleter deleter = boost::bind(&ElementImpl::buffer_returner, this, i, _1); this->output_buffer_tokens[i] = tsbe::BufferToken(new tsbe::BufferDeleter(deleter)); - for (size_t j = 0; j < 2; j++) + for (size_t j = 0; j < 8; j++) { tsbe::BufferConfig config; config.memory = NULL; diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index 5eab5d3..8980791 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -23,7 +23,7 @@ void ElementImpl::handle_block_msg( const tsbe::TaskInterface &task_iface, const tsbe::Wax &msg ){ - //std::cout << "handle_block_msg in " << name << std::endl; + if (MESSAGE) std::cout << "handle_block_msg (" << msg.type().name() << ") " << name << std::endl; //a buffer has returned from the downstream //(all interested consumers have finished with it) @@ -44,6 +44,20 @@ void ElementImpl::handle_block_msg( return; } + //clearly, this block is near death, hang on sparky + if (msg.type() == typeid(CheckTokensMessage)) + { + if (this->input_queues.all_ready()) + { + this->handle_task(task_iface); + } + else + { + this->mark_done(task_iface); + } + return; + } + ASSERT(msg.type() == typeid(TopBlockMessage)); const size_t num_inputs = task_iface.get_num_inputs(); @@ -131,11 +145,14 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface) //impose input reserve requirements based on relative rate and output multiple for (size_t i = 0; i < num_inputs; i++) { - if (num_outputs == 0) continue; - if (this->enable_fixed_rate) continue; + if (num_outputs == 0 or not this->enable_fixed_rate) + { + this->input_queues.set_reserve(i, 0); + continue; + } //TODO, this is a little cheap, we only look at output multiple [0] size_t multiple = this->output_multiple_items.front(); - if (multiple == 1) multiple = 0; //1 is meaningless, so we use 0 to disable the reserve + //if (multiple == 1) multiple = 0; //1 is meaningless, so we use 0 to disable the reserve const size_t reserve_items = myulround(multiple/this->relative_rate); const size_t reserve_bytes = reserve_items * this->input_items_sizes[i]; this->input_queues.set_reserve(i, reserve_bytes); diff --git a/lib/block_task.cpp b/lib/block_task.cpp index a5dccbe..318b7b0 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -63,13 +63,11 @@ void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface) task_iface.post_downstream(i, CheckTokensMessage()); } - /* - std::cout + if (ARMAGEDDON) std::cout << "==================================================\n" << "== The " << name << " is done...\n" << "==================================================\n" << std::flush; - //*/ } void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) @@ -84,7 +82,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) this->input_queues.all_ready() and this->output_queues.all_ready() )) return; - std::cout << "=== calling work on " << name << " ===" << std::endl; + //std::cout << "=== calling work on " << name << " ===" << std::endl; const size_t num_inputs = task_iface.get_num_inputs(); const size_t num_outputs = task_iface.get_num_outputs(); @@ -136,9 +134,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) const tsbe::Buffer &buff = this->output_queues.front(i); char *mem = ((char *)buff.get_memory()) + this->output_bytes_offset[i]; const size_t bytes = buff.get_length() - this->output_bytes_offset[i]; - size_t items = bytes/this->output_items_sizes[i]; - ASSERT(items >= this->output_multiple_items[i]); - items = this->output_multiple_items[i]*(items/this->output_multiple_items[i]); + const size_t items = bytes/this->output_items_sizes[i]; this->work_io_ptr_mask |= ptrdiff_t(mem); this->output_items[i]._mem = mem; @@ -187,7 +183,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) this->items_consumed[i] += items; const size_t bytes = items*this->input_items_sizes[i]; - input_allows_flush = input_allows_flush and this->input_queues.pop(i, bytes); + input_allows_flush = input_allows_flush and this->input_queues.consume(i, bytes); } //------------------------------------------------------------------ @@ -273,10 +269,11 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) this->output_tags[i].clear(); } - //if there are inputs, and not all are provided for, and we have an empty queue, mark done - if (num_inputs != 0 and input_tokens_count == num_inputs and not this->input_queues.all_ready()) + //if there are inputs, and not all are provided for, + //tell the block to check input queues and handle done + if (num_inputs != 0 and input_tokens_count == num_inputs) { - this->mark_done(task_iface); + this->block.post_msg(CheckTokensMessage()); return; } diff --git a/lib/element.cpp b/lib/element.cpp index db0bbac..83b07b2 100644 --- a/lib/element.cpp +++ b/lib/element.cpp @@ -30,7 +30,7 @@ Element::Element(void) Element::Element(const std::string &name) { this->reset(new ElementImpl()); - VAR(name); + if (GENESIS) std::cout << "New element: " << name << std::endl; (*this)->name = name; (*this)->unique_id = ++unique_id_pool; diff --git a/lib/gras_impl/debug.hpp b/lib/gras_impl/debug.hpp index ea0d7bf..bdf1388 100644 --- a/lib/gras_impl/debug.hpp +++ b/lib/gras_impl/debug.hpp @@ -19,6 +19,10 @@ #include <iostream> +#define GENESIS 0 +#define ARMAGEDDON 0 +#define MESSAGE 1 + #define HERE() std::cerr << __FILE__ << ":" << __LINE__ << std::endl << std::flush; #define VAR(x) std::cerr << #x << " = " << (x) << std::endl << std::flush; #define ASSERT(x) if(not (x)){std::cerr << "ASSERT FAIL " << __FILE__ << ":" << __LINE__ << "\n\t" << #x << std::endl << std::flush;} diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp index 2cac19e..8c2c41a 100644 --- a/lib/gras_impl/input_buffer_queues.hpp +++ b/lib/gras_impl/input_buffer_queues.hpp @@ -46,6 +46,9 @@ inline void my_buff_alloc(tsbe::Buffer &buff, const size_t num_bytes) struct BufferWOffset { + BufferWOffset(void): offset(0){} + BufferWOffset(const tsbe::Buffer &buffer): + offset(0), buffer(buffer){} size_t offset; tsbe::Buffer buffer; }; @@ -100,8 +103,10 @@ struct InputBufferQueues * Otherwise, resolve pointers to the input buffer, * moving the memory and length by num history bytes. */ - inline BuffInfo front(const size_t i) const + inline BuffInfo front(const size_t i) { + __prepare_front(i); + BuffInfo info; const BufferWOffset &bo = _queues[i].front(); const tsbe::Buffer &buff = bo.buffer; @@ -128,11 +133,17 @@ struct InputBufferQueues info.len = buff.get_length() - bo.offset; } + if (_reserve_bytes[i] != 0) + { + info.len /= _reserve_bytes[i]; + info.len *= _reserve_bytes[i]; + } + return info; } /*! - * Rules for popping: + * Rules for consume: * * If we were operating in a mini history buffer, do nothing. * Otherwise, check if the input buffer was entirely consumed. @@ -141,10 +152,8 @@ struct InputBufferQueues * * \return true if the input allows output flushing */ - inline bool pop(const size_t i, const size_t bytes_consumed) + inline bool consume(const size_t i, const size_t bytes_consumed) { - __prepare_front(i); - BufferWOffset &bo = _queues[i].front(); const tsbe::Buffer &buff = bo.buffer; @@ -188,22 +197,26 @@ struct InputBufferQueues { //TODO, we may call this dynamically, so 1) call __update //and 2) safely resize the buffer and preserve its data + /*if (num_bytes) + { + std::cout << "set_reserve " << i << " " << num_bytes << " bytes\n"; + }//*/ _reserve_bytes[i] = num_bytes; - my_buff_alloc(_reserve_buffs[i], num_bytes); + my_buff_alloc(_reserve_buffs[i], _reserve_bytes[i]); + if (_reserve_buffs[i]) _reserve_buffs[i].get_length() = 0; } - inline void push(const size_t i, const tsbe::Buffer &value) + inline void push(const size_t i, const tsbe::Buffer &buffer) { - BufferWOffset bo; - bo.offset = 0; - bo.buffer = value; - _queues[i].push_back(bo); - _enqueued_bytes[i] += value.get_length(); + _queues[i].push_back(buffer); + _enqueued_bytes[i] += buffer.get_length(); __update(i); } inline void __prepare_front(const size_t i) { + VAR(_reserve_bytes[i]); + VAR(_history_bytes[i]); ASSERT(_history_bytes[i] == 0 or _reserve_bytes[i] == 0); //FIXME dont mix history and reserve for now tsbe::Buffer &reserve_buff = _reserve_buffs[i]; @@ -235,7 +248,10 @@ struct InputBufferQueues while (reserve_buff.get_length() < _reserve_bytes[i]) { BufferWOffset &bo = _queues[i].front(); - const size_t bytes_to_copy = std::min(bo.buffer.get_length()-bo.offset, _reserve_bytes[i]-reserve_buff.get_length()); + const size_t bytes_to_copy = std::min( + bo.buffer.get_length()-bo.offset, + _reserve_bytes[i]-reserve_buff.get_length() + ); std::memcpy( ((char *)reserve_buff.get_memory())+reserve_buff.get_length(), ((char *)bo.buffer.get_memory())+bo.offset, @@ -245,10 +261,7 @@ struct InputBufferQueues bo.offset += bytes_to_copy; if (bo.offset >= bo.buffer.get_length()) __pop(i); } - BufferWOffset new_bo; - new_bo.offset = 0; - new_bo.buffer = reserve_buff; - _queues[i].push_front(new_bo); + _queues[i].push_front(reserve_buff); } inline void __consume(const size_t i, const size_t num_bytes) @@ -260,6 +273,11 @@ struct InputBufferQueues inline void __pop(const size_t i) { + //TODO FIXME quick hack + if (_queues[i].front().buffer.get() == _reserve_buffs[i].get()) + { + _reserve_buffs[i].get_length() = 0; + } _queues[i].pop_front(); } diff --git a/lib/port_handlers.cpp b/lib/port_handlers.cpp index 36d3e27..9ec805d 100644 --- a/lib/port_handlers.cpp +++ b/lib/port_handlers.cpp @@ -23,7 +23,7 @@ void ElementImpl::handle_input_msg( const size_t index, const tsbe::Wax &msg ){ - //std::cout << "handle_input_msg in " << name << std::endl; + if (MESSAGE) std::cout << "handle_input_msg (" << msg.type().name() << ") " << name << std::endl; //handle incoming stream buffer, push into the queue if (msg.type() == typeid(tsbe::Buffer)) @@ -65,7 +65,7 @@ void ElementImpl::handle_output_msg( const size_t index, const tsbe::Wax &msg ){ - //std::cout << "handle_output_msg in " << name << std::endl; + if (MESSAGE) std::cout << "handle_output_msg (" << msg.type().name() << ") " << name << std::endl; //store the token of the downstream consumer if (msg.type() == typeid(Token)) diff --git a/lib/top_block.cpp b/lib/top_block.cpp index ea86470..c681859 100644 --- a/lib/top_block.cpp +++ b/lib/top_block.cpp @@ -32,11 +32,11 @@ TopBlock::TopBlock(const std::string &name): config.topology = (*this)->topology; (*this)->executor = tsbe::Executor(config); (*this)->token = Token::make(); - /* - std::cout << "===================================================" << std::endl; - std::cout << "== Top Block Created: " << name << std::endl; - std::cout << "===================================================" << std::endl; - //*/ + if (GENESIS) std::cout + << "===================================================\n" + << "== Top Block Created: " << name << "\n" + << "===================================================\n" + << std::flush; } void ElementImpl::top_block_cleanup(void) @@ -44,11 +44,11 @@ void ElementImpl::top_block_cleanup(void) TopBlockMessage event; event.what = TopBlockMessage::INERT; this->executor.post_msg(event); - /* - std::cout << "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" << std::endl; - std::cout << "xx Top Block Deleted: " << name << std::endl; - std::cout << "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" << std::endl; - //*/ + if (ARMAGEDDON) std::cout + << "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n" + << "xx Top Block Destroyed: " << name << "\n" + << "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n" + << std::flush; } void TopBlock::update(void) |