This repository is currently being migrated. It's locked while the migration is in progress.
forked from beingmeta/framerd-modules
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpump.scm
87 lines (67 loc) · 2.45 KB
/
pump.scm
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
;;; -*- Mode: Scheme; Character-encoding: utf-8; -*-
;;; Copyright (C) 2005-2018 beingmeta, inc. All rights reserved.
;;; Module declaration: everything below is defined in the PUMP module.
(in-module 'pump)
;;; Modules this file draws on (records, logging, config, and FIFO queues).
(use-module '{ezrecords logger varconfig fifo})
;;; Public interface. The pump/* procedures are defined below;
;;; pump-name and pump-input are presumably the field accessors
;;; generated by the `defrecord` for PUMP (confirm against ezrecords).
(module-export! '{pump/make
pump/push! pump/cancel!
pump/pending pump/shutdown
pump-name pump-input})
(define-init %loglevel %notice%)

;;; Size argument handed to fifo/make for each pump's queues.
(define default-pump-queue 64)

;;; Registry mapping pump names to pump records.  This has to be a table:
;;; pump/make probes it with `test` and freshpump writes to it with
;;; `store!`, neither of which can operate on a plain integer.
(define named-pumps (make-hashtable))
;;; The PUMP record.  The first four fields are positional constructor
;;; arguments; the remaining fields carry default initializers.
(defrecord (pump OPAQUE)
  ;; Name of the pump (freshpump supplies a UUID when none is given)
  name
  ;; Procedure applied to each item popped from the input queue
  dofn
  ;; When not #f, called as (donefn item pump threadid) after each item
  donefn
  ;; Fourth positional constructor argument
  options
  ;; Queue from which the worker threads pop items
  (input (fifo/make default-pump-queue))
  ;; A second queue; nothing in this file reads or writes it
  (output (fifo/make default-pump-queue))
  ;; Workers stop looping once this is false
  (live? #t)
  ;; Maps thread ids to the threads started for this pump
  (threads (make-hashtable))
  ;; Set of items; workers drop an item from it once it has been processed
  (items (make-hashset))
  ;; Bookkeeping for in-flight work: item -> (threadid . start time)
  ;; and threadid -> item
  (running (make-hashtable)))
;;; Options used by pump/make when the caller passes none; also consed
;;; onto caller-supplied options that are not already a pair.
(define default-opts #[])
;;; The default pump function
;;; Worker loop executed by each pump thread.
;;;  PUMP is the pump record to service.
;;;  THREADID is the key under which this worker records the item it
;;;   is currently processing in the pump's `running` table.
;;; Pops items from the pump's input queue and applies the pump's DOFN
;;; to each one until the queue yields nothing or the pump is no longer
;;; live.  Always returns #f.
(define (pumpfn pump threadid)
  (and (pump-live? pump)
       (let ((item (fifo/pop (pump-input pump)))
             (running (pump-running pump))
             (liveitems (pump-items pump))
             (dofn (pump-dofn pump))
             (donefn (pump-donefn pump)))
         (while (and (exists? item) (pump-live? pump))
           (unwind-protect
               (begin
                 ;; Record which worker has the item (and since when),
                 ;; and which item this worker is busy with.
                 (store! running item (cons threadid (gmtimestamp)))
                 (store! running threadid item)
                 (dofn item))
             ;; Cleanup runs whether or not DOFN exits normally.
             (hashset-drop! liveitems item)
             (when donefn (donefn item pump threadid))
             ;; Drop the whole entry for ITEM.  Dropping a freshly built
             ;; (threadid . timestamp) pair cannot match the stored pair,
             ;; because the timestamp differs, and would leave a stale
             ;; entry behind for every processed item.
             (drop! running item)
             (drop! running threadid))
           (set! item (fifo/pop (pump-input pump))))
         #f)))
;;; Making new pumps
;;; Creates and starts a new pump.
;;;  DOFN is applied to each item pushed onto the pump.
;;;  DONEFN, when provided, is called after each item is processed.
;;;  NAME, when provided, must not already be registered in named-pumps;
;;;   otherwise a DUPLICATENAME error is signalled.
;;;  OPTS defaults to default-opts; a non-pair OPTS is consed onto
;;;   default-opts.
;;; Returns the pump built by freshpump.
(define (pump/make dofn (donefn #f) (name #f) (opts #f))
  ;; Normalize the options argument
  (cond ((not opts) (set! opts default-opts))
        ((not (pair? opts)) (set! opts (cons opts default-opts))))
  ;; Refuse duplicate names, otherwise build the pump
  (cond ((and name (test named-pumps name))
         (error DUPLICATENAME pump/make "A pump named " name " already exists"))
        (else (freshpump dofn donefn name opts))))
;;; Builds a pump record, starts its worker threads, and registers it
;;; under NAME when a name was given.  Returns the new pump.
;;;  DOFN, DONEFN and OPTS are stored in the record; NAME defaults to a
;;;  fresh UUID.  The number of workers comes from the 'minthreads
;;;  option (default 4).
(defslambda (freshpump dofn donefn name opts)
  ;; The fourth positional field of the record is `options`, so OPTS goes
  ;; there; the input and output queues come from the record's own field
  ;; defaults.
  (let* ((pump (cons-pump (or name (getuuid)) dofn donefn opts))
         (minthreads (getopt opts 'minthreads 4)))
    (dotimes (i minthreads)
      ;; Each worker gets a unique id up front.  The id is generated
      ;; rather than obtained by launching a thread, because pumpfn
      ;; requires both the pump and the id, and each iteration must
      ;; start exactly one worker.
      (let* ((threadid (getuuid))
             (thread (threadcall pumpfn pump threadid)))
        (store! (pump-threads pump) threadid thread)))
    (when name (store! named-pumps name pump))
    pump))
;;; Queues ITEM for processing by PUMP unless it is already tracked in
;;; the pump's item set (i.e. queued or still being processed).
(define (pump/push! pump item)
  (unless (hashset-get (pump-items pump) item)
    ;; Track the item before queueing it: the duplicate check above and
    ;; pump/cancel! both consult this set, and a worker drops the item
    ;; from the set once it has been processed.  Adding first means a
    ;; worker that picks the item up immediately cannot drop it before
    ;; it has been added.
    (hashset-add! (pump-items pump) item)
    (fifo/push! (pump-input pump) item)))
;;; Withdraws ITEM from PUMP's input queue when the pump is tracking it.
;;; Returns whatever fifo/remove! returns in that case.
(define (pump/cancel! pump item)
  (when (hashset-get (pump-items pump) item)
    (let ((removed (fifo/remove! (pump-input pump) item)))
      ;; Stop tracking the item so that it can be pushed again later.
      ;; An item which a worker has recorded in the `running` table is
      ;; left alone: the worker drops it from the set when it finishes.
      (unless (test (pump-running pump) item)
        (hashset-drop! (pump-items pump) item))
      removed)))
;;; Returns the result of fifo/queued on PUMP's input queue.
(define (pump/pending pump)
  (let ((queue (pump-input pump)))
    (fifo/queued queue)))
;;; Closes PUMP's input queue.  The pump's live? flag is not touched here.
(define (pump/shutdown pump)
  (let ((queue (pump-input pump)))
    (fifo/close! queue)))