diff --git a/aeron-client/src/main/cpp/Image.h b/aeron-client/src/main/cpp/Image.h index 7f2243d01f..befb997ab8 100644 --- a/aeron-client/src/main/cpp/Image.h +++ b/aeron-client/src/main/cpp/Image.h @@ -25,7 +25,9 @@ #include #include #include +#include #include +#include #include "LogBuffers.h" namespace aeron { @@ -140,38 +142,29 @@ class Image } Image(const Image& image) : + m_termBuffers(image.m_termBuffers), m_header(image.m_header), m_subscriberPosition(image.m_subscriberPosition), + m_logBuffers(image.m_logBuffers), m_sourceIdentity(image.m_sourceIdentity), m_isClosed(image.isClosed()), - m_exceptionHandler(image.m_exceptionHandler) + m_exceptionHandler(image.m_exceptionHandler), + m_correlationId(image.m_correlationId), + m_subscriptionRegistrationId(image.m_subscriptionRegistrationId), + m_joinPosition(image.m_joinPosition), + m_finalPosition(image.m_finalPosition), + m_sessionId(image.m_sessionId), + m_termLengthMask(image.m_termLengthMask), + m_positionBitsToShift(image.m_positionBitsToShift), + m_isEos(image.m_isEos) { - for (int i = 0; i < LogBufferDescriptor::PARTITION_COUNT; i++) - { - m_termBuffers[i].wrap(image.m_termBuffers[i]); - } - - m_subscriberPosition.wrap(image.m_subscriberPosition); - m_logBuffers = image.m_logBuffers; - m_correlationId = image.m_correlationId; - m_subscriptionRegistrationId = image.m_subscriptionRegistrationId; - m_joinPosition = image.m_joinPosition; - m_finalPosition = image.m_finalPosition; - m_sessionId = image.m_sessionId; - m_termLengthMask = image.m_termLengthMask; - m_positionBitsToShift = image.m_positionBitsToShift; - m_isEos = image.m_isEos; } - Image& operator=(Image& image) + Image& operator=(const Image& image) { - for (int i = 0; i < LogBufferDescriptor::PARTITION_COUNT; i++) - { - m_termBuffers[i].wrap(image.m_termBuffers[i]); - } - + m_termBuffers = image.m_termBuffers; m_header = image.m_header; - m_subscriberPosition.wrap(image.m_subscriberPosition); + m_subscriberPosition = image.m_subscriberPosition; m_logBuffers = image.m_logBuffers; m_sourceIdentity = image.m_sourceIdentity; m_isClosed = image.isClosed(); @@ -355,8 +348,9 @@ class Image { const std::int64_t position = m_subscriberPosition.get(); const std::int32_t termOffset = (std::int32_t) position & m_termLengthMask; - AtomicBuffer &termBuffer = m_termBuffers[LogBufferDescriptor::indexByPosition( - position, m_positionBitsToShift)]; + const int index = LogBufferDescriptor::indexByPosition(position, m_positionBitsToShift); + assert(index >= 0 && index < LogBufferDescriptor::PARTITION_COUNT); + AtomicBuffer &termBuffer = m_termBuffers[index]; TermReader::ReadOutcome readOutcome; TermReader::read(readOutcome, termBuffer, termOffset, fragmentHandler, fragmentLimit, m_header, m_exceptionHandler); @@ -395,8 +389,9 @@ class Image int fragmentsRead = 0; std::int64_t initialPosition = m_subscriberPosition.get(); std::int32_t initialOffset = (std::int32_t) initialPosition & m_termLengthMask; - AtomicBuffer &termBuffer = m_termBuffers[LogBufferDescriptor::indexByPosition( - initialPosition, m_positionBitsToShift)]; + const int index = LogBufferDescriptor::indexByPosition(initialPosition, m_positionBitsToShift); + assert(index >= 0 && index < LogBufferDescriptor::PARTITION_COUNT); + AtomicBuffer &termBuffer = m_termBuffers[index]; std::int32_t resultingOffset = initialOffset; const util::index_t capacity = termBuffer.capacity(); @@ -489,8 +484,9 @@ class Image int fragmentsRead = 0; std::int64_t initialPosition = m_subscriberPosition.get(); std::int32_t initialOffset = (std::int32_t) initialPosition & m_termLengthMask; - AtomicBuffer &termBuffer = m_termBuffers[LogBufferDescriptor::indexByPosition( - initialPosition, m_positionBitsToShift)]; + const int index = LogBufferDescriptor::indexByPosition(initialPosition, m_positionBitsToShift); + assert(index >= 0 && index < LogBufferDescriptor::PARTITION_COUNT); + AtomicBuffer &termBuffer = m_termBuffers[index]; std::int32_t resultingOffset = initialOffset; const std::int64_t capacity = termBuffer.capacity(); const std::int32_t endOffset = @@ -587,8 +583,9 @@ class Image std::int32_t initialOffset = static_cast(initialPosition & m_termLengthMask); std::int32_t offset = initialOffset; std::int64_t position = initialPosition; - AtomicBuffer &termBuffer = m_termBuffers[LogBufferDescriptor::indexByPosition( - initialPosition, m_positionBitsToShift)]; + const int index = LogBufferDescriptor::indexByPosition(initialPosition, m_positionBitsToShift); + assert(index >= 0 && index < LogBufferDescriptor::PARTITION_COUNT); + AtomicBuffer &termBuffer = m_termBuffers[index]; const util::index_t capacity = termBuffer.capacity(); m_header.buffer(termBuffer); @@ -677,8 +674,9 @@ class Image { const std::int64_t position = m_subscriberPosition.get(); const std::int32_t termOffset = (std::int32_t) position & m_termLengthMask; - AtomicBuffer &termBuffer = m_termBuffers[LogBufferDescriptor::indexByPosition( - position, m_positionBitsToShift)]; + const int index = LogBufferDescriptor::indexByPosition(position, m_positionBitsToShift); + assert(index >= 0 && index < LogBufferDescriptor::PARTITION_COUNT); + AtomicBuffer &termBuffer = m_termBuffers[index]; const std::int32_t limitOffset = std::min(termOffset + blockLengthLimit, termBuffer.capacity()); const std::int32_t resultingOffset = TermBlockScanner::scan(termBuffer, termOffset, limitOffset); const std::int32_t length = resultingOffset - termOffset; @@ -723,7 +721,7 @@ class Image /// @endcond private: - AtomicBuffer m_termBuffers[LogBufferDescriptor::PARTITION_COUNT]; + std::array m_termBuffers; Header m_header; Position m_subscriberPosition; std::shared_ptr m_logBuffers; diff --git a/aeron-client/src/main/cpp/concurrent/AtomicBuffer.h b/aeron-client/src/main/cpp/concurrent/AtomicBuffer.h index 120d36f3c7..b4fcafe407 100644 --- a/aeron-client/src/main/cpp/concurrent/AtomicBuffer.h +++ b/aeron-client/src/main/cpp/concurrent/AtomicBuffer.h @@ -88,7 +88,7 @@ class AtomicBuffer { } - AtomicBuffer& operator=(AtomicBuffer& buffer) = default; + AtomicBuffer& operator=(const AtomicBuffer& buffer) = default; // this class does not own the memory. It simply overlays it. virtual ~AtomicBuffer() = default; diff --git a/aeron-client/src/main/cpp/concurrent/logbuffer/Header.h b/aeron-client/src/main/cpp/concurrent/logbuffer/Header.h index 5748b33584..88d7d467c1 100644 --- a/aeron-client/src/main/cpp/concurrent/logbuffer/Header.h +++ b/aeron-client/src/main/cpp/concurrent/logbuffer/Header.h @@ -38,7 +38,7 @@ class Header Header(const Header& header) = default; - Header& operator=(Header& header) + Header& operator=(const Header& header) { m_context = header.m_context; m_buffer.wrap(header.m_buffer); diff --git a/aeron-client/src/main/cpp/concurrent/logbuffer/LogBufferDescriptor.h b/aeron-client/src/main/cpp/concurrent/logbuffer/LogBufferDescriptor.h index 4e5da28788..d830a076f9 100644 --- a/aeron-client/src/main/cpp/concurrent/logbuffer/LogBufferDescriptor.h +++ b/aeron-client/src/main/cpp/concurrent/logbuffer/LogBufferDescriptor.h @@ -29,18 +29,18 @@ namespace aeron { namespace concurrent { namespace logbuffer { namespace LogBufferDescriptor { -static const std::int32_t TERM_MIN_LENGTH = 64 * 1024; -static const std::int32_t TERM_MAX_LENGTH = 1024 * 1024 * 1024; -static const std::int32_t PAGE_MIN_SIZE = 4 * 1024; -static const std::int32_t PAGE_MAX_SIZE = 1024 * 1024 * 1024; +const std::int32_t TERM_MIN_LENGTH = 64 * 1024; +const std::int32_t TERM_MAX_LENGTH = 1024 * 1024 * 1024; +const std::int32_t PAGE_MIN_SIZE = 4 * 1024; +const std::int32_t PAGE_MAX_SIZE = 1024 * 1024 * 1024; #if defined(__GNUC__) || _MSC_VER >= 1900 -constexpr static const int PARTITION_COUNT = 3; +constexpr const int PARTITION_COUNT = 3; #else // Visual Studio 2013 doesn't like constexpr without an update // https://msdn.microsoft.com/en-us/library/vstudio/hh567368.aspx // https://www.microsoft.com/en-us/download/details.aspx?id=41151 -static const int PARTITION_COUNT = 3; +const int PARTITION_COUNT = 3; #endif /* @@ -60,7 +60,7 @@ static const int PARTITION_COUNT = 3; * */ -static const util::index_t LOG_META_DATA_SECTION_INDEX = PARTITION_COUNT; +const util::index_t LOG_META_DATA_SECTION_INDEX = PARTITION_COUNT; /** *
@@ -111,7 +111,7 @@ static const util::index_t LOG_META_DATA_SECTION_INDEX = PARTITION_COUNT;
  * 
*/ -static const util::index_t LOG_DEFAULT_FRAME_HEADER_MAX_LENGTH = util::BitUtil::CACHE_LINE_LENGTH * 2; +const util::index_t LOG_DEFAULT_FRAME_HEADER_MAX_LENGTH = util::BitUtil::CACHE_LINE_LENGTH * 2; #pragma pack(push) #pragma pack(4) @@ -133,20 +133,20 @@ struct LogMetaDataDefn }; #pragma pack(pop) -static const util::index_t TERM_TAIL_COUNTER_OFFSET = (util::index_t)offsetof(LogMetaDataDefn, termTailCounters); +const util::index_t TERM_TAIL_COUNTER_OFFSET = (util::index_t)offsetof(LogMetaDataDefn, termTailCounters); -static const util::index_t LOG_ACTIVE_TERM_COUNT_OFFSET = (util::index_t)offsetof(LogMetaDataDefn, activeTermCount); -static const util::index_t LOG_END_OF_STREAM_POSITION_OFFSET = (util::index_t)offsetof(LogMetaDataDefn, endOfStreamPosition); -static const util::index_t LOG_IS_CONNECTED_OFFSET = (util::index_t)offsetof(LogMetaDataDefn, isConnected); -static const util::index_t LOG_INITIAL_TERM_ID_OFFSET = (util::index_t)offsetof(LogMetaDataDefn, initialTermId); -static const util::index_t LOG_DEFAULT_FRAME_HEADER_LENGTH_OFFSET = (util::index_t)offsetof(LogMetaDataDefn, defaultFrameHeaderLength); -static const util::index_t LOG_MTU_LENGTH_OFFSET = (util::index_t)offsetof(LogMetaDataDefn, mtuLength); -static const util::index_t LOG_TERM_LENGTH_OFFSET = (util::index_t)offsetof(LogMetaDataDefn, termLength); -static const util::index_t LOG_PAGE_SIZE_OFFSET = (util::index_t)offsetof(LogMetaDataDefn, pageSize); -static const util::index_t LOG_DEFAULT_FRAME_HEADER_OFFSET = (util::index_t)sizeof(LogMetaDataDefn); -static const util::index_t LOG_META_DATA_LENGTH = 4 * 1024; +const util::index_t LOG_ACTIVE_TERM_COUNT_OFFSET = (util::index_t)offsetof(LogMetaDataDefn, activeTermCount); +const util::index_t LOG_END_OF_STREAM_POSITION_OFFSET = (util::index_t)offsetof(LogMetaDataDefn, endOfStreamPosition); +const util::index_t LOG_IS_CONNECTED_OFFSET = (util::index_t)offsetof(LogMetaDataDefn, isConnected); +const util::index_t LOG_INITIAL_TERM_ID_OFFSET = (util::index_t)offsetof(LogMetaDataDefn, initialTermId); +const util::index_t LOG_DEFAULT_FRAME_HEADER_LENGTH_OFFSET = (util::index_t)offsetof(LogMetaDataDefn, defaultFrameHeaderLength); +const util::index_t LOG_MTU_LENGTH_OFFSET = (util::index_t)offsetof(LogMetaDataDefn, mtuLength); +const util::index_t LOG_TERM_LENGTH_OFFSET = (util::index_t)offsetof(LogMetaDataDefn, termLength); +const util::index_t LOG_PAGE_SIZE_OFFSET = (util::index_t)offsetof(LogMetaDataDefn, pageSize); +const util::index_t LOG_DEFAULT_FRAME_HEADER_OFFSET = (util::index_t)sizeof(LogMetaDataDefn); +const util::index_t LOG_META_DATA_LENGTH = 4 * 1024; -inline static void checkTermLength(std::int32_t termLength) +inline void checkTermLength(std::int32_t termLength) { if (termLength < TERM_MIN_LENGTH) { @@ -169,7 +169,7 @@ inline static void checkTermLength(std::int32_t termLength) } } -inline static void checkPageSize(std::int32_t pageSize) +inline void checkPageSize(std::int32_t pageSize) { if (pageSize < PAGE_MIN_SIZE) { @@ -192,88 +192,88 @@ inline static void checkPageSize(std::int32_t pageSize) } } -inline static std::int32_t initialTermId(const AtomicBuffer& logMetaDataBuffer) +inline std::int32_t initialTermId(const AtomicBuffer& logMetaDataBuffer) { return logMetaDataBuffer.getInt32(LOG_INITIAL_TERM_ID_OFFSET); } -inline static std::int32_t mtuLength(const AtomicBuffer& logMetaDataBuffer) +inline std::int32_t mtuLength(const AtomicBuffer& logMetaDataBuffer) { return logMetaDataBuffer.getInt32(LOG_MTU_LENGTH_OFFSET); } -inline static std::int32_t termLength(const AtomicBuffer& logMetaDataBuffer) +inline std::int32_t termLength(const AtomicBuffer& logMetaDataBuffer) { return logMetaDataBuffer.getInt32(LOG_TERM_LENGTH_OFFSET); } -inline static std::int32_t pageSize(const AtomicBuffer& logMetaDataBuffer) +inline std::int32_t pageSize(const AtomicBuffer& logMetaDataBuffer) { return logMetaDataBuffer.getInt32(LOG_PAGE_SIZE_OFFSET); } -inline static std::int32_t activeTermCount(const AtomicBuffer& logMetaDataBuffer) +inline std::int32_t activeTermCount(const AtomicBuffer& logMetaDataBuffer) { return logMetaDataBuffer.getInt32Volatile(LOG_ACTIVE_TERM_COUNT_OFFSET); } -inline static void activeTermCountOrdered(AtomicBuffer& logMetaDataBuffer, std::int32_t activeTermId) +inline void activeTermCountOrdered(AtomicBuffer& logMetaDataBuffer, std::int32_t activeTermId) { logMetaDataBuffer.putInt32Ordered(LOG_ACTIVE_TERM_COUNT_OFFSET, activeTermId); } -inline static bool casActiveTermCount( +inline bool casActiveTermCount( AtomicBuffer& logMetaDataBuffer, std::int32_t expectedTermCount, std::int32_t updateTermCount) { return logMetaDataBuffer.compareAndSetInt32(LOG_ACTIVE_TERM_COUNT_OFFSET, expectedTermCount, updateTermCount); } -inline static int nextPartitionIndex(int currentIndex) AERON_NOEXCEPT +inline int nextPartitionIndex(int currentIndex) AERON_NOEXCEPT { return (currentIndex + 1) % PARTITION_COUNT; } -inline static int previousPartitionIndex(int currentIndex) AERON_NOEXCEPT +inline int previousPartitionIndex(int currentIndex) AERON_NOEXCEPT { return (currentIndex + (PARTITION_COUNT - 1)) % PARTITION_COUNT; } -inline static bool isConnected(const AtomicBuffer &logMetaDataBuffer) AERON_NOEXCEPT +inline bool isConnected(const AtomicBuffer &logMetaDataBuffer) AERON_NOEXCEPT { return (logMetaDataBuffer.getInt32Volatile(LOG_IS_CONNECTED_OFFSET) == 1); } -inline static void isConnected(AtomicBuffer &logMetaDataBuffer, bool isConnected) AERON_NOEXCEPT +inline void isConnected(AtomicBuffer &logMetaDataBuffer, bool isConnected) AERON_NOEXCEPT { logMetaDataBuffer.putInt32Ordered(LOG_IS_CONNECTED_OFFSET, isConnected ? 1 : 0); } -inline static std::int64_t endOfStreamPosition(const AtomicBuffer &logMetaDataBuffer) AERON_NOEXCEPT +inline std::int64_t endOfStreamPosition(const AtomicBuffer &logMetaDataBuffer) AERON_NOEXCEPT { return logMetaDataBuffer.getInt64Volatile(LOG_END_OF_STREAM_POSITION_OFFSET); } -inline static void endOfStreamPosition(AtomicBuffer &logMetaDataBuffer, std::int64_t position) AERON_NOEXCEPT +inline void endOfStreamPosition(AtomicBuffer &logMetaDataBuffer, std::int64_t position) AERON_NOEXCEPT { logMetaDataBuffer.putInt64Ordered(LOG_END_OF_STREAM_POSITION_OFFSET, position); } -inline static int indexByTerm(std::int32_t initialTermId, std::int32_t activeTermId) AERON_NOEXCEPT +inline int indexByTerm(std::int32_t initialTermId, std::int32_t activeTermId) AERON_NOEXCEPT { return (activeTermId - initialTermId) % PARTITION_COUNT; } -inline static int indexByTermCount(std::int64_t termCount) AERON_NOEXCEPT +inline int indexByTermCount(std::int64_t termCount) AERON_NOEXCEPT { return static_cast(termCount % PARTITION_COUNT); } -inline static int indexByPosition(std::int64_t position, std::int32_t positionBitsToShift) AERON_NOEXCEPT +inline int indexByPosition(std::int64_t position, std::int32_t positionBitsToShift) AERON_NOEXCEPT { return static_cast((static_cast(position) >> positionBitsToShift) % PARTITION_COUNT); } -inline static std::int64_t computePosition( +inline std::int64_t computePosition( std::int32_t activeTermId, std::int32_t termOffset, std::int32_t positionBitsToShift, std::int32_t initialTermId) AERON_NOEXCEPT { const std::int64_t termCount = activeTermId - initialTermId; @@ -281,7 +281,7 @@ inline static std::int64_t computePosition( return (termCount << positionBitsToShift) + termOffset; } -inline static std::int64_t computeTermBeginPosition( +inline std::int64_t computeTermBeginPosition( std::int32_t activeTermId, std::int32_t positionBitsToShift, std::int32_t initialTermId) AERON_NOEXCEPT { const std::int64_t termCount = activeTermId - initialTermId; @@ -289,50 +289,50 @@ inline static std::int64_t computeTermBeginPosition( return termCount << positionBitsToShift; } -inline static std::int64_t rawTailVolatile(const AtomicBuffer& logMetaDataBuffer) +inline std::int64_t rawTailVolatile(const AtomicBuffer& logMetaDataBuffer) { const std::int32_t partitionIndex = indexByTermCount(activeTermCount(logMetaDataBuffer)); return logMetaDataBuffer.getInt64Volatile(TERM_TAIL_COUNTER_OFFSET + (partitionIndex * sizeof(std::int64_t))); } -inline static std::int64_t rawTail(const AtomicBuffer& logMetaDataBuffer) +inline std::int64_t rawTail(const AtomicBuffer& logMetaDataBuffer) { const std::int32_t partitionIndex = indexByTermCount(activeTermCount(logMetaDataBuffer)); return logMetaDataBuffer.getInt64(TERM_TAIL_COUNTER_OFFSET + (partitionIndex * sizeof(std::int64_t))); } -inline static std::int64_t rawTail(const AtomicBuffer& logMetaDataBuffer, int partitionIndex) +inline std::int64_t rawTail(const AtomicBuffer& logMetaDataBuffer, int partitionIndex) { return logMetaDataBuffer.getInt64(TERM_TAIL_COUNTER_OFFSET + (partitionIndex * sizeof(std::int64_t))); } -inline static std::int32_t termId(const std::int64_t rawTail) +inline std::int32_t termId(const std::int64_t rawTail) { return static_cast(rawTail >> 32); } -inline static std::int32_t termOffset(const std::int64_t rawTail, const std::int64_t termLength) +inline std::int32_t termOffset(const std::int64_t rawTail, const std::int64_t termLength) { const std::int64_t tail = rawTail & 0xFFFFFFFFl; return static_cast(std::min(tail, termLength)); } -inline static bool casRawTail( +inline bool casRawTail( AtomicBuffer& logMetaDataBuffer, int partitionIndex, std::int64_t expectedRawTail, std::int64_t updateRawTail) { return logMetaDataBuffer.compareAndSetInt64( TERM_TAIL_COUNTER_OFFSET + (partitionIndex * sizeof(std::int64_t)), expectedRawTail, updateRawTail); } -inline static AtomicBuffer defaultFrameHeader(AtomicBuffer& logMetaDataBuffer) +inline AtomicBuffer defaultFrameHeader(AtomicBuffer& logMetaDataBuffer) { std::uint8_t *header = logMetaDataBuffer.buffer() + LOG_DEFAULT_FRAME_HEADER_OFFSET; return AtomicBuffer(header, DataFrameHeader::LENGTH); } -inline static void rotateLog(AtomicBuffer& logMetaDataBuffer, std::int32_t currentTermCount, std::int32_t currentTermId) +inline void rotateLog(AtomicBuffer& logMetaDataBuffer, std::int32_t currentTermCount, std::int32_t currentTermId) { const std::int32_t nextTermId = currentTermId + 1; const std::int32_t nextTermCount = currentTermCount + 1; @@ -354,7 +354,7 @@ inline static void rotateLog(AtomicBuffer& logMetaDataBuffer, std::int32_t curre LogBufferDescriptor::casActiveTermCount(logMetaDataBuffer, currentTermCount, nextTermCount); } -inline static void initializeTailWithTermId(AtomicBuffer& logMetaDataBuffer, int partitionIndex, std::int32_t termId) +inline void initializeTailWithTermId(AtomicBuffer& logMetaDataBuffer, int partitionIndex, std::int32_t termId) { const std::int64_t rawTail = (termId * ((int64_t(1) << 32))); logMetaDataBuffer.putInt64(TERM_TAIL_COUNTER_OFFSET + (partitionIndex * sizeof(std::int64_t)), rawTail);