-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdrive_service.py
113 lines (101 loc) · 3.71 KB
/
drive_service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import io
from google.auth.transport.requests import Request as GoogleRequest
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload
# OAuth scopes passed to Credentials.from_authorized_user_file when the
# stored user token is loaded. Both entries are "readonly" Drive scopes
# (judging by their names), one of them limited to metadata.
SCOPES = [
"https://www.googleapis.com/auth/drive.metadata.readonly",
"https://www.googleapis.com/auth/drive.readonly",
]
class DriveService:
    """Small read-oriented wrapper around the Google Drive v3 API client.

    Loads OAuth user credentials from a token file (refreshing them when
    they have expired) and exposes helpers to list, search and download
    files through ``self.service``.
    """

    # Response fields requested for every ``files().list`` call.
    _LIST_FIELDS = "nextPageToken, files(id, name)"

    def __init__(self, token_file):
        """Load credentials from *token_file* and build the Drive client.

        Args:
            token_file: Path to an authorized-user JSON file. If the stored
                credentials are expired and carry a refresh token, they are
                refreshed and the file is rewritten with the new token.
        """
        self.token_file = token_file
        self.creds = Credentials.from_authorized_user_file(token_file, SCOPES)
        if self.creds.expired and self.creds.refresh_token:
            self.creds.refresh(GoogleRequest())
            # Persist the refreshed token so the next start-up can reuse it.
            with open(token_file, "w", encoding="utf-8") as token:
                token.write(self.creds.to_json())
        self.service = build("drive", "v3", credentials=self.creds)

    @staticmethod
    def _escape_query_value(value):
        """Escape *value* for use inside a single-quoted Drive query literal."""
        # Backslashes go first so the ones added for quotes are not doubled.
        return str(value).replace("\\", "\\\\").replace("'", "\\'")

    def _list_all(self, **params):
        """Run ``files().list`` with *params* and collect every result page.

        Returns:
            A list with the ``files`` entries of all pages, in the order the
            API returned them.
        """
        items = []
        page_token = None
        while True:
            if page_token:
                params["pageToken"] = page_token
            request = self.service.files().list(
                fields=self._LIST_FIELDS, **params
            )
            results = request.execute()
            items.extend(results.get("files", []))
            page_token = results.get("nextPageToken")
            if not page_token:
                return items

    def list_folders(self):
        """Return ``{"id", "name"}`` dicts for every folder the API lists.

        All result pages are followed, not just the first one.
        """
        return self._list_all(
            q="mimeType='application/vnd.google-apps.folder'"
        )

    def list_files(self, folder_id=None):
        """Return ``{"id", "name"}`` dicts of files, newest created first.

        Args:
            folder_id: When given (and truthy), only files whose parents
                include this folder id are returned; otherwise no query
                filter is sent.
        """
        params = {"orderBy": "createdTime desc"}
        if folder_id:
            parent = self._escape_query_value(folder_id)
            params["q"] = f"'{parent}' in parents"
        return self._list_all(**params)

    def search(self, query):
        """Return the names of all files whose full text contains *query*.

        Args:
            query: Free text to look for. Quotes and backslashes are escaped
                so the text cannot break out of the query string literal.
        """
        text = self._escape_query_value(query)
        matches = self._list_all(q=f"fullText contains '{text}'")
        return [item["name"] for item in matches]

    def get_file(self, file_id):
        """Execute a ``get_media`` request for *file_id* and return its result."""
        return self.service.files().get_media(fileId=file_id).execute()

    def download_media(self, file_id, buffer):
        """Download the content of *file_id* chunk by chunk into *buffer*.

        Args:
            file_id: Id of the Drive file to download.
            buffer: Writable binary file-like object that receives the data.
        """
        file_req = self.service.files().get_media(fileId=file_id)
        downloader = MediaIoBaseDownload(buffer, file_req)
        done = False
        while not done:
            _, done = downloader.next_chunk()

    async def media_stream(self, file_id: str):
        """Asynchronously yield the content of *file_id* one chunk at a time.

        Each downloaded chunk is yielded as ``bytes`` and then dropped from
        the intermediate buffer, so only one chunk is held in it at a time.
        """
        file_req = self.service.files().get_media(fileId=file_id)
        with io.BytesIO() as buffer:
            downloader = MediaIoBaseDownload(buffer, file_req)
            done = False
            while not done:
                # NOTE(review): next_chunk() is called synchronously here, so
                # the coroutine does not yield control while it runs.
                _, done = downloader.next_chunk()
                chunk = buffer.getvalue()
                # Empty the buffer so already-yielded data is not retained;
                # the next chunk is then written starting at offset 0.
                buffer.seek(0)
                buffer.truncate()
                yield chunk