Skip to content

Commit

Permalink
Merge pull request #56 from 7hong/main
Browse files Browse the repository at this point in the history
support zremrangeby
  • Loading branch information
jeff-zou authored May 14, 2024
2 parents 22b7246 + 0c53bab commit 7a55e99
Show file tree
Hide file tree
Showing 10 changed files with 331 additions and 2 deletions.
1 change: 1 addition & 0 deletions README-en.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ key: name, field:subject, value: name\01subject\01score.
| scan.range.start | (none) | Integer | lrange start |
| scan.range.stop | (none) | Integer | lrange start |
| scan.count | (none) | Integer | srandmember count |
| zset.zremrangeby | (none) | String | After executing zadd, whether to execute zremrangeby,Valid values are:SCORE、LEX、RANK |


##### sink with ttl parameters
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ on j.name = 'test'
| scan.range.start | (none) | Integer | 查询list结构时指定lrange start |
| scan.range.stop | (none) | Integer | 查询list结构时指定lrange start |
| scan.count | (none) | Integer | 查询set结构时指定srandmember count |
| zset.zremrangeby | (none) | String | 执行zadd之后,是否执行zremrangeby,取值:SCORE、LEX、RANK |

### 3.1.1 command值与redis命令对应关系:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,4 +236,10 @@ private RedisOptions() {
.intType()
.defaultValue(null)
.withDescription("Optional set count for srandmember query");

public static final ConfigOption<String> ZREM_RANGEBY =
ConfigOptions.key("zset.zremrangeby")
.stringType()
.defaultValue(null)
.withDescription("Remove related elements,Valid values: LEX,RANK,SCORE");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.redis.config;

/** rem type for sorted set**/
public enum ZremType {

SCORE,
RANK,
LEX,
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.lettuce.core.Range;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
Expand Down Expand Up @@ -276,6 +277,51 @@ public RedisFuture<Long> zrem(final String key, final String element) {
}
}

@Override
public RedisFuture<Long> zremRangeByScore(String key, Range<Double> range) {
try {
return clusterAsyncCommands.zremrangebyscore(key, range);
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error(
"Cannot send Redis message with command zremrangebyscore to set {} error message {}",
key,
e.getMessage());
}
throw e;
}
}

@Override
public RedisFuture<Long> zremRangeByLex(String key, Range<String> range) {
try {
return clusterAsyncCommands.zremrangebylex(key, range);
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error(
"Cannot send Redis message with command zremrangebylex to set {} error message {}",
key,
e.getMessage());
}
throw e;
}
}

@Override
public RedisFuture<Long> zremRangeByRank(String key, long start, long stop) {
try {
return clusterAsyncCommands.zremrangebyrank(key, start, stop);
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error(
"Cannot send Redis message with command zremrangebyrank to set {} error message {}",
key,
e.getMessage());
}
throw e;
}
}

@Override
public RedisFuture<Long> incrBy(String key, long value) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.streaming.connectors.redis.container;

import io.lettuce.core.Range;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;

Expand Down Expand Up @@ -154,6 +155,31 @@ public interface RedisCommandsContainer extends Serializable {
*/
RedisFuture<Long> zrem(String key, String element);

/**
* Remove members from a specified score range
* @param key
* @param range
* @return
*/
RedisFuture<Long> zremRangeByScore(String key, Range<Double> range);

/**
* Remove members from a specified lex range
* @param key
* @param range
* @return
*/
RedisFuture<Long> zremRangeByLex(String key, Range<String> range);

/**
* Remove members from a specified rank
* @param key
* @param start
* @param stop
* @return
*/
RedisFuture<Long> zremRangeByRank(String key, long start, long stop);

/**
* increase value to specified key.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.lettuce.core.Range;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.StatefulRedisConnection;
Expand Down Expand Up @@ -278,6 +279,51 @@ public RedisFuture<Long> zrem(final String key, final String element) {
}
}

@Override
public RedisFuture<Long> zremRangeByScore(String key, Range<Double> range) {
try {
return asyncCommands.zremrangebyscore(key, range);
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error(
"Cannot send Redis message with command zremrangebyscore to set {} error message {}",
key,
e.getMessage());
}
throw e;
}
}

@Override
public RedisFuture<Long> zremRangeByLex(String key, Range<String> range) {
try {
return asyncCommands.zremrangebylex(key, range);
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error(
"Cannot send Redis message with command zremrangebylex to set {} error message {}",
key,
e.getMessage());
}
throw e;
}
}

@Override
public RedisFuture<Long> zremRangeByRank(String key, long start, long stop) {
try {
return asyncCommands.zremrangebyrank(key, start, stop);
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error(
"Cannot send Redis message with command zremrangebyrank to set {} error message {}",
key,
e.getMessage());
}
throw e;
}
}

@Override
public RedisFuture<Long> incrBy(String key, long value) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(RedisOptions.SCAN_RANGE_STOP);
options.add(RedisOptions.SCAN_RANGE_START);
options.add(RedisOptions.SCAN_COUNT);
options.add(RedisOptions.ZREM_RANGEBY);
return options;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.streaming.connectors.redis.config.FlinkConfigBase;
import org.apache.flink.streaming.connectors.redis.config.RedisOptions;
import org.apache.flink.streaming.connectors.redis.config.RedisValueDataStructure;
import org.apache.flink.streaming.connectors.redis.config.ZremType;
import org.apache.flink.streaming.connectors.redis.container.RedisCommandsContainer;
import org.apache.flink.streaming.connectors.redis.container.RedisCommandsContainerBuilder;
import org.apache.flink.streaming.connectors.redis.converter.RedisRowConverter;
Expand All @@ -40,6 +41,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.lettuce.core.Range;
import io.lettuce.core.RedisFuture;

import java.io.IOException;
Expand Down Expand Up @@ -77,6 +79,8 @@ public class RedisSinkFunction<IN> extends RichSinkFunction<IN> {

private RedisValueDataStructure redisValueDataStructure;

private String zremrangeby;

/**
* Creates a new {@link RedisSinkFunction} that connects to the Redis server.
*
Expand Down Expand Up @@ -110,6 +114,7 @@ public RedisSinkFunction(

this.columnDataTypes = resolvedSchema.getColumnDataTypes();
this.redisValueDataStructure = readableConfig.get(RedisOptions.VALUE_DATA_STRUCTURE);
this.zremrangeby = readableConfig.get(RedisOptions.ZREM_RANGEBY);
}

/**
Expand Down Expand Up @@ -215,7 +220,28 @@ private RedisFuture sink(String[] params) {
case ZADD:
redisFuture =
this.redisCommandsContainer.zadd(
params[0], Double.valueOf(params[1]), params[2]);
params[0], Double.parseDouble(params[1]), params[2]);
if (zremrangeby != null) {
redisFuture.whenComplete((ignore, throwable) -> {
try {
if (zremrangeby.equalsIgnoreCase(ZremType.SCORE.name())) {
Range<Double> range =
Range.create(Double.parseDouble(params[3]), Double.parseDouble(params[4]));
this.redisCommandsContainer.zremRangeByScore(params[0], range);
} else if (zremrangeby.equalsIgnoreCase(ZremType.LEX.name())) {
Range<String> range = Range.create(params[3], params[4]);
this.redisCommandsContainer.zremRangeByLex(params[0], range);
} else if (zremrangeby.equalsIgnoreCase(ZremType.RANK.name())) {
this.redisCommandsContainer.zremRangeByRank(params[0], Long.parseLong(params[3]),
Long.parseLong(params[4]));
} else {
LOG.warn("Unrecognized zrem type:{}", zremrangeby);
}
} catch (Exception e) {
LOG.error("{} zremRangeBy failed.", params[0], e);
}
});
}
break;
case ZINCRBY:
redisFuture =
Expand Down Expand Up @@ -428,7 +454,9 @@ private int calcParamNumByCommand(int rowDataNum) {
return 1;
}

if (redisCommand.getInsertCommand() == RedisInsertCommand.HSET
if (redisCommand.getInsertCommand() == RedisInsertCommand.ZADD && zremrangeby != null) {
return 5;
} else if (redisCommand.getInsertCommand() == RedisInsertCommand.HSET
|| redisCommand.getInsertCommand() == RedisInsertCommand.ZADD
|| redisCommand.getInsertCommand() == RedisInsertCommand.HINCRBY
|| redisCommand.getInsertCommand() == RedisInsertCommand.HINCRBYFLOAT
Expand Down
Loading

0 comments on commit 7a55e99

Please sign in to comment.