Skip to content

Commit af2645e

Browse files
committed
Support for authenticated data channel on Gemini
1 parent 0fa4476 commit af2645e

File tree

10 files changed

+171
-38
lines changed

10 files changed

+171
-38
lines changed

cryptofeed/auth/__init__.py

Whitespace-only changes.

cryptofeed/auth/gemini.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
'''
2+
Copyright (C) 2017-2021 Bryant Moscon - [email protected]
3+
4+
Please see the LICENSE file for the terms and conditions
5+
associated with this software.
6+
'''
7+
import base64
8+
import hashlib
9+
import hmac
10+
import json
11+
import time
12+
13+
14+
def generate_token(key_id: str, key_secret: str, request: str, account_name=None, payload=None) -> dict:
15+
if not payload:
16+
payload = {}
17+
payload['request'] = request
18+
payload['nonce'] = int(time.time() * 1000)
19+
20+
if account_name:
21+
payload['account'] = account_name
22+
23+
b64_payload = base64.b64encode(json.dumps(payload).encode('utf-8'))
24+
signature = hmac.new(key_secret.encode('utf-8'), b64_payload, hashlib.sha384).hexdigest()
25+
26+
return {
27+
'X-GEMINI-PAYLOAD': b64_payload.decode(),
28+
'X-GEMINI-APIKEY': key_id,
29+
'X-GEMINI-SIGNATURE': signature
30+
}

cryptofeed/callback.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,3 +100,7 @@ class MarketInfoCallback(Callback):
100100

101101
class TransactionsCallback(Callback):
102102
pass
103+
104+
105+
class OrderInfoCallback(Callback):
106+
pass

cryptofeed/defines.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
WHALE_ALERT = 'WHALE_ALERT'
4646

4747

48+
# Market Data
4849
L2_BOOK = 'l2_book'
4950
L3_BOOK = 'l3_book'
5051
BOOK_DELTA = 'book_delta'
@@ -59,6 +60,10 @@
5960
MARKET_INFO = 'market_info'
6061
TRANSACTIONS = 'transactions'
6162

63+
# Account Data / Authenticated Channels
64+
ORDER_INFO = 'order_info'
65+
66+
6267
BUY = 'buy'
6368
SELL = 'sell'
6469
BID = 'bid'

cryptofeed/exchange/gemini.py

Lines changed: 67 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,16 @@
66
'''
77
import logging
88
from decimal import Decimal
9+
from typing import List, Tuple, Callable
910

1011
from sortedcontainers import SortedDict as sd
1112
from yapic import json
1213

14+
from cryptofeed.auth.gemini import generate_token
1315
from cryptofeed.connection import AsyncConnection
14-
from cryptofeed.defines import BID, ASK, BUY, GEMINI, L2_BOOK, SELL, TRADES
16+
from cryptofeed.defines import BID, ASK, BUY, GEMINI, L2_BOOK, SELL, TRADES, ORDER_INFO
1517
from cryptofeed.feed import Feed
16-
from cryptofeed.standards import symbol_exchange_to_std, timestamp_normalize
18+
from cryptofeed.standards import symbol_exchange_to_std, timestamp_normalize, is_authenticated_channel
1719

1820

1921
LOG = logging.getLogger('feedhandler')
@@ -22,8 +24,9 @@
2224
class Gemini(Feed):
2325
id = GEMINI
2426

25-
def __init__(self, **kwargs):
26-
super().__init__('wss://api.gemini.com/v2/marketdata/', **kwargs)
27+
def __init__(self, sandbox=False, **kwargs):
28+
auth_api = 'wss://api.gemini.com' if not sandbox else 'wss://api.sandbox.gemini.com'
29+
super().__init__({'public': 'wss://api.gemini.com/v2/marketdata/', 'auth': f'{auth_api}/v1/order/events'}, **kwargs)
2730

2831
def __reset(self, pairs):
2932
for pair in pairs:
@@ -69,7 +72,43 @@ async def _trade(self, msg: dict, timestamp: float):
6972
timestamp=timestamp_normalize(self.id, msg['timestamp']),
7073
receipt_timestamp=timestamp)
7174

72-
async def message_handler(self, msg: str, conn, timestamp: float):
75+
async def _order(self, msg: dict, timestamp: float):
76+
if msg['type'] == "initial" or msg['type'] == "booked":
77+
status = "active"
78+
elif msg['type'] == "fill":
79+
status = 'filled'
80+
else:
81+
status = msg['type']
82+
83+
keys = ('executed_amount', 'remaining_amount', 'original_amount', 'price', 'avg_execution_price', 'total_spend')
84+
data = {k: Decimal(msg[k]) for k in keys if k in msg}
85+
86+
await self.callback(ORDER_INFO, feed=self.id,
87+
symbol=symbol_exchange_to_std(msg['symbol'].upper()), # This uses the REST endpoint format (lower case)
88+
status=status,
89+
order_id=msg['order_id'],
90+
side=BUY if msg['side'].lower() == 'buy' else SELL,
91+
order_type=msg['order_type'],
92+
timestamp=msg['timestampms'] / 1000.0,
93+
receipt_timestamp=timestamp,
94+
**data
95+
)
96+
97+
async def message_handler_orders(self, msg: str, conn: AsyncConnection, timestamp: float):
98+
msg = json.loads(msg, parse_float=Decimal)
99+
100+
if isinstance(msg, list):
101+
for entry in msg:
102+
await self._order(entry, timestamp)
103+
elif isinstance(msg, dict):
104+
if msg['type'] == 'subscription_ack':
105+
LOG.info('%s: Authenticated successfully', self.id)
106+
elif msg['type'] == 'heartbeat':
107+
return
108+
else:
109+
await self._order(msg, timestamp)
110+
111+
async def message_handler(self, msg: str, conn: AsyncConnection, timestamp: float):
73112
msg = json.loads(msg, parse_float=Decimal)
74113

75114
if msg['type'] == 'l2_updates':
@@ -83,9 +122,27 @@ async def message_handler(self, msg: str, conn, timestamp: float):
83122
else:
84123
LOG.warning('%s: Invalid message type %s', self.id, msg)
85124

86-
async def subscribe(self, conn: AsyncConnection):
87-
pairs = self.symbols if not self.subscription else list(set.union(*list(self.subscription.values())))
88-
self.__reset(pairs)
125+
def connect(self) -> List[Tuple[AsyncConnection, Callable[[None], None], Callable[[str, float], None]]]:
126+
authenticated = []
127+
public = []
128+
ret = []
129+
130+
for channel in self.subscription or self.channels:
131+
if is_authenticated_channel(channel):
132+
authenticated.extend(self.subscription.get(channel) or self.symbols)
133+
else:
134+
public.extend(self.subscription.get(channel) or self.symbols)
135+
136+
if authenticated:
137+
header = generate_token(self.key_id, self.key_secret, "/v1/order/events", self.config.gemini.account_name)
138+
symbols = '&'.join([f"symbolFilter={s.lower()}" for s in authenticated]) # needs to match REST format (lower case)
139+
140+
ret.append(self._connect_builder(f"{self.address['auth']}?{symbols}", None, header=header, sub=self._empty_subscribe, handler=self.message_handler_orders))
141+
if public:
142+
ret.append(self._connect_builder(self.address['public'], list(set(public))))
143+
144+
return ret
89145

90-
await conn.send(json.dumps({"type": "subscribe",
91-
"subscriptions": [{"name": "l2", "symbols": pairs}]}))
146+
async def subscribe(self, conn: AsyncConnection, options=None):
147+
self.__reset(options)
148+
await conn.send(json.dumps({"type": "subscribe", "subscriptions": [{"name": "l2", "symbols": options}]}))

cryptofeed/feed.py

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
associated with this software.
66
'''
77
from collections import defaultdict
8+
from functools import partial
89
import logging
910
import os
1011
from typing import Tuple, Callable, Union, List
@@ -15,7 +16,7 @@
1516
from cryptofeed.defines import (ASK, BID, BOOK_DELTA, FUNDING, FUTURES_INDEX, L2_BOOK, L3_BOOK, LIQUIDATIONS,
1617
OPEN_INTEREST, MARKET_INFO, TICKER, TRADES, TRANSACTIONS, VOLUME)
1718
from cryptofeed.exceptions import BidAskOverlapping, UnsupportedDataFeed
18-
from cryptofeed.standards import feed_to_exchange, get_exchange_info, load_exchange_symbol_mapping, symbol_std_to_exchange
19+
from cryptofeed.standards import feed_to_exchange, get_exchange_info, load_exchange_symbol_mapping, symbol_std_to_exchange, is_authenticated_channel
1920
from cryptofeed.util.book import book_delta, depth
2021

2122

@@ -25,12 +26,14 @@
2526
class Feed:
2627
id = 'NotImplemented'
2728

28-
def __init__(self, address: Union[dict, str], symbols=None, channels=None, subscription=None, config: Union[Config, dict, str] = None, callbacks=None, max_depth=None, book_interval=1000, snapshot_interval=False, checksum_validation=False, cross_check=False, origin=None):
29+
def __init__(self, address: Union[dict, str], sandbox=False, symbols=None, channels=None, subscription=None, config: Union[Config, dict, str] = None, callbacks=None, max_depth=None, book_interval=1000, snapshot_interval=False, checksum_validation=False, cross_check=False, origin=None):
2930
"""
3031
address: str, or dict
3132
address to be used to create the connection.
3233
The address protocol (wss or https) will be used to determine the connection type.
3334
Use a "str" to pass one single address, or a dict of option/address
35+
sandbox: bool
36+
For authenticated channels, run against the sandbox websocket (when True)
3437
max_depth: int
3538
Maximum number of levels per side to return in book updates
3639
book_interval: int
@@ -69,15 +72,20 @@ def __init__(self, address: Union[dict, str], symbols=None, channels=None, subsc
6972
self.origin = origin
7073
self.checksum_validation = checksum_validation
7174
self.ws_defaults = {'ping_interval': 10, 'ping_timeout': None, 'max_size': 2**23, 'max_queue': None, 'origin': self.origin}
72-
key_id = os.environ.get(f'CF_{self.id}_KEY_ID') or self.config[self.id.lower()].key_id
73-
load_exchange_symbol_mapping(self.id, key_id=key_id)
75+
self.key_id = os.environ.get(f'CF_{self.id}_KEY_ID') or self.config[self.id.lower()].key_id
76+
self.key_secret = os.environ.get(f'CF_{self.id}_KEY_SECRET') or self.config[self.id.lower()].key_secret
77+
78+
load_exchange_symbol_mapping(self.id, key_id=self.key_id)
7479

7580
if subscription is not None and (symbols is not None or channels is not None):
7681
raise ValueError("Use subscription, or channels and symbols, not both")
7782

7883
if subscription is not None:
7984
for channel in subscription:
8085
chan = feed_to_exchange(self.id, channel)
86+
if is_authenticated_channel(channel):
87+
if not self.key_id or not self.key_secret:
88+
raise ValueError("Authenticated channel subscribed to, but no auth keys provided")
8189
self.subscription[chan].update([symbol_std_to_exchange(symbol, self.id) for symbol in subscription[channel]])
8290
self.normalized_symbols.extend(self.subscription[chan])
8391

@@ -86,6 +94,9 @@ def __init__(self, address: Union[dict, str], symbols=None, channels=None, subsc
8694
self.symbols = [symbol_std_to_exchange(symbol, self.id) for symbol in symbols]
8795
if channels:
8896
self.channels = list(set([feed_to_exchange(self.id, chan) for chan in channels]))
97+
if any(is_authenticated_channel(chan) for chan in channels):
98+
if not self.key_id or not self.key_secret:
99+
raise ValueError("Authenticated channel subscribed to, but no auth keys provided")
89100

90101
self.l3_book = {}
91102
self.l2_book = {}
@@ -112,6 +123,17 @@ def __init__(self, address: Union[dict, str], symbols=None, channels=None, subsc
112123
if not isinstance(callback, list):
113124
self.callbacks[key] = [callback]
114125

126+
def _connect_builder(self, address: str, options: list, header=None, sub=None, handler=None):
127+
"""
128+
Helper method for building a custom connect tuple
129+
"""
130+
subscribe = partial(self.subscribe if not sub else sub, options=options)
131+
conn = AsyncConnection(address, self.id, extra_headers=header, **self.ws_defaults)
132+
return conn, subscribe, handler if handler else self.message_handler
133+
134+
async def _empty_subscribe(self, conn: AsyncConnection, **kwargs):
135+
return
136+
115137
def connect(self) -> List[Tuple[AsyncConnection, Callable[[None], None], Callable[[str, float], None]]]:
116138
"""
117139
Generic connection method for exchanges. Exchanges that require/support

cryptofeed/rest/gemini.py

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
1-
import base64
2-
import hashlib
3-
import hmac
41
import logging
52
from decimal import Decimal
6-
from time import sleep, time
3+
from time import sleep
74

85
import pandas as pd
96
import requests
107
from sortedcontainers.sorteddict import SortedDict as sd
11-
from yapic import json
128

9+
from cryptofeed.auth.gemini import generate_token
1310
from cryptofeed.defines import BID, ASK, BUY, CANCELLED, FILLED, GEMINI, LIMIT, OPEN, PARTIAL, SELL
1411
from cryptofeed.rest.api import API, request_retry
1512
from cryptofeed.standards import normalize_trading_options, symbol_exchange_to_std, symbol_std_to_exchange
@@ -63,26 +60,14 @@ def helper():
6360
return helper()
6461

6562
def _post(self, command: str, payload=None):
66-
if not payload:
67-
payload = {}
68-
payload['request'] = command
69-
payload['nonce'] = int(time() * 1000)
70-
if self.config.account_name:
71-
payload['account'] = self.config.account_name
63+
headers = generate_token(self.config.key_id, self.config.key_secret, command, account_name=self.config.account_name, payload=payload)
64+
65+
headers['Content-Type'] = "text/plain"
66+
headers['Content-Length'] = "0"
67+
headers['Cache-Control'] = "no-cache"
7268

7369
api = self.api if not self.sandbox else self.sandbox_api
7470
api = f"{api}{command}"
75-
b64_payload = base64.b64encode(json.dumps(payload).encode('utf-8'))
76-
signature = hmac.new(self.config.key_secret.encode('utf-8'), b64_payload, hashlib.sha384).hexdigest()
77-
78-
headers = {
79-
'Content-Type': "text/plain",
80-
'Content-Length': "0",
81-
'X-GEMINI-APIKEY': self.config.key_id,
82-
'X-GEMINI-PAYLOAD': b64_payload,
83-
'X-GEMINI-SIGNATURE': signature,
84-
'Cache-Control': "no-cache"
85-
}
8671

8772
resp = requests.post(api, headers=headers)
8873
self._handle_error(resp, LOG)

cryptofeed/standards.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
KRAKEN, KRAKEN_FUTURES, OKCOIN, OKEX, POLONIEX, PROBIT, UPBIT, WHALE_ALERT)
2121
from cryptofeed.defines import (FILL_OR_KILL, IMMEDIATE_OR_CANCEL, LIMIT, MAKER_OR_CANCEL, MARKET, UNSUPPORTED)
2222
from cryptofeed.defines import (FUNDING, FUTURES_INDEX, L2_BOOK, L3_BOOK, LIQUIDATIONS, OPEN_INTEREST, MARKET_INFO,
23-
TICKER, TRADES, TRANSACTIONS, VOLUME)
23+
TICKER, TRADES, TRANSACTIONS, VOLUME, ORDER_INFO)
2424
from cryptofeed.exceptions import UnsupportedDataFeed, UnsupportedTradingOption, UnsupportedSymbol
2525
from cryptofeed.symbols import gen_symbols, _exchange_info
2626

@@ -250,6 +250,9 @@ def timestamp_normalize(exchange, ts):
250250
},
251251
FUTURES_INDEX: {
252252
BYBIT: 'instrument_info.100ms'
253+
},
254+
ORDER_INFO: {
255+
GEMINI: ORDER_INFO
253256
}
254257
}
255258

@@ -321,3 +324,7 @@ def raise_error():
321324
if ret == UNSUPPORTED:
322325
raise_error()
323326
return ret
327+
328+
329+
def is_authenticated_channel(channel: str) -> bool:
330+
return channel in (ORDER_INFO)

cryptofeed/symbols.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ def coinbase_symbols() -> Dict[str, str]:
194194
raise_failure_explanation('COINBASE', why, {"": r})
195195

196196

197-
def gemini_symbols() -> Dict[str, str]:
197+
def gemini_symbols(*args) -> Dict[str, str]:
198198
r = None
199199
try:
200200
r = requests.get('https://api.gemini.com/v1/symbols')

examples/demo_gemini_authenticated.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
'''
2+
Copyright (C) 2017-2021 Bryant Moscon - [email protected]
3+
4+
Please see the LICENSE file for the terms and conditions
5+
associated with this software.
6+
'''
7+
from cryptofeed import FeedHandler
8+
from cryptofeed.defines import GEMINI, ORDER_INFO
9+
10+
11+
async def order(**kwargs):
12+
print(f"Order Update: {kwargs}")
13+
14+
15+
def main():
16+
f = FeedHandler(config="config.yaml")
17+
f.add_feed(GEMINI, sandbox=True, subscription={ORDER_INFO: ['BTC-USD', 'ETH-USD']}, callbacks={ORDER_INFO: order})
18+
19+
f.run()
20+
21+
22+
if __name__ == '__main__':
23+
main()

0 commit comments

Comments
 (0)