Skip to content

Commit

Permalink
Merge pull request #12 from Ja7ad/feat/reservoir_sampling
Browse files Browse the repository at this point in the history
Feat: Reservoir Sampling
  • Loading branch information
Ja7ad authored Jan 21, 2025
2 parents ff204d8 + d61752e commit ff672f5
Show file tree
Hide file tree
Showing 17 changed files with 791 additions and 10 deletions.
15 changes: 10 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
[![codecov](https://codecov.io/gh/Ja7ad/algo/graph/badge.svg?token=9fLKrkUviU)](https://codecov.io/gh/Ja7ad/algo)
[![Go Report Card](https://goreportcard.com/badge/github.com/Ja7ad/algo)](https://goreportcard.com/report/github.com/Ja7ad/algo)

**`algo`** is a Golang library featuring a variety of **efficient** and **well-optimized** algorithms designed for diverse **problem-solving needs**.
**`algo`** is a Golang library featuring a variety of **efficient** and **well-optimized** algorithms
designed for diverse **problem-solving needs**.

## 📌 Features

**Optimized Performance** – Algorithms are designed with efficiency in mind.
**Modular Structure** – Each algorithm is in its own package for easy use.
**Well-Documented** – Clear documentation and examples for every algorithm.
Expand All @@ -15,10 +17,13 @@

## 📚 Available Algorithms

| Algorithm | Description |
|-----------|-------------|
| [Random Weighted Selection](./rws/README.md) | Selects items randomly based on assigned weights. Useful in load balancing, gaming, and AI. |

| Algorithm | Description |
|--------------------------------------------------|-------------|
| [Random Weighted Selection](./rws/README.md) | Selects items randomly based on assigned weights. Useful in load balancing, gaming, and AI. |
| [Reservoir Sampling Algorithm R](./rs/README.md) | Basic reservoir sampling, replaces elements with probability `k/i`. Efficient for uniform random sampling. |
| [Reservoir Sampling Algorithm L](./rs/README.md) | Optimized reservoir sampling for large `N`, reduces unnecessary replacements using skipping. |
| [Weighted Reservoir Sampling](./rs/README.md) | Selects items with probability proportional to their weights using a heap-based approach. Used in recommendation systems and A/B testing. |
| [Random Sort Reservoir Sampling](./rs/README.md) | Uses a min-heap and random priorities to maintain the top `k` elements in a streaming dataset. |

## 🚀 Installation >= go 1.19

Expand Down
147 changes: 147 additions & 0 deletions rs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
# Reservoir sampling

Reservoir sampling is a family of randomized algorithms for choosing a simple random sample, without replacement,
of k items from a population of unknown size n in a single pass over the items. The size of the population n is not
known to the algorithm and is typically too large for all n items to fit into main memory. The population is revealed
to the algorithm over time, and the algorithm cannot look back at previous items. At any point, the current state of
the algorithm must permit extraction of a simple random sample without replacement of size k over the part of
the population seen so far.

## **🔹 Variants of Reservoir Sampling**
While **Algorithm R** is the simplest and most commonly used, there are **other variants** that improve performance in specific cases:

| **Algorithm** | **Description** | **Complexity** |
|--------------------------------|----------------|---------------|
| **Algorithm R** | Basic reservoir sampling, replaces elements with probability `k/i` | **O(N)** |
| **Algorithm L** | Optimized for large `N`, reduces replacements via skipping | **O(N), fewer iterations** |
| **Weighted Reservoir Sampling** | Assigns elements weights, prioritizing selection based on weight | **O(N log k)** (heap-based) |
| **Random Sort Reservoir Sampling** | Uses a min-heap priority queue, selecting `k` elements with highest random priority scores | **O(N log k)** |

## Algorithm Weighted R – Weighted Reservoir Sampling
**Weighted Reservoir Sampling** is an **efficient algorithm** for selecting `k` elements **proportionally to their weights** from a stream of unknown length `N`, using only `O(k)` memory.

This repository implements **Weighted Algorithm R**, an extension of **Jeffrey Vitter's Algorithm R**, which allows weighted sampling using a **heap-based approach**.

> This algorithm uses a **min-heap-based priority selection**, ensuring **O(N log k)** time complexity, making it efficient for large streaming datasets.
## 📊 **Mathematical Formula for Weighted Algorithm R**

### **Problem Definition**
We need to select **`k` elements** from a data stream **of unknown length `N`**, ensuring **each element is selected with a probability proportional to its weight `w_i`**.

### **Algorithm Steps**
1. **Initialize a Min-Heap of Size `k`**
- Store the first `k` elements **with their priority scores**:
\[
$p_i = \frac{w_i}{U_i}$
\]
where \( $U_i$ \) is a uniform random number from **(0,1]**.

2. **Process Remaining Elements (`i > k`)**
- For each new element `s_i`:
- Compute **priority score**:
\[
$p_i = \frac{w_i}{U_i}$
\]
- If `p_i` is greater than the **smallest priority in the heap**, replace the smallest element.

3. **After processing `N` elements**, the reservoir will contain `k` elements **selected proportionally to their weights**.

---

## 🔬 **Probability Proof**
For any element \( $s_i$ \) with weight \( $w_i$ \):
1. The **priority score** is:
\[
$p_i = \frac{w_i}{U_i}$
\]
where \( $U_i \sim U(0,1]$ \).

2. The **probability that `s_i` is among the top `k` elements**:
\[
$P(s_i \text{ is selected}) \propto w_i$
\]
meaning elements with **higher weights** are **more likely to be selected**.

**Conclusion:** Weighted Algorithm R correctly samples elements **proportionally to their weights**, unlike uniform Algorithm R.

---

## 🧪 **Test Case Formula for Weighted Algorithm R**

### **Test Case Design**
To validate Weighted Algorithm R, we must check if:
- **Higher-weight elements are chosen more frequently**.
- **Selection follows the weight distribution over multiple runs**.

### **Mathematical Test**
For `T` independent runs:
- Let `count(s_i)` be the number of times `s_i` appears in the reservoir.
- Expected probability:
\[
$P(s_i) = \frac{w_i}{\sum w_j}$
\]
- Expected occurrence over `T` runs:
\[
$\text{Expected count}(s_i) = T \times \frac{w_i}{\sum w_j}$
\]
- We verify that `count(s_i)` is **statistically close** to this value.

# 🎯 Algorithm L

**Reservoir Sampling** is a technique for randomly selecting `k` elements from a stream of unknown length `N`.
**Algorithm L**, introduced by **Jeffrey Vitter (1985)**, improves upon traditional methods by using an **optimized skipping approach**, significantly reducing the number of random number calls.

### **Problem Definition**
We need to select **`k` elements** from a data stream **of unknown length `N`**, ensuring **each element has an equal probability `k/N`** of being chosen.

### **Algorithm Steps**
1. **Fill the reservoir** with the **first `k` elements**.
2. **Initialize weight factor `W`** using:

$W = \exp\left(\frac{\log(\text{random}())}{k}\right)$

3. **Skip elements efficiently** using the geometric formula:

$\text{skip} = \lfloor \frac{\log(\text{random}())}{\log(1 - W)} \rfloor$

4. **If still in bounds**, **randomly replace** an element in the reservoir.
5. **Update `W`** for the next iteration using:

$W = W \times \exp\left(\frac{\log(\text{random}())}{k}\right)$

6. **Repeat until the end of the stream**.

### **Probability Proof**
For each element \( $s_i$ \), we show that it has an equal probability of being selected:

1. The probability that \( $s_i$ \) **reaches the selection process**:

$P(s_i \text{ is considered}) = \frac{k}{i}$

2. The probability that \( $s_i$ \) **remains in the reservoir** is:

$P(s_i \text{ in final reservoir}) = \frac{k}{N}, \quad \forall i \in \{1, ..., N\}$

This confirms that **Algorithm L ensures uniform selection**.


## 🧪 **Test Case Formula for Algorithm L**

### **Test Case Design**
To validate Algorithm L, we must check if:
- **Each element is chosen with probability `k/N`**.
- **Selection is uniform over multiple runs**.

### **Mathematical Test**
For `T` independent runs:
- Let `count(s_i)` be the number of times `s_i` appears in the reservoir.
- Expected probability:

$P(s_i) = \frac{k}{N}$

- Expected occurrence over `T` runs:

$\text{Expected count}(s_i) = T \times \frac{k}{N}$

- We verify that `count(s_i)` is **statistically close** to this value.
27 changes: 27 additions & 0 deletions rs/pq.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package rs

// Item represents an element with a priority
type Item[T any] struct {
Value T
Priority float64
}

// PriorityQueue implements a min-heap for Items
type PriorityQueue[T any] []*Item[T]

func (pq PriorityQueue[T]) Len() int { return len(pq) }
func (pq PriorityQueue[T]) Less(i, j int) bool { return pq[i].Priority < pq[j].Priority }
func (pq PriorityQueue[T]) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] }

func (pq *PriorityQueue[T]) Push(x any) {
item := x.(*Item[T])
*pq = append(*pq, item)
}

func (pq *PriorityQueue[T]) Pop() any {
old := *pq
n := len(old)
item := old[n-1]
*pq = old[:n-1]
return item
}
87 changes: 87 additions & 0 deletions rs/pq_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package rs

import (
"container/heap"
"testing"
)

func TestPriorityQueue_PushPop(t *testing.T) {
pq := &PriorityQueue[int]{}
heap.Init(pq)

items := []struct {
value int
priority float64
}{
{1, 0.8},
{2, 0.5},
{3, 0.2},
{4, 0.9},
{5, 0.1},
}

for _, item := range items {
heap.Push(pq, &Item[int]{Value: item.value, Priority: item.priority})
}

expectedOrder := []int{5, 3, 2, 1, 4}
for i, expected := range expectedOrder {
popped := heap.Pop(pq).(*Item[int])
if popped.Value != expected {
t.Errorf("Expected %d, but got %d at index %d", expected, popped.Value, i)
}
}
}

func TestPriorityQueue_GenericTypes(t *testing.T) {
pq := &PriorityQueue[string]{}
heap.Init(pq)

heap.Push(pq, &Item[string]{Value: "apple", Priority: 0.9})
heap.Push(pq, &Item[string]{Value: "banana", Priority: 0.5})
heap.Push(pq, &Item[string]{Value: "cherry", Priority: 0.1})

expectedOrder := []string{"cherry", "banana", "apple"}
for i, expected := range expectedOrder {
popped := heap.Pop(pq).(*Item[string])
if popped.Value != expected {
t.Errorf("Expected %s, but got %s at index %d", expected, popped.Value, i)
}
}
}

func TestPriorityQueue_Length(t *testing.T) {
pq := &PriorityQueue[float64]{}
heap.Init(pq)

heap.Push(pq, &Item[float64]{Value: 1.5, Priority: 0.3})
heap.Push(pq, &Item[float64]{Value: 2.5, Priority: 0.7})

if pq.Len() != 2 {
t.Errorf("Expected length 2, got %d", pq.Len())
}

heap.Pop(pq)
if pq.Len() != 1 {
t.Errorf("Expected length 1 after pop, got %d", pq.Len())
}
}

func TestPriorityQueue_Empty(t *testing.T) {
pq := &PriorityQueue[int]{}
heap.Init(pq)

if pq.Len() != 0 {
t.Errorf("Expected empty queue, got length %d", pq.Len())
}

heap.Push(pq, &Item[int]{Value: 10, Priority: 0.5})
if pq.Len() != 1 {
t.Errorf("Expected length 1 after push, got %d", pq.Len())
}

heap.Pop(pq)
if pq.Len() != 0 {
t.Errorf("Expected empty queue after pop, got length %d", pq.Len())
}
}
45 changes: 45 additions & 0 deletions rs/rs_L.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package rs

import (
"math"
"math/rand"
"time"
)

func init() {
rand.New(rand.NewSource(time.Now().UnixNano()))
}

// ReservoirSampleL selects k elements from a stream using Algorithm L
func ReservoirSampleL[T any](stream []T, k int) []T {
if len(stream) < k {
return nil // Not enough elements
}

// Step 1: Fill the initial reservoir
reservoir := make([]T, k)
copy(reservoir, stream[:k])

// Step 2: Initialize weight factor W
W := math.Exp(math.Log(rand.Float64()) / float64(k))

i := k // Current position in the stream

// Step 3: Process remaining elements with skipping
for i < len(stream) {
// Calculate number of elements to skip
skip := int(math.Floor(math.Log(rand.Float64()) / math.Log(1-W)))
i += skip + 1 // Move forward in the stream

// If within bounds, replace a random item in the reservoir
if i < len(stream) {
j := rand.Intn(k) // Random index in the reservoir
reservoir[j] = stream[i]

// Update weight factor W
W *= math.Exp(math.Log(rand.Float64()) / float64(k))
}
}

return reservoir
}
14 changes: 14 additions & 0 deletions rs/rs_L_example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package rs

import "fmt"

func ExampleReservoirSampleL() {
// Define a sample stream of integers
stream := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20}

// Select 5 random elements using Algorithm L
reservoir := ReservoirSampleL(stream, 5)

// Print the selected reservoir sample
fmt.Println("Selected Reservoir Sample:", reservoir)
}
Loading

0 comments on commit ff672f5

Please sign in to comment.