summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/CMakeLists.txt3
-rw-r--r--lib/block_actor.cpp4
-rw-r--r--lib/gras_impl/block_actor.hpp2
-rw-r--r--lib/gras_impl/stats.hpp4
-rw-r--r--lib/theron_allocator.cpp100
-rw-r--r--lib/top_block_query.cpp10
6 files changed, 20 insertions, 103 deletions
diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt
index 6a9006c..e721769 100644
--- a/lib/CMakeLists.txt
+++ b/lib/CMakeLists.txt
@@ -24,6 +24,8 @@ add_definitions(${THERON_DEFINES})
list(APPEND GRAS_LIBRARIES ${THERON_LIBRARIES})
list(APPEND GRAS_SOURCES ${THERON_SOURCES})
+add_definitions(-DTHERON_ENABLE_DEFAULTALLOCATOR_CHECKS=1)
+
########################################################################
# Setup Apology Deps
########################################################################
@@ -62,7 +64,6 @@ list(APPEND GRAS_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/top_block.cpp
${CMAKE_CURRENT_SOURCE_DIR}/top_block_query.cpp
${CMAKE_CURRENT_SOURCE_DIR}/register_messages.cpp
- ${CMAKE_CURRENT_SOURCE_DIR}/theron_allocator.cpp
${CMAKE_CURRENT_SOURCE_DIR}/weak_container.cpp
${CMAKE_CURRENT_SOURCE_DIR}/serialize_types.cpp
)
diff --git a/lib/block_actor.cpp b/lib/block_actor.cpp
index 750e52d..fe7f79e 100644
--- a/lib/block_actor.cpp
+++ b/lib/block_actor.cpp
@@ -14,7 +14,7 @@ ThreadPoolConfig::ThreadPoolConfig(void)
thread_count = std::max(size_t(2), thread_count);
node_mask = 0;
processor_mask = 0xffffffff;
- yield_strategy = "STRONG";
+ yield_strategy = "BLOCKING";
//environment variable override
const char * gras_yield = getenv("GRAS_YIELD");
@@ -44,7 +44,7 @@ ThreadPool::ThreadPool(const ThreadPoolConfig &config)
);
if (config.yield_strategy.empty()) params.mYieldStrategy = Theron::YIELD_STRATEGY_STRONG;
- //else if (config.yield_strategy == "BLOCKING") params.mYieldStrategy = Theron::YIELD_STRATEGY_BLOCKING;
+ else if (config.yield_strategy == "BLOCKING") params.mYieldStrategy = Theron::YIELD_STRATEGY_BLOCKING;
else if (config.yield_strategy == "POLITE") params.mYieldStrategy = Theron::YIELD_STRATEGY_POLITE;
else if (config.yield_strategy == "STRONG") params.mYieldStrategy = Theron::YIELD_STRATEGY_STRONG;
else if (config.yield_strategy == "AGGRESSIVE") params.mYieldStrategy = Theron::YIELD_STRATEGY_AGGRESSIVE;
diff --git a/lib/gras_impl/block_actor.hpp b/lib/gras_impl/block_actor.hpp
index e5dbaac..170ee1f 100644
--- a/lib/gras_impl/block_actor.hpp
+++ b/lib/gras_impl/block_actor.hpp
@@ -129,6 +129,8 @@ struct BlockActor : Apology::Worker
OutputBufferQueues output_queues;
std::vector<bool> produce_outputs;
BitSet inputs_available;
+ std::vector<time_ticks_t> time_input_not_ready;
+ std::vector<time_ticks_t> time_output_not_ready;
//tag and msg tracking
std::vector<bool> input_tags_changed;
diff --git a/lib/gras_impl/stats.hpp b/lib/gras_impl/stats.hpp
index 7edab29..d9e2341 100644
--- a/lib/gras_impl/stats.hpp
+++ b/lib/gras_impl/stats.hpp
@@ -37,6 +37,10 @@ struct BlockStats
std::vector<item_index_t> tags_produced;
std::vector<item_index_t> msgs_produced;
+ //port starvation tracking
+ std::vector<time_ticks_t> inputs_idle;
+ std::vector<time_ticks_t> outputs_idle;
+
//instantaneous port status
std::vector<size_t> items_enqueued;
std::vector<size_t> msgs_enqueued;
diff --git a/lib/theron_allocator.cpp b/lib/theron_allocator.cpp
deleted file mode 100644
index 9db9367..0000000
--- a/lib/theron_allocator.cpp
+++ /dev/null
@@ -1,100 +0,0 @@
-// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
-
-/***********************************************************************
- * There is allocation overhead for sending messages.
- * Want per worker-allocator for message allocation...
- * But until thats possible, install a new global allocator.
- * This allocator uses a fixed pool for small sized buffers,
- * and otherwise the regular malloc/free for larger buffers.
- **********************************************************************/
-
-#include <gras/gras.hpp>
-#include <gras_impl/debug.hpp>
-#include <Theron/Detail/Threading/SpinLock.h>
-#include <Theron/IAllocator.h>
-#include <Theron/AllocatorManager.h>
-#include <boost/circular_buffer.hpp>
-
-#define MY_ALLOCATOR_CHUNK_SIZE 256
-#define MY_ALLOCATOR_POOL_SIZE (MY_ALLOCATOR_CHUNK_SIZE * (1 << 18))
-
-static unsigned long long unwanted_malloc_count = 0;
-
-static struct ExitPrinter
-{
- ExitPrinter(void){}
- ~ExitPrinter(void)
- {
- if (unwanted_malloc_count)
- {
- VAR(unwanted_malloc_count);
- }
- }
-} exit_printer;
-
-static struct WorkerAllocator : Theron::IAllocator
-{
- WorkerAllocator(void)
- {
- const size_t N = MY_ALLOCATOR_POOL_SIZE/MY_ALLOCATOR_CHUNK_SIZE;
- queue.set_capacity(N);
- for (size_t i = 0; i < N; i++)
- {
- const ptrdiff_t pool_ptr = ptrdiff_t(pool) + i*MY_ALLOCATOR_CHUNK_SIZE;
- queue.push_back((void *)pool_ptr);
- }
- pool_end = ptrdiff_t(pool) + MY_ALLOCATOR_POOL_SIZE;
- Theron::AllocatorManager::Instance().SetAllocator(this);
- }
-
- ~WorkerAllocator(void)
- {
- //NOP
- }
-
- void *Allocate(const SizeType size)
- {
- if GRAS_LIKELY(size <= MY_ALLOCATOR_CHUNK_SIZE)
- {
- mSpinLock.Lock();
- if GRAS_UNLIKELY(queue.empty())
- {
- unwanted_malloc_count++;
- mSpinLock.Unlock();
- return std::malloc(size);
- }
- void *memory = queue.front();
- queue.pop_front();
- mSpinLock.Unlock();
- return memory;
- }
- else
- {
- //std::cout << "malloc size " << size << std::endl;
- return std::malloc(size);
- }
- }
-
- void Free(void *const memory)
- {
- const bool in_pool = ptrdiff_t(memory) >= ptrdiff_t(pool) and ptrdiff_t(memory) < pool_end;
- if GRAS_LIKELY(in_pool)
- {
- mSpinLock.Lock();
- queue.push_front(memory);
- mSpinLock.Unlock();
- }
- else
- {
- std::free(memory);
- }
- }
-
- boost::circular_buffer<void *> queue;
- THERON_PREALIGN(GRAS_MAX_ALIGNMENT)
- char pool[MY_ALLOCATOR_POOL_SIZE]
- THERON_POSTALIGN(GRAS_MAX_ALIGNMENT);
- ptrdiff_t pool_end;
- Theron::Detail::SpinLock mSpinLock;
-
-} my_alloc;
diff --git a/lib/top_block_query.cpp b/lib/top_block_query.cpp
index b000a2d..3203507 100644
--- a/lib/top_block_query.cpp
+++ b/lib/top_block_query.cpp
@@ -7,6 +7,7 @@
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/xml_parser.hpp>
#include <boost/regex.hpp>
+#include <Theron/DefaultAllocator.h>
#include <algorithm>
#include <sstream>
@@ -87,6 +88,15 @@ static std::string query_stats(ElementImpl *self, const boost::property_tree::pt
root.put("now", time_now());
root.put("tps", time_tps());
+ //allocator debugs
+ Theron::DefaultAllocator *allocator = dynamic_cast<Theron::DefaultAllocator *>(Theron::AllocatorManager::Instance().GetAllocator());
+ if (allocator)
+ {
+ root.put("bytes_allocated", allocator->GetBytesAllocated());
+ root.put("peak_bytes_allocated", allocator->GetPeakBytesAllocated());
+ root.put("allocation_count", allocator->GetAllocationCount());
+ }
+
//iterate through blocks
boost::property_tree::ptree blocks;
BOOST_FOREACH(const GetStatsMessage &message, receiver.messages)