|
| 1 | +""" |
| 2 | + One of the several implementations of Lempel–Ziv–Welch compression algorithm |
| 3 | + https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Welch |
| 4 | +""" |
| 5 | + |
| 6 | +import math |
| 7 | +import os |
| 8 | +import sys |
| 9 | + |
| 10 | + |
| 11 | +def read_file_binary(file_path: str) -> str: |
| 12 | + """ |
| 13 | + Reads given file as bytes and returns them as a long string |
| 14 | + """ |
| 15 | + result = "" |
| 16 | + try: |
| 17 | + with open(file_path, "rb") as binary_file: |
| 18 | + data = binary_file.read() |
| 19 | + for dat in data: |
| 20 | + curr_byte = "{0:08b}".format(dat) |
| 21 | + result += curr_byte |
| 22 | + return result |
| 23 | + except IOError: |
| 24 | + print("File not accessible") |
| 25 | + sys.exit() |
| 26 | + |
| 27 | + |
| 28 | +def add_key_to_lexicon( |
| 29 | + lexicon: dict, curr_string: str, index: int, last_match_id: int |
| 30 | +) -> None: |
| 31 | + """ |
| 32 | + Adds new strings (curr_string + "0", curr_string + "1") to the lexicon |
| 33 | + """ |
| 34 | + lexicon.pop(curr_string) |
| 35 | + lexicon[curr_string + "0"] = last_match_id |
| 36 | + |
| 37 | + if math.log2(index).is_integer(): |
| 38 | + for curr_key in lexicon: |
| 39 | + lexicon[curr_key] = "0" + lexicon[curr_key] |
| 40 | + |
| 41 | + lexicon[curr_string + "1"] = bin(index)[2:] |
| 42 | + |
| 43 | + |
| 44 | +def compress_data(data_bits: str) -> str: |
| 45 | + """ |
| 46 | + Compresses given data_bits using Lempel–Ziv–Welch compression algorithm |
| 47 | + and returns the result as a string |
| 48 | + """ |
| 49 | + lexicon = {"0": "0", "1": "1"} |
| 50 | + result, curr_string = "", "" |
| 51 | + index = len(lexicon) |
| 52 | + |
| 53 | + for i in range(len(data_bits)): |
| 54 | + curr_string += data_bits[i] |
| 55 | + if curr_string not in lexicon: |
| 56 | + continue |
| 57 | + |
| 58 | + last_match_id = lexicon[curr_string] |
| 59 | + result += last_match_id |
| 60 | + add_key_to_lexicon(lexicon, curr_string, index, last_match_id) |
| 61 | + index += 1 |
| 62 | + curr_string = "" |
| 63 | + |
| 64 | + while curr_string != "" and curr_string not in lexicon: |
| 65 | + curr_string += "0" |
| 66 | + |
| 67 | + if curr_string != "": |
| 68 | + last_match_id = lexicon[curr_string] |
| 69 | + result += last_match_id |
| 70 | + |
| 71 | + return result |
| 72 | + |
| 73 | + |
| 74 | +def add_file_length(source_path: str, compressed: str) -> str: |
| 75 | + """ |
| 76 | + Adds given file's length in front (using Elias gamma coding) of the compressed |
| 77 | + string |
| 78 | + """ |
| 79 | + file_length = os.path.getsize(source_path) |
| 80 | + file_length_binary = bin(file_length)[2:] |
| 81 | + length_length = len(file_length_binary) |
| 82 | + |
| 83 | + return "0" * (length_length - 1) + file_length_binary + compressed |
| 84 | + |
| 85 | + |
| 86 | +def write_file_binary(file_path: str, to_write: str) -> None: |
| 87 | + """ |
| 88 | + Writes given to_write string (should only consist of 0's and 1's) as bytes in the |
| 89 | + file |
| 90 | + """ |
| 91 | + byte_length = 8 |
| 92 | + try: |
| 93 | + with open(file_path, "wb") as opened_file: |
| 94 | + result_byte_array = [ |
| 95 | + to_write[i : i + byte_length] |
| 96 | + for i in range(0, len(to_write), byte_length) |
| 97 | + ] |
| 98 | + |
| 99 | + if len(result_byte_array[-1]) % byte_length == 0: |
| 100 | + result_byte_array.append("10000000") |
| 101 | + else: |
| 102 | + result_byte_array[-1] += "1" + "0" * ( |
| 103 | + byte_length - len(result_byte_array[-1]) - 1 |
| 104 | + ) |
| 105 | + |
| 106 | + for elem in result_byte_array: |
| 107 | + opened_file.write(int(elem, 2).to_bytes(1, byteorder="big")) |
| 108 | + except IOError: |
| 109 | + print("File not accessible") |
| 110 | + sys.exit() |
| 111 | + |
| 112 | + |
| 113 | +def compress(source_path, destination_path: str) -> None: |
| 114 | + """ |
| 115 | + Reads source file, compresses it and writes the compressed result in destination |
| 116 | + file |
| 117 | + """ |
| 118 | + data_bits = read_file_binary(source_path) |
| 119 | + compressed = compress_data(data_bits) |
| 120 | + compressed = add_file_length(source_path, compressed) |
| 121 | + write_file_binary(destination_path, compressed) |
| 122 | + |
| 123 | + |
| 124 | +if __name__ == "__main__": |
| 125 | + compress(sys.argv[1], sys.argv[2]) |
0 commit comments