
Review websocket message handlers #675

Open

condrove10 opened this issue Mar 12, 2025 · 4 comments

Comments

@condrove10
Contributor

WebSocket Message Handling Fragility

Hello,

During the short period of testing for my PR, I noticed that the current WebSocket message handling strategy is quite fragile. Let me elaborate:

Currently, the message read from the WebSocket is passed directly to a callback function called handler:

for {
    _, message, err := c.ReadMessage()
    if err != nil {
        if !silent {
            errHandler(err)
        }
        return
    }
    handler(message) // the handler runs synchronously on the read loop
}

From my limited understanding, any handler operation that takes even a few seconds could add up over a span of 23 hours (assuming the connection is renewed before the 24-hour deadline) and create a backlog of messages. This is especially problematic during high market volatility, where the PONG reply to a server PING might be delayed past the 1-minute deadline.

Potential Improvement

It might be useful to review and devise a new strategy for parsing messages. Some possible approaches:

  1. Priority Queue: Introduce a priority queue to handle messages, ensuring that PING/PONG messages are always processed promptly.
  2. Message Router with Deadline: Send all messages (excluding PING/PONG frames) to a message router. This router could enforce a deadline: if no message is consumed within a 120-second window (or even lower), the channel/buffer is flushed and the connection is closed. A rough sketch of this approach follows after the list.
    • This approach would prevent backlog accumulation and avoid disconnections due to timeouts.
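
For illustration, here is a rough Go sketch of what option 2 could look like. Everything in it is hypothetical (the router type, dispatch, closeConn, the buffer size and the 120-second deadline are not part of this library); the point is just that the read loop only ever does a channel send, so PING/PONG handling stays responsive, and a stalled consumer triggers a flush-and-reconnect instead of a silent backlog:

package wsrouter

import (
	"log"
	"time"
)

const (
	routerBuffer   = 1024              // hypothetical buffer size
	routerDeadline = 120 * time.Second // window before the connection is dropped
)

// router forwards data frames to the consumer; the read loop only ever does a
// channel send, so it is never blocked by a slow handler.
type router struct {
	msgCh     chan []byte
	closeConn func() // closes the underlying websocket connection
}

func newRouter(closeConn func()) *router {
	return &router{
		msgCh:     make(chan []byte, routerBuffer),
		closeConn: closeConn,
	}
}

// dispatch is called from the read loop. It returns false when the consumer
// has stalled past the deadline; in that case the buffer is flushed and the
// connection is closed so the caller can reconnect.
func (r *router) dispatch(message []byte) bool {
	timer := time.NewTimer(routerDeadline)
	defer timer.Stop()

	select {
	case r.msgCh <- message:
		return true
	case <-timer.C:
		log.Println("consumer stalled, flushing buffer and closing connection")
		for len(r.msgCh) > 0 {
			<-r.msgCh
		}
		r.closeConn()
		return false
	}
}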

Would love to hear thoughts on this! Let me know what you think.

Thanks!

condrove10 changed the title from "Review message websocket message handlers" to "Review websocket message handlers" on Mar 12, 2025
@xyq-c-cpp
Collaborator

There may be some issues, but the root cause is how fast the client processes messages. If we flush buffered messages or reset connections, clients may lose messages. @condrove10

@pcriadoperez
Collaborator

I checked the handlers, and they are quick sync functions that mostly unmarshal JSON. Binance allows 60 seconds to respond to each ping, which seems sufficient, so there shouldn't be any large backlog. If you're seeing timeouts, the cause might be elsewhere.

How often are you experiencing these timeouts?

Regarding improvements, if the sync functions are indeed slow, we could consider using a channel (rough sketch below).
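
Something along these lines, as a sketch rather than a concrete proposal: the read loop pushes raw messages onto a buffered channel and a single worker goroutine runs the (possibly slow) user handler, so ReadMessage keeps being called and control frames keep being answered. It assumes the connection is a *websocket.Conn from gorilla/websocket, which matches the ReadMessage call in the current loop; the 512 buffer and the drop-on-full policy are placeholders:

package wsread

import (
	"errors"

	"github.com/gorilla/websocket"
)

// readLoop is a non-blocking variant of the current loop: a single worker
// goroutine preserves message order while keeping the handler off the read
// path.
func readLoop(c *websocket.Conn, handler func([]byte), errHandler func(error), silent bool) {
	msgCh := make(chan []byte, 512)
	defer close(msgCh)

	go func() {
		for message := range msgCh {
			handler(message)
		}
	}()

	for {
		_, message, err := c.ReadMessage()
		if err != nil {
			if !silent {
				errHandler(err)
			}
			return
		}
		select {
		case msgCh <- message:
		default:
			// buffer full: report and drop rather than block the read loop
			if !silent {
				errHandler(errors.New("handler backlog, dropping message"))
			}
		}
	}
}

Whether dropping on a full buffer is acceptable is debatable given @xyq-c-cpp's point about lost messages; blocking with a deadline (as in the router sketch above) would be the alternative.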

@condrove10
Contributor Author

In my case, I was reading market data using the handler and sending it to an unbuffered Go channel. The timeouts were triggered because I was consuming data from the channel more slowly than I was pushing it. This happened because I was inserting small batches into my database, especially during peak hours.

I believe the backlog building up behind the blocked channel sends was causing the delays and eventually triggering the timeouts: after a prolonged period of activity, instead of responding to PING frames immediately, I was exceeding the timeout threshold.


This is what I came up with to avoid the timeouts and stay performant:

func (app *App) startBinanceWebSocket() {
	wsDepthHandler := func(event *binance.WsBookTickerEvent) {
		t := time.Now().UTC()
		// Parse bid and ask prices
		bidPrice, err := strconv.ParseFloat(event.BestBidPrice, 64)
		if err != nil {
			return
		}

		bidSize, err := strconv.ParseFloat(event.BestBidQty, 64)
		if err != nil {
			return
		}

		askPrice, err := strconv.ParseFloat(event.BestAskPrice, 64)
		if err != nil {
			return
		}

		askSize, err := strconv.ParseFloat(event.BestAskQty, 64)
		if err != nil {
			return
		}

		// Send record to channel instead of direct insert
		record := TickerRecord{
			Symbol:    event.Symbol,
			Timestamp: t,
			Exchange:  "BINANCE",
			BidPrice:  bidPrice,
			BidSize:   bidSize,
			AskPrice:  askPrice,
			AskSize:   askSize,
		}

		go func() {
			if err := app.batchInsertRecord(&record); err != nil {
				log.Println(fmt.Errorf("failed to insert record: %w", err))
			}
		}()
	}

	errHandler := func(err error) {
		log.Printf("WebSocket error: %v", err)
	}

	// Function to handle WebSocket reconnection
	reconnect := func() {
		for {
			select {
			case <-app.ctx.Done():
				return
			default:
				log.Println("Connecting to Binance WebSocket...")
				doneC, stopC, err := binance.WsCombinedBookTickerServe(app.symbols, wsDepthHandler, errHandler)
				if err != nil {
					log.Printf("Failed to start WebSocket: %v", err)
					time.Sleep(time.Second) // Wait before reconnecting
					continue
				}

				// Create a timer for reconnection (Binance WebSocket connection lasts 24 hours)
				reconnectTimer := time.NewTimer(23 * time.Hour) // Reconnect before 24-hour limit

				select {
				case <-app.ctx.Done():
					stopC <- struct{}{}
					<-doneC // Wait for previous connection to close
					return
				case <-reconnectTimer.C:
					log.Println("Scheduled reconnection to Binance WebSocket...")
					stopC <- struct{}{}
					<-doneC // Wait for previous connection to close
				case <-doneC:
					reconnectTimer.Stop() // release the timer before looping to reconnect
					log.Println("WebSocket connection closed unexpectedly, reconnecting...")
				}

				// Wait a bit before reconnecting
				time.Sleep(1 * time.Second)
			}
		}
	}

	go reconnect()
}

// batchInsertRecord adds the record to the appropriate per-symbol batch and sends it once the batch size (25) is reached
func (app *App) batchInsertRecord(record *TickerRecord) error {
	symbol := record.Symbol
	tableName := strings.ToUpper(symbol)

	app.batchMutex.Lock()
	defer app.batchMutex.Unlock()

	// Ensure maps are initialized
	if app.batches == nil {
		app.batches = make(map[string]driver.Batch)
	}

	if app.batchCounters == nil {
		app.batchCounters = make(map[string]int)
	}

	// Initialize a new batch for this symbol if it doesn't exist
	if _, exists := app.batches[symbol]; !exists {
		query := fmt.Sprintf(`
			INSERT INTO %s.%s (timestamp, exchange, bid_price, bid_size, ask_price, ask_size)
			VALUES (?, ?, ?, ?, ?, ?)
		`, app.database, tableName)

		batch, err := app.clickhouseClient.PrepareBatch(context.Background(), query)
		if err != nil {
			return fmt.Errorf("failed to prepare batch for %s: %w", symbol, err)
		}

		app.batches[symbol] = batch
		app.batchCounters[symbol] = 0
	}

	// Add record to the batch
	err := app.batches[symbol].Append(record.Timestamp, record.Exchange, record.BidPrice, record.BidSize, record.AskPrice, record.AskSize)
	if err != nil {
		return fmt.Errorf("failed to append row to batch for %s: %w", symbol, err)
	}

	// Increment counter
	app.batchCounters[symbol]++

	// Send the batch once the counter reaches the batch size (25)
	if app.batchCounters[symbol] >= 25 {
		if err := app.sendBatch(symbol); err != nil {
			return err
		}
	}

	return nil
}

Would love to hear thoughts on this! Let me know what you think.

Thanks!

@condrove10
Contributor Author

After this fix I haven't personally encountered any issues, but I can see how it could still be a problem for anyone doing anything else in the handler that adds delay with each iteration.

