Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 109 additions & 117 deletions sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -237,25 +237,22 @@ private static void CloseLink(RequestResponseAmqpLink link)
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <returns>List of messages received. Returns an empty list if no message is found.</returns>
public override async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveMessagesAsync(
public override Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveMessagesAsync(
int maxMessages,
TimeSpan? maxWaitTime,
CancellationToken cancellationToken)
{
IReadOnlyList<ServiceBusReceivedMessage> messages = null;
await _retryPolicy.RunOperation(async (timeout) =>
{
messages = await ReceiveMessagesAsyncInternal(
maxMessages,
maxWaitTime,
timeout,
cancellationToken).ConfigureAwait(false);
},
_connectionScope,
cancellationToken).ConfigureAwait(false);

return messages;
}
CancellationToken cancellationToken) =>
_retryPolicy.RunOperation(static (value, timeout, token) =>
{
var (receiver, maxmsgs, maxwait) = value;
return receiver.ReceiveMessagesAsyncInternal(
maxmsgs,
maxwait,
timeout,
token);
},
(this, maxMessages, maxWaitTime),
_connectionScope,
cancellationToken);

/// <summary>
/// Receives a list of <see cref="ServiceBusMessage" /> from the Service Bus entity.
Expand Down Expand Up @@ -346,16 +343,18 @@ private async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveMessagesAsyn
/// </remarks>
///
/// <returns>A task to be resolved on when the operation has completed.</returns>
public override async Task CompleteAsync(
public override Task CompleteAsync(
string lockToken,
CancellationToken cancellationToken = default) =>
await _retryPolicy.RunOperation(
async (timeout) =>
await CompleteInternalAsync(
lockToken,
timeout).ConfigureAwait(false),
_retryPolicy.RunOperation(
static (value, timeout, _) =>
{
var (receiver, lckToken) = value;
return receiver.CompleteInternalAsync(lckToken, timeout);
},
(this, lockToken),
_connectionScope,
cancellationToken).ConfigureAwait(false);
cancellationToken);

/// <summary>
/// Completes a series of <see cref="ServiceBusMessage"/> using a list of lock tokens. This will delete the message from the service.
Expand Down Expand Up @@ -486,17 +485,19 @@ private void ThrowLockLostException()
/// </remarks>
///
/// <returns>A task to be resolved on when the operation has completed.</returns>
public override async Task DeferAsync(
public override Task DeferAsync(
string lockToken,
IDictionary<string, object> propertiesToModify = null,
CancellationToken cancellationToken = default) =>
await _retryPolicy.RunOperation(
async (timeout) => await DeferInternalAsync(
lockToken,
timeout,
propertiesToModify).ConfigureAwait(false),
_retryPolicy.RunOperation(
static (value, timeout, _) =>
{
var (receiver, lckToken, properties) = value;
return receiver.DeferInternalAsync(lckToken, timeout, properties);
},
(this, lockToken, propertiesToModify),
_connectionScope,
cancellationToken).ConfigureAwait(false);
cancellationToken);

/// <summary>Indicates that the receiver wants to defer the processing for the message.</summary>
///
Expand Down Expand Up @@ -538,17 +539,19 @@ private Task DeferInternalAsync(
/// </remarks>
///
/// <returns>A task to be resolved on when the operation has completed.</returns>
public override async Task AbandonAsync(
public override Task AbandonAsync(
string lockToken,
IDictionary<string, object> propertiesToModify = null,
CancellationToken cancellationToken = default) =>
await _retryPolicy.RunOperation(
async (timeout) => await AbandonInternalAsync(
lockToken,
timeout,
propertiesToModify).ConfigureAwait(false),
_retryPolicy.RunOperation(
static (value, timeout, _) =>
{
var (receiver, lckToken, properties) = value;
return receiver.AbandonInternalAsync(lckToken, timeout, properties);
},
(this, lockToken, propertiesToModify),
_connectionScope,
cancellationToken).ConfigureAwait(false);
cancellationToken);

/// <summary>
/// Abandons a <see cref="ServiceBusMessage"/> using a lock token. This will make the message available again for processing.
Expand Down Expand Up @@ -595,21 +598,21 @@ private Task AbandonInternalAsync(
/// </remarks>
///
/// <returns>A task to be resolved on when the operation has completed.</returns>
public override async Task DeadLetterAsync(
public override Task DeadLetterAsync(
string lockToken,
string deadLetterReason,
string deadLetterErrorDescription = default,
IDictionary<string, object> propertiesToModify = default,
CancellationToken cancellationToken = default) =>
await _retryPolicy.RunOperation(
async (timeout) => await DeadLetterInternalAsync(
lockToken,
timeout,
propertiesToModify,
deadLetterReason,
deadLetterErrorDescription).ConfigureAwait(false),
_retryPolicy.RunOperation(
static (value, timeout, _) =>
{
var (receiver, lckToken, properties, reason, description) = value;
return receiver.DeadLetterInternalAsync(lckToken, timeout, properties, reason, description);
},
(this, lockToken, propertiesToModify, deadLetterReason, deadLetterErrorDescription),
_connectionScope,
cancellationToken).ConfigureAwait(false);
cancellationToken);

/// <summary>
/// Moves a message to the dead-letter subqueue.
Expand Down Expand Up @@ -822,26 +825,25 @@ private static Outcome GetModifiedOutcome(
/// Also, unlike <see cref="ReceiveMessagesAsync(int, TimeSpan?, CancellationToken)"/>, this method will fetch even Deferred messages (but not Deadlettered message)
/// </remarks>
/// <returns></returns>
public override async Task<IReadOnlyList<ServiceBusReceivedMessage>> PeekMessagesAsync(
public override Task<IReadOnlyList<ServiceBusReceivedMessage>> PeekMessagesAsync(
long? sequenceNumber,
int messageCount = 1,
CancellationToken cancellationToken = default)
{
long seqNumber = sequenceNumber ?? LastPeekedSequenceNumber + 1;
IReadOnlyList<ServiceBusReceivedMessage> messages = null;

await _retryPolicy.RunOperation(
async (timeout) =>
messages = await PeekMessagesInternalAsync(
seqNumber,
messageCount,
timeout,
cancellationToken)
.ConfigureAwait(false),
return _retryPolicy.RunOperation(
static (value, timeout, token) =>
{
var (receiver, number, count) = value;
return receiver.PeekMessagesInternalAsync(
number,
count,
timeout,
token);
},
(this, seqNumber, messageCount),
_connectionScope,
cancellationToken).ConfigureAwait(false);

return messages;
cancellationToken);
}

/// <summary>
Expand Down Expand Up @@ -929,22 +931,20 @@ private async Task<IReadOnlyList<ServiceBusReceivedMessage>> PeekMessagesInterna
///
/// <param name="lockToken">Lock token associated with the message.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
public override async Task<DateTimeOffset> RenewMessageLockAsync(
public override Task<DateTimeOffset> RenewMessageLockAsync(
string lockToken,
CancellationToken cancellationToken)
{
DateTimeOffset lockedUntil = DateTimeOffset.MinValue;
await _retryPolicy.RunOperation(
async (timeout) =>
CancellationToken cancellationToken) =>
_retryPolicy.RunOperation(
static (value, timeout, _) =>
{
lockedUntil = await RenewMessageLockInternalAsync(
lockToken,
timeout).ConfigureAwait(false);
var (receiver, lckToken) = value;
return receiver.RenewMessageLockInternalAsync(
lckToken,
timeout);
},
(this, lockToken),
_connectionScope,
cancellationToken).ConfigureAwait(false);
return lockedUntil;
}
cancellationToken);

/// <summary>
/// Renews the lock on the message. The lock will be renewed based on the setting specified on the queue.
Expand Down Expand Up @@ -1005,13 +1005,9 @@ private async Task<AmqpResponseMessage> ExecuteRequest(TimeSpan timeout, AmqpReq
/// </summary>
public override async Task RenewSessionLockAsync(CancellationToken cancellationToken = default)
{
DateTimeOffset lockedUntil;
await _retryPolicy.RunOperation(
async (timeout) =>
{
lockedUntil = await RenewSessionLockInternal(
timeout).ConfigureAwait(false);
},
var lockedUntil = await _retryPolicy.RunOperation(
static (receiver, timeout, _) => receiver.RenewSessionLockInternal(timeout),
this,
_connectionScope,
cancellationToken).ConfigureAwait(false);
SessionLockedUntil = lockedUntil;
Expand Down Expand Up @@ -1053,17 +1049,13 @@ internal async Task<DateTimeOffset> RenewSessionLockInternal(TimeSpan timeout)
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <returns>The session state as <see cref="BinaryData"/>.</returns>
public override async Task<BinaryData> GetStateAsync(CancellationToken cancellationToken = default)
public override Task<BinaryData> GetStateAsync(CancellationToken cancellationToken = default)
{
BinaryData sessionState = default;
await _retryPolicy.RunOperation(
async (timeout) =>
{
sessionState = await GetStateInternal(timeout).ConfigureAwait(false);
},
return _retryPolicy.RunOperation(
static (receiver, timeout, _) => receiver.GetStateInternal(timeout),
this,
_connectionScope,
cancellationToken).ConfigureAwait(false);
return sessionState;
cancellationToken);
}

internal async Task<BinaryData> GetStateInternal(TimeSpan timeout)
Expand Down Expand Up @@ -1105,20 +1097,20 @@ internal async Task<BinaryData> GetStateInternal(TimeSpan timeout)
/// <remarks>This state is stored on Service Bus forever unless you set an empty state on it.</remarks>
///
/// <returns>A task to be resolved on when the operation has completed.</returns>
public override async Task SetStateAsync(
public override Task SetStateAsync(
BinaryData sessionState,
CancellationToken cancellationToken)
{
await _retryPolicy.RunOperation(
async (timeout) =>
CancellationToken cancellationToken) =>
_retryPolicy.RunOperation(
static (value, timeout, _) =>
{
await SetStateInternal(
sessionState,
timeout).ConfigureAwait(false);
var (receiver, state) = value;
return receiver.SetStateInternal(
state,
timeout);
},
(this, sessionState),
_connectionScope,
cancellationToken).ConfigureAwait(false);
}
cancellationToken);

internal async Task SetStateInternal(
BinaryData sessionState,
Expand Down Expand Up @@ -1158,19 +1150,20 @@ internal async Task SetStateInternal(
/// <returns>Messages identified by sequence number are returned. Returns null if no messages are found.
/// Throws if the messages have not been deferred.</returns>
/// <seealso cref="DeferAsync"/>
public override async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveDeferredMessagesAsync(
public override Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveDeferredMessagesAsync(
long[] sequenceNumbers,
CancellationToken cancellationToken = default)
{
IReadOnlyList<ServiceBusReceivedMessage> messages = null;
await _retryPolicy.RunOperation(
async (timeout) => messages = await ReceiveDeferredMessagesAsyncInternal(
sequenceNumbers,
timeout).ConfigureAwait(false),
CancellationToken cancellationToken = default) =>
_retryPolicy.RunOperation(
static (value, timeout, _) =>
{
var (receiver, sqn) = value;
return receiver.ReceiveDeferredMessagesAsyncInternal(
sqn,
timeout);
},
(this, sequenceNumbers),
_connectionScope,
cancellationToken).ConfigureAwait(false);
return messages;
}
cancellationToken);

internal virtual async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveDeferredMessagesAsyncInternal(
long[] sequenceNumbers,
Expand Down Expand Up @@ -1306,14 +1299,13 @@ private static TimeSpan UseMinimum(
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <returns>A task to be resolved on when the operation has completed.</returns>
public override async Task OpenLinkAsync(CancellationToken cancellationToken)
public override Task OpenLinkAsync(CancellationToken cancellationToken)
{
ReceivingAmqpLink link = null;
await _retryPolicy.RunOperation(
async (timeout) =>
link = await _receiveLink.GetOrCreateAsync(timeout).ConfigureAwait(false),
_connectionScope,
cancellationToken).ConfigureAwait(false);
return _retryPolicy.RunOperation(
static (link, timeout, _) => link.GetOrCreateAsync(timeout),
_receiveLink,
_connectionScope,
cancellationToken);
}

private bool HasLinkCommunicationError(ReceivingAmqpLink link) =>
Expand Down
Loading