
Commit


[Feature-#1918][s3] Add support for reading all types of documents supported by Apache Tika, read excel format
libailin authored and lihongwei committed Sep 20, 2024
1 parent 33c14a5 commit a45eabb
Showing 42 changed files with 6,270 additions and 21 deletions.
12 changes: 12 additions & 0 deletions chunjun-connectors/chunjun-connector-s3/pom.xml
@@ -68,6 +68,18 @@
<version>1.11-8</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.dtstack.chunjun</groupId>
<artifactId>chunjun-format-tika</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.dtstack.chunjun</groupId>
<artifactId>chunjun-format-excel</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
S3Config.java
@@ -19,6 +19,8 @@
package com.dtstack.chunjun.connector.s3.config;

import com.dtstack.chunjun.config.CommonConfig;
import com.dtstack.chunjun.format.excel.config.ExcelFormatConfig;
import com.dtstack.chunjun.format.tika.config.TikaReadConfig;

import com.amazonaws.regions.Regions;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@@ -86,4 +88,23 @@ public class S3Config extends CommonConfig implements Serializable {

/** Suffix appended to generated file names */
private String suffix;

/** Regular expression used to match object keys */
private String objectsRegex;

/** Whether to wrap fields in a text qualifier */
private boolean useTextQualifier = true;

/** Whether to write each record as its own file */
private boolean enableWriteSingleRecordAsFile = false;

/** Whether to keep the original file name */
private boolean keepOriginalFilename = false;

/** Whether to disable injecting the bucket name into the endpoint prefix */
private boolean disableBucketNameInEndpoint = false;

private TikaReadConfig tikaReadConfig = new TikaReadConfig();

private ExcelFormatConfig excelFormatConfig = new ExcelFormatConfig();
}
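
For illustration (not part of this commit): the new switches side by side, assuming Lombok-generated setters on S3Config (ChunJun config classes typically use Lombok).

import com.dtstack.chunjun.connector.s3.config.S3Config;
import com.dtstack.chunjun.format.excel.config.ExcelFormatConfig;
import com.dtstack.chunjun.format.tika.config.TikaReadConfig;

public class S3ConfigSketch {
    public static void main(String[] args) {
        S3Config s3Config = new S3Config();
        s3Config.setObjectsRegex(".*\\.xlsx$");          // read only keys matching this pattern
        s3Config.setUseTextQualifier(false);             // drop text qualifiers when writing CSV
        s3Config.setEnableWriteSingleRecordAsFile(true); // one output object per record
        s3Config.setKeepOriginalFilename(true);          // name output after the source document
        s3Config.setDisableBucketNameInEndpoint(true);   // keep the bucket name out of the endpoint
        s3Config.setTikaReadConfig(new TikaReadConfig());       // Tika extraction options
        s3Config.setExcelFormatConfig(new ExcelFormatConfig()); // Excel parsing options
    }
}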
S3DynamicTableSink.java
@@ -84,6 +84,6 @@ public DynamicTableSink copy() {

@Override
public String asSummaryString() {
return "StreamDynamicTableSink";
return S3DynamicTableSink.class.getName();
}
}
S3OutputFormat.java
@@ -27,22 +27,28 @@
import com.dtstack.chunjun.sink.format.BaseRichOutputFormat;
import com.dtstack.chunjun.throwable.ChunJunRuntimeException;
import com.dtstack.chunjun.throwable.WriteRecordException;
import com.dtstack.chunjun.util.GsonUtil;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.data.RowData;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.PartETag;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;

import java.io.StringWriter;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;

import static com.dtstack.chunjun.format.tika.config.TikaReadConfig.ORIGINAL_FILENAME;

/** The OutputFormat implementation which writes data to Amazon S3. */
@Slf4j
public class S3OutputFormat extends BaseRichOutputFormat {
@@ -137,7 +143,8 @@ private void checkOutputDir() {
amazonS3,
s3Config.getBucket(),
s3Config.getObject(),
- s3Config.getFetchSize());
+ s3Config.getFetchSize(),
+ s3Config.getObjectsRegex());
} else {
subObjects =
S3Util.listObjectsByv1(
@@ -166,11 +173,17 @@ private void nextBlock() {
sw = new StringWriter();
}
this.writerUtil = new WriterUtil(sw, s3Config.getFieldDelimiter());
if (!s3Config.isUseTextQualifier()) {
writerUtil.setUseTextQualifier(false);
}
this.currentPartNumber = this.currentPartNumber + 1;
}

/** Create file multipart upload ID */
private void createActionFinishedTag() {
if (s3Config.isEnableWriteSingleRecordAsFile()) {
return;
}
if (!StringUtils.isNotBlank(currentUploadId)) {
this.currentUploadId =
S3Util.initiateMultipartUploadAndGetId(
@@ -193,26 +206,35 @@ private void beforeWriteRecords() {
}

protected void flushDataInternal() {
if (sw == null) {
return;
}
StringBuffer sb = sw.getBuffer();
- if (sb.length() > MIN_SIZE || willClose) {
+ if (sb.length() > MIN_SIZE || willClose || s3Config.isEnableWriteSingleRecordAsFile()) {
byte[] byteArray;
try {
byteArray = sb.toString().getBytes(s3Config.getEncoding());
} catch (UnsupportedEncodingException e) {
throw new ChunJunRuntimeException(e);
}
log.info("Upload part size: {}", byteArray.length);
- PartETag partETag =
-         S3Util.uploadPart(
-                 amazonS3,
-                 s3Config.getBucket(),
-                 s3Config.getObject(),
-                 this.currentUploadId,
-                 this.currentPartNumber,
-                 byteArray);
-
- MyPartETag myPartETag = new MyPartETag(partETag);
- myPartETags.add(myPartETag);
+ if (s3Config.isEnableWriteSingleRecordAsFile()) {
+     S3Util.putStringObject(
+             amazonS3, s3Config.getBucket(), s3Config.getObject(), sb.toString());
+ } else {
+     PartETag partETag =
+             S3Util.uploadPart(
+                     amazonS3,
+                     s3Config.getBucket(),
+                     s3Config.getObject(),
+                     this.currentUploadId,
+                     this.currentPartNumber,
+                     byteArray);
+
+     MyPartETag myPartETag = new MyPartETag(partETag);
+     myPartETags.add(myPartETag);
+ }

log.debug(
"task-{} upload etag:[{}]",
@@ -225,6 +247,9 @@ protected void flushDataInternal() {
}

private void completeMultipartUploadFile() {
if (s3Config.isEnableWriteSingleRecordAsFile()) {
return;
}
if (this.currentPartNumber > 10000) {
throw new IllegalArgumentException("part number can not be bigger than 10000");
}
@@ -282,7 +307,11 @@ protected void writeSingleRecordInternal(RowData rowData) throws WriteRecordException
// convert row to string
stringRecord = (String[]) rowConverter.toExternal(rowData, stringRecord);
try {
- for (int i = 0; i < columnNameList.size(); ++i) {
+ int columnSize = columnNameList.size();
+ if (s3Config.isEnableWriteSingleRecordAsFile()) {
+     columnSize = 1;
+ }
+ for (int i = 0; i < columnSize; ++i) {

String column = stringRecord[i];

@@ -292,6 +321,25 @@ protected void writeSingleRecordInternal(RowData rowData) throws WriteRecordException
writerUtil.write(column);
}
writerUtil.endRecord();

if (s3Config.isEnableWriteSingleRecordAsFile()) {
Map<String, String> metadataMap =
GsonUtil.GSON.fromJson(stringRecord[1], Map.class);
String key = FilenameUtils.getPath(s3Config.getObject());
// whether to keep the original file name
if (s3Config.isKeepOriginalFilename()) {
key += metadataMap.get(ORIGINAL_FILENAME) + getExtension();
} else {
key +=
jobId
+ "_"
+ taskNumber
+ "_"
+ UUID.randomUUID().toString()
+ getExtension();
}
s3Config.setObject(key);
}
flushDataInternal();
} catch (Exception ex) {
String msg = "RowData2string error RowData(" + rowData + ")";
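
For illustration (not part of this commit): how writeSingleRecordInternal derives the destination key when enableWriteSingleRecordAsFile is on. getExtension() is the writer's suffix helper used above; the metadata map is parsed from the record's second column, which the Tika reader fills with a JSON string including ORIGINAL_FILENAME. The example file names below are hypothetical.

// Illustrative only; mirrors the naming logic in writeSingleRecordInternal.
static String deriveKey(String configuredObject, boolean keepOriginalFilename,
        String originalFilename, String jobId, int taskNumber, String extension) {
    // FilenameUtils.getPath keeps the directory part:
    // FilenameUtils.getPath("warehouse/docs/part-0000") -> "warehouse/docs/"
    String dir = org.apache.commons.io.FilenameUtils.getPath(configuredObject);
    if (keepOriginalFilename) {
        return dir + originalFilename + extension; // e.g. "warehouse/docs/report.docx.csv"
    }
    return dir + jobId + "_" + taskNumber + "_" + java.util.UUID.randomUUID() + extension;
}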
@@ -66,7 +66,13 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
field.setName(column.getName());
field.setType(
TypeConfig.fromString(column.getDataType().getLogicalType().asSummaryString()));
- field.setIndex(i);
+ int index =
+         s3Config.getExcelFormatConfig().getColumnIndex() != null
+                 ? s3Config.getExcelFormatConfig()
+                         .getColumnIndex()
+                         .get(columns.indexOf(column))
+                 : columns.indexOf(column);
+ field.setIndex(index);
columnList.add(field);
}
s3Config.setColumn(columnList);
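
For illustration (not part of this commit): the index resolution above as a standalone rule. With DDL columns [name, age] and columnIndex = [3, 0], "name" is read from sheet column 3 and "age" from column 0; without columnIndex the declared position is used.

// Minimal sketch of the mapping rule, with hypothetical inputs:
static int resolveIndex(java.util.List<Integer> columnIndex, int declaredPosition) {
    // columnIndex = [3, 0], declaredPosition = 0 -> 3 (read "name" from sheet column 3)
    // columnIndex = null,   declaredPosition = 1 -> 1 (fall back to declared order)
    return columnIndex != null ? columnIndex.get(declaredPosition) : declaredPosition;
}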
S3InputFormat.java
@@ -18,12 +18,17 @@

package com.dtstack.chunjun.connector.s3.source;

import com.dtstack.chunjun.config.FieldConfig;
import com.dtstack.chunjun.config.RestoreConfig;
import com.dtstack.chunjun.connector.s3.config.S3Config;
import com.dtstack.chunjun.connector.s3.enums.CompressType;
import com.dtstack.chunjun.connector.s3.util.ReaderUtil;
import com.dtstack.chunjun.connector.s3.util.S3SimpleObject;
import com.dtstack.chunjun.connector.s3.util.S3Util;
import com.dtstack.chunjun.format.excel.common.ExcelData;
import com.dtstack.chunjun.format.excel.source.ExcelInputFormat;
import com.dtstack.chunjun.format.tika.common.TikaData;
import com.dtstack.chunjun.format.tika.source.TikaInputFormat;
import com.dtstack.chunjun.restore.FormatState;
import com.dtstack.chunjun.source.format.BaseRichInputFormat;
import com.dtstack.chunjun.throwable.ChunJunRuntimeException;
@@ -38,6 +43,8 @@
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
@@ -71,6 +78,12 @@ public class S3InputFormat extends BaseRichInputFormat {

private RestoreConfig restoreConf;

private transient TikaData tikaData;
private TikaInputFormat tikaInputFormat;

private transient ExcelData excelData;
private ExcelInputFormat excelInputFormat;

@Override
public void openInputFormat() throws IOException {
super.openInputFormat();
@@ -137,7 +150,31 @@ protected InputSplit[] createInputSplitsInternal(int minNumSplits) {
protected RowData nextRecordInternal(RowData rowData) throws ReadRecordException {
String[] fields;
try {
- fields = readerUtil.getValues();
+ if (s3Config.getTikaReadConfig().isUseExtract() && tikaData != null) {
+     fields = tikaData.getData();
+ } else if (s3Config.getExcelFormatConfig().isUseExcelFormat() && excelData != null) {
+     fields = excelData.getData();
+ } else {
+     fields = readerUtil.getValues();
+ }
// handle fields configured with an explicit column index
if (s3Config.getExcelFormatConfig().getColumnIndex() != null) {
List<FieldConfig> columns = s3Config.getColumn();
String[] fieldsData = new String[columns.size()];
for (int i = 0; i < CollectionUtils.size(columns); i++) {
FieldConfig fieldConfig = columns.get(i);
if (fieldConfig.getIndex() >= fields.length) {
String errorMessage =
String.format(
"The column index is greater than the data size."
+ " The current column index is [%s], but the data size is [%s]. Data loss may occur.",
fieldConfig.getIndex(), fields.length);
throw new IllegalArgumentException(errorMessage);
}
fieldsData[i] = fields[fieldConfig.getIndex()];
}
fields = fieldsData;
}
rowData = rowConverter.toInternal(fields);
} catch (IOException e) {
throw new ChunJunRuntimeException(e);
@@ -164,9 +201,82 @@ protected void closeInternal() {

@Override
public boolean reachedEnd() throws IOException {
if (s3Config.getTikaReadConfig().isUseExtract()) {
tikaData = getTikaData();
return tikaData == null || tikaData.getData() == null;
} else if (s3Config.getExcelFormatConfig().isUseExcelFormat()) {
excelData = getExcelData();
return excelData == null || excelData.getData() == null;
}
return reachedEndWithoutCheckState();
}
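
For illustration (not part of this commit): Flink calls reachedEnd() before each record, so the reader uses it as the fetch step, caching the next Tika/Excel record there and consuming it in nextRecordInternal(). A generic sketch of the pattern, with hypothetical fetchNext()/convert() helpers:

private transient String[] cached; // record prefetched by reachedEnd()

@Override
public boolean reachedEnd() throws IOException {
    cached = fetchNext(); // e.g. getTikaData()/getExcelData(); null when all objects are drained
    return cached == null;
}

@Override
protected RowData nextRecordInternal(RowData rowData) throws ReadRecordException {
    // Safe: the runtime only calls this after reachedEnd() returned false.
    return convert(cached);
}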

public ExcelData getExcelData() {
if (excelInputFormat == null) {
nextExcelDataStream();
}
if (excelInputFormat != null) {
if (!excelInputFormat.hasNext()) {
excelInputFormat.close();
excelInputFormat = null;
return getExcelData();
}
String[] record = excelInputFormat.nextRecord();
return new ExcelData(record);
} else {
return null;
}
}

private void nextExcelDataStream() {
if (splits.hasNext()) {
currentObject = splits.next();
GetObjectRequest rangeObjectRequest =
new GetObjectRequest(s3Config.getBucket(), currentObject);
log.info("Current read file {}", currentObject);
S3Object o = amazonS3.getObject(rangeObjectRequest);
S3ObjectInputStream s3is = o.getObjectContent();
excelInputFormat = new ExcelInputFormat();
excelInputFormat.open(s3is, s3Config.getExcelFormatConfig());
} else {
excelInputFormat = null;
}
}

public TikaData getTikaData() {
if (tikaInputFormat == null) {
nextTikaDataStream();
}
if (tikaInputFormat != null) {
if (!tikaInputFormat.hasNext()) {
tikaInputFormat.close();
tikaInputFormat = null;
return getTikaData();
}
String[] record = tikaInputFormat.nextRecord();
return new TikaData(record);
} else {
return null;
}
}

private void nextTikaDataStream() {
if (splits.hasNext()) {
currentObject = splits.next();
GetObjectRequest rangeObjectRequest =
new GetObjectRequest(s3Config.getBucket(), currentObject);
log.info("Current read file {}", currentObject);
S3Object o = amazonS3.getObject(rangeObjectRequest);
S3ObjectInputStream s3is = o.getObjectContent();
tikaInputFormat =
new TikaInputFormat(
s3Config.getTikaReadConfig(), s3Config.getFieldNameList().size());
tikaInputFormat.open(s3is, FilenameUtils.getName(currentObject));
} else {
tikaInputFormat = null;
}
}
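
A design note on the two helpers above: getTikaData()/getExcelData() recurse once per drained object, so a long run of empty objects deepens the call stack. An equivalent iterative form (a sketch, not part of this commit; same behavior intended):

public TikaData getTikaData() {
    while (true) {
        if (tikaInputFormat == null) {
            nextTikaDataStream(); // opens the next S3 object, or leaves the format null
            if (tikaInputFormat == null) {
                return null; // listing exhausted
            }
        }
        if (tikaInputFormat.hasNext()) {
            return new TikaData(tikaInputFormat.nextRecord());
        }
        tikaInputFormat.close(); // current object drained; advance to the next one
        tikaInputFormat = null;
    }
}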

public boolean reachedEndWithoutCheckState() throws IOException {
// readerUtil is null, indicating that a new file needs to be read
if (readerUtil == null) {
@@ -259,7 +369,11 @@ public List<S3SimpleObject> resolveObjects() {
if (s3Config.isUseV2()) {
subObjects =
S3Util.listObjectsKeyByPrefix(
- amazonS3, bucket, prefix, s3Config.getFetchSize());
+ amazonS3,
+ bucket,
+ prefix,
+ s3Config.getFetchSize(),
+ s3Config.getObjectsRegex());
} else {
subObjects =
S3Util.listObjectsByv1(
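
Both listing calls now pass s3Config.getObjectsRegex(), but the matching S3Util change is outside this excerpt. A plausible shape for the filter (an assumption, not the committed code):

// Hypothetical filtering inside S3Util.listObjectsKeyByPrefix; `listing` is the
// AWS SDK v1 ObjectListing returned for the prefix.
java.util.regex.Pattern pattern =
        StringUtils.isNotBlank(objectsRegex)
                ? java.util.regex.Pattern.compile(objectsRegex)
                : null;
List<String> keys = new ArrayList<>();
for (S3ObjectSummary summary : listing.getObjectSummaries()) {
    String key = summary.getKey();
    if (pattern == null || pattern.matcher(key).matches()) {
        keys.add(key); // keep only keys matching objectsRegex
    }
}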