How to Integrate OpenIG and Message Brokers
Open Identity Platform added message broker integration to OpenIG starting with version 5.0.12. Apache Kafka and IBM MQ are supported.
This article shows how to send messages to and receive messages from message brokers using OpenIG.
You can use the following project as a base: https://github.com/maximthomas/openig-mb-example
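To get a local playground quickly, you can clone the example project. Assuming the repository ships a Docker Compose setup (check its README for the exact commands), something like the following is enough to start it:
git clone https://github.com/maximthomas/openig-mb-example.git
cd openig-mb-example
docker-compose up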
The following setup allows OpenIG to receive messages over HTTP and send them to an Apache Kafka topic.
Create Apache Kafka topic:
kafka-topics.sh --create --topic topic1 --bootstrap-server localhost:9092
Add an Apache Kafka producer handler to the heap in the OpenIG configuration file:
config.json
{
"heap": [
...
{
"name": "kafka-producer",
"type": "MQ_Kafka",
"config": {
"bootstrap.servers": "kafka:9092",
"topic.produce": "incoming-messages"
}
},
...
]
}
Some important MQ_Kafka handler settings:
Setting | Description |
---|---|
bootstrap.servers | Comma-separated list of host:port pairs that are the addresses of the Kafka brokers |
topic.produce | The topic to which OpenIG sends messages |
topic.consume | The topic from which OpenIG consumes messages |
uri | The OpenIG route endpoint |
method | The HTTP method OpenIG uses to send requests to the HTTP endpoint |
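For reference, the consumer-oriented settings from the table (topic.consume, uri and method) are combined like this; a complete, working variant of this configuration appears later in this article:
{
"name": "kafka-consumer",
"type": "MQ_Kafka",
"config": {
"bootstrap.servers": "kafka:9092",
"topic.consume": "topic2",
"uri": "/kafka2http",
"method": "POST"
}
}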
Add an OpenIG route to the routes folder to process HTTP requests:
10-http2kafka.json
{
"name": "${(request.method == 'PUT') and matches(request.uri.path, '^/http2kafka$')}",
"condition": "${(request.method == 'PUT') and matches(request.uri.path, '^/http2kafka$')}",
"monitor": true,
"timer": true,
"handler": {
"type": "Chain",
"config": {
"filters": [],
"handler": {
"type": "DispatchHandler",
"config": {
"bindings": [
{
"handler": "kafka-consumer"
}
]
}
}
}
}
}
Send an HTTP request to OpenIG and then check the received messages in the topic1 topic:
$ curl -v -X PUT --data '{"data": "test"}' -H 'Content-Type: application/json' 'http://localhost:8080/http2kafka'
* Trying 127.0.0.1:8080...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> PUT /http2kafka HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.68.0
> Accept: */*
> Content-Type: application/json
> Content-Length: 16
>
* upload completely sent off: 16 out of 16 bytes
* Mark bundle as not supporting multiuse
< HTTP/1.1 202 Accepted
< Server: Apache-Coyote/1.1
< Content-Length: 0
< Date: Wed, 13 Apr 2022 12:34:03 GMT
<
* Connection #0 to host localhost left intact
$ kafka-console-consumer.sh --topic topic1 --from-beginning --bootstrap-server localhost:9092
{"data": "test"}
In the following configuration, OpenIG receives messages from the Apache Kafka topic2 topic and sends them to an HTTP endpoint.
Create a new Apache Kafka topic:
kafka-topics.sh --create --topic topic2 --bootstrap-server localhost:9092
Add an Apache Kafka consumer handler to the heap in the OpenIG configuration file:
config.json
{
"heap": [
...
{
"name": "kafka-consumer",
"type": "MQ_Kafka",
"config": {
"bootstrap.servers": "kafka:9092",
"topic.consume": "topic2",
"method": "POST"
}
},
...
]
}
Add an OpenIG route to the routes folder to process Apache Kafka messages:
10-kafka2http.json
{
"name": "${(request.method == 'POST') and matches(request.uri.path, '^/kafka2http$')}",
"condition": "${(request.method == 'POST') and matches(request.uri.path, '^/kafka2http$')}",
"monitor": true,
"timer": true,
"handler": {
"type": "Chain",
"config": {
"filters": [],
"handler": {
"type": "DispatchHandler",
"config": {
"bindings": [{
"handler": "ClientHandler",
"capture": "all",
"baseURI": "${system['endpoint.api']}"
}]
}
}
}
}
}
Send test data to the Apache Kafka topic:
$ kafka-console-producer.sh --topic topic2 --bootstrap-server localhost:9092
>{"data": "test"}
A new record appears in the sample-service log:
2022-04-21 07:26:14.645 DEBUG 1 --- [nio-8080-exec-6] o.s.w.f.CommonsRequestLoggingFilter : After request [POST /kafka2http, headers=[kafka-offset:"29", kafka-topic:"topic2", content-length:"16", host:"sample-service:8080", connection:"Keep-Alive", user-agent:"Apache-HttpAsyncClient/4.1.4 (Java/1.8.0_212)"], payload={"data": "test"}]
If there is no message broker in your infrastructure but you still need to receive and redirect Kafka messages, OpenIG offers an embedded Apache Kafka broker.
To set up embedded Kafka, add an EmbeddedKafka object to the OpenIG config.json file.
config.json
{
"heap": [
...
{
"name": "EmbeddedKafka",
"type": "EmbeddedKafka",
"config": {
"zookeper.port": "${system['zookeper.port']}",
"security.inter.broker.protocol": "${empty system['keystore.location'] ?'PLAINTEXT':'SSL'}",
"listeners": "${system['kafka.bootstrap']}",
"advertised.listeners": "${system['kafka.bootstrap']}",
"ssl.endpoint.identification.algorithm": "",
"ssl.enabled.protocols":"TLSv1.2",
"ssl.keystore.location":"${system['keystore.location']}",
"ssl.keystore.password":"${empty system['keystore.password']?'changeit':system['keystore.password']}",
"ssl.key.password":"${empty system['key.password']?'changeit':system['key.password']}",
"ssl.truststore.location":"${system['truststore.location']}",
"ssl.truststore.password":"${empty system['truststore.password']?'changeit':system['truststore.password']}"
}
},
...
]
}
Some significant EmbeddedKafka settings:
Setting | Description |
---|---|
zookeper.port | ZooKeeper port for embedded Apache Kafka. If not set, Kafka won't start |
listeners | Host and port that Kafka binds to for listening |
advertised.listeners | Host and port that Kafka clients use to connect to the broker |
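The ${system[...]} expressions in the configuration above read Java system properties, so those values must be passed to the JVM that runs OpenIG. A minimal sketch, assuming OpenIG is deployed in Apache Tomcat and a plaintext listener is wanted (the property values are examples, not defaults):
export CATALINA_OPTS="-Dzookeper.port=2181 -Dkafka.bootstrap=PLAINTEXT://openig:9092"
With keystore.location unset, the expression above selects PLAINTEXT as the inter-broker protocol and the SSL settings stay empty.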
Add a Kafka consumer to the OpenIG heap and create a route that listens for Kafka messages and redirects them to an HTTP endpoint (you can also redirect the messages to another message broker).
config.json
{
"heap": [
...
{
"name": "kafka-consumer",
"type": "MQ_Kafka",
"config": {
"bootstrap.servers": "openig:9092",
"topic.consume": "topic1",
"method": "POST",
"uri": "/kafka2http"
}
},
...
]
}
10-kafka2http.json
{
"name": "${(request.method == 'POST') and matches(request.uri.path, '^/kafka2http$')}",
"condition": "${(request.method == 'POST') and matches(request.uri.path, '^/kafka2http$')}",
"monitor": true,
"timer": true,
"handler": {
"type": "Chain",
"config": {
"filters": [],
"handler": {
"type": "DispatchHandler",
"config": {
"bindings": [{
"handler": "ClientHandler",
"capture": "all",
"baseURI": "${system['endpoint.api']}"
}]
}
}
}
}
}
Start OpenIG. Now you can create a topic in the embedded OpenIG Kafka broker and send messages to it.
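For example, assuming the embedded broker's listener is exposed to your machine as localhost:9092 (as in the producer command below), the topic can be created the same way as before:
kafka-topics.sh --create --topic topic1 --bootstrap-server localhost:9092
Then send a test message with the console producer: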
$ kafka-console-producer.sh --topic topic1 --bootstrap-server localhost:9092
>{"data": "test"}
A new record appears in the sample-service log:
2022-04-21 07:26:14.645 DEBUG 1 --- [nio-8080-exec-6] o.s.w.f.CommonsRequestLoggingFilter : After request [POST /kafka2http, headers=[kafka-offset:"29", kafka-topic:"topic2", content-length:"16", host:"sample-service:8080", connection:"Keep-Alive", user-agent:"Apache-HttpAsyncClient/4.1.4 (Java/1.8.0_212)"], payload={"data": "test"}]
The following setup allows OpenIG to receive messages over HTTP and send them to an IBM MQ queue.
Add an IBM MQ producer handler to the heap in the OpenIG configuration file:
config.json
{
"heap": [
...
{
"name": "mq-producer",
"type": "MQ_IBM",
"config": {
"XMSC_WMQ_CONNECTION_NAME_LIST":"mq(1414)",
"XMSC_WMQ_CHANNEL":"DEV.APP.SVRCONN",
"XMSC_WMQ_QUEUE_MANAGER":"QM1",
"XMSC_USERID":"app",
"XMSC_PASSWORD":"passw0rd",
"topic.produce": "DEV.QUEUE.1"
}
},
...
]
}
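The connection values above match the defaults of IBM's MQ developer container (queue manager QM1, channel DEV.APP.SVRCONN, user app). If you do not have a queue manager yet, here is a sketch of starting one in Docker (the image name, tag and options may differ for your MQ version):
docker run --name mq -e LICENSE=accept -e MQ_QMGR_NAME=QM1 -e MQ_APP_PASSWORD=passw0rd -p 1414:1414 -p 9443:9443 icr.io/ibm-messaging/mq:latest
The container name mq matches the host used in XMSC_WMQ_CONNECTION_NAME_LIST when OpenIG and MQ run on the same Docker network.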
Some important MQ_IBM handler settings:
Setting | Description |
---|---|
XMSC_WMQ_CONNECTION_NAME_LIST | Comma-separated list of host(port) entries that are the addresses of the IBM MQ brokers |
XMSC_WMQ_CHANNEL | IBM MQ channel name used for the connection |
XMSC_USERID | IBM MQ user ID |
XMSC_PASSWORD | IBM MQ user password |
topic.produce | The queue or topic to which OpenIG sends messages |
topic.consume | The queue or topic from which OpenIG consumes messages |
uri | The OpenIG route endpoint |
method | The HTTP method OpenIG uses to send requests to the HTTP endpoint |
Add an OpenIG route to the routes folder to process HTTP requests:
10-http2mq.json
{
"name": "${(request.method == 'PUT') and matches(request.uri.path, '^/http2mq$')}",
"condition": "${(request.method == 'PUT') and matches(request.uri.path, '^/http2mq$')}",
"monitor": true,
"timer": true,
"handler": {
"type": "Chain",
"config": {
"filters": [],
"handler": {
"type": "DispatchHandler",
"config": {
"bindings": [
{
"handler": "mq-producer"
}
]
}
}
}
}
}
Send an HTTP request to OpenIG and then check the received messages in the DEV.QUEUE.1 queue:
$ curl -v -X PUT --data '{"data": "test"}' -H 'Content-Type: application/json' 'http://localhost:8080/http2mq'
* Trying 127.0.0.1:8080...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> PUT /http2mq HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.68.0
> Accept: */*
> Content-Type: application/json
> Content-Length: 16
>
* upload completely sent off: 16 out of 16 bytes
* Mark bundle as not supporting multiuse
< HTTP/1.1 202 Accepted
< Server: Apache-Coyote/1.1
< Content-Length: 0
< Date: Wed, 13 Apr 2022 12:34:03 GMT
<
* Connection #0 to host localhost left intact
Open the IBM MQ web console at https://localhost:9443/ibmmq/console/; you should see the new message in the DEV.QUEUE.1 queue.
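If you prefer the command line, the queue can also be browsed with the amqsbcg sample program inside the MQ container (a sketch, assuming the container is named mq and the MQ samples are installed under /opt/mqm/samp/bin):
docker exec mq /opt/mqm/samp/bin/amqsbcg DEV.QUEUE.1 QM1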
Add an IBM MQ consumer handler to the heap in the OpenIG configuration file:
config.json
{
"heap": [
...
{
"name": "mq-consumer",
"type": "MQ_IBM",
"config": {
"XMSC_WMQ_CONNECTION_NAME_LIST":"mq(1414)",
"XMSC_WMQ_CHANNEL":"DEV.APP.SVRCONN",
"XMSC_WMQ_QUEUE_MANAGER":"QM1",
"XMSC_USERID":"app",
"XMSC_PASSWORD":"passw0rd",
"topic.consume": "DEV.QUEUE.2",
"uri": "/mq2http",
"method": "POST"
}
},
...
]
}
Add an OpenIG route to the routes folder to process IBM MQ messages:
10-mq2http.json
{
"name": "${(request.method == 'POST') and matches(request.uri.path, '^/mq2http$')}",
"condition": "${(request.method == 'POST') and matches(request.uri.path, '^/mq2http$')}",
"monitor": true,
"timer": true,
"handler": {
"type": "Chain",
"config": {
"filters": [],
"handler": {
"type": "DispatchHandler",
"config": {
"bindings": [
{
"handler": "ClientHandler",
"capture": "all",
"baseURI": "${system['endpoint.api']}"
}
]
}
}
}
}
}
Log in to the IBM MQ console and put a message on the DEV.QUEUE.2 queue.
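Alternatively, you can put a test message from the command line with IBM MQ's amqsput sample (again a sketch, assuming the queue manager runs in a container named mq with the samples installed under /opt/mqm/samp/bin):
docker exec -it mq /opt/mqm/samp/bin/amqsput DEV.QUEUE.2 QM1
Then type {"data": "test"} and finish with an empty line; amqsput puts each entered line as a separate message.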
In the sample-service log you should see the following record:
2022-04-21 08:32:35.007 DEBUG 1 --- [nio-8080-exec-1] o.s.w.f.CommonsRequestLoggingFilter : After request [POST /mq2http, headers=[jms_ibm_character_set:"UTF-8", jms_ibm_encoding:"273", jms_ibm_format:"MQSTR", jms_ibm_msgtype:"8", jms_ibm_putappltype:"6", jms_ibm_putdate:"20220421", jms_ibm_puttime:"08323434", jmsxappid:"com.ibm.mq.webconsole", jmsxdeliverycount:"1", jmsxuserid:"unknown", content-length:"16", host:"sample-service:8080", connection:"Keep-Alive", user-agent:"Apache-HttpAsyncClient/4.1.4 (Java/1.8.0_212)"], payload={"data": "test"}]
If you have any additional questions, feel free to ask us.