8
8
changes in the value quicker than SMA, which is one of the advantages of using EMA.
9
9
"""
10
10
11
+ from collections .abc import Iterator
11
12
12
- def exponential_moving_average (series : list [float ], window_size : int ) -> list [float ]:
13
+
14
+ def exponential_moving_average (
15
+ series_generator : Iterator [float ], window_size : int
16
+ ) -> Iterator [float ]:
13
17
"""
14
- Returns the exponential moving average of the given array list
15
- >>> exponential_moving_average([2, 5, 3, 8.2, 6, 9, 10], 3)
18
+ Returns the generator which generates exponential moving average of the given
19
+ series generator
20
+ >>> list(exponential_moving_average((ele for ele in [2, 5, 3, 8.2, 6, 9, 10]), 3))
16
21
[2.0, 3.5, 3.25, 5.725, 5.8625, 7.43125, 8.715625]
17
22
18
- :param series: Array of numbers (Time series data)
23
+ :param series_generator: Generator which generates numbers
19
24
:param window_size: Window size for calculating average (window_size > 0)
20
- :return: Resulting array of exponentially averaged numbers
25
+ :return: Returns generator of which returns exponentially averaged numbers
21
26
22
27
Formula:
23
28
@@ -30,41 +35,41 @@ def exponential_moving_average(series: list[float], window_size: int) -> list[fl
30
35
31
36
if window_size <= 0 :
32
37
raise ValueError ("window_size must be > 0" )
33
- elif window_size >= len (series ):
34
- raise ValueError ("window_size must be < length of series" )
35
-
36
- # Resultent array
37
- exp_averaged_arr : list [float ] = []
38
38
39
39
# Calculating smoothing factor
40
40
alpha = 2 / (1 + window_size )
41
41
42
+ # Defining timestamp t
43
+ t = 0
44
+
42
45
# Exponential average at timestamp t
43
- st = series [ 0 ]
46
+ st = None
44
47
45
- for t in range ( len ( series )) :
48
+ for xt in series_generator :
46
49
if t <= window_size :
47
50
# Assigning simple moving average till the window_size for the first time
48
51
# is reached
49
- st = (st + series [t ]) * 0.5
50
- exp_averaged_arr .append (st )
52
+ st = float (xt ) if st is None else (st + xt ) * 0.5
51
53
else :
52
54
# Calculating exponential moving average based on current timestamp data
53
55
# point and previous exponential average value
54
- st = (alpha * series [t ]) + ((1 - alpha ) * st )
55
- exp_averaged_arr .append (st )
56
-
57
- return exp_averaged_arr
56
+ st = (alpha * xt ) + ((1 - alpha ) * st )
57
+ t += 1
58
+ yield st
58
59
59
60
60
61
if __name__ == "__main__" :
61
62
import doctest
62
63
63
64
doctest .testmod ()
64
65
66
+ def test_gen_func (arr : list [float ]):
67
+ yield from arr
68
+
65
69
test_series = [2 , 5 , 3 , 8.2 , 6 , 9 , 10 ]
70
+ test_generator = test_gen_func (test_series )
66
71
test_window_size = 3
67
- result = exponential_moving_average (test_series , test_window_size )
72
+ result = exponential_moving_average (test_generator , test_window_size )
68
73
print ("Test series: " , test_series )
69
74
print ("Window size: " , test_window_size )
70
- print ("Result: " , result )
75
+ print ("Result: " , list ( result ) )
0 commit comments