forked from Drakkar-Software/OctoBot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathoctobot_channel_consumer.py
117 lines (100 loc) · 5.87 KB
/
octobot_channel_consumer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# This file is part of OctoBot (https://github.com/Drakkar-Software/OctoBot)
# Copyright (c) 2023 Drakkar-Software, All rights reserved.
#
# OctoBot is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
#
# OctoBot is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public
# License along with OctoBot. If not, see <https://www.gnu.org/licenses/>.
import async_channel.channels as channel_instances
import async_channel.util as channel_creator
import octobot_commons.enums as enums
import octobot_commons.logging as logging
import octobot_evaluators.octobot_channel_consumer as evaluator_channel_consumer
import octobot_services.octobot_channel_consumer as service_channel_consumer
import octobot_trading.api as trading_api
import octobot_trading.octobot_channel_consumer as trading_channel_consumer
import octobot.channels as octobot_channel
import octobot.logger as logger
class OctoBotChannelGlobalConsumer:
def __init__(self, octobot):
self.octobot = octobot
self.logger = logging.get_logger(self.__class__.__name__)
# the list of octobot channel consumers
self.octobot_channel_consumers = []
# the OctoBot Channel instance
self.octobot_channel = None
async def initialize(self):
# Creates OctoBot Channel
self.octobot_channel: octobot_channel.OctoBotChannel = await channel_creator.create_channel_instance(
octobot_channel.OctoBotChannel, channel_instances.set_chan_at_id,
is_synchronized=True, bot_id=self.octobot.bot_id)
# Initialize global consumer
self.octobot_channel_consumers.append(
await self.octobot_channel.new_consumer(self.octobot_channel_callback, bot_id=self.octobot.bot_id))
# Initialize trading consumer
self.octobot_channel_consumers.append(
await self.octobot_channel.new_consumer(
trading_channel_consumer.octobot_channel_callback,
bot_id=self.octobot.bot_id,
action=[action.value for action in trading_channel_consumer.OctoBotChannelTradingActions]
))
# Initialize evaluator consumer
self.octobot_channel_consumers.append(
await self.octobot_channel.new_consumer(
evaluator_channel_consumer.octobot_channel_callback,
bot_id=self.octobot.bot_id,
action=[action.value for action in evaluator_channel_consumer.OctoBotChannelEvaluatorActions]
))
# Initialize service consumer
self.octobot_channel_consumers.append(
await self.octobot_channel.new_consumer(
service_channel_consumer.octobot_channel_callback,
bot_id=self.octobot.bot_id,
action=[action.value for action in service_channel_consumer.OctoBotChannelServiceActions]
))
async def octobot_channel_callback(self, bot_id, subject, action, data) -> None:
"""
OctoBot channel consumer callback
:param bot_id: the callback bot id
:param subject: the callback subject
:param action: the callback action
:param data: the callback data
"""
if subject == enums.OctoBotChannelSubjects.NOTIFICATION.value:
if action == trading_channel_consumer.OctoBotChannelTradingActions.EXCHANGE.value:
if trading_channel_consumer.OctoBotChannelTradingDataKeys.EXCHANGE_ID.value in data:
exchange_id = data[trading_channel_consumer.OctoBotChannelTradingDataKeys.EXCHANGE_ID.value]
self.octobot.exchange_producer.register_created_exchange_id(exchange_id)
await logger.init_exchange_chan_logger(exchange_id)
exchange_configuration = trading_api.get_exchange_configuration_from_exchange_id(exchange_id)
await self.octobot.evaluator_producer.create_evaluators(exchange_configuration)
# If an exchange is created before interface producer is done, it will be registered via
# self.octobot.interface_producer directly on creation
await self.octobot.interface_producer.register_exchange(exchange_id)
elif action == evaluator_channel_consumer.OctoBotChannelEvaluatorActions.EVALUATOR.value:
if not self.octobot.service_feed_producer.started:
# Start service feeds now that evaluators registered their feed requirements
await self.octobot.service_feed_producer.start_feeds()
elif action == service_channel_consumer.OctoBotChannelServiceActions.INTERFACE.value:
await self.octobot.interface_producer.register_interface(
data[service_channel_consumer.OctoBotChannelServiceDataKeys.INSTANCE.value])
elif action == service_channel_consumer.OctoBotChannelServiceActions.NOTIFICATION.value:
await self.octobot.interface_producer.register_notifier(
data[service_channel_consumer.OctoBotChannelServiceDataKeys.INSTANCE.value])
elif action == service_channel_consumer.OctoBotChannelServiceActions.SERVICE_FEED.value:
await self.octobot.service_feed_producer.register_service_feed(
data[service_channel_consumer.OctoBotChannelServiceDataKeys.INSTANCE.value])
async def stop(self) -> None:
"""
Remove all OctoBot Channel consumers
"""
for consumer in self.octobot_channel_consumers:
await self.octobot_channel.remove_consumer(consumer)