-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathline_notify.py
200 lines (183 loc) · 8.53 KB
/
line_notify.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
import logging
import urllib
import requests
import utilities as utils
# Load the application configuration once, at import time.
config = utils.read_config()
# Base URL of this service; '/notify' is appended to it to form the
# OAuth redirect URI used in the LINE Notify authorization flow.
webhook_url = config['webhook_url']
# Sent as the OAuth `client_id` / `client_secret` when talking to LINE Notify.
line_notify_id = config['line_notify_id']
line_notify_secret = config['line_notify_secret']
def send_message(message, token):
    """Post a text notification to the LINE Notify API.

    The message is sent with a leading newline prepended. The HTTP
    response is not inspected.

    :param str message: Text to deliver.
    :param str token: LINE Notify token, sent as the Bearer credential.
    """
    requests.post(
        "https://notify-api.line.me/api/notify",
        headers={"Authorization": "Bearer " + token},
        data={'message': '\n' + message},
        timeout=5,
    )
def send_image_message(message, image_path, token):
    """Post a notification with an attached image to the LINE Notify API.

    The image is read fully into memory and uploaded as the ``imageFile``
    multipart field. The HTTP response is not inspected.

    :param str message: Text to deliver alongside the image.
    :param str image_path: Path of the image file to upload.
    :param str token: LINE Notify token, sent as the Bearer credential.
    """
    auth_header = {"Authorization": "Bearer " + token}
    form_fields = {'message': message}

    with open(image_path, 'rb') as image_file:
        image_bytes = image_file.read()

    requests.post(
        "https://notify-api.line.me/api/notify",
        headers=auth_header,
        data=form_fields,
        files={'imageFile': image_bytes},
        timeout=5,
    )
def create_auth_link(user_id):
    """Create the LINE Notify authorization link for a user to connect.

    :param str user_id: The LINE user_id of the user; passed through as the
        OAuth ``state`` parameter.
    :return str: The authorization URL, with all parameters URL-encoded.
    """
    # Import the submodule explicitly: a bare `import urllib` does not
    # guarantee that `urllib.parse` has been loaded.
    from urllib.parse import urlencode

    params = {
        'response_type': 'code',
        'client_id': line_notify_id,
        'redirect_uri': webhook_url + '/notify',
        'scope': 'notify',
        'state': user_id,
        'response_mode': 'form_post',
    }
    return 'https://notify-bot.line.me/oauth/authorize?' + urlencode(params)
def get_notify_token_by_auth_code(auth_code):
    """Exchange an OAuth authorization code for a LINE Notify token.

    :param str auth_code: The authorization code.
    :return str: LINE Notify token of the user (the ``access_token`` field
        of the JSON response).
    :raises KeyError: If the JSON response has no ``access_token`` field.
    :raises requests.exceptions.Timeout: If the server does not answer
        within the timeout.
    """
    url = 'https://notify-bot.line.me/oauth/token'
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded'
    }
    data = {
        'grant_type': 'authorization_code',
        'code': auth_code,
        'redirect_uri': webhook_url + '/notify',
        'client_id': line_notify_id,
        'client_secret': line_notify_secret
    }
    # Bound the wait like the other LINE Notify calls in this module do;
    # without a timeout, requests can block indefinitely.
    response = requests.post(url, data=data, headers=headers, timeout=5)
    # NOTE(review): the HTTP status is not checked; presumably an error
    # response lacks 'access_token' and surfaces here as KeyError - confirm.
    return response.json()['access_token']
# Template lines shared by several transaction types. Fields are filled in
# with ``str.format(txn=txn)``, so ``{txn[key]}`` looks up ``txn['key']``.
_TXN_RULE = "------------------------------------"
_TXN_HEAD = (
    _TXN_RULE,
    "From: {txn[from]}",
    "To: {txn[to]}",
    "Time: {txn[time]}",
)
_TXN_GAS = "Gas Price: {txn[gas_price]} Gwei"
_TXN_ETH_SPEND = "ETH Spend: {txn[spend_value]}"
_TXN_ETH_RECEIVE = "ETH Receive: {txn[receive_value]}"
_TXN_TOKEN_VALUE = "Token Value: {txn[value]} {txn[token_symbol]}"
_TXN_NFT = "NFT: {txn[token_name]} #{txn[token_id]}"
_TXN_BALANCE = "Current Balance: {txn[wallet_balance]} ETH"
_TXN_TOKEN_BALANCE = (
    "Token Balance: {txn[token_balance][balance]} {txn[token_symbol]}"
)
_TXN_URL = "{txn[txn_url]}"

# Maps each supported txn_type to the lines of its notification message.
_TXN_TEMPLATES = {
    'normal': (
        "New Transaction Found!",
        *_TXN_HEAD,
        _TXN_GAS,
        "Value: {txn[eth_value]} ETH",
        "Action: {txn[action]}",
        _TXN_RULE,
        _TXN_BALANCE,
        _TXN_URL,
    ),
    'internal': (
        "New Internal Transaction Found!",
        *_TXN_HEAD,
        "Value: {txn[eth_value]} ETH",
        _TXN_RULE,
        _TXN_BALANCE,
        _TXN_URL,
    ),
    'erc20': (
        "New ERC20 Transaction Found!",
        *_TXN_HEAD,
        _TXN_GAS,
        _TXN_ETH_SPEND,
        _TXN_TOKEN_VALUE,
        _TXN_RULE,
        _TXN_BALANCE,
        _TXN_TOKEN_BALANCE,
        _TXN_URL,
    ),
    'erc721': (
        "New NFT Transaction Found!",
        *_TXN_HEAD,
        _TXN_GAS,
        _TXN_ETH_SPEND,
        _TXN_RULE,
        _TXN_NFT,
        _TXN_BALANCE,
        _TXN_URL,
    ),
    'internal_721': (
        "New NFT SOLD Transaction Found!",
        *_TXN_HEAD,
        "Sell price: {txn[receive_value]}",
        "Sold NFT: {txn[token_name]} #{txn[token_id]}",
        _TXN_RULE,
        _TXN_BALANCE,
        _TXN_URL,
    ),
    'normal_internal': (
        "New Swap Transaction Found!",
        *_TXN_HEAD,
        _TXN_GAS,
        "ETH Spend: {txn[eth_value]}",
        _TXN_ETH_RECEIVE,
        _TXN_RULE,
        _TXN_BALANCE,
        _TXN_URL,
    ),
    'normal_internal_20': (
        "New Swap Transaction Found!",
        *_TXN_HEAD,
        _TXN_GAS,
        "Token Spend: {txn[value]} {txn[token_symbol]}",
        _TXN_ETH_RECEIVE,
        _TXN_RULE,
        _TXN_BALANCE,
        _TXN_TOKEN_BALANCE,
        _TXN_URL,
    ),
    'erc20_721': (
        "New NFT Transaction Found!",
        *_TXN_HEAD,
        _TXN_GAS,
        _TXN_TOKEN_VALUE,
        _TXN_NFT,
        _TXN_RULE,
        _TXN_BALANCE,
        _TXN_TOKEN_BALANCE,
        _TXN_URL,
    ),
    'normal_20_721': (
        "New NFT Transaction Found!",
        *_TXN_HEAD,
        _TXN_GAS,
        _TXN_ETH_SPEND,
        "Action: {txn[action]}",
        _TXN_RULE,
        "Token Value: {txn[erc20_value]} {txn[token_symbol]}",
        _TXN_NFT,
        _TXN_RULE,
        _TXN_BALANCE,
        _TXN_TOKEN_BALANCE,
        _TXN_URL,
    ),
}


def _format_txn_message(txn, txn_type):
    """Render the notification text for ``txn``, or None for an unknown type."""
    template_lines = _TXN_TEMPLATES.get(txn_type)
    if template_lines is None:
        return None
    return "\n".join(template_lines).format(txn=txn)


def send_notify(txn, txn_type, line_notify_tokens):
    """Send a transaction notification to every given LINE Notify token.

    The message layout is selected by ``txn_type``. Types whose name
    contains ``'721'`` are sent as image messages using
    ``txn['nft_image_path']``; all others are sent as plain text.

    An unrecognised ``txn_type`` is logged and skipped, so subscribers do
    not receive a blank notification.

    :param txn: Mapping with the transaction fields required by the
        selected message layout.
    :param str txn_type: One of the keys of ``_TXN_TEMPLATES``.
    :param line_notify_tokens: Iterable of LINE Notify tokens to notify.
    :raises KeyError: If ``txn`` lacks a field the selected layout needs.
    """
    # Build the message before touching the tokens so that a malformed
    # txn fails up front, even when there is nobody to notify.
    message = _format_txn_message(txn, txn_type)
    if message is None:
        logging.getLogger(__name__).warning(
            "Unknown txn_type %r; notification skipped", txn_type)
        return

    with_image = '721' in txn_type
    for token in line_notify_tokens:
        if with_image:
            send_image_message(message, txn['nft_image_path'], token)
        else:
            send_message(message, token)