# The aim of payShield2Syslog project is to gather the Audit log via the host command Q2,
# interpreter the response of the appliance and eventually send it to a syslog facility.
#
# Copyright (C) 2023 Marco Simone Zuppone - msz@msz.eu
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# Please refer to the LICENSE file for more information about licensing
# and to README.md file for more information about the usage of it
import argparse
import binascii
import logging.handlers
import socket
import ssl
import string
from pathlib import Path
from struct import *
from sys import exit # It is needed by the executable version
from types import FunctionType
from typing import Tuple, Dict
VERSION = "0.4.2"  # tool version, printed in the start-up banner
# Begin Class
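class PayConnector:
    """It represents the connection with the payShield host port. It supports tcp, udp and tls.
    Attributes
    ----------
    ssl_sock : SSLSocket
        The SSLSocket in case of tls connection.
    connection : socket
        The connection. It should not be accessed directly
    host : str
        The host ip or hostname.
    port : int
        The tcp/udp port to connect with.
    protocol: str
        The protocol to use to connect to the host. Can be only tcp, tls or udp.
    connected: bool
        When it is True the connection has been established already and there is no need to open a new one.
        When it is False the connection needs to be opened.
    keyfile : str
        In case of tls protocol this is the full path of the client key file.
    crtfile : str
        In case of tls protocol this is the full path of the client certificate file.
    cafile : str
        In case of tls protocol, the optional path of the CA bundle used to verify the payShield
        certificate. When it is None the server certificate is NOT verified (historical behaviour).
    context : ssl.SSLContext
        The SSLContext object
    """

    def __init__(self, host: str, port: int, protocol: str, keyfile: str = None, crtfile: str = None,
                 cafile: str = None):
        """Constructor for the PayConnector class. It sets all the initial parameters.
        No network activity happens here: the socket is opened lazily by sendCommand.
        Parameters
        ----------
        host : str
            The host ip or hostname.
        port : int
            The tcp/udp port to connect with.
        protocol : str
            The protocol to use to connect to the host. Can be only tcp, tls or udp.
        keyfile : str
            In case of tls protocol this is the full path of the client key file.
        crtfile : str
            In case of tls protocol this is the full path of the client certificate file.
        cafile : str
            Optional CA bundle used to authenticate the payShield when the protocol is tls.
            When omitted the server certificate is not checked at all.
        Raises
        ------
        ValueError
            If protocol is not one of udp, tcp, tls, or if tls is requested without both
            keyfile and crtfile.
        """
        # attributes are assigned before any validation so that __del__ -> close() is safe
        # even when the constructor raises
        self.keyfile = keyfile
        self.crtfile = crtfile
        self.cafile = cafile
        self.ssl_sock = None
        self.connection = None
        self.context = None
        self.host = host
        self.port = port
        self.protocol = protocol
        self.connected = False
        if protocol not in ('udp', 'tcp', 'tls'):
            raise ValueError("protocol must be udp, tcp or tls")
        if protocol == 'tls' and (keyfile is None or crtfile is None):
            raise ValueError("keyfile and crtfile parameters are both required")

    def _tls_context(self) -> ssl.SSLContext:
        """Build the client SSLContext used for the tls protocol.
        The client certificate/key pair is always loaded. Server verification depends on
        cafile: without it the peer is encrypted-to but not authenticated.
        """
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.load_cert_chain(certfile=self.crtfile, keyfile=self.keyfile)
        if self.cafile is None:
            # Historical behaviour: no server certificate validation. The session is
            # encrypted but the HSM identity is not checked; pass cafile to fix this.
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
        else:
            # PROTOCOL_TLS_CLIENT already defaults to CERT_REQUIRED + hostname check
            context.load_verify_locations(cafile=self.cafile)
        return context

    def sendCommand(self, host_command: str) -> bytes:
        """
        sends the command specified in the parameter to the payShield and return the response.
        It establishes the connection if it's not established yet, otherwise reuses the open connection.
        Parameters
        ----------
        host_command : str
            The command to send to the payshield host port (header included, no length prefix).
        Returns
        -------
        bytes
            The response from the host: a single recv of at most 4096 bytes, so a longer
            reply would be truncated. None when the exchange failed; the failure is printed
            and, for connection problems, the connection is marked as closed so the next
            call reopens it.
        """
        # 2-byte big-endian length prefix as the payShield host port expects it
        size = pack('>h', len(host_command))
        message = size + host_command.encode()
        buffer_size = 4096
        try:
            if self.protocol == 'tcp':
                if not self.connected:
                    # drop any half-open socket left by a previous failed attempt
                    self.close()
                    self.connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    self.connection.connect((self.host, self.port))
                # sendall: send() may transmit only part of the buffer on a stream socket
                self.connection.sendall(message)
                data: bytes = self.connection.recv(buffer_size)
                self.connected = True
                return data
            elif self.protocol == "tls":
                if not self.connected:
                    self.close()
                    self.context = self._tls_context()
                    self.connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    if self.cafile is None:
                        self.ssl_sock = self.context.wrap_socket(self.connection, server_side=False)
                    else:
                        # server_hostname is required by the hostname check and used for SNI
                        self.ssl_sock = self.context.wrap_socket(self.connection, server_side=False,
                                                                 server_hostname=self.host)
                    self.ssl_sock.connect((self.host, self.port))
                self.ssl_sock.sendall(message)
                data: bytes = self.ssl_sock.recv(buffer_size)
                self.connected = True
                return data
            elif self.protocol == 'udp':
                if not self.connected:
                    self.close()
                    self.connection = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
                    # bound the wait for the reply datagram
                    self.connection.settimeout(5)
                    self.connected = True
                self.connection.sendto(message, (self.host, self.port))
                data_tuple = self.connection.recvfrom(buffer_size)
                data: bytes = data_tuple[0]
                return data
        except (ConnectionError, TimeoutError) as e:
            print("Connection issue: ", e)
            self.connected = False
        except FileNotFoundError as e:
            print("The client certificate file or the client key file cannot be found or accessed.\n" +
                  "Check value passed to the parameters --keyfile and --crtfile", e)
        except Exception as e:
            print("Unexpected issue: ", e)
            self.connected = False

    def close(self):
        """Close the underlying socket (the TLS one when present) and mark the connector as disconnected.
        Safe to call more than once and on an instance that never connected.
        """
        # getattr: __del__ may run on an instance whose constructor raised early
        ssl_sock = getattr(self, 'ssl_sock', None)
        connection = getattr(self, 'connection', None)
        sock = ssl_sock if ssl_sock is not None else connection
        if sock is not None:
            sock.close()
        self.ssl_sock = None
        self.connection = None
        self.connected = False

    def __del__(self):
        """
        Destructor for the PayConnector class.
        It invokes the close method of the connection
        """
        self.close()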
# End Class
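def decode_q2(response_to_decode: bytes, head_len: int, logger_instance=None):
    """
    Decode the reply of the Q2 (audit record retrieval) host command and print the meaning of each field.
    Parameters
    ----------
    response_to_decode: bytes
        The response returned by the payShield
    head_len: int
        The length of the header
    logger_instance: logging.Logger, optional
        When given, the decoded entry is also emitted with ``info`` on this logger
        (this is how the entry reaches syslog).
    Returns
    -------
    syslog_entry: str
        The one-line summary of the audit record; empty when the reply carries an error code.
    """
    # Error codes specific to Q2 (generic codes are described by get_payshield_error_message)
    specific_errors = {'35': 'No Audit Records found',
                       '36': 'All Audit Records have been retrieved'}
    # Printable label and syslog tag for the two "command code type" bits of the bit mask
    code_type_labels = {'00': ("Host Command", "HOST"),
                        '01': ("Console Command", "CONS"),
                        '10': ("Fraud Event", "FRD"),
                        '11': ("User Action", "USER")}
    syslog_entry = ''
    response_to_decode, msg_len, str_pointer = common_parser(response_to_decode, head_len)
    error_code = response_to_decode[str_pointer:str_pointer + 2]
    if error_code == '00':  # No errors
        str_pointer = str_pointer + 2
        # the audit record is 40 bytes, hex encoded in the reply
        log_entry = response_to_decode[str_pointer:str_pointer + 80]
        print("Log Entry in Hex: ", log_entry)
        bin_entry = binascii.unhexlify(log_entry)
        fields = []  # joined with single spaces into syslog_entry at the end
        audit_counter = int(binascii.hexlify(bin_entry[0:4]).decode(), base=16)
        print("Audit Counter: ", audit_counter)
        fields.append(str(audit_counter))
        # 6 bytes read as hex digits and formatted as hh:mm:ss DD/MM/20YY
        data_value = binascii.hexlify(bin_entry[4:10]).decode()
        date_readable = data_value[:2] + ':' + data_value[2:4] + ':' + data_value[4:6] + \
            ' ' + data_value[6:8] + '/' + data_value[8:10] + '/20' + data_value[10:12]
        print("Date: ", date_readable)
        fields.append(date_readable)
        command_action_code = bin_entry[10:12].decode()
        print("Action Code / Command Code", command_action_code)
        fields.append(command_action_code)
        # Fixed-width 16-bit field read MSB first: bits 0-1 command code type, bit 2 archived,
        # bit 3 retrieved, the rest unused. bin() drops leading zeros and would shift every
        # position whenever the top bit is clear, so the value is zero-padded explicitly.
        bit_mask_str = format(int(binascii.hexlify(bin_entry[12:14]).decode(), base=16), '016b')
        print("Bit Mask", bit_mask_str)
        command_code_type = bit_mask_str[0:2]
        response_error_code = bin_entry[14:16].decode()
        if command_code_type != '10':  # It is not a fraud event
            command_action_message = get_action_command_message(command_action_code, command_code_type)
        else:
            # In case of fraud event the command that caused the event is in the 'command action field'
            # and the reaction to decode is contained in the response error code field
            command_action_message = command_action_code + ' caused ' + \
                get_action_command_message(response_error_code, command_code_type)
        fields.append(command_action_message)
        if command_code_type in code_type_labels:
            label, tag = code_type_labels[command_code_type]
            print("\tCommand code type: " + label)
            fields.append(tag)
        print("\tCommand/Action description:", command_action_message)
        if bit_mask_str[2:3] == '0':
            print("\tNot Archived")
            fields.append("NOTA")
        else:
            print("\tArchived")
            fields.append("ARCH")
        if bit_mask_str[3:4] == '0':
            print("\tNot Retrieved")
            fields.append("NOTR")
        else:
            print("\tRetrieved")
            fields.append("RETR")
        print("\tUnused:", bit_mask_str[4:])
        print("Response Error Code:", response_error_code)
        audit_mac = binascii.hexlify(bin_entry[16:16 + 8]).decode().upper()
        print("Audit Record MAC:", audit_mac)
        fields.append(audit_mac)
        random_key = binascii.hexlify(bin_entry[24:]).decode().upper()
        print("Random MAC Key:", random_key)
        fields.append(random_key)
        syslog_entry = " ".join(fields)
    elif error_code in specific_errors:
        print("Command specific error: ", specific_errors[error_code])
    if logger_instance is not None:
        logger_instance.info(syslog_entry)
    return syslog_entry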
def get_payshield_error_message(error_code: str) -> str:
    """Map a payShield result code to its human-readable message.
    The table was derived from the following manual:
    payShield 10K Core Host Commands v1
    Revision: A
    Date: 04 August 2020
    Doc.Number: PUGD0537 - 004
    Parameters
    ----------
    error_code: str
        The two-character status code returned from the payShield 10k
    Returns
    ----------
    The message associated with the code, or "Unknown error" when the code is not in the table
    """
    _messages = {
        '00': 'No error',
        '01': 'Verification failure or warning of imported key parity error',
        '02': 'Key inappropriate length for algorithm',
        '04': 'Invalid key type code',
        '05': 'Invalid key length flag',
        '10': 'Source key parity error',
        '11': 'Destination key parity error or key all zeros',
        '12': 'Contents of user storage not available. Reset, power-down or overwrite',
        '13': 'Invalid LMK Identifier',
        '14': 'PIN encrypted under LMK pair 02-03 is invalid',
        '15': 'Invalid input data (invalid format, invalid characters, or not enough data provided)',
        '16': 'Console or printer not ready or not connected',
        '17': 'HSM not authorized, or operation prohibited by security settings',
        '18': 'Document format definition not loaded',
        '19': 'Specified Diebold Table is invalid',
        '20': 'PIN block does not contain valid values',
        '21': 'Invalid index value, or index/block count would cause an overflow condition',
        '22': 'Invalid account number',
        '23': 'Invalid PIN block format code. (Use includes where the security setting to implement PCI HSM '
              'limitations on PIN Block format usage is applied, and a Host command attempts to convert a PIN Block '
              'to a disallowed format.)',
        '24': 'PIN is fewer than 4 or more than 12 digits in length',
        '25': 'Decimalization Table error',
        '26': 'Invalid key scheme',
        '27': 'Incompatible key length',
        '28': 'Invalid key type',
        '29': 'Key function not permitted',
        '30': 'Invalid reference number',
        '31': 'Insufficient solicitation entries for batch',
        '32': 'AES not licensed',
        '33': 'LMK key change storage is corrupted',
        '39': 'Fraud detection',
        '40': 'Invalid checksum',
        '41': 'Internal hardware/software error: bad RAM, invalid error codes, etc.',
        '42': 'DES failure',
        '43': 'RSA Key Generation Failure',
        '46': 'Invalid tag for encrypted PIN',
        '47': 'Algorithm not licensed',
        '49': 'Private key error, report to supervisor',
        '51': 'Invalid message header',
        '65': 'Transaction Key Scheme set to None',
        '67': 'Command not licensed',
        '68': 'Command has been disabled',
        '69': 'PIN block format has been disabled',
        '74': 'Invalid digest info syntax (no hash mode only)',
        '75': 'Single length key masquerading as double or triple length key',
        '76': 'RSA public key length error or RSA encrypted data length error',
        '77': 'Clear data block error',
        '78': 'Private key length error',
        '79': 'Hash algorithm object identifier error',
        '80': 'Data length error. The amount of MAC data (or other data) is greater than or less than the expected '
              'amount.',
        '81': 'Invalid certificate header',
        '82': 'Invalid check value length',
        '83': 'Key block format error',
        '84': 'Key block check value error',
        '85': 'Invalid OAEP Mask Generation Function',
        '86': 'Invalid OAEP MGF Hash Function',
        '87': 'OAEP Parameter Error',
        '90': 'Data parity error in the request message received by the HSM',
        'A1': 'Incompatible LMK schemes',
        'A2': 'Incompatible LMK identifiers',
        'A3': 'Incompatible key block LMK identifiers',
        'A4': 'Key block authentication failure',
        'A5': 'Incompatible key length',
        'A6': 'Invalid key usage',
        'A7': 'Invalid algorithm',
        'A8': 'Invalid mode of use',
        'A9': 'Invalid key version number',
        'AA': 'Invalid export field',
        'AB': 'Invalid number of optional blocks',
        'AC': 'Optional header block error',
        'AD': 'Key status optional block error',
        'AE': 'Invalid start date/time',
        'AF': 'Invalid end date/time',
        'B0': 'Invalid encryption mode',
        'B1': 'Invalid authentication mode',
        'B2': 'Miscellaneous key block error',
        'B3': 'Invalid number of optional blocks',
        'B4': 'Optional block data error',
        'B5': 'Incompatible components',
        'B6': 'Incompatible key status optional blocks',
        'B7': 'Invalid change field',
        'B8': 'Invalid old value',
        'B9': 'Invalid new value',
        'BA': 'No key status block in the key block',
        'BB': 'Invalid wrapping key',
        'BC': 'Repeated optional block',
        'BD': 'Incompatible key types',
        'BE': 'Invalid key block header ID',
        'D2': 'Invalid curve reference',
        'D3': 'Invalid Key Encoding',
        'E0': 'Invalid command version number'
    }
    try:
        return _messages[error_code]
    except KeyError:
        return "Unknown error"
def get_action_command_message(code: str, code_type: str) -> str:
    """Map an audit-record action/command code to its description.
    The tables were derived from the following manual:
    payShield 10K Installation and User Guide 1.7a
    Date: November 2022
    Doc. Number: 007-001512-007
    Parameters
    ----------
    code: str
        The action/command code returned from the payShield 10k
    code_type: str
        The two command-code-type bits as a string: '00' host command, '01' console command,
        '10' fraud event, '11' user action
    Returns
    ----------
    A descriptive message for the code; the code itself for host commands and unknown console
    commands; an empty string when code_type is not one of the four known values
    """
    console_commands = {
        '00': 'User actions performed using payShield Manager',
        '01': 'AUDITLOG',
        '02': 'AUDITOPTIONS',
        '03': 'CLEARAUDIT',
        '04': 'CLEARERR',
        '05': 'EJECT',
        '06': 'ERRLOG',
        '07': 'GETCMDS',
        '08': 'GETTIME',
        '09': 'SETTIME',
        '0A': 'A',
        '0B': 'B',
        '0C': 'C',
        '0D': 'D',
        '0E': 'F',
        '0F': 'K',
        '10': 'N',
        '11': 'R',
        '12': 'T',
        '13': 'V',
        '14': 'Z',
        '15': '$',
        '16': 'CONFIGCMDS',
        '17': 'CONFIGPB',
        '18': 'PING',
        '19': 'TRACERT',
        '1A': 'NETSTAT',
        '1B': 'AUDITPRINT',
        '1C': 'SYSLOG',
        '1D': 'UTILCFG',
        '1E': 'UTILENABLE',
        '1F': 'UTISTATS',
        '20': 'HEALTHENABLE',
        '21': 'HEALTHSTATS',
        '22': 'SNMP',
        '23': 'SNMPADD',
        '24': 'SNMPDEL',
        '25': 'RESET',
        '26': 'ROUTE',
        '27': 'TRAP',
        '28': 'TRAPADD',
        '29': 'TRAPDEL',
        '2A': 'CONFIGACL'
    }
    fraud_events = {
        '01': 'Limit for number of PIN verifications per minute exceeded',
        '02': 'Limit for number of PIN verifications per hour exceeded',
        '03': 'Limit for total number of failed PIN verifications exceeded'
    }
    user_actions = {
        'A0': 'Authorization Cancelled',
        'A1': 'Authorization ON',
        'AA': 'Authorization Activity ON',
        'AC': 'Authorization Activity Cancelled',
        'AT': 'Authorization Timeout',
        'CL': 'Audit log cleared',
        'DE': 'Diagnostic Event(Selftest)',
        'KE': 'User authentication',
        'LE': 'LMK erased',
        'LF': 'License file load failure',
        'LL': 'LMK loaded',
        'LS': 'License file successfully loaded',
        'OE': 'Old LMK erased',
        'OF': 'Change to Offline',
        'OL': 'Old LMK loaded',
        'ON': 'Change to Online',
        'PW': 'Cycle power supply',
        'SE': 'Change to Secure',
        'UT': 'Utilization Reset'
    }
    # one resolver per code type; each returns the message for the given code
    resolvers = {
        '11': lambda c: user_actions.get(c, "Unknown user action " + c),
        '10': lambda c: fraud_events.get(c, "Unknown fraud action " + c),
        '01': lambda c: console_commands.get(c, c),
        '00': lambda c: c,
    }
    resolver = resolvers.get(code_type)
    if resolver is None:
        return ''
    return resolver(code)
def check_returned_command_verb(result_returned: bytes, head_len: int, command_sent: str) -> Tuple[int, str, str]:
    """
    Check that the command verb echoed by the payShield matches the command that was sent.
    The payShield answers a verb with its second character incremented (e.g. Q2 -> Q3).
    Parameters
    ----------
    result_returned: bytes
        The output returned from the payShield (length prefix, header, verb, ...)
    head_len: int
        The length of the header
    command_sent: str
        The command sent to the payShield (header included, no length prefix)
    Returns
    ----------
    a Tuple[int, str, str]
        first value 0 if the returned verb is the expected one, -1 otherwise;
        second value the verb sent; third value the verb returned by the payShield
    """
    verb_sent = command_sent[head_len:head_len + 2]
    # skip the 2-byte length prefix and the header to reach the verb
    verb_start = 2 + head_len
    verb_returned = result_returned[verb_start:verb_start + 2].decode()
    verb_expected = verb_sent[0:1] + chr(ord(verb_sent[1:2]) + 1)
    status = 0 if verb_returned == verb_expected else -1
    return status, verb_sent, verb_returned
def check_return_message(result_returned: bytes, head_len: int) -> Tuple[str, str]:
    """
    Validate the framing of a payShield reply and extract its two-character result code.
    Parameters
    ----------
    result_returned: bytes
        The raw reply: 2-byte big-endian length, header, command verb, result code, data
    head_len: int
        The length of the header
    Returns
    ----------
    a Tuple[str, str]
        the result code and its description; the pseudo code "ZZ" with a diagnostic text when
        the reply is too short, its length prefix does not match, or the code cannot be decoded
    """
    # 2 bytes for the length prefix + header + 2 bytes for the command verb
    minimum_len = 2 + head_len + 2
    if len(result_returned) < minimum_len:
        return "ZZ", "Incomplete message"
    try:
        declared_len = int.from_bytes(result_returned[:2], byteorder='big', signed=False)
    except ValueError:
        return "ZZ", "Malformed message"
    except Exception:
        return "ZZ", "Unknown message length parsing error"
    # the declared length covers everything after the 2-byte prefix
    if declared_len != len(result_returned) - 2:
        return "ZZ", "Length mismatch"
    code_start = minimum_len
    try:
        ret_code = result_returned[code_start:code_start + 2].decode()
    except (ValueError, UnicodeDecodeError):
        return "ZZ", "message result code parsing error"
    except Exception:
        return "ZZ", "Unknown message result code parsing error"
    return ret_code, get_payshield_error_message(ret_code)
def test_printable(input_str):
return all(c in string.printable for c in input_str)
def hex2ip(hex_ip):
    """Convert an IPv4 address given as a hex string (e.g. "7f000001") to dotted-quad notation."""
    address_value = int(hex_ip, 16)
    # pack as a 4-byte big-endian unsigned integer for inet_ntoa
    packed_address = pack(">L", address_value)
    return socket.inet_ntoa(packed_address)
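def run_test(payConnectorInstance: PayConnector, host_command: str,
             header_len: int = 4, decoder_funct: FunctionType = None, logger_instance=None) -> str:
    """
    Send the command through the given connector, print the exchange and optionally decode the reply.
    Parameters
    ___________
    payConnectorInstance: PayConnector
        The instance of the PayConnector class
    host_command: str
        The command to send to the payShield complete of the header part
    header_len: int
        The length of the header. If not specified the value is 4 because it is the default factory value
        in payShield 10k
    decoder_funct: FunctionType
        If provided needs to be a reference to a function that is able to parse the command and print the meaning of it
        If not provided the default is None
    logger_instance: logging.Logger, optional
        Passed through to decoder_funct; unused when no decoder is given
    Returns
    ___________
    The two-character return code of the command, "ZZ" when the reply could not be parsed, or None
    when no reply was obtained (connection problem or unexpected error). The connection problem is
    reported by the connector itself.
    """
    # None until a reply has been parsed; returned as-is on any failure
    return_code = None
    try:
        data = payConnectorInstance.sendCommand(host_command)
        if data is None:
            return None
        return_code_tuple = check_return_message(data, header_len)
        return_code = return_code_tuple[0]
        if return_code != "ZZ":
            print()
            check_result_tuple = check_returned_command_verb(data, header_len, host_command)
            print("Return code: " + str(return_code_tuple[0]) + " " + return_code_tuple[1])
            if check_result_tuple[0] != 0:
                print("NOTE: The response received from the HSM seems unrelated to the request!")
                print("Command sent/received: " + check_result_tuple[1] + " ==> " + check_result_tuple[2])
        # the wire message is rebuilt only for the dump below (sendCommand builds its own copy)
        message = pack('>h', len(host_command)) + host_command.encode()
        # don't print ascii if msg or resp contains non printable chars
        sent_ascii = message[2:].decode("ascii", "ignore")
        if test_printable(sent_ascii):
            print("sent data (ASCII) :", sent_ascii)
        print("sent data (HEX) :", bytes.hex(message))
        received_ascii = data[2:].decode("ascii", "ignore")
        if test_printable(received_ascii):
            print("received data (ASCII):", received_ascii)
        print("received data (HEX) :", bytes.hex(data))
        if decoder_funct is not None and callable(decoder_funct):
            print("")
            print("-----DECODING RESPONSE-----")
            decoder_funct(data, header_len, logger_instance)
    except ConnectionError as e:
        print("Connection issue: ", e)
    except FileNotFoundError as e:
        print("The client certificate file or the client key file cannot be found or accessed.\n" +
              "Check value passed to the parameters --keyfile and --crtfile", e)
    except Exception as e:
        print("Unexpected issue:", e)
    # no return inside finally: a return there would silently override both the early
    # return above and any exception that escaped the handlers
    return return_code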
def common_parser(response_to_decode: bytes, head_len: int) -> Tuple[str, int, int]:
    """
    Helper shared by the decode_XX functions.
    It converts the reply to ascii (undecodable bytes replaced), prints the declared message size,
    the header, the returned command verb and the error code.
    Parameters
    ___________
    response_to_decode: bytes
        The response returned by the payShield
    head_len: int
        The length of the header
    Returns
    ___________
    returns a tuple:
        message_to_decode: str
            The reply converted to ascii
        msg_len: int
            The message length declared in the first two bytes of the reply
        str_pointer: int
            The index of the two-character error code inside message_to_decode
    """
    declared_len = int.from_bytes(response_to_decode[:2], byteorder='big', signed=False)
    print("Message length: ", declared_len)
    text = response_to_decode.decode('ascii', 'replace')
    # layout: 2-byte length prefix, header, 2-char verb, 2-char error code
    header_start = 2
    header_end = header_start + head_len
    print("Header: ", text[header_start:header_end])
    verb_end = header_end + 2
    print("Command returned: ", text[header_end:verb_end])
    print("Error returned: ", text[verb_end:verb_end + 2])
    return text, declared_len, verb_end
# End
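if __name__ == "__main__":
    print("PayShield Audit Log utility, version " + VERSION + ", by Marco S. Zuppone - msz@msz.eu - https://msz.eu")
    print("To get more info about the usage invoke it with the -h option")
    print("This software is open source and it is under the Affero AGPL 3.0 license")
    print("")
    # List of decoder functions used to interpret the result.
    # The reference to the function is used as parameter in the run_test function.
    # If no decoder is defined for a command the parameter is passed as None.
    DECODERS = {
        'Q2': decode_q2
    }
    parser = argparse.ArgumentParser(
        description="Dumps the Audit Log and eventually sends the entries to a syslog facility for the sake of "
                    "testing and demonstration.",
        epilog="For any questions, feedback, suggestions, donations (yes...I'm a dreamer, I know) you can contact the "
               "author at msz@msz.eu")
    group = parser.add_mutually_exclusive_group()
    parser.add_argument("host", help="payShield IP address or hostname.")
    parser.add_argument("--port", "-p", help="The host port", default=1500, type=int)
    parser.add_argument("--header",
                        help="the header string to prepend to the host command. If not specified the default is HEAD.",
                        default="HEAD", type=str)
    group.add_argument("--allentries", help="when specified all log entries are retrieved or until an error is "
                                            "returned.",
                       action="store_true")
    parser.add_argument("--decode", help="if specified the reply of the payShield is interpreted "
                                         "if a decoder function for that command has been implemented.",
                        action="store_true")
    group.add_argument("--times", help="how many time to repeat the operation.", type=int, default=1)
    # help text now lists every accepted choice (tls was missing)
    parser.add_argument("--proto", help="accepted values are tcp, udp or tls, the default is tcp.", default="tcp",
                        choices=["tcp", "udp", "tls"], type=str.lower)
    parser.add_argument("--keyfile", help="client key file, used if the protocol is TLS.", type=Path,
                        default="client.key")
    parser.add_argument("--crtfile", help="client certificate file, used if the protocol is TLS.", type=Path,
                        default="client.crt")
    parser.add_argument("--syslog", help="syslog facility ip address.", type=str)
    parser.add_argument("--syslogport", help="syslog port.", type=int, default=514)
    # help text now agrees with the actual default (udp)
    parser.add_argument("--syslogproto", help="protocol to use for syslog. Can be udp or tcp. If this parameter is not "
                                              "specified the default is udp.", choices=["tcp", "udp"], default="udp",
                        type=str.lower)
    args = parser.parse_args()
    command = args.header + 'Q2'
    # IMPORTANT: At this point the 'command' needs to contain something.
    # If you want to add to the tool command line arguments about commands do it before this comment block
    # Now we verify if the command variable is empty. In this case we throw an error.
    if len(command) == 0:
        print("You forgot to specify the action you want to to perform on the payShield")
        exit()
    if args.proto == 'tls':
        # check that the cert and key files are accessible
        if not (args.keyfile.exists() and args.crtfile.exists()):
            print("The client certificate file or the client key file cannot be found or accessed.\n" +
                  "Check value passed to the parameters --keyfile and --crtfile")
            print("You passed these values:")
            print("Certificate file:", args.crtfile)
            print("Key file:", args.keyfile)
            exit()
        if args.port < 2500:
            print("WARNING: generally the TLS base port is 2500. You are instead using the port ",
                  args.port, " please check that you passed the right value to the "
                             "--port parameter")
    # Let's instance the connection
    if args.proto == 'tls':
        payConnInst = PayConnector(args.host, args.port, args.proto, args.keyfile, args.crtfile)
    else:
        payConnInst = PayConnector(args.host, args.port, args.proto)
    logger = None
    if args.syslog is not None:
        # NOTE: entries are forwarded in clear text over plain udp/tcp syslog
        proto_socket_type = socket.SOCK_DGRAM
        if args.syslogproto == "tcp":
            proto_socket_type = socket.SOCK_STREAM
        logger = logging.getLogger('mylogger')
        syslog = logging.handlers.SysLogHandler(address=(args.syslog, args.syslogport), socktype=proto_socket_type)
        logger.setLevel(logging.DEBUG)
        syslog.setLevel(logging.INFO)
        logger.addHandler(syslog)
    header_len = len(args.header)
    # the decoder is looked up by the two-character verb that follows the header
    decoder = DECODERS.get(command[header_len:header_len + 2]) if args.decode else None

    def run_once() -> None:
        """Send the command once, report the outcome and stop the program on the first non-'00' return code."""
        return_code = run_test(payConnInst, command, header_len, decoder, logger)
        if return_code != '00':
            if return_code is None:
                print("Connection error with the host has occurred")
            else:
                print("Return code: ", get_payshield_error_message(return_code))
            exit()
        print("")

    if args.allentries:
        # loop until the payShield answers with a non-'00' code (e.g. 36: all records retrieved)
        iteration = 1
        while True:
            print("Iteration: ", iteration)
            run_once()
            iteration += 1
    else:
        for iteration in range(args.times):
            print("Iteration: ", iteration + 1, " of ", args.times)
            run_once()
    print("DONE")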