diff --git a/README.md b/README.md
index ccd1228..3be1419 100644
--- a/README.md
+++ b/README.md
@@ -56,7 +56,6 @@ Supports all destinations and all Kinesis Firehose Features.
   * [Version 3.1.0](#version-310)
 * [License](#license)
 
-
 ## Module versioning rule
 
 | Module version | AWS Provider version |
@@ -95,6 +94,7 @@ Supports all destinations and all Kinesis Firehose Features.
 - Original Data Backup in S3
 - Logging and Encryption
 - Application Role to Direct Put Sources
+- Turn on/off cloudwatch logs decompressing and data message extraction
 - Permissions
   - IAM Roles
   - Opensearch / Opensearch Serverless Service Role
@@ -981,6 +981,8 @@ No modules.
 | <a name="input_elasticsearch_index_rotation_period"></a> [elasticsearch\_index\_rotation\_period](#input\_elasticsearch\_index\_rotation\_period) | The Elasticsearch index rotation period. Index rotation appends a timestamp to the IndexName to facilitate expiration of old data | `string` | `"OneDay"` | no |
 | <a name="input_elasticsearch_retry_duration"></a> [elasticsearch\_retry\_duration](#input\_elasticsearch\_retry\_duration) | The length of time during which Firehose retries delivery after a failure, starting from the initial request and including the first attempt | `string` | `300` | no |
 | <a name="input_elasticsearch_type_name"></a> [elasticsearch\_type\_name](#input\_elasticsearch\_type\_name) | The Elasticsearch type name with maximum length of 100 characters | `string` | `null` | no |
+| <a name="input_enable_cloudwatch_logs_data_message_extraction"></a> [enable\_cloudwatch\_logs\_data\_message\_extraction](#input\_enable\_cloudwatch\_logs\_data\_message\_extraction) | Cloudwatch Logs data message extraction | `bool` | `false` | no |
+| <a name="input_enable_cloudwatch_logs_decompression"></a> [enable\_cloudwatch\_logs\_decompression](#input\_enable\_cloudwatch\_logs\_decompression) | Enables or disables Cloudwatch Logs decompression | `bool` | `false` | no |
 | <a name="input_enable_data_format_conversion"></a> [enable\_data\_format\_conversion](#input\_enable\_data\_format\_conversion) | Set it to true if you want to disable format conversion. | `bool` | `false` | no |
 | <a name="input_enable_destination_log"></a> [enable\_destination\_log](#input\_enable\_destination\_log) | The CloudWatch Logging Options for the delivery stream | `bool` | `true` | no |
 | <a name="input_enable_dynamic_partitioning"></a> [enable\_dynamic\_partitioning](#input\_enable\_dynamic\_partitioning) | Enables or disables dynamic partitioning | `bool` | `false` | no |
diff --git a/locals.tf b/locals.tf
index 573722d..b051d2a 100644
--- a/locals.tf
+++ b/locals.tf
@@ -29,7 +29,7 @@ locals {
   is_search_destination = contains(["elasticsearch", "opensearch", "opensearchserverless"], local.destination) ? true : false
 
   # Data Transformation
-  enable_processing = var.enable_lambda_transform || var.enable_dynamic_partitioning
+  enable_processing = var.enable_lambda_transform || var.enable_dynamic_partitioning || var.enable_cloudwatch_logs_decompression
   lambda_processor = var.enable_lambda_transform ? {
     type = "Lambda"
     parameters = [
@@ -97,11 +97,31 @@ locals {
   record_deaggregation_processor = (var.enable_dynamic_partitioning && var.dynamic_partition_enable_record_deaggregation ?
     (var.dynamic_partition_record_deaggregation_type == "JSON" ? local.record_deaggregation_processor_json : local.record_deaggregation_processor_delimiter)
   : null)
+  cloudwatch_logs_decompression_processor = var.enable_cloudwatch_logs_decompression ? {
+    type = "Decompression"
+    parameters = [
+      {
+        name  = "CompressionFormat"
+        value = "GZIP"
+      }
+    ]
+  } : null
+  cloudwatch_logs_data_message_extraction_processor = var.enable_cloudwatch_logs_decompression && var.enable_cloudwatch_logs_data_message_extraction ? {
+    type = "CloudWatchLogProcessing"
+    parameters = [
+      {
+        name  = "DataMessageExtraction"
+        value = tostring(var.enable_cloudwatch_logs_data_message_extraction)
+      },
+    ]
+  } : null
   processors = [for each in [
     local.lambda_processor,
     local.metadata_extractor_processor,
     local.append_delimiter_processor,
-    local.record_deaggregation_processor
+    local.record_deaggregation_processor,
+    local.cloudwatch_logs_decompression_processor,
+    local.cloudwatch_logs_data_message_extraction_processor
   ] : each if local.enable_processing && each != null]
 
   # Data Format conversion
diff --git a/variables.tf b/variables.tf
index b30e403..36988f8 100644
--- a/variables.tf
+++ b/variables.tf
@@ -433,6 +433,18 @@ variable "msk_source_connectivity_type" {
 ######
 # S3 Destination Configurations
 ######
+variable "enable_cloudwatch_logs_decompression" {
+  description = "Enables or disables Cloudwatch Logs decompression"
+  type        = bool
+  default     = false
+}
+
+variable "enable_cloudwatch_logs_data_message_extraction" {
+  description = "Cloudwatch Logs data message extraction"
+  type        = bool
+  default     = false
+}
+
 variable "enable_dynamic_partitioning" {
   description = "Enables or disables dynamic partitioning"
   type        = bool