From 729705d8d6b839d1e835cc1d60f7a15e7052fac1 Mon Sep 17 00:00:00 2001 From: Arpit Agarwal Date: Mon, 7 Nov 2016 12:00:22 -0800 Subject: [PATCH 1/2] HDFS-11114. Support for running async disk checks in DataNode. Change-Id: Ib21cd21fe9b67ca35b38f8462c138e90b55f33df --- .../server/datanode/checker/AsyncChecker.java | 62 ++++ .../server/datanode/checker/Checkable.java | 49 ++++ .../checker/ThrottledAsyncChecker.java | 224 ++++++++++++++ .../server/datanode/checker/package-info.java | 26 ++ .../checker/TestThrottledAsyncChecker.java | 276 ++++++++++++++++++ 5 files changed, 637 insertions(+) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/Checkable.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/package-info.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java new file mode 100644 index 0000000000000..c367d6be2dc48 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode.checker; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * A class that can be used to schedule an asynchronous check on a given + * {@link Checkable}. If the check is successfully scheduled then a + * {@link ListenableFuture} is returned. + * + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public interface AsyncChecker { + + /** + * Schedule an asynchronous check for the given object. + * + * @param target object to be checked. + * + * @param context the interpretation of the context depends on the + * target. + * + * @return returns a {@link ListenableFuture} that can be used to + * retrieve the result of the asynchronous check. + */ + ListenableFuture schedule(Checkable target, K context); + + /** + * Cancel all executing checks and wait for them to complete. + * First attempts a graceful cancellation, then cancels forcefully. + * Waits for the supplied timeout after both attempts. + * + * See {@link ExecutorService#awaitTermination} for a description of + * the parameters. + * + * @throws InterruptedException + */ + void join(long timeout, TimeUnit timeUnit) throws InterruptedException; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/Checkable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/Checkable.java new file mode 100644 index 0000000000000..833ebda15f96a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/Checkable.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode.checker; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + + +/** + * A Checkable is an object whose health can be probed by invoking its + * {@link #check} method. + * + * e.g. a {@link Checkable} instance may represent a single hardware + * resource. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public interface Checkable { + + /** + * Query the health of this object. This method may hang + * indefinitely depending on the status of the target resource. + * + * @param context for the probe operation. May be null depending + * on the implementation. + * + * @return result of the check operation. + * + * @throws Exception encountered during the check operation. An + * exception indicates that the check failed. + */ + V check(K context) throws Exception; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java new file mode 100644 index 0000000000000..0be14570a9364 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode.checker; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.util.Timer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.Map; +import java.util.WeakHashMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * An implementation of {@link AsyncChecker} that skips checking recently + * checked objects. It will enforce at least {@link minMsBetweenChecks} + * milliseconds between two successive checks of any one object. + * + * It is assumed that the total number of Checkable objects in the system + * is small, (not more than a few dozen) since the checker uses O(Checkables) + * storage and also potentially O(Checkables) threads. + * + * {@link minMsBetweenChecks} should be configured reasonably + * by the caller to avoid spinning up too many threads frequently. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class ThrottledAsyncChecker implements AsyncChecker { + public static final Logger LOG = + LoggerFactory.getLogger(ThrottledAsyncChecker.class); + + private final Timer timer; + + /** + * The ExecutorService used to schedule asynchronous checks. + */ + private final ListeningExecutorService executorService; + + /** + * The minimum gap in milliseconds between two successive checks + * of the same object. This is the throttle. + */ + private final long minMsBetweenChecks; + + /** + * Map of checks that are currently in progress. Protected by the object + * lock. + */ + private final Map> checksInProgress; + + /** + * Maps Checkable objects to a future that can be used to retrieve + * the results of the operation. + * Protected by the object lock. + */ + private final Map> completedChecks; + + ThrottledAsyncChecker(final Timer timer, + final long minMsBetweenChecks, + final ExecutorService executorService) { + this.timer = timer; + this.minMsBetweenChecks = minMsBetweenChecks; + this.executorService = MoreExecutors.listeningDecorator(executorService); + this.checksInProgress = new HashMap<>(); + this.completedChecks = new WeakHashMap<>(); + } + + /** + * See {@link AsyncChecker#schedule} + * + * If the object has been checked recently then the check will + * be skipped. Multiple concurrent checks for the same object + * will receive the same Future. + */ + @Override + public synchronized ListenableFuture schedule( + final Checkable target, + final K context) { + LOG.debug("Scheduling a check of {}", target); + + if (checksInProgress.containsKey(target)) { + return checksInProgress.get(target); + } + + if (completedChecks.containsKey(target)) { + final LastCheckResult result = completedChecks.get(target); + final long msSinceLastCheck = timer.monotonicNow() - result.completedAt; + if (msSinceLastCheck < minMsBetweenChecks) { + LOG.debug("Skipped checking {}. Time since last check {}ms " + + "is less than the min gap {}ms.", + target, msSinceLastCheck, minMsBetweenChecks); + return result.result != null ? + Futures.immediateFuture(result.result) : + Futures.immediateFailedFuture(result.exception); + } + } + + final ListenableFuture lf = executorService.submit( + new Callable() { + @Override + public V call() throws Exception { + return target.check(context); + } + }); + checksInProgress.put(target, lf); + addResultCachingCallback(target, lf); + return lf; + } + + /** + * Register a callback to cache the result of a check. + * @param target + * @param lf + */ + private void addResultCachingCallback( + Checkable target, ListenableFuture lf) { + Futures.addCallback(lf, new FutureCallback() { + @Override + public void onSuccess(@Nullable V result) { + synchronized (ThrottledAsyncChecker.this) { + checksInProgress.remove(target); + completedChecks.put(target, new LastCheckResult<>( + result, timer.monotonicNow())); + } + } + + @Override + public void onFailure(@Nonnull Throwable t) { + synchronized (ThrottledAsyncChecker.this) { + checksInProgress.remove(target); + completedChecks.put(target, new LastCheckResult<>( + t, timer.monotonicNow())); + } + } + }); + } + + /** + * See {@link AsyncChecker#join}. + */ + @Override + public void join(long timeout, TimeUnit timeUnit) + throws InterruptedException { + // Try orderly shutdown. + executorService.shutdown(); + + if (!executorService.awaitTermination(timeout, timeUnit)) { + // Interrupt executing tasks and wait again. + executorService.shutdownNow(); + executorService.awaitTermination(timeout, timeUnit); + } + } + + /** + * Status of running a check. It can either be a result or an + * exception, depending on whether the check completed or threw. + */ + private static final class LastCheckResult { + /** + * Timestamp at which the check completed. + */ + private final long completedAt; + + /** + * Result of running the check if it completed. null if it threw. + */ + @Nullable + private final V result; + + /** + * Exception thrown by the check. null if it returned a result. + */ + private final Throwable exception; // null on success. + + /** + * Initialize with a result. + * @param result + */ + private LastCheckResult(V result, long completedAt) { + this.result = result; + this.exception = null; + this.completedAt = completedAt; + } + + /** + * Initialize with an exception. + * @param completedAt + * @param t + */ + private LastCheckResult(Throwable t, long completedAt) { + this.result = null; + this.exception = t; + this.completedAt = completedAt; + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/package-info.java new file mode 100644 index 0000000000000..52822e9a8f325 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/package-info.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Datanode support for running disk checks. + */ +@InterfaceAudience.LimitedPrivate({"HDFS"}) +@InterfaceStability.Evolving +package org.apache.hadoop.hdfs.server.datanode.checker; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java new file mode 100644 index 0000000000000..81a34df138da4 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java @@ -0,0 +1,276 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode.checker; + +import com.google.common.base.Supplier; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.FakeTimer; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.core.Is.isA; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Verify functionality of {@link ThrottledAsyncChecker}. + */ +public class TestThrottledAsyncChecker { + public static final Logger LOG = + LoggerFactory.getLogger(TestThrottledAsyncChecker.class); + private static final long MIN_ERROR_CHECK_GAP = 1000; + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + /** + * Test various scheduling combinations to ensure scheduling and + * throttling behave as expected. + */ + @Test(timeout=60000) + public void testScheduler() throws Exception { + final NoOpCheckable target1 = new NoOpCheckable(); + final NoOpCheckable target2 = new NoOpCheckable(); + final FakeTimer timer = new FakeTimer(); + ThrottledAsyncChecker checker = + new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, + getExecutorService()); + + // check target1 and ensure we get back the expected result. + assertTrue(checker.schedule(target1, true).get()); + assertThat(target1.numChecks.get(), is(1L)); + + // Check target1 again without advancing the timer. target1 should not + // be checked again and the cached result should be returned. + assertTrue(checker.schedule(target1, true).get()); + assertThat(target1.numChecks.get(), is(1L)); + + // Schedule target2 scheduled without advancing the timer. + // target2 should be checked as it has never been checked before. + assertTrue(checker.schedule(target2, true).get()); + assertThat(target2.numChecks.get(), is(1L)); + + // Advance the timer but just short of the min gap. + // Neither target1 nor target2 should be checked again. + timer.advance(MIN_ERROR_CHECK_GAP - 1); + assertTrue(checker.schedule(target1, true).get()); + assertThat(target1.numChecks.get(), is(1L)); + assertTrue(checker.schedule(target2, true).get()); + assertThat(target2.numChecks.get(), is(1L)); + + // Advance the timer again. + // Both targets should be checked now. + timer.advance(MIN_ERROR_CHECK_GAP); + assertTrue(checker.schedule(target1, true).get()); + assertThat(target1.numChecks.get(), is(2L)); + assertTrue(checker.schedule(target2, true).get()); + assertThat(target1.numChecks.get(), is(2L)); + } + + @Test (timeout=60000) + public void testCancellation() throws Exception { + LatchedCheckable target = new LatchedCheckable(); + final FakeTimer timer = new FakeTimer(); + final LatchedCallback callback = new LatchedCallback(target); + ThrottledAsyncChecker checker = + new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, + getExecutorService()); + + ListenableFuture lf = checker.schedule(target, true); + Futures.addCallback(lf, callback); + + // Request immediate cancellation. + checker.join(0, TimeUnit.MILLISECONDS); + try { + assertFalse(lf.get()); + fail("Failed to get expected InterruptedException"); + } catch (ExecutionException ee) { + assertTrue(ee.getCause() instanceof InterruptedException); + } + callback.failureLatch.await(); + } + + @Test (timeout=60000) + public void testConcurrentChecks() throws Exception { + LatchedCheckable target = new LatchedCheckable(); + final FakeTimer timer = new FakeTimer(); + ThrottledAsyncChecker checker = + new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, + getExecutorService()); + final ListenableFuture lf1 = checker.schedule(target, true); + final ListenableFuture lf2 = checker.schedule(target, true); + + // Ensure that concurrent requests return the same future object. + assertTrue(lf1 == lf2); + + // Unblock the latch and wait for it to finish execution. + target.latch.countDown(); + lf1.get(); + + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + // We should not get back the same future as before. + // This can take a short while until the internal callback in + // ThrottledAsyncChecker is scheduled for execution. + // Also this should not trigger a new check operation as the timer + // was not advanced. If it does trigger a new check then the test + // will fail with a timeout. + final ListenableFuture lf3 = checker.schedule(target, true); + return lf3 != lf2; + } + }, 100, 10000); + } + + /** + * Ensure that the context is passed through to the Checkable#check + * method. + * @throws Exception + */ + @Test(timeout=60000) + public void testContextIsPassed() throws Exception { + final NoOpCheckable target1 = new NoOpCheckable(); + final FakeTimer timer = new FakeTimer(); + ThrottledAsyncChecker checker = + new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, + getExecutorService()); + + assertTrue(checker.schedule(target1, true).get()); + assertThat(target1.numChecks.get(), is(1L)); + timer.advance(MIN_ERROR_CHECK_GAP + 1); + assertFalse(checker.schedule(target1, false).get()); + assertThat(target1.numChecks.get(), is(2L)); + } + + /** + * Ensure that the exeption from a failed check is cached + * and returned without re-running the check when the minimum + * gap has not elapsed. + * + * @throws Exception + */ + @Test(timeout=60000) + public void testExceptionCaching() throws Exception { + final ThrowingCheckable target1 = new ThrowingCheckable(); + final FakeTimer timer = new FakeTimer(); + ThrottledAsyncChecker checker = + new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, + getExecutorService()); + + thrown.expectCause(isA(DummyException.class)); + checker.schedule(target1, true).get(); + assertThat(target1.numChecks.get(), is(1L)); + + thrown.expectCause(isA(DummyException.class)); + checker.schedule(target1, true).get(); + assertThat(target1.numChecks.get(), is(2L)); + } + + /** + * A simple ExecutorService for testing. + */ + private ExecutorService getExecutorService() { + return new ScheduledThreadPoolExecutor(1); + } + + /** + * A Checkable that just returns its input. + */ + private static class NoOpCheckable + implements Checkable { + private final AtomicLong numChecks = new AtomicLong(0); + @Override + public Boolean check(Boolean context) { + numChecks.incrementAndGet(); + return context; + } + } + + private static class ThrowingCheckable + implements Checkable { + private final AtomicLong numChecks = new AtomicLong(0); + @Override + public Boolean check(Boolean context) throws DummyException { + numChecks.incrementAndGet(); + throw new DummyException(); + } + + } + + private static class DummyException extends Exception { + } + + /** + * A checkable that hangs until signaled. + */ + private static class LatchedCheckable + implements Checkable { + private final CountDownLatch latch = new CountDownLatch(1); + + @Override + public Boolean check(Boolean ignored) throws InterruptedException { + LOG.info("LatchedCheckable {} waiting.", this); + latch.await(); + return true; // Unreachable. + } + } + + /** + * A {@link FutureCallback} that counts its invocations. + */ + private static final class LatchedCallback + implements FutureCallback { + private final CountDownLatch successLatch = new CountDownLatch(1); + private final CountDownLatch failureLatch = new CountDownLatch(1); + private final Checkable target; + + private LatchedCallback(Checkable target) { + this.target = target; + } + + @Override + public void onSuccess(@Nonnull Boolean result) { + LOG.info("onSuccess callback invoked for {}", target); + successLatch.countDown(); + } + + @Override + public void onFailure(@Nonnull Throwable t) { + LOG.info("onFailure callback invoked for {} with exception", target, t); + failureLatch.countDown(); + } + } +} From 1472cf6a247bddc65077c8269334824d006ab1bc Mon Sep 17 00:00:00 2001 From: Arpit Agarwal Date: Mon, 7 Nov 2016 12:34:05 -0800 Subject: [PATCH 2/2] Rename AsyncChecker#join method to shutdownAndWait. Change-Id: I3b6dd71f2421b6703fc09b0f3491bd3e18bbf898 --- .../hadoop/hdfs/server/datanode/checker/AsyncChecker.java | 3 ++- .../hdfs/server/datanode/checker/ThrottledAsyncChecker.java | 4 ++-- .../server/datanode/checker/TestThrottledAsyncChecker.java | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java index c367d6be2dc48..1d534a369d45f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java @@ -58,5 +58,6 @@ public interface AsyncChecker { * * @throws InterruptedException */ - void join(long timeout, TimeUnit timeUnit) throws InterruptedException; + void shutdownAndWait(long timeout, TimeUnit timeUnit) + throws InterruptedException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java index 0be14570a9364..d0ee3d2c0b0d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java @@ -164,10 +164,10 @@ public void onFailure(@Nonnull Throwable t) { } /** - * See {@link AsyncChecker#join}. + * {@inheritDoc}. */ @Override - public void join(long timeout, TimeUnit timeUnit) + public void shutdownAndWait(long timeout, TimeUnit timeUnit) throws InterruptedException { // Try orderly shutdown. executorService.shutdown(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java index 81a34df138da4..70795caf65284 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java @@ -113,7 +113,7 @@ public void testCancellation() throws Exception { Futures.addCallback(lf, callback); // Request immediate cancellation. - checker.join(0, TimeUnit.MILLISECONDS); + checker.shutdownAndWait(0, TimeUnit.MILLISECONDS); try { assertFalse(lf.get()); fail("Failed to get expected InterruptedException");