|
| 1 | +''' |
| 2 | +
|
| 3 | +Author : Gowtham Kamalasekar |
| 4 | +LinkedIn : https://www.linkedin.com/in/gowtham-kamalasekar/ |
| 5 | +
|
| 6 | +''' |
| 7 | +import math |
| 8 | + |
| 9 | +import matplotlib.pyplot as plt |
| 10 | +import pandas as pd |
| 11 | +from typing import dict, list |
| 12 | + |
class DbScan:
    '''
    DBSCAN Algorithm :
    Density-Based Spatial Clustering Of Applications With Noise
    Refer this website for more details : https://en.wikipedia.org/wiki/DBSCAN

    Functions:
    ----------
    __init__() : Constructor that sets minPts, radius and file
    perform_dbscan() : Invoked by the constructor; computes, for every
                       point, the list of points lying within the radius
                       and returns them as a dictionary.
    print_dbscan() : Prints each point as core or noise, also stating
                     whether a noise point is a border point.
    plot_dbscan() : Plots the points to show the core and noise points.

    To create a object
    ------------------
    import dbscan
    obj = dbscan.DbScan(minpts, radius, file)
    obj.print_dbscan()
    obj.plot_dbscan()
    '''

    # Sample data set used when the caller does not supply a data source.
    _DEFAULT_POINTS = (
        {'x': 3, 'y': 7}, {'x': 4, 'y': 6}, {'x': 5, 'y': 5},
        {'x': 6, 'y': 4}, {'x': 7, 'y': 3}, {'x': 6, 'y': 2},
        {'x': 7, 'y': 2}, {'x': 8, 'y': 4}, {'x': 3, 'y': 3},
        {'x': 2, 'y': 6}, {'x': 3, 'y': 5}, {'x': 2, 'y': 4},
    )

    def __init__(
        self,
        minpts: int,
        radius: float,
        file: "str | tuple[dict[str, float], ...]" = _DEFAULT_POINTS,
    ) -> None:
        '''
        Constructor

        Args:
        -----------
        minpts (int) : Minimum number of points (the point itself
                       included) that must lie within the radius for
                       a point to be considered a core point.
        radius (float) : Neighbourhood radius. A point counts as a
                         neighbour only when its Euclidean distance is
                         strictly smaller than this value.
        file (str or iterable) : Either the location of a CSV file or an
                                 iterable of records such as
                                 {'x': 3, 'y': 7}. Both forms must provide
                                 an x and a y value for each point.

        Example :
        minPts = 4
        radius = 1.9
        file = 'data_dbscan.csv'

        File Structure of CSV Data:
        ---------------------------
        _____
        x | y
        -----
        3 | 7
        4 | 6
        5 | 5
        6 | 4
        7 | 3
        -----
        '''
        self.minpts = minpts
        self.radius = radius
        self.file = file
        # Neighbour lists are computed once here and reused by the
        # printing and plotting methods.
        self.dict1 = self.perform_dbscan()

    def _load_points(self) -> tuple[list, list]:
        '''
        Load the data set and return its x and y columns as two lists.

        A string is read as a CSV file path; anything else is treated
        as an iterable of records and turned into a DataFrame.
        '''
        if isinstance(self.file, str):
            data = pd.read_csv(self.file)
        else:
            data = pd.DataFrame(list(self.file))
        # An empty record list yields a DataFrame without columns, so
        # looking up 'x' would fail although there is nothing to process.
        if data.empty:
            return [], []
        return data['x'].tolist(), data['y'].tolist()

    def perform_dbscan(self) -> dict[int, list[int]]:
        '''
        Args:
        -----------
        None

        Return:
        --------
        Dictionary mapping each point (numbered from 1 in input order)
        to the list of points that lie within its radius. A point is
        within its own radius whenever the radius is positive.

        >>> result = DbScan(4, 1.9).perform_dbscan()
        >>> for key in sorted(result):
        ...     print(key, sorted(result[key]))
        1 [1, 2, 10]
        2 [1, 2, 3, 11]
        3 [2, 3, 4]
        4 [3, 4, 5]
        5 [4, 5, 6, 7, 8]
        6 [5, 6, 7]
        7 [5, 6, 7]
        8 [5, 8]
        9 [9, 12]
        10 [1, 10, 11]
        11 [2, 10, 11, 12]
        12 [9, 11, 12]

        '''
        xs, ys = self._load_points()
        eps = self.radius
        neighbours: dict[int, list[int]] = {}
        for i, (xi, yi) in enumerate(zip(xs, ys), start=1):
            within = [
                j
                for j, (xj, yj) in enumerate(zip(xs, ys), start=1)
                if math.sqrt((xj - xi) ** 2 + (yj - yi) ** 2) < eps
            ]
            # A point only gets an entry when something lies within the
            # radius; with a non-positive radius no point has an entry.
            if within:
                neighbours[i] = within
        return neighbours

    def print_dbscan(self) -> None:
        '''
        Outputs:
        --------
        Prints each point with its neighbour list and whether it is a
        core point or noise. A noise point that lies within the radius
        of some core point is additionally reported as a border point.

        >>> DbScan(4,1.9).print_dbscan()
        1   [1, 2, 10] ---> Noise ---> Border
        2   [1, 2, 3, 11] ---> Core
        3   [2, 3, 4] ---> Noise ---> Border
        4   [3, 4, 5] ---> Noise ---> Border
        5   [4, 5, 6, 7, 8] ---> Core
        6   [5, 6, 7] ---> Noise ---> Border
        7   [5, 6, 7] ---> Noise ---> Border
        8   [5, 8] ---> Noise ---> Border
        9   [9, 12] ---> Noise
        10   [1, 10, 11] ---> Noise ---> Border
        11   [2, 10, 11, 12] ---> Core
        12   [9, 11, 12] ---> Noise ---> Border
        '''
        # Determine the core points once instead of re-deriving them
        # for every noise point.
        core_points = [
            point
            for point, neighbours in self.dict1.items()
            if len(neighbours) >= self.minpts
        ]
        for point, neighbours in self.dict1.items():
            print(point, " ", neighbours, end=' ---> ')
            if len(neighbours) >= self.minpts:
                print("Core")
            elif any(point in self.dict1[core] for core in core_points):
                print("Noise ---> Border")
            else:
                print("Noise")

    def plot_dbscan(self) -> None:
        '''
        Output:
        -------
        A matplotlib plot that shows core points (red, each with a
        circle of the configured radius around it) and noise points
        (green), every point labelled with its number.

        >>> DbScan(4,1.9).plot_dbscan()
        Plotted Successfully
        '''
        xs, ys = self._load_points()
        core_x, core_y = [], []
        noise_x, noise_y = [], []
        for point, neighbours in self.dict1.items():
            x, y = xs[point - 1], ys[point - 1]
            if len(neighbours) >= self.minpts:
                core_x.append(x)
                core_y.append(y)
                circle = plt.Circle((x, y), self.radius,
                                    color='blue', fill=False)
                plt.gca().add_artist(circle)
            else:
                noise_x.append(x)
                noise_y.append(y)
            plt.text(x, y, 'P' + str(point), ha='center', va='bottom')
        # One labelled scatter per group ties each legend entry to the
        # right colour; labelling by draw order would depend on whether
        # the first points happen to be core or noise.
        plt.scatter(core_x, core_y, color='red', label='Core')
        plt.scatter(noise_x, noise_y, color='green', label='Noise')
        plt.xlabel('X')
        plt.ylabel('Y')
        plt.title('DBSCAN Clustering')
        plt.legend()
        plt.show()
        print("Plotted Successfully")
| 187 | + |
if __name__ == "__main__":
    # When executed as a script, run the doctests embedded in the
    # docstrings of this module.
    from doctest import testmod

    testmod()
0 commit comments