-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathdecode_udp.py
More file actions
136 lines (109 loc) · 4.26 KB
/
decode_udp.py
File metadata and controls
136 lines (109 loc) · 4.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import time
from roborock.web_api import RoborockApiClient
import asyncio
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5
import zlib
import socket
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
import io
import json
VACUUM_SSID = ""
YOUR_SSID = ""
YOUR_SSID_PASS = ""
YOUR_RR_USER = ""
YOUR_RR_PASS = ""
YOUR_TIMEZONE = "America/New_York"
YOUR_CST = "EST5EDT,M3.2.0,M11.1.0"
YOUR_REGION = "US"
def generate_rsa_key_pair(key_size=1024):
key = RSA.generate(key_size)
private_key = key.export_key().decode('utf-8')
public_key = key.publickey().export_key().decode('utf-8')
return private_key, public_key
KEY = generate_rsa_key_pair()
def calculate_crc32(data):
return zlib.crc32(data) & 0xffffffff
def build_bytes(data, cmd_id):
byte_array_output_stream = io.BytesIO()
byte_array_output_stream.write("1.0".encode('utf-8')[:3])
byte_array_output_stream.write(bytes([0, 0, 0, 1]))
byte_array_output_stream.write(bytes([0, cmd_id]))
byte_array_output_stream.write(bytes([len(data) >> 8]))
byte_array_output_stream.write(bytes([len(data) & 0xFF]))
byte_array_output_stream.write(data)
crc32_val = calculate_crc32(byte_array_output_stream.getvalue())
byte_array_output_stream.write(bytes([(crc32_val >> 24) & 0xFF]))
byte_array_output_stream.write(bytes([(crc32_val >> 16) & 0xFF]))
byte_array_output_stream.write(bytes([(crc32_val >> 8) & 0xFF]))
byte_array_output_stream.write(bytes([crc32_val & 0xFF]))
return byte_array_output_stream.getvalue()
def decrypt_rsa_data(data, private_key_pem):
key = RSA.importKey(private_key_pem.encode())
cipher = PKCS1_v1_5.new(key)
i8 = ((data[9] << 8) & 0xFF00) + (data[10] & 0xFF)
i7 = 11
block_size = key.size_in_bytes()
decrypted_data = bytearray()
while i8 >= block_size:
block = data[i7:i7 + block_size]
decrypted_block = cipher.decrypt(block,sentinel=None)
decrypted_data.extend(decrypted_block)
i8 -= block_size
i7 += block_size
return bytes(decrypted_data)
def build_first_message():
data = {
"id": 1,
"method": "hello",
"params": {
"app_ver": 1,
"key": KEY[1],
}
}
print(data)
key = VACUUM_SSID.encode("utf-8")[-16:]
cipher = AES.new(key, AES.MODE_ECB)
padded_data = pad(json.dumps(data, separators=(',', ':')).encode("utf-8"), AES.block_size)
ciphertext = cipher.encrypt(padded_data)
return build_bytes(ciphertext,16)
def decode_second_messagE(message):
return decrypt_rsa_data(message, KEY[0])
def build_third_message(key, u, s, t, r ="US"):
data = {"u":u,
"ssid": YOUR_SSID,
"token": {
"r": r,
"tz": YOUR_TIMEZONE,
"s": s,
"cst": YOUR_CST,
"t":t
},
"passwd": YOUR_SSID_PASS}
print(data)
cipher = AES.new(key.encode("utf-8"), AES.MODE_ECB)
padded_data = pad(json.dumps(data, separators=(',', ':')).encode("utf-8"), AES.block_size)
ciphertext = cipher.encrypt(padded_data)
return build_bytes(ciphertext, 1)
def send_udp_message(message, ip="192.168.8.1", port=55559, buffer_size=1024, timeout=20):
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
sock.settimeout(timeout)
sock.sendto(message, (ip, port))
print(f"Sent: {message}")
data, addr = sock.recvfrom(buffer_size)
return data
async def main():
api = RoborockApiClient(YOUR_RR_USER)
ud = await api.pass_login(YOUR_RR_PASS)
prepare = await api.nc_prepare(ud, YOUR_TIMEZONE)
input("Reset the vacuum's wifi/ Change to the vacuums ssid, then hit any key")
hello_msg = build_first_message()
hello_response = json.loads(decode_second_messagE(send_udp_message(hello_msg)))
time.sleep(.5)
wifi_msg = send_udp_message(build_third_message(hello_response['params']['key'],u=ud.rruid,s=prepare['s'], t=prepare['t'],r=YOUR_REGION))
input("Wait for the status identifier on your vacuum for wifi go to solid blue. Then change to a real ssid, then hit any key.")
resp =await api.add_device(ud, s=prepare['s'], t=prepare['t'])
print(resp)
if __name__ == '__main__':
asyncio.run(main())