-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathlogstash_filter_run.py
86 lines (80 loc) · 2.7 KB
/
logstash_filter_run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import json
import os
from genericpath import exists
from os.path import join
from shutil import rmtree
from subprocess import Popen, PIPE
from tempfile import mkdtemp
# Candidate logstash executables, probed in order by logstash_filter_run()
# when no explicit ``logstash_bin`` is passed.
LOGSTASH_BIN_ALTERNATIVES = [
    '/opt/logstash/bin/logstash',
    '/usr/share/logstash/bin/logstash',
]

# Template for pipelines.yml; ``{}`` is filled with the pipeline directory.
# YAML is indentation-sensitive: ``path.config`` and ``pipeline.workers`` must
# be indented under the list item so they belong to ``my-pipeline``; at
# column 0 they would not, and the document would not be valid YAML.
# A single worker is presumably used to keep output order aligned with input
# order -- confirm against the logstash version in use.
PIPELINES_YML = """\
- pipeline.id: my-pipeline
  path.config: "{}"
  pipeline.workers: 1
"""

# Input/output half of the test pipeline: events are read from stdin with the
# json_lines codec and written by the file output to the path substituted for
# ``%s``.  The filter half is written to a separate file in the same pipeline
# directory by logstash_filter_run().
INPUT_OUTPUT_CONF = """\
input {
  stdin {
    codec => "json_lines"
  }
}
output {
  file {
    path => "%s"
  }
}
"""
def logstash_filter_run(inputs, filter_def, logstash_bin=None, remove_tempdir=True):
    """
    Run a bunch of json through logstash given the filter definition.

    Every input dict is serialized to one JSON line and fed to logstash on
    stdin.  Logstash writes the filtered events to a file inside a temporary
    working directory, and each line of that file is parsed back as JSON.

    :param inputs: a list (or any iterable) of dicts
    :param filter_def: logstash filter definition as a string
    :param logstash_bin: logstash executable path. By default will try
        LOGSTASH_BIN_ALTERNATIVES
    :param remove_tempdir: remove temporary working directory after done,
        whether the run succeeded or failed; pass False to keep it for
        debugging
    :return: a list of dicts, the results
    :raises TypeError: if an input does not serialize to a JSON object
    :raises RuntimeError: if no logstash executable is found, logstash exits
        with a non-zero return code, or the number of output events differs
        from the number of inputs
    """
    input_jsons = [json.dumps(d) for d in inputs]
    # Explicit check rather than ``assert``, which is stripped under ``-O``.
    if not all(s.startswith('{') for s in input_jsons):
        raise TypeError("inputs must be a list of dicts")
    if logstash_bin is None:
        logstash_bin = _find_logstash_bin()

    workdir = mkdtemp(prefix='logstash-test-')
    try:
        config_dir, data_dir, output_fn = _prepare_workdir(workdir, filter_def)
        inputs_s = ''.join(s + '\n' for s in input_jsons)
        args = [logstash_bin, '--log.level=warn',
                '--path.settings', config_dir, '--path.data', data_dir]
        print(' '.join(args))
        # Popen's context manager closes the pipe and waits for the process
        # on exit, so the child is reaped even if communicate() raises.
        with Popen(args, stdin=PIPE) as popen:
            popen.communicate(inputs_s.encode('utf8'))
        rc = popen.returncode
        if rc != 0:
            raise RuntimeError("logstash returned non-zero return code {}"
                               .format(rc))
        output_lines = _read_output_lines(output_fn)
        if len(output_lines) != len(input_jsons):
            raise RuntimeError("Received {} outputs, expecting {}"
                               .format(len(output_lines), len(input_jsons)))
        return [json.loads(line) for line in output_lines]
    finally:
        # Runs on failure too, so failed runs do not accumulate temp dirs.
        if remove_tempdir:
            rmtree(workdir)


def _find_logstash_bin():
    """Return the first path in LOGSTASH_BIN_ALTERNATIVES that exists."""
    for fn in LOGSTASH_BIN_ALTERNATIVES:
        if exists(fn):
            return fn
    raise RuntimeError("Couldn't find logstash executable")


def _prepare_workdir(workdir, filter_def):
    """Create the settings, data and pipeline layout logstash runs from.

    :param workdir: an existing, empty directory to populate
    :param filter_def: logstash filter definition written to filter.conf
    :return: ``(config_dir, data_dir, output_fn)``: the ``--path.settings``
        and ``--path.data`` directories and the file the output plugin is
        configured to write to
    """
    data_dir = join(workdir, 'data')
    config_dir = join(workdir, 'config')
    pipeline_dir = join(workdir, 'pipeline.d')
    for directory in (data_dir, config_dir, pipeline_dir):
        os.mkdir(directory)
    # An empty logstash.yml; presumably logstash expects the file to exist
    # in --path.settings -- confirm.
    open(join(config_dir, 'logstash.yml'), 'w').close()
    # Config files are written as UTF-8 rather than the locale encoding, so
    # non-ASCII filter definitions survive (assumes logstash reads UTF-8).
    with open(join(config_dir, 'pipelines.yml'), 'w', encoding='utf-8') as f:
        f.write(PIPELINES_YML.format(pipeline_dir))
    output_fn = join(workdir, 'output')
    # pipelines.yml points path.config at pipeline_dir, so io.conf and
    # filter.conf presumably combine into the one pipeline.
    with open(join(pipeline_dir, 'io.conf'), 'w', encoding='utf-8') as f:
        f.write(INPUT_OUTPUT_CONF % output_fn)
    with open(join(pipeline_dir, 'filter.conf'), 'w', encoding='utf-8') as f:
        f.write(filter_def)
    return config_dir, data_dir, output_fn


def _read_output_lines(output_fn):
    """Return the lines logstash wrote to *output_fn*, or [] if it is absent."""
    try:
        with open(output_fn, encoding='utf-8') as f:
            return list(f)
    except FileNotFoundError:
        # The file output presumably only creates the file once an event
        # arrives (e.g. empty inputs, or a filter that drops everything).
        return []