Merge pull request #1029 from oracle/teventq-kafka-cleanup

Cleanup Kafka chapter

anders-swanson authored Jan 10, 2025
2 parents ff0b480 + f26bb2a commit 521a68a
Showing 5 changed files with 11 additions and 9 deletions.
@@ -121,7 +121,7 @@ props.put("value.serializer", "org.apache.kafka.common.serialization.StringSeria
Producer<String, String> okafkaProducer = new KafkaProducer<>(props);
```

-The following Java class produces a stream of messages to a topic, using the [Kafka Java Client for Oracle Database Transactional Event Queues](https://github.com/oracle/okafka). Note that the implementation does not use any Oracle-specific classes, only Kafka interfaces. This allows developers to drop in a org.oracle.okafka.clients.producer.KafkaProducer instance without code changes.
+The following Java class produces a stream of messages to a topic, using the [Kafka Java Client for Oracle Database Transactional Event Queues](https://github.com/oracle/okafka). Note that the implementation does not use any Oracle-specific classes, only Kafka interfaces. This allows developers to drop in an org.oracle.okafka.clients.producer.KafkaProducer instance without code changes.

```java
import java.util.stream.Stream;
// ... remainder of the class elided in this diff excerpt
```
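
The class body is truncated in this excerpt. A minimal sketch of such a producer (hypothetical class and topic names, assuming the connection properties shown above) might look like:

```java
import java.util.Properties;
import java.util.stream.Stream;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.oracle.okafka.clients.producer.KafkaProducer;

public class SampleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Populate props with the connection and serializer settings shown above.

        // Only the Kafka Producer interface is used below, so the okafka
        // KafkaProducer drops in without further code changes.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            Stream.of("message 1", "message 2", "message 3")
                    .map(msg -> new ProducerRecord<String, String>("my_topic", msg))
                    .forEach(producer::send);
        }
    }
}
```
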
@@ -20,13 +20,13 @@ When using Oracle Database Transactional Event Queues with Kafka APIs, the datab

Using your database as a message broker allows you to avoid separate, costly servers dedicated to event streaming. These servers typically require domain-specific knowledge to operate, maintain, and upgrade in production deployments.

-With a database message broker, your messaging data is co-located with your other data and remains queryable with SQL. This reduces network traffic and data duplication across multiple servers (and their associated costs), while benefiting applications that need access to both event streaming data and its related data.
+With a database message broker, your messaging data is co-located with your other data and remains queryable with SQL. This reduces network traffic and data duplication across multiple servers (and their associated costs), while benefiting applications that need access to both event streaming data and its table-based data.

## Topics, Producers, and Consumers

A topic is a logical channel for message streams, capable of high-throughput messaging. _Producers_ write data to topics, producing messages. _Consumers_ subscribe to topics and poll message data. Each consumer is part of a consumer group, which is a logical grouping of consumers, their subscriptions, and assignments.

-With Oracle Database Transactional Event Queues, each topic is backed by a queue table, allowing [transactional messaging](./transactional-messaging.md) and query capabilities. For example, you can query the first five messages from a topic named `my_topic` directly with SQL:
+With Oracle Database Transactional Event Queues, each topic is backed by a queue table, allowing [transactional messaging](./transactional-messaging.md) and query capabilities. For example, you can query five messages from a topic named `my_topic` directly with SQL:

```sql
select * from my_topic
fetch first 5 rows only;
```
@@ -37,7 +37,7 @@ When using Kafka APIs for Transactional Event Queues, you may also run database

## Partitions and Ordering

-Topics are divided into one or more _partitions_, where each partition is backed by a Transactional Event Queue shard. A partition represents an ordered event stream within the topic.
+Topics are divided into one or more _partitions_, where each partition is backed by a Transactional Event Queue event stream. A partition represents an ordered event stream within the topic.

Partitions enable parallel message consumption, as multiple consumers in the same consumer group can concurrently poll from the topic. Consumers are assigned one or more partitions depending on the size of the consumer group. Each partition, however, may be assigned to at most one consumer per group. For example, a topic with three partitions can have at most three active consumers per consumer group.
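
For illustration, a sketch of a consumer in a group (hypothetical names throughout): running three copies of this class with the same `group.id` against a three-partition topic assigns one partition to each consumer.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.oracle.okafka.clients.consumer.KafkaConsumer;

public class SampleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Populate props with connection and deserializer settings.
        props.put("group.id", "my_consumer_group"); // consumers sharing this id form one group

        try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("my_topic"));
            while (true) {
                // Each poll returns records only from this consumer's assigned partitions.
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(100))) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}
```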

@@ -35,7 +35,7 @@ The configured `Properties` objects are passed to Kafka Java Client for Oracle D

#### Configuring Plaintext Authentication

-`PLAINTEXT` authentication uses a `ojdbc.properties` file to supply the database username and password to the Kafka Java client. Create a file named `ojdbc.properties` on your system, and populate it with your database username and password:
+`PLAINTEXT` authentication uses an `ojdbc.properties` file to supply the database username and password to the Kafka Java client. Create a file named `ojdbc.properties` on your system, and populate it with your database username and password:

```
user = <database username>
password = <database password>
```
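
The client then locates this file through its connection properties. A sketch of the corresponding client configuration, assuming okafka's `oracle.net.tns_admin`-style properties (placeholder values; verify the key names against the okafka client's documentation):

```java
Properties props = new Properties();
props.put("security.protocol", "PLAINTEXT");
props.put("bootstrap.servers", "localhost:1521");  // database listener host:port
props.put("oracle.service.name", "freepdb1");      // placeholder database service name
props.put("oracle.net.tns_admin", "/path/to/dir"); // directory containing ojdbc.properties
```
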
@@ -10,4 +10,4 @@ This section introduces Kafka connectors to connect Oracle Database Transactiona

The [Kafka connectors for Oracle Database Transactional Event Queues](https://github.com/oracle/okafka/tree/master/connectors) provide the capability to sync message data to/from Kafka topics.

-The Sink Connector reads from Kafka and publishes messages to Oracle Database Transactional Event Queues. The Source Connector that reads from an Oracle Database Transactional Event Queues topic and publishes messages to a Kafka topic.
+The Sink Connector reads from Kafka and publishes messages to Oracle Database Transactional Event Queues. The Source Connector reads from an Oracle Database Transactional Event Queues topic and publishes messages to a Kafka topic.
@@ -50,7 +50,7 @@ producer.initTransactions();

##### Transactional Produce Example

-The following Java method takes in input record and processes it using a transactional producer. On error, the transaction is aborted and neither the DML nor topic produce are committed to the database. Assume the `processRecord` method does some DML operation with the record, like inserting or updating a table.
+The following Java method takes in an input record and processes it using a transactional producer. On error, the transaction is aborted and neither the DML nor topic produce are committed to the database. Assume the `processRecord` method does some DML operation with the record, like inserting or updating a table.

```java
public void produce(String record) {
@@ -66,7 +66,7 @@ public void produce(String record) {
);
producer.send(pr);

-// 3. Use the record in a database query
+// 3. Use the record in database DML
processRecord(record, conn);
} catch (Exception e) {
// 4. On error, abort the transaction
// ... remainder of the method elided in this diff excerpt
```
@@ -82,7 +82,7 @@ public void produce(String record) {
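
Piecing the fragments above together, a sketch of the complete method (assuming a transactional `producer` field, the hypothetical `processRecord` helper, and a placeholder topic name; `getDBConnection()` is assumed to expose the producer's JDBC connection, mirroring the consumer API below):

```java
public void produce(String record) {
    try {
        // 1. Begin the database transaction
        producer.beginTransaction();

        // 2. Produce the record to the topic
        ProducerRecord<String, String> pr = new ProducerRecord<>("my_topic", record);
        producer.send(pr);

        // 3. Use the record in database DML, on the same connection
        //    that backs the producer's transaction
        Connection conn = producer.getDBConnection();
        processRecord(record, conn);

        // Commit the message and the DML together
        producer.commitTransaction();
    } catch (Exception e) {
        // 4. On error, abort the transaction
        producer.abortTransaction();
    }
}
```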

#### Transactional Consume

-To configure a transactional consumer, configure the `org.oracle.okafka.clients.consumer.KafkaConsumer` class with `auto.commit=false`. Disabling auto-commit will allow great control of database transactions through the `commitSync()` and `commitAsync()` methods.
+To configure a transactional consumer, configure the `org.oracle.okafka.clients.consumer.KafkaConsumer` class with `auto.commit=false`. Disabling auto-commit allows control of database transactions through the `commitSync()` and `commitAsync()` methods.

```java
Properties props = new Properties();
// ... remaining configuration elided in this diff excerpt
```
@@ -138,6 +138,8 @@ public void run() {
```java
// 5. Since auto-commit is disabled, transactions are not
// committed when commitSync() is not called.
System.out.println("Unexpected error processing records. Aborting transaction!");
+// Rollback DML from (3)
+consumer.getDBConnection().rollback();
}
}
}
```
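
For context, a sketch of how the full `run()` method might fit together (hypothetical `running` flag and `processRecord` helper, here taking a `ConsumerRecord`; numbering matches the fragment above):

```java
public void run() {
    consumer.subscribe(List.of("my_topic"));
    while (running) {
        try {
            // 1. Poll a batch of records from this consumer's assigned partitions
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

            // 2. Obtain the JDBC connection backing the consumer's transaction
            Connection conn = consumer.getDBConnection();

            // 3. Apply DML for each record
            for (ConsumerRecord<String, String> record : records) {
                processRecord(record, conn);
            }

            // 4. Commit the polled offsets and the DML in one database transaction
            consumer.commitSync();
        } catch (Exception e) {
            // 5. Since auto-commit is disabled, transactions are not
            //    committed when commitSync() is not called.
            System.out.println("Unexpected error processing records. Aborting transaction!");
            try {
                // Rollback DML from (3)
                consumer.getDBConnection().rollback();
            } catch (SQLException rollbackFailure) {
                // Nothing was committed; log and decide whether to continue
            }
        }
    }
}
```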
