summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorJosh Blum2012-12-13 19:45:08 -0800
committerJosh Blum2012-12-13 19:45:08 -0800
commit83ad787f994d6efbde505ce9915f44bef2b5408d (patch)
treee9af973d7232d586ce5a61069e8b796369e77607 /lib
parent59d8dafe387b11ecf8bfff892e1e30fe07e33caa (diff)
downloadsandhi-83ad787f994d6efbde505ce9915f44bef2b5408d.tar.gz
sandhi-83ad787f994d6efbde505ce9915f44bef2b5408d.tar.bz2
sandhi-83ad787f994d6efbde505ce9915f44bef2b5408d.zip
some work on the circular buffer alloc
Diffstat (limited to 'lib')
-rw-r--r--lib/CMakeLists.txt1
-rw-r--r--lib/circular_buffer.cpp149
-rw-r--r--lib/gras_impl/endless_buffer_queue.hpp98
3 files changed, 248 insertions, 0 deletions
diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt
index 8f6f46c..84c8f4e 100644
--- a/lib/CMakeLists.txt
+++ b/lib/CMakeLists.txt
@@ -40,6 +40,7 @@ list(APPEND GRAS_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/debug.cpp
${CMAKE_CURRENT_SOURCE_DIR}/element.cpp
${CMAKE_CURRENT_SOURCE_DIR}/sbuffer.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/circular_buffer.cpp
${CMAKE_CURRENT_SOURCE_DIR}/tags.cpp
${CMAKE_CURRENT_SOURCE_DIR}/block.cpp
${CMAKE_CURRENT_SOURCE_DIR}/block_actor.cpp
diff --git a/lib/circular_buffer.cpp b/lib/circular_buffer.cpp
new file mode 100644
index 0000000..820129d
--- /dev/null
+++ b/lib/circular_buffer.cpp
@@ -0,0 +1,149 @@
+// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
+
+#include <gras_impl/endless_buffer_queue.hpp>
+#include <boost/format.hpp>
+#include <boost/bind.hpp>
+#include <boost/interprocess/shared_memory_object.hpp>
+#include <boost/interprocess/mapped_region.hpp>
+#include <boost/uuid/uuid.hpp>
+#include <boost/uuid/uuid_io.hpp>
+#include <ctime>
+
+using namespace gras;
+namespace ipc = boost::interprocess;
+
+/*!
+* This routine generates an incredibly unique name for the allocation.
+*
+* Because we are using boost IPC, and it expects that we share the memory;
+* IPC allocation requires a unique name to share amongst the processes.
+* Since we are not actually using IPC, the sharing aspect isnt very useful,
+* but we still need a unique name for the shared memory allocation anyway.
+* (I would like if a empty string would suffice as an anonymous allocation)
+*/
+static std::string omg_so_unique(void)
+{
+ boost::uuids::uuid u1; // initialize uuid
+ return boost::str(boost::format("shmem-%s-%u-%u-%u")
+ % to_string(u1) % std::rand() % clock() % time(NULL));
+}
+
+struct CircularBuffer
+{
+ CircularBuffer(const size_t);
+
+ ~CircularBuffer(void)
+ {
+ ipc::shared_memory_object::remove(shm_name.c_str());
+ }
+
+ char *buff_addr;
+ size_t actual_length;
+ std::string shm_name;
+ ipc::shared_memory_object shm_obj;
+ ipc::mapped_region region1;
+ ipc::mapped_region region2;
+};
+
+CircularBuffer::CircularBuffer(const size_t num_bytes)
+{
+ const size_t chunk = ipc::mapped_region::get_page_size();
+ const size_t len = chunk*((num_bytes + chunk - 1)/chunk);
+ actual_length = len;
+
+ ////////////////////////////////////////////////////////////////
+ // Step 0) Find an address that can be mapped across 2x length:
+ // Allocate physical memory 2x the required size.
+ // Map a virtual memory region across this memory.
+ // Now we have a 2x length swath of virtual memory.
+ ////////////////////////////////////////////////////////////////
+ {
+ shm_name = omg_so_unique();
+
+ //std::cout << "make shmem 2x\n" << std::endl;
+ ipc::shared_memory_object shm_obj_2x(
+ ipc::create_only, //only create
+ shm_name.c_str(), //name
+ ipc::read_write //read-write mode
+ );
+
+ //std::cout << "truncate 2x\n" << std::endl;
+ shm_obj_2x.truncate(2*len);
+
+ //std::cout << "map region 0\n" << std::endl;
+ ipc::mapped_region region0(
+ shm_obj_2x, //Memory-mappable object
+ ipc::read_write, //Access mode
+ 0, //Offset from the beginning of shm
+ 2*len //Length of the region
+ );
+ //std::cout << "region0.get_address() " << size_t(region0.get_address()) << std::endl;
+
+ ipc::shared_memory_object::remove(shm_name.c_str());
+ buff_addr = (char *)region0.get_address();
+ }
+
+ ////////////////////////////////////////////////////////////////
+ // Step 1) Allocate a chunk of physical memory of length bytes
+ ////////////////////////////////////////////////////////////////
+ //std::cout << "make shmem\n" << std::endl;
+ shm_name = omg_so_unique();
+ shm_obj = ipc::shared_memory_object(
+ ipc::create_only, //only create
+ shm_name.c_str(), //name
+ ipc::read_write //read-write mode
+ );
+
+ //std::cout << "truncate\n" << std::endl;
+ shm_obj.truncate(len);
+
+ ////////////////////////////////////////////////////////////////
+ //Step 2) Remap region1 of the virtual memory space
+ ////////////////////////////////////////////////////////////////
+ //std::cout << "map region 1\n" << std::endl;
+ region1 = ipc::mapped_region(
+ shm_obj, //Memory-mappable object
+ ipc::read_write, //Access mode
+ 0, //Offset from the beginning of shm
+ len, //Length of the region
+ buff_addr
+ );
+ //std::cout << "region1.get_address() " << size_t(region1.get_address()) << std::endl;
+
+ ////////////////////////////////////////////////////////////////
+ //Step 3) Remap region2 of the virtual memory space
+ ////////////////////////////////////////////////////////////////
+ //std::cout << "map region 2\n" << std::endl;
+ region2 = ipc::mapped_region(
+ shm_obj, //Memory-mappable object
+ ipc::read_write, //Access mode
+ 0, //Offset from the beginning of shm
+ len, //Length of the region
+ buff_addr + len
+ );
+
+ //std::cout << "region2.get_address() " << size_t(region2.get_address()) << std::endl;
+ //std::cout << "diff " << (long(region2.get_address()) - long(region1.get_address())) << std::endl;
+
+ ////////////////////////////////////////////////////////////////
+ //4) Zero out the memory for good measure
+ ////////////////////////////////////////////////////////////////
+ std::memset(region1.get_address(), 0, region1.get_size());
+ std::memset(region2.get_address(), 0, region2.get_size());
+}
+
+static void circular_buffer_delete(SBuffer &buff, CircularBuffer *circ_buff)
+{
+ delete circ_buff;
+}
+
+SBuffer EndlessBufferQueue::make_circular_buffer(const size_t num_bytes)
+{
+ CircularBuffer *circ_buff = new CircularBuffer(num_bytes);
+ SBufferDeleter deleter = boost::bind(&circular_buffer_delete, _1, circ_buff);
+ SBufferConfig config;
+ config.memory = circ_buff->buff_addr;
+ config.length = circ_buff->actual_length;
+ config.deleter = deleter;
+ return SBuffer(config);
+}
diff --git a/lib/gras_impl/endless_buffer_queue.hpp b/lib/gras_impl/endless_buffer_queue.hpp
new file mode 100644
index 0000000..68797a2
--- /dev/null
+++ b/lib/gras_impl/endless_buffer_queue.hpp
@@ -0,0 +1,98 @@
+// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information.
+
+#ifndef INCLUDED_LIBGRAS_IMPL_ENDLESS_BUFFER_QUEUE_HPP
+#define INCLUDED_LIBGRAS_IMPL_ENDLESS_BUFFER_QUEUE_HPP
+
+#include <gras_impl/debug.hpp>
+#include <gras/sbuffer.hpp>
+#include <boost/bind.hpp>
+#include <boost/circular_buffer.hpp>
+
+namespace gras
+{
+
+struct EndlessBufferQueue
+{
+ enum {MAX_QUEUE_SIZE = 128};
+
+ static SBuffer make_circular_buffer(const size_t num_bytes);
+
+ EndlessBufferQueue(const size_t num_bytes);
+
+ SBuffer &front(void);
+
+ void pop(const size_t num_bytes);
+
+ void push(const SBuffer &buff);
+
+ SBufferToken _token;
+ SBuffer _circ_buff;
+ char *_write_ptr;
+ size_t _bytes_avail;
+ size_t _cur_index, _ack_index;
+ boost::circular_buffer<SBuffer> _available_buffers;
+ boost::circular_buffer<SBuffer> _returned_buffers;
+};
+
+EndlessBufferQueue::EndlessBufferQueue(const size_t num_bytes)
+{
+ _cur_index = 0;
+ _ack_index = 0;
+
+ //allocate a large buffer
+ _circ_buff = EndlessBufferQueue::make_circular_buffer(num_bytes);
+ _write_ptr = (char *)_circ_buff.get_actual_memory();
+ _bytes_avail = _circ_buff.get_actual_length();
+
+ //create token as buffer returner
+ SBufferDeleter deleter = boost::bind(&EndlessBufferQueue::push, this, _1);
+ _token = SBufferToken(new SBufferDeleter(deleter));
+
+ //allocate pool of sbuffers
+ _available_buffers.resize(MAX_QUEUE_SIZE);
+ _returned_buffers.resize(MAX_QUEUE_SIZE);
+ SBufferConfig config;
+ config.memory = _circ_buff.get_actual_memory();
+ config.length = _circ_buff.get_actual_length();
+ for (size_t i = 0; i < _available_buffers.size(); i++)
+ {
+ _available_buffers.push_back(SBuffer(config));
+ }
+}
+
+GRAS_FORCE_INLINE SBuffer &EndlessBufferQueue::front(void)
+{
+ ASSERT(_bytes_avail);
+ ASSERT(not _available_buffers.empty());
+ SBuffer &front = _available_buffers.front();
+ front->config.memory = _write_ptr;
+ front->config.length = _bytes_avail;
+ front.offset = 0;
+ front.length = 0;
+ return front;
+}
+
+GRAS_FORCE_INLINE void EndlessBufferQueue::pop(const size_t num_bytes)
+{
+ ASSERT(_bytes_avail >= num_bytes);
+ SBuffer &front = _available_buffers.front();
+ front->config.length = num_bytes;
+ front->config.user_index = _cur_index++;
+ _write_ptr += num_bytes;
+ if (_write_ptr > (char *)_circ_buff.get(circ_buff.get_actual_length()))
+ {
+ _write_ptr -= circ_buff.get_actual_length();
+ }
+ _bytes_avail -= num_bytes;
+}
+
+void EndlessBufferQueue::push(const SBuffer &buff)
+{
+ _returned_buffers.push_back(buff);
+ //BOOST_FOREACH(
+ //TODO update available
+}
+
+} //namespace gras
+
+#endif /*INCLUDED_LIBGRAS_IMPL_ENDLESS_BUFFER_QUEUE_HPP*/