Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 49 additions & 28 deletions aeron-driver/src/main/c/aeron_publication_image.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,40 @@ static aeron_feedback_delay_generator_state_t *aeron_publication_image_acquire_d
return &image->feedback_delay_state;
}

static void aeron_publication_image_report_loss(
aeron_publication_image_t *image, int32_t term_id, int32_t term_offset, size_t length, size_t bytes_lost)
{
if (image->conductor_fields.loss_report_entry_offset >= 0)
{
aeron_loss_reporter_record_observation(
image->conductor_fields.loss_report, image->conductor_fields.loss_report_entry_offset, (int64_t)bytes_lost, image->epoch_clock());
}
else if (NULL != image->conductor_fields.loss_report)
{
if (NULL != image->conductor_fields.endpoint)
{
image->conductor_fields.loss_report_entry_offset = aeron_loss_reporter_create_entry(
image->conductor_fields.loss_report,
(int64_t)bytes_lost,
image->epoch_clock(),
image->session_id,
image->stream_id,
image->conductor_fields.endpoint->conductor_fields.udp_channel->original_uri,
image->conductor_fields.endpoint->conductor_fields.udp_channel->uri_length,
image->source_identity,
image->source_identity_length);
}

if (-1 == image->conductor_fields.loss_report_entry_offset)
{
image->conductor_fields.loss_report = NULL;
}
}
image->conductor_fields.loss_report_term_id = term_id;
image->conductor_fields.loss_report_term_offset = term_offset;
image->conductor_fields.loss_report_length = length;
}

int aeron_publication_image_create(
aeron_publication_image_t **image,
aeron_receive_channel_endpoint_t *endpoint,
Expand Down Expand Up @@ -264,8 +298,11 @@ int aeron_publication_image_create(
_image->conductor_fields.endpoint = endpoint;

_image->congestion_control = congestion_control;
_image->loss_reporter = loss_reporter;
_image->loss_reporter_offset = -1;
_image->conductor_fields.loss_report = loss_reporter;
_image->conductor_fields.loss_report_entry_offset = -1;
_image->conductor_fields.loss_report_term_id = 0;
_image->conductor_fields.loss_report_term_offset = 0;
_image->conductor_fields.loss_report_length = 0;
_image->conductor_fields.subscribable.correlation_id = correlation_id;
_image->conductor_fields.subscribable.array = NULL;
_image->conductor_fields.subscribable.length = 0;
Expand Down Expand Up @@ -342,14 +379,14 @@ int aeron_publication_image_create(
active_term_id, initial_term_offset, _image->position_bits_to_shift, initial_term_id);
const int64_t now_ns = aeron_clock_cached_nano_time(context->cached_clock);

_image->begin_loss_change = -1;
_image->end_loss_change = -1;
_image->begin_loss_change = 0;
_image->end_loss_change = 0;
_image->loss_term_id = active_term_id;
_image->loss_term_offset = initial_term_offset;
_image->loss_length = 0;

_image->begin_sm_change = -1;
_image->end_sm_change = -1;
_image->begin_sm_change = 0;
_image->end_sm_change = 0;
_image->next_sm_position = initial_position;
_image->next_sm_receiver_window_length = _image->congestion_control->initial_window_length(
_image->congestion_control->state);
Expand Down Expand Up @@ -461,31 +498,15 @@ void aeron_publication_image_on_gap_detected(void *clientd, int32_t term_id, int

AERON_SET_RELEASE(image->end_loss_change, change_number);

if (image->loss_reporter_offset >= 0)
size_t loss_report_end_offset;
if (term_id != image->conductor_fields.loss_report_term_id ||
(size_t)term_offset >= (loss_report_end_offset = (size_t)image->conductor_fields.loss_report_term_offset + image->conductor_fields.loss_report_length))
{
aeron_loss_reporter_record_observation(
image->loss_reporter, image->loss_reporter_offset, (int64_t)length, image->epoch_clock());
aeron_publication_image_report_loss(image, term_id, term_offset, length, length);
}
else if (NULL != image->loss_reporter)
else if ((size_t)term_offset + length > loss_report_end_offset)
{
if (NULL != image->conductor_fields.endpoint)
{
image->loss_reporter_offset = aeron_loss_reporter_create_entry(
image->loss_reporter,
(int64_t)length,
image->epoch_clock(),
image->session_id,
image->stream_id,
image->conductor_fields.endpoint->conductor_fields.udp_channel->original_uri,
image->conductor_fields.endpoint->conductor_fields.udp_channel->uri_length,
image->source_identity,
image->source_identity_length);
}

if (-1 == image->loss_reporter_offset)
{
image->loss_reporter = NULL;
}
aeron_publication_image_report_loss(image, term_id, term_offset, length, length - (loss_report_end_offset - (size_t)term_offset));
}
}

Expand Down
8 changes: 5 additions & 3 deletions aeron-driver/src/main/c/aeron_publication_image.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ typedef struct aeron_publication_image_stct
int64_t untethered_window_limit_timeout_ns;
int64_t untethered_resting_timeout_ns;
int64_t clean_position;
int32_t loss_report_term_id;
int32_t loss_report_term_offset;
size_t loss_report_length;
aeron_loss_reporter_t *loss_report;
aeron_loss_reporter_entry_offset_t loss_report_entry_offset;
aeron_receive_channel_endpoint_t *endpoint;
uint8_t flags;
}
Expand Down Expand Up @@ -95,9 +100,6 @@ typedef struct aeron_publication_image_stct
aeron_clock_func_t epoch_clock;
aeron_clock_cache_t *cached_clock;

aeron_loss_reporter_t *loss_reporter;
aeron_loss_reporter_entry_offset_t loss_reporter_offset;

char *log_file_name;
int32_t session_id;
int32_t stream_id;
Expand Down
51 changes: 36 additions & 15 deletions aeron-driver/src/main/java/io/aeron/driver/PublicationImage.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ class PublicationImageConductorFields extends PublicationImagePadding1
long cleanPosition;
final ArrayList<UntetheredSubscription> untetheredSubscriptions = new ArrayList<>();
ReadablePosition[] subscriberPositions;
int lossReportTermId;
int lossReportTermOffset;
int lossReportLength;
LossReport lossReport;
LossReport.ReportEntry reportEntry;
volatile Integer responseSessionId = null;
Expand Down Expand Up @@ -150,11 +153,11 @@ enum State
private final long smTimeoutNs;
private final long maxReceiverWindowLength;

private volatile long beginLossChange;
private volatile long endLossChange;
private int lossTermId;
private int lossTermOffset;
private int lossLength;
volatile long beginLossChange;
volatile long endLossChange;
int lossTermId;
int lossTermOffset;
int lossLength;
private long lastLossChangeNumber;

private volatile long timeOfLastStateChangeNs;
Expand Down Expand Up @@ -411,19 +414,15 @@ public void onGapDetected(final int termId, final int termOffset, final int leng

END_LOSS_CHANGE_VH.setRelease(this, changeNumber);

if (null != reportEntry)
final int lossReportEndOffset;
if (termId != lossReportTermId ||
termOffset >= (lossReportEndOffset = (lossReportTermOffset + lossReportLength)))
{
reportEntry.recordObservation(length, epochClock.time());
reportLoss(termId, termOffset, length, length);
}
else if (null != lossReport)
else if (termOffset + length > lossReportEndOffset)
{
reportEntry = lossReport.createEntry(
length, epochClock.time(), sessionId, streamId, channel(), sourceIdentity);

if (null == reportEntry)
{
lossReport = null;
}
reportLoss(termId, termOffset, length, length - (lossReportEndOffset - termOffset));
}
}

Expand Down Expand Up @@ -953,6 +952,28 @@ void stopStatusMessagesIfNotActive()
}
}

private void reportLoss(final int termId, final int termOffset, final int length, final int bytesLost)
{
if (null != reportEntry)
{
reportEntry.recordObservation(bytesLost, epochClock.time());
}
else if (null != lossReport)
{
reportEntry = lossReport.createEntry(
bytesLost, epochClock.time(), sessionId, streamId, channel(), sourceIdentity);

if (null == reportEntry)
{
lossReport = null;
}
}

lossReportTermId = termId;
lossReportTermOffset = termOffset;
lossReportLength = length;
}

private void state(final State state)
{
this.state = state;
Expand Down
Loading
Loading