diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/CMakeLists.txt | 1 | ||||
-rw-r--r-- | lib/block.cpp | 1 | ||||
-rw-r--r-- | lib/block_message.cpp | 25 | ||||
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 4 | ||||
-rw-r--r-- | lib/gras_impl/debug.hpp | 2 | ||||
-rw-r--r-- | lib/serialize_types.cpp | 74 | ||||
-rw-r--r-- | lib/tag_handlers.hpp | 2 | ||||
-rw-r--r-- | lib/task_fail.cpp | 4 |
8 files changed, 108 insertions, 5 deletions
diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index bb74dc9..e721769 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -65,6 +65,7 @@ list(APPEND GRAS_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/top_block_query.cpp ${CMAKE_CURRENT_SOURCE_DIR}/register_messages.cpp ${CMAKE_CURRENT_SOURCE_DIR}/weak_container.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/serialize_types.cpp ) ######################################################################## diff --git a/lib/block.cpp b/lib/block.cpp index 0015f10..75af20c 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -14,6 +14,7 @@ InputPortConfig::InputPortConfig(void) maximum_items = 0; inline_buffer = false; preload_items = 0; + force_done = true; } OutputPortConfig::OutputPortConfig(void) diff --git a/lib/block_message.cpp b/lib/block_message.cpp index 43dc7b4..eae0c68 100644 --- a/lib/block_message.cpp +++ b/lib/block_message.cpp @@ -47,3 +47,28 @@ void Block::propagate_tags(const size_t i, const TagIter &iter) } } } + +void Block::post_input_tag(const size_t which_input, const Tag &tag) +{ + InputTagMessage message(tag); + message.index = which_input; + Theron::Actor &actor = *((*this)->block); + actor.GetFramework().Send(message, Theron::Address::Null(), actor.GetAddress()); +} + +void Block::post_input_msg(const size_t which_input, const PMCC &msg) +{ + InputMsgMessage message(msg); + message.index = which_input; + Theron::Actor &actor = *((*this)->block); + actor.GetFramework().Send(message, Theron::Address::Null(), actor.GetAddress()); +} + +void Block::post_input_buffer(const size_t which_input, const SBuffer &buffer) +{ + InputBufferMessage message; + message.index = which_input; + message.buffer = buffer; + Theron::Actor &actor = *((*this)->block); + actor.GetFramework().Send(message, Theron::Address::Null(), actor.GetAddress()); +} diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index 502f974..170ee1f 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -184,7 +184,9 @@ GRAS_FORCE_INLINE void BlockActor::update_input_avail(const size_t i) GRAS_FORCE_INLINE bool BlockActor::is_input_done(const size_t i) { - return this->inputs_done[i] and not this->inputs_available[i]; + const bool force_done = this->input_configs[i].force_done; + if GRAS_LIKELY(force_done) return this->inputs_done[i] and not this->inputs_available[i]; + return this->inputs_done.all() and this->inputs_available.none(); } GRAS_FORCE_INLINE bool BlockActor::is_work_allowed(void) diff --git a/lib/gras_impl/debug.hpp b/lib/gras_impl/debug.hpp index 7934a1e..52c2e16 100644 --- a/lib/gras_impl/debug.hpp +++ b/lib/gras_impl/debug.hpp @@ -63,7 +63,7 @@ extern void *operator new(std::size_t n) throw (std::bad_alloc); throw std::runtime_error(std::string("ASSERT FAIL ") + #x); \ }} #else -#define ASSERT(x) +#define ASSERT(x) {} #endif #endif /*INCLUDED_LIBGRAS_IMPL_DEBUG_HPP*/ diff --git a/lib/serialize_types.cpp b/lib/serialize_types.cpp new file mode 100644 index 0000000..33db43c --- /dev/null +++ b/lib/serialize_types.cpp @@ -0,0 +1,74 @@ +// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information. + +#include <gras/sbuffer.hpp> +#include <gras/tags.hpp> +#include <PMC/Serialize.hpp> +#include <boost/serialization/split_free.hpp> +#include <boost/serialization/string.hpp> + +/*********************************************************************** + * support for sbuffer + **********************************************************************/ +namespace boost { namespace serialization { +template<class Archive> +void save(Archive & ar, const gras::SBuffer & b, unsigned int version) +{ + bool null = not b; + ar & null; + if (null) return; + + //TODO lazyness string + std::string s((const char *)b.get(), b.length); + ar & s; +} +template<class Archive> +void load(Archive & ar, gras::SBuffer & b, unsigned int version) +{ + bool null = false; + ar & null; + if (null) b.reset(); + if (null) return; + + //TODO lazyness string + std::string s; + ar & s; + gras::SBufferConfig config; + config.length = s.length(); + b = gras::SBuffer(config); + b.length = s.length(); + std::memcpy(b.get(), s.c_str(), b.length); +} +}} + +BOOST_SERIALIZATION_SPLIT_FREE(gras::SBuffer) +PMC_SERIALIZE_EXPORT(gras::SBuffer, "PMC<gras::SBuffer>") + + +/*********************************************************************** + * support for special packet msg type + **********************************************************************/ +namespace boost { namespace serialization { +template <class Archive> +void serialize(Archive &ar, gras::PacketMsg &t, const unsigned int) +{ + ar & t.info; + ar & t.buff; +} +}} + +PMC_SERIALIZE_EXPORT(gras::PacketMsg, "PMC<gras::PacketMsg>") + +/*********************************************************************** + * support for special stream tag type + **********************************************************************/ +namespace boost { namespace serialization { +template <class Archive> +void serialize(Archive &ar, gras::StreamTag &t, const unsigned int) +{ + ar & t.key; + ar & t.val; + ar & t.src; +} +}} + +PMC_SERIALIZE_EXPORT(gras::StreamTag, "PMC<gras::StreamTag>") diff --git a/lib/tag_handlers.hpp b/lib/tag_handlers.hpp index 425748c..8d504dd 100644 --- a/lib/tag_handlers.hpp +++ b/lib/tag_handlers.hpp @@ -25,7 +25,7 @@ GRAS_FORCE_INLINE void BlockActor::trim_tags(const size_t i) //------------------------------------------------------------------ std::vector<Tag> &tags_i = this->input_tags[i]; - const size_t items_consumed_i = this->stats.items_consumed[i]; + const item_index_t items_consumed_i = this->stats.items_consumed[i]; size_t last = 0; while (last < tags_i.size() and tags_i[last].offset < items_consumed_i) { diff --git a/lib/task_fail.cpp b/lib/task_fail.cpp index c30b668..720e2e1 100644 --- a/lib/task_fail.cpp +++ b/lib/task_fail.cpp @@ -35,7 +35,7 @@ void BlockActor::input_fail(const size_t i) //check that the input is not already maxed if (this->input_queues.is_front_maximal(i)) { - throw std::runtime_error("input_fail called on maximum_items buffer"); + throw std::runtime_error("input_fail called on maximum_items buffer in " + name); } //mark fail: not ready until a new buffer appears @@ -50,7 +50,7 @@ void BlockActor::output_fail(const size_t i) const size_t front_items = buff.length/this->output_configs[i].item_size; if (front_items >= this->output_configs[i].maximum_items) { - throw std::runtime_error("output_fail called on maximum_items buffer"); + throw std::runtime_error("output_fail called on maximum_items buffer in " + name); } //mark fail: not ready until a new buffer appears |