import secrets
import requests
from eth_account.messages import encode_defunct
from eth_account import Account
class ConfidentialClient:
def __init__(self, api_key: str, base_url: str = "https://api.intelligence.io.net/v1/private"):
self.api_key = api_key
self.base_url = base_url
self.attestation = None
self.nonce = None
def _headers(self):
return {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
def attest(self, model_id: str) -> dict:
"""Get and verify attestation for a model."""
# Generate fresh nonce
self.nonce = secrets.token_hex(16)
response = requests.post(
f"{self.base_url}/attestation",
headers=self._headers(),
json={"model_id": model_id, "nonce": self.nonce}
)
response.raise_for_status()
self.attestation = response.json()
# Verify nonce freshness (nonce is padded to 64 hex chars)
if not self.attestation["nonce"].startswith(self.nonce):
raise SecurityError("Nonce mismatch - possible replay attack!")
# TODO: Verify GPU/CPU attestation reports against root certificates
# This requires NVIDIA/AMD attestation verification libraries
return self.attestation
def complete(self, model: str, messages: list, **kwargs) -> dict:
"""Run verified confidential inference."""
if not self.attestation:
raise ValueError("Must call attest() before complete()")
response = requests.post(
f"{self.base_url}/completions",
headers=self._headers(),
json={"model": model, "messages": messages, **kwargs}
)
response.raise_for_status()
# Extract signature headers
sig_headers = {
"text": response.headers.get("text"),
"signature": response.headers.get("signature"),
"signing_address": response.headers.get("signing_address"),
"signing_algo": response.headers.get("signing_algo")
}
# Verify signing address matches attestation
if sig_headers["signing_address"].lower() != \
self.attestation["signing_address"].lower():
raise SecurityError("Signing address doesn't match attestation!")
# Verify signature
if not self._verify_signature(sig_headers):
raise SecurityError("Response signature verification failed!")
return response.json()
def _verify_signature(self, headers: dict) -> bool:
"""Verify response signature."""
if headers["signing_algo"] == "ecdsa":
message = encode_defunct(text=headers["text"])
recovered = Account.recover_message(
message,
signature=headers["signature"]
)
return recovered.lower() == headers["signing_address"].lower()
else:
raise ValueError(f"Unknown signing algorithm: {headers['signing_algo']}")
class SecurityError(Exception):
pass
# Usage
client = ConfidentialClient(api_key="your-key")
client.attest(model_id="model-uuid-here")
response = client.complete(
model="meta-llama/Llama-3.3-70B-Instruct",
messages=[{"role": "user", "content": "Hello"}]
)