SolarEnergy/python_device/src/api.py

94 lines
2.6 KiB
Python
Raw Normal View History

2024-06-29 16:05:58 +00:00
import requests
from src.args import args
2024-07-01 19:10:45 +00:00
import src.constants as constants
2024-09-04 18:17:11 +00:00
from cryptography.x509 import load_pem_x509_certificate
from cryptography import utils
import jwt
2024-07-01 19:10:45 +00:00
2024-06-29 16:05:58 +00:00
def get_secure_origin() -> str:
res = requests.get(f"{args.unsecure_origin}/secure_origin")
if res.status_code < 200 or res.status_code > 299:
raise Exception(f"Get secure origin failed with status {res.status_code}")
2024-06-29 16:08:57 +00:00
return res.text
2024-07-01 19:10:45 +00:00
2024-06-29 16:08:57 +00:00
def get_root_ca() -> str:
2024-07-01 19:10:45 +00:00
res = requests.get(f"{args.unsecure_origin}/pki/root_ca.crt")
2024-06-29 16:08:57 +00:00
if res.status_code < 200 or res.status_code > 299:
raise Exception(f"Get root CA failed with status {res.status_code}")
return res.text
2024-07-01 19:10:45 +00:00
def device_enrollment_status() -> str:
"""
Get current device enrollment status
"""
res = requests.get(
f"{args.secure_origin}/devices_api/mgmt/enrollment_status?id={args.dev_id}",
verify=args.root_ca_path,
)
if res.status_code < 200 or res.status_code > 299:
print(res.text)
raise Exception(f"Failed to check enrollment with status {res.status_code}")
return res.json()["status"]
2024-07-01 19:10:45 +00:00
def device_info():
"""
Get device information to return with enrollment and sync requests
"""
return {
"reference": constants.DEV_REFERENCE,
"version": constants.DEV_VERSION,
"max_relays": len(args.relay_gpios_list),
}
2024-07-02 20:55:51 +00:00
def enroll_device(csr: str):
"""
Enroll device, ie. submit CSR to API.
Certificate cannot be retrieved before device is validated.
"""
2024-07-01 19:10:45 +00:00
res = requests.post(
f"{args.secure_origin}/devices_api/mgmt/enroll",
json={"csr": csr, "info": device_info()},
verify=args.root_ca_path,
)
if res.status_code < 200 or res.status_code > 299:
print(res.text)
raise Exception(f"Enrollment failed with status {res.status_code}")
def device_certificate() -> str:
"""
Retrieve device certificate
"""
res = requests.get(
f"{args.secure_origin}/devices_api/mgmt/get_certificate?id={args.dev_id}",
verify=args.root_ca_path,
)
if res.status_code < 200 or res.status_code > 299:
print(res.text)
raise Exception(f"Failed to check enrollment with status {res.status_code}")
return res.text
2024-09-04 18:17:11 +00:00
def sync_device(dev_id: str, privkey):
"""
Synchronize device with backend
"""
encoded = jwt.encode(
{"info": device_info()}, privkey, algorithm="RS256", headers={"kid": dev_id}
)
res = requests.post(
f"{args.secure_origin}/devices_api/mgmt/sync",
json={"payload": encoded},
verify=args.root_ca_path,
)
print(encoded)
print(res)