summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
m---------PMC0
m---------grextras0
-rw-r--r--include/gras/block.hpp62
-rw-r--r--lib/CMakeLists.txt1
-rw-r--r--lib/block.cpp1
-rw-r--r--lib/block_message.cpp25
-rw-r--r--lib/gras_impl/block_actor.hpp4
-rw-r--r--lib/gras_impl/debug.hpp2
-rw-r--r--lib/serialize_types.cpp74
-rw-r--r--lib/tag_handlers.hpp2
-rw-r--r--lib/task_fail.cpp4
-rw-r--r--tests/CMakeLists.txt3
-rw-r--r--tests/serialize_tags_test.cpp76
13 files changed, 242 insertions, 12 deletions
diff --git a/PMC b/PMC
-Subproject 6b444922db565cd64b39729a8129eac7c81fe0c
+Subproject 3fe4dfb18a12dd53ea3bc7f9c32255f96e8da49
diff --git a/grextras b/grextras
-Subproject 07fbc5681cc4dcd2813ff7a1ef8fd1632085964
+Subproject af9435434bb4ca7c78d798b4710fa87089752bb
diff --git a/include/gras/block.hpp b/include/gras/block.hpp
index 8cb5f95..85a6ecc 100644
--- a/include/gras/block.hpp
+++ b/include/gras/block.hpp
@@ -76,6 +76,22 @@ struct GRAS_API InputPortConfig
* Default = 0.
*/
size_t preload_items;
+
+ /*!
+ * Force this block done when input port is done.
+ * When the upstream feeding this port declares done,
+ * this block will mark done once upstream notifies.
+ * The primary usage is to modify the done logic
+ * for the purposes of unit test confiruability.
+ *
+ * If the force done option is false, the block will
+ * not mark done when this port's upstream is done.
+ * However, this block will mark done when all
+ * input ports are done, reguardless of this setting.
+ *
+ * Default = true.
+ */
+ bool force_done;
};
//! Configuration parameters for an output port
@@ -186,15 +202,22 @@ struct GRAS_API Block : Element
*/
virtual void propagate_tags(const size_t which_input, const TagIter &iter);
+ /*!
+ * Send a tag to the given input port on this block.
+ * This is a thread-safe way for external scheduler
+ * entities to post tags into the input of a block.
+ * \param which_input an input port on this block
+ * \param tag the tag to post to the input port
+ */
+ void post_input_tag(const size_t which_input, const Tag &tag);
/*******************************************************************
* Deal with message passing
******************************************************************/
/*!
- * Post output message convenience routine.
* Send a message to the downstream on the given output port.
- * The underlying implementation is a tag with an offset of 0.
+ * Messages are naturally asynchronous to stream and tag data.
*
* \param which_output the index of the output port
* \param msg the message object to pass downstream
@@ -214,6 +237,15 @@ struct GRAS_API Block : Element
*/
PMCC pop_input_msg(const size_t which_input);
+ /*!
+ * Send a message to the given input port on this block.
+ * This is a thread-safe way for external scheduler
+ * entities to post messages into the input of a block.
+ * \param which_input an input port on this block
+ * \param msg the message to post to the input port
+ */
+ void post_input_msg(const size_t which_input, const PMCC &tag);
+
/*******************************************************************
* The property interface:
* Provides polymorphic, thread-safe access to block properties.
@@ -356,6 +388,10 @@ struct GRAS_API Block : Element
*/
void mark_done(void);
+ /*******************************************************************
+ * Direct buffer access API
+ ******************************************************************/
+
/*!
* Get access to the underlying reference counted input buffer.
* This is the same buffer pointed to by input_items[which].
@@ -407,6 +443,20 @@ struct GRAS_API Block : Element
void post_output_buffer(const size_t which_output, const SBuffer &buffer);
/*!
+ * Post a buffer to the given input port on this block.
+ * This is a thread-safe way for external scheduler
+ * entities to post buffers into the input of a block.
+ *
+ * \param which_input an input port on this block
+ * \param buffer the buffer to post to the input port
+ */
+ void post_input_buffer(const size_t which_input, const SBuffer &buffer);
+
+ /*******************************************************************
+ * Scheduler notification API
+ ******************************************************************/
+
+ /*!
* Overload notify_active to get called when block becomes active.
* This will be called when the TopBlock start/run API call executes.
* The default implementation of notify_active is a NOP.
@@ -427,6 +477,10 @@ struct GRAS_API Block : Element
*/
virtual void notify_topology(const size_t num_inputs, const size_t num_outputs);
+ /*******************************************************************
+ * routines related to affinity and allocation
+ ******************************************************************/
+
/*!
* Set if the work call should be interruptible by stop().
* Some work implementations block with the expectation of
@@ -436,10 +490,6 @@ struct GRAS_API Block : Element
*/
void set_interruptible_work(const bool enb);
- /*******************************************************************
- * routines related to affinity and allocation
- ******************************************************************/
-
/*!
* Set the node affinity of this block.
* This call affects how output buffers are allocated.
diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt
index bb74dc9..e721769 100644
--- a/lib/CMakeLists.txt
+++ b/lib/CMakeLists.txt
@@ -65,6 +65,7 @@ list(APPEND GRAS_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/top_block_query.cpp
${CMAKE_CURRENT_SOURCE_DIR}/register_messages.cpp
${CMAKE_CURRENT_SOURCE_DIR}/weak_container.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/serialize_types.cpp
)
########################################################################
diff --git a/lib/block.cpp b/lib/block.cpp
index 0015f10..75af20c 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -14,6 +14,7 @@ InputPortConfig::InputPortConfig(void)
maximum_items = 0;
inline_buffer = false;
preload_items = 0;
+ force_done = true;
}
OutputPortConfig::OutputPortConfig(void)
diff --git a/lib/block_message.cpp b/lib/block_message.cpp
index 43dc7b4..eae0c68 100644
--- a/lib/block_message.cpp
+++ b/lib/block_message.cpp
@@ -47,3 +47,28 @@ void Block::propagate_tags(const size_t i, const TagIter &iter)
}
}
}
+
+void Block::post_input_tag(const size_t which_input, const Tag &tag)
+{
+ InputTagMessage message(tag);
+ message.index = which_input;
+ Theron::Actor &actor = *((*this)->block);
+ actor.GetFramework().Send(message, Theron::Address::Null(), actor.GetAddress());
+}
+
+void Block::post_input_msg(const size_t which_input, const PMCC &msg)
+{
+ InputMsgMessage message(msg);
+ message.index = which_input;
+ Theron::Actor &actor = *((*this)->block);
+ actor.GetFramework().Send(message, Theron::Address::Null(), actor.GetAddress());
+}
+
+void Block::post_input_buffer(const size_t which_input, const SBuffer &buffer)
+{
+ InputBufferMessage message;
+ message.index = which_input;
+ message.buffer = buffer;
+ Theron::Actor &actor = *((*this)->block);
+ actor.GetFramework().Send(message, Theron::Address::Null(), actor.GetAddress());
+}
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index 502f974..170ee1f 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -184,7 +184,9 @@ GRAS_FORCE_INLINE void BlockActor::update_input_avail(const size_t i)
GRAS_FORCE_INLINE bool BlockActor::is_input_done(const size_t i)
{
- return this->inputs_done[i] and not this->inputs_available[i];
+ const bool force_done = this->input_configs[i].force_done;
+ if GRAS_LIKELY(force_done) return this->inputs_done[i] and not this->inputs_available[i];
+ return this->inputs_done.all() and this->inputs_available.none();
}
GRAS_FORCE_INLINE bool BlockActor::is_work_allowed(void)
diff --git a/lib/gras_impl/debug.hpp b/lib/gras_impl/debug.hpp
index 7934a1e..52c2e16 100644
--- a/lib/gras_impl/debug.hpp
+++ b/lib/gras_impl/debug.hpp
@@ -63,7 +63,7 @@ extern void *operator new(std::size_t n) throw (std::bad_alloc);
throw std::runtime_error(std::string("ASSERT FAIL ") + #x); \
}}
#else
-#define ASSERT(x)
+#define ASSERT(x) {}
#endif
#endif /*INCLUDED_LIBGRAS_IMPL_DEBUG_HPP*/
diff --git a/lib/serialize_types.cpp b/lib/serialize_types.cpp
new file mode 100644
index 0000000..33db43c
--- /dev/null
+++ b/lib/serialize_types.cpp
@@ -0,0 +1,74 @@
+// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
+
+#include <gras/sbuffer.hpp>
+#include <gras/tags.hpp>
+#include <PMC/Serialize.hpp>
+#include <boost/serialization/split_free.hpp>
+#include <boost/serialization/string.hpp>
+
+/***********************************************************************
+ * support for sbuffer
+ **********************************************************************/
+namespace boost { namespace serialization {
+template<class Archive>
+void save(Archive & ar, const gras::SBuffer & b, unsigned int version)
+{
+ bool null = not b;
+ ar & null;
+ if (null) return;
+
+ //TODO lazyness string
+ std::string s((const char *)b.get(), b.length);
+ ar & s;
+}
+template<class Archive>
+void load(Archive & ar, gras::SBuffer & b, unsigned int version)
+{
+ bool null = false;
+ ar & null;
+ if (null) b.reset();
+ if (null) return;
+
+ //TODO lazyness string
+ std::string s;
+ ar & s;
+ gras::SBufferConfig config;
+ config.length = s.length();
+ b = gras::SBuffer(config);
+ b.length = s.length();
+ std::memcpy(b.get(), s.c_str(), b.length);
+}
+}}
+
+BOOST_SERIALIZATION_SPLIT_FREE(gras::SBuffer)
+PMC_SERIALIZE_EXPORT(gras::SBuffer, "PMC<gras::SBuffer>")
+
+
+/***********************************************************************
+ * support for special packet msg type
+ **********************************************************************/
+namespace boost { namespace serialization {
+template <class Archive>
+void serialize(Archive &ar, gras::PacketMsg &t, const unsigned int)
+{
+ ar & t.info;
+ ar & t.buff;
+}
+}}
+
+PMC_SERIALIZE_EXPORT(gras::PacketMsg, "PMC<gras::PacketMsg>")
+
+/***********************************************************************
+ * support for special stream tag type
+ **********************************************************************/
+namespace boost { namespace serialization {
+template <class Archive>
+void serialize(Archive &ar, gras::StreamTag &t, const unsigned int)
+{
+ ar & t.key;
+ ar & t.val;
+ ar & t.src;
+}
+}}
+
+PMC_SERIALIZE_EXPORT(gras::StreamTag, "PMC<gras::StreamTag>")
diff --git a/lib/tag_handlers.hpp b/lib/tag_handlers.hpp
index 425748c..8d504dd 100644
--- a/lib/tag_handlers.hpp
+++ b/lib/tag_handlers.hpp
@@ -25,7 +25,7 @@ GRAS_FORCE_INLINE void BlockActor::trim_tags(const size_t i)
//------------------------------------------------------------------
std::vector<Tag> &tags_i = this->input_tags[i];
- const size_t items_consumed_i = this->stats.items_consumed[i];
+ const item_index_t items_consumed_i = this->stats.items_consumed[i];
size_t last = 0;
while (last < tags_i.size() and tags_i[last].offset < items_consumed_i)
{
diff --git a/lib/task_fail.cpp b/lib/task_fail.cpp
index c30b668..720e2e1 100644
--- a/lib/task_fail.cpp
+++ b/lib/task_fail.cpp
@@ -35,7 +35,7 @@ void BlockActor::input_fail(const size_t i)
//check that the input is not already maxed
if (this->input_queues.is_front_maximal(i))
{
- throw std::runtime_error("input_fail called on maximum_items buffer");
+ throw std::runtime_error("input_fail called on maximum_items buffer in " + name);
}
//mark fail: not ready until a new buffer appears
@@ -50,7 +50,7 @@ void BlockActor::output_fail(const size_t i)
const size_t front_items = buff.length/this->output_configs[i].item_size;
if (front_items >= this->output_configs[i].maximum_items)
{
- throw std::runtime_error("output_fail called on maximum_items buffer");
+ throw std::runtime_error("output_fail called on maximum_items buffer in " + name);
}
//mark fail: not ready until a new buffer appears
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 7626eb0..797f858 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -7,7 +7,7 @@ include(GrTest)
########################################################################
# unit test suite
########################################################################
-find_package(Boost COMPONENTS unit_test_framework)
+find_package(Boost COMPONENTS unit_test_framework serialization)
if (NOT Boost_FOUND)
return()
@@ -15,6 +15,7 @@ endif()
set(test_sources
block_props_test.cpp
+ serialize_tags_test.cpp
)
include_directories(${GRAS_INCLUDE_DIRS})
diff --git a/tests/serialize_tags_test.cpp b/tests/serialize_tags_test.cpp
new file mode 100644
index 0000000..2a07e2b
--- /dev/null
+++ b/tests/serialize_tags_test.cpp
@@ -0,0 +1,76 @@
+// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
+
+#include <boost/test/unit_test.hpp>
+#include <algorithm>
+#include <iostream>
+
+#include <PMC/PMC.hpp>
+#include <PMC/Serialize.hpp>
+#include <gras/tags.hpp>
+
+// include headers that implement a archive in simple text format
+#include <boost/archive/polymorphic_text_oarchive.hpp>
+#include <boost/archive/polymorphic_text_iarchive.hpp>
+#include <sstream>
+#include <cstdlib>
+
+static PMCC loopback_test(PMCC p0)
+{
+ std::cout << "\ndoing loopback test on " << p0 << std::endl;
+ std::stringstream ss;
+ boost::archive::polymorphic_text_oarchive oa(ss);
+
+ oa << p0;
+ //std::cout << "stringstream holds " << ss.str() << std::endl;
+
+ boost::archive::polymorphic_text_iarchive ia(ss);
+ PMCC p1;
+ ia >> p1;
+
+ return p1;
+}
+
+static gras::SBuffer get_random_sbuff(void)
+{
+ gras::SBufferConfig config;
+ config.length = 100;
+ gras::SBuffer buff(config);
+ for (size_t i = 0; i < buff.length/sizeof(long); i++)
+ {
+ reinterpret_cast<long *>(buff.get())[i] = std::rand();
+ }
+ return buff;
+}
+
+BOOST_AUTO_TEST_CASE(test_sbuffer_type)
+{
+ gras::SBuffer src_buff = get_random_sbuff();
+ PMCC result = loopback_test(PMC_M(src_buff));
+ const gras::SBuffer &result_buff = result.as<gras::SBuffer>();
+
+ BOOST_CHECK_EQUAL(src_buff.length, result_buff.length);
+ BOOST_CHECK(std::memcmp(src_buff.get(), result_buff.get(), src_buff.length) == 0);
+}
+
+BOOST_AUTO_TEST_CASE(test_null_sbuffer_type)
+{
+ gras::SBuffer src_buff;
+ PMCC result = loopback_test(PMC_M(src_buff));
+ const gras::SBuffer &result_buff = result.as<gras::SBuffer>();
+ BOOST_CHECK(not src_buff);
+ BOOST_CHECK(not result_buff);
+}
+
+BOOST_AUTO_TEST_CASE(test_pkt_msg_type)
+{
+ gras::PacketMsg pkt_msg;
+ pkt_msg.buff = get_random_sbuff();
+ pkt_msg.info = PMC_M(long(42));
+
+ PMCC result = loopback_test(PMC_M(pkt_msg));
+ const gras::PacketMsg &result_msg = result.as<gras::PacketMsg>();
+
+ BOOST_CHECK_EQUAL(pkt_msg.info.type().name(), result_msg.info.type().name());
+ BOOST_CHECK(pkt_msg.info.eq(result_msg.info));
+ BOOST_CHECK(std::memcmp(pkt_msg.buff.get(), result_msg.buff.get(), pkt_msg.buff.length) == 0);
+}