diff --git a/aeron-driver/src/main/c/aeron_publication_image.c b/aeron-driver/src/main/c/aeron_publication_image.c index 682aa1c5cd..c041debfa7 100644 --- a/aeron-driver/src/main/c/aeron_publication_image.c +++ b/aeron-driver/src/main/c/aeron_publication_image.c @@ -106,6 +106,40 @@ static aeron_feedback_delay_generator_state_t *aeron_publication_image_acquire_d return &image->feedback_delay_state; } +static void aeron_publication_image_report_loss( + aeron_publication_image_t *image, int32_t term_id, int32_t term_offset, size_t length, size_t bytes_lost) +{ + if (image->conductor_fields.loss_report_entry_offset >= 0) + { + aeron_loss_reporter_record_observation( + image->conductor_fields.loss_report, image->conductor_fields.loss_report_entry_offset, (int64_t)bytes_lost, image->epoch_clock()); + } + else if (NULL != image->conductor_fields.loss_report) + { + if (NULL != image->conductor_fields.endpoint) + { + image->conductor_fields.loss_report_entry_offset = aeron_loss_reporter_create_entry( + image->conductor_fields.loss_report, + (int64_t)bytes_lost, + image->epoch_clock(), + image->session_id, + image->stream_id, + image->conductor_fields.endpoint->conductor_fields.udp_channel->original_uri, + image->conductor_fields.endpoint->conductor_fields.udp_channel->uri_length, + image->source_identity, + image->source_identity_length); + } + + if (-1 == image->conductor_fields.loss_report_entry_offset) + { + image->conductor_fields.loss_report = NULL; + } + } + image->conductor_fields.loss_report_term_id = term_id; + image->conductor_fields.loss_report_term_offset = term_offset; + image->conductor_fields.loss_report_length = length; +} + int aeron_publication_image_create( aeron_publication_image_t **image, aeron_receive_channel_endpoint_t *endpoint, @@ -264,8 +298,11 @@ int aeron_publication_image_create( _image->conductor_fields.endpoint = endpoint; _image->congestion_control = congestion_control; - _image->loss_reporter = loss_reporter; - _image->loss_reporter_offset = -1; + _image->conductor_fields.loss_report = loss_reporter; + _image->conductor_fields.loss_report_entry_offset = -1; + _image->conductor_fields.loss_report_term_id = 0; + _image->conductor_fields.loss_report_term_offset = 0; + _image->conductor_fields.loss_report_length = 0; _image->conductor_fields.subscribable.correlation_id = correlation_id; _image->conductor_fields.subscribable.array = NULL; _image->conductor_fields.subscribable.length = 0; @@ -342,14 +379,14 @@ int aeron_publication_image_create( active_term_id, initial_term_offset, _image->position_bits_to_shift, initial_term_id); const int64_t now_ns = aeron_clock_cached_nano_time(context->cached_clock); - _image->begin_loss_change = -1; - _image->end_loss_change = -1; + _image->begin_loss_change = 0; + _image->end_loss_change = 0; _image->loss_term_id = active_term_id; _image->loss_term_offset = initial_term_offset; _image->loss_length = 0; - _image->begin_sm_change = -1; - _image->end_sm_change = -1; + _image->begin_sm_change = 0; + _image->end_sm_change = 0; _image->next_sm_position = initial_position; _image->next_sm_receiver_window_length = _image->congestion_control->initial_window_length( _image->congestion_control->state); @@ -461,31 +498,15 @@ void aeron_publication_image_on_gap_detected(void *clientd, int32_t term_id, int AERON_SET_RELEASE(image->end_loss_change, change_number); - if (image->loss_reporter_offset >= 0) + size_t loss_report_end_offset; + if (term_id != image->conductor_fields.loss_report_term_id || + (size_t)term_offset >= (loss_report_end_offset = (size_t)image->conductor_fields.loss_report_term_offset + image->conductor_fields.loss_report_length)) { - aeron_loss_reporter_record_observation( - image->loss_reporter, image->loss_reporter_offset, (int64_t)length, image->epoch_clock()); + aeron_publication_image_report_loss(image, term_id, term_offset, length, length); } - else if (NULL != image->loss_reporter) + else if ((size_t)term_offset + length > loss_report_end_offset) { - if (NULL != image->conductor_fields.endpoint) - { - image->loss_reporter_offset = aeron_loss_reporter_create_entry( - image->loss_reporter, - (int64_t)length, - image->epoch_clock(), - image->session_id, - image->stream_id, - image->conductor_fields.endpoint->conductor_fields.udp_channel->original_uri, - image->conductor_fields.endpoint->conductor_fields.udp_channel->uri_length, - image->source_identity, - image->source_identity_length); - } - - if (-1 == image->loss_reporter_offset) - { - image->loss_reporter = NULL; - } + aeron_publication_image_report_loss(image, term_id, term_offset, length, length - (loss_report_end_offset - (size_t)term_offset)); } } diff --git a/aeron-driver/src/main/c/aeron_publication_image.h b/aeron-driver/src/main/c/aeron_publication_image.h index 551ebaf0de..b8e0f1824e 100644 --- a/aeron-driver/src/main/c/aeron_publication_image.h +++ b/aeron-driver/src/main/c/aeron_publication_image.h @@ -63,6 +63,11 @@ typedef struct aeron_publication_image_stct int64_t untethered_window_limit_timeout_ns; int64_t untethered_resting_timeout_ns; int64_t clean_position; + int32_t loss_report_term_id; + int32_t loss_report_term_offset; + size_t loss_report_length; + aeron_loss_reporter_t *loss_report; + aeron_loss_reporter_entry_offset_t loss_report_entry_offset; aeron_receive_channel_endpoint_t *endpoint; uint8_t flags; } @@ -95,9 +100,6 @@ typedef struct aeron_publication_image_stct aeron_clock_func_t epoch_clock; aeron_clock_cache_t *cached_clock; - aeron_loss_reporter_t *loss_reporter; - aeron_loss_reporter_entry_offset_t loss_reporter_offset; - char *log_file_name; int32_t session_id; int32_t stream_id; diff --git a/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java b/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java index 0d694e8329..ca5c2bbb1a 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java +++ b/aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java @@ -68,6 +68,9 @@ class PublicationImageConductorFields extends PublicationImagePadding1 long cleanPosition; final ArrayList untetheredSubscriptions = new ArrayList<>(); ReadablePosition[] subscriberPositions; + int lossReportTermId; + int lossReportTermOffset; + int lossReportLength; LossReport lossReport; LossReport.ReportEntry reportEntry; volatile Integer responseSessionId = null; @@ -150,11 +153,11 @@ enum State private final long smTimeoutNs; private final long maxReceiverWindowLength; - private volatile long beginLossChange; - private volatile long endLossChange; - private int lossTermId; - private int lossTermOffset; - private int lossLength; + volatile long beginLossChange; + volatile long endLossChange; + int lossTermId; + int lossTermOffset; + int lossLength; private long lastLossChangeNumber; private volatile long timeOfLastStateChangeNs; @@ -411,19 +414,15 @@ public void onGapDetected(final int termId, final int termOffset, final int leng END_LOSS_CHANGE_VH.setRelease(this, changeNumber); - if (null != reportEntry) + final int lossReportEndOffset; + if (termId != lossReportTermId || + termOffset >= (lossReportEndOffset = (lossReportTermOffset + lossReportLength))) { - reportEntry.recordObservation(length, epochClock.time()); + reportLoss(termId, termOffset, length, length); } - else if (null != lossReport) + else if (termOffset + length > lossReportEndOffset) { - reportEntry = lossReport.createEntry( - length, epochClock.time(), sessionId, streamId, channel(), sourceIdentity); - - if (null == reportEntry) - { - lossReport = null; - } + reportLoss(termId, termOffset, length, length - (lossReportEndOffset - termOffset)); } } @@ -953,6 +952,28 @@ void stopStatusMessagesIfNotActive() } } + private void reportLoss(final int termId, final int termOffset, final int length, final int bytesLost) + { + if (null != reportEntry) + { + reportEntry.recordObservation(bytesLost, epochClock.time()); + } + else if (null != lossReport) + { + reportEntry = lossReport.createEntry( + bytesLost, epochClock.time(), sessionId, streamId, channel(), sourceIdentity); + + if (null == reportEntry) + { + lossReport = null; + } + } + + lossReportTermId = termId; + lossReportTermOffset = termOffset; + lossReportLength = length; + } + private void state(final State state) { this.state = state; diff --git a/aeron-driver/src/test/c/aeron_publication_image_test.cpp b/aeron-driver/src/test/c/aeron_publication_image_test.cpp index 49f646695f..89c2e50082 100644 --- a/aeron-driver/src/test/c/aeron_publication_image_test.cpp +++ b/aeron-driver/src/test/c/aeron_publication_image_test.cpp @@ -36,8 +36,7 @@ static bool always_measure_rtt(void *state, int64_t now_ns) return true; } -class PublicationImageTest : public ReceiverTestBase -{ +class PublicationImageTest : public ReceiverTestBase { }; TEST_F(PublicationImageTest, shouldAddAndRemoveDestination) @@ -55,7 +54,13 @@ TEST_F(PublicationImageTest, shouldAddAndRemoveDestination) aeron_receive_destination_t *dest_1; ASSERT_LE(0, aeron_receive_destination_create( - &dest_1, channel_1, channel_1, m_context, &m_counters_manager, registration_id, endpoint->channel_status.counter_id)); + &dest_1, + channel_1, + channel_1, + m_context, + &m_counters_manager, + registration_id, + endpoint->channel_status.counter_id)); ASSERT_EQ(1, aeron_receive_channel_endpoint_add_destination(endpoint, dest_1)); aeron_publication_image_t *image = createImage(endpoint, dest_1, stream_id, session_id); @@ -64,7 +69,13 @@ TEST_F(PublicationImageTest, shouldAddAndRemoveDestination) aeron_receive_destination_t *dest_2; ASSERT_LE(0, aeron_receive_destination_create( - &dest_2, channel_2, channel_2, m_context, &m_counters_manager, registration_id, endpoint->channel_status.counter_id)); + &dest_2, + channel_2, + channel_2, + m_context, + &m_counters_manager, + registration_id, + endpoint->channel_status.counter_id)); ASSERT_EQ(2, aeron_receive_channel_endpoint_add_destination(endpoint, dest_2)); ASSERT_EQ(2, aeron_publication_image_add_destination(image, dest_2)); @@ -88,7 +99,7 @@ TEST_F(PublicationImageTest, shouldAddAndRemoveDestination) ASSERT_EQ(1u, endpoint->destinations.length); ASSERT_EQ(0, aeron_publication_image_remove_destination(image, channel_not_added)); ASSERT_EQ(1u, image->connections.length); - ASSERT_EQ((aeron_receive_destination_t *)nullptr, destination); + ASSERT_EQ((aeron_receive_destination_t *) nullptr, destination); aeron_udp_channel_t *remove_channel_2 = createChannel(uri_2, &m_channels_for_tear_down); @@ -124,13 +135,25 @@ TEST_F(PublicationImageTest, shouldSendControlMessagesToAllDestinations) aeron_udp_channel_parse(strlen(uri_2), uri_2, &m_resolver, &channel_2, false); ASSERT_LE(0, aeron_receive_destination_create( - &dest_1, channel_1, channel_1, m_context, &m_counters_manager, registration_id, endpoint->channel_status.counter_id)); + &dest_1, + channel_1, + channel_1, + m_context, + &m_counters_manager, + registration_id, + endpoint->channel_status.counter_id)); ASSERT_EQ(1, aeron_receive_channel_endpoint_add_destination(endpoint, dest_1)); aeron_publication_image_t *image = createImage(endpoint, dest_1, stream_id, session_id); ASSERT_LE(0, aeron_receive_destination_create( - &dest_2, channel_2, channel_2, m_context, &m_counters_manager, registration_id, endpoint->channel_status.counter_id)); + &dest_2, + channel_2, + channel_2, + m_context, + &m_counters_manager, + registration_id, + endpoint->channel_status.counter_id)); ASSERT_EQ(2, aeron_receive_channel_endpoint_add_destination(endpoint, dest_2)); ASSERT_EQ(2, aeron_publication_image_add_destination(image, dest_2)); @@ -200,13 +223,25 @@ TEST_F(PublicationImageTest, shouldHandleEosAcrossDestinations) aeron_udp_channel_parse(strlen(uri_2), uri_2, &m_resolver, &channel_2, false); ASSERT_LE(0, aeron_receive_destination_create( - &dest_1, channel_1, channel_1, m_context, &m_counters_manager, registration_id, endpoint->channel_status.counter_id)); + &dest_1, + channel_1, + channel_1, + m_context, + &m_counters_manager, + registration_id, + endpoint->channel_status.counter_id)); ASSERT_EQ(1, aeron_receive_channel_endpoint_add_destination(endpoint, dest_1)); aeron_publication_image_t *image = createImage(endpoint, dest_1, stream_id, session_id); ASSERT_LE(0, aeron_receive_destination_create( - &dest_2, channel_2, channel_2, m_context, &m_counters_manager, registration_id, endpoint->channel_status.counter_id)); + &dest_2, + channel_2, + channel_2, + m_context, + &m_counters_manager, + registration_id, + endpoint->channel_status.counter_id)); ASSERT_EQ(2, aeron_receive_channel_endpoint_add_destination(endpoint, dest_2)); ASSERT_EQ(2, aeron_publication_image_add_destination(image, dest_2)); @@ -262,13 +297,25 @@ TEST_F(PublicationImageTest, shouldNotSendControlMessagesToAllDestinationThatHav aeron_clock_update_cached_nano_time(m_context->receiver_cached_clock, t0_ns); ASSERT_LE(0, aeron_receive_destination_create( - &dest_1, channel_1, channel_1, m_context, &m_counters_manager, registration_id, endpoint->channel_status.counter_id)); + &dest_1, + channel_1, + channel_1, + m_context, + &m_counters_manager, + registration_id, + endpoint->channel_status.counter_id)); ASSERT_EQ(1, aeron_receive_channel_endpoint_add_destination(endpoint, dest_1)); aeron_publication_image_t *image = createImage(endpoint, dest_1, stream_id, session_id); ASSERT_LE(0, aeron_receive_destination_create( - &dest_2, channel_2, channel_2, m_context, &m_counters_manager, registration_id, endpoint->channel_status.counter_id)); + &dest_2, + channel_2, + channel_2, + m_context, + &m_counters_manager, + registration_id, + endpoint->channel_status.counter_id)); ASSERT_EQ(2, aeron_receive_channel_endpoint_add_destination(endpoint, dest_2)); ASSERT_EQ(2, aeron_publication_image_add_destination(image, dest_2)); @@ -282,7 +329,7 @@ TEST_F(PublicationImageTest, shouldNotSendControlMessagesToAllDestinationThatHav message->stream_id = stream_id; message->session_id = session_id; - message->frame_header.frame_length = (int32_t)message_length; + message->frame_header.frame_length = (int32_t) message_length; message->term_id = 0; message->term_offset = 0; @@ -291,7 +338,7 @@ TEST_F(PublicationImageTest, shouldNotSendControlMessagesToAllDestinationThatHav aeron_clock_update_cached_nano_time(m_context->receiver_cached_clock, t1_ns); - auto next_offset = (int32_t)message_length; + auto next_offset = (int32_t) message_length; message->term_offset = next_offset; aeron_publication_image_insert_packet(image, dest_2, 0, next_offset, data, message_length, &addr); @@ -333,13 +380,25 @@ TEST_F(PublicationImageTest, shouldTrackActiveTransportAccountBasedOnFrames) aeron_clock_update_cached_nano_time(m_context->receiver_cached_clock, t0_ns); ASSERT_LE(0, aeron_receive_destination_create( - &dest_1, channel_1, channel_1, m_context, &m_counters_manager, registration_id, endpoint->channel_status.counter_id)); + &dest_1, + channel_1, + channel_1, + m_context, + &m_counters_manager, + registration_id, + endpoint->channel_status.counter_id)); ASSERT_EQ(1, aeron_receive_channel_endpoint_add_destination(endpoint, dest_1)); aeron_publication_image_t *image = createImage(endpoint, dest_1, stream_id, session_id); ASSERT_LE(0, aeron_receive_destination_create( - &dest_2, channel_2, channel_2, m_context, &m_counters_manager, registration_id, endpoint->channel_status.counter_id)); + &dest_2, + channel_2, + channel_2, + m_context, + &m_counters_manager, + registration_id, + endpoint->channel_status.counter_id)); ASSERT_EQ(2, aeron_receive_channel_endpoint_add_destination(endpoint, dest_2)); ASSERT_EQ(2, aeron_publication_image_add_destination(image, dest_2)); @@ -438,7 +497,7 @@ TEST_F(PublicationImageTest, shouldTrackUnderRunningTransportsWithLastSmAndRecei message->stream_id = stream_id; message->session_id = session_id; - message->frame_header.frame_length = (int32_t)message_length; + message->frame_header.frame_length = (int32_t) message_length; message->term_id = 0; message->term_offset = 0; @@ -481,7 +540,7 @@ TEST_F(PublicationImageTest, shouldReturnStorageSpaceErrorIfNotEnoughStorageSpac endpoint->channel_status.counter_id)); ASSERT_EQ(1, aeron_receive_channel_endpoint_add_destination(endpoint, dest)); - m_context->usable_fs_space_func = [](const char* path) -> uint64_t + m_context->usable_fs_space_func = [](const char *path) -> uint64_t { return 42; }; @@ -518,7 +577,7 @@ TEST_F(PublicationImageTest, shouldLogWarningIfStorageSpaceIsLow) endpoint->channel_status.counter_id)); ASSERT_EQ(1, aeron_receive_channel_endpoint_add_destination(endpoint, dest)); - m_context->usable_fs_space_func = [](const char* path) -> uint64_t + m_context->usable_fs_space_func = [](const char *path) -> uint64_t { return 123456789; }; @@ -538,3 +597,286 @@ TEST_F(PublicationImageTest, shouldLogWarningIfStorageSpaceIsLow) .append(m_context->aeron_dir); EXPECT_NE(std::string::npos, error_text.find(expected_warning)); } + +TEST_F(PublicationImageTest, shouldReportUniqueLoss) +{ + const char *uri = "aeron:udp?endpoint=localhost:9090"; + aeron_receive_channel_endpoint_t *endpoint = createEndpoint(uri); + int32_t stream_id = 777; + int32_t session_id = 42; + int64_t registration_id = 0; + + aeron_udp_channel_t *channel; + aeron_receive_destination_t *dest; + + ASSERT_EQ(0, aeron_udp_channel_parse(strlen(uri), uri, &m_resolver, &channel, false)); + + ASSERT_LE(0, aeron_receive_destination_create( + &dest, + channel, + channel, + m_context, + &m_counters_manager, + registration_id, + endpoint->channel_status.counter_id)); + + aeron_publication_image_t *image = createImage(endpoint, dest, stream_id, session_id); + ASSERT_NE(nullptr, image) << aeron_errmsg(); + + const int32_t term_id = 111; + const int32_t offset = 128; + const size_t length = 192; + + // initial loss report + aeron_publication_image_on_gap_detected(image, term_id, offset, length); + EXPECT_EQ(1, image->begin_loss_change); + EXPECT_EQ(term_id, image->loss_term_id); + EXPECT_EQ(offset, image->loss_term_offset); + EXPECT_EQ(length, image->loss_length); + EXPECT_EQ(1, image->end_loss_change); + EXPECT_EQ(1, aeron_loss_reporter_read( + m_loss_reporter_buffer.data(), + m_loss_reporter_buffer.size(), + []( + void *clientd, + int64_t observation_count, + int64_t total_bytes_lost, + int64_t first_observation_timestamp, + int64_t last_observation_timestamp, + int32_t session_id, + int32_t stream_id, + const char *channel, + int32_t channel_length, + const char *source, + int32_t source_length) + { + EXPECT_EQ(1, observation_count); + EXPECT_EQ(192, total_bytes_lost); + EXPECT_EQ(first_observation_timestamp, last_observation_timestamp); + EXPECT_EQ(42, session_id); + EXPECT_EQ(777, stream_id); + }, + nullptr)); + + // same loss => no reporting + aeron_publication_image_on_gap_detected(image, term_id, offset, length); + EXPECT_EQ(2, image->begin_loss_change); + EXPECT_EQ(term_id, image->loss_term_id); + EXPECT_EQ(offset, image->loss_term_offset); + EXPECT_EQ(length, image->loss_length); + EXPECT_EQ(2, image->end_loss_change); + EXPECT_EQ(1, aeron_loss_reporter_read( + m_loss_reporter_buffer.data(), + m_loss_reporter_buffer.size(), + []( + void *clientd, + int64_t observation_count, + int64_t total_bytes_lost, + int64_t first_observation_timestamp, + int64_t last_observation_timestamp, + int32_t session_id, + int32_t stream_id, + const char *channel, + int32_t channel_length, + const char *source, + int32_t source_length) + { + EXPECT_EQ(1, observation_count); + EXPECT_EQ(192, total_bytes_lost); + EXPECT_EQ(first_observation_timestamp, last_observation_timestamp); + EXPECT_EQ(42, session_id); + EXPECT_EQ(777, stream_id); + }, + nullptr)); + + // less loss => no reporting + aeron_publication_image_on_gap_detected(image, term_id, offset, 32); + EXPECT_EQ(3, image->begin_loss_change); + EXPECT_EQ(term_id, image->loss_term_id); + EXPECT_EQ(offset, image->loss_term_offset); + EXPECT_EQ(32, image->loss_length); + EXPECT_EQ(3, image->end_loss_change); + EXPECT_EQ(1, aeron_loss_reporter_read( + m_loss_reporter_buffer.data(), + m_loss_reporter_buffer.size(), + []( + void *clientd, + int64_t observation_count, + int64_t total_bytes_lost, + int64_t first_observation_timestamp, + int64_t last_observation_timestamp, + int32_t session_id, + int32_t stream_id, + const char *channel, + int32_t channel_length, + const char *source, + int32_t source_length) + { + EXPECT_EQ(1, observation_count); + EXPECT_EQ(192, total_bytes_lost); + EXPECT_EQ(first_observation_timestamp, last_observation_timestamp); + EXPECT_EQ(42, session_id); + EXPECT_EQ(777, stream_id); + }, + nullptr)); + + // larger loss => report + aeron_publication_image_on_gap_detected(image, term_id, offset, 1500); + EXPECT_EQ(4, image->begin_loss_change); + EXPECT_EQ(term_id, image->loss_term_id); + EXPECT_EQ(offset, image->loss_term_offset); + EXPECT_EQ(1500, image->loss_length); + EXPECT_EQ(4, image->end_loss_change); + EXPECT_EQ(1, aeron_loss_reporter_read( + m_loss_reporter_buffer.data(), + m_loss_reporter_buffer.size(), + []( + void *clientd, + int64_t observation_count, + int64_t total_bytes_lost, + int64_t first_observation_timestamp, + int64_t last_observation_timestamp, + int32_t session_id, + int32_t stream_id, + const char *channel, + int32_t channel_length, + const char *source, + int32_t source_length) + { + EXPECT_EQ(2, observation_count); + EXPECT_EQ(1500, total_bytes_lost); + EXPECT_LE(first_observation_timestamp, last_observation_timestamp); + EXPECT_EQ(42, session_id); + EXPECT_EQ(777, stream_id); + }, + nullptr)); + + // overlapping loss => report + aeron_publication_image_on_gap_detected(image, term_id, offset + 996, 700); + EXPECT_EQ(5, image->begin_loss_change); + EXPECT_EQ(term_id, image->loss_term_id); + EXPECT_EQ(offset + 996, image->loss_term_offset); + EXPECT_EQ(700, image->loss_length); + EXPECT_EQ(5, image->end_loss_change); + EXPECT_EQ(1, aeron_loss_reporter_read( + m_loss_reporter_buffer.data(), + m_loss_reporter_buffer.size(), + []( + void *clientd, + int64_t observation_count, + int64_t total_bytes_lost, + int64_t first_observation_timestamp, + int64_t last_observation_timestamp, + int32_t session_id, + int32_t stream_id, + const char *channel, + int32_t channel_length, + const char *source, + int32_t source_length) + { + EXPECT_EQ(3, observation_count); + EXPECT_EQ(1696, total_bytes_lost); + EXPECT_LE(first_observation_timestamp, last_observation_timestamp); + EXPECT_EQ(42, session_id); + EXPECT_EQ(777, stream_id); + }, + nullptr)); + + // non-overlapping loss => report + aeron_publication_image_on_gap_detected(image, term_id, offset + 4096, 128); + EXPECT_EQ(6, image->begin_loss_change); + EXPECT_EQ(term_id, image->loss_term_id); + EXPECT_EQ(offset + 4096, image->loss_term_offset); + EXPECT_EQ(128, image->loss_length); + EXPECT_EQ(6, image->end_loss_change); + EXPECT_EQ(1, aeron_loss_reporter_read( + m_loss_reporter_buffer.data(), + m_loss_reporter_buffer.size(), + []( + void *clientd, + int64_t observation_count, + int64_t total_bytes_lost, + int64_t first_observation_timestamp, + int64_t last_observation_timestamp, + int32_t session_id, + int32_t stream_id, + const char *channel, + int32_t channel_length, + const char *source, + int32_t source_length) + { + EXPECT_EQ(4, observation_count); + EXPECT_EQ(1824, total_bytes_lost); + EXPECT_LE(first_observation_timestamp, last_observation_timestamp); + EXPECT_EQ(42, session_id); + EXPECT_EQ(777, stream_id); + }, + nullptr)); + + // loss in another term => report + aeron_publication_image_on_gap_detected(image, term_id + 3, 0, 400); + EXPECT_EQ(7, image->begin_loss_change); + EXPECT_EQ(term_id + 3, image->loss_term_id); + EXPECT_EQ(0, image->loss_term_offset); + EXPECT_EQ(400, image->loss_length); + EXPECT_EQ(7, image->end_loss_change); + EXPECT_EQ(1, aeron_loss_reporter_read( + m_loss_reporter_buffer.data(), + m_loss_reporter_buffer.size(), + []( + void *clientd, + int64_t observation_count, + int64_t total_bytes_lost, + int64_t first_observation_timestamp, + int64_t last_observation_timestamp, + int32_t session_id, + int32_t stream_id, + const char *channel, + int32_t channel_length, + const char *source, + int32_t source_length) + { + EXPECT_EQ(5, observation_count); + EXPECT_EQ(2224, total_bytes_lost); + EXPECT_LE(first_observation_timestamp, last_observation_timestamp); + EXPECT_EQ(42, session_id); + EXPECT_EQ(777, stream_id); + }, + nullptr)); + + // same loss => no report + aeron_publication_image_on_gap_detected(image, term_id + 3, 0, 400); + EXPECT_EQ(8, image->begin_loss_change); + EXPECT_EQ(term_id + 3, image->loss_term_id); + EXPECT_EQ(0, image->loss_term_offset); + EXPECT_EQ(400, image->loss_length); + EXPECT_EQ(8, image->end_loss_change); + EXPECT_EQ(1, aeron_loss_reporter_read( + m_loss_reporter_buffer.data(), + m_loss_reporter_buffer.size(), + []( + void *clientd, + int64_t observation_count, + int64_t total_bytes_lost, + int64_t first_observation_timestamp, + int64_t last_observation_timestamp, + int32_t session_id, + int32_t stream_id, + const char *channel, + int32_t channel_length, + const char *source, + int32_t source_length) + { + EXPECT_EQ(5, observation_count); + EXPECT_EQ(2224, total_bytes_lost); + EXPECT_LE(first_observation_timestamp, last_observation_timestamp); + EXPECT_EQ(42, session_id); + EXPECT_EQ(777, stream_id); + }, + nullptr)); + + aeron_publication_image_remove_destination(image, channel); + endpoint->transport_bindings->poller_remove_func(&m_receiver.poller, &dest->transport); + endpoint->transport_bindings->close_func(&dest->transport); + aeron_receive_destination_delete(dest, &m_counters_manager); +} diff --git a/aeron-driver/src/test/c/aeron_receiver_test.h b/aeron-driver/src/test/c/aeron_receiver_test.h index 707b38405a..55fc1a10a4 100644 --- a/aeron-driver/src/test/c/aeron_receiver_test.h +++ b/aeron-driver/src/test/c/aeron_receiver_test.h @@ -100,6 +100,8 @@ class ReceiverTestBase : public testing::Test &m_error_log, m_error_log_buffer.data(), m_error_log_buffer.size(), aeron_epoch_clock); aeron_driver_receiver_init(&m_receiver, m_context, &m_system_counters, &m_error_log); + aeron_loss_reporter_init(&m_loss_reporter, m_loss_reporter_buffer.data(), m_loss_reporter_buffer.size()); + m_receiver_proxy.receiver = &m_receiver; m_context->receiver_proxy = &m_receiver_proxy; m_context->error_log = &m_error_log; @@ -226,10 +228,29 @@ class ReceiverTestBase : public testing::Test conductor.context = m_context; if (aeron_publication_image_create( - &image, endpoint, destination, &conductor, correlation_id, session_id, stream_id, 0, 0, 0, - &hwm_position, &pos_position, congestion_control_strategy, - &channel->remote_control, &channel->local_data, - TERM_BUFFER_SIZE, MTU, UINT8_C(0), nullptr, true, true, false, &m_system_counters) < 0) + &image, + endpoint, + destination, + &conductor, + correlation_id, + session_id, + stream_id, + 0, + 0, + 0, + &hwm_position, + &pos_position, + congestion_control_strategy, + &channel->remote_control, + &channel->local_data, + TERM_BUFFER_SIZE, + MTU, + UINT8_C(0), + &m_loss_reporter, + true, + true, + false, + &m_system_counters) < 0) { congestion_control_strategy->fini(congestion_control_strategy); return nullptr; @@ -281,6 +302,8 @@ class ReceiverTestBase : public testing::Test int64_t m_conductor_fail_counter = 0; aeron_driver_receiver_t m_receiver = {}; aeron_distinct_error_log_t m_error_log = {}; + aeron_loss_reporter_t m_loss_reporter = {}; + AERON_DECL_ALIGNED(buffer_t m_loss_reporter_buffer, 16) = {}; AERON_DECL_ALIGNED(buffer_t m_error_log_buffer, 16) = {}; AERON_DECL_ALIGNED(buffer_t m_counter_value_buffer, 16) = {}; AERON_DECL_ALIGNED(buffer_4x_t m_counter_meta_buffer, 16) = {}; diff --git a/aeron-driver/src/test/java/io/aeron/driver/PublicationImageTest.java b/aeron-driver/src/test/java/io/aeron/driver/PublicationImageTest.java index c2cf747b54..b225532ae0 100644 --- a/aeron-driver/src/test/java/io/aeron/driver/PublicationImageTest.java +++ b/aeron-driver/src/test/java/io/aeron/driver/PublicationImageTest.java @@ -19,6 +19,7 @@ import io.aeron.driver.buffer.RawLog; import io.aeron.driver.media.ReceiveChannelEndpoint; import io.aeron.driver.media.UdpChannel; +import io.aeron.driver.reports.LossReport; import io.aeron.driver.status.ReceiverHwm; import io.aeron.driver.status.ReceiverPos; import io.aeron.driver.status.SystemCounterDescriptor; @@ -35,6 +36,7 @@ import org.agrona.concurrent.status.Position; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.InOrder; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -46,6 +48,10 @@ import static io.aeron.logbuffer.LogBufferDescriptor.*; import static io.aeron.protocol.DataHeaderFlyweight.*; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -77,6 +83,7 @@ class PublicationImageTest new UnsafeBuffer(ByteBuffer.allocateDirect(64 * 1024)), StandardCharsets.US_ASCII); private final DataHeaderFlyweight headerFlyweight = new DataHeaderFlyweight(); + private final LossReport lossReport = mock(LossReport.class); private Position hwmPosition; private Position rcvPosition; private PublicationImage image; @@ -94,13 +101,15 @@ void before() .untetheredWindowLimitTimeoutNs(TimeUnit.SECONDS.toNanos(1)) .untetheredRestingTimeoutNs(TimeUnit.SECONDS.toNanos(1)) .statusMessageTimeoutNs(TimeUnit.MILLISECONDS.toNanos(150)) - .systemCounters(new SystemCounters(countersManager)); + .systemCounters(new SystemCounters(countersManager)) + .lossReport(lossReport); final String channel = "aeron:udp?endpoint=localhost:5555"; final ChannelUri channelUri = ChannelUri.parse(channel); final UdpChannel udpChannel = mock(UdpChannel.class); when(udpChannel.channelUri()).thenReturn(channelUri); when(receiveChannelEndpoint.subscriptionUdpChannel()).thenReturn(udpChannel); + when(receiveChannelEndpoint.originalUriString()).thenReturn(channel); final SubscriptionLink subscriptionLink1 = mock(SubscriptionLink.class); when(subscriptionLink1.isReliable()).thenReturn(true); @@ -221,6 +230,100 @@ void shouldAdvanceHighWaterMarkPositionOnHeartbeat() } } + @Test + void shouldOnlyRecordUniqueLoss() + { + final LossReport.ReportEntry reportEntry = mock(LossReport.ReportEntry.class); + when(lossReport.createEntry(anyLong(), anyLong(), anyInt(), anyInt(), anyString(), anyString())) + .thenReturn(reportEntry); + final InOrder inOrder = inOrder(lossReport, reportEntry); + + final int termId = 0; + final int offset = 0; + final int length = 1024; + + // first activation => must be recorded + epochClock.update(100); + image.onGapDetected(termId, offset, length); + assertEquals(1L, image.beginLossChange); + assertEquals(termId, image.lossTermId); + assertEquals(offset, image.lossTermOffset); + assertEquals(length, image.lossLength); + assertEquals(1L, image.endLossChange); + + // same loss => no reporting + epochClock.update(200); + image.onGapDetected(termId, offset, length); + assertEquals(2L, image.beginLossChange); + assertEquals(termId, image.lossTermId); + assertEquals(offset, image.lossTermOffset); + assertEquals(length, image.lossLength); + assertEquals(2L, image.endLossChange); + + // smaller loss => no reporting + epochClock.update(300); + image.onGapDetected(termId, offset, 32); + assertEquals(3L, image.beginLossChange); + assertEquals(termId, image.lossTermId); + assertEquals(offset, image.lossTermOffset); + assertEquals(32, image.lossLength); + assertEquals(3L, image.endLossChange); + + // loss length increased => record + epochClock.update(400); + image.onGapDetected(termId, offset, length + 128); + assertEquals(4L, image.beginLossChange); + assertEquals(termId, image.lossTermId); + assertEquals(offset, image.lossTermOffset); + assertEquals(length + 128, image.lossLength); + assertEquals(4L, image.endLossChange); + + // overlapping loss => record + epochClock.update(500); + image.onGapDetected(termId, offset + 512, 800); + assertEquals(5L, image.beginLossChange); + assertEquals(termId, image.lossTermId); + assertEquals(offset + 512, image.lossTermOffset); + assertEquals(800, image.lossLength); + assertEquals(5L, image.endLossChange); + + // non-overlapping loss => record + epochClock.update(600); + image.onGapDetected(termId, offset + 512 + 800, 32); + assertEquals(6L, image.beginLossChange); + assertEquals(termId, image.lossTermId); + assertEquals(offset + 512 + 800, image.lossTermOffset); + assertEquals(32, image.lossLength); + assertEquals(6L, image.endLossChange); + + // non-overlapping loss => record + epochClock.update(700); + image.onGapDetected(termId, offset + 4096, 2048); + assertEquals(7L, image.beginLossChange); + assertEquals(termId, image.lossTermId); + assertEquals(offset + 4096, image.lossTermOffset); + assertEquals(2048, image.lossLength); + assertEquals(7L, image.endLossChange); + + // loss in different term => record + epochClock.update(800); + image.onGapDetected(termId + 11, 0, 256); + assertEquals(8L, image.beginLossChange); + assertEquals(termId + 11, image.lossTermId); + assertEquals(0, image.lossTermOffset); + assertEquals(256, image.lossLength); + assertEquals(8L, image.endLossChange); + + inOrder.verify(lossReport).createEntry( + length, 100, SESSION_ID, STREAM_ID, receiveChannelEndpoint.originalUriString(), SOURCE_IDENTITY); + inOrder.verify(reportEntry).recordObservation(128, 400); + inOrder.verify(reportEntry).recordObservation(160, 500); + inOrder.verify(reportEntry).recordObservation(32, 600); + inOrder.verify(reportEntry).recordObservation(2048, 700); + inOrder.verify(reportEntry).recordObservation(256, 800); + inOrder.verifyNoMoreInteractions(); + } + private int writeFrame( final int offset, final int termOffset,