diff options
author | Josh Blum | 2012-12-13 19:45:08 -0800 |
---|---|---|
committer | Josh Blum | 2012-12-13 19:45:08 -0800 |
commit | 83ad787f994d6efbde505ce9915f44bef2b5408d (patch) | |
tree | e9af973d7232d586ce5a61069e8b796369e77607 /lib | |
parent | 59d8dafe387b11ecf8bfff892e1e30fe07e33caa (diff) | |
download | sandhi-83ad787f994d6efbde505ce9915f44bef2b5408d.tar.gz sandhi-83ad787f994d6efbde505ce9915f44bef2b5408d.tar.bz2 sandhi-83ad787f994d6efbde505ce9915f44bef2b5408d.zip |
some work on the circular buffer alloc
Diffstat (limited to 'lib')
-rw-r--r-- | lib/CMakeLists.txt | 1 | ||||
-rw-r--r-- | lib/circular_buffer.cpp | 149 | ||||
-rw-r--r-- | lib/gras_impl/endless_buffer_queue.hpp | 98 |
3 files changed, 248 insertions, 0 deletions
diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index 8f6f46c..84c8f4e 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -40,6 +40,7 @@ list(APPEND GRAS_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/debug.cpp ${CMAKE_CURRENT_SOURCE_DIR}/element.cpp ${CMAKE_CURRENT_SOURCE_DIR}/sbuffer.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/circular_buffer.cpp ${CMAKE_CURRENT_SOURCE_DIR}/tags.cpp ${CMAKE_CURRENT_SOURCE_DIR}/block.cpp ${CMAKE_CURRENT_SOURCE_DIR}/block_actor.cpp diff --git a/lib/circular_buffer.cpp b/lib/circular_buffer.cpp new file mode 100644 index 0000000..820129d --- /dev/null +++ b/lib/circular_buffer.cpp @@ -0,0 +1,149 @@ +// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information. + +#include <gras_impl/endless_buffer_queue.hpp> +#include <boost/format.hpp> +#include <boost/bind.hpp> +#include <boost/interprocess/shared_memory_object.hpp> +#include <boost/interprocess/mapped_region.hpp> +#include <boost/uuid/uuid.hpp> +#include <boost/uuid/uuid_io.hpp> +#include <ctime> + +using namespace gras; +namespace ipc = boost::interprocess; + +/*! +* This routine generates an incredibly unique name for the allocation. +* +* Because we are using boost IPC, and it expects that we share the memory; +* IPC allocation requires a unique name to share amongst the processes. +* Since we are not actually using IPC, the sharing aspect isnt very useful, +* but we still need a unique name for the shared memory allocation anyway. +* (I would like if a empty string would suffice as an anonymous allocation) +*/ +static std::string omg_so_unique(void) +{ + boost::uuids::uuid u1; // initialize uuid + return boost::str(boost::format("shmem-%s-%u-%u-%u") + % to_string(u1) % std::rand() % clock() % time(NULL)); +} + +struct CircularBuffer +{ + CircularBuffer(const size_t); + + ~CircularBuffer(void) + { + ipc::shared_memory_object::remove(shm_name.c_str()); + } + + char *buff_addr; + size_t actual_length; + std::string shm_name; + ipc::shared_memory_object shm_obj; + ipc::mapped_region region1; + ipc::mapped_region region2; +}; + +CircularBuffer::CircularBuffer(const size_t num_bytes) +{ + const size_t chunk = ipc::mapped_region::get_page_size(); + const size_t len = chunk*((num_bytes + chunk - 1)/chunk); + actual_length = len; + + //////////////////////////////////////////////////////////////// + // Step 0) Find an address that can be mapped across 2x length: + // Allocate physical memory 2x the required size. + // Map a virtual memory region across this memory. + // Now we have a 2x length swath of virtual memory. + //////////////////////////////////////////////////////////////// + { + shm_name = omg_so_unique(); + + //std::cout << "make shmem 2x\n" << std::endl; + ipc::shared_memory_object shm_obj_2x( + ipc::create_only, //only create + shm_name.c_str(), //name + ipc::read_write //read-write mode + ); + + //std::cout << "truncate 2x\n" << std::endl; + shm_obj_2x.truncate(2*len); + + //std::cout << "map region 0\n" << std::endl; + ipc::mapped_region region0( + shm_obj_2x, //Memory-mappable object + ipc::read_write, //Access mode + 0, //Offset from the beginning of shm + 2*len //Length of the region + ); + //std::cout << "region0.get_address() " << size_t(region0.get_address()) << std::endl; + + ipc::shared_memory_object::remove(shm_name.c_str()); + buff_addr = (char *)region0.get_address(); + } + + //////////////////////////////////////////////////////////////// + // Step 1) Allocate a chunk of physical memory of length bytes + //////////////////////////////////////////////////////////////// + //std::cout << "make shmem\n" << std::endl; + shm_name = omg_so_unique(); + shm_obj = ipc::shared_memory_object( + ipc::create_only, //only create + shm_name.c_str(), //name + ipc::read_write //read-write mode + ); + + //std::cout << "truncate\n" << std::endl; + shm_obj.truncate(len); + + //////////////////////////////////////////////////////////////// + //Step 2) Remap region1 of the virtual memory space + //////////////////////////////////////////////////////////////// + //std::cout << "map region 1\n" << std::endl; + region1 = ipc::mapped_region( + shm_obj, //Memory-mappable object + ipc::read_write, //Access mode + 0, //Offset from the beginning of shm + len, //Length of the region + buff_addr + ); + //std::cout << "region1.get_address() " << size_t(region1.get_address()) << std::endl; + + //////////////////////////////////////////////////////////////// + //Step 3) Remap region2 of the virtual memory space + //////////////////////////////////////////////////////////////// + //std::cout << "map region 2\n" << std::endl; + region2 = ipc::mapped_region( + shm_obj, //Memory-mappable object + ipc::read_write, //Access mode + 0, //Offset from the beginning of shm + len, //Length of the region + buff_addr + len + ); + + //std::cout << "region2.get_address() " << size_t(region2.get_address()) << std::endl; + //std::cout << "diff " << (long(region2.get_address()) - long(region1.get_address())) << std::endl; + + //////////////////////////////////////////////////////////////// + //4) Zero out the memory for good measure + //////////////////////////////////////////////////////////////// + std::memset(region1.get_address(), 0, region1.get_size()); + std::memset(region2.get_address(), 0, region2.get_size()); +} + +static void circular_buffer_delete(SBuffer &buff, CircularBuffer *circ_buff) +{ + delete circ_buff; +} + +SBuffer EndlessBufferQueue::make_circular_buffer(const size_t num_bytes) +{ + CircularBuffer *circ_buff = new CircularBuffer(num_bytes); + SBufferDeleter deleter = boost::bind(&circular_buffer_delete, _1, circ_buff); + SBufferConfig config; + config.memory = circ_buff->buff_addr; + config.length = circ_buff->actual_length; + config.deleter = deleter; + return SBuffer(config); +} diff --git a/lib/gras_impl/endless_buffer_queue.hpp b/lib/gras_impl/endless_buffer_queue.hpp new file mode 100644 index 0000000..68797a2 --- /dev/null +++ b/lib/gras_impl/endless_buffer_queue.hpp @@ -0,0 +1,98 @@ +// Copyright (C) by Josh Blum. See LICENSE.txt for licensing information. + +#ifndef INCLUDED_LIBGRAS_IMPL_ENDLESS_BUFFER_QUEUE_HPP +#define INCLUDED_LIBGRAS_IMPL_ENDLESS_BUFFER_QUEUE_HPP + +#include <gras_impl/debug.hpp> +#include <gras/sbuffer.hpp> +#include <boost/bind.hpp> +#include <boost/circular_buffer.hpp> + +namespace gras +{ + +struct EndlessBufferQueue +{ + enum {MAX_QUEUE_SIZE = 128}; + + static SBuffer make_circular_buffer(const size_t num_bytes); + + EndlessBufferQueue(const size_t num_bytes); + + SBuffer &front(void); + + void pop(const size_t num_bytes); + + void push(const SBuffer &buff); + + SBufferToken _token; + SBuffer _circ_buff; + char *_write_ptr; + size_t _bytes_avail; + size_t _cur_index, _ack_index; + boost::circular_buffer<SBuffer> _available_buffers; + boost::circular_buffer<SBuffer> _returned_buffers; +}; + +EndlessBufferQueue::EndlessBufferQueue(const size_t num_bytes) +{ + _cur_index = 0; + _ack_index = 0; + + //allocate a large buffer + _circ_buff = EndlessBufferQueue::make_circular_buffer(num_bytes); + _write_ptr = (char *)_circ_buff.get_actual_memory(); + _bytes_avail = _circ_buff.get_actual_length(); + + //create token as buffer returner + SBufferDeleter deleter = boost::bind(&EndlessBufferQueue::push, this, _1); + _token = SBufferToken(new SBufferDeleter(deleter)); + + //allocate pool of sbuffers + _available_buffers.resize(MAX_QUEUE_SIZE); + _returned_buffers.resize(MAX_QUEUE_SIZE); + SBufferConfig config; + config.memory = _circ_buff.get_actual_memory(); + config.length = _circ_buff.get_actual_length(); + for (size_t i = 0; i < _available_buffers.size(); i++) + { + _available_buffers.push_back(SBuffer(config)); + } +} + +GRAS_FORCE_INLINE SBuffer &EndlessBufferQueue::front(void) +{ + ASSERT(_bytes_avail); + ASSERT(not _available_buffers.empty()); + SBuffer &front = _available_buffers.front(); + front->config.memory = _write_ptr; + front->config.length = _bytes_avail; + front.offset = 0; + front.length = 0; + return front; +} + +GRAS_FORCE_INLINE void EndlessBufferQueue::pop(const size_t num_bytes) +{ + ASSERT(_bytes_avail >= num_bytes); + SBuffer &front = _available_buffers.front(); + front->config.length = num_bytes; + front->config.user_index = _cur_index++; + _write_ptr += num_bytes; + if (_write_ptr > (char *)_circ_buff.get(circ_buff.get_actual_length())) + { + _write_ptr -= circ_buff.get_actual_length(); + } + _bytes_avail -= num_bytes; +} + +void EndlessBufferQueue::push(const SBuffer &buff) +{ + _returned_buffers.push_back(buff); + //BOOST_FOREACH( + //TODO update available +} + +} //namespace gras + +#endif /*INCLUDED_LIBGRAS_IMPL_ENDLESS_BUFFER_QUEUE_HPP*/ |