Skip to content

client

client

Asynchronous WebSocket client for Derive.

Classes

WebSocketClient

WebSocketClient(
    *,
    wallet: ChecksumAddress | str,
    session_key: str,
    subaccount_id: int,
    env: Environment,
    logger: LoggerType | None = None,
    request_timeout: float = 10.0
)

Asynchronous WebSocket client for real-time data and operations.

Source code in derive_client/_clients/websockets/client.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def __init__(
    self,
    *,
    wallet: ChecksumAddress | str,
    session_key: str,
    subaccount_id: int,
    env: Environment,
    logger: LoggerType | None = None,
    request_timeout: float = 10.0,
):
    config = CONFIGS[env]
    w3 = Web3(Web3.HTTPProvider(config.rpc_endpoint))
    account = w3.eth.account.from_key(session_key)

    auth = AuthContext(
        w3=w3,
        wallet=ChecksumAddress(wallet),
        account=account,
        config=config,
    )

    self._env = env
    self._auth = auth
    self._config = config
    self._subaccount_id = subaccount_id

    self._logger = logger if logger is not None else get_logger()
    self._session = WebSocketSession(
        url=config.ws_address,
        request_timeout=request_timeout,
        logger=self._logger,
        reconnect=True,
        on_disconnect=self._handle_disconnect,
        on_reconnect=self._handle_reconnect,
        on_before_resubscribe=self._handle_before_resubscribe,  # Re-authentication hook
    )

    self._public_api = PublicAPI(session=self._session)
    self._private_api = PrivateAPI(session=self._session)

    self._markets = MarketOperations(public_api=self._public_api, logger=self._logger)  # type: ignore
    self._transactions = TransactionOperations(public_api=self._public_api, logger=self._logger)  # type: ignore

    self._light_account: LightAccount | None = None
    self._subaccounts: dict[int, Subaccount] = {}
    self._logger.info(
        dedent(f"""
                    Initialized WebSocketClient for:
                        wallet:     {auth.wallet}
                        subaccount: {subaccount_id}
                        signer:     {auth.account.address}
                        environment {env.value} """)
    )
Attributes
account property
account: LightAccount

Get the LightAccount instance.

active_subaccount property
active_subaccount: Subaccount

Get the currently active subaccount.

cached_subaccounts property
cached_subaccounts: list[Subaccount]

Get all cached subaccounts.

markets property

Access market data and instruments.

transactions property
transactions: TransactionOperations

Query transaction status and details.

collateral property

Manage collateral and margin.

orders property

Place and manage orders.

positions property
positions: PositionOperations

View and manage positions.

rfq property

Request for quote operations.

trades property

View trade history.

mmp property

Market maker protection settings.

public_channels property
public_channels

Access public channel subscriptions.

private_channels property
private_channels

Access private channel subscriptions.

Functions
from_env classmethod
from_env(
    session_key_path: Path | None = None,
    env_file: Path | None = None,
) -> WebSocketClient

Create WebSocketClient from environment configuration.

Source code in derive_client/_clients/websockets/client.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
@classmethod
def from_env(
    cls,
    session_key_path: Path | None = None,
    env_file: Path | None = None,
) -> WebSocketClient:
    """Create WebSocketClient from environment configuration."""

    config = load_client_config(session_key_path=session_key_path, env_file=env_file)
    return cls(**config.model_dump())
connect async
connect() -> None

Connect to Derive via WebSocket and validate credentials.

Source code in derive_client/_clients/websockets/client.py
103
104
105
106
107
async def connect(self) -> None:
    """Connect to Derive via WebSocket and validate credentials."""
    await self._session.open()
    await self._authenticate()
    await self._initialize_account_and_markets()
disconnect async
disconnect() -> None

Close WebSocket connection and clear cached state. Idempotent.

Source code in derive_client/_clients/websockets/client.py
146
147
148
149
150
151
152
153
154
async def disconnect(self) -> None:
    """Close WebSocket connection and clear cached state. Idempotent."""

    await self._session.close()
    self._light_account = None
    self._subaccounts.clear()
    self._markets._erc20_instruments_cache.clear()
    self._markets._perp_instruments_cache.clear()
    self._markets._option_instruments_cache.clear()
fetch_subaccount async
fetch_subaccount(subaccount_id: int) -> Subaccount

Fetch a subaccount from API and cache it.

Source code in derive_client/_clients/websockets/client.py
199
200
201
202
203
async def fetch_subaccount(self, subaccount_id: int) -> Subaccount:
    """Fetch a subaccount from API and cache it."""

    self._subaccounts[subaccount_id] = await self._instantiate_subaccount(subaccount_id)
    return self._subaccounts[subaccount_id]
fetch_subaccounts async
fetch_subaccounts() -> list[Subaccount]

Fetch subaccounts from API and cache them.

Source code in derive_client/_clients/websockets/client.py
205
206
207
208
209
210
211
212
async def fetch_subaccounts(self) -> list[Subaccount]:
    """Fetch subaccounts from API and cache them."""

    account_subaccounts = await self.account.get_subaccounts()
    subaccounts = []
    for sid in account_subaccounts.subaccount_ids:
        subaccounts.append(await self.fetch_subaccount(sid))
    return sorted(subaccounts)
timeout
timeout(seconds: float) -> Generator[None, None, None]

Temporarily override request timeout for RPC calls.

Source code in derive_client/_clients/websockets/client.py
278
279
280
281
282
283
284
285
286
287
@contextlib.contextmanager
def timeout(self, seconds: float) -> Generator[None, None, None]:
    """Temporarily override request timeout for RPC calls."""

    prev = self._session._request_timeout
    try:
        self._session._request_timeout = float(seconds)
        yield
    finally:
        self._session._request_timeout = prev

Functions