summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/gras/CMakeLists.txt1
-rw-r--r--include/gras/block.hpp150
-rw-r--r--include/gras/block.i1
-rw-r--r--include/gras/block_config.hpp159
-rw-r--r--include/gras/hier_block.i1
-rw-r--r--include/gras/top_block.hpp23
-rw-r--r--lib/CMakeLists.txt1
-rw-r--r--lib/block.cpp41
-rw-r--r--lib/block_allocator.cpp2
-rw-r--r--lib/block_config.cpp29
-rw-r--r--lib/block_handlers.cpp12
-rw-r--r--lib/gras_impl/block_data.hpp3
-rw-r--r--lib/task_main.cpp2
-rw-r--r--lib/top_block.cpp6
14 files changed, 231 insertions, 200 deletions
diff --git a/include/gras/CMakeLists.txt b/include/gras/CMakeLists.txt
index 7f37b1a..d0e3cfa 100644
--- a/include/gras/CMakeLists.txt
+++ b/include/gras/CMakeLists.txt
@@ -6,6 +6,7 @@ install(FILES
exception.i
chrono.hpp
block.hpp
+ block_config.hpp
block.i
element.hpp
element.i
diff --git a/include/gras/block.hpp b/include/gras/block.hpp
index 5a6e5e2..f2f1b2b 100644
--- a/include/gras/block.hpp
+++ b/include/gras/block.hpp
@@ -3,6 +3,7 @@
#ifndef INCLUDED_GRAS_BLOCK_HPP
#define INCLUDED_GRAS_BLOCK_HPP
+#include <gras/block_config.hpp>
#include <gras/element.hpp>
#include <gras/sbuffer.hpp>
#include <gras/thread_pool.hpp>
@@ -16,112 +17,6 @@
namespace gras
{
-//! Configuration parameters for an input port
-struct GRAS_API InputPortConfig
-{
- InputPortConfig(void);
-
- //! The size of an item in bytes
- size_t item_size;
-
- /*!
- * Set an input reserve requirement such that work is called
- * with an input buffer at least reserve items in size.
- *
- * Default = 1.
- */
- size_t reserve_items;
-
- /*!
- * Constrain the input buffer allocation size:
- * The scheduler may accumulate multiple buffers
- * into a single larger buffer under failure conditions.
- * The maximum size of this accumulated buffer
- * is constrained by this maximum_items setting.
- *
- * Default = 0 aka disabled.
- */
- size_t maximum_items;
-
- /*!
- * Set buffer inlining for this port config.
- * Inlining means that the input buffer can be used as an output buffer.
- * The goal is to make better use of cache and memory bandwidth.
- *
- * By default, inlining is disabled on all input ports.
- * The user should enable inlining on an input port
- * when it is understood that the work function will read
- * before writting to a particular section of the buffer.
- *
- * The scheduler will inline a buffer when
- * * inlining is enabled on the particular input port
- * * block holds the only buffer reference aka unique
- * * the input buffer has the same affinity as the block
- * * the input port has a buffer look-ahead of 0
- *
- * Default = false.
- */
- bool inline_buffer;
-
- /*!
- * Preload the input queue with num preload items.
- * All items preloaded into the buffer will be 0.
- * This is used to implement zero-padding for
- * things like sliding dot products/FIR filters.
- *
- * - Increasing the preload at runtime will
- * inject more items into the input queue.
- * - Decreasing the preload at runtime will
- * consume random items from the input queue.
- *
- * Default = 0.
- */
- size_t preload_items;
-
- /*!
- * Force this block done when input port is done.
- * When the upstream feeding this port declares done,
- * this block will mark done once upstream notifies.
- * The primary usage is to modify the done logic
- * for the purposes of unit test confiruability.
- *
- * If the force done option is false, the block will
- * not mark done when this port's upstream is done.
- * However, this block will mark done when all
- * input ports are done, reguardless of this setting.
- *
- * Default = true.
- */
- bool force_done;
-};
-
-//! Configuration parameters for an output port
-struct GRAS_API OutputPortConfig
-{
- OutputPortConfig(void);
-
- //! The size of an item in bytes
- size_t item_size;
-
- /*!
- * Set an output reserve requirement such that work is called
- * with an output buffer at least reserve items in size.
- *
- * Default = 1.
- */
- size_t reserve_items;
-
- /*!
- * Constrain the output buffer allocation size:
- * The user might set a small maximum items
- * to reduce the amount of buffered items
- * waiting for processing in downstream queues.
- *
- * Default = 0 aka disabled.
- */
- size_t maximum_items;
-};
-
struct GRAS_API Block : Element
{
//! Contruct an empty/null block
@@ -133,9 +28,22 @@ struct GRAS_API Block : Element
virtual ~Block(void);
/*******************************************************************
- * Deal with input and output port configuration
+ * Deal with block configuration configuration
******************************************************************/
+ /*!
+ * Set the thread pool of this block.
+ * Every block is created in the default active thread pool.
+ * This call will migrate the block to a new specified pool.
+ */
+ void set_thread_pool(const ThreadPool &thread_pool);
+
+ //! Get the global block config settings
+ const GlobalBlockConfig &global_config(void) const;
+
+ //! Get the global block config settings
+ GlobalBlockConfig &global_config(void);
+
//! Get the configuration rules of an input port
const InputPortConfig &input_config(const size_t which_input) const;
@@ -485,36 +393,10 @@ struct GRAS_API Block : Element
virtual void notify_topology(const size_t num_inputs, const size_t num_outputs);
/*******************************************************************
- * routines related to affinity and allocation
+ * custom buffer queue API
******************************************************************/
/*!
- * Set the thread pool of this block.
- * Every block is created in the default active thread pool.
- * This call will migrate the block to a new specified pool.
- */
- void set_thread_pool(const ThreadPool &thread_pool);
-
- /*!
- * Set if the work call should be interruptible by stop().
- * Some work implementations block with the expectation of
- * getting a boost thread interrupt in a blocking call.
- * Set set_interruptible_work(true) if this is the case.
- * By default, work implementations are not interruptible.
- */
- void set_interruptible_work(const bool enb);
-
- /*!
- * Set the node affinity of this block.
- * This call affects how output buffers are allocated.
- * By default memory is allocated by malloc.
- * When the affinity is set, virtual memory
- * will be locked to a physical CPU/memory node.
- * \param affinity a memory node on the system
- */
- void set_buffer_affinity(const long affinity);
-
- /*!
* The output buffer allocator method.
* This method is called by the scheduler to allocate output buffers.
* The user may overload this method to create a custom allocator.
diff --git a/include/gras/block.i b/include/gras/block.i
index 5c62f7e..6ac1d5c 100644
--- a/include/gras/block.i
+++ b/include/gras/block.i
@@ -13,6 +13,7 @@
%import <gras/sbuffer.i>
%include <gras/buffer_queue.hpp>
%include <gras/thread_pool.hpp>
+%include <gras/block_config.hpp>
%include <gras/block.hpp>
////////////////////////////////////////////////////////////////////////
diff --git a/include/gras/block_config.hpp b/include/gras/block_config.hpp
new file mode 100644
index 0000000..c026cba
--- /dev/null
+++ b/include/gras/block_config.hpp
@@ -0,0 +1,159 @@
+// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
+
+#ifndef INCLUDED_GRAS_BLOCK_CONFIG_HPP
+#define INCLUDED_GRAS_BLOCK_CONFIG_HPP
+
+#include <gras/gras.hpp>
+#include <cstddef>
+
+namespace gras
+{
+
+//! Configuration parameters for a block
+struct GRAS_API GlobalBlockConfig
+{
+ GlobalBlockConfig(void);
+
+ /*!
+ * Constrain the maximum number of items that
+ * work can be called with for all output ports.
+ *
+ * Default = 0 aka disabled.
+ */
+ size_t maximum_output_items;
+
+ /*!
+ * Set the global memory node affinity.
+ * Blocks that have not been explicitly set,
+ * will take on this new buffer_affinity.
+ *
+ * This param affects how buffers are allocated.
+ * By default memory is allocated by malloc.
+ * When the affinity is set, virtual memory
+ * will be locked to a physical CPU/memory node.
+ *
+ * Default = -1 aka no affinity.
+ */
+ long buffer_affinity;
+
+ /*!
+ * True if the work call should be interruptible by stop().
+ * Some work implementations block with the expectation of
+ * getting a boost thread interrupt in a blocking call.
+ * If this is the case, this parameter should be set true.
+ * By default, work implementations are not interruptible.
+ *
+ * Default = false.
+ */
+ bool interruptible_work;
+};
+
+//! Configuration parameters for an input port
+struct GRAS_API InputPortConfig
+{
+ InputPortConfig(void);
+
+ //! The size of an item in bytes
+ size_t item_size;
+
+ /*!
+ * Set an input reserve requirement such that work is called
+ * with an input buffer at least reserve items in size.
+ *
+ * Default = 1.
+ */
+ size_t reserve_items;
+
+ /*!
+ * Constrain the input buffer allocation size:
+ * The scheduler may accumulate multiple buffers
+ * into a single larger buffer under failure conditions.
+ * The maximum size of this accumulated buffer
+ * is constrained by this maximum_items setting.
+ *
+ * Default = 0 aka disabled.
+ */
+ size_t maximum_items;
+
+ /*!
+ * Set buffer inlining for this port config.
+ * Inlining means that the input buffer can be used as an output buffer.
+ * The goal is to make better use of cache and memory bandwidth.
+ *
+ * By default, inlining is disabled on all input ports.
+ * The user should enable inlining on an input port
+ * when it is understood that the work function will read
+ * before writting to a particular section of the buffer.
+ *
+ * The scheduler will inline a buffer when
+ * * inlining is enabled on the particular input port
+ * * block holds the only buffer reference aka unique
+ * * the input buffer has the same affinity as the block
+ * * the input port has a buffer look-ahead of 0
+ *
+ * Default = false.
+ */
+ bool inline_buffer;
+
+ /*!
+ * Preload the input queue with num preload items.
+ * All items preloaded into the buffer will be 0.
+ * This is used to implement zero-padding for
+ * things like sliding dot products/FIR filters.
+ *
+ * - Increasing the preload at runtime will
+ * inject more items into the input queue.
+ * - Decreasing the preload at runtime will
+ * consume random items from the input queue.
+ *
+ * Default = 0.
+ */
+ size_t preload_items;
+
+ /*!
+ * Force this block done when input port is done.
+ * When the upstream feeding this port declares done,
+ * this block will mark done once upstream notifies.
+ * The primary usage is to modify the done logic
+ * for the purposes of unit test confiruability.
+ *
+ * If the force done option is false, the block will
+ * not mark done when this port's upstream is done.
+ * However, this block will mark done when all
+ * input ports are done, reguardless of this setting.
+ *
+ * Default = true.
+ */
+ bool force_done;
+};
+
+//! Configuration parameters for an output port
+struct GRAS_API OutputPortConfig
+{
+ OutputPortConfig(void);
+
+ //! The size of an item in bytes
+ size_t item_size;
+
+ /*!
+ * Set an output reserve requirement such that work is called
+ * with an output buffer at least reserve items in size.
+ *
+ * Default = 1.
+ */
+ size_t reserve_items;
+
+ /*!
+ * Constrain the output buffer allocation size:
+ * The user might set a small maximum items
+ * to reduce the amount of buffered items
+ * waiting for processing in downstream queues.
+ *
+ * Default = 0 aka disabled.
+ */
+ size_t maximum_items;
+};
+
+} //namespace gras
+
+#endif /*INCLUDED_GRAS_BLOCK_CONFIG_HPP*/
diff --git a/include/gras/hier_block.i b/include/gras/hier_block.i
index da57e3d..376bbf4 100644
--- a/include/gras/hier_block.i
+++ b/include/gras/hier_block.i
@@ -8,6 +8,7 @@
%}
%import <gras/element.i>
+%include <gras/block_config.hpp>
%include <gras/hier_block.hpp>
#endif /*INCLUDED_GRAS_HIER_BLOCK_I*/
diff --git a/include/gras/top_block.hpp b/include/gras/top_block.hpp
index 17f1e30..415c03e 100644
--- a/include/gras/top_block.hpp
+++ b/include/gras/top_block.hpp
@@ -3,33 +3,12 @@
#ifndef INCLUDED_GRAS_TOP_BLOCK_HPP
#define INCLUDED_GRAS_TOP_BLOCK_HPP
+#include <gras/block_config.hpp>
#include <gras/hier_block.hpp>
namespace gras
{
-struct GRAS_API GlobalBlockConfig
-{
- GlobalBlockConfig(void);
-
- /*!
- * Constrain the maximum number of items that
- * work can be called with for all output ports.
- *
- * Default = 0 aka disabled.
- */
- size_t maximum_output_items;
-
- /*!
- * Set the global memory node affinity.
- * Blocks that have not been explicitly set,
- * will take on this new buffer_affinity.
- *
- * Default = -1 aka no affinity.
- */
- long buffer_affinity;
-};
-
struct GRAS_API TopBlock : HierBlock
{
TopBlock(void);
diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt
index b5ed2d5..5f31731 100644
--- a/lib/CMakeLists.txt
+++ b/lib/CMakeLists.txt
@@ -49,6 +49,7 @@ list(APPEND GRAS_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/buffer_queue_pool.cpp
${CMAKE_CURRENT_SOURCE_DIR}/tags.cpp
${CMAKE_CURRENT_SOURCE_DIR}/block.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/block_config.cpp
${CMAKE_CURRENT_SOURCE_DIR}/block_message.cpp
${CMAKE_CURRENT_SOURCE_DIR}/block_consume.cpp
${CMAKE_CURRENT_SOURCE_DIR}/block_produce.cpp
diff --git a/lib/block.cpp b/lib/block.cpp
index 00238ba..7cd7392 100644
--- a/lib/block.cpp
+++ b/lib/block.cpp
@@ -7,23 +7,6 @@
using namespace gras;
-InputPortConfig::InputPortConfig(void)
-{
- item_size = 1;
- reserve_items = 1;
- maximum_items = 0;
- inline_buffer = false;
- preload_items = 0;
- force_done = true;
-}
-
-OutputPortConfig::OutputPortConfig(void)
-{
- item_size = 1;
- reserve_items = 1;
- maximum_items = 0;
-}
-
Block::Block(void)
{
//NOP
@@ -43,10 +26,6 @@ Block::Block(const std::string &name):
//setup some state variables
(*this)->block_data->block_state = BLOCK_STATE_INIT;
-
- //call block methods to init stuff
- this->set_interruptible_work(false);
- this->set_buffer_affinity(-1);
}
Block::~Block(void)
@@ -142,6 +121,16 @@ typename V::value_type &vector_get_resize(V &v, const size_t index)
return v[index];
}
+const GlobalBlockConfig &Block::global_config(void) const
+{
+ return (*this)->block_data->global_config;
+}
+
+GlobalBlockConfig &Block::global_config(void)
+{
+ return (*this)->block_data->global_config;
+}
+
InputPortConfig &Block::input_config(const size_t which_input)
{
return vector_get_resize((*this)->block_data->input_configs, which_input);
@@ -202,13 +191,3 @@ void Block::set_thread_pool(const ThreadPool &thread_pool)
(*this)->setup_actor();
wait_actor_idle((*this)->repr, *old_actor);
}
-
-void Block::set_buffer_affinity(const long affinity)
-{
- (*this)->block_data->buffer_affinity = affinity;
-}
-
-void Block::set_interruptible_work(const bool enb)
-{
- (*this)->block_data->interruptible_work = enb;
-}
diff --git a/lib/block_allocator.cpp b/lib/block_allocator.cpp
index aa97405..3168bc2 100644
--- a/lib/block_allocator.cpp
+++ b/lib/block_allocator.cpp
@@ -74,7 +74,7 @@ void BlockActor::handle_top_alloc(const TopAllocMessage &, const Theron::Address
SBufferConfig config;
config.memory = NULL;
config.length = bytes;
- config.affinity = data->buffer_affinity;
+ config.affinity = data->global_config.buffer_affinity;
config.token = token;
BufferQueueSptr queue = data->block->output_buffer_allocator(i, config);
diff --git a/lib/block_config.cpp b/lib/block_config.cpp
new file mode 100644
index 0000000..04d37f1
--- /dev/null
+++ b/lib/block_config.cpp
@@ -0,0 +1,29 @@
+// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
+
+#include <gras/block_config.hpp>
+
+using namespace gras;
+
+GlobalBlockConfig::GlobalBlockConfig(void)
+{
+ maximum_output_items = 0;
+ buffer_affinity = -1;
+ interruptible_work = false;
+}
+
+InputPortConfig::InputPortConfig(void)
+{
+ item_size = 1;
+ reserve_items = 1;
+ maximum_items = 0;
+ inline_buffer = false;
+ preload_items = 0;
+ force_done = true;
+}
+
+OutputPortConfig::OutputPortConfig(void)
+{
+ item_size = 1;
+ reserve_items = 1;
+ maximum_items = 0;
+}
diff --git a/lib/block_handlers.cpp b/lib/block_handlers.cpp
index c52d974..4cb03ab 100644
--- a/lib/block_handlers.cpp
+++ b/lib/block_handlers.cpp
@@ -92,9 +92,15 @@ void BlockActor::handle_top_config(
}
//overwrite with global node affinity setting for buffers if not set
- if (data->buffer_affinity == -1)
+ if (data->global_config.buffer_affinity == -1)
{
- data->buffer_affinity = message.buffer_affinity;
+ data->global_config.buffer_affinity = message.buffer_affinity;
+ }
+
+ //overwrite with global interruptable setting for work if not set
+ if (data->global_config.interruptible_work == false)
+ {
+ data->global_config.interruptible_work = message.interruptible_work;
}
this->Send(0, from); //ACK
@@ -111,7 +117,7 @@ void BlockActor::handle_top_thread_group(
//spawn a new thread if this block is a source
data->thread_group = message;
data->interruptible_thread.reset(); //erase old one
- if (data->interruptible_work)
+ if (data->global_config.interruptible_work)
{
data->interruptible_thread = boost::make_shared<InterruptibleThread>(
data->thread_group, boost::bind(&BlockActor::task_work, this)
diff --git a/lib/gras_impl/block_data.hpp b/lib/gras_impl/block_data.hpp
index 4b6e8de..9300d9f 100644
--- a/lib/gras_impl/block_data.hpp
+++ b/lib/gras_impl/block_data.hpp
@@ -67,13 +67,12 @@ struct BlockData
std::vector<std::vector<PMCC> > input_msgs;
//interruptible thread stuff
- bool interruptible_work;
SharedThreadGroup thread_group;
boost::shared_ptr<InterruptibleThread> interruptible_thread;
//is the fg running?
BlockState block_state;
- long buffer_affinity;
+ GlobalBlockConfig global_config;
std::vector<std::vector<OutputHintMessage> > output_allocation_hints;
diff --git a/lib/task_main.cpp b/lib/task_main.cpp
index cb3e0ee..f4f0053 100644
--- a/lib/task_main.cpp
+++ b/lib/task_main.cpp
@@ -47,7 +47,7 @@ void BlockActor::task_main(void)
buff.unique() and
data->input_configs[i].inline_buffer and
output_inline_index < num_outputs and
- buff.get_affinity() == data->buffer_affinity
+ buff.get_affinity() == data->global_config.buffer_affinity
){
data->output_queues.set_inline(output_inline_index++, buff);
}
diff --git a/lib/top_block.cpp b/lib/top_block.cpp
index 12055f8..240f417 100644
--- a/lib/top_block.cpp
+++ b/lib/top_block.cpp
@@ -6,12 +6,6 @@
using namespace gras;
-GlobalBlockConfig::GlobalBlockConfig(void)
-{
- maximum_output_items = 0;
- buffer_affinity = -1;
-}
-
TopBlock::TopBlock(void)
{
//NOP