A message filter for Apache Pulsar that supports both server-side and client-side filtering.
pulsar-msg-filter-plugin is a server-side message filtering plugin for Apache Pulsar based on PIP 105: Support pluggable entry filter in Dispatcher.
pulsar-msg-filter-interceptor is a client-side message filtering interceptor based on the Pulsar ConsumerInterceptor.
- High performance and small size
- Supports common conditional expressions, covering most business filtering scenarios
[server-side] pulsar-msg-filter-plugin usage instructions
- Download the pulsar-msg-filter-plugin-VERSION.nar plugin and save it to the directory the broker loads entry filters from, such as /app/plugin (the entryFiltersDirectory value in broker.conf).
- Modify the Pulsar broker.conf configuration (Pulsar version >= 2.10), setting the plugin name to pulsar-msg-filter:

      # Class name of Pluggable entry filter that can decide whether the entry needs to be filtered
      # You can use this class to decide which entries can be sent to consumers.
      # Multiple classes need to be separated by commas.
      entryFilterNames=pulsar-msg-filter

      # The directory for all the entry filter implementations
      entryFiltersDirectory=/app/plugin

      # Location of unpacked NAR file
      narExtractionDirectory=/app/nar
- Restart the broker and check the logs. A log message like Successfully loaded entry filter for name `pulsar-msg-filter` means the configuration was successful.
- Verification (optional):
- When building the Producer instance, disable batching with .enableBatching(false):

      Producer<String> producer = client.newProducer(Schema.STRING)
              .topic("test-topic-1")
              .enableBatching(false)
              .create();

      producer.newMessage()
              .property("k1", "7")
              .property("k2", "vvvv")
              .property("k3", "true")
              .value("hi, this msg from `pulsar-msg-filter-plugin`")
              .send();
- When consuming messages, configure the subscription's filter expression through the admin configuration. The key is fixed as pulsar-msg-filter-expression.

      pulsar-admin topics update-subscription-properties --property pulsar-msg-filter-expression="double(k1)<6 || (k2=='vvvv' && k3=='true')" --subscription SUBSCRIPTIONNAME TOPIC
      pulsar-admin topics get-subscription-properties --subscription SUBSCRIPTIONNAME TOPIC
The updated configuration takes effect immediately, so there is no need to set subscriptionProperties when creating the Consumer:

      Consumer<String> consumer = client.newConsumer(Schema.STRING)
              .subscriptionName("SUBSCRIPTIONNAME")
              .topic("TOPIC")
              .subscribe();
Explanation: Because the server-side pulsar-msg-filter-plugin relies on the MessageMetadata of each message, batching must be disabled on the producer side (.enableBatching(false)); otherwise filtering will not work. If batching cannot be disabled, use the plugin together with pulsar-msg-filter-interceptor.
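As an optional sanity check for the broker.conf step above, the three entry filter keys can be validated before restarting the broker. The following is only a minimal sketch using the JDK's Properties parser; the class BrokerConfCheck and its check method are hypothetical helpers for illustration and are not part of Pulsar or of this plugin.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of Pulsar or the plugin.
final class BrokerConfCheck {

    // Returns a list of problems; an empty list means the three keys look consistent.
    static List<String> check(String confText, String filterName) {
        Properties conf = new Properties();
        try {
            // broker.conf consists of key=value lines and # comments, which Properties can parse.
            conf.load(new StringReader(confText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        List<String> problems = new ArrayList<>();

        // entryFilterNames may hold several comma-separated names.
        boolean listed = false;
        for (String name : conf.getProperty("entryFilterNames", "").split(",")) {
            if (name.trim().equals(filterName)) {
                listed = true;
            }
        }
        if (!listed) {
            problems.add("entryFilterNames does not list " + filterName);
        }

        // Both directories must be set for the NAR to be found and unpacked.
        for (String key : new String[] {"entryFiltersDirectory", "narExtractionDirectory"}) {
            if (conf.getProperty(key, "").trim().isEmpty()) {
                problems.add(key + " is not set");
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        String conf = String.join("\n",
                "# The directory for all the entry filter implementations",
                "entryFilterNames=pulsar-msg-filter",
                "entryFiltersDirectory=/app/plugin",
                "narExtractionDirectory=/app/nar");
        System.out.println(check(conf, "pulsar-msg-filter"));
    }
}
```

The check only inspects the text of the configuration. It does not confirm that the .nar file is actually present in entryFiltersDirectory, so the broker log message remains the authoritative signal.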
[client-side] pulsar-msg-filter-interceptor usage instructions
- Add the pulsar-msg-filter-interceptor dependency:

      <dependency>
          <groupId>io.github.yangl</groupId>
          <artifactId>pulsar-msg-filter-interceptor</artifactId>
          <version>VERSION</version>
      </dependency>
- When creating the Consumer instance, configure the MsgFilterConsumerInterceptor filter:

      Consumer<String> consumer = client.newConsumer(Schema.STRING)
              .subscriptionName("SUBSCRIPTIONNAME")
              .topic("TOPIC")
              .intercept(MsgFilterConsumerInterceptor.<String>builder().build())
              .subscribe();
Note: If you are using a service address starting with "pulsar://", you also need to set the .webServiceUrl(YOUR_HTTP_SERVICE_URL) parameter, as follows:

      .intercept(MsgFilterConsumerInterceptor.<String>builder().webServiceUrl(YOUR_HTTP_SERVICE_URL).build())
Precautions
- Since all keys and values in Pulsar message headers are of type String, convert them to the target types when using them in expressions.
- For true/false checks in AviatorScript, it is recommended to compare directly against the strings 'true'/'false' with ==. In AviatorScript, only nil and false are considered false, while all other values are considered true.
- The filtering engine uses AviatorScript (thanks to Xiaodan), and its built-in functions can be found in the function library list.
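To make the first two precautions concrete, here is a rough plain-Java reading of the example expression double(k1)<6 || (k2=='vvvv' && k3=='true') applied to String-valued message properties. It is a sketch for illustration only: FilterExpressionSketch, matches and isTruthy are hypothetical names, the code does not call AviatorScript or the plugin, and treating a missing k1 as "no match" is an assumption of this sketch.

```java
import java.util.Map;

// Plain-Java illustration only; this is not the plugin's code and does not use AviatorScript.
final class FilterExpressionSketch {

    // Mirrors the truthiness rule quoted above: only nil (null) and false are false.
    static boolean isTruthy(Object value) {
        return value != null && !Boolean.FALSE.equals(value);
    }

    // Hand-written equivalent of: double(k1)<6 || (k2=='vvvv' && k3=='true')
    static boolean matches(Map<String, String> props) {
        String k1 = props.get("k1");
        // Properties are Strings, so convert explicitly before a numeric comparison.
        // Assumption of this sketch: a missing k1 simply does not match.
        boolean k1Small = k1 != null && Double.parseDouble(k1) < 6;
        // Compare flags as strings instead of relying on truthiness.
        boolean flagsMatch = "vvvv".equals(props.get("k2")) && "true".equals(props.get("k3"));
        return k1Small || flagsMatch;
    }

    public static void main(String[] args) {
        Map<String, String> props = Map.of("k1", "7", "k2", "vvvv", "k3", "true");
        // 7 is not below 6, but the k2/k3 branch matches.
        System.out.println(matches(props));
        // A non-nil String counts as true under the rule above, even when its text is "false".
        System.out.println(isTruthy("false"));
    }
}
```

The isTruthy call shows why a bare k3 in a condition would be misleading: the property value "false" is a non-nil String, so it would be treated as true, which is why comparing with k3=='true' is the safer form.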
pulsar-msg-filter is licensed under the AGPLv3 License.