import sys # A class that represents a stream of binary from a message # This class adds the ability to extract various types of values relevant to decoding a message # When a portion of data is read, the stream is progressed forward the correct amount. # This allows for easy data reading and cleaner parsing/decompressing code # Inspired by scanf from C. class Stream: def __init__(self, string): self.string = string self.cursor = 0 # Read one Elias omega integer from the stream def read_elias(self): offset = self.cursor if self.string[0 + offset] == "1": self.cursor += 1 return 1 start, end = 1, 2 while True: # Keep reading bits until a full Elias omega integer has been read binary = self.string[start + offset:end + 1 + offset] if binary[0] == "0": chunk = int(f"1{binary[1:]}", 2) + 1 start = end + 1 end += chunk else: self.cursor += end + 1 return int(binary, 2) # Read a given amount of raw binary from the stream def read_binary(self, length): blob = self.string[self.cursor:self.cursor + length] self.cursor += length return blob # Read one (x bit) integer from the stream def read_int(self, bits): return int(self.read_binary(bits), 2) # Read one character from the stream. (Defaults to 7 bits, i.e. a sub 128 ASCII character) def read_char(self, bits=7): return chr(self.read_int(bits)) # Read a single bit def read_bit(self): return self.read_binary(1) # Read one Huffman coded character, given a decode table (i.e. a dict which maps binary -> char) def read_huff_char(self, decode_table): window_size = 1 window = self.string[self.cursor:self.cursor + window_size] while window not in decode_table: # Scan for a Huffman coded char of a growing size if window_size >= len(self.string): raise Exception("Went for too long looking for a Huffman char") window_size += 1 window = self.string[self.cursor:self.cursor + window_size] self.cursor += window_size return decode_table[window] # Read one LZSS compressed tuple from the stream, given a decode table def read_lzss_tuple(self, decode_table): format_type = self.read_bit() if format_type == "0": return 0, self.read_elias(), self.read_elias() # Type 0 tuple else: return 1, self.read_huff_char(decode_table) # Type 1 tuple # Parse a binary header and return the resulting Huffman code as a mapping from codeword to character def parse_header(stream): huff = {} alphabet_size = stream.read_elias() for i in range(alphabet_size): char = stream.read_char() length = stream.read_elias() codeword = stream.read_binary(length) huff[codeword] = char return huff # Turn the body of a message into a list of LZSS tuples def parse_body_tuples(stream, decode_table): num_tuples = stream.read_elias() return [stream.read_lzss_tuple(decode_table) for _ in range(num_tuples)] # Decode a list of LZSS tuples, return the resulting string def decode_lzss(tuples): result = "" for tup in tuples: if tup[0] == 0: _, offset, length = tup cursor = len(result) - offset for i in map(lambda x: x % offset, range(length)): result += result[cursor + i] else: result += tup[1] return result # Given an entire binary message, return the decoded, decompressed data def decode_message(raw_data): stream = Stream(raw_data) decode_table = parse_header(stream) tuples = parse_body_tuples(stream, decode_table) return decode_lzss(tuples) # Read a file and return the contents def read_file(filename): with open(filename, "r") as file: contents = file.read() return contents # Write a given string to file def write_file(filename, contents): with open(filename, "w") as file: file.write(contents) def main(): assert len(sys.argv) >= 2 raw = read_file(sys.argv[1]) result = decode_message(raw) write_file("output_decoder_lzss.txt", result) if __name__ == "__main__": main()