Skip to content

Commit 76f2080

Browse files
committed
[Java] Check all contexts for reuse by multiple conclusion and improve on solution provided in PR #666, increase minor version as a result of change.
1 parent dde01f2 commit 76f2080

File tree

13 files changed

+121
-66
lines changed

13 files changed

+121
-66
lines changed

aeron-archive/src/main/java/io/aeron/archive/Archive.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.aeron.Image;
2121
import io.aeron.archive.client.AeronArchive;
2222
import io.aeron.archive.client.ArchiveException;
23+
import io.aeron.exceptions.ConfigurationException;
2324
import io.aeron.logbuffer.LogBufferDescriptor;
2425
import org.agrona.BitUtil;
2526
import org.agrona.CloseHelper;
@@ -35,12 +36,14 @@
3536
import java.util.Objects;
3637
import java.util.concurrent.ThreadFactory;
3738
import java.util.concurrent.TimeUnit;
39+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
3840
import java.util.function.Supplier;
3941

4042
import static io.aeron.driver.status.SystemCounterDescriptor.SYSTEM_COUNTER_TYPE_ID;
4143
import static io.aeron.logbuffer.LogBufferDescriptor.TERM_MAX_LENGTH;
4244
import static io.aeron.logbuffer.LogBufferDescriptor.TERM_MIN_LENGTH;
4345
import static java.lang.System.getProperty;
46+
import static java.util.concurrent.atomic.AtomicIntegerFieldUpdater.newUpdater;
4447
import static org.agrona.SystemUtil.getDurationInNanos;
4548
import static org.agrona.SystemUtil.getSizeAsInt;
4649
import static org.agrona.SystemUtil.loadPropertiesFiles;
@@ -351,6 +354,13 @@ public static boolean deleteArchiveOnStart()
351354
*/
352355
public static class Context implements Cloneable
353356
{
357+
/**
358+
* Using an integer because there is no support for boolean. 1 is concluded, 0 is not concluded.
359+
*/
360+
private static final AtomicIntegerFieldUpdater<Context> IS_CONCLUDED_UPDATER = newUpdater(
361+
Context.class, "isConcluded");
362+
private volatile int isConcluded;
363+
354364
private boolean deleteArchiveOnStart = Configuration.deleteArchiveOnStart();
355365
private boolean ownsAeronClient = false;
356366
private String aeronDirectoryName = CommonContext.getAeronDirectoryName();
@@ -414,6 +424,11 @@ public Context clone()
414424
*/
415425
public void conclude()
416426
{
427+
if (0 != IS_CONCLUDED_UPDATER.getAndSet(this, 1))
428+
{
429+
throw new ConfigurationException("Context already concluded");
430+
}
431+
417432
Objects.requireNonNull(errorHandler, "Error handler must be supplied");
418433

419434
if (null == epochClock)

aeron-archive/src/main/java/io/aeron/archive/client/AeronArchive.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.aeron.archive.codecs.ControlResponseCode;
2020
import io.aeron.archive.codecs.ControlResponseDecoder;
2121
import io.aeron.archive.codecs.SourceLocation;
22+
import io.aeron.exceptions.ConfigurationException;
2223
import io.aeron.exceptions.TimeoutException;
2324
import org.agrona.CloseHelper;
2425
import org.agrona.ErrorHandler;
@@ -27,11 +28,13 @@
2728
import org.agrona.concurrent.*;
2829

2930
import java.util.concurrent.TimeUnit;
31+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
3032
import java.util.concurrent.locks.Lock;
3133
import java.util.concurrent.locks.ReentrantLock;
3234

3335
import static io.aeron.archive.client.ArchiveProxy.DEFAULT_RETRY_ATTEMPTS;
3436
import static io.aeron.driver.Configuration.*;
37+
import static java.util.concurrent.atomic.AtomicIntegerFieldUpdater.newUpdater;
3538
import static org.agrona.SystemUtil.getDurationInNanos;
3639
import static org.agrona.SystemUtil.getSizeAsInt;
3740

@@ -1532,6 +1535,13 @@ public static int recordingEventsStreamId()
15321535
*/
15331536
public static class Context implements Cloneable
15341537
{
1538+
/**
1539+
* Using an integer because there is no support for boolean. 1 is concluded, 0 is not concluded.
1540+
*/
1541+
private static final AtomicIntegerFieldUpdater<Context> IS_CONCLUDED_UPDATER = newUpdater(
1542+
Context.class, "isConcluded");
1543+
private volatile int isConcluded;
1544+
15351545
private long messageTimeoutNs = Configuration.messageTimeoutNs();
15361546
private String recordingEventsChannel = AeronArchive.Configuration.recordingEventsChannel();
15371547
private int recordingEventsStreamId = AeronArchive.Configuration.recordingEventsStreamId();
@@ -1571,6 +1581,11 @@ public Context clone()
15711581
*/
15721582
public void conclude()
15731583
{
1584+
if (0 != IS_CONCLUDED_UPDATER.getAndSet(this, 1))
1585+
{
1586+
throw new ConfigurationException("Context already concluded");
1587+
}
1588+
15741589
if (null == aeron)
15751590
{
15761591
aeron = Aeron.connect(new Aeron.Context().aeronDirectoryName(aeronDirectoryName));

aeron-client/src/main/java/io/aeron/Aeron.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,6 @@ public static Aeron connect()
123123
*/
124124
public static Aeron connect(final Context ctx)
125125
{
126-
ctx.verifyNotAlreadyInUse();
127-
128126
try
129127
{
130128
final Aeron aeron = new Aeron(ctx);

aeron-client/src/main/java/io/aeron/CommonContext.java

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.aeron;
1717

1818
import io.aeron.exceptions.AeronException;
19+
import io.aeron.exceptions.ConfigurationException;
1920
import io.aeron.exceptions.DriverTimeoutException;
2021
import org.agrona.DirectBuffer;
2122
import org.agrona.IoUtil;
@@ -33,13 +34,14 @@
3334
import java.text.SimpleDateFormat;
3435
import java.util.Date;
3536
import java.util.UUID;
36-
import java.util.concurrent.atomic.AtomicBoolean;
37+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
3738
import java.util.function.Consumer;
3839

3940
import static io.aeron.Aeron.sleep;
4041
import static io.aeron.CncFileDescriptor.*;
4142
import static java.lang.Long.getLong;
4243
import static java.lang.System.getProperty;
44+
import static java.util.concurrent.atomic.AtomicIntegerFieldUpdater.newUpdater;
4345

4446
/**
4547
* This class provides the Media Driver and client with common configuration for the Aeron directory.
@@ -215,7 +217,12 @@ public class CommonContext implements Cloneable
215217
*/
216218
public static final String TETHER_PARAM_NAME = "tether";
217219

218-
private final AtomicBoolean inUse = new AtomicBoolean(false);
220+
/**
221+
* Using an integer because there is no support for boolean. 1 is concluded, 0 is not concluded.
222+
*/
223+
private static final AtomicIntegerFieldUpdater<CommonContext> IS_CONCLUDED_UPDATER = newUpdater(
224+
CommonContext.class, "isConcluded");
225+
private volatile int isConcluded;
219226

220227
private long driverTimeoutMs = DRIVER_TIMEOUT_MS;
221228
private String aeronDirectoryName = getAeronDirectoryName();
@@ -291,6 +298,11 @@ public static String generateRandomDirName()
291298
*/
292299
public CommonContext conclude()
293300
{
301+
if (0 != IS_CONCLUDED_UPDATER.getAndSet(this, 1))
302+
{
303+
throw new ConfigurationException("Context already concluded");
304+
}
305+
294306
concludeAeronDirectory();
295307

296308
cncFile = new File(aeronDirectory, CncFileDescriptor.CNC_FILE);
@@ -313,21 +325,6 @@ public CommonContext concludeAeronDirectory()
313325
return this;
314326
}
315327

316-
/**
317-
* Checks to see if another instance of the Aeron Client or Media Driver already is already using this
318-
* context.
319-
*
320-
* @throws IllegalStateException if the context is already in use.
321-
*/
322-
public void verifyNotAlreadyInUse()
323-
{
324-
if (!inUse.compareAndSet(false, true))
325-
{
326-
throw new IllegalStateException(
327-
"Context instances may not be reused, create a new one for each Aeron/MediaDriver instance");
328-
}
329-
}
330-
331328
/**
332329
* Get the top level Aeron directory used for communication between the client and Media Driver, and
333330
* the location of the data buffers.
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright 2014-2019 Real Logic Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.aeron;
17+
18+
import io.aeron.exceptions.ConfigurationException;
19+
import org.junit.Test;
20+
21+
public class CommonContextTest
22+
{
23+
@Test(expected = ConfigurationException.class)
24+
public void shouldNotAllowConcludeMoreThanOnce()
25+
{
26+
final CommonContext ctx = new CommonContext();
27+
28+
ctx.conclude();
29+
ctx.conclude();
30+
}
31+
}

aeron-cluster/src/main/java/io/aeron/cluster/ConsensusModule.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.aeron.cluster.client.ClusterException;
2424
import io.aeron.cluster.codecs.mark.ClusterComponentType;
2525
import io.aeron.cluster.service.*;
26+
import io.aeron.exceptions.ConfigurationException;
2627
import io.aeron.security.Authenticator;
2728
import io.aeron.security.AuthenticatorSupplier;
2829
import org.agrona.*;
@@ -35,12 +36,14 @@
3536
import java.util.Random;
3637
import java.util.concurrent.ThreadFactory;
3738
import java.util.concurrent.TimeUnit;
39+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
3840
import java.util.function.Supplier;
3941

4042
import static io.aeron.cluster.ConsensusModule.Configuration.*;
4143
import static io.aeron.cluster.service.ClusteredServiceContainer.Configuration.SNAPSHOT_CHANNEL_PROP_NAME;
4244
import static io.aeron.cluster.service.ClusteredServiceContainer.Configuration.SNAPSHOT_STREAM_ID_PROP_NAME;
4345
import static io.aeron.driver.status.SystemCounterDescriptor.SYSTEM_COUNTER_TYPE_ID;
46+
import static java.util.concurrent.atomic.AtomicIntegerFieldUpdater.newUpdater;
4447
import static org.agrona.SystemUtil.*;
4548
import static org.agrona.concurrent.status.CountersReader.METADATA_LENGTH;
4649

@@ -885,6 +888,13 @@ public static int memberStatusStreamId()
885888
*/
886889
public static class Context implements Cloneable
887890
{
891+
/**
892+
* Using an integer because there is no support for boolean. 1 is concluded, 0 is not concluded.
893+
*/
894+
private static final AtomicIntegerFieldUpdater<Context> IS_CONCLUDED_UPDATER = newUpdater(
895+
Context.class, "isConcluded");
896+
private volatile int isConcluded;
897+
888898
private boolean ownsAeronClient = false;
889899
private String aeronDirectoryName = CommonContext.getAeronDirectoryName();
890900
private Aeron aeron;
@@ -975,6 +985,11 @@ public Context clone()
975985
@SuppressWarnings("MethodLength")
976986
public void conclude()
977987
{
988+
if (0 != IS_CONCLUDED_UPDATER.getAndSet(this, 1))
989+
{
990+
throw new ConfigurationException("Context already concluded");
991+
}
992+
978993
if (null == clusterDir)
979994
{
980995
clusterDir = new File(clusterDirectoryName);

aeron-cluster/src/main/java/io/aeron/cluster/client/AeronCluster.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@
3030
import org.agrona.concurrent.*;
3131

3232
import java.util.concurrent.TimeUnit;
33+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
3334

35+
import static java.util.concurrent.atomic.AtomicIntegerFieldUpdater.newUpdater;
3436
import static org.agrona.SystemUtil.getDurationInNanos;
3537

3638
/**
@@ -886,6 +888,13 @@ public static int egressStreamId()
886888
*/
887889
public static class Context implements Cloneable
888890
{
891+
/**
892+
* Using an integer because there is no support for boolean. 1 is concluded, 0 is not concluded.
893+
*/
894+
private static final AtomicIntegerFieldUpdater<Context> IS_CONCLUDED_UPDATER = newUpdater(
895+
Context.class, "isConcluded");
896+
private volatile int isConcluded;
897+
889898
private long messageTimeoutNs = Configuration.messageTimeoutNs();
890899
private String clusterMemberEndpoints = Configuration.clusterMemberEndpoints();
891900
private String ingressChannel = Configuration.ingressChannel();
@@ -922,6 +931,11 @@ public Context clone()
922931

923932
public void conclude()
924933
{
934+
if (0 != IS_CONCLUDED_UPDATER.getAndSet(this, 1))
935+
{
936+
throw new ConfigurationException("Context already concluded");
937+
}
938+
925939
if (null == aeron)
926940
{
927941
aeron = Aeron.connect(

aeron-cluster/src/main/java/io/aeron/cluster/service/ClusteredServiceAgent.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -696,7 +696,7 @@ private AtomicCounter awaitHeartbeatCounter(final CountersReader counters)
696696

697697
private void loadSnapshot(final long recordingId)
698698
{
699-
try (AeronArchive archive = AeronArchive.connect(archiveCtx))
699+
try (AeronArchive archive = AeronArchive.connect(archiveCtx.clone()))
700700
{
701701
final String channel = ctx.replayChannel();
702702
final int streamId = ctx.replayStreamId();
@@ -741,7 +741,7 @@ private long onTakeSnapshot(final long logPosition, final long leadershipTermId)
741741
{
742742
final long recordingId;
743743

744-
try (AeronArchive archive = AeronArchive.connect(archiveCtx);
744+
try (AeronArchive archive = AeronArchive.connect(archiveCtx.clone());
745745
Publication publication = aeron.addExclusivePublication(ctx.snapshotChannel(), ctx.snapshotStreamId()))
746746
{
747747
final String channel = ChannelUri.addSessionId(ctx.snapshotChannel(), publication.sessionId());

aeron-cluster/src/main/java/io/aeron/cluster/service/ClusteredServiceContainer.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,11 @@
3434

3535
import java.io.File;
3636
import java.util.concurrent.ThreadFactory;
37+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
3738
import java.util.function.Supplier;
3839

3940
import static io.aeron.driver.status.SystemCounterDescriptor.SYSTEM_COUNTER_TYPE_ID;
41+
import static java.util.concurrent.atomic.AtomicIntegerFieldUpdater.newUpdater;
4042
import static org.agrona.SystemUtil.getSizeAsInt;
4143
import static org.agrona.SystemUtil.loadPropertiesFiles;
4244

@@ -430,6 +432,13 @@ public static boolean isRespondingService()
430432
*/
431433
public static class Context implements Cloneable
432434
{
435+
/**
436+
* Using an integer because there is no support for boolean. 1 is concluded, 0 is not concluded.
437+
*/
438+
private static final AtomicIntegerFieldUpdater<Context> IS_CONCLUDED_UPDATER = newUpdater(
439+
Context.class, "isConcluded");
440+
private volatile int isConcluded;
441+
433442
private int serviceId = Configuration.serviceId();
434443
private String serviceName = Configuration.serviceName();
435444
private String replayChannel = Configuration.replayChannel();
@@ -481,6 +490,11 @@ public Context clone()
481490
@SuppressWarnings("MethodLength")
482491
public void conclude()
483492
{
493+
if (0 != IS_CONCLUDED_UPDATER.getAndSet(this, 1))
494+
{
495+
throw new ConfigurationException("Context already concluded");
496+
}
497+
484498
if (serviceId < 0)
485499
{
486500
throw new ConfigurationException("service id cannot be negative: " + serviceId);

aeron-driver/src/main/java/io/aeron/driver/MediaDriver.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,6 @@ public static MediaDriver launch()
222222
*/
223223
public static MediaDriver launch(final Context ctx)
224224
{
225-
ctx.verifyNotAlreadyInUse();
226225
return new MediaDriver(ctx).start();
227226
}
228227

0 commit comments

Comments
 (0)