Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use future to put-all! events in out-sink #30

Merged
merged 1 commit into from
Jan 16, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions src/robertluo/waterfall/core.clj
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
(ns ^:no-doc robertluo.waterfall.core
"Core data structure"
(:require [manifold.deferred :as d]
[manifold.stream :as ms]
[manifold.stream :as ms]
[robertluo.waterfall.util :as util]
[clojure.core.match :refer [match]])
(:import (java.time Duration)
(java.util Map)
(org.apache.kafka.clients.consumer Consumer ConsumerRecord KafkaConsumer)
(org.apache.kafka.clients.producer
KafkaProducer
Producer
Producer
ProducerRecord
RecordMetadata)
(org.apache.kafka.common.serialization ByteArrayDeserializer ByteArraySerializer)))
@@ -31,7 +31,7 @@
sending (fn [x]
(let [{:keys [key value topic partition timestamp]} x]
(rmd->map @(.send prod (ProducerRecord. topic partition timestamp key value)))))]
(ms/on-closed strm #(.close prod))
(ms/on-closed strm #(.close prod))
(ms/splice strm (ms/map sending strm))))

;--------------------
@@ -100,7 +100,7 @@
(cmd-self [::poll duration])) ; resume and poll
[::poll duration]
(let [putting-all (fn [events] ; function to handle events and resume
(d/chain (ms/put-all! out-sink events)
(d/chain (d/future (ms/put-all! out-sink events))
#(when % (cmd-self [::resume duration]))))]
(ensure-sink
(if-let [events (->> (.poll consumer duration) (.iterator) (iterator-seq) (map cr->map) seq)]
@@ -113,20 +113,20 @@


(defn consumer
[nodes group-id topics
[nodes group-id topics
{:keys [poll-duration position]
:as conf
:as conf
:or {poll-duration (Duration/ofSeconds 10)}}]
(let [conr (-> (dissoc conf :poll-duration :position) ;avoid kafka complaints
(merge {:bootstrap-servers nodes
:group-id group-id
:enable-auto-commit false})
util/->config-map
(KafkaConsumer. (ByteArrayDeserializer.) (ByteArrayDeserializer.)))
(KafkaConsumer. (ByteArrayDeserializer.) (ByteArrayDeserializer.)))
out-sink (ms/stream)
actor (consumer-actor conr out-sink)]
(actor [::subscribe topics])
(when position (actor [::seek position]))
(ms/on-closed out-sink (fn [] @(actor [::close])))
(ms/on-closed out-sink (fn [] @(actor [::close])))
(actor [::poll poll-duration])
(ms/source-only out-sink)))