Skip to content

Commit

Permalink
Improve mod load performance when there are a lot of connection resou…
Browse files Browse the repository at this point in the history
…rces. Added in memory cache functionality (moved from Flowpipe). (#610)
  • Loading branch information
vhadianto authored Dec 20, 2024
1 parent 7b978d7 commit 731b5c0
Show file tree
Hide file tree
Showing 11 changed files with 192 additions and 11 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

Shared Pipes Component

## v1.7.0 [tbd]

_What's new_

* Improved performance of mod loading when there's a lot of `connections`.
* Added in memory cache functionality.

## v1.6.6 [2024-11-21]

_Bug fixes_
Expand Down
1 change: 1 addition & 0 deletions app_specific/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func SetAppSpecificEnvVarKeys(envAppPrefix string) {
EnvLogLevel = buildEnv("LOG_LEVEL")
EnvGitToken = buildEnv("GIT_TOKEN")
EnvPipesToken = buildEnv("PIPES_TOKEN")
EnvProfile = buildEnv("PROFILE")
}

// buildEnv is a function to construct an application specific env var key
Expand Down
121 changes: 121 additions & 0 deletions cache/index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package cache

import (
"os"
"sync"
"time"

"github.com/dgraph-io/ristretto"
"github.com/sagikazarmark/slog-shim"
)

// simple cache implemented using ristretto cache library
type InMemoryCache struct {
cache *ristretto.Cache
}

var (
inMemoryCache *InMemoryCache
inMemoryCacheInitOnce sync.Once
credentialCache *InMemoryCache
credentialCacheInitOnce sync.Once
connectionCache *InMemoryCache
connectionCacheInitOnce sync.Once
)

func inMemoryInitialize(config *ristretto.Config) *InMemoryCache {
if config == nil {
config = &ristretto.Config{
NumCounters: 100000, // number of keys to track frequency
MaxCost: 67108864, // maximum cost of cache (64mb).
BufferItems: 64, // number of keys per Get buffer.
}
}
cache, err := ristretto.NewCache(config)
if err != nil {
slog.Error("error initializing in-memory cache", "error", err)
os.Exit(1)
}

inMemoryCache = &InMemoryCache{cache}
return inMemoryCache
}

func GetCache() *InMemoryCache {
inMemoryCacheInitOnce.Do(func() {
inMemoryInitialize(nil)
})
return inMemoryCache
}

func initializeCredentialCache() {
credCacheConfig := &ristretto.Config{
NumCounters: 100000, // number of keys to track frequency
MaxCost: 67108864, // maximum cost of cache (64mb).
BufferItems: 64, // number of keys per Get buffer.
}

credCache, err := ristretto.NewCache(credCacheConfig)
if err != nil {
slog.Error("error initializing in-memory cache for credentials", "error", err)
os.Exit(1)
}

credentialCache = &InMemoryCache{credCache}
}

func initializeConnectionCache() {
connCacheConfig := &ristretto.Config{
NumCounters: 100000, // number of keys to track frequency
MaxCost: 67108864, // maximum cost of cache (64mb).
BufferItems: 64, // number of keys per Get buffer.
}

connCache, err := ristretto.NewCache(connCacheConfig)
if err != nil {
slog.Error("error initializing in-memory cache for connections", "error", err)
os.Exit(1)
}

connectionCache = &InMemoryCache{connCache}
}

func GetCredentialCache() *InMemoryCache {
credentialCacheInitOnce.Do(func() {
initializeCredentialCache()
})
return credentialCache
}

func GetConnectionCache() *InMemoryCache {
connectionCacheInitOnce.Do(func() {
initializeConnectionCache()
})
return connectionCache
}

func ResetCredentialCache() {
credentialCache = nil
initializeCredentialCache()
}

func ResetConnectionCache() {
connectionCache = nil
initializeConnectionCache()
}

func (cache *InMemoryCache) SetWithTTL(key string, value interface{}, ttl time.Duration) bool {
res := cache.cache.SetWithTTL(key, value, 1, ttl)

// wait for value to pass through buffers
time.Sleep(10 * time.Millisecond)
return res
}

func (cache *InMemoryCache) Get(key string) (interface{}, bool) {
return cache.cache.Get(key)
}

func (cache *InMemoryCache) Delete(key string) {
cache.cache.Del(key)
}
8 changes: 5 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ require (
github.com/olekukonko/tablewriter v0.0.5
github.com/opencontainers/image-spec v1.1.0
github.com/otiai10/copy v1.14.0
github.com/robfig/cron/v3 v3.0.1
github.com/rs/xid v1.5.0
github.com/shiena/ansicolor v0.0.0-20230509054315-a9deabde6e02
github.com/sirupsen/logrus v1.9.3
Expand All @@ -44,11 +43,10 @@ require (
github.com/aws/aws-sdk-go-v2 v1.26.1
github.com/aws/aws-sdk-go-v2/config v1.27.11
github.com/danwakefield/fnmatch v0.0.0-20160403171240-cbb64ac3d964
github.com/dgraph-io/ristretto v0.2.0
github.com/goccy/go-yaml v1.11.2
github.com/google/go-cmp v0.6.0
github.com/hashicorp/go-cleanhttp v0.5.2
github.com/hokaccha/go-prettyjson v0.0.0-20211117102719-0474bc63780f
github.com/iancoleman/strcase v0.3.0
github.com/jackc/pgx/v5 v5.6.0
github.com/jedib0t/go-pretty/v6 v6.5.9
github.com/sagikazarmark/slog-shim v0.1.0
Expand Down Expand Up @@ -89,11 +87,13 @@ require (
github.com/bgentry/go-netrc v0.0.0-20140422174119-9fd32a8b3d3d // indirect
github.com/bmatcuk/doublestar v1.3.4 // indirect
github.com/btubbs/datetime v0.1.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cloudflare/circl v1.3.7 // indirect
github.com/containerd/errdefs v0.1.0 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/cyphar/filepath-securejoin v0.2.4 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
Expand All @@ -106,6 +106,7 @@ require (
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.3 // indirect
Expand Down Expand Up @@ -142,6 +143,7 @@ require (
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/pjbgf/sha1cd v0.3.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
Expand Down
12 changes: 8 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,8 @@ github.com/bwesterb/go-ristretto v1.2.3/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7N
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cheggaaa/pb v1.0.27/go.mod h1:pQciLPpbU0oxA0h+VJYYLxO+XeDQb5pZijXscXHm81s=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
Expand Down Expand Up @@ -293,6 +295,12 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/ristretto v0.2.0 h1:XAfl+7cmoUDWW/2Lx8TGZQjjxIQ2Ley9DSf52dru4WE=
github.com/dgraph-io/ristretto v0.2.0/go.mod h1:8uBHCU/PBV4Ag0CJrP47b9Ofby5dqWNh4FicAdoqFNU=
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y=
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/elazarl/goproxy v0.0.0-20230808193330-2592e75ae04a h1:mATvB/9r/3gvcejNsXKSkQ6lcIaNec2nyfOdlTBR2lU=
github.com/elazarl/goproxy v0.0.0-20230808193330-2592e75ae04a/go.mod h1:Ro8st/ElPeALwNFlcTpWmkr6IoMFfkjXAvTHpevnDsM=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
Expand Down Expand Up @@ -483,8 +491,6 @@ github.com/hashicorp/terraform-svchost v0.1.1 h1:EZZimZ1GxdqFRinZ1tpJwVxxt49xc/S
github.com/hashicorp/terraform-svchost v0.1.1/go.mod h1:mNsjQfZyf/Jhz35v6/0LWcv26+X7JPS+buii2c9/ctc=
github.com/hokaccha/go-prettyjson v0.0.0-20211117102719-0474bc63780f h1:7LYC+Yfkj3CTRcShK0KOL/w6iTiKyqqBA9a41Wnggw8=
github.com/hokaccha/go-prettyjson v0.0.0-20211117102719-0474bc63780f/go.mod h1:pFlLw2CfqZiIBOx6BuCeRLCrfxBJipTY0nIOF/VbGcI=
github.com/iancoleman/strcase v0.3.0 h1:nTXanmYxhfFAMjZL34Ov6gkzEsSJZ5DbhxWjvSASxEI=
github.com/iancoleman/strcase v0.3.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
Expand Down Expand Up @@ -588,8 +594,6 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
Expand Down
2 changes: 1 addition & 1 deletion logs/timing.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import (
"bufio"
"bytes"
"fmt"
"github.com/turbot/pipe-fittings/app_specific"
"io"
"log"
"os"
"strings"
"time"

"github.com/olekukonko/tablewriter"
"github.com/turbot/pipe-fittings/app_specific"
)

type timeLog struct {
Expand Down
5 changes: 4 additions & 1 deletion parse/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package parse

import (
"fmt"
"log/slog"

"github.com/hashicorp/hcl/v2"
"github.com/hashicorp/hcl/v2/hclsyntax"
"github.com/turbot/go-kit/helpers"
"github.com/turbot/pipe-fittings/modconfig"
"github.com/turbot/pipe-fittings/schema"
"github.com/turbot/pipe-fittings/utils"
"log/slog"
)

type DecoderOption func(Decoder)
Expand Down Expand Up @@ -56,6 +57,7 @@ func (d *DecoderImpl) Decode(parseCtx *ModParseContext) hcl.Diagnostics {
parseCtx.ClearDependencies()

for _, block := range blocks {
utils.LogTime(fmt.Sprintf("decode block %s - %v start", block.Type, block.Labels))
switch block.Type {
case schema.BlockTypeLocals:
resources, res := d.decodeLocalsBlock(block, parseCtx)
Expand All @@ -77,6 +79,7 @@ func (d *DecoderImpl) Decode(parseCtx *ModParseContext) hcl.Diagnostics {
resourceDiags := AddResourceToMod(resource, block, parseCtx)
diags = append(diags, resourceDiags...)
}
utils.LogTime(fmt.Sprintf("decode block %s - %v end", block.Type, block.Labels))
}

return diags
Expand Down
5 changes: 5 additions & 0 deletions parse/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,10 @@ func ParseMod(_ context.Context, fileData map[string][]byte, parseCtx *ModParseC
// continue decoding as long as the number of unresolved blocks decreases
prevUnresolvedBlocks := 0
for attempts := 0; ; attempts++ {
slog.Debug("decode mod start", "decode passes", attempts+1)
diags = modDecoder.Decode(parseCtx)
if diags.HasErrors() {
slog.Error("decode mod failed", "decode passes", attempts+1, "diags", diags)
return nil, error_helpers.NewErrorsAndWarning(error_helpers.HclDiagsToError("Failed to decode mod", diags))
}
// now retrieve the warning strings
Expand All @@ -169,6 +171,9 @@ func ParseMod(_ context.Context, fileData map[string][]byte, parseCtx *ModParseC
slog.Debug("parse complete with no unresolved blocks", "decode passes", attempts+1)
break
}

slog.Debug("decode mod unresolved blocks", "decode passes", attempts+1, "unresolved blocks", unresolvedBlocks)

// if the number of unresolved blocks has NOT reduced, fail
if prevUnresolvedBlocks != 0 && unresolvedBlocks >= prevUnresolvedBlocks {
str := parseCtx.FormatDependencies()
Expand Down
14 changes: 12 additions & 2 deletions parse/mod_parse_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import (
"maps"
"strings"
"sync"
"time"

"github.com/hashicorp/hcl/v2"
"github.com/hashicorp/hcl/v2/hclsyntax"
filehelpers "github.com/turbot/go-kit/files"
"github.com/turbot/go-kit/helpers"
"github.com/turbot/pipe-fittings/app_specific"
"github.com/turbot/pipe-fittings/cache"
"github.com/turbot/pipe-fittings/connection"
"github.com/turbot/pipe-fittings/constants"
"github.com/turbot/pipe-fittings/hclhelpers"
Expand Down Expand Up @@ -485,8 +487,16 @@ func (m *ModParseContext) RebuildEvalContext() {
// should we include connections
if m.supportLateBinding && m.includeLateBindingResourcesInEvalContext {
if len(m.PipelingConnections) > 0 {
connMap := BuildTemporaryConnectionMapForEvalContext(m.PipelingConnections)
variables[schema.BlockTypeConnection] = cty.ObjectVal(connMap)
cacheKey := m.ParseContext.RootEvalPath + ":pipeling_connections"
connMapI, found := cache.GetCache().Get(cacheKey)
if !found {
connMap := BuildTemporaryConnectionMapForEvalContext(m.PipelingConnections)
variables[schema.BlockTypeConnection] = cty.ObjectVal(connMap)
cache.GetCache().SetWithTTL(cacheKey, connMap, 24 * time.Hour)
} else {
connMap := connMapI.(map[string]cty.Value)
variables[schema.BlockTypeConnection] = cty.ObjectVal(connMap)
}
}

if len(m.lateBindingVars) > 0 {
Expand Down
10 changes: 10 additions & 0 deletions parse/pipeling_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/turbot/pipe-fittings/connection"
"github.com/turbot/pipe-fittings/funcs"
"github.com/turbot/pipe-fittings/hclhelpers"
"github.com/turbot/pipe-fittings/utils"
"github.com/zclconf/go-cty/cty"
)

Expand All @@ -24,6 +25,8 @@ func DecodePipelingConnection(configPath string, block *hcl.Block) (connection.P
return nil, diags
}

utils.LogTime(fmt.Sprintf("decode connection %v start", block.Labels))

// create an empty connection struct of appropriate type
conn, err := app_specific_connection.NewPipelingConnection(block.Labels[0], block.Labels[1], block.DefRange)
if err != nil {
Expand Down Expand Up @@ -61,11 +64,16 @@ func DecodePipelingConnection(configPath string, block *hcl.Block) (connection.P
diags = append(diags, moreDiags...)
}

utils.LogTime(fmt.Sprintf("decode connection %v end", block.Labels))

return conn, diags
}

// decodeConnectionImpl decodes the given block into a connection.ConnectionImpl and returns the remaining body.
func decodeConnectionImpl(block *hcl.Block, evalCtx *hcl.EvalContext, connectionImpl *connection.ConnectionImpl) (hcl.Body, hcl.Diagnostics) {

utils.LogTime(fmt.Sprintf("decode connectionImpl %s start", connectionImpl.FullName))

schema, err := hclhelpers.HclSchemaForStruct(connectionImpl)
if err != nil {
return nil, hcl.Diagnostics{
Expand Down Expand Up @@ -107,6 +115,8 @@ func decodeConnectionImpl(block *hcl.Block, evalCtx *hcl.EvalContext, connection
}
}

utils.LogTime(fmt.Sprintf("decode connectionImpl %s end", connectionImpl.FullName))

// Return the decoded result, remaining body, and any diagnostics
return remain, diags
}
Loading

0 comments on commit 731b5c0

Please sign in to comment.