Skip to content

Commit

Permalink
add serial version UID for sink/source function
Browse files Browse the repository at this point in the history
  • Loading branch information
jeff-zou committed Jun 19, 2024
1 parent 7a55e99 commit 682dc0d
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 29 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ under the License.

<groupId>io.github.jeff-zou</groupId>
<artifactId>flink-connector-redis</artifactId>
<version>1.4.2</version>
<version>1.4.3</version>
<packaging>jar</packaging>

<distributionManagement>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
*/
public class RedisSinkFunction<IN> extends RichSinkFunction<IN> {

private static final long serialVersionUID = 1L;

private static final Logger LOG = LoggerFactory.getLogger(RedisSinkFunction.class);

protected Integer ttl;
Expand All @@ -70,13 +72,10 @@ public class RedisSinkFunction<IN> extends RichSinkFunction<IN> {
private RedisCommand redisCommand;

private FlinkConfigBase flinkConfigBase;
private RedisCommandsContainer redisCommandsContainer;
private transient RedisCommandsContainer redisCommandsContainer;

private final int maxRetryTimes;
private List<DataType> columnDataTypes;

private ReadableConfig readableConfig;

private RedisValueDataStructure redisValueDataStructure;

private String zremrangeby;
Expand Down Expand Up @@ -222,25 +221,32 @@ private RedisFuture sink(String[] params) {
this.redisCommandsContainer.zadd(
params[0], Double.parseDouble(params[1]), params[2]);
if (zremrangeby != null) {
redisFuture.whenComplete((ignore, throwable) -> {
try {
if (zremrangeby.equalsIgnoreCase(ZremType.SCORE.name())) {
Range<Double> range =
Range.create(Double.parseDouble(params[3]), Double.parseDouble(params[4]));
this.redisCommandsContainer.zremRangeByScore(params[0], range);
} else if (zremrangeby.equalsIgnoreCase(ZremType.LEX.name())) {
Range<String> range = Range.create(params[3], params[4]);
this.redisCommandsContainer.zremRangeByLex(params[0], range);
} else if (zremrangeby.equalsIgnoreCase(ZremType.RANK.name())) {
this.redisCommandsContainer.zremRangeByRank(params[0], Long.parseLong(params[3]),
Long.parseLong(params[4]));
} else {
LOG.warn("Unrecognized zrem type:{}", zremrangeby);
}
} catch (Exception e) {
LOG.error("{} zremRangeBy failed.", params[0], e);
}
});
redisFuture.whenComplete(
(ignore, throwable) -> {
try {
if (zremrangeby.equalsIgnoreCase(ZremType.SCORE.name())) {
Range<Double> range =
Range.create(
Double.parseDouble(params[3]),
Double.parseDouble(params[4]));
this.redisCommandsContainer.zremRangeByScore(
params[0], range);
} else if (zremrangeby.equalsIgnoreCase(ZremType.LEX.name())) {
Range<String> range = Range.create(params[3], params[4]);
this.redisCommandsContainer.zremRangeByLex(
params[0], range);
} else if (zremrangeby.equalsIgnoreCase(ZremType.RANK.name())) {
this.redisCommandsContainer.zremRangeByRank(
params[0],
Long.parseLong(params[3]),
Long.parseLong(params[4]));
} else {
LOG.warn("Unrecognized zrem type:{}", zremrangeby);
}
} catch (Exception e) {
LOG.error("{} zremRangeBy failed.", params[0], e);
}
});
}
break;
case ZINCRBY:
Expand Down Expand Up @@ -283,15 +289,13 @@ private RedisFuture sink(String[] params) {
hashField.put(params[i], params[++i]);
}
if (!this.setIfAbsent) {
redisFuture =
this.redisCommandsContainer.hmset(params[0], hashField);
redisFuture = this.redisCommandsContainer.hmset(params[0], hashField);
} else {
redisFuture = this.redisCommandsContainer.exists(params[0]);
redisFuture.whenComplete(
(exist, throwable) -> {
if (!(Boolean) exist) {
this.redisCommandsContainer.hmset(
params[0], hashField);
this.redisCommandsContainer.hmset(params[0], hashField);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,14 @@

public class RedisSourceFunction<T> extends RichSourceFunction<T> {

private static final long serialVersionUID = 1L;

private static final Logger LOG = LoggerFactory.getLogger(RedisSourceFunction.class);

ReadableConfig readableConfig;

private FlinkConfigBase flinkConfigBase;
private RedisCommandsContainer redisCommandsContainer;
private transient RedisCommandsContainer redisCommandsContainer;

private final int maxRetryTimes;

Expand Down

0 comments on commit 682dc0d

Please sign in to comment.