diff --git a/build.gradle b/build.gradle
index 4e63ce5..8dff4f2 100644
--- a/build.gradle
+++ b/build.gradle
@@ -26,8 +26,8 @@ sourceCompatibility = 1.8
 targetCompatibility = 1.8
 
 dependencies {
-    compile "org.embulk:embulk-core:0.8.39"
-    provided "org.embulk:embulk-core:0.8.39"
+    compile "org.embulk:embulk-core:0.9.23"
+    provided "org.embulk:embulk-core:0.9.23"
     compile "org.scala-lang:scala-library:2.12.+"
     compile "org.apache.orc:orc:1.5.4"
 
@@ -37,7 +37,6 @@ dependencies {
     compile 'org.embulk.input.s3:embulk-util-aws-credentials:0.2.8'
     compile "com.amazonaws:aws-java-sdk-s3:1.10.33"
     compile "org.apache.hadoop:hadoop-aws:2.7.5"
-    compile 'com.google.guava:guava:24.1-jre'
 
     testCompile 'org.jmockit:jmockit:1.38'
     // testCompile "junit:junit:4.+"
diff --git a/src/main/java/org/embulk/output/orc/OrcOutputPlugin.java b/src/main/java/org/embulk/output/orc/OrcOutputPlugin.java
deleted file mode 100644
index 234587f..0000000
--- a/src/main/java/org/embulk/output/orc/OrcOutputPlugin.java
+++ /dev/null
@@ -1,230 +0,0 @@
-package org.embulk.output.orc;
-
-import com.amazonaws.auth.AWSCredentials;
-import com.google.common.base.Throwables;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.util.VersionInfo;
-import org.apache.orc.CompressionKind;
-import org.apache.orc.MemoryManager;
-import org.apache.orc.OrcFile;
-import org.apache.orc.TypeDescription;
-import org.apache.orc.Writer;
-import org.embulk.config.ConfigDiff;
-import org.embulk.config.ConfigSource;
-import org.embulk.config.TaskReport;
-import org.embulk.config.TaskSource;
-import org.embulk.spi.Column;
-import org.embulk.spi.Exec;
-import org.embulk.spi.OutputPlugin;
-import org.embulk.spi.PageReader;
-import org.embulk.spi.Schema;
-import org.embulk.spi.TransactionalPageOutput;
-import org.embulk.spi.time.TimestampFormatter;
-import org.embulk.spi.type.Type;
-import org.embulk.spi.util.Timestamps;
-import org.embulk.util.aws.credentials.AwsCredentials;
-
-import java.io.IOException;
-import java.util.List;
-
-public class OrcOutputPlugin
-        implements OutputPlugin
-{
-    @Override
-    public ConfigDiff transaction(ConfigSource config,
-            Schema schema, int taskCount,
-            OutputPlugin.Control control)
-    {
-        PluginTask task = config.loadConfig(PluginTask.class);
-
-        // retryable (idempotent) output:
-        // return resume(task.dump(), schema, taskCount, control);
-
-        // non-retryable (non-idempotent) output:
-        control.run(task.dump());
-        return Exec.newConfigDiff();
-    }
-
-    @Override
-    public ConfigDiff resume(TaskSource taskSource,
-            Schema schema, int taskCount,
-            OutputPlugin.Control control)
-    {
-        throw new UnsupportedOperationException("orc output plugin does not support resuming");
-    }
-
-    @Override
-    public void cleanup(TaskSource taskSource,
-            Schema schema, int taskCount,
-            List<TaskReport> successTaskReports)
-    {
-    }
-
-    @Override
-    public TransactionalPageOutput open(TaskSource taskSource, Schema schema, int taskIndex)
-    {
-        PluginTask task = taskSource.loadTask(PluginTask.class);
-
-        if (task.getOverwrite()) {
-            AWSCredentials credentials = AwsCredentials.getAWSCredentialsProvider(task).getCredentials();
-            OrcOutputPluginHelper.removeOldFile(buildPath(task, taskIndex), task);
-        }
-
-        final PageReader reader = new PageReader(schema);
-        Writer writer = createWriter(task, schema, taskIndex);
-
-        return new OrcTransactionalPageOutput(reader, writer, task);
-    }
-
-    private String buildPath(PluginTask task, int processorIndex)
-    {
-        final String pathPrefix = task.getPathPrefix();
-        final String pathSuffix = task.getFileNameExtension();
-        final String sequenceFormat = task.getSequenceFormat();
-        return pathPrefix + String.format(sequenceFormat, processorIndex) + pathSuffix;
-    }
-
-    static TypeDescription getSchema(Schema schema)
-    {
-        TypeDescription oschema = TypeDescription.createStruct();
-        for (int i = 0; i < schema.size(); i++) {
-            Column column = schema.getColumn(i);
-            Type type = column.getType();
-            switch (type.getName()) {
-                case "long":
-                    oschema.addField(column.getName(), TypeDescription.createLong());
-                    break;
-                case "double":
-                    oschema.addField(column.getName(), TypeDescription.createDouble());
-                    break;
-                case "boolean":
-                    oschema.addField(column.getName(), TypeDescription.createBoolean());
-                    break;
-                case "string":
-                    oschema.addField(column.getName(), TypeDescription.createString());
-                    break;
-                case "timestamp":
-                    oschema.addField(column.getName(), TypeDescription.createTimestamp());
-                    break;
-                default:
-                    System.out.println("Unsupported type");
-                    break;
-            }
-        }
-        return oschema;
-    }
-
-    private Configuration getHadoopConfiguration(PluginTask task)
-    {
-        Configuration conf = new Configuration();
-
-        // see: https://stackoverflow.com/questions/17265002/hadoop-no-filesystem-for-scheme-file
-        conf.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
-        conf.set("fs.file.impl", LocalFileSystem.class.getName());
-        // see: https://stackoverflow.com/questions/20833444/how-to-set-objects-in-hadoop-configuration
-
-        AwsCredentials.getAWSCredentialsProvider(task);
-        if (task.getAccessKeyId().isPresent()) {
-            conf.set("fs.s3a.access.key", task.getAccessKeyId().get());
-            conf.set("fs.s3n.awsAccessKeyId", task.getAccessKeyId().get());
-        }
-        if (task.getSecretAccessKey().isPresent()) {
-            conf.set("fs.s3a.secret.key", task.getSecretAccessKey().get());
-            conf.set("fs.s3n.awsSecretAccessKey", task.getSecretAccessKey().get());
-        }
-        if (task.getEndpoint().isPresent()) {
-            conf.set("fs.s3a.endpoint", task.getEndpoint().get());
-            conf.set("fs.s3n.endpoint", task.getEndpoint().get());
-        }
-        return conf;
-    }
-
-    private Writer createWriter(PluginTask task, Schema schema, int processorIndex)
-    {
-        final TimestampFormatter[] timestampFormatters = Timestamps
-                .newTimestampColumnFormatters(task, schema, task.getColumnOptions());
-
-        Configuration conf = getHadoopConfiguration(task);
-        TypeDescription oschema = getSchema(schema);
-
-        // see: https://groups.google.com/forum/#!topic/vertx/lLb-slzpWVg
-        Thread.currentThread().setContextClassLoader(VersionInfo.class.getClassLoader());
-
-        Writer writer = null;
-        try {
-            // Make writerOptions
-            OrcFile.WriterOptions writerOptions = createWriterOptions(task, conf);
-            // see: https://stackoverflow.com/questions/9256733/how-to-connect-hive-in-ireport
-            // see: https://community.hortonworks.com/content/kbentry/73458/connecting-dbvisualizer-and-datagrip-to-hive-with.html
-            writer = OrcFile.createWriter(
-                    new Path(buildPath(task, processorIndex)),
-                    writerOptions.setSchema(oschema)
-                            .memory(new WriterLocalMemoryManager())
-                            .version(OrcFile.Version.V_0_12)
-            );
-        }
-        catch (IOException e) {
-            Throwables.throwIfUnchecked(e);
-        }
-        return writer;
-    }
-
-    private OrcFile.WriterOptions createWriterOptions(PluginTask task, Configuration conf)
-    {
-        final Integer bufferSize = task.getBufferSize();
-        final Integer stripSize = task.getStripSize();
-        final Integer blockSize = task.getBlockSize();
-        final String kindString = task.getCompressionKind();
-        CompressionKind kind = CompressionKind.valueOf(kindString);
-        return OrcFile.writerOptions(conf)
-                .bufferSize(bufferSize)
-                .blockSize(blockSize)
-                .stripeSize(stripSize)
-                .compress(kind);
-    }
-
-    // We avoid using orc.MemoryManagerImpl since it is not threadsafe, but embulk is multi-threaded.
-    // Embulk creates and uses multiple instances of TransactionalPageOutput in worker threads.
-    // As a workaround, WriterLocalMemoryManager is bound to a single orc.Writer instance, and
-    // notifies checkMemory() only to that instance.
-    private static class WriterLocalMemoryManager
-            implements MemoryManager
-    {
-        final long rowsBetweenChecks = 10000;
-
-        private int rowsAddedSinceCheck = 0;
-        Callback boundCallback = null;
-
-        @Override
-        public void addWriter(Path path, long requestedAllocation, Callback callback)
-                throws IOException
-        {
-            if (boundCallback != null) {
-                throw new IllegalStateException("WriterLocalMemoryManager should be bound to a single orc.Writer instance.");
-            }
-
-            boundCallback = callback;
-        }
-
-        @Override
-        public void removeWriter(Path path)
-                throws IOException
-        {
-            boundCallback = null;
-        }
-
-        @Override
-        public void addedRow(int rows)
-                throws IOException
-        {
-            rowsAddedSinceCheck += rows;
-            if (rowsAddedSinceCheck > rowsBetweenChecks) {
-                boundCallback.checkMemory(1);
-                rowsAddedSinceCheck = 0;
-            }
-        }
-    }
-}
diff --git a/src/main/java/org/embulk/output/orc/PluginTask.java b/src/main/java/org/embulk/output/orc/PluginTask.java
deleted file mode 100644
index cd258fb..0000000
--- a/src/main/java/org/embulk/output/orc/PluginTask.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package org.embulk.output.orc;
-
-import com.google.common.base.Optional;
-import org.embulk.config.Config;
-import org.embulk.config.ConfigDefault;
-import org.embulk.config.Task;
-import org.embulk.spi.time.TimestampFormatter;
-import org.embulk.util.aws.credentials.AwsCredentialsTask;
-import org.joda.time.DateTimeZone;
-
-import java.util.Map;
-
-public interface PluginTask
-        extends Task, TimestampFormatter.Task, AwsCredentialsTask
-{
-    @Config("path_prefix")
-    String getPathPrefix();
-
-    @Config("file_ext")
-    @ConfigDefault("\".orc\"")
-    String getFileNameExtension();
-
-    @Config("column_options")
-    @ConfigDefault("{}")
-    Map<String, TimestampColumnOption> getColumnOptions();
-
-    @Config("sequence_format")
-    @ConfigDefault("\".%03d\"")
-    String getSequenceFormat();
-
-    // see: https://orc.apache.org/docs/hive-config.html
-    // ORC File options
-    @Config("strip_size")
-    @ConfigDefault("67108864") // 64MB
-    Integer getStripSize();
-
-    @Config("buffer_size")
-    @ConfigDefault("262144") // 256KB
-    Integer getBufferSize();
-
-    @Config("block_size")
-    @ConfigDefault("268435456") // 256MB
-    Integer getBlockSize();
-
-    @Config("compression_kind")
-    @ConfigDefault("ZLIB")
-    public String getCompressionKind();
-
-    @Config("overwrite")
-    @ConfigDefault("false")
-    boolean getOverwrite();
-
-    @Config("default_from_timezone")
-    @ConfigDefault("\"UTC\"")
-    DateTimeZone getDefaultFromTimeZone();
-
-    @Config("endpoint")
-    @ConfigDefault("null")
-    Optional<String> getEndpoint();
-}
diff --git a/src/main/java/org/embulk/output/orc/TimestampColumnOption.java b/src/main/java/org/embulk/output/orc/TimestampColumnOption.java
deleted file mode 100644
index 8a42c06..0000000
--- a/src/main/java/org/embulk/output/orc/TimestampColumnOption.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package org.embulk.output.orc;
-
-import com.google.common.base.Optional;
-import org.embulk.config.Config;
-import org.embulk.config.ConfigDefault;
-import org.embulk.config.Task;
-import org.embulk.spi.time.TimestampFormatter;
-import org.joda.time.DateTimeZone;
-
-import java.util.List;
-
-public interface TimestampColumnOption
-        extends Task, TimestampFormatter.TimestampColumnOption
-{
-    @Config("from_timezone")
-    @ConfigDefault("null")
-    Optional<DateTimeZone> getFromTimeZone();
-
-    @Config("from_format")
-    @ConfigDefault("null")
-    Optional<List<String>> getFromFormat();
-}
diff --git a/src/main/scala/org/embulk/output/orc/OrcOutputPlugin.scala b/src/main/scala/org/embulk/output/orc/OrcOutputPlugin.scala
new file mode 100644
index 0000000..961894c
--- /dev/null
+++ b/src/main/scala/org/embulk/output/orc/OrcOutputPlugin.scala
@@ -0,0 +1,156 @@
+package org.embulk.output.orc
+
+import java.io.IOException
+import java.util
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{LocalFileSystem, Path}
+import org.apache.hadoop.hdfs.DistributedFileSystem
+import org.apache.hadoop.util.VersionInfo
+import org.apache.orc.{CompressionKind, MemoryManager, OrcFile, TypeDescription, Writer}
+import org.embulk.config.{ConfigSource, TaskReport, TaskSource}
+import org.embulk.spi.util.Timestamps
+import org.embulk.spi.{Exec, OutputPlugin, PageReader, Schema}
+import org.embulk.util.aws.credentials.AwsCredentials
+
+object OrcOutputPlugin {
+  private[orc] def getSchema(schema: Schema) = {
+    val oschema = TypeDescription.createStruct
+    for (i <- 0 until schema.size) {
+      val column = schema.getColumn(i)
+      val `type` = column.getType
+      `type`.getName match {
+        case "long" =>
+          oschema.addField(column.getName, TypeDescription.createLong)
+        case "double" =>
+          oschema.addField(column.getName, TypeDescription.createDouble)
+        case "boolean" =>
+          oschema.addField(column.getName, TypeDescription.createBoolean)
+        case "string" =>
+          oschema.addField(column.getName, TypeDescription.createString)
+        case "timestamp" =>
+          oschema.addField(column.getName, TypeDescription.createTimestamp)
+        case _ =>
+          System.out.println("Unsupported type")
+      }
+    }
+    oschema
+  }
+
+  // We avoid using orc.MemoryManagerImpl since it is not threadsafe, but embulk is multi-threaded.
+  // Embulk creates and uses multiple instances of TransactionalPageOutput in worker threads.
+  // As a workaround, WriterLocalMemoryManager is bound to a single orc.Writer instance, and
+  // notifies checkMemory() only to that instance.
+  private class WriterLocalMemoryManager extends MemoryManager {
+    final private[orc] val rowsBetweenChecks = 10000
+    private var rowsAddedSinceCheck = 0
+    private[orc] var boundCallback: MemoryManager.Callback = _
+
+    @throws[IOException]
+    override def addWriter(path: Path, requestedAllocation: Long, callback: MemoryManager.Callback): Unit = {
+      if (boundCallback != null) {
+        throw new IllegalStateException("WriterLocalMemoryManager should be bound to a single orc.Writer instance.")
+      } else {
+        boundCallback = callback
+      }
+    }
+
+    @throws[IOException]
+    override def removeWriter(path: Path): Unit = boundCallback = null
+
+    @throws[IOException]
+    override def addedRow(rows: Int): Unit = {
+      rowsAddedSinceCheck += rows
+      if (rowsAddedSinceCheck > rowsBetweenChecks) {
+        boundCallback.checkMemory(1)
+        rowsAddedSinceCheck = 0
+      }
+    }
+  }
+
+}
+
+class OrcOutputPlugin extends OutputPlugin {
+  override def transaction(config: ConfigSource, schema: Schema, taskCount: Int, control: OutputPlugin.Control) = {
+    val task = config.loadConfig(classOf[PluginTask])
+    // retryable (idempotent) output:
+    // return resume(task.dump(), schema, taskCount, control);
+    // non-retryable (non-idempotent) output:
+    control.run(task.dump)
+    Exec.newConfigDiff
+  }
+
+  override def resume(taskSource: TaskSource, schema: Schema, taskCount: Int, control: OutputPlugin.Control) = throw new UnsupportedOperationException("orc output plugin does not support resuming")
+
+  override def cleanup(taskSource: TaskSource, schema: Schema, taskCount: Int, successTaskReports: util.List[TaskReport]): Unit = {
+  }
+
+  override def open(taskSource: TaskSource, schema: Schema, taskIndex: Int) = {
+    val task = taskSource.loadTask(classOf[PluginTask])
+    if (task.getOverwrite) {
+      val credentials = AwsCredentials.getAWSCredentialsProvider(task).getCredentials
+      OrcOutputPluginHelper.removeOldFile(buildPath(task, taskIndex), task)
+    }
+    val reader = new PageReader(schema)
+    val writer = createWriter(task, schema, taskIndex)
+    new OrcTransactionalPageOutput(reader, writer, task)
+  }
+
+  private def buildPath(task: PluginTask, processorIndex: Int): String = {
+    val pathPrefix = task.getPathPrefix
+    val pathSuffix = task.getFileNameExtension
+    val sequenceFormat = task.getSequenceFormat
+    val fmt = java.lang.String.format(sequenceFormat, processorIndex.asInstanceOf[AnyRef])
+    pathPrefix + fmt + pathSuffix
+  }
+
+  private def getHadoopConfiguration(task: PluginTask) = {
+    val conf = new Configuration
+    // see: https://stackoverflow.com/questions/17265002/hadoop-no-filesystem-for-scheme-file
+    conf.set("fs.hdfs.impl", classOf[DistributedFileSystem].getName)
+    conf.set("fs.file.impl", classOf[LocalFileSystem].getName)
+    // see: https://stackoverflow.com/questions/20833444/how-to-set-objects-in-hadoop-configuration
+    AwsCredentials.getAWSCredentialsProvider(task)
+    if (task.getAccessKeyId.isPresent) {
+      conf.set("fs.s3a.access.key", task.getAccessKeyId.get)
+      conf.set("fs.s3n.awsAccessKeyId", task.getAccessKeyId.get)
+    }
+    if (task.getSecretAccessKey.isPresent) {
+      conf.set("fs.s3a.secret.key", task.getSecretAccessKey.get)
+      conf.set("fs.s3n.awsSecretAccessKey", task.getSecretAccessKey.get)
+    }
+    if (task.getEndpoint.isPresent) {
+      conf.set("fs.s3a.endpoint", task.getEndpoint.get)
+      conf.set("fs.s3n.endpoint", task.getEndpoint.get)
+    }
+    conf
+  }
+
+  private def createWriter(task: PluginTask, schema: Schema, processorIndex: Int): Writer = {
+    val timestampFormatters = Timestamps.newTimestampColumnFormatters(task, schema, task.getColumnOptions)
+    val conf = getHadoopConfiguration(task)
+    val oschema = OrcOutputPlugin.getSchema(schema)
+    // see: https://groups.google.com/forum/#!topic/vertx/lLb-slzpWVg
+    Thread.currentThread.setContextClassLoader(classOf[VersionInfo].getClassLoader)
+
+    var writer: Writer = null
+    try { // Make writerOptions
+      val writerOptions = createWriterOptions(task, conf)
+      // see: https://stackoverflow.com/questions/9256733/how-to-connect-hive-in-ireport
+      // see: https://community.hortonworks.com/content/kbentry/73458/connecting-dbvisualizer-and-datagrip-to-hive-with.html
+      writer = OrcFile.createWriter(new Path(buildPath(task, processorIndex)), writerOptions.setSchema(oschema).memory(new OrcOutputPlugin.WriterLocalMemoryManager).version(OrcFile.Version.V_0_12))
+    } catch {
+      case e: IOException => throw e
+    }
+    writer
+  }
+
+  private def createWriterOptions(task: PluginTask, conf: Configuration) = {
+    val bufferSize = task.getBufferSize
+    val stripSize = task.getStripSize
+    val blockSize = task.getBlockSize
+    val kindString = task.getCompressionKind
+    val kind = CompressionKind.valueOf(kindString)
+    OrcFile.writerOptions(conf).bufferSize(bufferSize).blockSize(blockSize.toLong).stripeSize(stripSize.toLong).compress(kind)
+  }
+}
diff --git a/src/main/scala/org/embulk/output/orc/OrcOutputPluginHelper.scala b/src/main/scala/org/embulk/output/orc/OrcOutputPluginHelper.scala
index cb70a59..222057b 100644
--- a/src/main/scala/org/embulk/output/orc/OrcOutputPluginHelper.scala
+++ b/src/main/scala/org/embulk/output/orc/OrcOutputPluginHelper.scala
@@ -6,19 +6,18 @@ import java.nio.file.{Files, Paths}
 import com.amazonaws.auth.profile.ProfileCredentialsProvider
 import com.amazonaws.services.s3.AmazonS3Client
 import com.amazonaws.services.s3.model.DeleteObjectRequest
-import com.google.common.base.Throwables
 
 import scala.beans.BeanProperty
 
 object OrcOutputPluginHelper {
-  def removeOldFile(fpath: String, task: PluginTask) = {
+  def removeOldFile(fpath: String, task: PluginTask): Unit = {
     // NOTE: Delete a file if local-filesystem, not HDFS or S3.
     val schema = getSchema(fpath)
     if (isDeleteTarget(schema)) schema match {
       case "file" =>
         try Files.deleteIfExists(Paths.get(fpath))
         catch {
-          case e: IOException => Throwables.throwIfUnchecked(e)
+          case e: IOException => throw e
         }
       case "s3" | "s3n" | "s3a" =>
         val s3Url = parseS3Url(fpath)
@@ -50,7 +49,7 @@ object OrcOutputPluginHelper {
     val parts = s3url.split("(://|/)").toList
     val bucket = parts.apply(1)
     val key = parts.slice(2, parts.size).mkString("/")
-    new OrcOutputPluginHelper.AmazonS3URILikeObject(bucket, key)
+    OrcOutputPluginHelper.AmazonS3URILikeObject(bucket, key)
   }
 
   case class AmazonS3URILikeObject(@BeanProperty bucket: String, @BeanProperty key: String)
diff --git a/src/main/scala/org/embulk/output/orc/OrcTransactionalPageOutput.scala b/src/main/scala/org/embulk/output/orc/OrcTransactionalPageOutput.scala
index 86e0557..177ac48 100644
--- a/src/main/scala/org/embulk/output/orc/OrcTransactionalPageOutput.scala
+++ b/src/main/scala/org/embulk/output/orc/OrcTransactionalPageOutput.scala
@@ -2,7 +2,6 @@ package org.embulk.output.orc
 
 import java.io.IOException
 
-import com.google.common.base.Throwables
 import org.apache.orc.Writer
 import org.embulk.config.TaskReport
 import org.embulk.spi.{Exec, Page, PageReader, TransactionalPageOutput}
@@ -41,7 +40,7 @@ class OrcTransactionalPageOutput(val reader: PageReader, val writer: Writer, val
   override def finish(): Unit = {
     try writer.close()
     catch {
-      case e: IOException => Throwables.throwIfUnchecked(e)
+      case e: IOException => throw e
     }
   }
 
@@ -50,4 +49,4 @@ class OrcTransactionalPageOutput(val reader: PageReader, val writer: Writer, val
   override def abort(): Unit = {}
 
   override def commit: TaskReport = Exec.newTaskReport
-}
\ No newline at end of file
+}
diff --git a/src/main/scala/org/embulk/output/orc/PluginTask.scala b/src/main/scala/org/embulk/output/orc/PluginTask.scala
new file mode 100644
index 0000000..c539e9d
--- /dev/null
+++ b/src/main/scala/org/embulk/output/orc/PluginTask.scala
@@ -0,0 +1,56 @@
+package org.embulk.output.orc
+
+import java.util
+
+import com.google.common.base.Optional
+import org.embulk.config.{Config, ConfigDefault, Task}
+import org.embulk.spi.time.TimestampFormatter
+import org.embulk.util.aws.credentials.AwsCredentialsTask
+import org.joda.time.DateTimeZone
+
+trait PluginTask extends Task with TimestampFormatter.Task with AwsCredentialsTask {
+  @Config("path_prefix")
+  def getPathPrefix: String
+
+  @Config("file_ext")
+  @ConfigDefault("\".orc\"")
+  def getFileNameExtension: String
+
+  @Config("column_options")
+  @ConfigDefault("{}")
+  def getColumnOptions: util.Map[String, TimestampColumnOption]
+
+  @Config("sequence_format")
+  @ConfigDefault("\".%03d\"")
+  def getSequenceFormat: String
+
+  // see: https://orc.apache.org/docs/hive-config.html
+  // ORC File options
+  @Config("strip_size")
+  @ConfigDefault("67108864") // 64MB
+  def getStripSize: Integer
+
+  @Config("buffer_size")
+  @ConfigDefault("262144") // 256KB
+  def getBufferSize: Integer
+
+  @Config("block_size")
+  @ConfigDefault("268435456") // 256MB
+  def getBlockSize: Integer
+
+  @Config("compression_kind")
+  @ConfigDefault("ZLIB")
+  def getCompressionKind: String
+
+  @Config("overwrite")
+  @ConfigDefault("false")
+  def getOverwrite: Boolean
+
+  @Config("default_from_timezone")
+  @ConfigDefault("\"UTC\"")
+  def getDefaultFromTimeZone: DateTimeZone
+
+  @Config("endpoint")
+  @ConfigDefault("null")
+  def getEndpoint: Optional[String]
+}
diff --git a/src/main/scala/org/embulk/output/orc/TimestampColumnOption.scala b/src/main/scala/org/embulk/output/orc/TimestampColumnOption.scala
new file mode 100644
index 0000000..d2f40c2
--- /dev/null
+++ b/src/main/scala/org/embulk/output/orc/TimestampColumnOption.scala
@@ -0,0 +1,32 @@
+package org.embulk.output.orc
+
+import java.util
+
+import com.google.common.base.Optional
+import org.embulk.config.{Config, ConfigDefault, Task}
+import org.embulk.spi.time.TimestampFormatter
+import org.joda.time.DateTimeZone
+
+/*
+public interface TimestampColumnOption
+        extends Task, TimestampFormatter.TimestampColumnOption
+{
+    @Config("from_timezone")
+    @ConfigDefault("null")
+    Optional<DateTimeZone> getFromTimeZone();
+
+    @Config("from_format")
+    @ConfigDefault("null")
+    Optional<List<String>> getFromFormat();
+}
+ */
+
+trait TimestampColumnOption extends Task with TimestampFormatter.TimestampColumnOption {
+  @Config("from_timezone")
+  @ConfigDefault("null")
+  def getFromTimeZone: Optional[DateTimeZone]
+
+  @Config("from_format")
+  @ConfigDefault("null")
+  def getFromFormat: Optional[util.List[String]]
+}
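For reference, each getter declared in PluginTask maps onto a key of the plugin's `out:` section in an Embulk config. The sketch below is illustrative only: it assumes the plugin is registered under `type: orc` (the usual convention for an embulk-output-orc gem), the path and values are invented rather than taken from this change, and the commented defaults simply restate the @ConfigDefault annotations above.

out:
  type: orc
  path_prefix: /tmp/example/out     # required (getPathPrefix); hypothetical path
  file_ext: .orc                    # default ".orc"
  sequence_format: ".%03d"          # task index is formatted into the file name
  compression_kind: ZLIB            # passed to CompressionKind.valueOf
  buffer_size: 262144               # 256KB default
  strip_size: 67108864              # 64MB default
  block_size: 268435456             # 256MB default
  overwrite: true                   # default false; removes an existing local/S3 file first
  default_from_timezone: UTC

Writing to S3 additionally relies on the credentials options inherited from AwsCredentialsTask and, optionally, the `endpoint` key exposed by getEndpoint.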