Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion packages/modules/vehicles/cupra/libcupra.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ async def connect(self, username, password):
async def reconnect(self):
# Get code challenge and verifier
code_verifier, code_challenge = self.get_code_challenge()
self.log.debug("Starting Cupra reconnect/auth flow")

# Get authorize page
_scope = 'openid profile nickname birthdate phone'
Expand All @@ -119,6 +120,7 @@ async def reconnect(self):
}

response = await self.session.get(LOGIN_BASE + '/authorize', params=payload)
self.log.debug("Authorize request finished with status=%s", response.status)
if response.status >= 400:
self.log.error(f"Authorize: Non-2xx response ({response.status})")
# Non 2xx response, failed
Expand All @@ -127,27 +129,37 @@ async def reconnect(self):
# Fill form with email (username)
(form, action) = self.form_from_response(await response.read())
form['email'] = self.username
self.log.debug("Submitting email form to action=%s", action)
response = await self.session.post(LOGIN_HANDLER_BASE+action, data=form)
self.log.debug("Email form response status=%s", response.status)
if response.status >= 400:
self.log.error("Email: Non-2xx response")
return False

# Fill form with password
(form, action) = self.password_form(await response.read())
if not form or not action:
self.log.error("Password form parsing failed")
return False
form['password'] = self.password
url = LOGIN_HANDLER_BASE + action
self.log.debug("Submitting password form to url=%s", url)
response = await self.session.post(url, data=form, allow_redirects=False)
self.log.debug("Password form response status=%s", response.status)

# Can get a 303 redirect for a "terms and conditions" page
if (response.status == 303):
url = response.headers['Location']
self.log.debug("Received 303 redirect to %s", url)
if ("terms-and-conditions" in url):
# Get terms and conditions page
url = LOGIN_HANDLER_BASE + url
self.log.debug("Opening terms and conditions page: %s", url)
response = await self.session.get(url, data=form, allow_redirects=False)
(form, action) = self.form_from_response(await response.read())

url = LOGIN_HANDLER_BASE + action
self.log.debug("Submitting terms and conditions form to %s", url)
response = await self.session.post(url, data=form, allow_redirects=False)

self.log.warning("Agreed to terms and conditions")
Expand All @@ -158,18 +170,25 @@ async def reconnect(self):
# Handle every single redirect and stop if the redirect
# URL uses the seat adapter.
while (True):
if 'Location' not in response.headers:
self.log.error("Redirect handling stopped: missing Location header (status=%s)",
response.status)
return False

url = response.headers['Location']
self.log.debug("Redirect: status=%s location=%s", response.status, url)
if (url.split(':')[0] == "seat"):
if not ('code' in url):
self.log.error("Missing authorization code")
return False
# Parse query string
query_string = url.split('?')[1]
query = {x[0]: x[1] for x in [x.split("=") for x in query_string.split("&")]}
self.log.debug("Authorization redirect reached")
break

if (response.status != 302):
self.log.error("Not redirected, status %u" % response.status)
self.log.error("Not redirected, status=%u, last_url=%s", response.status, url)
return False

response = await self.session.get(url, data=form, allow_redirects=False)
Expand All @@ -191,6 +210,7 @@ async def reconnect(self):

response = await self.session.post(API_BASE + '/authorization/api/v1/token',
headers=headers, data=form)
self.log.debug("Authorization code exchange status=%s", response.status)
if response.status >= 400:
self.log.error("Login: Non-2xx response")
# Non 2xx response, failed
Expand All @@ -199,6 +219,7 @@ async def reconnect(self):

# Update header with final token
self.headers['Authorization'] = 'Bearer %s' % self.tokens["accessToken"]
self.log.debug("Cupra authorization completed successfully")

# Success
return True
Expand Down
26 changes: 24 additions & 2 deletions packages/modules/vehicles/skoda/libskoda.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ async def connect(self, username, password):
async def reconnect(self):
# Get code challenge and verifier
code_verifier, code_challenge = self.get_code_challenge()
self.log.debug("Starting Skoda reconnect/auth flow")

# Get authorize page
_scope = 'address badge birthdate cars driversLicense dealers email mileage mbb nationalIdentifier'
Expand All @@ -113,6 +114,7 @@ async def reconnect(self):
}

response = await self.session.get(LOGIN_BASE + '/authorize', params=payload)
self.log.debug("Authorize request finished with status=%s", response.status)
if response.status >= 400:
self.log.error(f"Authorize: Non-2xx response ({response.status})")
# Non 2xx response, failed
Expand All @@ -121,49 +123,67 @@ async def reconnect(self):
# Fill form with email (username)
(form, action) = self.form_from_response(await response.read())
form['email'] = self.username
self.log.debug("Submitting email form to action=%s", action)
response = await self.session.post(LOGIN_HANDLER_BASE+action, data=form)
self.log.debug("Email form response status=%s", response.status)
if response.status >= 400:
self.log.error("Email: Non-2xx response")
return False

# Fill form with password
(form, action) = self.password_form(await response.read())
if not form or not action:
self.log.error("Password form parsing failed")
return False
form['password'] = self.password
url = LOGIN_HANDLER_BASE + action
self.log.debug("Submitting password form to url=%s", url)
response = await self.session.post(url, data=form, allow_redirects=False)
self.log.debug("Password form response status=%s", response.status)

# Can get a 303 redirect for a "terms and conditions" page
if (response.status == 303):
url = response.headers['Location']
self.log.debug("Received 303 redirect to %s", url)
if ("terms-and-conditions" in url):
# Get terms and conditions page
url = LOGIN_HANDLER_BASE + url
self.log.debug("Opening terms and conditions page: %s", url)
response = await self.session.get(url, data=form, allow_redirects=False)
(form, action) = self.form_from_response(await response.read())

url = LOGIN_HANDLER_BASE + action
self.log.debug("Submitting terms and conditions form to %s", url)
response = await self.session.post(url, data=form, allow_redirects=False)

self.log.warn("Agreed to terms and conditions")
self.log.warning("Agreed to terms and conditions")
else:
self.log.error("Got unknown 303 redirect")
return False

# Handle every single redirect and stop if the redirect
# URL uses the weconnect adapter.
while (True):
if 'Location' not in response.headers:
self.log.error("Redirect handling stopped: missing Location header (status=%s)",
response.status)
return False

url = response.headers['Location']
self.log.debug("Redirect: status=%s location=%s", response.status, url)
if (url.split(':')[0] == "myskoda"):
if not ('code' in url):
self.log.error("Missing authorization code")
return False
# Parse query string
query_string = url.split('?')[1]
query = {x[0]: x[1] for x in [x.split("=") for x in query_string.split("&")]}
self.log.debug("Authorization redirect reached")
break

if (response.status != 302):
self.log.error("Not redirected, status %u" % response.status)
self.log.error("Not redirected, status=%u, last_url=%s",
response.status, url)
return False

response = await self.session.get(url, data=form, allow_redirects=False)
Expand All @@ -179,6 +199,7 @@ async def reconnect(self):
}
response = await self.session.post(API_BASE + '/v1/authentication/exchange-authorization-code',
params=params, json=payload)
self.log.debug("Authorization code exchange status=%s", response.status)
if response.status >= 400:
self.log.error("Login: Non-2xx response")
# Non 2xx response, failed
Expand All @@ -187,6 +208,7 @@ async def reconnect(self):

# Update header with final token
self.headers['Authorization'] = 'Bearer %s' % self.tokens["accessToken"]
self.log.debug("Skoda authorization completed successfully")

# Success
return True
Expand Down