summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorJosh Blum2012-08-30 20:22:44 -0700
committerJosh Blum2012-08-30 20:22:44 -0700
commita648f0970230203f05a434dba903e6a4a5a08d53 (patch)
tree9e48ccac26a85dec161f2d8806bf5c1cf650e016 /lib
parent44fb4c04d2ecda3f60fc4f7e31da64cce9ca7a6d (diff)
downloadsandhi-a648f0970230203f05a434dba903e6a4a5a08d53.tar.gz
sandhi-a648f0970230203f05a434dba903e6a4a5a08d53.tar.bz2
sandhi-a648f0970230203f05a434dba903e6a4a5a08d53.zip
checking in small fixes and message work
Diffstat (limited to 'lib')
-rw-r--r--lib/block.cpp3
-rw-r--r--lib/block_handlers.cpp51
-rw-r--r--lib/block_ports.cpp8
-rw-r--r--lib/block_task.cpp31
-rw-r--r--lib/common_impl.hpp9
-rw-r--r--lib/element_impl.hpp21
-rw-r--r--lib/top_block.cpp20
7 files changed, 104 insertions, 39 deletions
diff --git a/lib/block.cpp b/lib/block.cpp
index 4c7f113..8329d53 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -43,8 +43,7 @@ Block::Block(const std::string &name):
(*this)->block_ptr = this;
(*this)->hint = 0;
- (*this)->active = false;
- (*this)->done = false;
+ (*this)->block_state = ElementImpl::BLOCK_STATE_INIT;
}
template <typename V, typename T>
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index c3a6aff..4630d05 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -19,58 +19,69 @@
using namespace gnuradio;
-void ElementImpl::handle_block_msg(const tsbe::TaskInterface &task_iface, const tsbe::Wax &state)
+void ElementImpl::handle_block_msg(const tsbe::TaskInterface &task_iface, const tsbe::Wax &msg)
{
- if (state.type() == typeid(BufferReturnMessage))
+ std::cout << "handle_block_msg in " << name << std::endl;
+
+ if (msg.type() == typeid(BufferReturnMessage))
{
- const BufferReturnMessage &message = state.cast<BufferReturnMessage>();
+ const BufferReturnMessage &message = msg.cast<BufferReturnMessage>();
this->handle_output_msg(task_iface, message.index, message.buffer);
return;
}
+ //TODO: generate a message to handle task in a loop for a while
+ //we may need to call it into exhaustion to be correct
+ //but dont call it from update, let the settings above sink in
+ if (msg.type() == typeid(SelfKickMessage))
+ {
+ this->handle_task(task_iface);
+ return;
+ }
+
+ ASSERT(msg.type() == typeid(TopBlockMessage));
+
const size_t num_inputs = task_iface.get_num_inputs();
const size_t num_outputs = task_iface.get_num_outputs();
//allocate output tokens and send them downstream
- if (state.cast<TopBlockMessage>().what == TopBlockMessage::ACTIVE)
+ if (msg.cast<TopBlockMessage>().what == TopBlockMessage::TOKEN_TIME)
{
- this->input_tokens.resize(num_inputs);
for (size_t i = 0; i < num_inputs; i++)
{
this->input_tokens[i] = Token::make();
task_iface.post_upstream(i, this->input_tokens[i]);
}
- this->output_tokens.resize(num_outputs);
for (size_t i = 0; i < num_outputs; i++)
{
this->output_tokens[i] = Token::make();
task_iface.post_downstream(i, this->output_tokens[i]);
}
+ this->token_pool.insert(msg.cast<TopBlockMessage>().token);
}
- if (state.cast<TopBlockMessage>().what == TopBlockMessage::ACTIVE)
+ if (msg.cast<TopBlockMessage>().what == TopBlockMessage::ALLOCATE)
{
- this->active = true;
- this->done = false;
- this->token_pool.insert(state.cast<TopBlockMessage>().token);
-
//causes initial processing kick-off for source blocks
this->handle_allocation(task_iface);
}
- if (state.cast<TopBlockMessage>().what == TopBlockMessage::INERT)
+
+ if (msg.cast<TopBlockMessage>().what == TopBlockMessage::ACTIVE)
{
- this->mark_done(task_iface);
+ this->block_state = BLOCK_STATE_LIVE;
+ if (this->all_io_ready()) this->block.post_msg(SelfKickMessage());
}
- //TODO: generate a message to handle task in a loop for a while
- //we may need to call it into exhaustion to be correct
- //but dont call it from update, let the settings above sink in
- this->handle_task(task_iface);
- this->handle_task(task_iface);
+ if (msg.cast<TopBlockMessage>().what == TopBlockMessage::INERT)
+ {
+ this->mark_done(task_iface);
+ }
}
void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface)
{
+ std::cout << "topology_update in " << name << std::endl;
+
const size_t num_inputs = task_iface.get_num_inputs();
const size_t num_outputs = task_iface.get_num_outputs();
@@ -100,6 +111,9 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface)
this->inputs_ready.resize(num_inputs);
this->outputs_ready.resize(num_outputs);
+ this->input_tokens.resize(num_inputs);
+ this->output_tokens.resize(num_outputs);
+
//resize tags vector to match sizes
this->input_tags_changed.resize(num_inputs);
this->input_tags.resize(num_inputs);
@@ -123,6 +137,7 @@ void ElementImpl::topology_update(const tsbe::TaskInterface &task_iface)
//TODO: think more about this:
if (num_inputs == 0 and num_outputs == 0)
{
+ HERE();
this->mark_done(task_iface);
}
}
diff --git a/lib/block_ports.cpp b/lib/block_ports.cpp
index 3b3aec6..58b3366 100644
--- a/lib/block_ports.cpp
+++ b/lib/block_ports.cpp
@@ -20,9 +20,11 @@ using namespace gnuradio;
void ElementImpl::handle_input_msg(const tsbe::TaskInterface &handle, const size_t index, const tsbe::Wax &msg)
{
+ std::cout << "handle_input_msg in " << name << std::endl;
+
if (msg.type() == typeid(tsbe::Buffer))
{
- if (this->done) return;
+ if (this->block_state == BLOCK_STATE_DONE) return;
this->input_queues[index].push(msg.cast<tsbe::Buffer>());
this->inputs_ready.set(index, true);
this->handle_task(handle);
@@ -51,9 +53,11 @@ void ElementImpl::handle_input_msg(const tsbe::TaskInterface &handle, const size
void ElementImpl::handle_output_msg(const tsbe::TaskInterface &handle, const size_t index, const tsbe::Wax &msg)
{
+ std::cout << "handle_output_msg in " << name << std::endl;
+
if (msg.type() == typeid(tsbe::Buffer))
{
- if (this->done) return;
+ if (this->block_state == BLOCK_STATE_DONE) return;
this->output_queues[index].push(msg.cast<tsbe::Buffer>());
this->outputs_ready.set(index, true);
this->handle_task(handle);
diff --git a/lib/block_task.cpp b/lib/block_task.cpp
index faf8098..3ec9113 100644
--- a/lib/block_task.cpp
+++ b/lib/block_task.cpp
@@ -22,11 +22,10 @@ using namespace gnuradio;
void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface)
{
- if (this->done) return; //can re-enter checking done first
+ if (this->block_state == BLOCK_STATE_DONE) return; //can re-enter checking done first
//mark down the new state
- this->active = false;
- this->done = true;
+ this->block_state = BLOCK_STATE_DONE;
//release upstream, downstream, and executor tokens
this->token_pool.clear();
@@ -50,7 +49,11 @@ void ElementImpl::mark_done(const tsbe::TaskInterface &task_iface)
task_iface.post_downstream(i, CheckTokensMessage());
}
- std::cout << "This one: " << name << " is done..." << std::endl;
+ std::cout
+ << "==================================================\n"
+ << "== The " << name << " is done...\n"
+ << "==================================================\n"
+ << std::flush;
}
void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
@@ -60,13 +63,11 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
//-- Handle task may get called for incoming buffers,
//-- however, not all ports may have available buffers.
//------------------------------------------------------------------
- const bool all_inputs_ready = (~this->inputs_ready).none();
- const bool all_outputs_ready = (~this->outputs_ready).none();
- if (not (this->active and all_inputs_ready and all_outputs_ready)) return;
+ if (not (this->block_state == BLOCK_STATE_LIVE and this->all_io_ready())) return;
const size_t num_inputs = task_iface.get_num_inputs();
const size_t num_outputs = task_iface.get_num_outputs();
- const bool is_source = (num_inputs == 0);
+ //const bool is_source = (num_inputs == 0);
//------------------------------------------------------------------
//-- sort the input tags before working
@@ -83,7 +84,7 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
//-- Processing time!
//------------------------------------------------------------------
- //std::cout << "calling work on " << name << std::endl;
+ std::cout << "calling work on " << name << std::endl;
//reset work trackers for production/consumption
size_t input_tokens_count = 0;
@@ -93,12 +94,15 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
//this->consume_items[i] = 0;
ASSERT(this->input_history_items[i] == 0);
+ ASSERT(not this->input_queues[i].empty());
const tsbe::Buffer &buff = this->input_queues[i].front();
char *mem = ((char *)buff.get_memory()) + this->input_buff_offsets[i];
const size_t bytes = buff.get_length() - this->input_buff_offsets[i];
const size_t items = bytes/this->input_items_sizes[i];
+ ASSERT(this->input_buff_offsets[i] < buff.get_length());
+
this->input_items[i]._mem = mem;
this->input_items[i]._len = items;
this->work_input_items[i] = mem;
@@ -112,6 +116,8 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
output_tokens_count += this->output_tokens[i].use_count();
//this->produce_items[i] = 0;
+ ASSERT(not this->output_queues[i].empty());
+
const tsbe::Buffer &buff = this->output_queues[i].front();
char *mem = ((char *)buff.get_memory());
const size_t bytes = buff.get_length();
@@ -154,7 +160,8 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
const size_t bytes = items*this->input_items_sizes[i];
this->input_buff_offsets[i] += bytes;
tsbe::Buffer &buff = this->input_queues[i].front();
- if (buff.get_length() >= this->input_buff_offsets[i])
+
+ if (buff.get_length() <= this->input_buff_offsets[i])
{
this->input_queues[i].pop();
this->inputs_ready.set(i, not this->input_queues[i].empty());
@@ -245,4 +252,8 @@ void ElementImpl::handle_task(const tsbe::TaskInterface &task_iface)
}
this->output_tags[i].clear();
}
+
+ //create a self-kick so we get called again
+ //TODO, we could just steal this thread context...
+ if (this->all_io_ready()) this->block.post_msg(SelfKickMessage());
}
diff --git a/lib/common_impl.hpp b/lib/common_impl.hpp
index c6c3e93..e34e999 100644
--- a/lib/common_impl.hpp
+++ b/lib/common_impl.hpp
@@ -25,7 +25,7 @@
#define HERE() std::cerr << __FILE__ << ":" << __LINE__ << std::endl << std::flush;
#define VAR(x) std::cerr << #x << " = " << (x) << std::endl << std::flush;
-#define ASSERT(x) if(not (x)){HERE(); std::cerr << "assert failed: " << #x << std::endl << std::flush;}
+#define ASSERT(x) if(not (x)){std::cerr << "ASSERT FAIL " << __FILE__ << ":" << __LINE__ << "\n\t" << #x << std::endl << std::flush;}
static inline unsigned long myulround(const double x)
{
@@ -56,9 +56,11 @@ struct TopBlockMessage
{
enum
{
+ ALLOCATE,
ACTIVE,
INERT,
HINT,
+ TOKEN_TIME,
} what;
size_t hint;
Token token;
@@ -69,6 +71,11 @@ struct CheckTokensMessage
//empty
};
+struct SelfKickMessage
+{
+ //empty
+};
+
struct BufferReturnMessage
{
size_t index;
diff --git a/lib/element_impl.hpp b/lib/element_impl.hpp
index e24e1d5..a8958c8 100644
--- a/lib/element_impl.hpp
+++ b/lib/element_impl.hpp
@@ -40,6 +40,12 @@ struct ElementImpl
~ElementImpl(void)
{
+ if (this->executor)
+ {
+ TopBlockMessage event;
+ event.what = TopBlockMessage::INERT;
+ this->executor.post_msg(event);
+ }
children.clear();
}
@@ -121,9 +127,20 @@ struct ElementImpl
void mark_done(const tsbe::TaskInterface &);
void buffer_returner(const size_t index, tsbe::Buffer &buffer);
+ inline bool all_io_ready(void)
+ {
+ const bool all_inputs_ready = (~this->inputs_ready).none();
+ const bool all_outputs_ready = (~this->outputs_ready).none();
+ return all_inputs_ready and all_outputs_ready;
+ }
+
//is the fg running?
- bool active;
- bool done;
+ enum
+ {
+ BLOCK_STATE_INIT,
+ BLOCK_STATE_LIVE,
+ BLOCK_STATE_DONE,
+ } block_state;
Token token;
size_t hint; //some kind of allocation hint
diff --git a/lib/top_block.cpp b/lib/top_block.cpp
index 9d7a2f0..cf450b5 100644
--- a/lib/top_block.cpp
+++ b/lib/top_block.cpp
@@ -53,10 +53,22 @@ void TopBlock::set_buffer_hint(const size_t hint)
void TopBlock::start(void)
{
(*this)->executor.commit();
- TopBlockMessage event;
- event.what = TopBlockMessage::ACTIVE;
- event.token = (*this)->token;
- (*this)->executor.post_msg(event);
+ {
+ TopBlockMessage event;
+ event.what = TopBlockMessage::TOKEN_TIME;
+ event.token = (*this)->token;
+ (*this)->executor.post_msg(event);
+ }
+ {
+ TopBlockMessage event;
+ event.what = TopBlockMessage::ALLOCATE;
+ (*this)->executor.post_msg(event);
+ }
+ {
+ TopBlockMessage event;
+ event.what = TopBlockMessage::ACTIVE;
+ (*this)->executor.post_msg(event);
+ }
}
void TopBlock::stop(void)