SolarEnergy/python_device/src/api.py

114 lines
3.2 KiB
Python

import requests
from src.args import args
import src.constants as constants
from cryptography.x509 import load_pem_x509_certificate
from cryptography import utils
import jwt
import json
def get_secure_origin() -> str:
res = requests.get(f"{args.unsecure_origin}/secure_origin")
if res.status_code < 200 or res.status_code > 299:
raise Exception(f"Get secure origin failed with status {res.status_code}")
return res.text
def get_root_ca() -> str:
res = requests.get(f"{args.unsecure_origin}/pki/root_ca.crt")
if res.status_code < 200 or res.status_code > 299:
raise Exception(f"Get root CA failed with status {res.status_code}")
return res.text
def device_enrollment_status() -> str:
"""
Get current device enrollment status
"""
res = requests.get(
f"{args.secure_origin}/devices_api/mgmt/enrollment_status?id={args.dev_id}",
verify=args.root_ca_path,
)
if res.status_code < 200 or res.status_code > 299:
print(res.text)
raise Exception(f"Failed to check enrollment with status {res.status_code}")
return res.json()["status"]
def device_info():
"""
Get device information to return with enrollment and sync requests
"""
return {
"reference": constants.DEV_REFERENCE,
"version": constants.DEV_VERSION,
"max_relays": len(args.relay_gpios_list),
}
def enroll_device(csr: str):
"""
Enroll device, ie. submit CSR to API.
Certificate cannot be retrieved before device is validated.
"""
res = requests.post(
f"{args.secure_origin}/devices_api/mgmt/enroll",
json={"csr": csr, "info": device_info()},
verify=args.root_ca_path,
)
if res.status_code < 200 or res.status_code > 299:
print(res.text)
raise Exception(f"Enrollment failed with status {res.status_code}")
def device_certificate() -> str:
"""
Retrieve device certificate
"""
res = requests.get(
f"{args.secure_origin}/devices_api/mgmt/get_certificate?id={args.dev_id}",
verify=args.root_ca_path,
)
if res.status_code < 200 or res.status_code > 299:
print(res.text)
raise Exception(f"Failed to check enrollment with status {res.status_code}")
return res.text
def jwt_sign(data: any, dev_id: str, privkey) -> str:
"""
Generate a JWT for client request
"""
return jwt.encode(data, privkey, algorithm="RS256", headers={"kid": dev_id})
def sync_device(dev_id: str, privkey):
"""
Synchronize device with backend
"""
encoded = jwt_sign({"info": device_info()}, dev_id=dev_id, privkey=privkey)
res = requests.post(
f"{args.secure_origin}/devices_api/mgmt/sync",
json={"payload": encoded},
verify=args.root_ca_path,
)
return json.loads(res.text)
def report_log(severity: str, message: str, dev_id: str, privkey):
"""
Report log message to server
"""
encoded = jwt_sign(
{"severity": severity, "message": message}, dev_id=dev_id, privkey=privkey
)
requests.post(
f"{args.secure_origin}/devices_api/logging/record",
json={"payload": encoded},
verify=args.root_ca_path,
)