about summary refs log tree commit diff
path: root/ass3/q3/decoder_lzss.py
blob: 19669ebe235e9de7df247fa42e26cfd5a98bed3a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import sys


# A class that represents a stream of binary from a message
# This class adds the ability to extract various types of values relevant to decoding a message
# When a portion of data is read, the stream is progressed forward the correct amount.
# This allows for easy data reading and cleaner parsing/decompressing code
# Inspired by scanf from C.
class Stream:
    """Cursor-based reader over a string of "0"/"1" characters.

    Every read_* method decodes one value starting at self.cursor and then moves
    the cursor forward past the bits it consumed.
    """

    def __init__(self, string):
        self.string = string  # The complete bit string being decoded
        self.cursor = 0  # Index of the next unread bit

    def read_elias(self):
        """Read one Elias omega integer from the stream and advance past it.

        The code is a chain of components. A component starting with "0" is a
        length component (its leading 1 bit was flipped to 0); the component
        starting with "1" is the value itself.

        Raises IndexError if the stream ends before the code is complete.
        """
        base = self.cursor
        if self.string[base] == "1":  # The single bit "1" encodes the integer 1
            self.cursor += 1
            return 1
        # The first bit was a one-bit length component, so the next component is bits 1..2
        start, end = 1, 2
        while True:  # Keep reading components until the value component is reached
            component = self.string[base + start:base + end + 1]
            if len(component) != end - start + 1:
                # A short slice means the stream was cut off mid-code; decoding it would give garbage
                raise IndexError("Stream ended in the middle of an Elias omega integer")
            if component[0] == "0":
                # Restore the flipped leading bit; the result + 1 is the next component's bit length
                next_length = int(f"1{component[1:]}", 2) + 1
                start = end + 1
                end += next_length
            else:
                self.cursor += end + 1
                return int(component, 2)

    def read_binary(self, length):
        """Read `length` raw bits and return them as a string.

        The cursor always advances by `length`; if fewer bits remain, the returned
        string is shorter than requested.
        """
        blob = self.string[self.cursor:self.cursor + length]
        self.cursor += length
        return blob

    def read_int(self, bits):
        """Read `bits` bits and return them as an unsigned integer (base 2)."""
        return int(self.read_binary(bits), 2)

    def read_char(self, bits=7):
        """Read a `bits`-wide code point (7 by default) and return it as a character."""
        return chr(self.read_int(bits))

    def read_bit(self):
        """Read a single bit, returned as a one-character string."""
        return self.read_binary(1)

    def read_huff_char(self, decode_table):
        """Read one Huffman coded character.

        `decode_table` maps codeword bit strings to characters. The window starting
        at the cursor is widened one bit at a time until it matches a codeword.

        Raises Exception if the end of the stream is reached without a match.
        """
        window_size = 1
        window = self.string[self.cursor:self.cursor + window_size]
        while window not in decode_table:  # Scan for a Huffman coded char of a growing size
            # Once the window touches the end of the stream, widening it cannot change it,
            # so no codeword can ever match from here
            if self.cursor + window_size >= len(self.string):
                raise Exception("Went for too long looking for a Huffman char")
            window_size += 1
            window = self.string[self.cursor:self.cursor + window_size]
        self.cursor += window_size
        return decode_table[window]

    def read_lzss_tuple(self, decode_table):
        """Read one LZSS tuple, selected by a leading format bit.

        Format bit "0": returns (0, a, b) where a and b are two Elias omega integers
        read in that order. Any other format bit: returns (1, char) where char is a
        Huffman coded character looked up through `decode_table`.
        """
        format_type = self.read_bit()
        if format_type == "0":
            return 0, self.read_elias(), self.read_elias()  # Type 0 tuple
        else:
            return 1, self.read_huff_char(decode_table)  # Type 1 tuple


# Parse a binary header and return the resulting Huffman code as a mapping from codeword to character
def parse_header(stream):
    """Read the header from `stream` and build the Huffman decode table.

    The header is an Elias omega entry count followed by that many entries, each
    made of a character, an Elias omega codeword length, and the codeword bits.

    Returns a dict mapping each codeword (bit string) to its character.
    """
    codebook = {}
    remaining = stream.read_elias()
    while remaining > 0:
        symbol = stream.read_char()
        codeword = stream.read_binary(stream.read_elias())
        codebook[codeword] = symbol
        remaining -= 1
    return codebook


# Turn the body of a message into a list of LZSS tuples
def parse_body_tuples(stream, decode_table):
    """Read the message body from `stream` as a list of LZSS tuples.

    The body starts with an Elias omega tuple count, followed by that many tuples,
    each read with stream.read_lzss_tuple(decode_table).
    """
    count = stream.read_elias()
    parsed = []
    for _ in range(count):
        parsed.append(stream.read_lzss_tuple(decode_table))
    return parsed


# Decode a list of LZSS tuples, return the resulting string
def decode_lzss(tuples):
    """Decode a sequence of LZSS tuples into the original string.

    Each tuple is either (0, offset, length), meaning "copy `length` characters
    starting `offset` characters back from the end of the output so far", or
    (1, text), meaning "append `text` literally". A copy may be longer than its
    offset, in which case the copied region repeats.

    Raises ValueError if a copy refers to a position before the start of the
    output (offset < 1 or offset larger than what has been decoded so far).
    """
    out = []  # One entry per decoded character; joined once at the end to avoid quadratic +=
    for tup in tuples:
        if tup[0] == 0:
            _, offset, length = tup
            if length <= 0:
                continue  # Nothing to copy
            if not 1 <= offset <= len(out):
                # Without this check a too-large offset would wrap around via negative indexing
                raise ValueError(
                    f"Invalid LZSS offset {offset} with {len(out)} characters decoded"
                )
            start = len(out) - offset
            # Reading from the growing list makes overlapping copies repeat the last `offset` characters
            for i in range(length):
                out.append(out[start + i])
        else:
            out.extend(tup[1])
    return "".join(out)


# Given an entire binary message, return the decoded, decompressed data
def decode_message(raw_data):
    """Decode and decompress a complete binary message.

    `raw_data` is wrapped in a Stream; the header is parsed into a Huffman decode
    table, the body into LZSS tuples, and the tuples are expanded into the result.
    """
    stream = Stream(raw_data)
    # The inner parse_header call runs first, so the header is consumed before the body
    return decode_lzss(parse_body_tuples(stream, parse_header(stream)))


# Read a file and return the contents
def read_file(filename):
    """Open `filename` in text mode and return its entire contents as a string."""
    with open(filename, "r") as handle:
        return handle.read()


# Write a given string to file
def write_file(filename, contents):
    """Write the string `contents` to `filename` in text mode, replacing any existing file."""
    with open(filename, "w") as handle:
        handle.write(contents)


def main():
    """Decode the file named by the first command-line argument.

    The decoded text is written to "output_decoder_lzss.txt". Exits with a usage
    message and a non-zero status if no input file was given.
    """
    # Explicit check rather than assert: asserts are stripped when Python runs with -O
    if len(sys.argv) < 2:
        program = sys.argv[0] if sys.argv else "decoder_lzss.py"
        sys.exit(f"usage: {program} <encoded_file>")
    raw = read_file(sys.argv[1])
    result = decode_message(raw)
    write_file("output_decoder_lzss.txt", result)


# Run the decoder only when this file is executed as a script, not when it is imported
if __name__ == "__main__":
    main()