-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathqt_nats_wiretap.py
65 lines (44 loc) · 1.59 KB
/
qt_nats_wiretap.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import sys
import asyncio
from PyQt5.QtWidgets import (QWidget, QGridLayout, QApplication, QPlainTextEdit)
from PyQt5.QtCore import QDateTime
from quamash import QEventLoop
from nats.aio.client import Client as NATS
class QtWiretap(QWidget):
def __init__(self):
super().__init__()
self.initUI()
def initUI(self):
grid = QGridLayout()
self.setLayout(grid)
self.txt = QPlainTextEdit()
self.txt.setReadOnly(True)
self.txt.setStyleSheet("background-color: #003366; color: #CCFFCC")
# self.txt.setStyleSheet("background-color: #BE2625; color: #CCFFCC")
grid.addWidget(self.txt)
self.move(300, 150)
self.setWindowTitle(f"Qt Nats Wiretap - {sys.argv[3]}")
self.show()
def insertText(self, new_text):
self.txt.insertPlainText(f'{QDateTime.currentDateTime().toString()} {new_text} \n')
self.txt.moveCursor(1)
def wire_tap(msg):
ex.insertText(msg.data.decode())
def main(loop, subject):
nc = NATS()
yield from nc.connect(f"{sys.argv[1]}:{sys.argv[2]}", loop=loop)
async def mh_s1(msg):
await wire_tap(msg)
yield from nc.subscribe(subject, cb=wire_tap)
if __name__ == '__main__':
if len(sys.argv) != 4:
print("Usage: python nats_scrolling_plaintext_edit.py [host] [port] [queue]")
else:
app = QApplication(sys.argv)
loop = QEventLoop(app)
asyncio.set_event_loop(loop)
loop.run_until_complete(main(loop, subject=sys.argv[3]))
ex = QtWiretap()
loop.run_forever()