Skip to content

Solving the Top k most frequent words problem using a max-heap #8685

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions DIRECTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@
* [Minimum Coin Change](dynamic_programming/minimum_coin_change.py)
* [Minimum Cost Path](dynamic_programming/minimum_cost_path.py)
* [Minimum Partition](dynamic_programming/minimum_partition.py)
* [Minimum Size Subarray Sum](dynamic_programming/minimum_size_subarray_sum.py)
* [Minimum Squares To Represent A Number](dynamic_programming/minimum_squares_to_represent_a_number.py)
* [Minimum Steps To One](dynamic_programming/minimum_steps_to_one.py)
* [Minimum Tickets Cost](dynamic_programming/minimum_tickets_cost.py)
Expand All @@ -339,6 +340,7 @@
* [Word Break](dynamic_programming/word_break.py)

## Electronics
* [Apparent Power](electronics/apparent_power.py)
* [Builtin Voltage](electronics/builtin_voltage.py)
* [Carrier Concentration](electronics/carrier_concentration.py)
* [Circular Convolution](electronics/circular_convolution.py)
Expand All @@ -348,6 +350,7 @@
* [Electrical Impedance](electronics/electrical_impedance.py)
* [Ind Reactance](electronics/ind_reactance.py)
* [Ohms Law](electronics/ohms_law.py)
* [Real And Reactive Power](electronics/real_and_reactive_power.py)
* [Resistor Equivalence](electronics/resistor_equivalence.py)
* [Resonant Frequency](electronics/resonant_frequency.py)

Expand Down Expand Up @@ -483,6 +486,7 @@
* [Astar](machine_learning/astar.py)
* [Data Transformations](machine_learning/data_transformations.py)
* [Decision Tree](machine_learning/decision_tree.py)
* [Dimensionality Reduction](machine_learning/dimensionality_reduction.py)
* Forecasting
* [Run](machine_learning/forecasting/run.py)
* [Gradient Descent](machine_learning/gradient_descent.py)
Expand Down Expand Up @@ -604,6 +608,7 @@
* [Perfect Number](maths/perfect_number.py)
* [Perfect Square](maths/perfect_square.py)
* [Persistence](maths/persistence.py)
* [Pi Generator](maths/pi_generator.py)
* [Pi Monte Carlo Estimation](maths/pi_monte_carlo_estimation.py)
* [Points Are Collinear 3D](maths/points_are_collinear_3d.py)
* [Pollard Rho](maths/pollard_rho.py)
Expand Down Expand Up @@ -1162,6 +1167,7 @@
* [Snake Case To Camel Pascal Case](strings/snake_case_to_camel_pascal_case.py)
* [Split](strings/split.py)
* [Text Justification](strings/text_justification.py)
* [Top K Frequent Words](strings/top_k_frequent_words.py)
* [Upper](strings/upper.py)
* [Wave](strings/wave.py)
* [Wildcard Pattern Matching](strings/wildcard_pattern_matching.py)
Expand Down
31 changes: 25 additions & 6 deletions data_structures/heap/heap.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,28 @@
from __future__ import annotations

from abc import abstractmethod
from collections.abc import Iterable
from typing import Generic, Protocol, TypeVar


class Heap:
class Comparable(Protocol):
    """Structural type for values that support ``<``, ``>`` and ``==``."""

    @abstractmethod
    def __lt__(self: T, other: T) -> bool:
        """Tell whether ``self`` orders strictly before ``other``."""

    @abstractmethod
    def __gt__(self: T, other: T) -> bool:
        """Tell whether ``self`` orders strictly after ``other``."""

    @abstractmethod
    def __eq__(self: T, other: object) -> bool:
        """Tell whether ``self`` compares equal to ``other``."""


T = TypeVar("T", bound=Comparable)


class Heap(Generic[T]):
"""A Max Heap Implementation

>>> unsorted = [103, 9, 1, 7, 11, 15, 25, 201, 209, 107, 5]
Expand All @@ -27,7 +46,7 @@ class Heap:
"""

def __init__(self) -> None:
self.h: list[float] = []
self.h: list[T] = []
self.heap_size: int = 0

def __repr__(self) -> str:
Expand Down Expand Up @@ -79,7 +98,7 @@ def max_heapify(self, index: int) -> None:
# fix the subsequent violation recursively if any
self.max_heapify(violation)

def build_max_heap(self, collection: Iterable[float]) -> None:
def build_max_heap(self, collection: Iterable[T]) -> None:
"""build max heap from an unsorted array"""
self.h = list(collection)
self.heap_size = len(self.h)
Expand All @@ -88,7 +107,7 @@ def build_max_heap(self, collection: Iterable[float]) -> None:
for i in range(self.heap_size // 2 - 1, -1, -1):
self.max_heapify(i)

def extract_max(self) -> float:
def extract_max(self) -> T:
"""get and remove max from heap"""
if self.heap_size >= 2:
me = self.h[0]
Expand All @@ -102,7 +121,7 @@ def extract_max(self) -> float:
else:
raise Exception("Empty heap")

def insert(self, value: float) -> None:
def insert(self, value: T) -> None:
"""insert a new value into the max heap"""
self.h.append(value)
idx = (self.heap_size - 1) // 2
Expand Down Expand Up @@ -144,7 +163,7 @@ def heap_sort(self) -> None:
]:
print(f"unsorted array: {unsorted}")

heap = Heap()
heap: Heap[int] = Heap()
heap.build_max_heap(unsorted)
print(f"after build heap: {heap}")

Expand Down
2 changes: 1 addition & 1 deletion machine_learning/linear_discriminant_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ def main():
if input("Press any key to restart or 'q' for quit: ").strip().lower() == "q":
print("\n" + "GoodBye!".center(100, "-") + "\n")
break
system("clear" if name == "posix" else "cls") # noqa: S605
system("cls" if name == "nt" else "clear") # noqa: S605


if __name__ == "__main__":
Expand Down
101 changes: 101 additions & 0 deletions strings/top_k_frequent_words.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
"""
Finds the top K most frequent words from the provided word list.

This implementation aims to show how to solve the problem using the Heap class
already present in this repository.
Computing order statistics is, in fact, a typical usage of heaps.

This is mostly shown for educational purposes, since the problem can be solved
in a few lines using collections.Counter from the Python standard library:

    from collections import Counter
    def top_k_frequent_words(words, k_value):
        return [x[0] for x in Counter(words).most_common(k_value)]
"""


from collections import Counter
from functools import total_ordering

from data_structures.heap.heap import Heap


@total_ordering
class WordCount:
    """
    A word paired with its number of occurrences.

    Equality and ordering look at ``count`` only; ``word`` is ignored.
    The remaining rich comparisons are supplied by ``total_ordering``.
    """

    def __init__(self, word: str, count: int) -> None:
        self.word = word
        self.count = count

    def __eq__(self, other: object) -> bool:
        """
        Compare by count; anything that is not a WordCount is not handled here.

        >>> WordCount('a', 1).__eq__(WordCount('b', 1))
        True
        >>> WordCount('a', 1).__eq__(WordCount('a', 1))
        True
        >>> WordCount('a', 1).__eq__(WordCount('a', 2))
        False
        >>> WordCount('a', 1).__eq__(WordCount('b', 2))
        False
        >>> WordCount('a', 1).__eq__(1)
        NotImplemented
        """
        if isinstance(other, WordCount):
            return self.count == other.count
        # Let Python try the reflected operation on the other operand.
        return NotImplemented

    def __lt__(self, other: object) -> bool:
        """
        Order by count; anything that is not a WordCount is not handled here.

        >>> WordCount('a', 1).__lt__(WordCount('b', 1))
        False
        >>> WordCount('a', 1).__lt__(WordCount('a', 1))
        False
        >>> WordCount('a', 1).__lt__(WordCount('a', 2))
        True
        >>> WordCount('a', 1).__lt__(WordCount('b', 2))
        True
        >>> WordCount('a', 2).__lt__(WordCount('a', 1))
        False
        >>> WordCount('a', 2).__lt__(WordCount('b', 1))
        False
        >>> WordCount('a', 1).__lt__(1)
        NotImplemented
        """
        if isinstance(other, WordCount):
            return self.count < other.count
        # Let Python try the reflected operation on the other operand.
        return NotImplemented


def top_k_frequent_words(words: list[str], k_value: int) -> list[str]:
    """
    Return the `k_value` most frequent words, most frequent first.

    Every element of `words` is treated as one word. The result never holds
    more entries than there are distinct words: if `k_value` exceeds that
    number, all distinct words are returned.

    >>> top_k_frequent_words(['a', 'b', 'c', 'a', 'c', 'c'], 3)
    ['c', 'a', 'b']
    >>> top_k_frequent_words(['a', 'b', 'c', 'a', 'c', 'c'], 2)
    ['c', 'a']
    >>> top_k_frequent_words(['a', 'b', 'c', 'a', 'c', 'c'], 1)
    ['c']
    >>> top_k_frequent_words(['a', 'b', 'c', 'a', 'c', 'c'], 0)
    []
    >>> top_k_frequent_words([], 1)
    []
    >>> top_k_frequent_words(['a', 'a'], 2)
    ['a']
    """
    max_heap: Heap[WordCount] = Heap()
    frequencies = Counter(words)

    # One heap entry per distinct word, compared by its occurrence count.
    entries = [WordCount(word, count) for word, count in frequencies.items()]
    max_heap.build_max_heap(entries)

    # Never pop more entries than the heap holds.
    picks = min(k_value, len(frequencies))
    most_frequent: list[str] = []
    for _ in range(picks):
        most_frequent.append(max_heap.extract_max().word)
    return most_frequent


if __name__ == "__main__":
    # Run the doctests embedded in this module when executed as a script.
    from doctest import testmod

    testmod()