Skip to content

Commit

Permalink
[Feature-#1842][ftp] Added compress and write mode when writing, fixe…
Browse files Browse the repository at this point in the history
…d the config of sftp is null

[hotfix-#1842][core] Fixed the SFTP config being null and added support for write mode

[Feature-#1842][ftp] Added compression and write mode support when writing; fixed the SFTP config being null

[Feature-#1842][ftp] Added compression and write mode support when writing; fixed the SFTP config being null
  • Loading branch information
libailin authored and zoudaokoulife committed Nov 24, 2023
1 parent f4402f0 commit 2d5f489
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.dtstack.chunjun.connector.ftp.enums;

/**
 * Compression formats that the FTP connector can refer to by name.
 *
 * <p>Callers compare a configured compress-type string against {@link #name()} of these
 * constants, ignoring case.
 *
 * <p>NOTE(review): the FTP sink code appears to handle only {@code GZIP} and {@code BZIP2} and
 * to reject any other value as unsupported; confirm whether {@code ZIP} is consumed elsewhere
 * (e.g. on the reader side).
 */
public enum CompressType {
    GZIP,
    BZIP2,
    ZIP
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ public void loginFtpServer(FtpConfig ftpConfig) {
}
ftpClient.setControlEncoding(ftpConfig.getControlEncoding());
ftpClient.setListHiddenFiles(ftpConfig.isListHiddenFiles());
if (StringUtils.isNotEmpty(ftpConfig.getCompressType())) {
// Set the file transfer type to binary
ftpClient.setFileType(FTP.BINARY_FILE_TYPE);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,23 @@
package com.dtstack.chunjun.connector.ftp.sink;

import com.dtstack.chunjun.connector.ftp.config.FtpConfig;
import com.dtstack.chunjun.connector.ftp.enums.CompressType;
import com.dtstack.chunjun.connector.ftp.handler.DTFtpHandler;
import com.dtstack.chunjun.connector.ftp.handler.FtpHandlerFactory;
import com.dtstack.chunjun.constants.ConstantValue;
import com.dtstack.chunjun.enums.SizeUnitType;
import com.dtstack.chunjun.sink.WriteMode;
import com.dtstack.chunjun.sink.format.BaseFileOutputFormat;
import com.dtstack.chunjun.throwable.ChunJunRuntimeException;
import com.dtstack.chunjun.throwable.UnsupportedTypeException;
import com.dtstack.chunjun.throwable.WriteRecordException;
import com.dtstack.chunjun.util.ExceptionUtil;

import org.apache.flink.table.data.RowData;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
import org.apache.commons.lang3.StringUtils;

import java.io.BufferedWriter;
Expand Down Expand Up @@ -61,11 +65,16 @@ public class FtpOutputFormat extends BaseFileOutputFormat {

private transient OutputStream os;

private boolean isCompress = false;

/**
 * Opens this output format for one subtask: runs the parent's open logic, creates the FTP
 * handler for the configured protocol, logs in, and records whether compression was requested.
 *
 * @param taskNumber index of this subtask, forwarded to the parent implementation
 * @param numTasks total number of subtasks, forwarded to the parent implementation
 * @throws IOException declared by the overridden method
 */
@Override
protected void openInternal(int taskNumber, int numTasks) throws IOException {
    super.openInternal(taskNumber, numTasks);

    ftpHandler = FtpHandlerFactory.createFtpHandler(ftpConfig.getProtocol());
    ftpHandler.loginFtpServer(ftpConfig);

    // A non-empty compress type switches compression on. The flag is only ever turned on
    // here; a flag that is already set is left untouched.
    isCompress |= StringUtils.isNotEmpty(ftpConfig.getCompressType());
}

@Override
Expand Down Expand Up @@ -99,15 +108,35 @@ protected void deleteTmpDataDir() {
@Override
protected void nextBlock() {
super.nextBlock();

if (writer != null) {
return;
if (isCompress) {
if (os != null) {
return;
}
} else {
if (writer != null) {
return;
}
}
String currentBlockTmpPath = tmpPath + SP + currentFileName;
try {
os = ftpHandler.getOutputStream(currentBlockTmpPath);
writer =
new BufferedWriter(new OutputStreamWriter(os, ftpConfig.getEncoding()), 131072);
if (isCompress) {
if (ftpConfig.getCompressType().equalsIgnoreCase(CompressType.GZIP.name())) {
os = new GzipCompressorOutputStream(os);
} else if (ftpConfig
.getCompressType()
.equalsIgnoreCase(CompressType.BZIP2.name())) {
os = new BZip2CompressorOutputStream(os);
} else {
throw new UnsupportedTypeException(
String.format(
"Unsupported compress type:[%s]", ftpConfig.getCompressType()));
}
} else {
writer =
new BufferedWriter(
new OutputStreamWriter(os, ftpConfig.getEncoding()), 131072);
}
log.info("subtask:[{}] create block file:{}", taskNumber, currentBlockTmpPath);
} catch (IOException e) {
throw new ChunJunRuntimeException(ExceptionUtil.getErrorMessage(e));
Expand All @@ -121,11 +150,13 @@ protected void checkCurrentFileSize() {
if (numWriteCounter.getLocalValue() < nextNumForCheckDataSize) {
return;
}
try {
// Does not manually flush cause a message error?
writer.flush();
} catch (IOException e) {
throw new ChunJunRuntimeException("flush failed when check fileSize");
if (!isCompress) {
try {
// Does not manually flush cause a message error?
writer.flush();
} catch (IOException e) {
throw new ChunJunRuntimeException("flush failed when check fileSize");
}
}
long currentFileSize = getCurrentFileSize();
if (currentFileSize > ftpConfig.getMaxFileSize()) {
Expand All @@ -142,13 +173,21 @@ protected void checkCurrentFileSize() {
@Override
public void writeSingleRecordToFile(RowData rowData) throws WriteRecordException {
try {
if (writer == null) {
nextBlock();
}

String line = (String) rowConverter.toExternal(rowData, "");
this.writer.write(line);
this.writer.write(NEWLINE);
if (isCompress) {
if (os == null) {
nextBlock();
}
byte[] bytes = line.getBytes(ftpConfig.getEncoding());
this.os.write(bytes);
this.os.write(NEWLINE);
} else {
if (writer == null) {
nextBlock();
}
this.writer.write(line);
this.writer.write(NEWLINE);
}
lastRow = rowData;
} catch (Exception ex) {
throw new WriteRecordException(ex.getMessage(), ex, 0, rowData);
Expand All @@ -159,12 +198,18 @@ public void writeSingleRecordToFile(RowData rowData) throws WriteRecordException
public void closeInternal() throws IOException {
super.closeInternal();
try {
if (writer != null) {
writer.flush();
writer.close();
writer = null;
os.close();
os = null;
if (isCompress) {
if (os != null) {
os.flush();
os.close();
os = null;
}
} else {
if (writer != null) {
writer.flush();
writer.close();
writer = null;
}
}
this.ftpHandler.logoutFtpServer();
} catch (Exception e) {
Expand All @@ -184,6 +229,7 @@ protected void closeSource() {
}

if (os != null) {
os.flush();
os.close();
os = null;
}
Expand All @@ -194,7 +240,23 @@ protected void closeSource() {

/**
 * Flushes and releases the stream of the block file currently being written.
 *
 * <p>In compress mode records are written straight to {@code os}, so that stream is flushed,
 * closed and reset to {@code null} here. In plain-text mode the work is delegated to
 * {@link #closeSource()}.
 *
 * <p>{@code closeSource()} is intentionally not called unconditionally before the mode check:
 * doing so would bypass the compress-specific handling below and would invoke it twice in
 * plain-text mode.
 *
 * @throws ChunJunRuntimeException if flushing or closing the compressed stream fails
 */
@Override
public void flushDataInternal() {
    if (!isCompress) {
        closeSource();
        return;
    }

    log.info(
            "Close current text stream, write data size:[{}]",
            SizeUnitType.readableFileSize(bytesWriteCounter.getLocalValue()));
    try {
        if (os != null) {
            os.flush();
            os.close();
            os = null;
        }
    } catch (IOException e) {
        throw new ChunJunRuntimeException(
                "error to flush stream." + ExceptionUtil.getErrorMessage(e), e);
    }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
import com.dtstack.chunjun.config.TypeConfig;
import com.dtstack.chunjun.connector.ftp.config.ConfigConstants;
import com.dtstack.chunjun.connector.ftp.config.FtpConfig;
import com.dtstack.chunjun.connector.ftp.enums.CompressType;
import com.dtstack.chunjun.connector.ftp.options.FtpOptions;
import com.dtstack.chunjun.connector.ftp.sink.FtpDynamicTableSink;
import com.dtstack.chunjun.connector.ftp.source.FtpDynamicTableSource;
import com.dtstack.chunjun.table.options.BaseFileOptions;
import com.dtstack.chunjun.throwable.UnsupportedTypeException;
import com.dtstack.chunjun.util.StringUtil;

import org.apache.flink.api.common.serialization.DeserializationSchema;
Expand All @@ -45,6 +47,8 @@
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.SerializationFormatFactory;

import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
Expand All @@ -69,6 +73,7 @@ private static FtpConfig getFtpConfByOptions(ReadableConfig config) {
ftpConfig.setCompressType(config.get(FtpOptions.COMPRESS_TYPE));
ftpConfig.setFileType(config.get(FtpOptions.FILE_TYPE));
ftpConfig.setNextCheckRows(config.get(BaseFileOptions.NEXT_CHECK_ROWS));
ftpConfig.setWriteMode(config.get(BaseFileOptions.WRITE_MODE));

if (!ConfigConstants.DEFAULT_FIELD_DELIMITER.equals(
config.get(FtpOptions.FIELD_DELIMITER))) {
Expand Down Expand Up @@ -146,6 +151,18 @@ public DynamicTableSink createDynamicTableSink(Context context) {
FtpOptions.FORMAT));

FtpConfig ftpConfig = getFtpConfByOptions(config);
String compressType = ftpConfig.getCompressType();
if (StringUtils.isNotEmpty(compressType)) {
// Append the compression extension after the file-type extension
String fileType = ftpConfig.getFileType();
if (CompressType.GZIP.name().equalsIgnoreCase(compressType)) {
ftpConfig.setFileType(fileType + ".gz");
} else if (CompressType.BZIP2.name().equalsIgnoreCase(compressType)) {
ftpConfig.setFileType(fileType + ".bz2");
} else {
throw new UnsupportedTypeException("not support compress type: " + compressType);
}
}

return new FtpDynamicTableSink(resolvedSchema, ftpConfig, valueEncodingFormat);
}
Expand Down Expand Up @@ -181,6 +198,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(FtpOptions.FIELD_DELIMITER);
options.add(FtpOptions.COMPRESS_TYPE);
options.add(BaseFileOptions.NEXT_CHECK_ROWS);
options.add(BaseFileOptions.WRITE_MODE);
return options;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.dtstack.chunjun.constants.ConstantValue;
import com.dtstack.chunjun.throwable.ChunJunRuntimeException;
import com.dtstack.chunjun.util.FileSystemUtil;
import com.dtstack.chunjun.util.GsonUtil;
import com.dtstack.chunjun.util.JsonUtil;
import com.dtstack.chunjun.util.Md5Util;

Expand Down Expand Up @@ -278,7 +279,10 @@ private static String loadFromSftp(
}
SftpHandler handler = null;
try {
handler = SftpHandler.getInstanceWithRetry(MapUtils.getMap(config, KEY_SFTP_CONF));
handler =
SftpHandler.getInstanceWithRetry(
GsonUtil.GSON.fromJson(
MapUtils.getString(config, KEY_SFTP_CONF), Map.class));
if (handler.isFileExist(filePathOnSftp)) {
handler.downloadFileWithRetry(filePathOnSftp, fileLocalPath);

Expand Down

0 comments on commit 2d5f489

Please sign in to comment.