-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsimple_tracker.py
246 lines (222 loc) · 8.5 KB
/
simple_tracker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
# Import necessary libraries and classes
import argparse
import time
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from selenium.common.exceptions import NoSuchElementException
from amazon_config import NAME, CURRENCY, MAX_PRICE, MIN_PRICE
from amazon_config import (
get_web_driver_options,
get_chrome_web_driver,
set_ignore_certificate_error,
set_browser_as_incognito,
set_automation_as_head_less,
NAME,
CURRENCY,
FILTERS,
BASE_URL,
DIRECTORY
)
import json
from datetime import datetime
# Command-line overrides: every option is optional, and an option that is
# omitted (or given as an empty string) falls back to the value imported from
# amazon_config above.
parser = argparse.ArgumentParser(description='Amazon Price Tracker')
for _flag, _help in (
    ('--name', 'Product name to track'),
    ('--currency', 'Currency symbol'),
    ('--min_price', 'Minimum price'),
    ('--max_price', 'Maximum price'),
):
    parser.add_argument(_flag, type=str, help=_help)
args = parser.parse_args()

NAME = args.name if args.name else NAME
CURRENCY = args.currency if args.currency else CURRENCY
MIN_PRICE = args.min_price if args.min_price else MIN_PRICE
MAX_PRICE = args.max_price if args.max_price else MAX_PRICE

# Rebuild the price filter from the (possibly overridden) bounds; this
# replaces the FILTERS mapping imported from amazon_config.
FILTERS = dict(min=MIN_PRICE, max=MAX_PRICE)
# Builds a JSON report from scraped product data and writes it to disk
class GenerateReport:
    """Assemble a price report and dump it to ``{DIRECTORY}/{file_name}.json``.

    Creating an instance is what produces the report: the constructor builds
    the report dictionary and writes it out immediately.
    """

    def __init__(self, file_name, filters, base_link, currency, data):
        """Build the report and write it to a JSON file.

        Args:
            file_name: Report title; also the stem of the output file name.
            filters: Price filter mapping stored verbatim in the report.
            base_link: Base URL stored verbatim in the report.
            currency: Currency symbol stored verbatim in the report.
            data: Sequence of product dicts (each expected to carry a
                'price' key), or a falsy value when nothing was scraped.
        """
        self.file_name = file_name
        self.filters = filters
        self.base_link = base_link
        self.currency = currency
        self.data = data
        report = {
            'title': self.file_name,
            'date': self.get_now(),
            'best_item': self.get_best_item(),
            'currency': self.currency,
            'filters': self.filters,
            'base_link': self.base_link,
            'products': self.data,
        }
        print("Creating report for item...")
        # Write the report to a JSON file
        with open(f'{DIRECTORY}/{file_name}.json', 'w', encoding='utf-8') as f:
            json.dump(report, f)
        print("...Done")

    @staticmethod
    def get_now():
        """Return the current local date and time as 'dd/mm/YYYY HH:MM:SS'."""
        return datetime.now().strftime("%d/%m/%Y %H:%M:%S")

    def get_best_item(self):
        """Return the product with the lowest 'price', or None.

        None is returned when there is no data, or when the items cannot be
        compared (an item lacks a 'price' key or prices are not comparable).
        On a tie the earliest item in ``self.data`` wins.
        """
        if not self.data:
            print("No data to sort.")
            return None
        try:
            # min() picks the cheapest item in one pass; a full sort is not
            # needed just to read the first element.
            return min(self.data, key=lambda item: item['price'])
        except (KeyError, TypeError) as e:
            print(e)
            print("A problem occurred while sorting the items")
            return None
# Selenium-driven scraper for Amazon search results and product pages
class AmazonAPI:
    """Search the site for a term and collect title, seller and price data.

    The instance owns a Chrome webdriver created in ``__init__``; ``run``
    performs the whole scrape and always shuts the driver down afterwards.
    """

    def __init__(self, search_term, filters, base_url, currency):
        """Create the webdriver and remember the search parameters.

        Args:
            search_term: Text typed into the site's search box.
            filters: Mapping with 'min' and 'max' price bounds.
            base_url: Site root. Product URLs are built as
                ``base_url + 'dp/' + asin``, so it is expected to end
                with '/'.
            currency: Currency symbol used to locate the amount inside
                scraped price text.
        """
        self.base_url = base_url
        self.search_term = search_term
        self.currency = currency
        # Set up the Chrome webdriver with the project's option helpers
        options = get_web_driver_options()
        set_ignore_certificate_error(options)
        set_browser_as_incognito(options)
        self.driver = get_chrome_web_driver(options)
        # Query-string fragment appended to the search URL. The trailing
        # "00" on each bound presumably expresses the price in minor units
        # (e.g. pence/cents) - TODO confirm against the site's URL scheme.
        self.price_filter = f"&rh=p_36%3A{filters['min']}00-{filters['max']}00"

    def run(self):
        """Run the full scrape and return the collected products.

        Returns:
            A list of product dicts (see ``get_single_product_info``), or
            None when the search produced no product links.
        """
        print("Starting Script...")
        print(f"Looking for {self.search_term} products...")
        try:
            links = self.get_product_links()
            print(links)
            if not links:
                print("Stop")
                return None
            print(f"Got {len(links)} product links")
            print("Getting info about them...")
            products = self.get_products_info(links)
            print(f"Got info about {len(products)} products...")
            return products
        finally:
            # Always release the browser, including on the early "no links"
            # exit and when scraping raises; otherwise the Chrome process
            # is left running.
            self.driver.quit()

    def get_product_links(self):
        """Search for the term with the price filter and return result hrefs.

        Returns:
            A list of non-empty href strings; empty when nothing was found
            or the result list could not be read.
        """
        self.driver.get(self.base_url)
        search_box = self.driver.find_element(By.ID, "twotabsearchtextbox")
        search_box.send_keys(self.search_term)
        search_box.send_keys(Keys.ENTER)
        time.sleep(2)
        # Reload the results page with the price filter appended
        self.driver.get(f'{self.driver.current_url}{self.price_filter}')
        print(f"The URL: {self.driver.current_url}")
        time.sleep(2)
        result_lists = self.driver.find_elements(By.CLASS_NAME, 's-result-list')
        if not result_lists:
            print("Didn't get any products...")
            return []
        try:
            # NOTE(review): an XPath starting with "//" is evaluated against
            # the whole document even when called on an element; ".//" would
            # scope it to this result list. Left unchanged because the
            # effect on the live page cannot be verified here.
            anchors = result_lists[0].find_elements(
                By.XPATH,
                "//div/div/div/div/div/div[2]/div/div/div[1]/h2/a")
            hrefs = (anchor.get_attribute('href') for anchor in anchors)
            # Drop anchors without an href so later parsing never sees None
            return [href for href in hrefs if href]
        except Exception as e:
            print("Didn't get any products...")
            print(e)
            return []

    def get_products_info(self, links):
        """Return product dicts for every link whose product could be read.

        Links from which no ASIN can be extracted are skipped.
        """
        products = []
        for asin in self.get_asins(links):
            if not asin:
                continue
            product = self.get_single_product_info(asin)
            if product:
                products.append(product)
        return products

    def get_asins(self, links):
        """Return the ASIN (or None) for each link, in the same order."""
        return [self.get_asin(link) for link in links]

    def get_single_product_info(self, asin):
        """Open the product page for ``asin`` and scrape its details.

        Returns:
            A dict with 'asin', 'url', 'title', 'seller' and 'price', or
            None when any of title, seller or price is missing/falsy.
        """
        print(f"Product ID: {asin} - getting data")
        product_short_url = self.shorten_url(asin)
        self.driver.get(f'{product_short_url}?language=en_GB')
        time.sleep(2)
        title = self.get_title()
        seller = self.get_seller()
        price = self.get_price()
        if title and seller and price:
            return {
                'asin': asin,
                'url': product_short_url,
                'title': title,
                'seller': seller,
                'price': price,
            }
        return None

    def _text_by_id(self, element_id, label):
        """Return the text of the element with ``element_id``, or None.

        ``label`` names the field in the failure message.
        """
        try:
            return self.driver.find_element(By.ID, element_id).text
        except Exception as e:
            print(e)
            # f-string (not a raw string) so the current URL is actually
            # interpolated into the message.
            print(f"Can't get {label} of a product - {self.driver.current_url}")
            return None

    def get_title(self):
        """Return the product title from the current page, or None."""
        return self._text_by_id('productTitle', 'title')

    def get_seller(self):
        """Return the seller/byline text from the current page, or None."""
        return self._text_by_id('bylineInfo', 'seller')

    def get_price(self):
        """Return the product price from the current page as a float, or None.

        The main offer box is tried first. If that element is absent, the
        price is read from the other-offers element, but only when the
        availability text contains 'Available'.
        """
        try:
            offer_text = self.driver.find_element(
                By.ID, 'apex_offerDisplay_desktop').text
            return self.convert_price(offer_text)
        except NoSuchElementException:
            # No main offer box: fall through to the alternative lookup
            pass
        except Exception as e:
            print(e)
            print(f"Can't get price of a product - {self.driver.current_url}")
            return None

        try:
            availability = self.driver.find_element(By.ID, 'availability').text
            if 'Available' not in availability:
                return None
            offer_text = self.driver.find_element(By.ID, 'olp-padding-right').text
            # Keep only the part starting at the currency symbol
            offer_text = offer_text[offer_text.find(self.currency):]
            return self.convert_price(offer_text)
        except Exception as e:
            print(e)
            print(f"Can't get price of a product - {self.driver.current_url}")
            return None

    def shorten_url(self, asin):
        """Return the canonical short product URL for ``asin``."""
        return self.base_url + 'dp/' + asin

    @staticmethod
    def get_asin(product_link):
        """Extract the ASIN that follows '/dp/' in a product URL.

        The ASIN ends at the next '/', '?' or '#', so links with or without
        a trailing '/ref=...' segment or query string are handled.

        Returns:
            The ASIN string, or None when the link is empty or has no
            '/dp/' segment.
        """
        if not product_link:
            return None
        _, marker, tail = product_link.partition('/dp/')
        if not marker:
            return None
        for stop in '/?#':
            tail = tail.split(stop, 1)[0]
        return tail or None

    def convert_price(self, price):
        """Convert scraped price text to a float.

        The amount is the text between the first and second occurrence of
        the currency symbol. When the amount's first line has no decimal
        point and the next line is all digits, that line is taken as the
        fractional part (e.g. '1,299' / '99' -> 1299.99). All thousands
        separators are removed.

        Raises:
            ValueError: if the currency symbol is absent or the amount is
                not numeric.
        """
        parts = price.split(self.currency)
        if len(parts) < 2:
            raise ValueError(
                f"Currency symbol {self.currency!r} not found in {price!r}")
        lines = parts[1].split("\n")
        amount = lines[0].strip()
        if len(lines) > 1 and "." not in amount:
            fraction = lines[1].strip()
            if fraction.isdigit():
                amount = f"{amount}.{fraction}"
        return float(amount.replace(",", ""))
if __name__ == '__main__':
    # Scrape the products matching the configured search term and price
    # filter, then hand whatever was collected to the report writer.
    tracker = AmazonAPI(NAME, FILTERS, BASE_URL, CURRENCY)
    scraped_products = tracker.run()
    GenerateReport(NAME, FILTERS, BASE_URL, CURRENCY, scraped_products)