diff options
| author | akiyamn | 2021-05-23 15:50:50 +1000 | 
|---|---|---|
| committer | akiyamn | 2021-05-23 15:50:50 +1000 | 
| commit | 8e3ceacb95db423becbab6b19c49cf87ee19a7bc (patch) | |
| tree | 8ef6924802858aa7b06883961e97f2dddfb19cd2 | |
| parent | 4aba625715d39f9b4f593cbf5a72db368c809eb3 (diff) | |
| download | fit3155-master.tar.gz fit3155-master.zip | |
| -rw-r--r-- | .gitignore | 1 | ||||
| -rw-r--r-- | ass3/q2/header.py | 29 | ||||
| -rw-r--r-- | ass3/q3/decoder_lzss.py | 26 | 
3 files changed, 45 insertions, 11 deletions
| @@ -2,6 +2,7 @@  .idea  .vscode  venv +tests  # ---> Python  # Byte-compiled / optimized / DLL files diff --git a/ass3/q2/header.py b/ass3/q2/header.py index a10b9a9..24964af 100644 --- a/ass3/q2/header.py +++ b/ass3/q2/header.py @@ -2,6 +2,7 @@ import heapq  import sys +# Represents a node in the tree used to construct a Huffman code  class HuffNode:      def __init__(self, string, count):          self.string = string @@ -19,6 +20,9 @@ class HuffNode:          new_node.height += 1 + max(self.height, other.height)          return new_node +    # Used to determine a node's place in the heap +    # Sorts based on number of occurrences first, height second. +    # This allows for a shorter code word.      def __lt__(self, other):          if self.count == other.count:              return self.height < other.height @@ -30,6 +34,7 @@ class HuffNode:      def num_children(self):          return (self.left is not None) + (self.right is not None) +    # Apply a function to all child nodes via DFS search      def dfs_apply(self, func):          func(self)          if self.left is not None: @@ -38,6 +43,7 @@ class HuffNode:              self.right.dfs_apply(func) +# Counts the number of unique characters in a string and places them in a dictionary  def count_unique(string):      count = {}      for char in string: @@ -48,24 +54,28 @@ def count_unique(string):      return count +# Given a string, returns a dictionary mapping from a string to a binary code  def huffman(string):      count = count_unique(string)      alphabet = "".join(count.keys()) -    nodes = [HuffNode(letter, count[letter]) for letter in alphabet] -    leaves = nodes[:] +    nodes = [HuffNode(letter, count[letter]) for letter in alphabet] # Node for each letter +    leaves = nodes[:]  # Just the leaf nodes used later for extracting leaf Huffman codes      heapq.heapify(nodes) +    # Keep linking nodes together until there is only one left      while len(nodes) > 1: -        first, second = heapq.heappop(nodes), heapq.heappop(nodes) -        new_node = first + second -        heapq.heappush(nodes, new_node) +        first, second = heapq.heappop(nodes), heapq.heappop(nodes)  # Pop two off the heap +        new_node = first + second  # Combine the nodes into a new, combined node. (Non-destructive) +        heapq.heappush(nodes, new_node)  # Add this new node to the heap      root = nodes[0] -    huffman_calculate(root) -    result = {} +    huffman_calculate(root)  # Calculate Huffman codes for the entire tree +    result = {}  # Combine results into a dictionary      for leaf in leaves:          result[leaf.string] = leaf.binary      return result +# Calculates the Huffman codes of this node and all of its children recursively +# Non-leaf nodes are calculated as intermediate values. Only the leaves are considered important  def huffman_calculate(node: HuffNode, bin_prefix=""):      if node.num_children() == 0:          node.binary = bin_prefix @@ -74,6 +84,7 @@ def huffman_calculate(node: HuffNode, bin_prefix=""):      huffman_calculate(node.right, bin_prefix + "1") +# Calculate the Elias omega code for a given positive integer  def elias(n):      assert n > 0      binary = bin(n)[2:] @@ -84,6 +95,7 @@ def elias(n):      return result +# Generate the binary header for a given string  def header(string):      huff_code = huffman(string)      alphabet_size = len(huff_code.keys()) @@ -95,12 +107,14 @@ def header(string):      return output +# Read a file and return the contents  def read_file(filename):      with open(filename, "r") as file:          contents = file.read()      return contents +# Write a given string to file  def write_file(filename, contents):      with open(filename, "w") as file:          file.write(contents) @@ -111,5 +125,6 @@ def main():      string = read_file(sys.argv[1])      write_file("output_header.txt", header(string)) +  if __name__ == "__main__":      main() diff --git a/ass3/q3/decoder_lzss.py b/ass3/q3/decoder_lzss.py index 92bd011..19669eb 100644 --- a/ass3/q3/decoder_lzss.py +++ b/ass3/q3/decoder_lzss.py @@ -1,18 +1,24 @@  import sys +# A class that represents a stream of binary from a message +# This class adds the ability to extract various types of values relevant to decoding a message +# When a portion of data is read, the stream is progressed forward the correct amount. +# This allows for easy data reading and cleaner parsing/decompressing code +# Inspired by scanf from C.  class Stream:      def __init__(self, string):          self.string = string          self.cursor = 0 +    # Read one Elias omega integer from the stream      def read_elias(self):          offset = self.cursor          if self.string[0 + offset] == "1":              self.cursor += 1              return 1          start, end = 1, 2 -        while True: +        while True:  # Keep reading bits until a full Elias omega integer has been read              binary = self.string[start + offset:end + 1 + offset]              if binary[0] == "0":                  chunk = int(f"1{binary[1:]}", 2) + 1 @@ -22,24 +28,29 @@ class Stream:                  self.cursor += end + 1                  return int(binary, 2) +    # Read a given amount of raw binary from the stream      def read_binary(self, length):          blob = self.string[self.cursor:self.cursor + length]          self.cursor += length          return blob +    # Read one (x bit) integer from the stream      def read_int(self, bits):          return int(self.read_binary(bits), 2) +    # Read one character from the stream. (Defaults to 7 bits, i.e. a sub 128 ASCII character)      def read_char(self, bits=7):          return chr(self.read_int(bits)) +    # Read a single bit      def read_bit(self):          return self.read_binary(1) +    # Read one Huffman coded character, given a decode table (i.e. a dict which maps binary -> char)      def read_huff_char(self, decode_table):          window_size = 1          window = self.string[self.cursor:self.cursor + window_size] -        while window not in decode_table: +        while window not in decode_table:  # Scan for a Huffman coded char of a growing size              if window_size >= len(self.string):                  raise Exception("Went for too long looking for a Huffman char")              window_size += 1 @@ -47,14 +58,16 @@ class Stream:          self.cursor += window_size          return decode_table[window] +    # Read one LZSS compressed tuple from the stream, given a decode table      def read_lzss_tuple(self, decode_table):          format_type = self.read_bit()          if format_type == "0": -            return 0, self.read_elias(), self.read_elias() +            return 0, self.read_elias(), self.read_elias()  # Type 0 tuple          else: -            return 1, self.read_huff_char(decode_table) +            return 1, self.read_huff_char(decode_table)  # Type 1 tuple +# Parse a binary header and return the resulting Huffman code as a mapping from codeword to character  def parse_header(stream):      huff = {}      alphabet_size = stream.read_elias() @@ -66,11 +79,13 @@ def parse_header(stream):      return huff +# Turn the body of a message into a list of LZSS tuples  def parse_body_tuples(stream, decode_table):      num_tuples = stream.read_elias()      return [stream.read_lzss_tuple(decode_table) for _ in range(num_tuples)] +# Decode a list of LZSS tuples, return the resulting string  def decode_lzss(tuples):      result = ""      for tup in tuples: @@ -84,6 +99,7 @@ def decode_lzss(tuples):      return result +# Given an entire binary message, return the decoded, decompressed data  def decode_message(raw_data):      stream = Stream(raw_data)      decode_table = parse_header(stream) @@ -91,12 +107,14 @@ def decode_message(raw_data):      return decode_lzss(tuples) +# Read a file and return the contents  def read_file(filename):      with open(filename, "r") as file:          contents = file.read()      return contents +# Write a given string to file  def write_file(filename, contents):      with open(filename, "w") as file:          file.write(contents) | 
