From 6ae113d64f2c0da39911dbe41f9e374f5c3e4e2a Mon Sep 17 00:00:00 2001 From: "Prof. Dr. Andreas Noack" Date: Mon, 22 Aug 2022 15:36:20 +0200 Subject: [PATCH] Flipper raw (#1000) * This branch allows to import .sub files from Flipper Zero (#989) * Add rudimentary export feature for .sub-files! Note that all parameters (Frequency, Preset, ...) in the .sub-file are constants and export will only work for .sub-files * Apply changes from revision. --- src/urh/controller/widgets/SignalFrame.py | 2 +- src/urh/signalprocessing/IQArray.py | 42 +++++++++++++++++++++++ src/urh/signalprocessing/Signal.py | 26 ++++++++++++++ src/urh/util/FileOperator.py | 6 ++-- 4 files changed, 73 insertions(+), 3 deletions(-) diff --git a/src/urh/controller/widgets/SignalFrame.py b/src/urh/controller/widgets/SignalFrame.py index cec6b97ce4..90217e9964 100644 --- a/src/urh/controller/widgets/SignalFrame.py +++ b/src/urh/controller/widgets/SignalFrame.py @@ -466,7 +466,7 @@ def export_demodulated(self): try: self.setCursor(Qt.WaitCursor) data = self.signal.qad - if filename.endswith(".wav"): + if filename.endswith(".wav") or filename.endswith(".sub"): data = self.signal.qad.astype(np.float32) data /= np.max(np.abs(data)) FileOperator.save_data(IQArray(data, skip_conversion=True), filename, self.signal.sample_rate, diff --git a/src/urh/signalprocessing/IQArray.py b/src/urh/signalprocessing/IQArray.py index f8cfe47832..f7a1efee6f 100644 --- a/src/urh/signalprocessing/IQArray.py +++ b/src/urh/signalprocessing/IQArray.py @@ -241,3 +241,45 @@ def export_to_wav(self, filename, num_channels, sample_rate): f.setframerate(sample_rate) f.writeframes(self.convert_to(np.int16)) f.close() + + def export_to_sub(self, filename, frequency=433920000, preset="FuriHalSubGhzPresetOok650Async"): + arr = [] + counter = 0 + + for value in self.convert_to(np.uint8): + # if origin was a sub-file value is uint8, else value is [uint8, uint8] + # TODO: export only works with sub files. Idea: Export from bit stream? + value = np.linalg.norm(value) + + # set lastvalue to value for first run + try: + lastvalue + except: + lastvalue = value + + # increase counter while values do not change + if value == lastvalue: + counter += 1 + else: + # add number of same value as positive int when value > 127 else as negative int + if counter > 1: + arr.append(counter if lastvalue > 127 else -counter) + counter = 1 + lastvalue = value + # save last value + arr.append(counter if lastvalue > 127 else -counter) + + with open(filename, 'w') as subfile: + subfile.write("Filetype: Flipper SubGhz RAW File\n") + subfile.write("Version: 1\n") + subfile.write("Frequency: {}\n".format(frequency)) + subfile.write("Preset: {}\n".format(preset)) + subfile.write("Protocol: RAW") # Skip last \n + # Write data + for idx in range(len(arr)): + if idx % 512 == 0: + subfile.write("\n") + subfile.write("RAW_Data: {}".format(arr[idx])) + else: + subfile.write(" {}".format(arr[idx])) + subfile.write("\n") \ No newline at end of file diff --git a/src/urh/signalprocessing/Signal.py b/src/urh/signalprocessing/Signal.py index b007784813..22e55bb988 100644 --- a/src/urh/signalprocessing/Signal.py +++ b/src/urh/signalprocessing/Signal.py @@ -1,5 +1,6 @@ import math import os +import re import tarfile import wave @@ -57,6 +58,7 @@ def __init__(self, filename: str, name="Signal", modulation: str = None, sample_ self.iq_array = IQArray(None, np.int8, 1) self.wav_mode = filename.endswith(".wav") + self.flipper_raw_mode = filename.endswith(".sub") self.__changed = False if modulation is None: modulation = "FSK" @@ -71,6 +73,8 @@ def __init__(self, filename: str, name="Signal", modulation: str = None, sample_ if len(filename) > 0: if self.wav_mode: self.__load_wav_file(filename) + elif self.flipper_raw_mode: + self.__load_sub_file(filename) elif filename.endswith(".coco"): self.__load_compressed_complex(filename) else: @@ -131,6 +135,27 @@ def __load_wav_file(self, filename: str): self.sample_rate = sample_rate + def __load_sub_file(self, filename: str): + # Flipper RAW file format (OOK): space separated values, number of samples above (positive value -> 1) + # or below (negative value -> 0) center + params = {"min": 0, "max": 255, "fmt": np.uint8} + params["center"] = (params["min"] + params["max"]) / 2 + arr = [] + with open(filename, 'r') as subfile: + for line in subfile: + dataline = re.match(r'RAW_Data:\s*([-0-9 ]+)\s*$', line) + if dataline: + values = dataline[1].split(r' ') + for value in values: + intval = int(value) + if intval > 0: + arr.extend(np.full(intval, params["max"], dtype=params["fmt"])) + else: + arr.extend(np.zeros(-intval, dtype=params["fmt"])) + self.iq_array = IQArray(None, np.float32, n=len(arr)) + self.iq_array.real = np.multiply(1 / params["max"], np.subtract(arr, params["center"])) + self.__already_demodulated = True + def __load_compressed_complex(self, filename: str): obj = tarfile.open(filename, "r") members = obj.getmembers() @@ -418,6 +443,7 @@ def create_new(self, start=0, end=0, new_data=None): new_signal.__bits_per_symbol = self.bits_per_symbol new_signal.__center = self.center new_signal.wav_mode = self.wav_mode + new_signal.flipper_raw_mode = self.flipper_raw_mode new_signal.__already_demodulated = self.__already_demodulated new_signal.changed = True new_signal.sample_rate = self.sample_rate diff --git a/src/urh/util/FileOperator.py b/src/urh/util/FileOperator.py index e91b0b6f6b..e89be4dddb 100644 --- a/src/urh/util/FileOperator.py +++ b/src/urh/util/FileOperator.py @@ -47,7 +47,7 @@ SIMULATOR_FILE_FILTER = "Simulator Profile (*.sim.xml *.sim)" TAR_FILE_FILTER = "Tar Archive (*.tar *.tar.gz *.tar.bz2)" ZIP_FILE_FILTER = "Zip Archive (*.zip)" - +SUB_FILE_FILTER = "Flipper SubGHz RAW (*.sub)" def __get__name_filter_for_signals() -> str: return ";;".join([EVERYTHING_FILE_FILTER] + SIGNAL_NAME_FILTERS + [COMPRESSED_COMPLEX_FILE_FILTER, WAV_FILE_FILTER]) @@ -95,7 +95,7 @@ def ask_save_file_name(initial_name: str, caption="Save signal", selected_name_f elif caption == "Save protocol": name_filter = ";;".join([PROTOCOL_FILE_FILTER, BINARY_PROTOCOL_FILE_FILTER]) elif caption == "Export demodulated": - name_filter = WAV_FILE_FILTER + name_filter = ";;".join([WAV_FILE_FILTER, SUB_FILE_FILTER]) else: name_filter = EVERYTHING_FILE_FILTER @@ -157,6 +157,8 @@ def save_data(data, filename: str, sample_rate=1e6, num_channels=2): data.export_to_wav(filename, num_channels, sample_rate) elif filename.endswith(".coco"): data.save_compressed(filename) + elif filename.endswith(".sub"): + data.export_to_sub(filename, 433920000, "FuriHalSubGhzPresetOok650Async") else: data.tofile(filename)