Skip to content

Commit da55f09

Browse files
committed
Created Device Emulator
1 parent cba528c commit da55f09

File tree

2 files changed

+227
-26
lines changed

2 files changed

+227
-26
lines changed

test_utils/device_emulator.py

Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
#!/usr/bin/env python3
2+
3+
# Emulate some HTTP handlers and UDP multicast handler of Smart Home device
4+
5+
import sys
6+
import math
7+
import json
8+
import socket
9+
import struct
10+
import threading
11+
import http.server
12+
import socketserver
13+
import urllib.parse
14+
from datetime import datetime
15+
16+
MAC_ADDRESS = 'AA:BB:CC:DD:EE:FF'
17+
18+
HTTP_PORT = 80
19+
20+
UDP_MULTICAST_GRP = '227.16.119.203'
21+
UDP_MULTICAST_PORT = 25061
22+
23+
DIMMER_MAX_VALUE = 1000
24+
25+
name = 'device-emulator'
26+
27+
password = '12345'
28+
29+
values = {
30+
'dim0': 500,
31+
'dim1': 250,
32+
'dim2': 750,
33+
'sw0': 0,
34+
'sw1': 1,
35+
'sw2': 1,
36+
'sw3': 0
37+
}
38+
39+
dimmers_settings = {
40+
'dim0': {
41+
'value_change_step': 10,
42+
'min_lightness_micros': 8300,
43+
'max_lightness_micros': 4000
44+
},
45+
'dim1': {
46+
'value_change_step': 10,
47+
'min_lightness_micros': 8300,
48+
'max_lightness_micros': 4000
49+
},
50+
'dim2': {
51+
'value_change_step': 10,
52+
'min_lightness_micros': 8300,
53+
'max_lightness_micros': 4000
54+
}
55+
}
56+
57+
switchers_inverted = {
58+
'sw0': 1,
59+
'sw1': 1,
60+
'sw2': 0,
61+
'sw3': 1
62+
}
63+
64+
def info(msg):
65+
print('[%s, %d, %s] %s' % (
66+
datetime.now().strftime('%d.%m.%Y %H:%M:%S'),
67+
threading.active_count(),
68+
threading.current_thread().name,
69+
msg
70+
))
71+
sys.stdout.flush()
72+
73+
def dimmerValueToMicros(dimmer):
74+
minLM = dimmers_settings[dimmer]['min_lightness_micros'];
75+
maxLM = dimmers_settings[dimmer]['max_lightness_micros'];
76+
return round(math.cos(math.pi * (values[dimmer] - 1) / 2 / (DIMMER_MAX_VALUE - 1)) * (minLM - maxLM) + maxLM)
77+
78+
class HTTPRequestHandler(http.server.BaseHTTPRequestHandler):
79+
# def log_message(self, format, *args):
80+
# return
81+
82+
def send_response_advanced(self, code, contentType, data):
83+
dataB = bytes(data, 'UTF-8') if isinstance(data, str) else data
84+
assert isinstance(dataB, bytes)
85+
self.send_response(code)
86+
self.send_header('Content-Type', contentType)
87+
self.send_header('Content-Length', len(dataB))
88+
self.end_headers()
89+
self.wfile.write(dataB)
90+
self.wfile.flush()
91+
92+
def checkPassword(self):
93+
if self.headers.get('Password', '') != password:
94+
self.send_response_advanced(403, 'text/plain', 'Forbidden')
95+
return False
96+
return True
97+
98+
def handleSetValues(self, argsStr):
99+
global values
100+
101+
try:
102+
argsList = argsStr.split('&')
103+
for arg in argsList:
104+
key, value = arg.split('=')[0:2]
105+
value = int(value)
106+
assert key in values
107+
if key.startswith('dim'):
108+
assert 0 <= value and value <= 1000
109+
elif key.startswith('sw'):
110+
assert value in [0, 1]
111+
values[key] = value
112+
self.send_response_advanced(200, 'text/plain', 'ACCEPTED\n') # TODO: support NOTHING_CHANGED response
113+
except:
114+
self.send_response_advanced(400, 'text/plain', 'Bad Request')
115+
116+
def do_GET(self):
117+
if not self.checkPassword():
118+
return
119+
120+
if self.path == '/get_info?minimal':
121+
result = {
122+
'mac': MAC_ADDRESS,
123+
'name': name
124+
}
125+
self.send_response_advanced(200, 'application/json', json.dumps(result, indent=2) + '\n')
126+
127+
elif self.path == '/get_info':
128+
result = {
129+
'mac': MAC_ADDRESS,
130+
'name': name,
131+
'values': values,
132+
'micros': {
133+
'dim0': dimmerValueToMicros('dim0'),
134+
'dim1': dimmerValueToMicros('dim1'),
135+
'dim2': dimmerValueToMicros('dim2')
136+
},
137+
'dimmers_settings': dimmers_settings,
138+
'switchers_inverted': switchers_inverted,
139+
'order': {
140+
'dimmers': [0, 1, 2],
141+
'switchers': [0, 1, 2, 3]
142+
}
143+
}
144+
self.send_response_advanced(200, 'application/json', json.dumps(result, indent=2) + '\n')
145+
146+
elif self.path.startswith('/set_values?'):
147+
self.handleSetValues(self.path[len('/set_values?'):])
148+
149+
else:
150+
self.send_response_advanced(404, 'text/plain', 'Not Found')
151+
152+
def do_POST(self):
153+
global name, dimmers_settings, switchers_inverted
154+
155+
if not self.checkPassword():
156+
return
157+
158+
contentLength = int(self.headers.get('Content-Length', 0))
159+
if contentLength <= 0:
160+
info('No header Content-Length in POST request')
161+
self.send_response_advanced(400, 'text/plain', 'Bad Request')
162+
return
163+
body = self.rfile.read(contentLength)
164+
165+
if self.path == '/set_values':
166+
self.handleSetValues(body.decode('UTF-8'))
167+
168+
elif self.path == '/set_settings':
169+
try:
170+
argsList = body.decode('UTF-8').split('&')
171+
for arg in argsList:
172+
key, value = arg.split('=')[0:2]
173+
if key == 'name':
174+
name = urllib.parse.unquote(value)
175+
elif key.startswith('dim'):
176+
assert key in dimmers_settings
177+
dimSettings = [int(dimSetting) for dimSetting in value.split(',')]
178+
assert len(dimSettings) == 3
179+
dimmers_settings[key] = {
180+
'value_change_step': dimSettings[0],
181+
'min_lightness_micros': dimSettings[1],
182+
'max_lightness_micros': dimSettings[2]
183+
}
184+
elif key.startswith('sw'):
185+
assert key in switchers_inverted
186+
value = int(value)
187+
assert value in [0, 1]
188+
switchers_inverted[key] = value
189+
self.send_response_advanced(200, 'text/plain', 'ACCEPTED\n')
190+
except:
191+
self.send_response_advanced(400, 'text/plain', 'Bad Request')
192+
193+
else:
194+
self.send_response_advanced(404, 'text/plain', 'Not Found')
195+
196+
class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
197+
pass
198+
199+
def handleUdpMulticastForever():
200+
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
201+
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
202+
sock.bind((UDP_MULTICAST_GRP, UDP_MULTICAST_PORT))
203+
204+
mreq = struct.pack('4sl', socket.inet_aton(UDP_MULTICAST_GRP), socket.INADDR_ANY)
205+
206+
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
207+
208+
sockResp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
209+
210+
while True:
211+
data, addr = sock.recvfrom(1024)
212+
info('Received UDP message [%s] from [%s]' % (data, addr))
213+
if data == b'SMART_HOME_SCAN':
214+
udpResponse = ('MAC=%s\nNAME=%s' % (MAC_ADDRESS, name)).encode('UTF-8')
215+
sock.sendto(udpResponse, (addr[0], 25062))
216+
info('Sent UDP response [%s]' % udpResponse)
217+
218+
if __name__ == '__main__':
219+
threading.Thread(target=handleUdpMulticastForever, daemon=True).start()
220+
server = ThreadedHTTPServer(('', HTTP_PORT), HTTPRequestHandler)
221+
info('Smart Home Device Emulator HTTP server created, serving forever on port %d...' % HTTP_PORT)
222+
try:
223+
server.serve_forever()
224+
except KeyboardInterrupt:
225+
info('Halting Smart Home Device Emulator HTTP server')
226+
server.shutdown()
227+
info('Goodbye!')

test_utils/udp_receive_multicast_and_send_unicast.py

Lines changed: 0 additions & 26 deletions
This file was deleted.

0 commit comments

Comments
 (0)