Merge branch 'release-1.0'
yang69 committed Nov 22, 2018
2 parents ef92f7f + d58611e commit b92747b
Showing 8 changed files with 114 additions and 1 deletion.
19 changes: 19 additions & 0 deletions README.md
@@ -0,0 +1,19 @@
# Flink Connector Redis

Forked from `org.apache.bahir:flink-connector-redis_2.11:1.0`.

Tested on JDK 8 and Flink 1.4.2.

```
<dependency>
    <groupId>com.github.yang69</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
```

# Change Log

1.0: Added support for SETEX. As a result, you need to implement `getSecondsFromData()`, which returns the expiration time for the key, in seconds.
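For illustration, a mapper satisfying the new contract could look like the sketch below. The class name is hypothetical, the imports assume the fork keeps the upstream Bahir 1.0 package layout, and the field choices mirror the Javadoc example shipped in this commit (`f0` as key, `f1` as value, a fixed 120-second TTL).

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

// Hypothetical mapper: writes each Tuple2 as SETEX <f0> 120 <f1>.
public class ExampleSetexMapper implements RedisMapper<Tuple2<String, String>> {

    @Override
    public RedisCommandDescription getCommandDescription() {
        // SETEX stores the value and sets the expiration in a single command.
        return new RedisCommandDescription(RedisCommand.SETEX);
    }

    @Override
    public String getKeyFromData(Tuple2<String, String> data) {
        return data.f0;
    }

    @Override
    public int getSecondsFromData(Tuple2<String, String> data) {
        return 120; // the key expires two minutes after it is written
    }

    @Override
    public String getValueFromData(Tuple2<String, String> data) {
        return data.f1;
    }
}
```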
30 changes: 29 additions & 1 deletion pom.xml
@@ -4,9 +4,37 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.sonatype.oss</groupId>
        <artifactId>oss-parent</artifactId>
        <version>7</version>
    </parent>

    <groupId>com.github.yang69</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>0.1-SNAPSHOT</version>  <!-- removed in this commit -->
    <version>1.0</version>           <!-- added in this commit -->

    <licenses>
        <license>
            <name>Apache 2.0 License</name>
            <url>http://www.apache.org/licenses/LICENSE-2.0.html</url>
            <distribution>repo</distribution>
        </license>
    </licenses>

    <scm>
        <url>https://github.com/yang69/flink-connector-redis_2.11</url>
        <connection>git@github.com:yang69/flink-connector-redis_2.11.git</connection>
        <developerConnection>https://github.com/yang69</developerConnection>
    </scm>

    <developers>
        <developer>
            <name>yang69</name>
            <email>seuYangYang@gmail.com</email>
            <url>https://github.com/yang69</url>
        </developer>
    </developers>


    <properties>
@@ -64,6 +64,9 @@
 * public String getKeyFromData(Tuple2<String, String> data) {
 *     return data.f0;
 * }
 * public int getSecondsFromData(Tuple2<String, String> data) {
 *     return 120;
 * }
 * public String getValueFromData(Tuple2<String, String> data) {
 *     return data.f1;
 * }
@@ -127,6 +130,7 @@ public RedisSink(FlinkJedisConfigBase flinkJedisConfigBase, RedisMapper<IN> redi
    @Override
    public void invoke(IN input) throws Exception {
        String key = redisSinkMapper.getKeyFromData(input);
        int seconds = redisSinkMapper.getSecondsFromData(input);
        String value = redisSinkMapper.getValueFromData(input);

        switch (redisCommand) {
@@ -142,6 +146,9 @@ public void invoke(IN input) throws Exception {
            case SET:
                this.redisCommandsContainer.set(key, value);
                break;
            case SETEX:
                this.redisCommandsContainer.setex(key, seconds, value);
                break;
            case PFADD:
                this.redisCommandsContainer.pfadd(key, value);
                break;
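With the new `SETEX` branch in `invoke()`, each record is written together with its per-record TTL. A minimal sketch of wiring the sink into a job is shown below; it reuses the hypothetical `ExampleSetexMapper` from the README note above and assumes the upstream Bahir 1.0 `FlinkJedisPoolConfig`/`RedisSink` API is otherwise unchanged in this fork (host and port are placeholders).

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;

public class SetexSinkJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A single hard-coded element keeps the sketch self-contained;
        // a real job would read from Kafka, a socket, etc.
        DataStream<Tuple2<String, String>> stream =
                env.fromElements(Tuple2.of("session:42", "some-value"));

        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                .setHost("127.0.0.1")  // placeholder Redis host
                .setPort(6379)
                .build();

        // Each record becomes: SETEX <key> <seconds> <value>
        stream.addSink(new RedisSink<>(conf, new ExampleSetexMapper()));

        env.execute("flink-redis-setex-example");
    }
}
```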
@@ -134,6 +134,19 @@ public void set(final String key, final String value) {
        }
    }

    @Override
    public void setex(final String key, final int seconds, final String value) {
        try {
            jedisCluster.setex(key, seconds, value);
        } catch (Exception e) {
            if (LOG.isErrorEnabled()) {
                LOG.error("Cannot send Redis message with command SETEX to key {} with expire {} error message {}",
                    key, seconds, e.getMessage());
            }
            throw e;
        }
    }

    @Override
    public void pfadd(final String key, final String element) {
        try {
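The `jedisCluster.setex(...)` path above is taken when the sink is configured against a Redis Cluster. A sketch of that configuration, assuming the upstream Bahir 1.0 `FlinkJedisClusterConfig` builder and using placeholder node addresses (the `ExampleSetexMapper` is the hypothetical mapper from the README note):

```java
import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;

public class ClusterSinkFactory {

    // Builds a sink whose writes go through the cluster-backed container.
    public static RedisSink<Tuple2<String, String>> buildClusterSink() {
        Set<InetSocketAddress> nodes = new HashSet<>();
        nodes.add(new InetSocketAddress("10.0.0.1", 7000));  // placeholder nodes
        nodes.add(new InetSocketAddress("10.0.0.2", 7000));

        FlinkJedisClusterConfig clusterConfig = new FlinkJedisClusterConfig.Builder()
                .setNodes(nodes)
                .build();

        return new RedisSink<>(clusterConfig, new ExampleSetexMapper());
    }
}
```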
@@ -88,6 +88,17 @@ public interface RedisCommandsContainer extends Serializable {
     */
    void set(String key, String value);

    /**
     * Set key to hold the string value. If key already holds a value, it is overwritten,
     * regardless of its type. Any previous time to live associated with the key is
     * discarded on a successful SETEX operation.
     *
     * @param key the key name in which value to be set
     * @param seconds the key expire time in seconds
     * @param value the value
     */
    void setex(String key, int seconds, String value);

    /**
     * Adds all the element arguments to the HyperLogLog data structure
     * stored at the variable name specified as first argument.
@@ -187,6 +187,23 @@ public void set(final String key, final String value) {
        }
    }

    @Override
    public void setex(final String key, final int seconds, final String value) {
        Jedis jedis = null;
        try {
            jedis = getInstance();
            jedis.setex(key, seconds, value);
        } catch (Exception e) {
            if (LOG.isErrorEnabled()) {
                LOG.error("Cannot send Redis message with command SETEX to key {} with expire {} error message {}",
                    key, seconds, e.getMessage());
            }
            throw e;
        } finally {
            releaseInstance(jedis);
        }
    }

    @Override
    public void pfadd(final String key, final String element) {
        Jedis jedis = null;
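For a quick manual check that the expiration actually lands in Redis, plain Jedis can read back the TTL of a key the job has just written. This snippet is not part of the connector; host, port, and key are placeholders.

```java
import redis.clients.jedis.Jedis;

public class TtlCheck {

    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("127.0.0.1", 6379)) {
            Long ttl = jedis.ttl("session:42");
            // -2: key does not exist, -1: key exists without expiration,
            // otherwise: remaining lifetime in seconds.
            System.out.println("TTL of session:42 = " + ttl);
        }
    }
}
```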
@@ -45,6 +45,13 @@ public enum RedisCommand {
     */
    SET(RedisDataType.STRING),

    /**
     * Set key with expire time to hold the string value. If key already holds a value,
     * it is overwritten, regardless of its type.
     */
    SETEX(RedisDataType.STRING),

    /**
     * Adds the element to the HyperLogLog data structure stored at the variable name specified as first argument.
     */
@@ -31,6 +31,9 @@
 * public String getKeyFromData(Tuple2<String, String> data) {
 *     return data.f0;
 * }
 * public int getSecondsFromData(Tuple2<String, String> data) {
 *     return 120;
 * }
 * public String getValueFromData(Tuple2<String, String> data) {
 *     return data.f1;
 * }
@@ -56,6 +59,14 @@ public interface RedisMapper<T> extends Function, Serializable {
     */
    String getKeyFromData(T data);

    /**
     * Extracts the expiration time, in seconds, from the data.
     *
     * @param data source data
     * @return the expire time in seconds
     */
    int getSecondsFromData(T data);

    /**
     * Extracts value from data.
     *
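Because `getSecondsFromData()` is added to the interface without a default, every existing `RedisMapper` implementation has to be updated when moving to this fork, even if it never uses `SETEX`: `invoke()` calls the method for each record but ignores the result for other commands. A hedged sketch of such a mapper (hypothetical class name, upstream Bahir 1.0 package layout assumed):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

// Hypothetical mapper that keeps using plain SET after the upgrade.
public class PlainSetMapper implements RedisMapper<Tuple2<String, String>> {

    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.SET);
    }

    @Override
    public String getKeyFromData(Tuple2<String, String> data) {
        return data.f0;
    }

    @Override
    public int getSecondsFromData(Tuple2<String, String> data) {
        return 0; // required by the interface, ignored for SET
    }

    @Override
    public String getValueFromData(Tuple2<String, String> data) {
        return data.f1;
    }
}
```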
