summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/block.cpp18
-rw-r--r--lib/block_allocator.cpp6
-rw-r--r--lib/block_handlers.cpp18
-rw-r--r--lib/block_task.cpp48
-rw-r--r--lib/gr_block.cpp23
-rw-r--r--lib/gr_top_block.cpp10
-rw-r--r--lib/gras_impl/block_actor.hpp4
-rw-r--r--lib/gras_impl/input_buffer_queues.hpp239
-rw-r--r--lib/gras_impl/output_buffer_queues.hpp4
-rw-r--r--lib/hier_block.cpp5
-rw-r--r--lib/input_handlers.cpp5
-rw-r--r--lib/top_block.cpp3
-rw-r--r--lib/topology_handler.cpp12
13 files changed, 160 insertions, 235 deletions
diff --git a/lib/block.cpp b/lib/block.cpp
index f236ca4..6b27ab2 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -128,6 +128,24 @@ void Block::set_fixed_rate(const bool fixed_rate)
(*this)->block->enable_fixed_rate = fixed_rate;
}
+bool Block::fixed_rate(void) const
+{
+ return (*this)->block->enable_fixed_rate;
+}
+
+void Block::set_output_multiple(const size_t multiple)
+{
+ (*this)->block->output_multiple_items = multiple;
+ gnuradio::OutputPortConfig config = this->output_config();
+ config.reserve_items = multiple;
+ this->set_output_config(config);
+}
+
+size_t Block::output_multiple(void) const
+{
+ return (*this)->block->output_multiple_items;
+}
+
void Block::set_relative_rate(double relative_rate)
{
(*this)->block->relative_rate = relative_rate;
diff --git a/lib/block_allocator.cpp b/lib/block_allocator.cpp
index 9b70824..05b35ac 100644
--- a/lib/block_allocator.cpp
+++ b/lib/block_allocator.cpp
@@ -78,12 +78,6 @@ void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address
{
MESSAGE_TRACER();
- if (this->block_state == BLOCK_STATE_DONE)
- {
- this->Send(0, from); //ACK
- return;
- }
-
//allocate output buffers which will also wake up the task
const size_t num_outputs = this->get_num_outputs();
this->output_buffer_tokens.resize(num_outputs);
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index 2c14643..ca42b45 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -27,12 +27,6 @@ void BlockActor::handle_top_active(
){
MESSAGE_TRACER();
- if (this->block_state == BLOCK_STATE_DONE)
- {
- this->Send(0, from); //ACK
- return;
- }
-
if (this->block_state != BLOCK_STATE_LIVE)
{
this->block_ptr->start();
@@ -50,10 +44,6 @@ void BlockActor::handle_top_inert(
){
MESSAGE_TRACER();
- if (this->block_state != BLOCK_STATE_DONE)
- {
- this->block_ptr->stop();
- }
this->mark_done();
this->Send(0, from); //ACK
@@ -65,12 +55,6 @@ void BlockActor::handle_top_token(
){
MESSAGE_TRACER();
- if (this->block_state == BLOCK_STATE_DONE)
- {
- this->Send(0, from); //ACK
- return;
- }
-
//create input tokens and send allocation hints
for (size_t i = 0; i < this->get_num_inputs(); i++)
{
@@ -84,7 +68,7 @@ void BlockActor::handle_top_token(
//tell the upstream about the input requirements
OutputHintMessage output_hints;
output_hints.history_bytes = this->input_configs[i].lookahead_items*this->input_items_sizes[i];
- output_hints.reserve_bytes = this->input_reserve_items[i];
+ output_hints.reserve_bytes = size_t(std::ceil(this->output_multiple_items/this->relative_rate));
output_hints.token = this->input_tokens[i];
this->post_upstream(i, output_hints);
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index c1046c3..a291c01 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -25,6 +25,8 @@ void BlockActor::mark_done(void)
{
if (this->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first
+ this->block_ptr->stop();
+
//flush partial output buffers to the downstream
for (size_t i = 0; i < this->get_num_outputs(); i++)
{
@@ -71,6 +73,19 @@ void BlockActor::mark_done(void)
<< std::flush;
}
+GRAS_FORCE_INLINE void BlockActor::input_fail(const size_t i)
+{
+ //input failed, accumulate and try again
+ if (not this->input_queues.is_accumulated(i))
+ {
+ this->input_queues.accumulate(i, this->input_items_sizes[i]);
+ this->Push(SelfKickMessage(), Theron::Address());
+ return;
+ }
+ //otherwise check for done, else wait for more
+ if (this->inputs_done[i]) this->mark_done();
+}
+
void BlockActor::handle_task(void)
{
#ifdef WORK_DEBUG
@@ -102,16 +117,26 @@ void BlockActor::handle_task(void)
this->sort_tags(i);
ASSERT(this->input_queues.ready(i));
- bool potential_inline;
- const SBuffer buff = this->input_queues.front(i, this->enable_fixed_rate, potential_inline);
+ //this->input_queues.accumulate(i, this->input_items_sizes[i]);
+ const SBuffer &buff = this->input_queues.front(i);
void *mem = buff.get();
- const size_t items = buff.length/this->input_items_sizes[i];
+ size_t items = buff.length/this->input_items_sizes[i];
this->work_io_ptr_mask |= ptrdiff_t(mem);
this->input_items[i].get() = mem;
this->input_items[i].size() = items;
this->work_input_items[i] = mem;
this->work_ninput_items[i] = items;
+
+ if (this->enable_fixed_rate)
+ {
+ if (items <= this->input_configs[i].lookahead_items)
+ {
+ this->input_fail(i); return;
+ }
+ items -= this->input_configs[i].lookahead_items;
+ }
+
num_input_items = std::min(num_input_items, items);
this->consume_items[i] = 0;
this->consume_called[i] = false;
@@ -119,7 +144,7 @@ void BlockActor::handle_task(void)
//inline dealings, how and when input buffers can be inlined into output buffers
//continue;
if (
- potential_inline and
+ buff.unique() and
input_configs[i].inline_buffer and
output_inline_index < num_outputs and
buff.get_affinity() == this->buffer_affinity
@@ -175,6 +200,7 @@ void BlockActor::handle_task(void)
//------------------------------------------------------------------
//-- forecast
//------------------------------------------------------------------
+ //VAR(work_noutput_items);
if (this->forecast_enable)
{
forecast_again_you_jerk:
@@ -184,21 +210,24 @@ void BlockActor::handle_task(void)
{
if (fcast_ninput_items[i] <= work_ninput_items[i]) continue;
+ //handle the case of forecast failing
+ if (work_noutput_items <= this->output_multiple_items)
+ {
+ this->input_fail(i); return;
+ }
+
work_noutput_items = work_noutput_items/2; //backoff regime
work_noutput_items += this->output_multiple_items-1;
work_noutput_items /= this->output_multiple_items;
work_noutput_items *= this->output_multiple_items;
- if (work_noutput_items) goto forecast_again_you_jerk;
-
- //handle the case of forecast failing
- this->mark_done();
- return;
+ goto forecast_again_you_jerk;
}
}
//------------------------------------------------------------------
//-- the work
//------------------------------------------------------------------
+ //VAR(work_noutput_items);
this->work_ret = -1;
if (this->interruptible_thread)
{
@@ -209,6 +238,7 @@ void BlockActor::handle_task(void)
this->task_work();
}
const size_t noutput_items = size_t(work_ret);
+ //VAR(work_ret);
if (work_ret == Block::WORK_DONE)
{
diff --git a/lib/gr_block.cpp b/lib/gr_block.cpp
index 834bbd5..b4e7c04 100644
--- a/lib/gr_block.cpp
+++ b/lib/gr_block.cpp
@@ -80,7 +80,14 @@ bool gr_block::is_unaligned(void)
size_t gr_block::fixed_rate_noutput_to_ninput(const size_t noutput_items)
{
- return size_t((noutput_items/this->relative_rate()));
+ if (this->fixed_rate())
+ {
+ return size_t(0.5 + (noutput_items/this->relative_rate())) + this->history() - 1;
+ }
+ else
+ {
+ return noutput_items + this->history() - 1;
+ }
}
size_t gr_block::interpolation(void) const
@@ -91,6 +98,7 @@ size_t gr_block::interpolation(void) const
void gr_block::set_interpolation(const size_t interp)
{
this->set_relative_rate(1.0*interp);
+ this->set_output_multiple(interp);
}
size_t gr_block::decimation(void) const
@@ -118,19 +126,6 @@ void gr_block::set_history(unsigned history)
this->set_input_config(config);
}
-unsigned gr_block::output_multiple(void) const
-{
- return (*this)->block->output_multiple_items;
-}
-
-void gr_block::set_output_multiple(unsigned multiple)
-{
- (*this)->block->output_multiple_items = multiple;
- gnuradio::OutputPortConfig config = this->output_config();
- config.reserve_items = multiple;
- this->set_output_config(config);
-}
-
int gr_block::max_noutput_items(void) const
{
return this->output_config().maximum_items;
diff --git a/lib/gr_top_block.cpp b/lib/gr_top_block.cpp
index 6c023b1..c51f9b3 100644
--- a/lib/gr_top_block.cpp
+++ b/lib/gr_top_block.cpp
@@ -34,16 +34,6 @@ gr_top_block_sptr gr_make_top_block(const std::string &name)
return gr_top_block_sptr(new gr_top_block(name));
}
-void gr_top_block::lock(void)
-{
- //NOP
-}
-
-void gr_top_block::unlock(void)
-{
- this->commit();
-}
-
void gr_top_block::start(void)
{
gnuradio::TopBlock::start();
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index 5dd7421..d32c105 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -110,11 +110,12 @@ struct BlockActor : Apology::Worker
void buffer_returner(const size_t index, SBuffer &buffer);
void mark_done(void);
void handle_task(void);
+ void input_fail(const size_t index);
void sort_tags(const size_t index);
void trim_tags(const size_t index);
GRAS_FORCE_INLINE bool any_inputs_done(void)
{
- if (this->inputs_done.none()) return false;
+ if (this->inputs_done.none() or this->input_queues.all_ready()) return false;
for (size_t i = 0; i < this->get_num_inputs(); i++)
{
if (this->inputs_done[i] and not this->input_queues.ready(i))
@@ -130,7 +131,6 @@ struct BlockActor : Apology::Worker
std::vector<size_t> output_items_sizes;
std::vector<InputPortConfig> input_configs;
std::vector<OutputPortConfig> output_configs;
- std::vector<size_t> input_reserve_items;
size_t output_multiple_items;
//keeps track of production
diff --git a/lib/gras_impl/input_buffer_queues.hpp b/lib/gras_impl/input_buffer_queues.hpp
index 5b610d1..d17c4f0 100644
--- a/lib/gras_impl/input_buffer_queues.hpp
+++ b/lib/gras_impl/input_buffer_queues.hpp
@@ -33,26 +33,44 @@ namespace gnuradio
struct InputBufferQueues
{
enum {MAX_QUEUE_SIZE = 128};
+ enum {MAX_AUX_BUFF_BYTES=(1<<16)};
~InputBufferQueues(void)
{
this->resize(0);
}
- void init(
- const std::vector<size_t> &input_history_items,
- const std::vector<size_t> &input_reserve_items,
- const std::vector<size_t> &input_item_sizes
- );
+ void update_history_bytes(const size_t i, const size_t hist_bytes);
//! Call to get an input buffer for work
- SBuffer front(const size_t i, const bool conserve_history, bool &potential_GRAS_FORCE_INLINE);
+ GRAS_FORCE_INLINE SBuffer &front(const size_t i)
+ {
+ ASSERT(not _queues[i].empty());
+ ASSERT(this->ready(i));
+
+ return _queues[i].front();
+ }
//! Call when input bytes consumed by work
void consume(const size_t i, const size_t bytes_consumed);
void resize(const size_t size);
+ void accumulate(const size_t i, const size_t item_size);
+
+ /*!
+ * Can we consider this queue's buffers to be accumulated?
+ * Either the first buffer holds all of the enqueued bytes
+ * or the first buffer is larger than we can accumulate.
+ */
+ GRAS_FORCE_INLINE bool is_accumulated(const size_t i) const
+ {
+ ASSERT(not _queues[i].empty());
+ return
+ (_queues[i].front().length == _enqueued_bytes[i]) or
+ (_queues[i].front().length >= MAX_AUX_BUFF_BYTES);
+ }
+
GRAS_FORCE_INLINE void push(const size_t i, const SBuffer &buffer)
{
ASSERT(not _queues[i].full());
@@ -94,21 +112,16 @@ struct InputBufferQueues
return _bitset.all();
}
- void __prepare(const size_t i);
-
GRAS_FORCE_INLINE void __update(const size_t i)
{
- _bitset.set(i, _enqueued_bytes[i] >= _reserve_bytes[i]);
+ _bitset.set(i, _enqueued_bytes[i] != 0);
}
BitSet _bitset;
std::vector<size_t> _enqueued_bytes;
std::vector<boost::circular_buffer<SBuffer> > _queues;
std::vector<size_t> _history_bytes;
- std::vector<size_t> _reserve_bytes;
- std::vector<size_t> _post_bytes;
std::vector<boost::shared_ptr<BufferQueue> > _aux_queues;
- std::vector<bool> _in_aux_buff;
};
@@ -118,152 +131,76 @@ GRAS_FORCE_INLINE void InputBufferQueues::resize(const size_t size)
_enqueued_bytes.resize(size, 0);
_queues.resize(size, boost::circular_buffer<SBuffer>(MAX_QUEUE_SIZE));
_history_bytes.resize(size, 0);
- _reserve_bytes.resize(size, 0);
- _post_bytes.resize(size, 0);
_aux_queues.resize(size);
- _in_aux_buff.resize(size, false);
-}
-
-GRAS_FORCE_INLINE void InputBufferQueues::init(
- const std::vector<size_t> &input_history_items,
- const std::vector<size_t> &input_reserve_items,
- const std::vector<size_t> &input_item_sizes
-){
- if (this->size() == 0) return;
-
- const size_t max_history_items = *std::max_element(input_history_items.begin(), input_history_items.end());
for (size_t i = 0; i < this->size(); i++)
{
- ASSERT(input_reserve_items[i] > 0);
-
+ if (_aux_queues[i]) continue;
_aux_queues[i] = boost::shared_ptr<BufferQueue>(new BufferQueue());
-
- //determine byte sizes for buffers and dealing with history
- const size_t old_history = _history_bytes[i];
- _history_bytes[i] = input_item_sizes[i]*input_history_items[i];
-
- //calculate the input reserve aka reserve size
- _reserve_bytes[i] = input_item_sizes[i]*input_reserve_items[i];
- _reserve_bytes[i] = std::max(size_t(1), _reserve_bytes[i]);
-
- //calculate the input reserve aka reserve size
- _reserve_bytes[i] = std::max(
- _history_bytes[i] + _reserve_bytes[i],
- _reserve_bytes[i]
- );
-
- //post bytes are the desired buffer size to escape the edge case
- _post_bytes[i] = std::max(
- input_item_sizes[i]*max_history_items + _reserve_bytes[i],
- _reserve_bytes[i]
- );
-
- //allocate mini buffers for history edge conditions
- size_t num_bytes = _post_bytes[i];
- _aux_queues[i]->allocate_one(num_bytes);
- _aux_queues[i]->allocate_one(num_bytes);
-
- //there is history, so enqueue some initial history
- if (_history_bytes[i] > old_history)
- {
- SBuffer buff = _aux_queues[i]->front();
- _aux_queues[i]->pop();
-
- const size_t delta = _history_bytes[i] - old_history;
- std::memset(buff.get_actual_memory(), 0, delta);
- buff.offset = 0;
- buff.length = delta;
-
- this->push(i, buff);
- _in_aux_buff[i] = true;
- }
- if (_history_bytes[i] < old_history)
- {
- size_t delta = old_history - _history_bytes[i];
- delta = std::min(delta, _enqueued_bytes[i]); //FIXME
- //TODO consume extra delta on push...? so we dont need std::min
- this->consume(i, delta);
- }
+ _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES);
+ _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES);
+ _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES);
+ _aux_queues[i]->allocate_one(MAX_AUX_BUFF_BYTES);
}
-}
+}
-GRAS_FORCE_INLINE SBuffer InputBufferQueues::front(const size_t i, const bool conserve_history, bool &potential_inline)
+inline void InputBufferQueues::update_history_bytes(const size_t i, const size_t hist_bytes)
{
- //if (_queues[i].empty()) return BuffInfo();
-
- ASSERT(not _queues[i].empty());
- ASSERT(this->ready(i));
- __prepare(i);
- ASSERT(_queues[i].front().length >= _history_bytes[i]);
- SBuffer &front = _queues[i].front();
- const bool unique = front.unique();
+ //there is history, so enqueue some initial history
+ if (hist_bytes > _history_bytes[i])
+ {
+ SBuffer buff = _aux_queues[i]->front();
+ _aux_queues[i]->pop();
- //same buffer, different offset and length
- SBuffer buff = front;
- if (conserve_history) buff.length -= _history_bytes[i];
+ const size_t delta = hist_bytes - _history_bytes[i];
+ std::memset(buff.get_actual_memory(), 0, delta);
+ buff.offset = 0;
+ buff.length = delta;
- //set the flag that this buffer *might* be inlined as an output buffer
- potential_inline = unique and (buff.length == front.length);
+ this->push(i, buff);
+ }
+ if (hist_bytes < _history_bytes[i])
+ {
+ size_t delta = _history_bytes[i] - hist_bytes;
+ delta = std::min(delta, _enqueued_bytes[i]); //FIXME
+ //TODO consume extra delta on push...? so we dont need std::min
+ this->consume(i, delta);
+ }
- return buff;
+ _history_bytes[i] = hist_bytes;
}
-GRAS_FORCE_INLINE void InputBufferQueues::__prepare(const size_t i)
+GRAS_FORCE_INLINE void InputBufferQueues::accumulate(const size_t i, const size_t item_size)
{
- //HERE();
- //assumes that we are always pushing proper history buffs on front
- //ASSERT(_queues[i].front().length >= _history_bytes[i]);
+ ASSERT(not _aux_queues[i]->empty());
+ SBuffer accum_buff = _aux_queues[i]->front();
+ _aux_queues[i]->pop();
+ accum_buff.offset = 0;
+ accum_buff.length = 0;
- while (_queues[i].front().length < _reserve_bytes[i])
+ size_t free_bytes = accum_buff.get_actual_length();
+ free_bytes /= item_size; free_bytes *= item_size;
+
+ while (not _queues[i].empty() and free_bytes != 0)
{
SBuffer &front = _queues[i].front();
- SBuffer dst;
-
- //do we need a new buffer:
- //- is the buffer unique (queue has only reference)?
- //- can its remaining space meet reserve requirements?
- const bool enough_space = front.get_actual_length() >= _reserve_bytes[i] + front.offset;
- if (enough_space and front.unique())
- {
- dst = _queues[i].front();
- _queues[i].pop_front();
- }
- else
- {
- dst = _aux_queues[i]->front();
- _aux_queues[i]->pop();
- dst.offset = 0;
- dst.length = 0;
- _in_aux_buff[i] = true;
- }
-
- SBuffer src = _queues[i].front();
- _queues[i].pop_front();
- const size_t dst_tail = dst.get_actual_length() - (dst.offset + dst.length);
- const size_t bytes = std::min(dst_tail, src.length);
- //const size_t bytes = std::min(std::min(dst_tail, src.length), _post_bytes[i]);
- std::memcpy(dst.get(dst.length), src.get(), bytes);
-
- //update buffer additions, consumptions
- dst.length += bytes;
- src.offset += bytes;
- src.length -= bytes;
-
- //keep the source buffer if not fully consumed
- if (src.length) _queues[i].push_front(src);
-
- //destination buffer is the new front of the queue
- _queues[i].push_front(dst);
+ const size_t bytes = std::min(front.length, free_bytes);
+ std::memcpy(accum_buff.get(accum_buff.length), front.get(), bytes);
+ accum_buff.length += bytes;
+ free_bytes -= bytes;
+ front.length -= bytes;
+ front.offset += bytes;
+ if (front.length == 0) _queues[i].pop_front();
}
-}
+ _queues[i].push_front(accum_buff);
+
+ ASSERT(this->is_accumulated(i));
+}
GRAS_FORCE_INLINE void InputBufferQueues::consume(const size_t i, const size_t bytes_consumed)
{
- //if (bytes_consumed == 0) return true;
-
//assert that we dont consume past the bounds of the buffer
ASSERT(_queues[i].front().length >= bytes_consumed);
@@ -271,43 +208,13 @@ GRAS_FORCE_INLINE void InputBufferQueues::consume(const size_t i, const size_t b
_queues[i].front().offset += bytes_consumed;
_queues[i].front().length -= bytes_consumed;
- //safe to pop here when the buffer is consumed and no history
- if (_queues[i].front().length == 0 and _history_bytes[i] == 0)
- {
- _queues[i].pop_front();
- }
-
- else if (_in_aux_buff[i] and _queues[i].front().offset >= 2*_history_bytes[i])
- {
- const SBuffer buff = _queues[i].front();
- _queues[i].pop_front();
-
- if (_queues[i].empty())
- {
- _queues[i].push_front(buff);
- }
- else
- {
- _in_aux_buff[i] = false;
- const size_t residual = buff.length;
- _queues[i].front().length += residual;
- _queues[i].front().offset -= residual;
- }
- }
+ ASSERT(_queues[i].front().offset <= _queues[i].front().get_actual_length());
//update the number of bytes in this queue
ASSERT(_enqueued_bytes[i] >= bytes_consumed);
_enqueued_bytes[i] -= bytes_consumed;
- //we have consumed the history, change reqs
- if (_enqueued_bytes[i] < _history_bytes[i])
- {
- _history_bytes[i] = 0;
- _reserve_bytes[i] = 1; //cant be 0
- }
-
__update(i);
-
}
} //namespace gnuradio
diff --git a/lib/gras_impl/output_buffer_queues.hpp b/lib/gras_impl/output_buffer_queues.hpp
index a740b2d..1f07af0 100644
--- a/lib/gras_impl/output_buffer_queues.hpp
+++ b/lib/gras_impl/output_buffer_queues.hpp
@@ -76,9 +76,7 @@ struct OutputBufferQueues
GRAS_FORCE_INLINE void flush_all(void)
{
- _queues.clear();
- _queues.resize(_bitset.size());
- _bitset.reset();
+ for (size_t i = 0; i < this->size(); i++) this->flush(i);
}
GRAS_FORCE_INLINE bool ready(const size_t i) const
diff --git a/lib/hier_block.cpp b/lib/hier_block.cpp
index ef29cc2..422ed77 100644
--- a/lib/hier_block.cpp
+++ b/lib/hier_block.cpp
@@ -76,3 +76,8 @@ void HierBlock::disconnect_all(void)
{
(*this)->topology->clear_all();
}
+
+void HierBlock::commit(void)
+{
+ (*this)->topology->commit();
+}
diff --git a/lib/input_handlers.cpp b/lib/input_handlers.cpp
index 574aee4..30bb613 100644
--- a/lib/input_handlers.cpp
+++ b/lib/input_handlers.cpp
@@ -60,6 +60,11 @@ void BlockActor::handle_input_check(const InputCheckMessage &message, const Ther
{
this->mark_done();
}
+ //or re-enter handle task so forecast logic can mark done
+ else
+ {
+ this->handle_task();
+ }
}
void BlockActor::handle_input_alloc(const InputAllocMessage &message, const Theron::Address)
diff --git a/lib/top_block.cpp b/lib/top_block.cpp
index bcce1bf..a09744e 100644
--- a/lib/top_block.cpp
+++ b/lib/top_block.cpp
@@ -47,6 +47,8 @@ TopBlock::TopBlock(const std::string &name):
void ElementImpl::top_block_cleanup(void)
{
this->executor->post_all(TopInertMessage());
+ this->topology->clear_all();;
+ this->executor->commit();
if (ARMAGEDDON) std::cerr
<< "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n"
<< "xx Top Block Destroyed: " << name << "\n"
@@ -105,6 +107,7 @@ void TopBlock::run(void)
{
this->start();
this->wait();
+ this->disconnect_all();
}
GRAS_FORCE_INLINE void wait_thread_yield(void)
diff --git a/lib/topology_handler.cpp b/lib/topology_handler.cpp
index cc57ed6..5ad9378 100644
--- a/lib/topology_handler.cpp
+++ b/lib/topology_handler.cpp
@@ -106,18 +106,14 @@ void BlockActor::handle_update_inputs(
const UpdateInputsMessage &,
const Theron::Address
){
+ MESSAGE_TRACER();
const size_t num_inputs = this->get_num_inputs();
+ this->input_queues.resize(num_inputs);
//impose input reserve requirements based on relative rate and output multiple
- resize_fill_grow(this->input_reserve_items, num_inputs, 1);
- std::vector<size_t> input_lookahead_items(num_inputs);
for (size_t i = 0; i < num_inputs; i++)
{
- input_lookahead_items[i] = this->input_configs[i].lookahead_items;
- this->input_reserve_items[i] = size_t(std::ceil(this->output_multiple_items/this->relative_rate));
- if (this->input_reserve_items[i] == 0) this->input_reserve_items[i] = 1;
+ const size_t hist_bytes = this->input_items_sizes[i]*this->input_configs[i].lookahead_items;
+ this->input_queues.update_history_bytes(i, hist_bytes);
}
-
- //init the history comprehension on input queues
- this->input_queues.init(input_lookahead_items, this->input_reserve_items, this->input_items_sizes);
}