-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.py
106 lines (79 loc) · 3.02 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
from datetime import timedelta, datetime
from time import sleep
import jwt
import requests
from django.conf import settings
from main_app.models import Build
def generate_app_token():
    """Build a short-lived JWT that authenticates as the GitHub App.

    The payload names ``settings.GITHUB_APP_ID`` as issuer, is valid for
    10 minutes from the moment of the call, and is signed with RS256 using
    the private key read from ``settings.GITHUB_APP_PRIVATE_KEY_PATH``.

    Returns:
        The encoded JWT as a ``str``.

    Raises:
        OSError: if the private key file cannot be opened or read.
    """
    # Read the clock once so ``iat`` and ``exp`` are exactly 10 minutes apart.
    issued_at = datetime.now()
    payload = {
        # issued at time
        'iat': int(issued_at.timestamp()),
        # JWT expiration time (10 minute maximum)
        'exp': int((issued_at + timedelta(minutes=10)).timestamp()),
        # GitHub App's identifier
        'iss': settings.GITHUB_APP_ID,
    }
    # Context manager so the key file handle is closed deterministically.
    with open(settings.GITHUB_APP_PRIVATE_KEY_PATH) as key_file:
        key = key_file.read()
    token = jwt.encode(payload, key, 'RS256')
    # PyJWT 1.x returns bytes while PyJWT 2.x returns str; always hand back str.
    if isinstance(token, bytes):
        token = token.decode()
    return token
def generate_installation_token(installation_id, timeout=10):
    """Request an installation access token from the GitHub API.

    A fresh app JWT is generated with ``generate_app_token()`` and sent as a
    bearer token to the installation's ``access_tokens`` endpoint.

    Args:
        installation_id: Identifier interpolated into the endpoint URL.
        timeout: Seconds to wait for GitHub before giving up, passed straight
            to ``requests.post``.

    Returns:
        The value of the ``token`` field of the JSON response.

    Raises:
        requests.HTTPError: if GitHub answers with a 4xx/5xx status.
        requests.Timeout: if no response arrives within ``timeout`` seconds.
    """
    app_token = generate_app_token()
    response = requests.post(
        f'https://api.github.com/app/installations/{installation_id}/access_tokens',
        headers={
            'Accept': 'application/vnd.github.machine-man-preview+json',
            'Authorization': f'Bearer {app_token}',
        },
        # Without a timeout a stalled connection would block the caller forever.
        timeout=timeout,
    )
    # Surface the HTTP failure itself instead of a KeyError on 'token' below.
    response.raise_for_status()
    return response.json()['token']
def read_config_file(repo):
    """Read the batching options from ``.batch.yml`` on the master branch.

    The file is scanned line by line for flat ``key: value`` entries; it is
    not parsed as full YAML. Only the keys ``size``, ``bisection`` and
    ``stop_at`` are recognised, and a key must match exactly (``batch_size``
    does not count as ``size``). Text after a ``#`` is treated as a comment.
    Each value is converted to the type of its default, so callers always
    receive the same types whether or not the file sets the option. A value
    that cannot be converted is ignored and the previous value is kept.

    Args:
        repo: Object whose ``get_contents(path, ref)`` returns something with
            a bytes ``decoded_content`` attribute.

    Returns:
        dict with ``'size'`` (int, default 8), ``'bisection'`` (bool, default
        True) and ``'stop_at'`` (int, default 4).
    """
    config = {'size': 8, 'bisection': True, 'stop_at': 4}
    truthy = {'true', 'yes', 'on', '1'}
    falsy = {'false', 'no', 'off', '0'}

    content = repo.get_contents('.batch.yml', 'refs/heads/master').decoded_content.decode()
    for line in content.splitlines():
        # Drop any trailing comment, then split on the first colon only.
        key, separator, raw_value = line.split('#', 1)[0].partition(':')
        key = key.strip()
        if not separator or key not in config:
            continue
        value = raw_value.strip().strip('\'"')
        # bool must be tested before int: bool is a subclass of int.
        if isinstance(config[key], bool):
            lowered = value.lower()
            if lowered in truthy:
                config[key] = True
            elif lowered in falsy:
                config[key] = False
            continue
        try:
            config[key] = int(value)
        except ValueError:
            # Unusable number: keep the value we already have.
            continue
    return config
def get_batch_status(repo, token, timeout=10):
    """Fetch the first check run of the ``batch`` ref once it has a conclusion.

    Args:
        repo: Object exposing ``owner.login`` and ``name``, used to build the
            API URL.
        token: Bearer token sent in the ``Authorization`` header.
        timeout: Seconds to wait for GitHub before giving up, passed straight
            to ``requests.get``.

    Returns:
        The first entry of the response's ``check_runs`` list when that entry
        has a truthy ``conclusion``; otherwise ``None`` (no check runs, or
        the first one has no conclusion yet).

    Raises:
        requests.HTTPError: if GitHub answers with a 4xx/5xx status.
        requests.Timeout: if no response arrives within ``timeout`` seconds.
    """
    url = f'https://api.github.com/repos/{repo.owner.login}/{repo.name}/commits/batch/check-runs'
    response = requests.get(
        url,
        headers={
            'Accept': 'application/vnd.github.antiope-preview+json',
            'Authorization': f'Bearer {token}',
        },
        timeout=timeout,
    )
    response.raise_for_status()
    # Decode the body once; every .json() call re-parses the whole payload.
    check_runs = response.json()['check_runs']
    if check_runs and check_runs[0]['conclusion']:
        return check_runs[0]
    return None
def set_master_status(repo, token, check_run, timeout=10):
    """Copy a check run onto the head commit of every stored build of a repo.

    For each ``Build`` row whose ``repo_id`` equals ``repo.id`` a check run is
    created on ``build.head_commit`` with the fields taken from ``check_run``.
    The build row is deleted only when GitHub answers 201; builds whose
    request got any other status stay in the database.

    Args:
        repo: Object exposing ``id``, ``owner.login`` and ``name``.
        token: Bearer token sent in the ``Authorization`` header.
        check_run: Mapping providing ``name``, ``details_url``,
            ``external_id``, ``status``, ``started_at``, ``conclusion``,
            ``completed_at`` and ``output``.
        timeout: Seconds to wait for each request before giving up, passed
            straight to ``requests.post``.

    Raises:
        requests.Timeout: if a request gets no response within ``timeout``
            seconds; remaining builds are then left unprocessed.
    """
    url = f'https://api.github.com/repos/{repo.owner.login}/{repo.name}/check-runs'
    # The headers do not depend on the build, so build them once.
    headers = {
        'Accept': 'application/vnd.github.antiope-preview+json',
        'Authorization': f'Bearer {token}',
    }
    for build in Build.objects.filter(repo_id=repo.id):
        response = requests.post(
            url,
            json={
                'name': check_run['name'],
                'head_sha': build.head_commit,
                'details_url': check_run['details_url'],
                'external_id': check_run['external_id'],
                'status': check_run['status'],
                'started_at': check_run['started_at'],
                'conclusion': check_run['conclusion'],
                'completed_at': check_run['completed_at'],
                'output': check_run['output'],
            },
            headers=headers,
            timeout=timeout,
        )
        if response.status_code == 201:
            build.delete()
        # NOTE(review): presumably a one-second pause between consecutive
        # POSTs to throttle the requests; assumed to run once per build
        # whatever the response status - confirm against the original layout.
        sleep(1)