From 065bd95cb5866f88b556003e0d3597ed7e67ff21 Mon Sep 17 00:00:00 2001 From: Sam Brockie Date: Tue, 14 Feb 2023 15:54:28 +0100 Subject: [PATCH] Add new CSE benchmark based on Kalman filter matrix expression --- benchmarks/cse.py | 76 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/benchmarks/cse.py b/benchmarks/cse.py index 751895a..069bf1d 100644 --- a/benchmarks/cse.py +++ b/benchmarks/cse.py @@ -237,3 +237,79 @@ def time_jacobian_cse(self): def time_combined_cse(self): """Time simultaneous CSE on the lighthouse example and its Jacobian.""" cse([self.y, self.G]) + + +class KalmanFilterMatrixEquationCSE: + """Kalman filter example from Matthew Rocklin's SciPy 2013 talk. + + Talk titled: "Matrix Expressions and BLAS/LAPACK; SciPy 2013 Presentation" + + https://pyvideo.org/scipy-2013/matrix-expressions-and-blaslapack-scipy-2013-pr + + Notes + ===== + + Equations are: + new_mu = mu + Sigma*H.T * (R + H*Sigma*H.T).I * (H*mu - data) + new_Sigma = Sigma - Sigma*H.T * (R + H*Sigma*H.T).I * H * Sigma + + """ + + def setup(self): + """Create the 2x2 matrix equations for mu and Sigma.""" + N = 2 + mu = ImmutableDenseMatrix(symbols(f'mu:{N}')) + Sigma = ImmutableDenseMatrix(symbols(f'Sigma:{N * N}')).reshape(N, N) + H = ImmutableDenseMatrix(symbols(f'H:{N * N}')).reshape(N, N) + R = ImmutableDenseMatrix(symbols(f'R:{N * N}')).reshape(N, N) + data = ImmutableDenseMatrix(symbols(f'data:{N}')) + self.new_mu = MatAdd( + mu, + MatMul( + Sigma, + Transpose(H), + Inverse(MatAdd(R, MatMul(H, Sigma, Transpose(H)))), + MatAdd(MatMul(H, mu), MatMul(S.NegativeOne, data)), + ) + ) + self.new_Sigma = MatAdd( + Sigma, + MatMul( + S.NegativeOne, + Sigma, + Transpose(H), + Inverse(MatAdd(R, MatMul(H, Sigma, Transpose(H)))), + H, + Sigma, + ) + ) + + x0 = MatrixSymbol('x0', N, N) + x1 = MatrixSymbol('x1', N, N) + replacements_expected = [ + (x0, Transpose(H)), + (x1, Inverse(MatAdd(R, MatMul(H, Sigma, x0)))), + ] + reduced_exprs_expected = [ + MatAdd( + mu, + MatMul( + Sigma, + x0, + x1, + MatAdd(MatMul(H, mu), MatMul(S.NegativeOne, data)), + ), + ), + MatAdd(Sigma, MatMul(S.NegativeOne, Sigma, x0, x1, H, Sigma)), + ] + self.cse_expr_expected = (replacements_expected, reduced_exprs_expected) + + def test_cse(self): + """Expected result from CSE on the Kalman filter example.""" + cse_expr = cse([self.new_mu, self.new_Sigma]) + assert cse_expr == self.cse_expr_expected + + def time_cse(self): + """Time CSE on the Kalman filter example.""" + cse([self.new_mu, self.new_Sigma]) +