diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index 7f5e24e7d51a2..5fffe83c6a788 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -43,9 +43,9 @@ use serde_derive::Deserialize; use url::Url; use with_options::WithOptions; -use self::prometheus::base_file_metrics_writer::BaseFileWriterWithMetricsBuilder; -use self::prometheus::partition_metrics_writer::FanoutPartitionedWriterWithMetricsBuilder; -use self::prometheus::position_delete_metrics_writer::PositionDeleteWriterWithMetricsBuilder; +use self::prometheus::monitor_base_file_writer::BaseFileWriterWithMetricsBuilder; +use self::prometheus::monitor_partition_writer::FanoutPartitionedWriterWithMetricsBuilder; +use self::prometheus::monitor_position_delete_writer::PositionDeleteWriterWithMetricsBuilder; use super::{ Sink, SinkError, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, }; @@ -449,12 +449,12 @@ impl IcebergWriter { .iceberg_position_delete_cache_num .clone(), ); - let euality_delete_builder = + let equality_delete_builder = builder_helper.equality_delete_writer_builder(unique_column_ids.clone(), 0)?; let delta_builder = EqualityDeltaWriterBuilder::new( data_file_builder, position_delete_builder, - euality_delete_builder, + equality_delete_builder, unique_column_ids, ); let partition_delta_builder = FanoutPartitionedWriterWithMetricsBuilder::new( diff --git a/src/connector/src/sink/iceberg/prometheus/mod.rs b/src/connector/src/sink/iceberg/prometheus/mod.rs index 298c549e5b62f..d91cb12acaf53 100644 --- a/src/connector/src/sink/iceberg/prometheus/mod.rs +++ b/src/connector/src/sink/iceberg/prometheus/mod.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod base_file_metrics_writer; -pub mod partition_metrics_writer; -pub mod position_delete_metrics_writer; -pub mod write_metrics_writer; +pub mod monitor_base_file_writer; +pub mod monitor_partition_writer; +pub mod monitor_position_delete_writer; +pub mod monitor_write_writer; diff --git a/src/connector/src/sink/iceberg/prometheus/base_file_metrics_writer.rs b/src/connector/src/sink/iceberg/prometheus/monitor_base_file_writer.rs similarity index 100% rename from src/connector/src/sink/iceberg/prometheus/base_file_metrics_writer.rs rename to src/connector/src/sink/iceberg/prometheus/monitor_base_file_writer.rs diff --git a/src/connector/src/sink/iceberg/prometheus/partition_metrics_writer.rs b/src/connector/src/sink/iceberg/prometheus/monitor_partition_writer.rs similarity index 100% rename from src/connector/src/sink/iceberg/prometheus/partition_metrics_writer.rs rename to src/connector/src/sink/iceberg/prometheus/monitor_partition_writer.rs diff --git a/src/connector/src/sink/iceberg/prometheus/position_delete_metrics_writer.rs b/src/connector/src/sink/iceberg/prometheus/monitor_position_delete_writer.rs similarity index 100% rename from src/connector/src/sink/iceberg/prometheus/position_delete_metrics_writer.rs rename to src/connector/src/sink/iceberg/prometheus/monitor_position_delete_writer.rs diff --git a/src/connector/src/sink/iceberg/prometheus/write_metrics_writer.rs b/src/connector/src/sink/iceberg/prometheus/monitor_write_writer.rs similarity index 97% rename from src/connector/src/sink/iceberg/prometheus/write_metrics_writer.rs rename to src/connector/src/sink/iceberg/prometheus/monitor_write_writer.rs index e38e7d12507b8..6610bb785dd6f 100644 --- a/src/connector/src/sink/iceberg/prometheus/write_metrics_writer.rs +++ b/src/connector/src/sink/iceberg/prometheus/monitor_write_writer.rs @@ -67,7 +67,7 @@ impl IcebergWriter for WriteMetricsWriter { async fn write(&mut self, record: RecordBatch) -> Result<()> { self.write_qps.inc(); - let _ = self.write_latency.start_timer(); + let _timer = self.write_latency.start_timer(); self.appender.write(record).await }