Skip to content

Commit

Permalink
Merge pull request #17 from trocco-io/feature/complement_columns
Browse files Browse the repository at this point in the history
Enable the creation of a schema taken from the table metadata and column_options
  • Loading branch information
d-hrs authored Dec 5, 2023
2 parents 40b2500 + 71fbb0f commit 3cd4062
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 3 deletions.
20 changes: 19 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ Athena input plugin for Embulk loads records from Athena(AWS).
* **access_key**: AWS access key (string, required)
* **secret_key**: AWS secret key (string, required)
* **query**: SQL to run (string, required)
* **columns**: columns (string, required)
* **columns**: columns. If these values are empty, they are taken from the table metadata and column_options. (array, optional)
* **column_options**: advanced: key-value pairs where key is a column name and value is options for the column, enabled if columns are empty. (array, optional)
- **value_type**: embulk get values from database as this value_type. Typically, the value_type determines `getXXX` method of `java.sql.PreparedStatement`.
- **type**: Column values are converted to this embulk type. Available values options are: `boolean`, `long`, `double`, `string`, `json`, `timestamp`).
* **options**: extra JDBC properties (string, default: {})
* **null_to_zero**: if true, convert long, double and boolean value from null to zero (boolean, default: false)

Expand All @@ -43,6 +46,21 @@ in:
null_to_zero: true
```
```yaml
in:
type: athena
database: log_test
athena_url: "jdbc:awsathena://athena.ap-northeast-1.amazonaws.com:443"
s3_staging_dir: "s3://aws-athena-query-results-11111111111-ap-northeast-1/"
access_key: ""
secret_key: ""
query: |
select uid, created_at from log_test.sample
column_options:
created_at: { type: string }
null_to_zero: true
```
## Build
```bash
Expand Down
96 changes: 94 additions & 2 deletions src/main/java/org/embulk/input/athena/AthenaInputPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,27 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.TimeZone;
import java.time.ZoneId;

import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigException;
import org.embulk.config.ConfigSource;
import org.embulk.config.TaskReport;
import org.embulk.config.TaskSource;
import org.embulk.input.athena.getter.AthenaColumnGetterFactory;
import org.embulk.input.jdbc.getter.ColumnGetterFactory;
import org.embulk.input.jdbc.JdbcColumn;
import org.embulk.input.jdbc.JdbcColumnOption;
import org.embulk.input.jdbc.JdbcInputConnection;
import org.embulk.input.jdbc.JdbcSchema;
import org.embulk.input.jdbc.ToStringMap;
import org.embulk.spi.BufferAllocator;
import org.embulk.spi.Column;
Expand All @@ -40,14 +52,16 @@
import org.embulk.util.config.ConfigMapperFactory;
import org.embulk.util.config.Task;
import org.embulk.util.config.TaskMapper;
import org.embulk.util.config.modules.ZoneIdModule;
import org.embulk.util.config.units.SchemaConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AthenaInputPlugin implements InputPlugin
{
protected final Logger logger = LoggerFactory.getLogger(getClass());
private static final ConfigMapperFactory CONFIG_MAPPER_FACTORY = ConfigMapperFactory.builder().addDefaultModules().build();
private static final ConfigMapperFactory CONFIG_MAPPER_FACTORY = ConfigMapperFactory.builder().addDefaultModules().addModule(ZoneIdModule.withLegacyNames()).build();
protected static final ConfigMapper CONFIG_MAPPER = CONFIG_MAPPER_FACTORY.createConfigMapper();

public interface PluginTask extends Task
{
Expand Down Expand Up @@ -81,6 +95,7 @@ public interface PluginTask extends Task

// if you get schema from config
@Config("columns")
@ConfigDefault("[]")
public SchemaConfig getColumns();

@Config("options")
Expand All @@ -91,6 +106,9 @@ public interface PluginTask extends Task
@ConfigDefault("false")
public boolean getNullToZero();

@Config("column_options")
@ConfigDefault("{}")
public Map<String, JdbcColumnOption> getColumnOptions();
}

@Override
Expand All @@ -99,12 +117,39 @@ public ConfigDiff transaction(ConfigSource config, InputPlugin.Control control)
final ConfigMapper configMapper = CONFIG_MAPPER_FACTORY.createConfigMapper();
final PluginTask task = configMapper.map(config, PluginTask.class);

Schema schema = task.getColumns().toSchema();
Schema schema = getSchema(task);
int taskCount = 1; // number of run() method calls

return resume(task.toTaskSource(), schema, taskCount, control);
}

private void validateColumnOptions(PluginTask task) {
if(task.getColumnOptions() == null) {
return;
}

for (Map.Entry<String, JdbcColumnOption> entry : task.getColumnOptions().entrySet()) {
JdbcColumnOption columnOption = entry.getValue();
if(columnOption.getTimeZone().isPresent()) {
throw new ConfigException("timezone option is not supported");
}
}
}

private Schema getSchema(PluginTask task) {
SchemaConfig columns = task.getColumns();
if (columns != null && columns.getColumnCount() > 0) {
return columns.toSchema();
}

validateColumnOptions(task);
try {
return getSchemaOfQuery(task);
} catch (SQLException | ClassNotFoundException e) {
throw new RuntimeException(e);
}
}

@Override
public ConfigDiff resume(TaskSource taskSource, Schema schema, int taskCount, InputPlugin.Control control)
{
Expand Down Expand Up @@ -280,11 +325,58 @@ protected Connection getAthenaConnection(PluginTask task) throws ClassNotFoundEx

return DriverManager.getConnection(task.getAthenaUrl(), properties);
}

private ColumnGetterFactory newColumnGetterFactory(PageBuilder pageBuilder, ZoneId dateTimeZone)
{
return new AthenaColumnGetterFactory(pageBuilder, dateTimeZone);
}

//
// copy from embulk-input-jdbc
//

private Schema getSchemaOfQuery(PluginTask task) throws SQLException, ClassNotFoundException {
JdbcInputConnection con = new JdbcInputConnection(getAthenaConnection(task), null);
JdbcSchema querySchema = con.getSchemaOfQuery(task.getQuery());
ColumnGetterFactory factory = newColumnGetterFactory(null, TimeZone.getTimeZone("Z").toZoneId());
final ArrayList<Column> columns = new ArrayList<>();
for (int i = 0; i < querySchema.getCount(); i++) {
JdbcColumn column = querySchema.getColumn(i);
JdbcColumnOption columnOption = columnOptionOf(task.getColumnOptions(), new HashMap<>(), column, factory.getJdbcType(column.getSqlType()));
columns.add(new Column(i,
column.getName(),
factory.newColumnGetter(con, null, column, columnOption).getToType()));
}
return new Schema(Collections.unmodifiableList(columns));
}

private static JdbcColumnOption columnOptionOf(Map<String, JdbcColumnOption> columnOptions, Map<String, JdbcColumnOption> defaultColumnOptions, JdbcColumn targetColumn, String targetColumnSQLType)
{
JdbcColumnOption columnOption = columnOptions.get(targetColumn.getName());
if (columnOption == null) {
String foundName = null;
for (Map.Entry<String, JdbcColumnOption> entry : columnOptions.entrySet()) {
if (entry.getKey().equalsIgnoreCase(targetColumn.getName())) {
if (columnOption != null) {
throw new ConfigException(String.format("Cannot specify column '%s' because both '%s' and '%s' exist in column_options.",
targetColumn.getName(), foundName, entry.getKey()));
}
foundName = entry.getKey();
columnOption = entry.getValue();
}
}
}

if (columnOption != null) {
return columnOption;
}
final JdbcColumnOption defaultColumnOption = defaultColumnOptions.get(targetColumnSQLType);
if (defaultColumnOption != null) {
return defaultColumnOption;
}
return CONFIG_MAPPER.map(CONFIG_MAPPER_FACTORY.newConfigSource(), JdbcColumnOption.class);
}

protected void loadDriver(String className, Optional<String> driverPath)
{
if (driverPath.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.embulk.input.athena.getter;

import org.embulk.input.jdbc.JdbcColumn;
import org.embulk.input.jdbc.getter.ColumnGetterFactory;
import org.embulk.spi.PageBuilder;

import java.time.ZoneId;

import static java.util.Locale.ENGLISH;

public class AthenaColumnGetterFactory extends ColumnGetterFactory {
public AthenaColumnGetterFactory(PageBuilder to, ZoneId defaultTimeZone) {
super(to, defaultTimeZone);
}

@Override
protected String sqlTypeToValueType(JdbcColumn column, int sqlType) {
try {
return super.sqlTypeToValueType(column, sqlType);
} catch (UnsupportedOperationException e) {
throw new UnsupportedOperationException(
String.format(ENGLISH,
"Unsupported type %s (sqlType=%d) of '%s' column. Please add '%s: {value_type: string}' to 'column_options: {...}' option to convert the values to strings.",
column.getTypeName(), column.getSqlType(), column.getName(), column.getName()));
}
}
}

0 comments on commit 3cd4062

Please sign in to comment.