Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,22 @@
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;

/**
* Test implementation of RSProcedureDispatcher that throws desired errors for testing purpose.
Expand All @@ -42,7 +47,8 @@ public class RSProcDispatcher extends RSProcedureDispatcher {

private static final Logger LOG = LoggerFactory.getLogger(RSProcDispatcher.class);

private static final AtomicInteger I = new AtomicInteger();
/** Config key for the fail-fast retry limit, shared with the test so the two cannot drift. */
static final String FAIL_FAST_LIMIT_KEY = "hbase.master.rs.remote.proc.fail.fast.limit";

private static final List<IOException> ERRORS =
Arrays.asList(new ConnectionClosedException("test connection closed error..."),
Expand All @@ -51,8 +57,38 @@ public class RSProcDispatcher extends RSProcedureDispatcher {

private static final AtomicInteger ERROR_IDX = new AtomicInteger();

// Injection is driven by the test and bound to a target table, not a global call count:
// remoteDispatch() fires for every remote procedure in the cluster (startup, table creation,
// chores, background assignments), so counting calls drifts and misses the operations under test.
private static final AtomicBoolean INJECT = new AtomicBoolean(false);
private static final AtomicInteger VICTIMS_REMAINING = new AtomicInteger(0);
private static volatile TableName targetTable;

// Fail-fast retry limit after which the master schedules an SCP; read from conf to match test.
private final int failFastLimit;

/**
 * Arms error injection: the next {@code n} open/close-region requests that target {@code table}
 * are failed with connection errors on every attempt until the fail-fast retry limit is
 * exhausted, so the master schedules an SCP. Call right before the operations under test.
 * <p>
 * Also rewinds the index into the injected error list, so every armed run starts from the first
 * error.
 *
 * @param table table whose open/close-region requests should be failed; must not be null
 * @param n number of requests to fail; must not be negative (0 arms injection but fails nothing)
 * @throws IllegalArgumentException if {@code table} is null or {@code n} is negative
 */
static void injectErrorsForNextRequests(TableName table, int n) {
  // Reject arguments that could never match a request: without this check injection would be
  // armed but silently do nothing, and the test would only fail much later on a wait timeout.
  if (table == null) {
    throw new IllegalArgumentException("table must not be null");
  }
  if (n < 0) {
    throw new IllegalArgumentException("n must not be negative: " + n);
  }
  // Publish the target and the budget before raising the flag, which is written last so that a
  // reader observing INJECT == true also observes the new table and count.
  ERROR_IDX.set(0);
  targetTable = table;
  VICTIMS_REMAINING.set(n);
  INJECT.set(true);
}

/**
* Stops error injection. Safe to call unconditionally, e.g. from test teardown.
* <p>
* Clears the enable flag, zeroes the remaining-request budget and forgets the target table. The
* index into the injected error list is left as is; it is rewound when injection is next armed.
*/
static void stopInjecting() {
// Lower the flag first: sendRequest() checks INJECT before it looks at the table or the budget.
INJECT.set(false);
VICTIMS_REMAINING.set(0);
targetTable = null;
}

/**
 * Creates the test dispatcher and captures the fail-fast retry limit from the master's
 * configuration under {@link #FAIL_FAST_LIMIT_KEY}, falling back to 10 when the key is unset.
 *
 * @param master master services passed to the parent dispatcher and used to read configuration
 */
public RSProcDispatcher(MasterServices master) {
  super(master);
  // Read once at construction; the value is used to decide which retry is the last one.
  failFastLimit = master.getConfiguration().getInt(FAIL_FAST_LIMIT_KEY, 10);
}

@Override
Expand All @@ -66,8 +102,42 @@ protected void remoteDispatch(final ServerName serverName,
}
}

/**
 * Tells whether {@code request} opens or closes at least one region of the injection target
 * table.
 * <p>
 * Open requests are matched through the table name of the RegionInfo they carry. Close requests
 * are matched only when their region specifier is of type REGION_NAME, in which case the table is
 * parsed out of the region name; close requests using any other specifier type never match.
 *
 * @param request the batched remote-procedure request about to be sent to a region server
 * @return {@code true} if a target table is set and the request touches one of its regions,
 *         {@code false} otherwise (including when no target table is set)
 */
private static boolean targetsInjectionTable(AdminProtos.ExecuteProceduresRequest request) {
  // Snapshot the volatile field once so the null check and the comparisons see the same value.
  final TableName wanted = targetTable;
  if (wanted == null) {
    return false;
  }

  boolean opensTargetRegion = request.getOpenRegionList().stream()
    .flatMap(openRequest -> openRequest.getOpenInfoList().stream())
    .anyMatch(openInfo -> wanted
      .equals(ProtobufUtil.toTableName(openInfo.getRegion().getTableName())));
  if (opensTargetRegion) {
    return true;
  }

  return request.getCloseRegionList().stream()
    .map(AdminProtos.CloseRegionRequest::getRegion)
    .anyMatch(specifier -> specifier.getType()
        == HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME
      && wanted.equals(RegionInfo.getTable(specifier.getValue().toByteArray())));
}

class TestExecuteProceduresRemoteCall extends ExecuteProceduresRemoteCall {

// attempts: retries of this single request instance (mirrors the dispatcher's
// numberOfAttemptsSoFar). injectErrors: whether this instance is failed with injected errors,
// decided once on the first call and kept across its retries.
private int attempts = 0;
private Boolean injectErrors = null;

public TestExecuteProceduresRemoteCall(ServerName serverName,
Set<RemoteProcedure> remoteProcedures) {
super(serverName, remoteProcedures);
Expand All @@ -76,23 +146,22 @@ public TestExecuteProceduresRemoteCall(ServerName serverName,
@Override
public AdminProtos.ExecuteProceduresResponse sendRequest(final ServerName serverName,
final AdminProtos.ExecuteProceduresRequest request) throws IOException {
int j = I.addAndGet(1);
LOG.info("sendRequest() req: {} , j: {}", request, j);
if (j == 12 || j == 22) {
// Execute the remote close and open region requests in the last (5th) retry before
// throwing ConnectionClosedException. This is to ensure even if the region open/close
// is successfully completed by regionserver, master still schedules SCP because
// sendRequest() throws error which has retry-limit exhausted.
FutureUtils.get(getRsAdmin().executeProcedures(request));
if (injectErrors == null) {
// Claim a slot only for a target-table open/close request, once per instance.
injectErrors =
INJECT.get() && targetsInjectionTable(request) && VICTIMS_REMAINING.getAndDecrement() > 0;
}
LOG.info("sendRequest() req: {}, attempts: {}, injectErrors: {}", request, attempts,
injectErrors);
if (!injectErrors) {
return FutureUtils.get(getRsAdmin().executeProcedures(request));
}
// For one of the close region requests and one of the open region requests,
// throw ConnectionClosedException until retry limit is exhausted and master
// schedules recoveries for the server.
// We will have ABNORMALLY_CLOSED regions, and they are expected to recover on their own.
if (j >= 8 && j <= 13 || j >= 18 && j <= 23) {
throw ERRORS.get(ERROR_IDX.getAndIncrement() % ERRORS.size());
// Throw a connection error each attempt until the retry limit is exhausted (-> SCP). On the
// last attempt run the real open/close first so the region still recovers.
if (attempts++ >= failFastLimit - 1) {
FutureUtils.get(getRsAdmin().executeProcedures(request));
}
return FutureUtils.get(getRsAdmin().executeProcedures(request));
Comment on lines -92 to -95

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The check here is not actually stable enough: it depends on hard-coded global call counts.

throw ERRORS.get(ERROR_IDX.getAndIncrement() % ERRORS.size());
}

private AsyncRegionServerAdmin getRsAdmin() {
Expand All @@ -112,5 +181,4 @@ public void run() {
new RegionServerStoppedException("Server " + getServerName() + " is not online"));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
Expand All @@ -68,7 +69,7 @@ public class TestProcDispatcher {
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().set(HBASE_MASTER_RSPROC_DISPATCHER_CLASS,
RSProcDispatcher.class.getName());
TEST_UTIL.getConfiguration().setInt("hbase.master.rs.remote.proc.fail.fast.limit", 5);
TEST_UTIL.getConfiguration().setInt(RSProcDispatcher.FAIL_FAST_LIMIT_KEY, 5);
TEST_UTIL.startMiniCluster(3);
SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
rs0 = cluster.getRegionServer(0).getServerName();
Expand All @@ -90,17 +91,23 @@ public void setUp(TestInfo testInfo) throws Exception {
TEST_UTIL.getAdmin().createTable(tableDesc, Bytes.toBytes(startKey), Bytes.toBytes(endKey), 9);
}

/**
* Disarms {@link RSProcDispatcher} error injection after every test. The injection state is
* static, so it would otherwise carry over into whichever test runs next.
*/
@AfterEach
public void tearDown() {
RSProcDispatcher.stopInjecting();
}

@Test
public void testRetryLimitOnConnClosedErrors(TestInfo testInfo) throws Exception {
HbckChore hbckChore = new HbckChore(TEST_UTIL.getHBaseCluster().getMaster());
final TableName tableName = TableName.valueOf(testInfo.getTestMethod().get().getName());
SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
Admin admin = TEST_UTIL.getAdmin();
Table table = TEST_UTIL.getConnection().getTable(tableName);
List<Put> puts = IntStream.range(10, 50000).mapToObj(i -> new Put(Bytes.toBytes(i))
.addColumn(Bytes.toBytes("fam1"), Bytes.toBytes("q1"), Bytes.toBytes("val_" + i)))
.collect(Collectors.toList());
table.put(puts);
try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
table.put(puts);
}
admin.flush(tableName);
admin.compact(tableName);
Thread.sleep(3000);
Expand All @@ -118,18 +125,20 @@ public void testRetryLimitOnConnClosedErrors(TestInfo testInfo) throws Exception
assertEquals(0, hbckReport.getOrphanRegionsOnRS().size());

HRegion region0 = hRegionServer0.getRegions().get(0);
// Fail the next two open/close-region requests for this table so the moves trigger SCP(s).
RSProcDispatcher.injectErrorsForNextRequests(tableName, 2);
// move all regions from server1 to server0
for (HRegion region : hRegionServer1.getRegions()) {
TEST_UTIL.getAdmin().move(region.getRegionInfo().getEncodedNameAsBytes(), rs0);
}
TEST_UTIL.getAdmin().move(region0.getRegionInfo().getEncodedNameAsBytes());
HMaster master = TEST_UTIL.getHBaseCluster().getMaster();

// Ensure:
// 1. num of regions before and after scheduling SCP remain same
// 2. all procedures including SCPs are successfully completed
// 3. two servers have SCPs scheduled
TEST_UTIL.waitFor(5000, 1000, () -> {
// Ensure, after the injected connection errors:
// 1. the total number of regions is unchanged before and after the SCP(s)
// 2. all procedures (including the SCP(s)) complete successfully
// 3. at least one ServerCrashProcedure was scheduled
TEST_UTIL.waitFor(60000, 1000, () -> {
LOG.info("numRegions0: {} , numRegions1: {} , numRegions2: {}", numRegions0, numRegions1,
numRegions2);
LOG.info("Online regions - server0 : {} , server1: {} , server2: {}",
Expand All @@ -142,7 +151,7 @@ public void testRetryLimitOnConnClosedErrors(TestInfo testInfo) throws Exception
== ProcedureProtos.ProcedureState.SUCCESS)
.count(),
master.getMasterProcedureExecutor().getProcedures().size());
LOG.info("Num of SCPs: " + master.getMasterProcedureExecutor().getProcedures().stream()
LOG.info("Num of SCPs: {}", master.getMasterProcedureExecutor().getProcedures().stream()
.filter(proc -> proc instanceof ServerCrashProcedure).count());
return (numRegions0 + numRegions1 + numRegions2)
== (cluster.getRegionServer(0).getNumberOfOnlineRegions()
Expand All @@ -157,13 +166,11 @@ public void testRetryLimitOnConnClosedErrors(TestInfo testInfo) throws Exception
});

// Ensure we have no inconsistent regions
TEST_UTIL.waitFor(5000, 1000, () -> {
TEST_UTIL.waitFor(60000, 1000, () -> {
hbckChore.choreForTesting();
HbckReport report = hbckChore.getLastReport();
return report.getInconsistentRegions().isEmpty() && report.getOrphanRegionsOnFS().isEmpty()
&& report.getOrphanRegionsOnRS().isEmpty();
});

}

}