Skip to content

Commit

Permalink
Speed up test initialization. (#14784)
Browse files Browse the repository at this point in the history
  • Loading branch information
bziobrowski authored Jan 11, 2025
1 parent 7c5c5d3 commit 6eddacf
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public static void applyClusterConfig(PinotConfiguration instanceConfig, String
}
}
} finally {
zkClient.close();
ZkStarter.closeAsync(zkClient);
}
setTimezone(instanceConfig);
initForwardIndexConfig(instanceConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.pinot.spi.utils.NetUtils;
Expand Down Expand Up @@ -179,10 +181,9 @@ public void run() {
// Wait until the ZK server is started
for (int retry = 0; retry < DEFAULT_ZK_CLIENT_RETRIES; retry++) {
try {
Thread.sleep(1000L);
ZkClient client = new ZkClient("localhost:" + port, 1000 * (DEFAULT_ZK_CLIENT_RETRIES - retry));
client.waitUntilConnected(DEFAULT_ZK_CLIENT_RETRIES - retry, TimeUnit.SECONDS);
client.close();
closeAsync(client);
break;
} catch (Exception e) {
if (retry < DEFAULT_ZK_CLIENT_RETRIES - 1) {
Expand All @@ -191,6 +192,7 @@ public void run() {
LOGGER.warn("Failed to connect to zk server.", e);
throw e;
}
Thread.sleep(50L);
}
}
return new ZookeeperInstance(zookeeperServerMain, dataDirPath, port);
Expand All @@ -200,6 +202,17 @@ public void run() {
}
}

public static void closeAsync(ZkClient client) {
if (client != null) {
ZK_DISCONNECTOR.submit(() -> {
client.close();
});
}
}

private static final ExecutorService ZK_DISCONNECTOR =
Executors.newFixedThreadPool(1, new NamedThreadFactory("zk-disconnector"));

/**
* Stops a local Zk instance, deleting its data directory
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class ControllerConf extends PinotConfiguration {
public static final String HELIX_CLUSTER_NAME = "controller.helix.cluster.name";
public static final String CLUSTER_TENANT_ISOLATION_ENABLE = "cluster.tenant.isolation.enable";
public static final String CONSOLE_WEBAPP_ROOT_PATH = "controller.query.console";
public static final String CONSOLE_SWAGGER_ENABLE = "controller.swagger.enable";
public static final String CONSOLE_SWAGGER_USE_HTTPS = "controller.swagger.use.https";
public static final String CONTROLLER_MODE = "controller.mode";
public static final String LEAD_CONTROLLER_RESOURCE_REBALANCE_STRATEGY = "controller.resource.rebalance.strategy";
Expand Down Expand Up @@ -1128,4 +1129,13 @@ private String getSupportedProtocol(String property) {
public boolean isEnforcePoolBasedAssignmentEnabled() {
return getProperty(ENFORCE_POOL_BASED_ASSIGNMENT_KEY, DEFAULT_ENFORCE_POOL_BASED_ASSIGNMENT);
}

public void setEnableSwagger(boolean value) {
setProperty(ControllerConf.CONSOLE_SWAGGER_ENABLE, value);
}

public boolean isEnableSwagger() {
String enableSwagger = getProperty(ControllerConf.CONSOLE_SWAGGER_ENABLE);
return enableSwagger == null || Boolean.parseBoolean(enableSwagger);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class ControllerAdminApiApplication extends ResourceConfig {

private final String _controllerResourcePackages;
private final boolean _useHttps;
private final boolean _enableSwagger;
private HttpServer _httpServer;

public ControllerAdminApiApplication(ControllerConf conf) {
Expand All @@ -60,6 +61,7 @@ public ControllerAdminApiApplication(ControllerConf conf) {
// TODO See ControllerResponseFilter
// register(new LoggingFeature());
_useHttps = Boolean.parseBoolean(conf.getProperty(ControllerConf.CONSOLE_SWAGGER_USE_HTTPS));
_enableSwagger = conf.isEnableSwagger();
if (conf.getProperty(CommonConstants.Controller.CONTROLLER_SERVICE_AUTO_DISCOVERY, false)) {
register(ServiceAutoDiscoveryFeature.class);
}
Expand All @@ -86,8 +88,10 @@ public void start(List<ListenerConfig> listenerConfigs) {
throw new RuntimeException("Failed to start http server", e);
}
ClassLoader classLoader = ControllerAdminApiApplication.class.getClassLoader();
PinotReflectionUtils.runWithLock(() ->
SwaggerSetupUtils.setupSwagger("Controller", _controllerResourcePackages, _useHttps, "/", _httpServer));
if (_enableSwagger) {
PinotReflectionUtils.runWithLock(() ->
SwaggerSetupUtils.setupSwagger("Controller", _controllerResourcePackages, _useHttps, "/", _httpServer));
}

// This is ugly from typical patterns to setup static resources but all our APIs are
// at path "/". So, configuring static handler for path "/" does not work well.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.common.utils.helix.LeadControllerUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.core.PinotHelixBrokerResourceOnlineOfflineStateModelGenerator;
Expand Down Expand Up @@ -127,9 +128,7 @@ public static void setupPinotCluster(String helixClusterName, String zkPath, boo
createLeadControllerResourceIfNeeded(helixClusterName, helixAdmin, configAccessor, enableBatchMessageMode,
controllerConf);
} finally {
if (zkClient != null) {
zkClient.close();
}
ZkStarter.closeAsync(zkClient);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.HelixPropertyFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.model.CloudConfig;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.HelixConfigScope;
Expand Down Expand Up @@ -78,6 +80,8 @@
import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -181,13 +185,13 @@ public ControllerRequestClient getControllerRequestClient() {

public void startZk() {
if (_zookeeperInstance == null) {
_zookeeperInstance = ZkStarter.startLocalZkServer();
runWithHelixMock(() -> _zookeeperInstance = ZkStarter.startLocalZkServer());
}
}

public void startZk(int port) {
if (_zookeeperInstance == null) {
_zookeeperInstance = ZkStarter.startLocalZkServer(port);
runWithHelixMock(() -> _zookeeperInstance = ZkStarter.startLocalZkServer(port));
}
}

Expand Down Expand Up @@ -221,6 +225,7 @@ public Map<String, Object> getDefaultControllerConfiguration() {
properties.put(ControllerConf.LOCAL_TEMP_DIR, DEFAULT_LOCAL_TEMP_DIR);
// Enable groovy on the controller
properties.put(ControllerConf.DISABLE_GROOVY, false);
properties.put(ControllerConf.CONSOLE_SWAGGER_ENABLE, false);
properties.put(CommonConstants.CONFIG_OF_TIMEZONE, "UTC");
overrideControllerConf(properties);
return properties;
Expand All @@ -244,43 +249,52 @@ public void startController()
startController(getDefaultControllerConfiguration());
}

public void startControllerWithSwagger()
throws Exception {
Map<String, Object> config = getDefaultControllerConfiguration();
config.put(ControllerConf.CONSOLE_SWAGGER_ENABLE, true);
startController(config);
}

public void startController(Map<String, Object> properties)
throws Exception {
assertNull(_controllerStarter, "Controller is already started");
assertTrue(_controllerPort > 0, "Controller port is not assigned");
_controllerStarter = createControllerStarter();
_controllerStarter.init(new PinotConfiguration(properties));
_controllerStarter.start();
_controllerConfig = _controllerStarter.getConfig();
_controllerBaseApiUrl = _controllerConfig.generateVipUrl();
_controllerRequestURLBuilder = ControllerRequestURLBuilder.baseUrl(_controllerBaseApiUrl);
_controllerDataDir = _controllerConfig.getDataDir();
_helixResourceManager = _controllerStarter.getHelixResourceManager();
_helixManager = _controllerStarter.getHelixControllerManager();
_helixDataAccessor = _helixManager.getHelixDataAccessor();
ConfigAccessor configAccessor = _helixManager.getConfigAccessor();
// HelixResourceManager is null in Helix only mode, while HelixManager is null in Pinot only mode.
HelixConfigScope scope =
new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
.build();
switch (_controllerStarter.getControllerMode()) {
case DUAL:
case PINOT_ONLY:
_helixAdmin = _helixResourceManager.getHelixAdmin();
_propertyStore = _helixResourceManager.getPropertyStore();
// TODO: Enable periodic rebalance per 10 seconds as a temporary work-around for the Helix issue:
// https://github.com/apache/helix/issues/331 and https://github.com/apache/helix/issues/2309.
// Remove this after Helix fixing the issue.
configAccessor.set(scope, ClusterConfig.ClusterConfigProperty.REBALANCE_TIMER_PERIOD.name(), "10000");
break;
case HELIX_ONLY:
_helixAdmin = _helixManager.getClusterManagmentTool();
_propertyStore = _helixManager.getHelixPropertyStore();
break;
default:
break;
}
assertEquals(System.getProperty("user.timezone"), "UTC");
runWithHelixMock(() -> {
assertNull(_controllerStarter, "Controller is already started");
assertTrue(_controllerPort > 0, "Controller port is not assigned");
_controllerStarter = createControllerStarter();
_controllerStarter.init(new PinotConfiguration(properties));
_controllerStarter.start();
_controllerConfig = _controllerStarter.getConfig();
_controllerBaseApiUrl = _controllerConfig.generateVipUrl();
_controllerRequestURLBuilder = ControllerRequestURLBuilder.baseUrl(_controllerBaseApiUrl);
_controllerDataDir = _controllerConfig.getDataDir();
_helixResourceManager = _controllerStarter.getHelixResourceManager();
_helixManager = _controllerStarter.getHelixControllerManager();
_helixDataAccessor = _helixManager.getHelixDataAccessor();
ConfigAccessor configAccessor = _helixManager.getConfigAccessor();
// HelixResourceManager is null in Helix only mode, while HelixManager is null in Pinot only mode.
HelixConfigScope scope =
new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
.build();
switch (_controllerStarter.getControllerMode()) {
case DUAL:
case PINOT_ONLY:
_helixAdmin = _helixResourceManager.getHelixAdmin();
_propertyStore = _helixResourceManager.getPropertyStore();
// TODO: Enable periodic rebalance per 10 seconds as a temporary work-around for the Helix issue:
// https://github.com/apache/helix/issues/331 and https://github.com/apache/helix/issues/2309.
// Remove this after Helix fixing the issue.
configAccessor.set(scope, ClusterConfig.ClusterConfigProperty.REBALANCE_TIMER_PERIOD.name(), "10000");
break;
case HELIX_ONLY:
_helixAdmin = _helixManager.getClusterManagmentTool();
_propertyStore = _helixManager.getHelixPropertyStore();
break;
default:
break;
}
assertEquals(System.getProperty("user.timezone"), "UTC");
});
}

public void stopController() {
Expand Down Expand Up @@ -1085,4 +1099,29 @@ public void cleanup() {
}
}
}

@FunctionalInterface
public interface ExceptionalRunnable {
void run()
throws Exception;
}

protected void runWithHelixMock(ExceptionalRunnable r) {
try (MockedStatic<HelixPropertyFactory> mock = Mockito.mockStatic(HelixPropertyFactory.class)) {

// mock helix method to disable slow, but useless, getCloudConfig() call
Mockito.when(HelixPropertyFactory.getCloudConfig(Mockito.anyString(), Mockito.anyString()))
.then((i) -> new CloudConfig());

mock.when(HelixPropertyFactory::getInstance).thenCallRealMethod();

r.run();
} catch (Exception e) {
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
} else {
throw new RuntimeException(e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -357,14 +357,26 @@ protected Map<String, String> getStreamConfigMap() {
*/
protected TableConfig createRealtimeTableConfig(File sampleAvroFile) {
AvroFileSchemaKafkaAvroMessageDecoder._avroFile = sampleAvroFile;
return new TableConfigBuilder(TableType.REALTIME).setTableName(getTableName())
.setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn())
.setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns())
.setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns())
.setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion())
.setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig()).setBrokerTenant(getBrokerTenant())
.setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig()).setQueryConfig(getQueryConfig())
.setStreamConfigs(getStreamConfigs()).setNullHandlingEnabled(getNullHandlingEnabled()).build();
return new TableConfigBuilder(TableType.REALTIME)
.setTableName(getTableName())
.setTimeColumnName(getTimeColumnName())
.setSortedColumn(getSortedColumn())
.setInvertedIndexColumns(getInvertedIndexColumns())
.setNoDictionaryColumns(getNoDictionaryColumns())
.setRangeIndexColumns(getRangeIndexColumns())
.setBloomFilterColumns(getBloomFilterColumns())
.setFieldConfigList(getFieldConfigs())
.setNumReplicas(getNumReplicas())
.setSegmentVersion(getSegmentVersion())
.setLoadMode(getLoadMode())
.setTaskConfig(getTaskConfig())
.setBrokerTenant(getBrokerTenant())
.setServerTenant(getServerTenant())
.setIngestionConfig(getIngestionConfig())
.setQueryConfig(getQueryConfig())
.setStreamConfigs(getStreamConfigs())
.setNullHandlingEnabled(getNullHandlingEnabled())
.build();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,13 @@ protected void startBroker()

protected void startBrokers(int numBrokers)
throws Exception {
for (int i = 0; i < numBrokers; i++) {
BaseBrokerStarter brokerStarter = startOneBroker(i);
_brokerStarters.add(brokerStarter);
}
assertEquals(System.getProperty("user.timezone"), "UTC");
runWithHelixMock(() -> {
for (int i = 0; i < numBrokers; i++) {
BaseBrokerStarter brokerStarter = startOneBroker(i);
_brokerStarters.add(brokerStarter);
}
assertEquals(System.getProperty("user.timezone"), "UTC");
});
}

protected BaseBrokerStarter startOneBroker(int brokerId)
Expand Down Expand Up @@ -257,11 +259,13 @@ protected void startServer()

protected void startServers(int numServers)
throws Exception {
FileUtils.deleteQuietly(new File(TEMP_SERVER_DIR));
for (int i = 0; i < numServers; i++) {
_serverStarters.add(startOneServer(i));
}
assertEquals(System.getProperty("user.timezone"), "UTC");
runWithHelixMock(() -> {
FileUtils.deleteQuietly(new File(TEMP_SERVER_DIR));
for (int i = 0; i < numServers; i++) {
_serverStarters.add(startOneServer(i));
}
assertEquals(System.getProperty("user.timezone"), "UTC");
});
}

protected BaseServerStarter startOneServer(int serverId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void setUp()
TestUtils.ensureDirectoriesExistAndEmpty(_tempDir);
// Start an empty Pinot cluster
startZk();
startController();
startControllerWithSwagger();
startBroker();
startServer();
startMinion();
Expand Down

0 comments on commit 6eddacf

Please sign in to comment.