Files
SolarEnergy/python_device/src/api.py

95 lines
2.6 KiB
Python

import requests
from src.args import args
import src.constants as constants
from cryptography.x509 import load_pem_x509_certificate
from cryptography import utils
import jwt
def get_secure_origin(*, timeout: float = 30.0) -> str:
    """
    Fetch the secure origin advertised by the unsecure origin.

    Sends a GET request to ``{args.unsecure_origin}/secure_origin`` and
    returns the response body as text, unmodified.

    :param timeout: value passed to ``requests`` as its ``timeout`` argument,
        so that an unresponsive server cannot block the caller indefinitely
    :return: body of the response, as text
    :raises Exception: if the response status is outside the 2xx range
    :raises requests.exceptions.RequestException: on connection failure or
        when the timeout expires
    """
    res = requests.get(f"{args.unsecure_origin}/secure_origin", timeout=timeout)

    if not 200 <= res.status_code <= 299:
        raise Exception(f"Get secure origin failed with status {res.status_code}")

    return res.text
def get_root_ca(*, timeout: float = 30.0) -> str:
    """
    Download the root CA certificate from the unsecure origin.

    Sends a GET request to ``{args.unsecure_origin}/pki/root_ca.crt`` and
    returns the response body as text, unmodified.

    :param timeout: value passed to ``requests`` as its ``timeout`` argument,
        so that an unresponsive server cannot block the caller indefinitely
    :return: body of the response, as text
    :raises Exception: if the response status is outside the 2xx range
    :raises requests.exceptions.RequestException: on connection failure or
        when the timeout expires
    """
    # NOTE(review): this request does not pass `verify=`; presumably it is the
    # bootstrap step that obtains the CA used by the other calls -- confirm.
    res = requests.get(f"{args.unsecure_origin}/pki/root_ca.crt", timeout=timeout)

    if not 200 <= res.status_code <= 299:
        raise Exception(f"Get root CA failed with status {res.status_code}")

    return res.text
def device_enrollment_status(*, timeout: float = 30.0) -> str:
    """
    Get current device enrollment status

    Queries ``/devices_api/mgmt/enrollment_status`` on the secure origin for
    the device identified by ``args.dev_id``, validating the server
    certificate against ``args.root_ca_path``.

    :param timeout: value passed to ``requests`` as its ``timeout`` argument,
        so that an unresponsive server cannot block the caller indefinitely
    :return: the ``status`` field of the JSON response
    :raises Exception: if the response status is outside the 2xx range
    :raises requests.exceptions.RequestException: on connection failure or
        when the timeout expires
    """
    res = requests.get(
        f"{args.secure_origin}/devices_api/mgmt/enrollment_status",
        # Let requests build the query string so the id is URL-encoded
        params={"id": args.dev_id},
        verify=args.root_ca_path,
        timeout=timeout,
    )

    if not 200 <= res.status_code <= 299:
        # Show the server's answer to help diagnose the failure
        print(res.text)
        raise Exception(f"Failed to check enrollment with status {res.status_code}")

    return res.json()["status"]
def device_info():
    """
    Build the device description sent along with enrollment and sync requests.

    The result holds the device reference and version taken from the
    constants module, and the number of entries in ``args.relay_gpios_list``
    as ``max_relays``.
    """
    reference = constants.DEV_REFERENCE
    version = constants.DEV_VERSION
    relay_count = len(args.relay_gpios_list)

    return {"reference": reference, "version": version, "max_relays": relay_count}
def enroll_device(csr: str, *, timeout: float = 30.0):
    """
    Enroll device, ie. submit CSR to API.
    Certificate cannot be retrieved before device is validated.

    Posts the CSR together with the device information returned by
    ``device_info()`` to ``/devices_api/mgmt/enroll`` on the secure origin,
    validating the server certificate against ``args.root_ca_path``.

    :param csr: certificate signing request to submit
    :param timeout: value passed to ``requests`` as its ``timeout`` argument,
        so that an unresponsive server cannot block the caller indefinitely
    :raises Exception: if the response status is outside the 2xx range
    :raises requests.exceptions.RequestException: on connection failure or
        when the timeout expires
    """
    res = requests.post(
        f"{args.secure_origin}/devices_api/mgmt/enroll",
        json={"csr": csr, "info": device_info()},
        verify=args.root_ca_path,
        timeout=timeout,
    )

    if not 200 <= res.status_code <= 299:
        # Show the server's answer to help diagnose the failure
        print(res.text)
        raise Exception(f"Enrollment failed with status {res.status_code}")
def device_certificate(*, timeout: float = 30.0) -> str:
    """
    Retrieve device certificate

    Queries ``/devices_api/mgmt/get_certificate`` on the secure origin for
    the device identified by ``args.dev_id``, validating the server
    certificate against ``args.root_ca_path``.

    :param timeout: value passed to ``requests`` as its ``timeout`` argument,
        so that an unresponsive server cannot block the caller indefinitely
    :return: body of the response, as text
    :raises Exception: if the response status is outside the 2xx range
    :raises requests.exceptions.RequestException: on connection failure or
        when the timeout expires
    """
    res = requests.get(
        f"{args.secure_origin}/devices_api/mgmt/get_certificate",
        # Let requests build the query string so the id is URL-encoded
        params={"id": args.dev_id},
        verify=args.root_ca_path,
        timeout=timeout,
    )

    if not 200 <= res.status_code <= 299:
        # Show the server's answer to help diagnose the failure
        print(res.text)
        # The message previously read "Failed to check enrollment", which
        # pointed at the wrong request when this call failed.
        raise Exception(
            f"Failed to retrieve certificate with status {res.status_code}"
        )

    return res.text
def sync_device(dev_id: str, privkey, *, timeout: float = 30.0):
    """
    Synchronize device with backend

    Signs the device information returned by ``device_info()`` as a RS256
    JWT whose ``kid`` header is the device id, then posts it to
    ``/devices_api/mgmt/sync`` on the secure origin, validating the server
    certificate against ``args.root_ca_path``.

    :param dev_id: device identifier, placed in the ``kid`` header of the JWT
    :param privkey: private key used to sign the JWT
    :param timeout: value passed to ``requests`` as its ``timeout`` argument,
        so that an unresponsive server cannot block the caller indefinitely
    :raises Exception: if the response status is outside the 2xx range
    :raises requests.exceptions.RequestException: on connection failure or
        when the timeout expires
    """
    encoded = jwt.encode(
        {"info": device_info()}, privkey, algorithm="RS256", headers={"kid": dev_id}
    )

    res = requests.post(
        f"{args.secure_origin}/devices_api/mgmt/sync",
        json={"payload": encoded},
        verify=args.root_ca_path,
        timeout=timeout,
    )

    # The signed token is deliberately no longer printed: it is a credential
    # accepted by the backend and should not end up in console output or logs.
    print(res)
    print(res.text)

    # Report failures the same way as the other API calls instead of
    # returning as if the synchronization had succeeded.
    if not 200 <= res.status_code <= 299:
        raise Exception(f"Synchronization failed with status {res.status_code}")