summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/CMakeLists.txt1
-rw-r--r--lib/block.cpp1
-rw-r--r--lib/block_message.cpp25
-rw-r--r--lib/gras_impl/block_actor.hpp4
-rw-r--r--lib/gras_impl/debug.hpp2
-rw-r--r--lib/serialize_types.cpp74
-rw-r--r--lib/tag_handlers.hpp2
-rw-r--r--lib/task_fail.cpp4
8 files changed, 108 insertions, 5 deletions
diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt
index bb74dc9..e721769 100644
--- a/lib/CMakeLists.txt
+++ b/lib/CMakeLists.txt
@@ -65,6 +65,7 @@ list(APPEND GRAS_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/top_block_query.cpp
${CMAKE_CURRENT_SOURCE_DIR}/register_messages.cpp
${CMAKE_CURRENT_SOURCE_DIR}/weak_container.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/serialize_types.cpp
)
########################################################################
diff --git a/lib/block.cpp b/lib/block.cpp
index 0015f10..75af20c 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -14,6 +14,7 @@ InputPortConfig::InputPortConfig(void)
maximum_items = 0;
inline_buffer = false;
preload_items = 0;
+ force_done = true;
}
OutputPortConfig::OutputPortConfig(void)
diff --git a/lib/block_message.cpp b/lib/block_message.cpp
index 43dc7b4..eae0c68 100644
--- a/lib/block_message.cpp
+++ b/lib/block_message.cpp
@@ -47,3 +47,28 @@ void Block::propagate_tags(const size_t i, const TagIter &iter)
}
}
}
+
+void Block::post_input_tag(const size_t which_input, const Tag &tag)
+{
+ InputTagMessage message(tag);
+ message.index = which_input;
+ Theron::Actor &actor = *((*this)->block);
+ actor.GetFramework().Send(message, Theron::Address::Null(), actor.GetAddress());
+}
+
+void Block::post_input_msg(const size_t which_input, const PMCC &msg)
+{
+ InputMsgMessage message(msg);
+ message.index = which_input;
+ Theron::Actor &actor = *((*this)->block);
+ actor.GetFramework().Send(message, Theron::Address::Null(), actor.GetAddress());
+}
+
+void Block::post_input_buffer(const size_t which_input, const SBuffer &buffer)
+{
+ InputBufferMessage message;
+ message.index = which_input;
+ message.buffer = buffer;
+ Theron::Actor &actor = *((*this)->block);
+ actor.GetFramework().Send(message, Theron::Address::Null(), actor.GetAddress());
+}
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index 502f974..170ee1f 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -184,7 +184,9 @@ GRAS_FORCE_INLINE void BlockActor::update_input_avail(const size_t i)
GRAS_FORCE_INLINE bool BlockActor::is_input_done(const size_t i)
{
- return this->inputs_done[i] and not this->inputs_available[i];
+ const bool force_done = this->input_configs[i].force_done;
+ if GRAS_LIKELY(force_done) return this->inputs_done[i] and not this->inputs_available[i];
+ return this->inputs_done.all() and this->inputs_available.none();
}
GRAS_FORCE_INLINE bool BlockActor::is_work_allowed(void)
diff --git a/lib/gras_impl/debug.hpp b/lib/gras_impl/debug.hpp
index 7934a1e..52c2e16 100644
--- a/lib/gras_impl/debug.hpp
+++ b/lib/gras_impl/debug.hpp
@@ -63,7 +63,7 @@ extern void *operator new(std::size_t n) throw (std::bad_alloc);
throw std::runtime_error(std::string("ASSERT FAIL ") + #x); \
}}
#else
-#define ASSERT(x)
+#define ASSERT(x) {}
#endif
#endif /*INCLUDED_LIBGRAS_IMPL_DEBUG_HPP*/
diff --git a/lib/serialize_types.cpp b/lib/serialize_types.cpp
new file mode 100644
index 0000000..33db43c
--- /dev/null
+++ b/lib/serialize_types.cpp
@@ -0,0 +1,74 @@
+// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
+
+#include <gras/sbuffer.hpp>
+#include <gras/tags.hpp>
+#include <PMC/Serialize.hpp>
+#include <boost/serialization/split_free.hpp>
+#include <boost/serialization/string.hpp>
+
+/***********************************************************************
+ * support for sbuffer
+ **********************************************************************/
+namespace boost { namespace serialization {
+template<class Archive>
+void save(Archive & ar, const gras::SBuffer & b, unsigned int version)
+{
+ bool null = not b;
+ ar & null;
+ if (null) return;
+
+ //TODO lazyness string
+ std::string s((const char *)b.get(), b.length);
+ ar & s;
+}
+template<class Archive>
+void load(Archive & ar, gras::SBuffer & b, unsigned int version)
+{
+ bool null = false;
+ ar & null;
+ if (null) b.reset();
+ if (null) return;
+
+ //TODO lazyness string
+ std::string s;
+ ar & s;
+ gras::SBufferConfig config;
+ config.length = s.length();
+ b = gras::SBuffer(config);
+ b.length = s.length();
+ std::memcpy(b.get(), s.c_str(), b.length);
+}
+}}
+
+BOOST_SERIALIZATION_SPLIT_FREE(gras::SBuffer)
+PMC_SERIALIZE_EXPORT(gras::SBuffer, "PMC<gras::SBuffer>")
+
+
+/***********************************************************************
+ * support for special packet msg type
+ **********************************************************************/
+namespace boost { namespace serialization {
+template <class Archive>
+void serialize(Archive &ar, gras::PacketMsg &t, const unsigned int)
+{
+ ar & t.info;
+ ar & t.buff;
+}
+}}
+
+PMC_SERIALIZE_EXPORT(gras::PacketMsg, "PMC<gras::PacketMsg>")
+
+/***********************************************************************
+ * support for special stream tag type
+ **********************************************************************/
+namespace boost { namespace serialization {
+template <class Archive>
+void serialize(Archive &ar, gras::StreamTag &t, const unsigned int)
+{
+ ar & t.key;
+ ar & t.val;
+ ar & t.src;
+}
+}}
+
+PMC_SERIALIZE_EXPORT(gras::StreamTag, "PMC<gras::StreamTag>")
diff --git a/lib/tag_handlers.hpp b/lib/tag_handlers.hpp
index 425748c..8d504dd 100644
--- a/lib/tag_handlers.hpp
+++ b/lib/tag_handlers.hpp
@@ -25,7 +25,7 @@ GRAS_FORCE_INLINE void BlockActor::trim_tags(const size_t i)
//------------------------------------------------------------------
std::vector<Tag> &tags_i = this->input_tags[i];
- const size_t items_consumed_i = this->stats.items_consumed[i];
+ const item_index_t items_consumed_i = this->stats.items_consumed[i];
size_t last = 0;
while (last < tags_i.size() and tags_i[last].offset < items_consumed_i)
{
diff --git a/lib/task_fail.cpp b/lib/task_fail.cpp
index c30b668..720e2e1 100644
--- a/lib/task_fail.cpp
+++ b/lib/task_fail.cpp
@@ -35,7 +35,7 @@ void BlockActor::input_fail(const size_t i)
//check that the input is not already maxed
if (this->input_queues.is_front_maximal(i))
{
- throw std::runtime_error("input_fail called on maximum_items buffer");
+ throw std::runtime_error("input_fail called on maximum_items buffer in " + name);
}
//mark fail: not ready until a new buffer appears
@@ -50,7 +50,7 @@ void BlockActor::output_fail(const size_t i)
const size_t front_items = buff.length/this->output_configs[i].item_size;
if (front_items >= this->output_configs[i].maximum_items)
{
- throw std::runtime_error("output_fail called on maximum_items buffer");
+ throw std::runtime_error("output_fail called on maximum_items buffer in " + name);
}
//mark fail: not ready until a new buffer appears