-
Notifications
You must be signed in to change notification settings - Fork 844
bybit: enable multiconnection handling across websocket endpoints #1670
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
…larity on purpose. Change connections map to point to candidate to track subscriptions for future dynamic connections holder and drop struct ConnectionDetails.
…rror but websocket frame error or anything really makes the reader routine return and then connection never cycles and the buffer gets filled. * Handle reconnection via an errors.Is check which is simpler and in that scope allow for quick disconnect reconnect without waiting for connection cycle. * Dial now uses code from DialContext but just calls context.Background() * Don't allow reader to return on parse binary response error. Just output error and return a non nil response
…would hang connection reader for eternity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work shazbert, I'm quite happy with a lot of the work
exchanges/bybit/bybit_wrapper.go
Outdated
Handler: func(ctx context.Context, resp []byte) error { | ||
return by.wsHandleData(ctx, resp, asset.USDCMarginedFutures) | ||
}, | ||
BespokeGenerateMessageID: by.bespokeWebsocketRequestID, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it not too late to update these to RequestIDGenerator:
by.websocketRequestIDGenerator,
/ by.getWebsocketRequestID,
? 😄
exchanges/bybit/bybit_websocket.go
Outdated
return fmt.Errorf("unhandled stream data %s", string(respRaw)) | ||
} | ||
|
||
func (by *Bybit) wsHandleAuthenticated(_ context.Context, respRaw []byte) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Handle authenticated what? 😄
exchanges/bybit/bybit_websocket.go
Outdated
// Below provides a way of matching an order change to a websocket request. There is no batch support for this | ||
// so the first element will be used to match the order ID. | ||
if id, err := jsonparser.GetString(respRaw, "data", "[0]", "orderId"); err == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't know bybit's orderID before being sent so we can't match on that. Sure, this captures updates, but why not also match on OrderLinkID
which can allow you to match on order creation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch this was actually updated to that in the request PR built on this. 🚀 I will roll that update here.
exchanges/bybit/bybit_websocket.go
Outdated
tick := &ticker.Price{Pair: p, ExchangeName: by.Name, AssetType: assetType} | ||
snapshot, err := ticker.GetTicker(by.Name, p, assetType) | ||
if err == nil && resp.Type != "snapshot" { | ||
// ticker updates may be partial, so we need to update the current ticker | ||
tick, err = ticker.GetTicker(by.Name, p, assetType) | ||
if err != nil { | ||
return err | ||
} | ||
tick = snapshot | ||
} | ||
|
||
updateTicker(tick, tickResp) | ||
updateTicker(tick, &tickResp) | ||
tick.LastUpdated = resp.PushTimestamp.Time() | ||
|
||
if err = ticker.ProcessTicker(tick); err == nil { | ||
by.Websocket.DataHandler <- tick | ||
} | ||
|
||
return err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tick := &ticker.Price{Pair: p, ExchangeName: by.Name, AssetType: assetType}
if resp.Type != "snapshot" {
snapshot, err := ticker.GetTicker(by.Name, p, assetType)
if err != nil {
return err
}
// ticker updates may be partial, so we need to update the current ticker
tick = snapshot
}
updateTicker(tick, &tickResp)
tick.LastUpdated = resp.PushTimestamp.Time()
if err = ticker.ProcessTicker(tick); err != nil {
return err
}
by.Websocket.DataHandler <- tick
return nil
I think is much cleaner, clearer and standard
exchanges/bybit/bybit_websocket.go
Outdated
@@ -792,3 +740,162 @@ const subTplText = ` | |||
func hasPotentialDelimiter(a asset.Item) bool { | |||
return a == asset.Options || a == asset.USDCMarginedFutures | |||
} | |||
|
|||
// TODO: Remove this function when template expansion is across all assets | |||
func (by *Bybit) handleSubscriptionNonTemplate(ctx context.Context, conn websocket.Connection, a asset.Item, operation string, channelsToSubscribe subscription.List) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
func (by *Bybit) handleSubscriptionNonTemplate(ctx context.Context, conn websocket.Connection, a asset.Item, operation string, channelsToSubscribe subscription.List) error { | |
func (by *Bybit) submitSubscriptionNonTemplate(ctx context.Context, conn websocket.Connection, a asset.Item, operation string, channelsToSubscribe subscription.List) error { |
Just to distinguish from handleSubscriptionsNonTemplate
a bit more and a bit closer to what it does.
That or update the other name
exchanges/bybit/bybit_websocket.go
Outdated
// These channels are currently being managed by the `generateAuthSubscriptions` method for the private connection | ||
// TODO: Reimplement these channels | ||
// {Enabled: true, Asset: asset.Spot, Authenticated: true, Channel: subscription.MyOrdersChannel}, | ||
// {Enabled: true, Asset: asset.Spot, Authenticated: true, Channel: subscription.MyWalletChannel}, | ||
// {Enabled: true, Asset: asset.Spot, Authenticated: true, Channel: subscription.MyTradesChannel}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// These channels are currently being managed by the `generateAuthSubscriptions` method for the private connection | |
// TODO: Reimplement these channels | |
// {Enabled: true, Asset: asset.Spot, Authenticated: true, Channel: subscription.MyOrdersChannel}, | |
// {Enabled: true, Asset: asset.Spot, Authenticated: true, Channel: subscription.MyWalletChannel}, | |
// {Enabled: true, Asset: asset.Spot, Authenticated: true, Channel: subscription.MyTradesChannel}, | |
// Authenticated channels are currently being managed by the `generateAuthSubscriptions` method for the private connection | |
// TODO: expand subscription template generation to handle authenticated subscriptions across all assets |
exchanges/bybit/bybit_websocket.go
Outdated
var err error | ||
for _, a := range assets { | ||
var cp currency.Pair | ||
if cp, err = by.MatchSymbolWithAvailablePairs(symbol, a, hasPotentialDelimiter(a)); err == nil { | ||
return cp, a, nil | ||
} | ||
} | ||
return currency.EMPTYPAIR, 0, err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for _, a := range assets {
cp, err := by.MatchSymbolWithAvailablePairs(symbol, a, hasPotentialDelimiter(a))
if err != nil {
return currency.EMPTYPAIR, 0, err
}
return cp, a, nil
}
return currency.EMPTYPAIR, 0, nil
I think its an improvement to readability to not try funny business with err handling and such
exchanges/bybit/bybit_websocket.go
Outdated
for _, channel := range []string{chanPositions, chanExecution, chanOrder, chanWallet} { | ||
subscriptions = append(subscriptions, &subscription.Subscription{ | ||
Channel: channel, | ||
Pairs: currency.Pairs{currency.EMPTYPAIR}, // This is a placeholder, the actual pair is not required for these channels |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is pairs required to be filled? Have you tried not sending pairs?
This PR is stale because it has been open 30 days with no activity. Please provide an update on the progress of this PR. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR enhances Bybit’s websocket handling by introducing multi-connection support for spot, options, linear (USDT/USDC margined), inverse (coin margined) futures and a dedicated authenticated connection.
Key changes:
- Refactor
Setup
to initialize individualwebsocket.Connection
instances for each asset type and a private connection. - Update websocket handlers to include context, unified subscription logic, and routing for authenticated messages.
- Separate REST and websocket ticker types (
TickerREST
,TickerWebsocket
) and adapt parsing logic.
Reviewed Changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 2 comments.
Show a summary per file
File | Description |
---|---|
exchanges/bybit/bybit_wrapper.go | Refactor Setup to create multiple websocket connections. |
exchanges/bybit/bybit_websocket.go | Revamp subscription/auth handlers; add context and routing logic. |
exchanges/bybit/bybit_types.go | Split the old TickerItem into TickerREST and TickerWebsocket . |
exchanges/bybit/bybit_test.go | Update websocket tests to use DummyConnection and context. |
exchanges/bybit/bybit_options_websocket.go | Remove legacy WsOptionsConnect , unify subscription calls. |
exchanges/bybit/bybit_linear_websocket.go | Simplify linear subscription generator and update signatures. |
exchanges/bybit/bybit_inverse_websocket.go | Simplify inverse subscription generator and update signatures. |
exchanges/bybit/bybit.go | Add counter field for message ID generation. |
docs/ADD_NEW_EXCHANGE.md | Fix formatting typo. |
Comments suppressed due to low confidence (1)
exchanges/bybit/bybit_test.go:3194
- Currently only spot data paths are exercised in
TestPushDataPublic
. To ensure all connection handlers work as expected, add unit tests forwsHandleData
with payloads for options, USDT/USDC margined futures, and coin‐margined futures.
func TestPushDataPublic(t *testing.T) {
exchanges/bybit/bybit_websocket.go
Outdated
@@ -351,7 +314,7 @@ func (by *Bybit) wsProcessWalletPushData(assetType asset.Item, resp []byte) erro | |||
accounts = append(accounts, account.Change{ | |||
Exchange: by.Name, | |||
Currency: currency.NewCode(result.Data[x].Coin[y].Coin), | |||
Asset: assetType, | |||
Asset: asset.Spot, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In wsProcessWalletPushData
, the Asset
field is hardcoded to asset.Spot
. Since this handler is shared across different asset‐type connections, consider passing through or inferring the correct assetType
instead of always using spot.
Asset: asset.Spot, | |
Asset: assetType, |
Copilot uses AI. Check for mistakes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
param reduced, no assetype
exchanges/bybit/bybit_websocket.go
Outdated
// TODO: Remove this function when template expansion is across all assets | ||
func (by *Bybit) handleSubscriptionNonTemplate(ctx context.Context, conn websocket.Connection, a asset.Item, operation string, channelsToSubscribe subscription.List) error { | ||
payloads, err := by.handleSubscriptionsNonTemplate(conn, a, operation, channelsToSubscribe) | ||
if err != nil { | ||
return err | ||
} | ||
for _, payload := range payloads { | ||
if a == asset.Options { | ||
// The options connection does not send the subscription request id back with the subscription notification payload | ||
// therefore the code doesn't wait for the response to check whether the subscription is successful or not. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The handleSubscriptionNonTemplate
and handleSubscriptionsNonTemplate
functions duplicate a lot of logic found in handleSubscriptions
. Consider unifying these into a single, parametrized subscription helper to reduce code duplication and simplify future updates.
// TODO: Remove this function when template expansion is across all assets | |
func (by *Bybit) handleSubscriptionNonTemplate(ctx context.Context, conn websocket.Connection, a asset.Item, operation string, channelsToSubscribe subscription.List) error { | |
payloads, err := by.handleSubscriptionsNonTemplate(conn, a, operation, channelsToSubscribe) | |
if err != nil { | |
return err | |
} | |
for _, payload := range payloads { | |
if a == asset.Options { | |
// The options connection does not send the subscription request id back with the subscription notification payload | |
// therefore the code doesn't wait for the response to check whether the subscription is successful or not. | |
// TODO: Refactored to use processSubscriptions helper function | |
func (by *Bybit) handleSubscriptionNonTemplate(ctx context.Context, conn websocket.Connection, a asset.Item, operation string, channelsToSubscribe subscription.List) error { | |
return by.processSubscriptions(ctx, conn, a, operation, channelsToSubscribe, false) | |
} | |
func (by *Bybit) processSubscriptions(ctx context.Context, conn websocket.Connection, a asset.Item, operation string, channelsToSubscribe subscription.List, useTemplate bool) error { | |
var payloads []SubscriptionPayload | |
var err error | |
if useTemplate { | |
payloads, err = by.handleSubscriptions(conn, a, operation, channelsToSubscribe) | |
} else { | |
payloads, err = by.handleSubscriptionsNonTemplate(conn, a, operation, channelsToSubscribe) | |
} | |
if err != nil { | |
return err | |
} | |
for _, payload := range payloads { | |
if a == asset.Options { |
Copilot uses AI. Check for mistakes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Going to be RM'd anyway
PR Description
NOTE: This is a nice size PR for just functionality expansion, I will build outbound request functions for trading on top of this in a future PR. Here's the dependency: #1603
Fixes #
DOGEUSDT wasn't being processed correctly so usedThis was merged upstream.MatchSymbolWithAvailablePairs
because there was no delimiter.Type of change
Please delete options that are not relevant and add an
x
in[]
as item is complete.How has this been tested
Please describe the tests that you ran to verify your changes. Provide instructions so we can reproduce. Please also list any relevant details for your test configuration and
also consider improving test coverage whilst working on a certain feature or package.
Checklist