-
Notifications
You must be signed in to change notification settings - Fork 9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
test: unsubscribe #36
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -1,6 +1,7 @@ | ||||||||||||||
package entities | ||||||||||||||
|
||||||||||||||
import ( | ||||||||||||||
"fmt" | ||||||||||||||
"sync" | ||||||||||||||
|
||||||||||||||
"github.com/matheusmosca/walrus/domain/vos" | ||||||||||||||
|
@@ -31,7 +32,6 @@ func NewTopic(topicName vos.TopicName) (Topic, error) { | |||||||||||||
func (t Topic) Activate() { | ||||||||||||||
go t.listenForSubscriptions() | ||||||||||||||
go t.listenForMessages() | ||||||||||||||
go t.listenForKills() | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
func (t Topic) Dispatch(message vos.Message) error { | ||||||||||||||
|
@@ -44,23 +44,37 @@ func (t Topic) Dispatch(message vos.Message) error { | |||||||||||||
return nil | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
func (t Topic) RemoveSubscriber(subscriberID vos.SubscriberID) { | ||||||||||||||
t.killSubCh <- subscriberID | ||||||||||||||
func (t *Topic) RemoveSubscriber(subscriberID vos.SubscriberID) error { | ||||||||||||||
if _, ok := t.subscribers.LoadAndDelete(subscriberID); !ok { | ||||||||||||||
return ErrSubscriberNotFound | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
return nil | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
func (t Topic) addSubscriber(sub Subscriber) { | ||||||||||||||
t.newSubCh <- sub | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
func (t *Topic) listenForSubscriptions() { | ||||||||||||||
for newSubCh := range t.newSubCh { | ||||||||||||||
t.subscribers.Store(newSubCh.GetID(), newSubCh) | ||||||||||||||
func (t Topic) GetSubscriber(subscriberID vos.SubscriberID) (*Subscriber, error) { | ||||||||||||||
if value, ok := t.subscribers.Load(subscriberID); ok { | ||||||||||||||
subsInterface := value.(Subscriber) | ||||||||||||||
|
||||||||||||||
return &subsInterface, nil | ||||||||||||||
Comment on lines
+61
to
+63
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||
} | ||||||||||||||
|
||||||||||||||
return nil, ErrSubscriberNotFound | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
func (t Topic) UpdateTopic(topic Topic) (Topic, error) { | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you remove this method? |
||||||||||||||
fmt.Println("implement me") | ||||||||||||||
|
||||||||||||||
return Topic{}, nil | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
func (t *Topic) listenForKills() { | ||||||||||||||
for subscriberID := range t.killSubCh { | ||||||||||||||
t.subscribers.Delete(subscriberID) | ||||||||||||||
func (t *Topic) listenForSubscriptions() { | ||||||||||||||
for newSubCh := range t.newSubCh { | ||||||||||||||
t.subscribers.Store(newSubCh.GetID(), newSubCh) | ||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
|
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,7 +12,12 @@ func (u useCase) Unsubscribe(ctx context.Context, subscriberID vos.SubscriberID, | |
return err | ||
} | ||
|
||
topic.RemoveSubscriber(subscriberID) | ||
err = topic.RemoveSubscriber(subscriberID) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
//u.storage.UpdateTopic(ctx, topic) | ||
Comment on lines
+19
to
+20
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you remove these lines? |
||
|
||
return nil | ||
} |
Original file line number | Diff line number | Diff line change | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,117 @@ | ||||||||||||||||||
package usecases | ||||||||||||||||||
|
||||||||||||||||||
import ( | ||||||||||||||||||
"context" | ||||||||||||||||||
"testing" | ||||||||||||||||||
|
||||||||||||||||||
"github.com/stretchr/testify/assert" | ||||||||||||||||||
"github.com/stretchr/testify/require" | ||||||||||||||||||
|
||||||||||||||||||
"github.com/matheusmosca/walrus/domain/entities" | ||||||||||||||||||
"github.com/matheusmosca/walrus/domain/vos" | ||||||||||||||||||
) | ||||||||||||||||||
|
||||||||||||||||||
func TestUnsubscribe(t *testing.T) { | ||||||||||||||||||
t.Parallel() | ||||||||||||||||||
|
||||||||||||||||||
type args struct { | ||||||||||||||||||
ctx context.Context | ||||||||||||||||||
message vos.Message | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The unsubscribe use case does not have any relation with messages, could you remove this field? It makes more sense to add the topic name here (if you want to) |
||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
type fields struct { | ||||||||||||||||||
storage Repository | ||||||||||||||||||
topicName vos.TopicName | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
tests := []struct { | ||||||||||||||||||
name string | ||||||||||||||||||
args args | ||||||||||||||||||
fields fields | ||||||||||||||||||
beforeRun func(topic entities.Topic) (chan vos.Message, vos.SubscriberID) | ||||||||||||||||||
wantErr error | ||||||||||||||||||
wantSubscriber bool | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this field is not being used, could you remove it? |
||||||||||||||||||
}{ | ||||||||||||||||||
{ | ||||||||||||||||||
name: "unsubscribe should succeed", | ||||||||||||||||||
args: args{ | ||||||||||||||||||
ctx: context.Background(), | ||||||||||||||||||
message: vos.Message{ | ||||||||||||||||||
TopicName: "unsubscribing", | ||||||||||||||||||
}, | ||||||||||||||||||
}, | ||||||||||||||||||
fields: fields{ | ||||||||||||||||||
storage: &RepositoryMock{}, | ||||||||||||||||||
topicName: "unsubscribing", | ||||||||||||||||||
}, | ||||||||||||||||||
beforeRun: func(topic entities.Topic) (chan vos.Message, vos.SubscriberID) { | ||||||||||||||||||
subscriber := entities.NewSubscriber(topic) | ||||||||||||||||||
ch, ID := subscriber.Subscribe() | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could you write
Suggested change
|
||||||||||||||||||
|
||||||||||||||||||
// Assert that the subscriber has been created successfully | ||||||||||||||||||
sub, err := topic.GetSubscriber(ID) | ||||||||||||||||||
require.NoError(t, err) | ||||||||||||||||||
assert.NotNil(t, sub) | ||||||||||||||||||
|
||||||||||||||||||
return ch, ID | ||||||||||||||||||
}, | ||||||||||||||||||
wantErr: nil, | ||||||||||||||||||
wantSubscriber: true, | ||||||||||||||||||
}, | ||||||||||||||||||
{ | ||||||||||||||||||
name: "try to unsubscribe a subscriber that doesnt exist should fail", | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||
args: args{ | ||||||||||||||||||
ctx: context.Background(), | ||||||||||||||||||
message: vos.Message{ | ||||||||||||||||||
TopicName: "unsubscribing", | ||||||||||||||||||
}, | ||||||||||||||||||
}, | ||||||||||||||||||
fields: fields{ | ||||||||||||||||||
storage: &RepositoryMock{}, | ||||||||||||||||||
topicName: "unsubscribing", | ||||||||||||||||||
}, | ||||||||||||||||||
beforeRun: func(topic entities.Topic) (chan vos.Message, vos.SubscriberID) { | ||||||||||||||||||
msgChan := make(chan vos.Message) | ||||||||||||||||||
|
||||||||||||||||||
return msgChan, vos.SubscriberID("") | ||||||||||||||||||
}, | ||||||||||||||||||
wantErr: entities.ErrSubscriberNotFound, | ||||||||||||||||||
wantSubscriber: false, | ||||||||||||||||||
}, | ||||||||||||||||||
} | ||||||||||||||||||
for _, tt := range tests { | ||||||||||||||||||
tt := tt | ||||||||||||||||||
t.Run(tt.name, func(t *testing.T) { | ||||||||||||||||||
t.Parallel() | ||||||||||||||||||
|
||||||||||||||||||
topic, err := entities.NewTopic(tt.fields.topicName) | ||||||||||||||||||
require.NoError(t, err) | ||||||||||||||||||
|
||||||||||||||||||
topic.Activate() | ||||||||||||||||||
|
||||||||||||||||||
subsCh, subsID := tt.beforeRun(topic) | ||||||||||||||||||
defer close(subsCh) | ||||||||||||||||||
Comment on lines
+92
to
+93
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you are not using this channel on tests why is it being returned by
Suggested change
|
||||||||||||||||||
|
||||||||||||||||||
tt.fields.storage = &RepositoryMock{ | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know there are some tests doing this, but could you define the |
||||||||||||||||||
GetTopicFunc: func(ctx context.Context, topicName vos.TopicName) (entities.Topic, error) { | ||||||||||||||||||
if tt.fields.topicName != tt.args.message.TopicName { | ||||||||||||||||||
return entities.Topic{}, entities.ErrTopicNotFound | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
return topic, nil | ||||||||||||||||||
}, | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
useCase := New(tt.fields.storage) | ||||||||||||||||||
err = useCase.Unsubscribe(context.Background(), subsID, tt.fields.topicName) | ||||||||||||||||||
if tt.wantErr != nil { | ||||||||||||||||||
assert.ErrorIs(t, err, tt.wantErr) | ||||||||||||||||||
return | ||||||||||||||||||
} | ||||||||||||||||||
Comment on lines
+107
to
+110
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are not asserting this error on successful scenarios.
Suggested change
|
||||||||||||||||||
|
||||||||||||||||||
// Assert the unsubscription | ||||||||||||||||||
_, err = topic.GetSubscriber(subsID) | ||||||||||||||||||
assert.ErrorIs(t, entities.ErrSubscriberNotFound, err) | ||||||||||||||||||
}) | ||||||||||||||||||
} | ||||||||||||||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ type useCase struct { | |
type Repository interface { | ||
CreateTopic(ctx context.Context, name vos.TopicName, topic entities.Topic) error | ||
GetTopic(ctx context.Context, topicName vos.TopicName) (entities.Topic, error) | ||
UpdateTopic(ctx context.Context, topicName entities.Topic) (entities.Topic, error) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you remove this method since it is not implemented? |
||
ListTopics(ctx context.Context) ([]entities.Topic, error) | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ type MemoryRepository interface { | |
GetTopic(ctx context.Context, topicName vos.TopicName) (entities.Topic, error) | ||
CreateTopic(ctx context.Context, name vos.TopicName, topic entities.Topic) error | ||
ListTopics(ctx context.Context) ([]entities.Topic, error) | ||
UpdateTopic(ctx context.Context, topicName entities.Topic) (entities.Topic, error) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This one too |
||
} | ||
|
||
func NewMemoryRepository(storage map[vos.TopicName]entities.Topic) MemoryRepository { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
package topics | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/matheusmosca/walrus/domain/entities" | ||
) | ||
|
||
func (r *repository) UpdateTopic(ctx context.Context, topicName entities.Topic) (entities.Topic, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you remove this file? |
||
return entities.Topic{}, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you change this method to not return a pointer?