Skip to content

Commit

Permalink
fix(service_integration): type clickhouse_kafka crashes (#2052)
Browse files Browse the repository at this point in the history
  • Loading branch information
byashimov authored Feb 17, 2025
1 parent 109a2f4 commit b331713
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 23 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ nav_order: 1

## [MAJOR.MINOR.PATCH] - YYYY-MM-DD

- Fix `aiven_service_integration` type `clickhouse_kafka` crashes when a table is removed

## [4.34.2] - 2025-02-13

- Fix `ip_filter`, suppress plan for the new `ip_filter` default value (`::/0`) when a new service is created
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ func TestAccAivenServiceIntegration_datadog_with_user_config_creates(t *testing.
})
}

func testAccServiceIntegrationClickhouseKafkaUserConfig(prefix, project string) string {
func testAccServiceIntegrationClickhouseKafkaUserConfig(prefix, project, tables string) string {
return fmt.Sprintf(`
data "aiven_project" "project" {
project = %[2]q
Expand All @@ -588,6 +588,11 @@ resource "aiven_kafka" "kafka" {
service_name = "%[1]s-kafka"
maintenance_window_dow = "monday"
maintenance_window_time = "10:00:00"
kafka_user_config {
// Enables Kafka Schemas
schema_registry = true
}
}
resource "aiven_clickhouse" "clickhouse" {
Expand All @@ -604,8 +609,27 @@ resource "aiven_service_integration" "clickhouse_kafka_source" {
integration_type = "clickhouse_kafka"
source_service_name = aiven_kafka.kafka.service_name
destination_service_name = aiven_clickhouse.clickhouse.service_name
clickhouse_kafka_user_config {
dynamic "tables" {
for_each = %[3]s
content {
data_format = "AvroConfluent"
name = tables.key
group_name = tables.key
num_consumers = 1
topics {
name = tables.key
}
columns {
name = tables.key
type = "LowCardinality(String)"
}
}
}
}
}
`, prefix, project)
`, prefix, project, tables)
}

func TestAccAivenServiceIntegration_clickhouse_kafka_user_config_creates(t *testing.T) {
Expand All @@ -618,12 +642,27 @@ func TestAccAivenServiceIntegration_clickhouse_kafka_user_config_creates(t *test
CheckDestroy: testAccCheckAivenServiceIntegrationResourceDestroy,
Steps: []resource.TestStep{
{
Config: testAccServiceIntegrationClickhouseKafkaUserConfig(prefix, project),
Config: testAccServiceIntegrationClickhouseKafkaUserConfig(prefix, project, "[]"),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr(resourceName, "integration_type", "clickhouse_kafka"),
resource.TestCheckResourceAttr(resourceName, "project", project),
resource.TestCheckResourceAttr(resourceName, "source_service_name", prefix+"-kafka"),
resource.TestCheckResourceAttr(resourceName, "destination_service_name", prefix+"-clickhouse"),
resource.TestCheckResourceAttr(resourceName, "clickhouse_kafka_user_config.0.tables.#", "0"),
),
},
{
// Tests table removal: adds three
Config: testAccServiceIntegrationClickhouseKafkaUserConfig(prefix, project, `["foo", "bar", "baz"]`),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr(resourceName, "clickhouse_kafka_user_config.0.tables.#", "3"),
),
},
{
// Tests table removal: leaves one
Config: testAccServiceIntegrationClickhouseKafkaUserConfig(prefix, project, `["bar"]`),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr(resourceName, "clickhouse_kafka_user_config.0.tables.#", "1"),
),
},
},
Expand Down
49 changes: 29 additions & 20 deletions internal/sdkprovider/userconfig/diff/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package diff

import (
"regexp"
"strconv"
"strings"

"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
Expand All @@ -27,11 +28,24 @@ func SuppressUnchanged(k, oldValue, newValue string, d *schema.ResourceData) boo
return oldValue == newValue
}

// Lists, sets and objects (object is networks with one item).
// SuppressUnchanged is applied to each nested field.
// Ip filter items handled with a special suppressor.
if reIsIPFilterStrings.MatchString(k) {
return suppressIPFilterSet(k, oldValue, newValue, d)
}

// Doesn't suppress "set" items.
if reIsSetElement.MatchString(k) {
// A hash mostly calculated for the whole object.
return false
}

// Lists, sets and objects (object is list with one item).
// But never a set of nested objects: it is impossible to find an object in a set by its hash.
if strings.HasSuffix(k, ".#") {
if d.HasChange(k) {
// By some reason terraform might mark objects as "changed".
// In that case, terraform returns a networks with a nil value.
// In that case, terraform returns a list with a nil value.
// "nil" means that the object has no value.
key := strings.TrimSuffix(k, ".#")
v, ok := d.Get(key).([]any)
Expand All @@ -40,24 +54,13 @@ func SuppressUnchanged(k, oldValue, newValue string, d *schema.ResourceData) boo

// Suppresses object diff, because it might have been received as default value from the API.
// So the diff happens when the object's field is changed.
// Object is a networks, and both set and networks end with "#".
// So set == networks == object (by type).
// Object is a list, and both set and list end with "#".
// So set == list == object (by type).
// A set of objects is different.
// Because hash is calculated for the whole object, not per field.
return !isObjectSet(k, d)
}

// SuppressUnchanged is applied to each nested field.
// Ip filter items handled with a special suppressor.
if reIsIPFilterStrings.MatchString(k) {
return suppressIPFilterSet(k, oldValue, newValue, d)
}

// Doesn't suppress "set" items.
if reIsSetElement.MatchString(k) {
return false
}

// Object properties.
// "oldValue" — is something read from API
// "newValue" — is what is read from tf file
Expand All @@ -75,7 +78,8 @@ func SuppressUnchanged(k, oldValue, newValue string, d *schema.ResourceData) boo
return false
}

// suppressIPFilterSet ip_filter networks has specific logic
// suppressIPFilterSet ip_filter list has specific logic
// Doesn't support ip_filter_object.
// By default services have a non-empty ip_filter networks: either ["0.0.0.0/0"] or ["0.0.0.0/0", "::/0"]
// Suppress the diff when user didn't define the ip_filter networks in the config but got it from the API:
// - ip_filter = [0.0.0.0/0, ::/0] -- suppress
Expand Down Expand Up @@ -105,20 +109,25 @@ func suppressIPFilterSet(k, _, _ string, d *schema.ResourceData) bool {
return IsDefaultIPFilterList(set.List())
}

// isObjectSet returns true if given k is for collection of objects
// isObjectSet returns true if the given k is a set, and its elements are objects
// Doesn't support sets with nested objects!
// Terraform fails to calculate hash for nested objects, and can simply not detect changes.
func isObjectSet(k string, d *schema.ResourceData) bool {
path := strings.Split(strings.TrimSuffix(k, ".#"), ".")
value := d.GetRawState().AsValueMap()[path[0]]

// user_config field
path = path[1:]

// Drills down the field
// Drills down the field: foo_user_config.0.ip_filter_object
for _, v := range path {
if v == "0" {
value = value.AsValueSlice()[0]
// When mets ".0." it is an index of list/set.
index, err := strconv.Atoi(v)
if err == nil {
value = value.AsValueSlice()[index]
continue
}

value = value.GetAttr(v)
}

Expand Down

0 comments on commit b331713

Please sign in to comment.