diff options
m--------- | PMC | 0 | ||||
m--------- | grextras | 0 | ||||
-rw-r--r-- | include/gras/block.hpp | 62 | ||||
-rw-r--r-- | lib/CMakeLists.txt | 1 | ||||
-rw-r--r-- | lib/block.cpp | 1 | ||||
-rw-r--r-- | lib/block_message.cpp | 25 | ||||
-rw-r--r-- | lib/gras_impl/block_actor.hpp | 4 | ||||
-rw-r--r-- | lib/gras_impl/debug.hpp | 2 | ||||
-rw-r--r-- | lib/serialize_types.cpp | 74 | ||||
-rw-r--r-- | lib/tag_handlers.hpp | 2 | ||||
-rw-r--r-- | lib/task_fail.cpp | 4 | ||||
-rw-r--r-- | tests/CMakeLists.txt | 3 | ||||
-rw-r--r-- | tests/serialize_tags_test.cpp | 76 |
13 files changed, 242 insertions, 12 deletions
diff --git a/PMC b/PMC -Subproject 6b444922db565cd64b39729a8129eac7c81fe0c +Subproject 3fe4dfb18a12dd53ea3bc7f9c32255f96e8da49 diff --git a/grextras b/grextras -Subproject 07fbc5681cc4dcd2813ff7a1ef8fd1632085964 +Subproject af9435434bb4ca7c78d798b4710fa87089752bb diff --git a/include/gras/block.hpp b/include/gras/block.hpp index 8cb5f95..85a6ecc 100644 --- a/include/gras/block.hpp +++ b/include/gras/block.hpp @@ -76,6 +76,22 @@ struct GRAS_API InputPortConfig * Default = 0. */ size_t preload_items; + + /*! + * Force this block done when input port is done. + * When the upstream feeding this port declares done, + * this block will mark done once upstream notifies. + * The primary usage is to modify the done logic + * for the purposes of unit test confiruability. + * + * If the force done option is false, the block will + * not mark done when this port's upstream is done. + * However, this block will mark done when all + * input ports are done, reguardless of this setting. + * + * Default = true. + */ + bool force_done; }; //! Configuration parameters for an output port @@ -186,15 +202,22 @@ struct GRAS_API Block : Element */ virtual void propagate_tags(const size_t which_input, const TagIter &iter); + /*! + * Send a tag to the given input port on this block. + * This is a thread-safe way for external scheduler + * entities to post tags into the input of a block. + * \param which_input an input port on this block + * \param tag the tag to post to the input port + */ + void post_input_tag(const size_t which_input, const Tag &tag); /******************************************************************* * Deal with message passing ******************************************************************/ /*! - * Post output message convenience routine. * Send a message to the downstream on the given output port. - * The underlying implementation is a tag with an offset of 0. + * Messages are naturally asynchronous to stream and tag data. * * \param which_output the index of the output port * \param msg the message object to pass downstream @@ -214,6 +237,15 @@ struct GRAS_API Block : Element */ PMCC pop_input_msg(const size_t which_input); + /*! + * Send a message to the given input port on this block. + * This is a thread-safe way for external scheduler + * entities to post messages into the input of a block. + * \param which_input an input port on this block + * \param msg the message to post to the input port + */ + void post_input_msg(const size_t which_input, const PMCC &tag); + /******************************************************************* * The property interface: * Provides polymorphic, thread-safe access to block properties. @@ -356,6 +388,10 @@ struct GRAS_API Block : Element */ void mark_done(void); + /******************************************************************* + * Direct buffer access API + ******************************************************************/ + /*! * Get access to the underlying reference counted input buffer. * This is the same buffer pointed to by input_items[which]. @@ -407,6 +443,20 @@ struct GRAS_API Block : Element void post_output_buffer(const size_t which_output, const SBuffer &buffer); /*! + * Post a buffer to the given input port on this block. + * This is a thread-safe way for external scheduler + * entities to post buffers into the input of a block. + * + * \param which_input an input port on this block + * \param buffer the buffer to post to the input port + */ + void post_input_buffer(const size_t which_input, const SBuffer &buffer); + + /******************************************************************* + * Scheduler notification API + ******************************************************************/ + + /*! * Overload notify_active to get called when block becomes active. * This will be called when the TopBlock start/run API call executes. * The default implementation of notify_active is a NOP. @@ -427,6 +477,10 @@ struct GRAS_API Block : Element */ virtual void notify_topology(const size_t num_inputs, const size_t num_outputs); + /******************************************************************* + * routines related to affinity and allocation + ******************************************************************/ + /*! * Set if the work call should be interruptible by stop(). * Some work implementations block with the expectation of @@ -436,10 +490,6 @@ struct GRAS_API Block : Element */ void set_interruptible_work(const bool enb); - /******************************************************************* - * routines related to affinity and allocation - ******************************************************************/ - /*! * Set the node affinity of this block. * This call affects how output buffers are allocated. diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index bb74dc9..e721769 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -65,6 +65,7 @@ list(APPEND GRAS_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/top_block_query.cpp ${CMAKE_CURRENT_SOURCE_DIR}/register_messages.cpp ${CMAKE_CURRENT_SOURCE_DIR}/weak_container.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/serialize_types.cpp ) ######################################################################## diff --git a/lib/block.cpp b/lib/block.cpp index 0015f10..75af20c 100644 --- a/lib/block.cpp +++ b/lib/block.cpp @@ -14,6 +14,7 @@ InputPortConfig::InputPortConfig(void) maximum_items = 0; inline_buffer = false; preload_items = 0; + force_done = true; } OutputPortConfig::OutputPortConfig(void) diff --git a/lib/block_message.cpp b/lib/block_message.cpp index 43dc7b4..eae0c68 100644 --- a/lib/block_message.cpp +++ b/lib/block_message.cpp @@ -47,3 +47,28 @@ void Block::propagate_tags(const size_t i, const TagIter &iter) } } } + +void Block::post_input_tag(const size_t which_input, const Tag &tag) +{ + InputTagMessage message(tag); + message.index = which_input; + Theron::Actor &actor = *((*this)->block); + actor.GetFramework().Send(message, Theron::Address::Null(), actor.GetAddress()); +} + +void Block::post_input_msg(const size_t which_input, const PMCC &msg) +{ + InputMsgMessage message(msg); + message.index = which_input; + Theron::Actor &actor = *((*this)->block); + actor.GetFramework().Send(message, Theron::Address::Null(), actor.GetAddress()); +} + +void Block::post_input_buffer(const size_t which_input, const SBuffer &buffer) +{ + InputBufferMessage message; + message.index = which_input; + message.buffer = buffer; + Theron::Actor &actor = *((*this)->block); + actor.GetFramework().Send(message, Theron::Address::Null(), actor.GetAddress()); +} diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp index 502f974..170ee1f 100644 --- a/lib/gras_impl/block_actor.hpp +++ b/lib/gras_impl/block_actor.hpp @@ -184,7 +184,9 @@ GRAS_FORCE_INLINE void BlockActor::update_input_avail(const size_t i) GRAS_FORCE_INLINE bool BlockActor::is_input_done(const size_t i) { - return this->inputs_done[i] and not this->inputs_available[i]; + const bool force_done = this->input_configs[i].force_done; + if GRAS_LIKELY(force_done) return this->inputs_done[i] and not this->inputs_available[i]; + return this->inputs_done.all() and this->inputs_available.none(); } GRAS_FORCE_INLINE bool BlockActor::is_work_allowed(void) diff --git a/lib/gras_impl/debug.hpp b/lib/gras_impl/debug.hpp index 7934a1e..52c2e16 100644 --- a/lib/gras_impl/debug.hpp +++ b/lib/gras_impl/debug.hpp @@ -63,7 +63,7 @@ extern void *operator new(std::size_t n) throw (std::bad_alloc); throw std::runtime_error(std::string("ASSERT FAIL ") + #x); \ }} #else -#define ASSERT(x) +#define ASSERT(x) {} #endif #endif /*INCLUDED_LIBGRAS_IMPL_DEBUG_HPP*/ diff --git a/lib/serialize_types.cpp b/lib/serialize_types.cpp new file mode 100644 index 0000000..33db43c --- /dev/null +++ b/lib/serialize_types.cpp @@ -0,0 +1,74 @@ +// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information. + +#include <gras/sbuffer.hpp> +#include <gras/tags.hpp> +#include <PMC/Serialize.hpp> +#include <boost/serialization/split_free.hpp> +#include <boost/serialization/string.hpp> + +/*********************************************************************** + * support for sbuffer + **********************************************************************/ +namespace boost { namespace serialization { +template<class Archive> +void save(Archive & ar, const gras::SBuffer & b, unsigned int version) +{ + bool null = not b; + ar & null; + if (null) return; + + //TODO lazyness string + std::string s((const char *)b.get(), b.length); + ar & s; +} +template<class Archive> +void load(Archive & ar, gras::SBuffer & b, unsigned int version) +{ + bool null = false; + ar & null; + if (null) b.reset(); + if (null) return; + + //TODO lazyness string + std::string s; + ar & s; + gras::SBufferConfig config; + config.length = s.length(); + b = gras::SBuffer(config); + b.length = s.length(); + std::memcpy(b.get(), s.c_str(), b.length); +} +}} + +BOOST_SERIALIZATION_SPLIT_FREE(gras::SBuffer) +PMC_SERIALIZE_EXPORT(gras::SBuffer, "PMC<gras::SBuffer>") + + +/*********************************************************************** + * support for special packet msg type + **********************************************************************/ +namespace boost { namespace serialization { +template <class Archive> +void serialize(Archive &ar, gras::PacketMsg &t, const unsigned int) +{ + ar & t.info; + ar & t.buff; +} +}} + +PMC_SERIALIZE_EXPORT(gras::PacketMsg, "PMC<gras::PacketMsg>") + +/*********************************************************************** + * support for special stream tag type + **********************************************************************/ +namespace boost { namespace serialization { +template <class Archive> +void serialize(Archive &ar, gras::StreamTag &t, const unsigned int) +{ + ar & t.key; + ar & t.val; + ar & t.src; +} +}} + +PMC_SERIALIZE_EXPORT(gras::StreamTag, "PMC<gras::StreamTag>") diff --git a/lib/tag_handlers.hpp b/lib/tag_handlers.hpp index 425748c..8d504dd 100644 --- a/lib/tag_handlers.hpp +++ b/lib/tag_handlers.hpp @@ -25,7 +25,7 @@ GRAS_FORCE_INLINE void BlockActor::trim_tags(const size_t i) //------------------------------------------------------------------ std::vector<Tag> &tags_i = this->input_tags[i]; - const size_t items_consumed_i = this->stats.items_consumed[i]; + const item_index_t items_consumed_i = this->stats.items_consumed[i]; size_t last = 0; while (last < tags_i.size() and tags_i[last].offset < items_consumed_i) { diff --git a/lib/task_fail.cpp b/lib/task_fail.cpp index c30b668..720e2e1 100644 --- a/lib/task_fail.cpp +++ b/lib/task_fail.cpp @@ -35,7 +35,7 @@ void BlockActor::input_fail(const size_t i) //check that the input is not already maxed if (this->input_queues.is_front_maximal(i)) { - throw std::runtime_error("input_fail called on maximum_items buffer"); + throw std::runtime_error("input_fail called on maximum_items buffer in " + name); } //mark fail: not ready until a new buffer appears @@ -50,7 +50,7 @@ void BlockActor::output_fail(const size_t i) const size_t front_items = buff.length/this->output_configs[i].item_size; if (front_items >= this->output_configs[i].maximum_items) { - throw std::runtime_error("output_fail called on maximum_items buffer"); + throw std::runtime_error("output_fail called on maximum_items buffer in " + name); } //mark fail: not ready until a new buffer appears diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 7626eb0..797f858 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -7,7 +7,7 @@ include(GrTest) ######################################################################## # unit test suite ######################################################################## -find_package(Boost COMPONENTS unit_test_framework) +find_package(Boost COMPONENTS unit_test_framework serialization) if (NOT Boost_FOUND) return() @@ -15,6 +15,7 @@ endif() set(test_sources block_props_test.cpp + serialize_tags_test.cpp ) include_directories(${GRAS_INCLUDE_DIRS}) diff --git a/tests/serialize_tags_test.cpp b/tests/serialize_tags_test.cpp new file mode 100644 index 0000000..2a07e2b --- /dev/null +++ b/tests/serialize_tags_test.cpp @@ -0,0 +1,76 @@ +// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information. + +#include <boost/test/unit_test.hpp> +#include <algorithm> +#include <iostream> + +#include <PMC/PMC.hpp> +#include <PMC/Serialize.hpp> +#include <gras/tags.hpp> + +// include headers that implement a archive in simple text format +#include <boost/archive/polymorphic_text_oarchive.hpp> +#include <boost/archive/polymorphic_text_iarchive.hpp> +#include <sstream> +#include <cstdlib> + +static PMCC loopback_test(PMCC p0) +{ + std::cout << "\ndoing loopback test on " << p0 << std::endl; + std::stringstream ss; + boost::archive::polymorphic_text_oarchive oa(ss); + + oa << p0; + //std::cout << "stringstream holds " << ss.str() << std::endl; + + boost::archive::polymorphic_text_iarchive ia(ss); + PMCC p1; + ia >> p1; + + return p1; +} + +static gras::SBuffer get_random_sbuff(void) +{ + gras::SBufferConfig config; + config.length = 100; + gras::SBuffer buff(config); + for (size_t i = 0; i < buff.length/sizeof(long); i++) + { + reinterpret_cast<long *>(buff.get())[i] = std::rand(); + } + return buff; +} + +BOOST_AUTO_TEST_CASE(test_sbuffer_type) +{ + gras::SBuffer src_buff = get_random_sbuff(); + PMCC result = loopback_test(PMC_M(src_buff)); + const gras::SBuffer &result_buff = result.as<gras::SBuffer>(); + + BOOST_CHECK_EQUAL(src_buff.length, result_buff.length); + BOOST_CHECK(std::memcmp(src_buff.get(), result_buff.get(), src_buff.length) == 0); +} + +BOOST_AUTO_TEST_CASE(test_null_sbuffer_type) +{ + gras::SBuffer src_buff; + PMCC result = loopback_test(PMC_M(src_buff)); + const gras::SBuffer &result_buff = result.as<gras::SBuffer>(); + BOOST_CHECK(not src_buff); + BOOST_CHECK(not result_buff); +} + +BOOST_AUTO_TEST_CASE(test_pkt_msg_type) +{ + gras::PacketMsg pkt_msg; + pkt_msg.buff = get_random_sbuff(); + pkt_msg.info = PMC_M(long(42)); + + PMCC result = loopback_test(PMC_M(pkt_msg)); + const gras::PacketMsg &result_msg = result.as<gras::PacketMsg>(); + + BOOST_CHECK_EQUAL(pkt_msg.info.type().name(), result_msg.info.type().name()); + BOOST_CHECK(pkt_msg.info.eq(result_msg.info)); + BOOST_CHECK(std::memcmp(pkt_msg.buff.get(), result_msg.buff.get(), pkt_msg.buff.length) == 0); +} |