-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhandler.py
72 lines (60 loc) · 2.5 KB
/
handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import json,os
import requests
from flask import Flask, request, jsonify
from datetime import datetime
import logging,json
logging.basicConfig(level=logging.DEBUG,format='%(asctime)s %(levelname)s %(name)s %(threadName)s : %(message)s')
app = Flask(__name__)
@app.route('/webhook', methods=['POST'])
def webhook():
# Receive the JSON payload
payload = request.json
try:
current_time = datetime.utcnow().isoformat() + "Z"
# Perform field mapping for CloudEvent.io compliance
cloud_event = {
"specversion": "1.0",
"type": "my.very.own.cloudevent."f"{payload['action']}.v0",
"datacontenttype": "application/json",
"source": "/kn-py-cloudevent-transformation-function",
"id": payload["command_id"],
"time": current_time,
"data": {
"action": payload["action"],
"example": {
"attendee": {
"feelings": payload["example"]["attendee"]["feelings"]
}
},
"actuator": {
"event": {
"organization": payload["actuator"]["event"]["organization"]
}
},
"next": {
"up": payload["next"]["up"]
},
"command_id": payload["command_id"]
}
}
# Convert the CloudEvent payload to JSON with ordered keys
json_output = json.dumps(cloud_event, indent=2, sort_keys=False)
# Send the CloudEvent payload to the target Sink/Broker
target = os.getenv("K_SINK")
if not target:
return jsonify({"error": "K_SINK environment variable not set"}), 500
headers = {"Content-Type": "application/cloudevents+json"}
print(cloud_event) # Print to stdout
response = requests.post(target, json=cloud_event, headers=headers)
if response.status_code >= 200 and response.status_code <= 299:
return jsonify({"message": "CloudEvent payload sent to SinkBinding"}), 200
else:
return jsonify({"error": "Failed to send CloudEvent payload to SinkBinding"}), 500
except KeyError as e:
error_message = f"Missing required field: {str(e)}"
return jsonify({"error": error_message}), 400
except Exception as e:
error_message = f"An error occurred: {str(e)}"
return jsonify({"error": error_message}), 500
if __name__ == '__main__':
app.run()