Skip to content

Commit

Permalink
Fix catalog.include option
Browse files Browse the repository at this point in the history
  • Loading branch information
grzegorz8 committed Jan 29, 2024
1 parent 957ca04 commit 19c93c9
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 2 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## [Unreleased]

- Fix `catalog.include` option and add more logging.

## [0.1.1] - 2024-01-26

- Add `catalog.include` and `catalog.exclude` options.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public class ElasticJdbcCatalogFactoryOptions {
.withDescription("Comma-separated list of index/datastream exclude patterns.");

public static final ConfigOption<String> INCLUDE =
ConfigOptions.key("catalog.exclude")
ConfigOptions.key("catalog.include")
.stringType()
.noDefaultValue()
.withDescription("Comma-separated list of index/datastream include patterns.");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.getindata.flink.connector.jdbc.catalog;

import org.apache.flink.shaded.guava30.com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;
Expand All @@ -11,6 +13,8 @@

class IndexFilterResolver {

private static final Logger LOG = LoggerFactory.getLogger(IndexFilterResolver.class);

static IndexFilterResolver acceptAll() {
return new IndexFilterResolver(emptyList(), emptyList());
}
Expand Down Expand Up @@ -40,12 +44,17 @@ private static List<Pattern> parseRaw(String commaSeparatedList) {
boolean isAccepted(String objectName) {
if (!includePatterns.isEmpty()) {
if (includePatterns.stream().noneMatch(pattern -> pattern.matcher(objectName).matches())) {
LOG.info("'{}' does not match any include pattern. Include patterns='{}'.", objectName, includePatterns);
return false;
}
}
if (!excludePatterns.isEmpty()) {
return excludePatterns.stream().noneMatch(pattern -> pattern.matcher(objectName).matches());
if (excludePatterns.stream().anyMatch(pattern -> pattern.matcher(objectName).matches())) {
LOG.info("'{}' matches exclude pattern; exclude patterns='{}'.", objectName, excludePatterns);
return false;
}
}
LOG.info("'{}' matches include pattern and does not match any exclude pattern.", objectName);
return true;
}

Expand Down

0 comments on commit 19c93c9

Please sign in to comment.