-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
356 lines (308 loc) · 12.9 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
from typing import List, Union
from fastapi import Body, Depends, FastAPI, File, Form, HTTPException, Query,Path, Request, UploadFile
from fastapi.staticfiles import StaticFiles
import mysql.connector
from mysql.connector import Error
from fastapi.responses import HTMLResponse
from config import connection_config
from pydantic import BaseModel
# Application instance; every route in this file is registered on it.
app = FastAPI()
# Serve files from the local "assets" directory under the /assets URL prefix.
app.mount("/assets", StaticFiles(directory="assets"), name="assets")
def save_uploaded_file(file_name: str, file_contents: bytes):
    """Write ``file_contents`` to ``assets/<file_name>``.

    Only the final path component of ``file_name`` is used, so a name such as
    ``../../etc/passwd`` (or a Windows-style ``..\\x``) cannot escape the
    ``assets`` directory.

    Args:
        file_name: Name supplied for the file; directory parts are discarded.
        file_contents: Raw bytes to write.

    Raises:
        ValueError: If no usable file name remains after stripping directories.
        OSError: If the file cannot be opened or written.
    """
    import os

    # Normalise backslashes first so Windows-style separators are stripped on
    # POSIX hosts as well.
    safe_name = os.path.basename((file_name or "").replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        raise ValueError(f"Invalid upload file name: {file_name!r}")
    with open(os.path.join("assets", safe_name), "wb") as f:
        f.write(file_contents)
# Open the module-wide MySQL connection that every handler in this file uses.
connection = mysql.connector.connect(**connection_config)
print(
    "Connected to MySQL database"
    if connection.is_connected()
    else "Failed to connect to MySQL database"
)

# Name of the user treated as "logged in"; overwritten by the /login handler
# and read by the /admin routes. Starts out as a placeholder value.
current_username = "name"
def username_is_admin(username):
    """Return True when ``username`` is flagged as an admin in ``customers``.

    Raises:
        HTTPException: 403 when no row matches or its ``isAdmin`` value is
            falsy; 500 when the lookup raises a MySQL ``Error``.
    """
    cursor = None
    try:
        cursor = connection.cursor()
        cursor.execute(
            "SELECT isAdmin FROM customers WHERE name = %s", (username,)
        )
        row = cursor.fetchone()
        # Guard clause: anything other than a truthy isAdmin flag is rejected.
        if not (row and row[0]):
            raise HTTPException(
                status_code=403, detail="User is not an admin"
            )
        return True
    except Error as e:
        print("Error checking admin status:", e)
        raise HTTPException(
            status_code=500, detail="Internal Server Error"
        )
    finally:
        # The cursor is only closed if it was actually created.
        if cursor is not None:
            cursor.close()
class Customer(BaseModel):
    """A row of the ``customers`` table as exchanged over the API.

    Used both as the request body of POST /saints and as the response model
    of the listing endpoints.
    """

    id: int
    name: str
    age: int
    occupation_name: str
    isSaint: bool
    password: str
    isAdmin: bool
    # Optional: POST /saints does not write this column and the /admin
    # handlers build rows without it, so a missing/NULL value must validate.
    # The value is set later by POST /upload.
    image_path: Union[str, None] = None
class LoginDetails(BaseModel):
    """Request body accepted by POST /login."""

    # Matched against customers.name by the login handler.
    username: str
    # Matched against customers.password by the login handler.
    password: str
@app.post("/login")
async def login(login_details: LoginDetails = Body(...)):
    """Check the supplied credentials and record the logged-in user.

    On success the module-level ``current_username`` is set to the supplied
    username; it is left untouched when the credentials do not match, so a
    failed attempt cannot switch the active user.

    Returns:
        ``{"message": "Login successful"}`` when a matching row exists.

    Raises:
        HTTPException: 401 when no row matches; 500 on a MySQL ``Error``.
    """
    # NOTE(review): the query compares the password column directly with the
    # submitted value, i.e. passwords appear to be stored unhashed - confirm.
    global current_username
    cursor = None
    try:
        cursor = connection.cursor()
        # LIMIT 1 keeps the result set to a single row so fetchone() drains
        # it completely and no unread rows are left on the shared connection.
        cursor.execute(
            "SELECT 1 FROM customers WHERE name = %s AND password = %s LIMIT 1",
            (login_details.username, login_details.password),
        )
        if cursor.fetchone() is None:
            # Unknown user or wrong password.
            raise HTTPException(status_code=401, detail="Unauthorized")
        # Only a verified login may change the active user.
        current_username = login_details.username
        return {"message": "Login successful"}
    except Error as e:
        print("Error during login:", e)
        raise HTTPException(status_code=500, detail="Internal Server Error")
    finally:
        if cursor is not None:
            cursor.close()
@app.get("/")
async def index():
    """Return a short greeting that points the caller at the JSON listing."""
    # This file registers no '/json' route; the JSON customer listing is
    # served by GET /data, so the hint names that path.
    return "Ahalan! You can fetch some json by navigating to '/data'"
@app.get("/upload", response_class=HTMLResponse)
async def upload_form(request: Request):
    """Serve a static HTML form that posts a name and an image to /upload.

    ``request`` is accepted but not read by this handler.
    """
    page = """
<html>
<head>
<title>File Upload Form</title>
</head>
<body>
<h2>Upload File</h2>
<form action="/upload" method="post" enctype="multipart/form-data">
<label for="file_name">Saints Name:</label><br>
<input type="text" id="file_name" name="file_name"><br>
<label for="image">Select a image:</label><br>
<input type="file" id="image" name="image"><br><br>
<input type="submit" value="Submit">
</form>
</body>
</html>
"""
    response = HTMLResponse(content=page, status_code=200)
    return response
@app.post("/upload")
async def upload_saint_with_image(file_name: str = Form(...), image: UploadFile = File(...)):
    """Store an uploaded image and link it to an existing customer.

    Args:
        file_name: Name of the customer the image belongs to (despite the
            parameter name, it is matched against ``customers.name``).
        image: The uploaded file.

    Returns:
        A message dict: success when the customer exists and the image was
        saved, otherwise a "does not exist" message (still HTTP 200).

    Raises:
        HTTPException: 400 when the upload carries no usable file name;
            500 when a MySQL ``Error`` occurs.
    """
    import os

    cursor = None
    try:
        cursor = connection.cursor()
        # Check if the provided name exists in the database
        cursor.execute("SELECT name FROM customers WHERE name = %s", (file_name,))
        customer = cursor.fetchone()
        if not customer:
            return {"message": f"Customer with name '{file_name}' does not exist."}

        # The client controls image.filename; keep only its last path
        # component so the file cannot be written outside /assets and the
        # stored image_path matches the file actually written.
        stored_name = os.path.basename((image.filename or "").replace("\\", "/"))
        if stored_name in ("", ".", ".."):
            raise HTTPException(status_code=400, detail="Invalid image file name")

        # Save the uploaded image to the /assets directory
        save_uploaded_file(stored_name, await image.read())
        # Update the record with the image_path
        cursor.execute("UPDATE customers SET image_path = %s WHERE name = %s", (stored_name, file_name))
        connection.commit()
        return {"message": "Image uploaded successfully and linked to the customer."}
    except Error as e:
        print("Error uploading image:", e)
        raise HTTPException(status_code=500, detail="Failed to upload image")
    finally:
        # cursor stays None if connection.cursor() itself failed.
        if cursor is not None:
            cursor.close()
@app.get("/data", response_model=List[Customer])
async def get_data():
    """Return every row of ``customers`` as a list of ``Customer`` models.

    Columns are read by position (0-7) from ``SELECT *``; the flag columns at
    positions 4 and 6 are coerced to ``bool``.

    Raises:
        HTTPException: 500 when the query raises a MySQL ``Error``.
    """
    cursor = None
    try:
        cursor = connection.cursor()
        cursor.execute("SELECT * FROM customers")
        rows = cursor.fetchall()
        return [
            Customer(
                id=row[0],
                name=row[1],
                age=row[2],
                occupation_name=row[3],
                isSaint=bool(row[4]),
                password=row[5],
                isAdmin=bool(row[6]),
                image_path=row[7],
            )
            for row in rows
        ]
    except Error as e:
        print("Error retrieving customer data from MySQL database:", e)
        raise HTTPException(status_code=500, detail="Failed to retrieve customer data from database")
    finally:
        if cursor is not None:
            cursor.close()
@app.get("/saints", response_model=List[Customer])
async def get_saints():
    """Return the ``customers`` rows whose ``isSaint`` column equals 1.

    Each row is mapped by column position (0-7) onto a ``Customer`` model.

    Raises:
        HTTPException: 500 when the query raises a MySQL ``Error``.
    """
    cursor = None
    try:
        cursor = connection.cursor()
        cursor.execute("SELECT * FROM customers WHERE isSaint = 1")
        records = cursor.fetchall()
        return [
            Customer(
                id=record[0],
                name=record[1],
                age=record[2],
                occupation_name=record[3],
                isSaint=bool(record[4]),
                password=record[5],
                isAdmin=bool(record[6]),
                image_path=record[7],
            )
            for record in records
        ]
    except Error as e:
        print("Error retrieving saints from MySQL database:", e)
        raise HTTPException(status_code=500, detail="Failed to retrieve saints from database")
    finally:
        if cursor is not None:
            cursor.close()
class CustomerSummary(BaseModel):
    """Subset of a customer row returned by GET /who (no credentials)."""

    id: int
    name: str
    age: int
    occupation_name: str
    isSaint: bool


# The handler returns only five fields, which cannot validate against the
# full Customer model (password/isAdmin are required there), so the response
# model describes exactly what is returned.
@app.get("/who", response_model=Union[CustomerSummary, str])
async def get_customer(name: str = Query(None, min_length=2, max_length=11)):
    """Look up a single customer by exact name.

    Args:
        name: Customer name to search for (2-11 characters when supplied).

    Returns:
        A dict with id, name, age, occupation_name and isSaint for the first
        matching row, or the string ``"No such customer"``.

    Raises:
        HTTPException: 500 when the query raises a MySQL ``Error``.
    """
    cursor = None
    try:
        cursor = connection.cursor()
        cursor.execute("SELECT * FROM customers WHERE name = %s", (name,))
        customer = cursor.fetchone()
        if not customer:
            return "No such customer"
        return {
            "id": customer[0],
            "name": customer[1],
            "age": customer[2],
            "occupation_name": customer[3],
            "isSaint": bool(customer[4]),
        }
    except Error as e:
        print("Error retrieving customer from MySQL database:", e)
        raise HTTPException(status_code=500, detail="Failed to retrieve customer from database")
    finally:
        if cursor is not None:
            cursor.close()
@app.post("/saints")
async def add_saint(saint: Customer):
    """Insert the posted customer into ``customers`` and commit.

    The insert writes id, name, age, occupation_name, isSaint, password and
    isAdmin; ``image_path`` from the request body is not written here.

    Raises:
        HTTPException: 500 when the insert raises a MySQL ``Error``.
    """
    insert_sql = """
INSERT INTO customers (id, name, age, occupation_name, isSaint, password, isAdmin)
VALUES (%s, %s, %s, %s, %s, %s, %s)
"""
    values = (
        saint.id,
        saint.name,
        saint.age,
        saint.occupation_name,
        saint.isSaint,
        saint.password,
        saint.isAdmin,
    )
    cursor = None
    try:
        cursor = connection.cursor()
        cursor.execute(insert_sql, values)
        connection.commit()
        print("Saint added successfully to MySQL database")
        return {"message": "Saint added successfully"}
    except Error as e:
        print("Error inserting data into MySQL table:", e)
        raise HTTPException(status_code=500, detail="Failed to add Saint to database")
    finally:
        if cursor is not None:
            cursor.close()
@app.get("/short-desc", response_class=HTMLResponse)
async def get_short_desc():
    """Render an HTML table with name, occupation, id and image per customer.

    Every database value is HTML-escaped before being placed in the page,
    because names and occupations originate from client requests. Rows with
    no ``image_path`` get an empty image cell instead of a broken
    ``/assets/None`` link.

    Raises:
        HTTPException: 500 when the query raises a MySQL ``Error``.
    """
    from html import escape

    cursor = None
    try:
        cursor = connection.cursor()
        cursor.execute("SELECT name, occupation_name, id, image_path FROM customers")
        parts = [
            "<html><head><title>Customer Short Description</title></head><body><table border='1'><tr><th>Name</th><th>Occupation</th><th>ID</th><th>Image</th></tr>"
        ]
        for name, occupation, customer_id, image_path in cursor.fetchall():
            if image_path:
                # quote=True also escapes the single quote that delimits src.
                image_cell = (
                    f"<img src='/assets/{escape(str(image_path), quote=True)}'"
                    " width='100' height='100'>"
                )
            else:
                image_cell = ""
            parts.append(
                f"<tr><td>{escape(str(name))}</td>"
                f"<td>{escape(str(occupation))}</td>"
                f"<td>{escape(str(customer_id))}</td>"
                f"<td>{image_cell}</td></tr>"
            )
        parts.append("</table></body></html>")
        # Join once instead of growing a string inside the loop.
        return "".join(parts)
    except Error as e:
        print("Error retrieving customer data from MySQL database:", e)
        raise HTTPException(status_code=500, detail="Failed to retrieve customer data from database")
    finally:
        if cursor is not None:
            cursor.close()
def get_cursor():
    """FastAPI dependency that provides a cursor on the shared connection.

    Written as a yield-dependency so the cursor is closed once the request
    that received it has finished, including when the handler raises.

    Yields:
        A cursor created by ``connection.cursor()``.
    """
    cursor = connection.cursor()
    try:
        yield cursor
    finally:
        cursor.close()
@app.get("/admin/saint/age/{min_age}/{max_age}", response_model=List[Customer])
async def get_saints_in_age_range(min_age: int = Path(..., title="Minimum Age", ge=0), max_age: int = Path(..., title="Maximum Age", ge=0), cursor = Depends(get_cursor)):
    """List saints whose age lies in ``[min_age, max_age]`` (admin only).

    Args:
        min_age: Inclusive lower bound, must be >= 0.
        max_age: Inclusive upper bound, must be >= 0.
        cursor: Database cursor supplied by the ``get_cursor`` dependency.

    Returns:
        One dict per matching row, carrying every ``Customer`` field.

    Raises:
        HTTPException: 403 when ``current_username`` is not an admin;
            500 when the query raises a MySQL ``Error``.
    """
    try:
        # username_is_admin raises 403 itself; the explicit check keeps the
        # handler from falling through if it ever returns a falsy value.
        if not username_is_admin(current_username):
            raise HTTPException(status_code=403, detail="User is not an admin")
        cursor.execute("SELECT * FROM customers WHERE isSaint = 1 AND age BETWEEN %s AND %s", (min_age, max_age))
        return [
            {
                "id": saint[0],
                "name": saint[1],
                "age": saint[2],
                "occupation_name": saint[3],
                "isSaint": bool(saint[4]),
                "password": saint[5],
                "isAdmin": bool(saint[6]),
                # Included so the row matches the Customer response model,
                # as GET /saints does.
                "image_path": saint[7],
            }
            for saint in cursor.fetchall()
        ]
    except Error as e:
        print("Error retrieving saints from MySQL database:", e)
        raise HTTPException(status_code=500, detail="Failed to retrieve saints from database")
    finally:
        # Release the injected cursor like the non-admin handlers do.
        cursor.close()
@app.get("/admin/notsaint/age/{min_age}/{max_age}", response_model=List[Customer])
async def get_notsaints_in_age_range(min_age: int = Path(..., title="Minimum Age", ge=0), max_age: int = Path(..., title="Maximum Age", ge=0), cursor = Depends(get_cursor)):
    """List non-saints whose age lies in ``[min_age, max_age]`` (admin only).

    Args:
        min_age: Inclusive lower bound, must be >= 0.
        max_age: Inclusive upper bound, must be >= 0.
        cursor: Database cursor supplied by the ``get_cursor`` dependency.

    Returns:
        One dict per matching row, carrying every ``Customer`` field.

    Raises:
        HTTPException: 403 when ``current_username`` is not an admin;
            500 when the query raises a MySQL ``Error``.
    """
    try:
        # username_is_admin raises 403 itself; the explicit check keeps the
        # handler from falling through if it ever returns a falsy value.
        if not username_is_admin(current_username):
            raise HTTPException(status_code=403, detail="User is not an admin")
        cursor.execute("SELECT * FROM customers WHERE isSaint = 0 AND age BETWEEN %s AND %s", (min_age, max_age))
        return [
            {
                "id": notsaint[0],
                "name": notsaint[1],
                "age": notsaint[2],
                "occupation_name": notsaint[3],
                "isSaint": bool(notsaint[4]),
                "password": notsaint[5],
                "isAdmin": bool(notsaint[6]),
                # Included so the row matches the Customer response model,
                # as GET /saints does.
                "image_path": notsaint[7],
            }
            for notsaint in cursor.fetchall()
        ]
    except Error as e:
        print("Error retrieving not saints from MySQL database:", e)
        raise HTTPException(status_code=500, detail="Failed to retrieve not saints from database")
    finally:
        # Release the injected cursor like the non-admin handlers do.
        cursor.close()
@app.get("/admin/name/{search_name}", response_model=List[Customer])
async def get_saints_by_name(search_name: str = Path(..., title="Search Name"), cursor = Depends(get_cursor)):
    """List saints whose name contains ``search_name`` (admin only).

    Args:
        search_name: Substring matched with SQL ``LIKE %search_name%``.
        cursor: Database cursor supplied by the ``get_cursor`` dependency.

    Returns:
        One dict per matching row, carrying every ``Customer`` field.

    Raises:
        HTTPException: 403 when ``current_username`` is not an admin;
            500 when the query raises a MySQL ``Error``.
    """
    try:
        # username_is_admin raises 403 itself; the explicit check keeps the
        # handler from falling through if it ever returns a falsy value.
        if not username_is_admin(current_username):
            raise HTTPException(status_code=403, detail="User is not an admin")
        cursor.execute("SELECT * FROM customers WHERE isSaint = 1 AND name LIKE %s", (f"%{search_name}%",))
        return [
            {
                "id": saint[0],
                "name": saint[1],
                "age": saint[2],
                "occupation_name": saint[3],
                "isSaint": bool(saint[4]),
                # The remaining Customer fields are required by the declared
                # response model, so they are returned as in the other
                # /admin listings.
                "password": saint[5],
                "isAdmin": bool(saint[6]),
                "image_path": saint[7],
            }
            for saint in cursor.fetchall()
        ]
    except Error as e:
        print("Error retrieving saints from MySQL database:", e)
        raise HTTPException(status_code=500, detail="Failed to retrieve saints from database")
    finally:
        # Release the injected cursor like the non-admin handlers do.
        cursor.close()
class AverageAges(BaseModel):
    """Response body of GET /admin/average."""

    # None when the corresponding group has no rows (SQL AVG yields NULL).
    saint_avg_age: Union[float, None] = None
    notsaint_avg_age: Union[float, None] = None


# The payload has only the two average fields, so it is described by its own
# model; it could never validate against Customer.
@app.get("/admin/average", response_model=AverageAges)
async def get_average_ages(cursor = Depends(get_cursor)):
    """Return the average age of saints and of non-saints (admin only).

    Args:
        cursor: Database cursor supplied by the ``get_cursor`` dependency.

    Returns:
        ``{"saint_avg_age": ..., "notsaint_avg_age": ...}`` with each value a
        float, or None when that group is empty.

    Raises:
        HTTPException: 403 when ``current_username`` is not an admin;
            500 when a query raises a MySQL ``Error``.
    """
    try:
        # username_is_admin raises 403 itself; the explicit check keeps the
        # handler from falling through if it ever returns a falsy value.
        if not username_is_admin(current_username):
            raise HTTPException(status_code=403, detail="User is not an admin")
        cursor.execute("SELECT AVG(age) FROM customers WHERE isSaint = 1")
        saint_avg_age = cursor.fetchone()[0]
        cursor.execute("SELECT AVG(age) FROM customers WHERE isSaint = 0")
        notsaint_avg_age = cursor.fetchone()[0]
        # Convert explicitly so the driver's numeric type serialises as a
        # plain JSON number.
        return {
            "saint_avg_age": None if saint_avg_age is None else float(saint_avg_age),
            "notsaint_avg_age": None if notsaint_avg_age is None else float(notsaint_avg_age),
        }
    except Error as e:
        print("Error retrieving average ages from MySQL database:", e)
        raise HTTPException(status_code=500, detail="Failed to retrieve average ages from database")
    finally:
        # Release the injected cursor like the non-admin handlers do.
        cursor.close()
if __name__ == "__main__":
    # Imported inside the guard, so uvicorn is only loaded when this file is
    # executed directly as a script.
    import uvicorn

    uvicorn.run(app, host="localhost", port=8000)