trigger_cdf_pipeline.py
from google.cloud import pubsub_v1
import requests
import json
import os
import time


def get_access_token():
    ## fetch an OAuth2 access token for the default service account
    ## from the GCE/Cloud Functions metadata server
    scopes = 'https://www.googleapis.com/auth/cloud-platform'
    headers = {'Metadata-Flavor': 'Google'}
    api = ('http://metadata.google.internal/computeMetadata/v1/'
           'instance/service-accounts/default/token?scopes=' + scopes)
    r = requests.get(api, headers=headers).json()
    return r['access_token']
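

## Note: get_access_token() only works where the metadata server is reachable
## (GCE, Cloud Functions, Cloud Run). For local runs, a minimal sketch using
## the google-auth library (an assumption; it is not used above) would be:
##
##     import google.auth
##     import google.auth.transport.requests
##
##     credentials, _ = google.auth.default(
##         scopes=['https://www.googleapis.com/auth/cloud-platform'])
##     credentials.refresh(google.auth.transport.requests.Request())
##     token = credentials.token
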
def run_job(data, context):
    ## required configuration; os.environ[...] raises a KeyError up front
    ## instead of silently passing an error string downstream
    PROJECT_ID = os.environ['PROJECT_ID']
    TOPIC_ID = os.environ['TOPIC_ID']
    PIPELINE_NAME = os.environ['PIPELINE_NAME']
    INSTANCE_ID = os.environ['INSTANCE_ID']
    REGION = os.environ['REGION']
    NAMESPACE_ID = os.environ['NAMESPACE_ID']
    CDAP_ENDPOINT = os.environ['CDAP_ENDPOINT']
    ## name of the uploaded file that triggered the function
    default_file_name = data['name']
    ## gcs bucket where the file resides
    bucket_name = data['bucket']
    ## set up the pub/sub client
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(PROJECT_ID, TOPIC_ID)
    ## access token for the CDAP REST API
    auth_token = get_access_token()
    ## assemble the api call
    post_endpoint = (CDAP_ENDPOINT + "/v3/namespaces/" + NAMESPACE_ID
                     + "/apps/" + PIPELINE_NAME
                     + "/workflows/DataPipelineWorkflow/start")
    ## runtime arguments (macros); json.dumps produces valid JSON, unlike the
    ## original string concatenation, which dropped the quotes around the value
    payload = json.dumps({"my-file": default_file_name})
    ## add bearer token
    post_headers = {"Authorization": "Bearer " + auth_token,
                    "Accept": "application/json"}
    ## start the job and fail loudly on an HTTP error
    r1 = requests.post(post_endpoint, data=payload, headers=post_headers)
    r1.raise_for_status()
    ## give CDAP a moment to register the new run
    time.sleep(10)
    ## list the workflow runs to find the run we just started
    get_endpoint = (CDAP_ENDPOINT + "/v3/namespaces/" + NAMESPACE_ID
                    + "/apps/" + PIPELINE_NAME
                    + "/workflows/DataPipelineWorkflow/runs")
    get_headers = {"Authorization": "Bearer " + auth_token,
                   "Content-Type": "application/json"}
    r2 = requests.get(get_endpoint, headers=get_headers)
    ## parse the response into a list of run records
    response_dict = r2.json()
    ## keep the run records that are still in the STARTING state
    running_job = [x for x in response_dict if x['status'] == 'STARTING']
    ## take the first one (raises IndexError if no run has reached STARTING yet)
    job = running_job[0]
    ## publish the job details to pubsub and block on the returned future,
    ## so the message is not lost when the function instance is torn down
    pubsub_msg = json.dumps(job).encode('utf-8')
    published_msg = publisher.publish(topic_path, data=pubsub_msg)
    published_msg.result()
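

## Minimal local smoke test, a sketch only: every value below is a
## hypothetical placeholder, and the call will only succeed where the
## metadata server and the CDAP endpoint are actually reachable.
if __name__ == '__main__':
    os.environ.setdefault('PROJECT_ID', 'my-project')        # hypothetical
    os.environ.setdefault('TOPIC_ID', 'cdf-job-runs')        # hypothetical
    os.environ.setdefault('PIPELINE_NAME', 'my-pipeline')    # hypothetical
    os.environ.setdefault('INSTANCE_ID', 'my-instance')      # hypothetical
    os.environ.setdefault('REGION', 'us-central1')           # hypothetical
    os.environ.setdefault('NAMESPACE_ID', 'default')         # hypothetical
    os.environ.setdefault('CDAP_ENDPOINT', 'https://example-cdap')  # hypothetical
    ## mimic the GCS finalize event payload that Cloud Functions passes in
    run_job({'name': 'input.csv', 'bucket': 'my-bucket'}, None)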