diff --git a/src/wkok/openai_clojure/sse.clj b/src/wkok/openai_clojure/sse.clj index 440400c..b3a443d 100644 --- a/src/wkok/openai_clojure/sse.clj +++ b/src/wkok/openai_clojure/sse.clj @@ -5,11 +5,8 @@ [hato.middleware :as hm] [clojure.core.async :as a] [clojure.string :as string] - [cheshire.core :as json] - [clojure.core.async.impl.protocols :as impl]) - (:import (java.io InputStream) - (clojure.lang Counted) - (java.util LinkedList))) + [cheshire.core :as json]) + (:import (java.io InputStream))) (def event-mask (re-pattern (str "(?s).+?\n\n"))) @@ -31,39 +28,17 @@ (-> (subs raw-event data-idx) (json/parse-string true))))) -(deftype InfiniteBuffer [^LinkedList buf] - impl/UnblockingBuffer - impl/Buffer - (full? [_this] - false) - (remove! [_this] - (.removeLast buf)) - (add!* [this itm] - (.addFirst buf itm) - this) - (close-buf! [_this]) - Counted - (count [_this] - (.size buf))) - -(defn infinite-buffer [] - (InfiniteBuffer. (LinkedList.))) +; Per this discussion: https://community.openai.com/t/clarification-for-max-tokens/19576 +; if the max_tokens is not provided, the response will try to use all the available +; tokens to generate response, hence DEFAULT_BUFFER_SIZE should be large enough +(def ^:private DEFAULT_BUFFER_SIZE 100000) (defn calc-buffer-size - "- Use stream_buffer_len if provided. - - Otherwise, buffer size should be at least equal to max_tokens - plus the [DONE] terminator if it is provided. - - Else fallbacks on ##Inf and use an infinite-buffer instead" - [{:keys [stream_buffer_len max_tokens]}] - (or stream_buffer_len - (when max_tokens (inc max_tokens)) - ##Inf)) - -(defn make-buffer [params] - (let [size (calc-buffer-size params)] - (if (= size ##Inf) - (infinite-buffer) - (a/sliding-buffer size)))) + "Buffer size should be at least equal to max_tokens + plus the [DONE] terminator" + [{:keys [max_tokens] + :or {max_tokens DEFAULT_BUFFER_SIZE}}] + (inc max_tokens)) (defn sse-events "Returns a core.async channel with events as clojure data structures. @@ -72,7 +47,8 @@ (let [event-stream ^InputStream (:body (http/request (merge request params {:as :stream}))) - events (a/chan (make-buffer params) (map parse-event))] + buffer-size (calc-buffer-size params) + events (a/chan (a/buffer buffer-size) (map parse-event))] (a/thread (loop [byte-coll []] (let [byte-arr (byte-array (max 1 (.available event-stream))) @@ -181,3 +157,4 @@ (assoc ctx :response (if (:stream params) (sse-request ctx') (http/request request'))))))}) +