Skip to content

Commit

Permalink
gofmt + dep
Browse files Browse the repository at this point in the history
  • Loading branch information
yurykozyrev committed Aug 3, 2018
1 parent 8b0b350 commit 209323b
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 14 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,5 @@ _testmain.go
*.exe
*.test
*.prof

vendor/
45 changes: 45 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 34 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Gopkg.toml example
#
# Refer to https://github.com/golang/dep/blob/master/docs/Gopkg.toml.md
# for detailed Gopkg.toml documentation.
#
# required = ["github.com/user/thing/cmd/thing"]
# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"]
#
# [[constraint]]
# name = "github.com/user/project"
# version = "1.0.0"
#
# [[constraint]]
# name = "github.com/user/project2"
# branch = "dev"
# source = "github.com/myfork/project2"
#
# [[override]]
# name = "github.com/x/y"
# version = "2.4.0"
#
# [prune]
# non-go = false
# go-tests = true
# unused-packages = true


[[constraint]]
name = "github.com/aws/aws-sdk-go"
version = "1.15.4"

[prune]
go-tests = true
unused-packages = true
26 changes: 12 additions & 14 deletions stream/stream.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

// Yury Kozyrev (urakozz)
// MIT License
package stream
Expand All @@ -23,9 +22,9 @@ type StreamSubscriber struct {
}

func NewStreamSubscriber(
dynamoSvc *dynamodb.DynamoDB,
streamSvc *dynamodbstreams.DynamoDBStreams,
table string) *StreamSubscriber {
dynamoSvc *dynamodb.DynamoDB,
streamSvc *dynamodbstreams.DynamoDBStreams,
table string) *StreamSubscriber {
s := &StreamSubscriber{dynamoSvc: dynamoSvc, streamSvc: streamSvc, table: &table}
s.applyDefaults()
return s
Expand All @@ -50,7 +49,7 @@ func (r *StreamSubscriber) GetStreamData() (<-chan *dynamodbstreams.Record, <-ch
ch := make(chan *dynamodbstreams.Record, 1)
errCh := make(chan error, 1)

go func(ch chan <- *dynamodbstreams.Record, errCh chan <- error) {
go func(ch chan<- *dynamodbstreams.Record, errCh chan<- error) {
var shardId *string
var prevShardId *string
var streamArn *string
Expand Down Expand Up @@ -89,7 +88,7 @@ func (r *StreamSubscriber) GetStreamDataAsync() (<-chan *dynamodbstreams.Record,
needUpdateChannel <- struct{}{}

allShards := make(map[string]struct{})
shardProcessingLimit := 5;
shardProcessingLimit := 5
shardsCh := make(chan *dynamodbstreams.GetShardIteratorInput, shardProcessingLimit)
lock := sync.Mutex{}

Expand All @@ -112,7 +111,7 @@ func (r *StreamSubscriber) GetStreamDataAsync() (<-chan *dynamodbstreams.Record,
errCh <- err
return
}
ids, err := r.getShardIds(streamArn);
ids, err := r.getShardIds(streamArn)
if err != nil {
errCh <- err
return
Expand All @@ -137,17 +136,17 @@ func (r *StreamSubscriber) GetStreamDataAsync() (<-chan *dynamodbstreams.Record,

limit := make(chan struct{}, shardProcessingLimit)

go func(){
go func() {
time.Sleep(time.Second * 10)
for shardInput := range shardsCh {
limit <- struct{}{}
go func(sInput *dynamodbstreams.GetShardIteratorInput){
go func(sInput *dynamodbstreams.GetShardIteratorInput) {
err := r.processShard(sInput, ch)
if err != nil {
errCh <- err
}
// TODO: think about cleaning list of shards: delete(allShards, *sInput.ShardId)
<- limit
<-limit
}(shardInput)
}
}()
Expand Down Expand Up @@ -211,15 +210,15 @@ func (r *StreamSubscriber) getLatestStreamArn() (*string, error) {
return tableInfo.Table.LatestStreamArn, nil
}

func (r *StreamSubscriber) processShardBackport(shardId, lastStreamArn *string, ch chan <- *dynamodbstreams.Record) error {
func (r *StreamSubscriber) processShardBackport(shardId, lastStreamArn *string, ch chan<- *dynamodbstreams.Record) error {
return r.processShard(&dynamodbstreams.GetShardIteratorInput{
StreamArn: lastStreamArn,
ShardId: shardId,
ShardIteratorType: r.ShardIteratorType,
}, ch);
}, ch)
}

func (r *StreamSubscriber) processShard(input *dynamodbstreams.GetShardIteratorInput, ch chan <- *dynamodbstreams.Record) error {
func (r *StreamSubscriber) processShard(input *dynamodbstreams.GetShardIteratorInput, ch chan<- *dynamodbstreams.Record) error {
iter, err := r.streamSvc.GetShardIterator(input)
if err != nil {
return err
Expand Down Expand Up @@ -264,4 +263,3 @@ func (r *StreamSubscriber) processShard(input *dynamodbstreams.GetShardIteratorI
}
return nil
}

0 comments on commit 209323b

Please sign in to comment.