From 942ab185e0500c211af1a0afad9f82bfdca18ed4 Mon Sep 17 00:00:00 2001 From: jeff-zou Date: Tue, 5 Nov 2024 10:11:30 +0800 Subject: [PATCH] close connection with join --- README-en.md | 6 +++--- README.md | 6 +++--- .../container/RedisClusterContainer.java | 15 ++++---------- .../redis/container/RedisContainer.java | 20 ++++++------------- 4 files changed, 16 insertions(+), 31 deletions(-) diff --git a/README-en.md b/README-en.md index 5a4723a..257ff41 100644 --- a/README-en.md +++ b/README-en.md @@ -45,8 +45,8 @@ After executing mvn package on the command line, import the generated package fl
- The project depends on Lettuce(6.2.1) and netty-transport-native-epoll(4.1.82.Final),flink-connection-redis-1.4.2.jar if these packages are available. -Otherwise, use flink-connector-redis-1.4.2-jar-with-dependencies.jar. + The project depends on Lettuce(6.2.1) and netty-transport-native-epoll(4.1.82.Final),flink-connection-redis-1.4.3.jar if these packages are available. +Otherwise, use flink-connector-redis-1.4.3-jar-with-dependencies.jar.
Development environment engineering direct reference: @@ -55,7 +55,7 @@ Development environment engineering direct reference: io.github.jeff-zou flink-connector-redis - 1.4.2 + 1.4.3 diff --git a/README.md b/README.md index a908d3c..ed30a9e 100644 --- a/README.md +++ b/README.md @@ -22,8 +22,8 @@ # 2 使用方法: ## 2.1 工程直接引用 -项目依赖Lettuce(6.2.1)及netty-transport-native-epoll(4.1.82.Final),如flink环境有这两个包,则使用flink-connector-redis-1.4.2.jar, -否则使用flink-connector-redis-1.4.2-jar-with-dependencies.jar。 +项目依赖Lettuce(6.2.1)及netty-transport-native-epoll(4.1.82.Final),如flink环境有这两个包,则使用flink-connector-redis-1.4.3.jar, +否则使用flink-connector-redis-1.4.3-jar-with-dependencies.jar。
``` @@ -31,7 +31,7 @@ flink-connector-redis - 1.4.2 + 1.4.3 ``` ## 2.2 自行打包 diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/container/RedisClusterContainer.java b/src/main/java/org/apache/flink/streaming/connectors/redis/container/RedisClusterContainer.java index b52d92e..d9810a2 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/redis/container/RedisClusterContainer.java +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/container/RedisClusterContainer.java @@ -18,9 +18,6 @@ package org.apache.flink.streaming.connectors.redis.container; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import io.lettuce.core.Range; import io.lettuce.core.RedisFuture; import io.lettuce.core.cluster.RedisClusterClient; @@ -28,12 +25,14 @@ import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands; import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.Closeable; import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.CompletableFuture; /** Redis command container if we want to connect to a Redis cluster. */ public class RedisClusterContainer implements RedisCommandsContainer, Closeable { @@ -67,13 +66,7 @@ public void open() { /** Closes the {@link RedisClusterClient}. */ @Override public void close() { - try { - CompletableFuture completableFuture = this.connection.closeAsync(); - completableFuture.get(); - LOG.info("close async connection success!"); - } catch (Exception e) { - LOG.error("close async connection error!", e); - } + this.connection.close(); this.redisClusterClient.shutdown(); } diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/container/RedisContainer.java b/src/main/java/org/apache/flink/streaming/connectors/redis/container/RedisContainer.java index 39ed697..2ea07a9 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/redis/container/RedisContainer.java +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/container/RedisContainer.java @@ -18,9 +18,6 @@ package org.apache.flink.streaming.connectors.redis.container; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import io.lettuce.core.Range; import io.lettuce.core.RedisClient; import io.lettuce.core.RedisFuture; @@ -28,10 +25,12 @@ import io.lettuce.core.api.async.RedisAsyncCommands; import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.Closeable; import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; /** * Redis command container if we want to connect to a single Redis server or to Redis sentinels If @@ -43,8 +42,7 @@ public class RedisContainer implements RedisCommandsContainer, Closeable { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(RedisContainer.class); - - private transient RedisClient redisClient; + private final transient RedisClient redisClient; protected transient StatefulRedisConnection connection; protected transient RedisAsyncCommands asyncCommands; @@ -60,14 +58,8 @@ public RedisContainer(RedisClient redisClient) { /** Closes the redisClient instances. */ @Override public void close() { - try { - CompletableFuture completableFuture = connection.closeAsync(); - completableFuture.get(); - LOG.info("close async connection success!"); - } catch (Exception e) { - LOG.info("close async connection error!", e); - } - redisClient.shutdown(); + this.connection.close(); + this.redisClient.shutdown(); } @Override