From 3501b45c59c14efd2d953d864a7afc8bb5cf4d14 Mon Sep 17 00:00:00 2001 From: akiyamn Date: Sun, 16 May 2021 16:41:41 +1000 Subject: Ass 3: Q3 it actually works --- ass3/q2/header.py | 3 +- ass3/q3/decoder_lzss.py | 123 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 124 insertions(+), 2 deletions(-) create mode 100644 ass3/q3/decoder_lzss.py (limited to 'ass3') diff --git a/ass3/q2/header.py b/ass3/q2/header.py index 0db542f..a10b9a9 100644 --- a/ass3/q2/header.py +++ b/ass3/q2/header.py @@ -59,7 +59,6 @@ def huffman(string): new_node = first + second heapq.heappush(nodes, new_node) root = nodes[0] - root.dfs_apply(print) huffman_calculate(root) result = {} for leaf in leaves: @@ -110,7 +109,7 @@ def write_file(filename, contents): def main(): assert len(sys.argv) >= 2 string = read_file(sys.argv[1]) - write_file("output_header.txt", header(string)[0]) + write_file("output_header.txt", header(string)) if __name__ == "__main__": main() diff --git a/ass3/q3/decoder_lzss.py b/ass3/q3/decoder_lzss.py new file mode 100644 index 0000000..d7dcf0c --- /dev/null +++ b/ass3/q3/decoder_lzss.py @@ -0,0 +1,123 @@ +import sys + + +class Stream: + def __init__(self, string): + self.string = string + self.cursor = 0 + + def rewind(self): + self.cursor = 0 + + def end_of_stream(self): + return self.cursor >= len(self.string) + + def read_elias(self): + offset = self.cursor + if self.string[0 + offset] == "1": + self.cursor += 1 + return 1 + start, end = 1, 2 + while True: + binary = self.string[start + offset:end + 1 + offset] + if binary[0] == "0": + chunk = int(f"1{binary[1:]}", 2) + 1 + start = end + 1 + end += chunk + else: + self.cursor += end + 1 + return int(binary, 2) + + def read_binary(self, length): + blob = self.string[self.cursor:self.cursor + length] + self.cursor += length + return blob + + def read_int(self, bits): + return int(self.read_binary(bits), 2) + + def read_char(self, bits=7): + return chr(self.read_int(bits)) + + def read_bit(self): + return self.read_binary(1) + + def read_huff_char(self, decode_table): + window_size = 1 + window = self.string[self.cursor:self.cursor + window_size] + while window not in decode_table: + if window_size >= len(self.string): + raise Exception("Went for too long looking for a Huffman char") + window_size += 1 + window = self.string[self.cursor:self.cursor + window_size] + self.cursor += window_size + return decode_table[window] + + def read_lzss_tuple(self, decode_table): + format_type = self.read_bit() + if format_type == "0": + return 0, self.read_elias(), self.read_elias() + else: + return 1, self.read_huff_char(decode_table) + + +def elias(n): + assert n > 0 + binary = bin(n)[2:] + result = binary + while len(binary) != 1: + binary = "0" + bin(len(binary) - 1)[3:] + result = binary + result + return result + + +def decode_elias(encoded, offset=0): + if encoded[0 + offset] == "1": + return 1 + start, end = 1, 2 + while True: + binary = encoded[start + offset:end + 1 + offset] + if binary[0] == "0": + chunk = int(f"1{binary[1:]}", 2) + 1 + start = end + 1 + end += chunk + else: + return int(binary, 2) + + +def parse_header(stream): + huff = {} + alphabet_size = stream.read_elias() + for i in range(alphabet_size): + char = stream.read_char() + length = stream.read_elias() + codeword = stream.read_binary(length) + huff[codeword] = char + return huff + + +def parse_body_tuples(stream, decode_table): + num_tuples = stream.read_elias() + return [stream.read_lzss_tuple(decode_table) for _ in range(num_tuples)] + + +def decode_lzss(tuples): + result = "" + for tup in tuples: + if tup[0] == 0: + _, offset, length = tup + cursor = len(result) - offset + for i in map(lambda x: x % offset, range(length)): + result += result[cursor + i] + else: + result += tup[1] + return result + + +def decode_message(raw_data): + stream = Stream(raw_data) + decode_table = parse_header(stream) + tuples = parse_body_tuples(stream, decode_table) + return decode_lzss(tuples) + +print(decode_message("01111000011111000100100011000110100100011111111010011000100100001101111")) -- cgit v1.2.3