diff options
author | Josh Blum | 2012-08-30 20:22:44 -0700 |
---|---|---|
committer | Josh Blum | 2012-08-30 20:22:44 -0700 |
commit | a648f0970230203f05a434dba903e6a4a5a08d53 (patch) | |
tree | 9e48ccac26a85dec161f2d8806bf5c1cf650e016 /lib | |
parent | 44fb4c04d2ecda3f60fc4f7e31da64cce9ca7a6d (diff) | |
download | sandhi-a648f0970230203f05a434dba903e6a4a5a08d53.tar.gz sandhi-a648f0970230203f05a434dba903e6a4a5a08d53.tar.bz2 sandhi-a648f0970230203f05a434dba903e6a4a5a08d53.zip |
checking in small fixes and message work
Diffstat (limited to 'lib')
-rw-r--r-- | lib/block.cpp | 3 | ||||
-rw-r--r-- | lib/block_handlers.cpp | 51 | ||||
-rw-r--r-- | lib/block_ports.cpp | 8 | ||||
-rw-r--r-- | lib/block_task.cpp | 31 | ||||
-rw-r--r-- | lib/common_impl.hpp | 9 | ||||
-rw-r--r-- | lib/element_impl.hpp | 21 | ||||
-rw-r--r-- | lib/top_block.cpp | 20 |
7 files changed, 104 insertions, 39 deletions
diff --git a/lib/block.cpp b/lib/block.cpp index 4c7f113..8329d53 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -43,8 +43,7 @@ Block::Block(const std::string &name): (*this)->block_ptr = this; (*this)->hint = 0; - (*this)->active = false; - (*this)->done = false; + (*this)->block_state = ElementImpl::BLOCK_STATE_INIT; } template <typename V, typename T> diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp index c3a6aff..4630d05 100644 --- a/lib/block_handlers.cpp +++ b/lib/block_handlers.cpp @@ -19,58 +19,69 @@ using namespace gnuradio; -void ElementImpl::handle_block_msg(const tsbe::TaskInterface &task_iface, const tsbe::Wax &state) +void ElementImpl::handle_block_msg(const tsbe::TaskInterface &task_iface, const tsbe::Wax &msg) { - if (state.type() == typeid(BufferReturnMessage)) + std::cout << "handle_block_msg in " << name << std::endl; + + if (msg.type() == typeid(BufferReturnMessage)) { - const BufferReturnMessage &message = state.cast<BufferReturnMessage>(); + const BufferReturnMessage &message = msg.cast<BufferReturnMessage>(); this->handle_output_msg(task_iface, message.index, message.buffer); return; } + //TODO: generate a message to handle task in a loop for a while + //we may need to call it into exhaustion to be correct + //but dont call it from update, let the settings above sink in + if (msg.type() == typeid(SelfKickMessage)) + { + this->handle_task(task_iface); + return; + } + + ASSERT(msg.type() == typeid(TopBlockMessage)); + const size_t num_inputs = task_iface.get_num_inputs(); const size_t num_outputs = task_iface.get_num_outputs(); //allocate output tokens and send them downstream - if (state.cast<TopBlockMessage>().what == TopBlockMessage::ACTIVE) + if (msg.cast<TopBlockMessage>().what == TopBlockMessage::TOKEN_TIME) { - this->input_tokens.resize(num_inputs); for (size_t i = 0; i < num_inputs; i++) { this->input_tokens[i] = Token::make(); task_iface.post_upstream(i, this->input_tokens[i]); } - this->output_tokens.resize(num_outputs); for (size_t i = 0; i < num_outputs; i++) { this->output_tokens[i] = Token::make(); task_iface.post_downstream(i, this->output_tokens[i]); } + this->token_pool.insert(msg.cast<TopBlockMessage>().token); } - if (state.cast<TopBlockMessage>().what == TopBlockMessage::ACTIVE) + if (msg.cast<TopBlockMessage>().what == TopBlockMessage::ALLOCATE) { - this->active = true; - this->done = false; - this->token_pool.insert(state.cast<TopBlockMessage>().token); - //causes initial processing kick-off for source blocks this->handle_allocation(task_iface); } - if (state.cast<TopBlockMessage>().what == TopBlockMessage::INERT) + + if (msg.cast<TopBlockMessage>().what == TopBlockMessage::ACTIVE) { - this->mark_done(task_iface); + this->block_state = BLOCK_STATE_LIVE; + if (this->all_io_ready()) this->block.post_msg(SelfKickMessage()); } - //TODO: generate a message to handle task in a loop for a while - //we may need to call it into exhaustion to be correct - //but dont call it from update, let the settings above sink in - this->handle_task(task_iface); - this->handle_task(task_iface); + if (msg.cast<TopBlockMessage>().what == TopBlockMessage::INERT) + { + this->mark_done(task_iface); + } } void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface) { + std::cout << "topology_update in " << name << std::endl; + const size_t num_inputs = task_iface.get_num_inputs(); const size_t num_outputs = task_iface.get_num_outputs(); @@ -100,6 +111,9 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface) this->inputs_ready.resize(num_inputs); this->outputs_ready.resize(num_outputs); + this->input_tokens.resize(num_inputs); + this->output_tokens.resize(num_outputs); + //resize tags vector to match sizes this->input_tags_changed.resize(num_inputs); this->input_tags.resize(num_inputs); @@ -123,6 +137,7 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface) //TODO: think more about this: if (num_inputs == 0 and num_outputs == 0) { + HERE(); this->mark_done(task_iface); } } diff --git a/lib/block_ports.cpp b/lib/block_ports.cpp index 3b3aec6..58b3366 100644 --- a/lib/block_ports.cpp +++ b/lib/block_ports.cpp @@ -20,9 +20,11 @@ using namespace gnuradio; void ElementImpl::handle_input_msg(const tsbe::TaskInterface &handle, const size_t index, const tsbe::Wax &msg) { + std::cout << "handle_input_msg in " << name << std::endl; + if (msg.type() == typeid(tsbe::Buffer)) { - if (this->done) return; + if (this->block_state == BLOCK_STATE_DONE) return; this->input_queues[index].push(msg.cast<tsbe::Buffer>()); this->inputs_ready.set(index, true); this->handle_task(handle); @@ -51,9 +53,11 @@ void ElementImpl::handle_input_msg(const tsbe::TaskInterface &handle, const size void ElementImpl::handle_output_msg(const tsbe::TaskInterface &handle, const size_t index, const tsbe::Wax &msg) { + std::cout << "handle_output_msg in " << name << std::endl; + if (msg.type() == typeid(tsbe::Buffer)) { - if (this->done) return; + if (this->block_state == BLOCK_STATE_DONE) return; this->output_queues[index].push(msg.cast<tsbe::Buffer>()); this->outputs_ready.set(index, true); this->handle_task(handle); diff --git a/lib/block_task.cpp b/lib/block_task.cpp index faf8098..3ec9113 100644 --- a/lib/block_task.cpp +++ b/lib/block_task.cpp @@ -22,11 +22,10 @@ using namespace gnuradio; void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface) { - if (this->done) return; //can re-enter checking done first + if (this->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first //mark down the new state - this->active = false; - this->done = true; + this->block_state = BLOCK_STATE_DONE; //release upstream, downstream, and executor tokens this->token_pool.clear(); @@ -50,7 +49,11 @@ void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface) task_iface.post_downstream(i, CheckTokensMessage()); } - std::cout << "This one: " << name << " is done..." << std::endl; + std::cout + << "==================================================\n" + << "== The " << name << " is done...\n" + << "==================================================\n" + << std::flush; } void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) @@ -60,13 +63,11 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) //-- Handle task may get called for incoming buffers, //-- however, not all ports may have available buffers. //------------------------------------------------------------------ - const bool all_inputs_ready = (~this->inputs_ready).none(); - const bool all_outputs_ready = (~this->outputs_ready).none(); - if (not (this->active and all_inputs_ready and all_outputs_ready)) return; + if (not (this->block_state == BLOCK_STATE_LIVE and this->all_io_ready())) return; const size_t num_inputs = task_iface.get_num_inputs(); const size_t num_outputs = task_iface.get_num_outputs(); - const bool is_source = (num_inputs == 0); + //const bool is_source = (num_inputs == 0); //------------------------------------------------------------------ //-- sort the input tags before working @@ -83,7 +84,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) //-- Processing time! //------------------------------------------------------------------ - //std::cout << "calling work on " << name << std::endl; + std::cout << "calling work on " << name << std::endl; //reset work trackers for production/consumption size_t input_tokens_count = 0; @@ -93,12 +94,15 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) //this->consume_items[i] = 0; ASSERT(this->input_history_items[i] == 0); + ASSERT(not this->input_queues[i].empty()); const tsbe::Buffer &buff = this->input_queues[i].front(); char *mem = ((char *)buff.get_memory()) + this->input_buff_offsets[i]; const size_t bytes = buff.get_length() - this->input_buff_offsets[i]; const size_t items = bytes/this->input_items_sizes[i]; + ASSERT(this->input_buff_offsets[i] < buff.get_length()); + this->input_items[i]._mem = mem; this->input_items[i]._len = items; this->work_input_items[i] = mem; @@ -112,6 +116,8 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) output_tokens_count += this->output_tokens[i].use_count(); //this->produce_items[i] = 0; + ASSERT(not this->output_queues[i].empty()); + const tsbe::Buffer &buff = this->output_queues[i].front(); char *mem = ((char *)buff.get_memory()); const size_t bytes = buff.get_length(); @@ -154,7 +160,8 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) const size_t bytes = items*this->input_items_sizes[i]; this->input_buff_offsets[i] += bytes; tsbe::Buffer &buff = this->input_queues[i].front(); - if (buff.get_length() >= this->input_buff_offsets[i]) + + if (buff.get_length() <= this->input_buff_offsets[i]) { this->input_queues[i].pop(); this->inputs_ready.set(i, not this->input_queues[i].empty()); @@ -245,4 +252,8 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface) } this->output_tags[i].clear(); } + + //create a self-kick so we get called again + //TODO, we could just steal this thread context... + if (this->all_io_ready()) this->block.post_msg(SelfKickMessage()); } diff --git a/lib/common_impl.hpp b/lib/common_impl.hpp index c6c3e93..e34e999 100644 --- a/lib/common_impl.hpp +++ b/lib/common_impl.hpp @@ -25,7 +25,7 @@ #define HERE() std::cerr << __FILE__ << ":" << __LINE__ << std::endl << std::flush; #define VAR(x) std::cerr << #x << " = " << (x) << std::endl << std::flush; -#define ASSERT(x) if(not (x)){HERE(); std::cerr << "assert failed: " << #x << std::endl << std::flush;} +#define ASSERT(x) if(not (x)){std::cerr << "ASSERT FAIL " << __FILE__ << ":" << __LINE__ << "\n\t" << #x << std::endl << std::flush;} static inline unsigned long myulround(const double x) { @@ -56,9 +56,11 @@ struct TopBlockMessage { enum { + ALLOCATE, ACTIVE, INERT, HINT, + TOKEN_TIME, } what; size_t hint; Token token; @@ -69,6 +71,11 @@ struct CheckTokensMessage //empty }; +struct SelfKickMessage +{ + //empty +}; + struct BufferReturnMessage { size_t index; diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp index e24e1d5..a8958c8 100644 --- a/lib/element_impl.hpp +++ b/lib/element_impl.hpp @@ -40,6 +40,12 @@ struct ElementImpl ~ElementImpl(void) { + if (this->executor) + { + TopBlockMessage event; + event.what = TopBlockMessage::INERT; + this->executor.post_msg(event); + } children.clear(); } @@ -121,9 +127,20 @@ struct ElementImpl void mark_done(const tsbe::TaskInterface &); void buffer_returner(const size_t index, tsbe::Buffer &buffer); + inline bool all_io_ready(void) + { + const bool all_inputs_ready = (~this->inputs_ready).none(); + const bool all_outputs_ready = (~this->outputs_ready).none(); + return all_inputs_ready and all_outputs_ready; + } + //is the fg running? - bool active; - bool done; + enum + { + BLOCK_STATE_INIT, + BLOCK_STATE_LIVE, + BLOCK_STATE_DONE, + } block_state; Token token; size_t hint; //some kind of allocation hint diff --git a/lib/top_block.cpp b/lib/top_block.cpp index 9d7a2f0..cf450b5 100644 --- a/lib/top_block.cpp +++ b/lib/top_block.cpp @@ -53,10 +53,22 @@ void TopBlock::set_buffer_hint(const size_t hint) void TopBlock::start(void) { (*this)->executor.commit(); - TopBlockMessage event; - event.what = TopBlockMessage::ACTIVE; - event.token = (*this)->token; - (*this)->executor.post_msg(event); + { + TopBlockMessage event; + event.what = TopBlockMessage::TOKEN_TIME; + event.token = (*this)->token; + (*this)->executor.post_msg(event); + } + { + TopBlockMessage event; + event.what = TopBlockMessage::ALLOCATE; + (*this)->executor.post_msg(event); + } + { + TopBlockMessage event; + event.what = TopBlockMessage::ACTIVE; + (*this)->executor.post_msg(event); + } } void TopBlock::stop(void) |