/*
 * ExternalIntegration.chpl
 * Forked from Bears-R-Us/arkouda.
 */
module ExternalIntegration {
use Curl;
use URL;
use Reflection;
use FileIO;
use Logging;
use ServerConfig;
use ServerErrors;
// Logging settings for this module; both default to the values held by
// ServerConfig and, being config consts, may be overridden at launch.
private config const logLevel = ServerConfig.logLevel;
private config const logChannel = ServerConfig.logChannel;
// Logger used by the routines in this module for their debug output.
const eiLogger = new Logger(logLevel, logChannel);
// External system Arkouda registers with; defaults to SystemType.NONE.
private config const systemType = SystemType.NONE;
/*
 * libcurl C constants required to configure the Curl core
 * of HttpChannel objects.
 *
 * Each constant is declared extern, so its value comes from the libcurl
 * headers rather than from this module.
 *
 * NOTE(review): several of these (for example CURLOPT_USERNAME,
 * CURLOPT_PASSWORD, CURLOPT_SSLCERTTYPE, CURLOPT_CAPATH, CURLOPT_CAINFO and
 * CURLINFO_RESPONSE_CODE) are not referenced anywhere in the visible part of
 * this module.
 * NOTE(review): CURLINFO_RESPONSE_CODE is declared with type CURLoption even
 * though its name suggests a CURLINFO value -- confirm the intended type
 * before using it.
 */
extern const CURLOPT_VERBOSE:CURLoption;
extern const CURLOPT_USERNAME:CURLoption;
extern const CURLOPT_PASSWORD:CURLoption;
extern const CURLOPT_USE_SSL:CURLoption;
extern const CURLOPT_SSLCERT:CURLoption;
extern const CURLOPT_SSLKEY:CURLoption;
extern const CURLOPT_KEYPASSWD:CURLoption;
extern const CURLOPT_SSLCERTTYPE:CURLoption;
extern const CURLOPT_CAPATH:CURLoption;
extern const CURLOPT_CAINFO:CURLoption;
extern const CURLOPT_URL:CURLoption;
extern const CURLOPT_HTTPHEADER:CURLoption;
extern const CURLOPT_POSTFIELDS:CURLoption;
extern const CURLOPT_CUSTOMREQUEST:CURLoption;
extern const CURLOPT_FAILONERROR:CURLoption;
extern const CURLINFO_RESPONSE_CODE:CURLoption;
extern const CURLOPT_SSL_VERIFYPEER:CURLoption;
/*
 * Kinds of external system Arkouda can integrate with. Within this module
 * only KUBERNETES triggers any registration or deregistration work; every
 * other value takes the "otherwise" branch and is merely logged.
 */
enum SystemType {
    KUBERNETES,
    REDIS,
    CONSUL,
    NONE
}
/*
 * Kinds of channel that can be used to write to an external system.
 * getExternalChannel builds channels for FILE and HTTP only; any other
 * value (including STDOUT) is rejected there with an Error.
 */
enum ChannelType {
    STDOUT,
    FILE,
    HTTP
}
/*
 * Identifies which Arkouda socket a service endpoint refers to: the
 * Arkouda client socket or the metrics socket.
 */
enum ServiceEndpoint {
    ARKOUDA_CLIENT,
    METRICS
}
/*
 * HTTP request types available when writing to an external system.
 * HttpChannel.write casts the selected value to a string and hands it to
 * libcurl as the custom request method.
 */
enum HttpRequestType {
    POST,
    PUT,
    PATCH,
    DELETE
}
/*
 * Request body formats available when writing to an external system via
 * HTTP. HttpChannel.generateHeader produces headers for TEXT and JSON only;
 * any other value (including MULTIPART) makes it throw.
 */
enum HttpRequestFormat {
    TEXT,
    JSON,
    MULTIPART
}
/*
 * Retrieves the host ip address of the locale 0 arkouda_server process, which is
 * useful for registering Arkouda with cloud environments such as Kubernetes.
 *
 * The address is the first whitespace-delimited token of the line that
 * getLineFromFile returns for '/etc/hosts' and getConnectHostname().
 *
 * :returns: the first token of that line, as a string
 * :throws IllegalArgumentError: if the line yields no token or cannot be
 *                               processed
 */
proc getConnectHostIp() throws {
    var hostip: string;

    // The lookup runs on locale 0, where the arkouda_server process of
    // interest lives.
    on Locales[0] {
        var ipString = getLineFromFile('/etc/hosts',getConnectHostname());

        try {
            var splits = ipString.split();

            // A line without any token would otherwise be indexed out of
            // range below; surface it through the catch clause instead.
            if splits.size == 0 {
                throw new Error("no address found for the connect hostname");
            }
            hostip = splits[0]:string;
        } catch (e: Error) {
            throw new IllegalArgumentError(
                         "invalid hostname -> ip address entry in /etc/hosts %?".doFormat(
                                                                                       e));
        }
    }
    return hostip;
}
/*
 * Root of the Arkouda channel hierarchy. A channel exposes a single write
 * method that delivers a payload to an external system.
 */
class Channel {

    /*
     * Placeholder implementation that always throws; concrete channels
     * override it.
     */
    proc write(payload: string) throws {
        throw new owned Error("All derived classes must implement write");
    }
}
/*
 * Channel that delivers a payload to a file, either appending to the file
 * or writing it via writeToFile, depending on the append flag.
 */
class FileChannel : Channel {
    // Target file path
    var path: string;
    // true -> appendFile is used, false -> writeToFile is used
    var append: bool;

    /*
     * Copies the path and append flag out of the supplied parameters.
     */
    proc init(params: FileChannelParams) {
        super.init();
        this.path = params.path;
        this.append = params.append;
    }

    /*
     * Writes the payload to the configured path.
     */
    override proc write(payload: string) throws {
        if !append {
            writeToFile(path, payload);
        } else {
            appendFile(path, payload);
        }
    }
}
/*
 * The HttpChannel class writes a payload out to an HTTP/S endpoint
 * in a configurable format via a configurable request type.
 */
class HttpChannel : Channel {
    // Endpoint the payload is sent to
    var url: string;
    // HTTP verb used for the request
    var requestType: HttpRequestType;
    // Body format; determines the Accept/Content-Type headers
    var requestFormat: HttpRequestFormat;
    // SSL settings; the ssl* strings are only copied when ssl is true
    var ssl: bool = false;
    var sslKey: string;
    var sslCert: string;
    var sslCacert: string;
    var sslCapath: string;
    var sslKeyPasswd: string;

    /*
     * Copies the connection settings out of the supplied parameters. The
     * key/cert/CA fields keep their default (empty) values unless
     * params.ssl is true.
     */
    proc init(params: HttpChannelParams) {
        super.init();
        this.url = params.url;
        this.requestType = params.requestType;
        this.requestFormat = params.requestFormat;
        this.ssl = params.ssl;
        if this.ssl {
            this.sslKey = params.sslKey;
            this.sslCert = params.sslCert;
            this.sslCacert = params.sslCacert;
            this.sslCapath = params.sslCapath;
            this.sslKeyPasswd = params.sslKeyPasswd;
        }
    }

    /*
     * Applies the SSL-related options to the supplied handle and switches
     * on verbose libcurl output when the module log level is DEBUG.
     *
     * NOTE(review): CURLOPT_SSL_VERIFYPEER is set to 0, and sslCacert and
     * sslCapath are stored on the object but never handed to libcurl, so
     * the peer certificate does not appear to be verified. Confirm this is
     * intentional before relying on this channel over untrusted networks.
     */
    proc configureSsl(channel) throws {
        Curl.setopt(channel, CURLOPT_USE_SSL, this.ssl);
        Curl.setopt(channel, CURLOPT_SSLCERT, this.sslCert);
        Curl.setopt(channel, CURLOPT_SSLKEY, this.sslKey);
        Curl.setopt(channel, CURLOPT_KEYPASSWD, this.sslKeyPasswd);
        Curl.setopt(channel, CURLOPT_SSL_VERIFYPEER, 0);
        if logLevel == LogLevel.DEBUG {
            Curl.setopt(channel, CURLOPT_VERBOSE, true);
        }
    }

    /*
     * Builds the Accept/Content-Type header list for the configured request
     * format, installs it on the supplied handle and returns it. The caller
     * owns the returned list and is responsible for freeing it.
     *
     * Throws an Error for any format other than JSON or TEXT.
     */
    proc generateHeader(channel) throws {
        var args = new Curl.slist();
        var format = this.requestFormat;
        select(format) {
            when HttpRequestFormat.JSON {
                args.append("Accept: application/json");
                // PATCH requests carry a JSON-patch content type
                if this.requestType == HttpRequestType.PATCH {
                    args.append('Content-Type: application/json-patch+json');
                } else {
                    args.append("Content-Type: application/json");
                }
            }
            when HttpRequestFormat.TEXT {
                args.append("Accept: text/plain");
                args.append("Content-Type: text/plain; charset=UTF-8");
            }
            otherwise {
                throw new Error("Unsupported HttpFormat");
            }
        }
        Curl.curl_easy_setopt(channel, CURLOPT_HTTPHEADER, args);
        return args;
    }

    /*
     * Writes the payload out to an HTTP/S endpoint in a format specified
     * by the requestFormat instance attribute via the request type
     * specified in the requestType instance attribute.
     *
     * The curl handle and the header list are released when the request
     * scope is left, whether the request succeeded or an error was thrown.
     *
     * Throws an error built by getErrorWithContext (labelled
     * "ExternalSystemError") when curl_easy_perform returns a non-zero code.
     */
    override proc write(payload: string) throws {
        var curl = Curl.curl_easy_init();

        {
            // Deferred cleanups run in reverse order on every exit from
            // this block: the header list is freed first, then the handle.
            defer { Curl.curl_easy_cleanup(curl); }

            Curl.curl_easy_setopt(curl, CURLOPT_URL, this.url);
            this.configureSsl(curl);
            // Make libcurl report HTTP failures as a non-zero result
            Curl.curl_easy_setopt(curl, CURLOPT_FAILONERROR, 1);

            var args = generateHeader(curl);
            defer { args.free(); }

            Curl.curl_easy_setopt(curl, CURLOPT_POSTFIELDS, payload);
            Curl.curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, this.requestType:string);

            eiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),
                      "Configured HttpChannel for type %s format %s".doFormat(
                      this.requestType, this.requestFormat));

            eiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),
                      "Executing Http request with payload %s".doFormat(payload));

            var ret = Curl.curl_easy_perform(curl);

            if ret == 0 {
                eiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),
                    "Successfully executed Http request with payload %s".doFormat(payload));
            } else {
                // NOTE(review): 22 is presumably libcurl's
                // CURLE_HTTP_RETURNED_ERROR, which is reported for HTTP
                // failures in general when CURLOPT_FAILONERROR is set, not
                // only for "entry already exists" -- confirm the message.
                if ret == 22 {
                    throw getErrorWithContext(getLineNumber(),getRoutineName(),getModuleName(),
                        "invalid request to overwrite existing entry with payload %s. Delete the existing entry first".doFormat(payload),
                        "ExternalSystemError");
                } else {
                    throw getErrorWithContext(getLineNumber(),getRoutineName(),getModuleName(),
                        "request with payload %s returned error code %i".doFormat(payload,ret),
                        "ExternalSystemError");
                }
            }
        }

        eiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),
                   "Closed HttpChannel");
    }
}
/*
 * Base class for the configuration needed to open and write to a channel
 * connected to an external system. It carries only the channel type, which
 * getExternalChannel uses to pick the concrete Channel class.
 */
class ChannelParams {
    var channelType: ChannelType;
}
/*
 * Configuration for a channel connected to a file: the file path plus a
 * flag selecting append mode (off by default).
 */
class FileChannelParams : ChannelParams {
    var path: string;
    var append: bool;

    /*
     * Stores the channel type in the parent class and the file settings
     * in this one.
     */
    proc init(channelType: ChannelType, path: string, append: bool=false) {
        super.init(channelType);
        this.path = path;
        this.append = append;
    }
}
/*
 * Configuration for a channel connected to an HTTP or HTTPS endpoint:
 * target url, request type and format, and optional SSL material.
 */
class HttpChannelParams : ChannelParams {
    var url: string;
    var requestType: HttpRequestType;
    var requestFormat: HttpRequestFormat;
    var ssl: bool = false;
    var sslKey: string;
    var sslCert: string;
    var sslCacert: string;
    var sslCapath: string;
    var sslKeyPasswd: string;

    /*
     * Stores the channel type in the parent class and the HTTP settings in
     * this one. The ssl* string arguments are only retained when ssl is
     * true; otherwise the corresponding fields keep their default values.
     */
    proc init(channelType: ChannelType,
              url: string,
              requestType: HttpRequestType,
              requestFormat: HttpRequestFormat,
              ssl: bool=false,
              sslKey: string,
              sslCert: string,
              sslCacert: string,
              sslCapath: string,
              sslKeyPasswd: string) {
        super.init(channelType);
        this.url = url;
        this.requestType = requestType;
        this.requestFormat = requestFormat;
        this.ssl = ssl;
        if this.ssl {
            this.sslKey = sslKey;
            this.sslCert = sslCert;
            this.sslCacert = sslCacert;
            this.sslCapath = sslCapath;
            this.sslKeyPasswd = sslKeyPasswd;
        }
    }
}
/*
 * Factory that builds the Channel matching params.channelType: a
 * FileChannel for FILE and an HttpChannel for HTTP. The params object is
 * cast to the corresponding params subclass before construction.
 *
 * Throws an Error for any other channel type.
 */
proc getExternalChannel(params: borrowed ChannelParams) : Channel throws {
    select params.channelType {
        when ChannelType.FILE do
            return new FileChannel(params: FileChannelParams);
        when ChannelType.HTTP do
            return new HttpChannel(params: HttpChannelParams);
        otherwise {
            throw new owned Error("Invalid channelType");
        }
    }
}
/*
 * Registers Arkouda with Kubernetes by creating a Kubernetes Service--and an Endpoints
 * if Arkouda is deployed outside of Kubernetes--to enable service discovery of Arkouda
 * from applications deployed within Kubernetes.
 *
 * When the deployment is Deployment.KUBERNETES only a Service (with an app
 * selector) is created; otherwise a selector-less Service plus an Endpoints
 * object are created.
 *
 * All URLs are built from the K8S_HOST and NAMESPACE environment values;
 * NAMESPACE falls back to 'default' for every URL so that the Service and
 * the Endpoints are always addressed in the same namespace.
 */
proc registerWithKubernetes(appName: string, serviceName: string,
                                 servicePort: int, targetServicePort: int) throws {
    if deployment == Deployment.KUBERNETES {
        registerAsInternalService(appName, serviceName, servicePort, targetServicePort);
    } else {
        registerAsExternalService(serviceName, servicePort, targetServicePort);
    }

    // Namespace shared by every URL built below.
    proc kubernetesNamespace() : string throws {
        return ServerConfig.getEnv(name='NAMESPACE',default='default');
    }

    // Opens a JSON POST channel to the given Kubernetes API url using the
    // key/cert settings read from the environment.
    proc openKubernetesPostChannel(url: string) throws {
        return getExternalChannel(new HttpChannelParams(
                                     channelType=ChannelType.HTTP,
                                     url=url,
                                     requestType=HttpRequestType.POST,
                                     requestFormat=HttpRequestFormat.JSON,
                                     ssl=true,
                                     sslKey=ServerConfig.getEnv('KEY_FILE'),
                                     sslCert=ServerConfig.getEnv('CERT_FILE'),
                                     sslCacert=ServerConfig.getEnv('CACERT_FILE'),
                                     sslCapath='',
                                     sslKeyPasswd=ServerConfig.getEnv('KEY_PASSWD')));
    }

    proc generateEndpointCreateUrl() : string throws {
        var k8sHost = ServerConfig.getEnv('K8S_HOST');
        var namespace = kubernetesNamespace();
        return '%s/api/v1/namespaces/%s/endpoints'.doFormat(k8sHost,namespace);
    }

    // Not called within this routine; kept for parity with the create URL.
    proc generateEndpointUpdateUrl() : string throws {
        var k8sHost = ServerConfig.getEnv('K8S_HOST');
        var namespace = kubernetesNamespace();
        var name = ServerConfig.getEnv('ENDPOINT_NAME');
        return '%s/api/v1/namespaces/%s/endpoints/%s'.doFormat(k8sHost,namespace,name);
    }

    proc generateServiceCreateUrl() : string throws {
        var k8sHost = ServerConfig.getEnv('K8S_HOST');
        var namespace = kubernetesNamespace();
        return '%s/api/v1/namespaces/%s/services'.doFormat(k8sHost,namespace);
    }

    /*
     * Creates a Kubernetes Service whose selector targets the pods labelled
     * with the given app name.
     */
    proc registerAsInternalService(appName: string, serviceName: string, servicePort: int,
                                   targetPort: int) throws {
        var serviceUrl = generateServiceCreateUrl();
        var servicePayload = "".join('{"apiVersion": "v1","kind": "Service","metadata": ',
                                     '{"name": "%s"},"spec": {"ports": [{"port": %i,' ,
                                     '"protocol": "TCP","targetPort": %i}],"selector":',
                                     ' {"app":"%s"}}}').doFormat(
                                        serviceName,
                                        servicePort,
                                        targetPort,
                                        appName);

        eiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),
                "Registering internal service via payload %s and url %s".doFormat(
                                     servicePayload,serviceUrl));

        var channel = openKubernetesPostChannel(serviceUrl);

        channel.write(servicePayload);

        eiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),
                "Registered internal service via payload %s and url %s".doFormat(
                                     servicePayload,serviceUrl));
    }

    /*
     * Registers Arkouda with Kubernetes by creating a Kubernetes Service and
     * Endpoints which together enable service discovery of an Arkouda instance
     * deployed outside of Kubernetes from applications deployed within Kubernetes.
     */
    proc registerAsExternalService(serviceName: string, servicePort: int,
                                                    serviceTargetPort: int) throws {
        // Create Kubernetes Service
        var serviceUrl = generateServiceCreateUrl();
        var servicePayload = "".join('{"apiVersion": "v1","kind": "Service","metadata": ',
                                     '{"name": "%s"},"spec": {"ports": [{"port": %i,',
                                     '"protocol": "TCP","targetPort": %i}]}}').doFormat(
                                        serviceName,
                                        servicePort,
                                        serviceTargetPort);

        eiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),
                "Registering external service via payload %s and url %s".doFormat(
                                     servicePayload,serviceUrl));

        var channel = openKubernetesPostChannel(serviceUrl);

        channel.write(servicePayload);

        eiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),
                "Registered external service via payload %s and url %s".doFormat(
                                     servicePayload,serviceUrl));

        // Create Kubernetes Endpoints
        // NOTE(review): the Endpoints port is servicePort while the Service
        // forwards to serviceTargetPort -- confirm these are meant to match.
        var endpointUrl = generateEndpointCreateUrl();
        var endpointPayload = "".join('{"kind": "Endpoints","apiVersion": "v1",',
                                      ' "metadata": {"name": "%s"}, "subsets": ',
                                      '[{"addresses": [{"ip": "%s"}],"ports": ',
                                      '[{"port": %i, "protocol": "TCP"}]}]}').doFormat(
                                              serviceName,
                                              getConnectHostIp(),
                                              servicePort);

        channel = openKubernetesPostChannel(endpointUrl);

        eiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),
                "Registering endpoint via payload %s and url %s".doFormat(
                                     endpointPayload,endpointUrl));

        channel.write(endpointPayload);

        eiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),
                "Registered endpoint via payload %s and endpointUrl %s".doFormat(
                                     endpointPayload,endpointUrl));
    }
}
/*
 * Removes the Kubernetes Service and, if applicable, Endpoints that compose the
 * service endpoint that enables access to Arkouda deployed within or outside of
 * Kubernetes from applications deployed within Kubernetes
 *
 * This routine issues a single DELETE request against the Service URL
 * built from the K8S_HOST and NAMESPACE environment values. NAMESPACE
 * falls back to 'default', matching the fallback used when the Service
 * URL is built for creation.
 */
proc deregisterFromKubernetes(serviceName: string) throws {
    proc generateServiceDeleteUrl(serviceName: string) throws {
        var k8sHost = ServerConfig.getEnv('K8S_HOST');
        var namespace = ServerConfig.getEnv(name='NAMESPACE',default='default');
        return '%s/api/v1/namespaces/%s/services/%s'.doFormat(k8sHost,namespace,serviceName);
    }

    var url = generateServiceDeleteUrl(serviceName);
    var channel = getExternalChannel(new HttpChannelParams(
                                     channelType=ChannelType.HTTP,
                                     url=url,
                                     requestType=HttpRequestType.DELETE,
                                     requestFormat=HttpRequestFormat.JSON,
                                     ssl=true,
                                     sslKey=ServerConfig.getEnv('KEY_FILE'),
                                     sslCert=ServerConfig.getEnv('CERT_FILE'),
                                     sslCacert=ServerConfig.getEnv('CACERT_FILE'),
                                     sslCapath='',
                                     sslKeyPasswd=ServerConfig.getEnv('KEY_PASSWD')));

    // The DELETE request is sent with an empty JSON object as its body
    channel.write('{}');

    eiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),
            "Deregistered service %s from Kubernetes via url %s".doFormat(serviceName,
                                                                          url));
}
/*
 * Reads the Kubernetes registration settings for the given service
 * endpoint from the environment.
 *
 * For METRICS the METRICS_SERVICE_* variables are used (ports default to
 * 5556); for any other endpoint the EXTERNAL_SERVICE_* variables are used
 * (ports default to 5555).
 *
 * :returns: a (serviceName, servicePort, targetServicePort) tuple
 */
proc getKubernetesRegistrationParameters(serviceEndpoint: ServiceEndpoint) throws {
    var serviceName: string;
    var servicePort: int;
    var targetServicePort: int;

    if serviceEndpoint == ServiceEndpoint.METRICS {
        serviceName = ServerConfig.getEnv('METRICS_SERVICE_NAME');
        servicePort = ServerConfig.getEnv('METRICS_SERVICE_PORT',
                                          default='5556'):int;
        // The target port must land in targetServicePort; assigning it to
        // servicePort would clobber the service port and leave the target
        // port at its default of 0.
        targetServicePort = ServerConfig.getEnv('METRICS_SERVICE_TARGET_PORT',
                                                default='5556'):int;
    } else {
        serviceName = ServerConfig.getEnv('EXTERNAL_SERVICE_NAME');
        servicePort = ServerConfig.getEnv('EXTERNAL_SERVICE_PORT',
                                          default='5555'):int;
        targetServicePort = ServerConfig.getEnv('EXTERNAL_SERVICE_TARGET_PORT',
                                                default='5555'):int;
    }
    return (serviceName,servicePort,targetServicePort);
}
/*
 * Returns the Kubernetes service name configured for the given endpoint:
 * the value of METRICS_SERVICE_NAME for METRICS, otherwise the value of
 * EXTERNAL_SERVICE_NAME.
 */
proc getKubernetesDeregisterParameters(serviceEndpoint: ServiceEndpoint) throws {
    const envKey = if serviceEndpoint == ServiceEndpoint.METRICS
                       then 'METRICS_SERVICE_NAME'
                       else 'EXTERNAL_SERVICE_NAME';
    return ServerConfig.getEnv(envKey);
}
/*
 * Registers Arkouda with the external system selected by systemType.
 * Only Kubernetes triggers a registration; for every other system type
 * the routine just logs that nothing was registered.
 */
proc registerWithExternalSystem(appName: string, endpoint: ServiceEndpoint) throws {
    if systemType == SystemType.KUBERNETES {
        const (svcName, svcPort, svcTargetPort) =
                               getKubernetesRegistrationParameters(endpoint);
        registerWithKubernetes(appName, svcName, svcPort, svcTargetPort);
        eiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),
                       "Registered Arkouda with Kubernetes");
    } else {
        eiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),
                       "Did not register Arkouda with any external systems");
    }
}
/*
 * Deregisters Arkouda from an external system upon receipt of shutdown command
 *
 * Only Kubernetes triggers a deregistration; for every other system type
 * the routine just logs that nothing was deregistered. The Kubernetes
 * service name is looked up only when it is actually needed, so other
 * system types do not touch the Kubernetes environment settings.
 */
proc deregisterFromExternalSystem(endpoint: ServiceEndpoint) throws {
    select systemType {
        when SystemType.KUBERNETES {
            var serviceName = getKubernetesDeregisterParameters(endpoint);
            deregisterFromKubernetes(serviceName);
            eiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),
                    "Deregistered service %s from Kubernetes".doFormat(serviceName));
        }
        otherwise {
            eiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),
                    "Did not deregister Arkouda from any external system");
        }
    }
}
}