Skip to content

Commit e268de0

Browse files
committed
[Java] Extend tests to assert that empty source recording can also be replicated.
1 parent a7db3ac commit e268de0

File tree

1 file changed

+29
-23
lines changed

1 file changed

+29
-23
lines changed

aeron-samples/src/test/java/io/aeron/samples/archive/RecordingReplicatorTest.java

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,10 @@
3434
import org.agrona.concurrent.status.CountersReader;
3535
import org.junit.jupiter.api.AfterEach;
3636
import org.junit.jupiter.api.BeforeEach;
37-
import org.junit.jupiter.api.Test;
3837
import org.junit.jupiter.api.extension.ExtendWith;
3938
import org.junit.jupiter.api.io.TempDir;
39+
import org.junit.jupiter.params.ParameterizedTest;
40+
import org.junit.jupiter.params.provider.ValueSource;
4041

4142
import java.nio.file.Path;
4243
import java.util.concurrent.ThreadLocalRandom;
@@ -128,13 +129,14 @@ void tearDown()
128129
CloseHelper.closeAll(dstAeronArchive, srcAeronArchive, dstArchive, srcArchive, dstMediaDriver, srcMediaDriver);
129130
}
130131

131-
@Test
132+
@ParameterizedTest
133+
@ValueSource(ints = { 0, 9 })
132134
@InterruptAfter(10)
133-
void replicateAsNewRecording()
135+
void replicateAsNewRecording(final int srcMessageCount)
134136
{
135-
createRecording(srcAeronArchive, IPC_CHANNEL, 555);
136-
createRecording(srcAeronArchive, "aeron:udp?endpoint=localhost:8108", 666);
137-
final long srcRecordingId = createRecording(srcAeronArchive, IPC_CHANNEL + "?alias=third", 1010101010);
137+
createRecording(srcAeronArchive, IPC_CHANNEL, 555, 3);
138+
createRecording(srcAeronArchive, "aeron:udp?endpoint=localhost:8108", 666, 1);
139+
final long srcRecordingId = createRecording(srcAeronArchive, IPC_CHANNEL + "?alias=third", 1010101010, srcMessageCount);
138140

139141
final RecordingReplicator recordingReplicator = new RecordingReplicator(
140142
dstAeronArchive,
@@ -148,19 +150,20 @@ void replicateAsNewRecording()
148150
verifyRecordingReplicated(srcRecordingId, replicatedRecordingId);
149151
}
150152

151-
@Test
153+
@ParameterizedTest
154+
@ValueSource(ints = { 0, 7, 21 })
152155
@InterruptAfter(10)
153-
void replicateOverAnExistingRecording()
156+
void replicateOverAnExistingRecording(final int srcMessageCount)
154157
{
155-
createRecording(srcAeronArchive, IPC_CHANNEL, 555);
156-
createRecording(srcAeronArchive, "aeron:udp?endpoint=localhost:8108", 666);
157-
final long srcRecordingId = createRecording(srcAeronArchive, IPC_CHANNEL + "?alias=third", 1010101010);
158+
createRecording(srcAeronArchive, IPC_CHANNEL, 555, 1);
159+
createRecording(srcAeronArchive, "aeron:udp?endpoint=localhost:8108", 666, 1);
160+
final long srcRecordingId = createRecording(srcAeronArchive, IPC_CHANNEL + "?alias=third", 1010101010, 5);
158161

159-
createRecording(dstAeronArchive, IPC_CHANNEL + "?alias=one", 111);
162+
createRecording(dstAeronArchive, IPC_CHANNEL + "?alias=one", 111, 3);
160163
final long dstRecordingId = createRecording(
161164
dstAeronArchive,
162165
"aeron:udp?endpoint=localhost:8114|init-term-id=13|term-id=27|term-offset=1024|term-length=64K",
163-
444);
166+
444, 19);
164167

165168
try (AeronArchive aeronArchive = AeronArchive.connect(new AeronArchive.Context()
166169
.aeronDirectoryName(dstMediaDriver.aeronDirectoryName())
@@ -186,19 +189,18 @@ void replicateOverAnExistingRecording()
186189
private long createRecording(
187190
final AeronArchive aeronArchive,
188191
final String channel,
189-
final int streamId)
192+
final int streamId,
193+
final int numMessages)
190194
{
191195
try (ExclusivePublication publication = aeronArchive.addRecordedExclusivePublication(channel, streamId))
192196
{
193197
final CountersReader counters = aeronArchive.context().aeron().countersReader();
194198
final int counterId = Tests.awaitRecordingCounterId(counters, publication.sessionId());
195199
final long recordingId = RecordingPos.getRecordingId(counters, counterId);
196200

197-
final ThreadLocalRandom random = ThreadLocalRandom.current();
198-
final int numMessages = random.nextInt(1, 11);
199201
for (int i = 0; i < numMessages; i++)
200202
{
201-
final int messageSize = random.nextInt(8, 500);
203+
final int messageSize = ThreadLocalRandom.current().nextInt(8, 500);
202204
long result;
203205
while ((result = publication.tryClaim(messageSize, bufferClaim)) < 0)
204206
{
@@ -213,11 +215,12 @@ private long createRecording(
213215

214216
final MutableDirectBuffer buffer = bufferClaim.buffer();
215217
final int offset = bufferClaim.offset();
216-
buffer.putInt(offset, random.nextInt(), LITTLE_ENDIAN);
217-
buffer.putInt(offset + (messageSize - SIZE_OF_INT), random.nextInt(), LITTLE_ENDIAN);
218+
buffer.putInt(offset, ThreadLocalRandom.current().nextInt(), LITTLE_ENDIAN);
219+
buffer.putInt(
220+
offset + (messageSize - SIZE_OF_INT), ThreadLocalRandom.current().nextInt(), LITTLE_ENDIAN);
218221
bufferClaim.commit();
219222
}
220-
223+
221224
Tests.awaitPosition(counters, counterId, publication.position());
222225

223226
final RecordingSignalCapture signalCapture =
@@ -226,9 +229,12 @@ private long createRecording(
226229
aeronArchive.stopRecording(publication);
227230
signalCapture.awaitSignal(aeronArchive, recordingId, RecordingSignal.STOP);
228231

229-
final long startPosition = aeronArchive.getStartPosition(recordingId);
230-
final long stopPosition = aeronArchive.getStopPosition(recordingId);
231-
assertNotEquals(startPosition, stopPosition);
232+
if (numMessages > 0)
233+
{
234+
final long startPosition = aeronArchive.getStartPosition(recordingId);
235+
final long stopPosition = aeronArchive.getStopPosition(recordingId);
236+
assertNotEquals(startPosition, stopPosition);
237+
}
232238

233239
return recordingId;
234240
}

0 commit comments

Comments
 (0)