From 682dc0d0def357ad8962a42a0610bb781c6b53f1 Mon Sep 17 00:00:00 2001 From: jeff-zou Date: Wed, 19 Jun 2024 16:43:07 +0800 Subject: [PATCH] add serial version UID for sink/source function --- pom.xml | 2 +- .../redis/table/RedisSinkFunction.java | 58 ++++++++++--------- .../redis/table/RedisSourceFunction.java | 4 +- 3 files changed, 35 insertions(+), 29 deletions(-) diff --git a/pom.xml b/pom.xml index b48f99c..2612934 100644 --- a/pom.xml +++ b/pom.xml @@ -45,7 +45,7 @@ under the License. io.github.jeff-zou flink-connector-redis - 1.4.2 + 1.4.3 jar diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisSinkFunction.java b/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisSinkFunction.java index 0893429..34274bc 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisSinkFunction.java +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisSinkFunction.java @@ -56,6 +56,8 @@ */ public class RedisSinkFunction extends RichSinkFunction { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(RedisSinkFunction.class); protected Integer ttl; @@ -70,13 +72,10 @@ public class RedisSinkFunction extends RichSinkFunction { private RedisCommand redisCommand; private FlinkConfigBase flinkConfigBase; - private RedisCommandsContainer redisCommandsContainer; + private transient RedisCommandsContainer redisCommandsContainer; private final int maxRetryTimes; private List columnDataTypes; - - private ReadableConfig readableConfig; - private RedisValueDataStructure redisValueDataStructure; private String zremrangeby; @@ -222,25 +221,32 @@ private RedisFuture sink(String[] params) { this.redisCommandsContainer.zadd( params[0], Double.parseDouble(params[1]), params[2]); if (zremrangeby != null) { - redisFuture.whenComplete((ignore, throwable) -> { - try { - if (zremrangeby.equalsIgnoreCase(ZremType.SCORE.name())) { - Range range = - Range.create(Double.parseDouble(params[3]), Double.parseDouble(params[4])); - this.redisCommandsContainer.zremRangeByScore(params[0], range); - } else if (zremrangeby.equalsIgnoreCase(ZremType.LEX.name())) { - Range range = Range.create(params[3], params[4]); - this.redisCommandsContainer.zremRangeByLex(params[0], range); - } else if (zremrangeby.equalsIgnoreCase(ZremType.RANK.name())) { - this.redisCommandsContainer.zremRangeByRank(params[0], Long.parseLong(params[3]), - Long.parseLong(params[4])); - } else { - LOG.warn("Unrecognized zrem type:{}", zremrangeby); - } - } catch (Exception e) { - LOG.error("{} zremRangeBy failed.", params[0], e); - } - }); + redisFuture.whenComplete( + (ignore, throwable) -> { + try { + if (zremrangeby.equalsIgnoreCase(ZremType.SCORE.name())) { + Range range = + Range.create( + Double.parseDouble(params[3]), + Double.parseDouble(params[4])); + this.redisCommandsContainer.zremRangeByScore( + params[0], range); + } else if (zremrangeby.equalsIgnoreCase(ZremType.LEX.name())) { + Range range = Range.create(params[3], params[4]); + this.redisCommandsContainer.zremRangeByLex( + params[0], range); + } else if (zremrangeby.equalsIgnoreCase(ZremType.RANK.name())) { + this.redisCommandsContainer.zremRangeByRank( + params[0], + Long.parseLong(params[3]), + Long.parseLong(params[4])); + } else { + LOG.warn("Unrecognized zrem type:{}", zremrangeby); + } + } catch (Exception e) { + LOG.error("{} zremRangeBy failed.", params[0], e); + } + }); } break; case ZINCRBY: @@ -283,15 +289,13 @@ private RedisFuture sink(String[] params) { hashField.put(params[i], params[++i]); } if (!this.setIfAbsent) { - redisFuture = - this.redisCommandsContainer.hmset(params[0], hashField); + redisFuture = this.redisCommandsContainer.hmset(params[0], hashField); } else { redisFuture = this.redisCommandsContainer.exists(params[0]); redisFuture.whenComplete( (exist, throwable) -> { if (!(Boolean) exist) { - this.redisCommandsContainer.hmset( - params[0], hashField); + this.redisCommandsContainer.hmset(params[0], hashField); } }); } diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisSourceFunction.java b/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisSourceFunction.java index c1e1ec0..8c5f7bc 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisSourceFunction.java +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisSourceFunction.java @@ -42,12 +42,14 @@ public class RedisSourceFunction extends RichSourceFunction { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(RedisSourceFunction.class); ReadableConfig readableConfig; private FlinkConfigBase flinkConfigBase; - private RedisCommandsContainer redisCommandsContainer; + private transient RedisCommandsContainer redisCommandsContainer; private final int maxRetryTimes;