aboutsummaryrefslogtreecommitdiff
path: root/ass3/q3/decoder_lzss.py
diff options
context:
space:
mode:
authorakiyamn2021-05-16 16:41:41 +1000
committerakiyamn2021-05-16 16:41:41 +1000
commit3501b45c59c14efd2d953d864a7afc8bb5cf4d14 (patch)
tree6dd87b62ed1eea5281fd287de6cf982c69229f39 /ass3/q3/decoder_lzss.py
parent7202c67e2d464dc31e5124082e06586462774144 (diff)
downloadfit3155-3501b45c59c14efd2d953d864a7afc8bb5cf4d14.tar.gz
fit3155-3501b45c59c14efd2d953d864a7afc8bb5cf4d14.zip
Ass 3: Q3 it actually works
Diffstat (limited to 'ass3/q3/decoder_lzss.py')
-rw-r--r--ass3/q3/decoder_lzss.py123
1 files changed, 123 insertions, 0 deletions
diff --git a/ass3/q3/decoder_lzss.py b/ass3/q3/decoder_lzss.py
new file mode 100644
index 0000000..d7dcf0c
--- /dev/null
+++ b/ass3/q3/decoder_lzss.py
@@ -0,0 +1,123 @@
+import sys
+
+
+class Stream:
+ def __init__(self, string):
+ self.string = string
+ self.cursor = 0
+
+ def rewind(self):
+ self.cursor = 0
+
+ def end_of_stream(self):
+ return self.cursor >= len(self.string)
+
+ def read_elias(self):
+ offset = self.cursor
+ if self.string[0 + offset] == "1":
+ self.cursor += 1
+ return 1
+ start, end = 1, 2
+ while True:
+ binary = self.string[start + offset:end + 1 + offset]
+ if binary[0] == "0":
+ chunk = int(f"1{binary[1:]}", 2) + 1
+ start = end + 1
+ end += chunk
+ else:
+ self.cursor += end + 1
+ return int(binary, 2)
+
+ def read_binary(self, length):
+ blob = self.string[self.cursor:self.cursor + length]
+ self.cursor += length
+ return blob
+
+ def read_int(self, bits):
+ return int(self.read_binary(bits), 2)
+
+ def read_char(self, bits=7):
+ return chr(self.read_int(bits))
+
+ def read_bit(self):
+ return self.read_binary(1)
+
+ def read_huff_char(self, decode_table):
+ window_size = 1
+ window = self.string[self.cursor:self.cursor + window_size]
+ while window not in decode_table:
+ if window_size >= len(self.string):
+ raise Exception("Went for too long looking for a Huffman char")
+ window_size += 1
+ window = self.string[self.cursor:self.cursor + window_size]
+ self.cursor += window_size
+ return decode_table[window]
+
+ def read_lzss_tuple(self, decode_table):
+ format_type = self.read_bit()
+ if format_type == "0":
+ return 0, self.read_elias(), self.read_elias()
+ else:
+ return 1, self.read_huff_char(decode_table)
+
+
+def elias(n):
+ assert n > 0
+ binary = bin(n)[2:]
+ result = binary
+ while len(binary) != 1:
+ binary = "0" + bin(len(binary) - 1)[3:]
+ result = binary + result
+ return result
+
+
+def decode_elias(encoded, offset=0):
+ if encoded[0 + offset] == "1":
+ return 1
+ start, end = 1, 2
+ while True:
+ binary = encoded[start + offset:end + 1 + offset]
+ if binary[0] == "0":
+ chunk = int(f"1{binary[1:]}", 2) + 1
+ start = end + 1
+ end += chunk
+ else:
+ return int(binary, 2)
+
+
+def parse_header(stream):
+ huff = {}
+ alphabet_size = stream.read_elias()
+ for i in range(alphabet_size):
+ char = stream.read_char()
+ length = stream.read_elias()
+ codeword = stream.read_binary(length)
+ huff[codeword] = char
+ return huff
+
+
+def parse_body_tuples(stream, decode_table):
+ num_tuples = stream.read_elias()
+ return [stream.read_lzss_tuple(decode_table) for _ in range(num_tuples)]
+
+
+def decode_lzss(tuples):
+ result = ""
+ for tup in tuples:
+ if tup[0] == 0:
+ _, offset, length = tup
+ cursor = len(result) - offset
+ for i in map(lambda x: x % offset, range(length)):
+ result += result[cursor + i]
+ else:
+ result += tup[1]
+ return result
+
+
+def decode_message(raw_data):
+ stream = Stream(raw_data)
+ decode_table = parse_header(stream)
+ tuples = parse_body_tuples(stream, decode_table)
+ return decode_lzss(tuples)
+
+print(decode_message("01111000011111000100100011000110100100011111111010011000100100001101111"))