Skip to content

Commit

Permalink
[AMORO-3418] Optimize ams configuration to support parsing of time an…
Browse files Browse the repository at this point in the history
…d storage unit (#3423)

* [AMORO-3418] Optimize ams configuration to support parsing of time interval and storage related configuration items when both values and units are specified

* fixup! [AMORO-3418] Optimize ams configuration to support parsing of time interval and storage related configuration items when both values and units are specified

* fixup! [AMORO-3418] Optimize ams configuration to support parsing of time interval and storage related configuration items when both values and units are specified

---------

Co-authored-by: jzjsnow <snow.jiangzj@gmail.com>
  • Loading branch information
Jzjsnow and jzjsnow authored Feb 10, 2025
1 parent 65ab732 commit a922fe8
Show file tree
Hide file tree
Showing 15 changed files with 559 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.amoro.config.ConfigOption;
import org.apache.amoro.config.ConfigOptions;
import org.apache.amoro.utils.MemorySize;

import java.time.Duration;
import java.util.Arrays;
Expand Down Expand Up @@ -81,10 +82,10 @@ public class AmoroManagementConf {
"Sets the size of the worker pool. The worker pool limits the number of tasks concurrently processing "
+ "manifests in the base table implementation across all concurrent commit operations.");

public static final ConfigOption<Long> REFRESH_EXTERNAL_CATALOGS_INTERVAL =
public static final ConfigOption<Duration> REFRESH_EXTERNAL_CATALOGS_INTERVAL =
ConfigOptions.key("refresh-external-catalogs.interval")
.longType()
.defaultValue(3 * 60 * 1000L)
.durationType()
.defaultValue(Duration.ofMinutes(3))
.withDescription("Interval to refresh the external catalog.");

public static final ConfigOption<Integer> REFRESH_EXTERNAL_CATALOGS_THREAD_COUNT =
Expand Down Expand Up @@ -172,29 +173,29 @@ public class AmoroManagementConf {
.defaultValue(3)
.withDescription("The number of threads used for creating tags.");

public static final ConfigOption<Long> AUTO_CREATE_TAGS_INTERVAL =
public static final ConfigOption<Duration> AUTO_CREATE_TAGS_INTERVAL =
ConfigOptions.key("auto-create-tags.interval")
.longType()
.defaultValue(60000L)
.durationType()
.defaultValue(Duration.ofMinutes(1))
.withDescription("Interval for creating tags.");

public static final ConfigOption<Long> REFRESH_TABLES_INTERVAL =
public static final ConfigOption<Duration> REFRESH_TABLES_INTERVAL =
ConfigOptions.key("refresh-tables.interval")
.longType()
.defaultValue(60000L)
.durationType()
.defaultValue(Duration.ofMinutes(1))
.withDescription("Interval for refreshing table metadata.");

public static final ConfigOption<Integer> REFRESH_MAX_PENDING_PARTITIONS =
ConfigOptions.key("refresh-tables.max-pending-partition-count")
.intType()
.defaultValue(100)
.withDescription("Filters will not be used beyond that number of partitions");
.withDescription("Filters will not be used beyond that number of partitions.");

public static final ConfigOption<Long> BLOCKER_TIMEOUT =
public static final ConfigOption<Duration> BLOCKER_TIMEOUT =
ConfigOptions.key("blocker.timeout")
.longType()
.defaultValue(60000L)
.withDescription("session timeout in Milliseconds");
.durationType()
.defaultValue(Duration.ofMinutes(1))
.withDescription("Session timeout. Default unit is milliseconds if not specified.");

public static final ConfigOption<Boolean> HA_ENABLE =
ConfigOptions.key("ha.enabled")
Expand Down Expand Up @@ -226,11 +227,12 @@ public class AmoroManagementConf {
.defaultValue(1261)
.withDescription("Port that the optimizing service thrift server is bound to.");

public static final ConfigOption<Long> THRIFT_MAX_MESSAGE_SIZE =
public static final ConfigOption<MemorySize> THRIFT_MAX_MESSAGE_SIZE =
ConfigOptions.key("thrift-server.max-message-size")
.longType()
.defaultValue(100 * 1024 * 1024L)
.withDescription("Maximum message size that the Thrift server can accept.");
.memorySizeType()
.defaultValue(MemorySize.ofMebiBytes(100))
.withDescription(
"Maximum message size that the Thrift server can accept. Default unit is bytes if not specified.");

public static final ConfigOption<Integer> THRIFT_WORKER_THREADS =
ConfigOptions.key("thrift-server.table-service.worker-thread-count")
Expand Down Expand Up @@ -354,16 +356,16 @@ public class AmoroManagementConf {
.defaultValue(30000L)
.withDescription("Max wait time before getting a connection timeout.");

public static final ConfigOption<Long> OPTIMIZER_HB_TIMEOUT =
public static final ConfigOption<Duration> OPTIMIZER_HB_TIMEOUT =
ConfigOptions.key("optimizer.heart-beat-timeout")
.longType()
.defaultValue(60000L)
.durationType()
.defaultValue(Duration.ofMinutes(1))
.withDescription("Timeout duration for Optimizer heartbeat.");

public static final ConfigOption<Long> OPTIMIZER_TASK_ACK_TIMEOUT =
public static final ConfigOption<Duration> OPTIMIZER_TASK_ACK_TIMEOUT =
ConfigOptions.key("optimizer.task-ack-timeout")
.longType()
.defaultValue(30000L)
.durationType()
.defaultValue(Duration.ofSeconds(30))
.withDescription("Timeout duration for task acknowledgment.");

public static final ConfigOption<Integer> OPTIMIZER_MAX_PLANNING_PARALLELISM =
Expand All @@ -372,10 +374,10 @@ public class AmoroManagementConf {
.defaultValue(1)
.withDescription("Max planning parallelism in one optimizer group.");

public static final ConfigOption<Long> OPTIMIZER_POLLING_TIMEOUT =
public static final ConfigOption<Duration> OPTIMIZER_POLLING_TIMEOUT =
ConfigOptions.key("optimizer.polling-timeout")
.longType()
.defaultValue(3000L)
.durationType()
.defaultValue(Duration.ofSeconds(3))
.withDescription("Optimizer polling task timeout.");

/** config key prefix of terminal */
Expand Down Expand Up @@ -408,11 +410,12 @@ public class AmoroManagementConf {
.withDescription(
"When a statement fails to execute, stop execution or continue executing the remaining statements.");

public static final ConfigOption<Integer> TERMINAL_SESSION_TIMEOUT =
public static final ConfigOption<Duration> TERMINAL_SESSION_TIMEOUT =
ConfigOptions.key("terminal.session.timeout")
.intType()
.defaultValue(30)
.withDescription("Session timeout in minutes.");
.durationType()
.defaultValue(Duration.ofMinutes(30))
.withDescription(
"Session timeout. Default unit is milliseconds if not specified (** Note: default units are minutes when version < 0.8).");

public static final ConfigOption<String> TERMINAL_SENSITIVE_CONF_KEYS =
ConfigOptions.key("terminal.sensitive-conf-keys")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ private void registerAmsServiceMetric() {

private void initThriftService() throws TTransportException {
LOG.info("Initializing thrift service...");
long maxMessageSize = serviceConfig.getLong(AmoroManagementConf.THRIFT_MAX_MESSAGE_SIZE);
long maxMessageSize = serviceConfig.get(AmoroManagementConf.THRIFT_MAX_MESSAGE_SIZE).getBytes();
int selectorThreads = serviceConfig.getInteger(AmoroManagementConf.THRIFT_SELECTOR_THREADS);
int workerThreads = serviceConfig.getInteger(AmoroManagementConf.THRIFT_WORKER_THREADS);
int queueSizePerSelector =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,14 @@ public DefaultOptimizingService(
CatalogManager catalogManager,
MaintainedTableManager tableManager,
TableService tableService) {
this.optimizerTouchTimeout = serviceConfig.getLong(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT);
this.taskAckTimeout = serviceConfig.getLong(AmoroManagementConf.OPTIMIZER_TASK_ACK_TIMEOUT);
this.optimizerTouchTimeout =
serviceConfig.get(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT).toMillis();
this.taskAckTimeout =
serviceConfig.get(AmoroManagementConf.OPTIMIZER_TASK_ACK_TIMEOUT).toMillis();
this.maxPlanningParallelism =
serviceConfig.getInteger(AmoroManagementConf.OPTIMIZER_MAX_PLANNING_PARALLELISM);
this.pollingTimeout = serviceConfig.getLong(AmoroManagementConf.OPTIMIZER_POLLING_TIMEOUT);
this.pollingTimeout =
serviceConfig.get(AmoroManagementConf.OPTIMIZER_POLLING_TIMEOUT).toMillis();
this.tableService = tableService;
this.catalogManager = catalogManager;
this.tableManager = tableManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public class DefaultTableManager extends PersistentBase implements TableManager

public DefaultTableManager(Configurations configuration, CatalogManager catalogManager) {
this.catalogManager = catalogManager;
this.blockerTimeout = configuration.getLong(AmoroManagementConf.BLOCKER_TIMEOUT);
this.blockerTimeout = configuration.get(AmoroManagementConf.BLOCKER_TIMEOUT).toMillis();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public class DefaultTableService extends PersistentBase implements TableService
public DefaultTableService(Configurations configuration, CatalogManager catalogManager) {
this.catalogManager = catalogManager;
this.externalCatalogRefreshingInterval =
configuration.getLong(AmoroManagementConf.REFRESH_EXTERNAL_CATALOGS_INTERVAL);
configuration.get(AmoroManagementConf.REFRESH_EXTERNAL_CATALOGS_INTERVAL).toMillis();
this.serverConfiguration = configuration;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ public void setup(TableService tableService, Configurations conf) {
new TableRuntimeRefreshExecutor(
tableService,
conf.getInteger(AmoroManagementConf.REFRESH_TABLES_THREAD_COUNT),
conf.getLong(AmoroManagementConf.REFRESH_TABLES_INTERVAL),
conf.get(AmoroManagementConf.REFRESH_TABLES_INTERVAL).toMillis(),
conf.getInteger(AmoroManagementConf.REFRESH_MAX_PENDING_PARTITIONS));
if (conf.getBoolean(AmoroManagementConf.AUTO_CREATE_TAGS_ENABLED)) {
this.tagsAutoCreatingExecutor =
new TagsAutoCreatingExecutor(
tableService,
conf.getInteger(AmoroManagementConf.AUTO_CREATE_TAGS_THREAD_COUNT),
conf.getLong(AmoroManagementConf.AUTO_CREATE_TAGS_INTERVAL));
conf.get(AmoroManagementConf.AUTO_CREATE_TAGS_INTERVAL).toMillis());
}
if (conf.getBoolean(AmoroManagementConf.DATA_EXPIRATION_ENABLED)) {
this.dataExpiringExecutor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public TerminalManager(Configurations conf, CatalogManager catalogManager) {
this.catalogManager = catalogManager;
this.resultLimits = conf.getInteger(AmoroManagementConf.TERMINAL_RESULT_LIMIT);
this.stopOnError = conf.getBoolean(AmoroManagementConf.TERMINAL_STOP_ON_ERROR);
this.sessionTimeout = conf.getInteger(AmoroManagementConf.TERMINAL_SESSION_TIMEOUT);
this.sessionTimeout = (int) conf.get(AmoroManagementConf.TERMINAL_SESSION_TIMEOUT).toMinutes();
this.sessionFactory = loadTerminalSessionFactory(conf);
gcThread = new Thread(new SessionCleanTask());
gcThread.setName("terminal-session-gc");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.junit.Assert;
import org.junit.BeforeClass;

import java.time.Duration;

public abstract class AMSServiceTestBase extends AMSManagerTestBase {
private static DefaultTableService TABLE_SERVICE = null;
private static DefaultOptimizingService OPTIMIZING_SERVICE = null;
Expand All @@ -35,7 +37,7 @@ public abstract class AMSServiceTestBase extends AMSManagerTestBase {
public static void initTableService() {
try {
Configurations configurations = new Configurations();
configurations.set(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT, 800L);
configurations.set(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT, Duration.ofMillis(800L));
TABLE_SERVICE = new DefaultTableService(new Configurations(), CATALOG_MANAGER);
OPTIMIZING_SERVICE =
new DefaultOptimizingService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.net.BindException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -346,7 +347,8 @@ private void startAms() throws Exception {
AmoroManagementConf.OPTIMIZING_SERVICE_THRIFT_BIND_PORT,
optimizingServiceBindPort);
serviceConfig.set(
AmoroManagementConf.REFRESH_EXTERNAL_CATALOGS_INTERVAL, 1000L);
AmoroManagementConf.REFRESH_EXTERNAL_CATALOGS_INTERVAL,
Duration.ofMillis(1000L));
serviceContainer.startService();
break;
} catch (TTransportException e) {
Expand Down
Loading

0 comments on commit a922fe8

Please sign in to comment.