Python API reference

The inferencekey package is the typed, ergonomic Python surface over the Rust core. Declare a workload with WorkloadSpec, provision it idempotently with ManagementClient.ensure, call the resulting OpenAI-compatible Endpoint, and delete it when you’re done.

Install

pip install inferencekey

Imports

from inferencekey import (
    ManagementClient, DataClient, Endpoint,
    WorkloadSpec, EndpointRef, TextResult, EmbedResult,
    Backend, TaskType, OnDrift, ExecutionPolicy,
    InferenceKeyError, PermissionDenied, AuthError,
    ValidationError, ConfigurationError, ApiError,
)

ManagementClient

Control-plane client (ik_sdk_ token), scoped to one project. Provisions and reconciles workloads; it has no inference methods.

class ManagementClient:
    def __init__(self, *, base_url: str, sdk_token: str, project: str | None = None) -> None

base_url defaults to https://api.inferencekey.com when constructed via from_env. A blank sdk_token raises ConfigurationError.

from_env

@classmethod
def from_env(
    cls,
    *,
    base_url: str | None = None,
    project: str | None = None,
    sdk_token: str | None = None,
) -> ManagementClient

Construct from configuration, resolving each value in the order explicit argument > environment variable > default. Reads INFERENCEKEY_BASE_URL, INFERENCEKEY_PROJECT, and INFERENCEKEY_SDK_TOKEN.

mgmt = ManagementClient.from_env(project="acme") # reads INFERENCEKEY_SDK_TOKEN
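
The explicit > environment > default order can be pictured with a small helper. This is only a sketch of the precedence rule, not the SDK's implementation: resolve_setting is a hypothetical name, and treating a blank value as "not provided" is an assumption.

```python
import os


def resolve_setting(explicit, env_var, default=None, environ=None):
    """Return the first usable value: explicit argument, env var, then default.

    Hypothetical helper for illustration only; not part of inferencekey.
    """
    environ = os.environ if environ is None else environ
    if explicit:  # assumption: None or "" counts as "not provided"
        return explicit
    from_env = environ.get(env_var)
    if from_env:
        return from_env
    return default


# A plain dict stands in for os.environ so the example is reproducible.
env = {"INFERENCEKEY_PROJECT": "acme"}

# An explicit argument wins over the environment.
print(resolve_setting("other", "INFERENCEKEY_PROJECT", environ=env))
# With no explicit value, the environment variable is used.
print(resolve_setting(None, "INFERENCEKEY_PROJECT", environ=env))
# With neither, the default applies.
print(resolve_setting(None, "INFERENCEKEY_BASE_URL",
                      default="https://api.inferencekey.com", environ=env))
```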

ensure

def ensure(
    self,
    spec: WorkloadSpec,
    *,
    on_drift: OnDrift | str = OnDrift.RECONCILE,
    project: str | None = None,
) -> EndpointRef

Idempotently provision or reconcile spec, returning an EndpointRef. Idempotency is keyed off the explicit spec.slug. The project is resolved from project= > spec.project > the client’s project; if none is set, raises ConfigurationError. on_drift governs what happens when the live workload differs from the spec — see OnDrift and the drift reference.

ref = mgmt.ensure(WorkloadSpec(
    name="support-bot", slug="support-bot",
    model="meta-llama/Llama-3.1-8B-Instruct", backend=Backend.VLLM,
    command="vllm serve meta-llama/Llama-3.1-8B-Instruct --max-model-len 8192",
))
print(ref.project_slug, ref.workload_slug)
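
To make "idempotency is keyed off spec.slug" concrete, here is a toy in-memory model. It is a sketch under stated assumptions, not how the platform works internally: ToySpec and ToyControlPlane are hypothetical stand-ins, and the "reconciled" branch assumes the default RECONCILE strategy updates the live workload to match the spec.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToySpec:
    """Hypothetical, trimmed-down stand-in for WorkloadSpec."""
    slug: str
    model: str


class ToyControlPlane:
    """In-memory stand-in for the platform; not the real ManagementClient."""

    def __init__(self):
        self.workloads = {}  # slug -> spec currently "live"

    def ensure(self, spec):
        # The slug is the idempotency key: the same slug always addresses
        # the same workload, however many times ensure() is called.
        if spec.slug not in self.workloads:
            self.workloads[spec.slug] = spec
            return "created"
        if self.workloads[spec.slug] == spec:
            return "unchanged"
        # Assumption: RECONCILE brings the live workload in line with the spec.
        self.workloads[spec.slug] = spec
        return "reconciled"


plane = ToyControlPlane()
spec = ToySpec(slug="support-bot", model="meta-llama/Llama-3.1-8B-Instruct")
print(plane.ensure(spec))    # first call creates the workload
print(plane.ensure(spec))    # repeating the call changes nothing
print(len(plane.workloads))  # still a single workload for this slug
```

Because the slug is the key, rerunning a deployment script with an unchanged spec is safe; only an edited spec triggers drift handling.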

wait_until_ready

def wait_until_ready(
    self,
    workload_slug: str,
    *,
    project: str | None = None,
    timeout: float = 600.0,
    on_progress: Callable[[ReadinessEvent], None] | None = None,
    silent: bool = False,
) -> None

Wait until workload_slug is serving, reporting progress as the platform schedules a worker, provisions a cloud GPU, and boots the runtime. Returns when the platform reports the ready phase; raises ApiError on an error phase or TimeoutError after timeout seconds.

This lives on the ManagementClient (control plane): progress is streamed over the ik_sdk_ token, so no ik_live_ data key is needed. By default it prints a live progress view to the terminal (a phase bar on a TTY, plain lines in CI); pass your own on_progress to handle each ReadinessEvent yourself, or silent=True to suppress output. Call it right after ensure() provisions a cold worker.

ref = mgmt.ensure(spec)
mgmt.wait_until_ready(ref.workload_slug, timeout=600)
# or handle progress yourself:
mgmt.wait_until_ready(ref.workload_slug, on_progress=lambda e: print(e.phase, e.message))
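
An on_progress callback can be any callable that accepts one event. The sketch below logs once per phase change rather than once per event. The ReadinessEvent class here is a local stand-in that mirrors the documented fields so the example runs offline, PhaseLogger is a hypothetical name, and the sample events are made up.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ReadinessEvent:
    """Local stand-in mirroring the documented fields (not the SDK class)."""
    phase: str
    message: str
    elapsed_ms: int = 0
    step: Optional[str] = None


class PhaseLogger:
    """Callable that prints only when the phase changes."""

    def __init__(self):
        self.phases = []  # phases seen so far, in order

    def __call__(self, event):
        if not self.phases or self.phases[-1] != event.phase:
            self.phases.append(event.phase)
            seconds = event.elapsed_ms / 1000
            print(f"[{seconds:6.1f}s] {event.phase}: {event.message}")


logger = PhaseLogger()
# Made-up events standing in for what the platform would stream.
for ev in [
    ReadinessEvent("scheduling", "finding a worker", 0),
    ReadinessEvent("provisioning", "requesting a GPU", 1200),
    ReadinessEvent("provisioning", "still waiting", 30000),
    ReadinessEvent("bootstrapping", "loading model", 95000, step="model_load"),
    ReadinessEvent("ready", "serving", 140000),
]:
    logger(ev)
```

With the real client, the same object would be passed as mgmt.wait_until_ready(ref.workload_slug, on_progress=logger).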

delete

def delete(
    self,
    workload_slug: str,
    *,
    project: str | None = None,
) -> bool

Delete the workload named by workload_slug from the project. Returns True if it existed and was removed, False if it was already gone. The call is idempotent: deleting something that isn’t there is not an error, so it’s safe to call on shutdown without checking first. project falls back to the client’s project (or INFERENCEKEY_PROJECT) when omitted, exactly like ensure.

Like wait_until_ready, delete lives on the ManagementClient (control plane): it uses the ik_sdk_ token’s delete workloads capability and is scoped to that token’s project. A token for another project, or one lacking the capability, raises PermissionDenied.

existed = mgmt.delete(ref.workload_slug)
print("deleted" if existed else "already gone")

See Clean up on exit for the recommended shutdown pattern.

DataClient

Data-plane client. Derive an Endpoint per workload, each bound to its own ik_live_ key — so one app can drive several workloads with different keys.

class DataClient:
    def __init__(self, *, base_url: str, project: str, api_key: str | None = None) -> None

A blank project raises ConfigurationError. api_key is the default ik_live_ key applied when endpoint is called without one.

from_env

@classmethod
def from_env(
    cls,
    *,
    base_url: str | None = None,
    project: str | None = None,
    api_key: str | None = None,
) -> DataClient

Construct from configuration, resolving each value in the order explicit argument > environment variable > default. Reads INFERENCEKEY_BASE_URL, INFERENCEKEY_PROJECT, and INFERENCEKEY_API_KEY (the default ik_live_ key).

data = DataClient.from_env(project="acme")

endpoint

def endpoint(self, workload_slug: str, *, api_key: str | None = None) -> Endpoint

Bind an Endpoint to workload_slug and an ik_live_ key. The key resolves from api_key= > the client’s default api_key (INFERENCEKEY_API_KEY); if neither is set, raises ConfigurationError.

ep = data.endpoint(ref.workload_slug, api_key="ik_live_...")

Endpoint

A single workload’s OpenAI-compatible endpoint, bound to one ik_live_ key. Obtain instances via DataClient.endpoint rather than constructing directly. The bound workload_slug is exposed as a public attribute.

generate_text

def generate_text(
    self,
    *,
    prompt: str | None = None,
    messages: list | None = None,
    temperature: float | None = None,
    max_tokens: int | None = None,
) -> TextResult

Run a non-streaming chat completion. Pass either prompt (a single user turn) or messages (OpenAI-style role/content dicts). Unset parameters are omitted from the request. Returns a TextResult.

out = ep.generate_text(prompt="Hola", temperature=0.2, max_tokens=300)
print(out.text, out.model)
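
The two rules above (prompt is shorthand for a single user turn, and unset parameters are left out of the request) can be sketched as a request-body builder. build_chat_body is a hypothetical helper, not an SDK function, and rejecting calls that pass both or neither of prompt and messages is an assumption about how "either" is enforced.

```python
def build_chat_body(*, prompt=None, messages=None, temperature=None, max_tokens=None):
    """Build an OpenAI-style chat body. Illustrative only; not the SDK's code."""
    # Assumption: exactly one of prompt / messages must be supplied.
    if (prompt is None) == (messages is None):
        raise ValueError("pass exactly one of prompt or messages")

    if messages is None:
        # A bare prompt becomes a single user turn.
        messages = [{"role": "user", "content": prompt}]

    body = {"messages": messages}
    # Only parameters the caller actually set are included.
    for key, value in (("temperature", temperature), ("max_tokens", max_tokens)):
        if value is not None:
            body[key] = value
    return body


print(build_chat_body(prompt="Hola", temperature=0.2))
print(build_chat_body(messages=[
    {"role": "system", "content": "Answer briefly."},
    {"role": "user", "content": "Hola"},
]))
```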

generate_text_stream

def generate_text_stream(
    self,
    *,
    prompt: str | None = None,
    messages: list | None = None,
    temperature: float | None = None,
    max_tokens: int | None = None,
) -> Iterator[TextChunk]

Run a streaming chat completion. Same parameters as generate_text, but instead of one result it returns an iterator yielding one TextChunk per server-sent event as the reply is produced. The connection is opened eagerly (so auth/validation errors raise here, not mid-iteration); chunks are then pulled lazily as you iterate. Concatenate chunk.text to rebuild the full reply.

for chunk in ep.generate_text_stream(prompt="Hola"):
    print(chunk.text, end="", flush=True)
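
Rebuilding the full reply from a stream is a matter of joining each chunk's text and remembering the finish_reason from the terminal chunk. The sketch below runs offline: TextChunk is a local stand-in mirroring the documented fields, and fake_stream and collect are hypothetical names with made-up chunk contents.

```python
from dataclasses import dataclass, field
from typing import Iterator, Optional


@dataclass(frozen=True)
class TextChunk:
    """Local stand-in mirroring the documented fields (not the SDK class)."""
    text: str
    finish_reason: Optional[str] = None
    raw: dict = field(default_factory=dict)


def fake_stream() -> Iterator[TextChunk]:
    """Stands in for ep.generate_text_stream(...); the chunks are invented."""
    yield TextChunk("Ho")
    yield TextChunk("la")
    yield TextChunk("!", finish_reason="stop")


def collect(stream):
    """Consume a chunk iterator and return (full_text, finish_reason)."""
    parts = []
    finish_reason = None
    for chunk in stream:
        parts.append(chunk.text)
        if chunk.finish_reason is not None:
            finish_reason = chunk.finish_reason
    return "".join(parts), finish_reason


text, reason = collect(fake_stream())
print(text, reason)
```

Collecting parts in a list and joining once avoids repeated string concatenation on long replies.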

embed

def embed(self, *, input: str | list[str]) -> EmbedResult

Create embeddings for one or more inputs. A bare string is treated as a single-element batch. Returns an EmbedResult with one vector per input.

emb = data.endpoint("billing", api_key="ik_live_...").embed(input=["a", "b"])
print(emb.embeddings) # list[list[float]]
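
A common next step with embeddings is comparing vectors by cosine similarity. The sketch below is plain Python and independent of the SDK; the short vectors are made up and merely have the same list[list[float]] shape as emb.embeddings.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cosine similarity is undefined for a zero vector")
    return dot / (norm_a * norm_b)


# Made-up 2-d vectors; real embeddings have many more dimensions.
embeddings = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]
query = embeddings[2]

# Score every vector against the query and show the closest first.
scores = [(cosine_similarity(query, vec), idx) for idx, vec in enumerate(embeddings)]
for score, idx in sorted(scores, reverse=True):
    print(idx, round(score, 3))
```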

WorkloadSpec

The declarative workload definition handed to ensure. name, slug, model, and backend are required; everything else is optional. There is no provider and no min_vram_gb — placement is the platform’s job, never the caller’s.

@dataclass
class WorkloadSpec:
    name: str
    slug: str
    model: str
    backend: Backend | str
    project: str | None = None
    description: str | None = None
    command: str | None = None
    vllm_version: str | None = None
    task_type: TaskType | str | None = None
    config: dict | None = None
    execution_policy: ExecutionPolicy | str | None = None
    execution_policy_config: dict | None = None
    worker_id: str | None = None
    gpu_resource_id: str | None = None

Field                    Type                          Description
name                     str                           Human-readable display name. Required.
slug                     str                           Stable identifier; the idempotency key for ensure(). Required.
model                    str                           Model reference, e.g. meta-llama/Llama-3.1-8B-Instruct. Required.
backend                  Backend | str                 Inference engine. Required.
project                  str | None                    Project slug; overrides the client’s project for this spec.
description              str | None                    Free-text note shown in the dashboard.
command                  str | None                    Launch command (vLLM/SGLang), e.g. vllm serve ....
vllm_version             str | None                    Pin the vLLM image version (vllm / vllm-omni backends).
task_type                TaskType | str | None         Workload modality; server default is text2text.
config                   dict | None                   Backend-specific configuration.
execution_policy         ExecutionPolicy | str | None  How the Manager schedules the workload.
execution_policy_config  dict | None                   Policy-specific settings (e.g. autoscaling bounds).
worker_id                str | None                    Pin to a specific worker.
gpu_resource_id          str | None                    Pin to a specific GPU resource.

EndpointRef

Frozen dataclass returned by ensure: the slugs addressing a workload’s data-plane endpoint.

@dataclass(frozen=True)
class EndpointRef:
    project_slug: str
    workload_slug: str

TextResult

Frozen dataclass returned by generate_text.

@dataclass(frozen=True)
class TextResult:
    text: str
    model: str
    finish_reason: str | None = None
    raw: dict = field(default_factory=dict)  # dataclasses.field; defaults to an empty dict

Field          Type        Description
text           str         The generated completion text.
model          str         The model that produced the result.
finish_reason  str | None  Why generation stopped (e.g. stop, length), when reported.
raw            dict        The underlying provider response, untouched.

TextChunk

Frozen dataclass yielded by generate_text_stream — one per streamed event.

@dataclass(frozen=True)
class TextChunk:
    text: str
    finish_reason: str | None = None
    raw: dict = field(default_factory=dict)  # dataclasses.field; defaults to an empty dict

Field          Type        Description
text           str         The delta for this chunk. Concatenate across chunks to rebuild the full reply.
finish_reason  str | None  Set only on the terminal chunk (e.g. stop, length).
raw            dict        The underlying chunk JSON, untouched.

ReadinessEvent

Frozen dataclass passed to the on_progress callback of wait_until_ready — one per progress update while a workload comes up.

@dataclass(frozen=True)
class ReadinessEvent:
    phase: str
    message: str
    elapsed_ms: int = 0
    step: str | None = None

Field       Type        Description
phase       str         scheduling / provisioning / bootstrapping / ready / error. ready means serving; error is terminal.
message     str         Short, printable description of what the platform is doing.
elapsed_ms  int         Milliseconds since the wait started.
step        str | None  Allow-listed bootstrap step (e.g. model_load), when applicable.

EmbedResult

Frozen dataclass returned by embed: one vector per input.

@dataclass(frozen=True)
class EmbedResult:
    embeddings: list[list[float]]
    model: str
    raw: dict = field(default_factory=dict)  # dataclasses.field; defaults to an empty dict

Field       Type               Description
embeddings  list[list[float]]  One embedding vector per input, in order.
model       str                The embedding model that produced the vectors.
raw         dict               The underlying provider response, untouched.

Enums

Each enum’s value is the exact platform wire string, so you may pass either the enum member or its string. See the backends and policies reference for semantics.

Backend

The inference engine for a workload (the backend wire string).

class Backend(str, Enum):
    OLLAMA = "ollama"
    VLLM = "vllm"
    VLLM_OMNI = "vllm-omni"
    SGLANG = "sglang"
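
Because each enum mixes in str, a member and its wire string are interchangeable in comparisons. The sketch below redefines Backend locally, copied from the listing above, so it runs without the package installed; "tensorrt" is a made-up value used only to show what an unknown string does.

```python
from enum import Enum


class Backend(str, Enum):  # local copy of the listing above
    OLLAMA = "ollama"
    VLLM = "vllm"
    VLLM_OMNI = "vllm-omni"
    SGLANG = "sglang"


# A member compares equal to its wire string.
print(Backend.VLLM == "vllm")
# Looking a member up by value returns the same singleton.
print(Backend("vllm-omni") is Backend.VLLM_OMNI)
# .value is the exact string sent on the wire.
print(Backend.SGLANG.value)

# An unknown string is rejected when converted to the enum.
try:
    Backend("tensorrt")
except ValueError as exc:
    print("rejected:", exc)
```

Converting user-supplied strings with Backend(...) up front is one way to catch typos locally before a request is sent.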

TaskType

The workload modality (task_type); the server default is text2text. reranker, classification, and reward are async-only (no sync OpenAI route).

class TaskType(str, Enum):
    TEXT2TEXT = "text2text"
    EMBEDDING = "embedding"
    TEXT2IMAGE = "text2image"
    TEXT2AUDIO = "text2audio"
    AUDIO2TEXT = "audio2text"
    RERANKER = "reranker"
    CLASSIFICATION = "classification"
    REWARD = "reward"

OnDrift

The drift-handling strategy for ensure; defaults to RECONCILE. See the drift reference.

class OnDrift(str, Enum):
    RECONCILE = "reconcile"
    FAIL = "fail"
    DRY_RUN = "dry_run"
    WARN = "warn"
    IGNORE = "ignore"

ExecutionPolicy

How the Manager schedules the workload (execution_policy).

class ExecutionPolicy(str, Enum):
    FIXED = "fixed"
    SCHEDULED = "scheduled"
    AUTOSCALING = "autoscaling"

Exceptions

Every SDK error derives from InferenceKeyError, so you can catch the base class to handle all of them. See Common errors.

InferenceKeyError
├── PermissionDenied # 403 — wrong_credential_type / project_scope_mismatch / scope_insufficient
├── AuthError # 401 — missing or invalid credentials
├── ValidationError # 400 — a request argument failed local or server validation
├── ConfigurationError # client-side misconfiguration before any request
└── ApiError # any other non-2xx response or transport failure

Exception           Raised when
InferenceKeyError   Base class for every SDK error.
PermissionDenied    The credential may not perform the operation (e.g. a data token used for control, or a token scoped to another project).
AuthError           Credentials are missing or invalid.
ValidationError     A request argument failed local or server validation.
ConfigurationError  The client is misconfigured before any request is sent (e.g. no token, no project, or no ik_live_ key).
ApiError            Any other non-2xx response or a transport failure.

from inferencekey import PermissionDenied, AuthError, InferenceKeyError

try:
    ref = mgmt.ensure(spec)
except PermissionDenied:
    print("This token cannot provision. Is it an ik_sdk_ key for this project?")
except AuthError:
    print("Check INFERENCEKEY_SDK_TOKEN.")
except InferenceKeyError as e:
    print(f"Provisioning failed: {e}")
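
The status codes annotated in the tree above suggest a mapping from HTTP status to exception class. The sketch below spells that mapping out with local stand-in classes. error_for_status is a hypothetical helper, and the SDK's real dispatch may consider more than the status code (for example, the error body).

```python
class InferenceKeyError(Exception):
    """Stand-in base class; mirrors the documented hierarchy."""


class PermissionDenied(InferenceKeyError):
    """403 in the tree above."""


class AuthError(InferenceKeyError):
    """401 in the tree above."""


class ValidationError(InferenceKeyError):
    """400 in the tree above."""


class ApiError(InferenceKeyError):
    """Any other non-2xx response."""


_STATUS_TO_ERROR = {
    400: ValidationError,
    401: AuthError,
    403: PermissionDenied,
}


def error_for_status(status):
    """Return the exception class for an HTTP status, or None for 2xx."""
    if 200 <= status < 300:
        return None
    return _STATUS_TO_ERROR.get(status, ApiError)


for code in (200, 400, 401, 403, 500):
    cls = error_for_status(code)
    print(code, cls.__name__ if cls else "ok")
```

Every class in the mapping derives from InferenceKeyError, which is why a final except InferenceKeyError clause catches whatever the more specific clauses let through.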

Clean up on exit

Delete the workload when your program ends so a run doesn’t leave it — and any cloud GPU the platform provisioned for it — running and billing. Put the delete in a finally so it runs on a clean exit, an error, and a KeyboardInterrupt (Ctrl+C):

mgmt = ManagementClient.from_env(project="acme")
ref = mgmt.ensure(spec)
try:
    mgmt.wait_until_ready(ref.workload_slug, timeout=600)
    # … use the endpoint …
finally:
    mgmt.delete(ref.workload_slug) # idempotent; safe to always call
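
If several scripts share this pattern, the try/finally can be wrapped in a context manager. This is one possible packaging, not an SDK feature: provisioned is a hypothetical helper, and FakeManagementClient is an in-memory stand-in that implements only the three documented methods so the example runs offline.

```python
from contextlib import contextmanager
from types import SimpleNamespace


@contextmanager
def provisioned(mgmt, spec, *, timeout=600.0):
    """Ensure a workload, wait for it, and always delete it on exit."""
    ref = mgmt.ensure(spec)
    try:
        mgmt.wait_until_ready(ref.workload_slug, timeout=timeout)
        yield ref
    finally:
        mgmt.delete(ref.workload_slug)  # idempotent, so always safe to call


class FakeManagementClient:
    """In-memory stand-in for ManagementClient; for illustration only."""

    def __init__(self):
        self.live = set()

    def ensure(self, spec):
        self.live.add(spec.slug)
        return SimpleNamespace(project_slug="acme", workload_slug=spec.slug)

    def wait_until_ready(self, workload_slug, *, timeout=600.0):
        pass  # the fake workload is ready immediately

    def delete(self, workload_slug):
        existed = workload_slug in self.live
        self.live.discard(workload_slug)
        return existed


fake = FakeManagementClient()
spec = SimpleNamespace(slug="support-bot")
try:
    with provisioned(fake, spec) as ref:
        raise RuntimeError("simulated failure while using the endpoint")
except RuntimeError:
    pass

print(fake.live)  # the workload was deleted despite the error
```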

End-to-end

  1. Provision a workload with your control token.

    from inferencekey import ManagementClient, WorkloadSpec, Backend
    mgmt = ManagementClient.from_env(project="acme") # reads INFERENCEKEY_SDK_TOKEN
    ref = mgmt.ensure(WorkloadSpec(
        name="support-bot", slug="support-bot",
        model="meta-llama/Llama-3.1-8B-Instruct", backend=Backend.VLLM,
        command="vllm serve meta-llama/Llama-3.1-8B-Instruct --max-model-len 8192",
    ))
  2. Call the endpoint with that workload’s data key.

    from inferencekey import DataClient
    data = DataClient.from_env(project="acme")
    ep = data.endpoint(ref.workload_slug, api_key="ik_live_...")
    out = ep.generate_text(prompt="Hola", temperature=0.2, max_tokens=300)
    print(out.text)
  3. Delete it when you’re done, so nothing keeps running.

    mgmt.delete(ref.workload_slug)
