Skip to content

Commit 99a511f

Browse files
sukunrtMarcoPolo
andcommitted
connmgr: fix transport association bug (#3221)
reuse port didn't work for the second transport, either QUIC or WebTransport, that was used for listening. This change fixes it by calling associate on all paths. This impacted hole punching for some users since you cannot hole punch without reuse port. There's a test in holepunch package to prevent regressions. Fixes #3165 Co-authored-by: Marco Munizaga <[email protected]>
1 parent ccc4849 commit 99a511f

File tree

4 files changed

+200
-0
lines changed

4 files changed

+200
-0
lines changed

p2p/protocol/holepunch/holepunch_test.go

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"net"
7+
"slices"
78
"sync"
89
"sync/atomic"
910
"testing"
@@ -682,3 +683,98 @@ func SetLegacyBehavior(legacyBehavior bool) holepunch.Option {
682683
return nil
683684
}
684685
}
686+
687+
// TestEndToEndSimConnectQUICReuse tests that hole punching works if we are
688+
// reusing the same port for QUIC and WebTransport, and when we have multiple
689+
// QUIC listeners on different ports.
690+
//
691+
// If this tests fails or is flaky it may be because:
692+
// - The quicreuse logic (and association logic) is not returning the appropriate transport for holepunching.
693+
// - The ordering of listeners is unexpected (remember the swarm will sort the listeners with `.ListenOrder()`).
694+
func TestEndToEndSimConnectQUICReuse(t *testing.T) {
695+
h1tr := &mockEventTracer{}
696+
h2tr := &mockEventTracer{}
697+
698+
router := &simconn.SimpleFirewallRouter{}
699+
relay := MustNewHost(t,
700+
quicSimConn(true, router),
701+
libp2p.ListenAddrs(ma.StringCast("/ip4/1.2.0.1/udp/8000/quic-v1")),
702+
libp2p.DisableRelay(),
703+
libp2p.ResourceManager(&network.NullResourceManager{}),
704+
libp2p.WithFxOption(fx.Invoke(func(h host.Host) {
705+
// Setup relay service
706+
_, err := relayv2.New(h)
707+
require.NoError(t, err)
708+
})),
709+
)
710+
711+
// We return addrs of quic on port 8001 and circuit.
712+
// This lets us listen on other ports for QUIC in order to confuse the quicreuse logic during hole punching.
713+
onlyQuicOnPort8001AndCircuit := func(addrs []ma.Multiaddr) []ma.Multiaddr {
714+
return slices.DeleteFunc(addrs, func(a ma.Multiaddr) bool {
715+
_, err := a.ValueForProtocol(ma.P_CIRCUIT)
716+
isCircuit := err == nil
717+
if isCircuit {
718+
return false
719+
}
720+
_, err = a.ValueForProtocol(ma.P_QUIC_V1)
721+
isQuic := err == nil
722+
if !isQuic {
723+
return true
724+
}
725+
port, err := a.ValueForProtocol(ma.P_UDP)
726+
if err != nil {
727+
return true
728+
}
729+
isPort8001 := port == "8001"
730+
return !isPort8001
731+
})
732+
}
733+
734+
h1 := MustNewHost(t,
735+
quicSimConn(false, router),
736+
libp2p.EnableHolePunching(holepunch.WithTracer(h1tr), holepunch.DirectDialTimeout(100*time.Millisecond)),
737+
libp2p.ListenAddrs(ma.StringCast("/ip4/2.2.0.1/udp/8001/quic-v1/webtransport")),
738+
libp2p.ResourceManager(&network.NullResourceManager{}),
739+
libp2p.AddrsFactory(onlyQuicOnPort8001AndCircuit),
740+
libp2p.ForceReachabilityPrivate(),
741+
)
742+
// Listen on quic *after* listening on webtransport.
743+
// This is to test that the quicreuse logic is not returning the wrong transport.
744+
// See: https://github.com/libp2p/go-libp2p/issues/3165#issuecomment-2700126706 for details.
745+
h1.Network().Listen(
746+
ma.StringCast("/ip4/2.2.0.1/udp/8001/quic-v1"),
747+
ma.StringCast("/ip4/2.2.0.1/udp/9001/quic-v1"),
748+
)
749+
750+
h2 := MustNewHost(t,
751+
quicSimConn(false, router),
752+
libp2p.ListenAddrs(
753+
ma.StringCast("/ip4/2.2.0.2/udp/8001/quic-v1/webtransport"),
754+
),
755+
libp2p.ResourceManager(&network.NullResourceManager{}),
756+
connectToRelay(&relay),
757+
libp2p.EnableHolePunching(holepunch.WithTracer(h2tr), holepunch.DirectDialTimeout(100*time.Millisecond)),
758+
libp2p.AddrsFactory(onlyQuicOnPort8001AndCircuit),
759+
libp2p.ForceReachabilityPrivate(),
760+
)
761+
// Listen on quic after listening on webtransport.
762+
h2.Network().Listen(
763+
ma.StringCast("/ip4/2.2.0.2/udp/8001/quic-v1"),
764+
ma.StringCast("/ip4/2.2.0.2/udp/9001/quic-v1"),
765+
)
766+
767+
defer h1.Close()
768+
defer h2.Close()
769+
defer relay.Close()
770+
771+
// Wait for holepunch service to start
772+
waitForHolePunchingSvcActive(t, h1)
773+
waitForHolePunchingSvcActive(t, h2)
774+
775+
learnAddrs(h1, h2)
776+
pingAtoB(t, h1, h2)
777+
778+
// wait till a direct connection is complete
779+
ensureDirectConn(t, h1, h2)
780+
}

p2p/transport/quicreuse/connmgr.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"crypto/tls"
66
"errors"
7+
"fmt"
78
"io"
89
"net"
910
"sync"
@@ -190,6 +191,18 @@ func (c *ConnManager) ListenQUICAndAssociate(association any, addr ma.Multiaddr,
190191
}
191192
key = tr.LocalAddr().String()
192193
entry = quicListenerEntry{ln: ln}
194+
} else if c.enableReuseport && association != nil {
195+
reuse, err := c.getReuse(netw)
196+
if err != nil {
197+
return nil, fmt.Errorf("reuse error: %w", err)
198+
}
199+
err = reuse.AssertTransportExists(entry.ln.transport)
200+
if err != nil {
201+
return nil, fmt.Errorf("reuse assert transport failed: %w", err)
202+
}
203+
if tr, ok := entry.ln.transport.(*refcountedTransport); ok {
204+
tr.associate(association)
205+
}
193206
}
194207
l, err := entry.ln.Add(tlsConf, allowWindowIncrease, func() { c.onListenerClosed(key) })
195208
if err != nil {

p2p/transport/quicreuse/connmgr_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,3 +315,59 @@ func TestExternalTransport(t *testing.T) {
315315
t.Fatal("doneWithTr not closed")
316316
}
317317
}
318+
319+
func TestAssociate(t *testing.T) {
320+
testAssociate := func(lnAddr1, lnAddr2 ma.Multiaddr, dialAddr *net.UDPAddr) {
321+
cm, err := NewConnManager(quic.StatelessResetKey{}, quic.TokenGeneratorKey{})
322+
require.NoError(t, err)
323+
defer cm.Close()
324+
325+
lp2pTLS := &tls.Config{NextProtos: []string{"libp2p"}}
326+
assoc1 := "test-1"
327+
ln1, err := cm.ListenQUICAndAssociate(assoc1, lnAddr1, lp2pTLS, nil)
328+
require.NoError(t, err)
329+
defer ln1.Close()
330+
addrs := ln1.Multiaddrs()
331+
require.Len(t, addrs, 1)
332+
333+
addr := addrs[0]
334+
assoc2 := "test-2"
335+
h3TLS := &tls.Config{NextProtos: []string{"h3"}}
336+
ln2, err := cm.ListenQUICAndAssociate(assoc2, addr, h3TLS, nil)
337+
require.NoError(t, err)
338+
defer ln2.Close()
339+
340+
tr1, err := cm.TransportWithAssociationForDial(assoc1, "udp4", dialAddr)
341+
require.NoError(t, err)
342+
defer tr1.Close()
343+
require.Equal(t, tr1.LocalAddr().String(), ln1.Addr().String())
344+
345+
tr2, err := cm.TransportWithAssociationForDial(assoc2, "udp4", dialAddr)
346+
require.NoError(t, err)
347+
defer tr2.Close()
348+
require.Equal(t, tr2.LocalAddr().String(), ln2.Addr().String())
349+
350+
ln3, err := cm.ListenQUICAndAssociate(assoc1, lnAddr2, lp2pTLS, nil)
351+
require.NoError(t, err)
352+
defer ln3.Close()
353+
354+
// an unused association should also return the same transport
355+
// association is only a preference for a specific transport, not an exclusion criteria
356+
tr3, err := cm.TransportWithAssociationForDial("unused", "udp4", dialAddr)
357+
require.NoError(t, err)
358+
defer tr3.Close()
359+
require.Contains(t, []string{ln2.Addr().String(), ln3.Addr().String()}, tr3.LocalAddr().String())
360+
}
361+
362+
t.Run("MultipleUnspecifiedListeners", func(t *testing.T) {
363+
testAssociate(ma.StringCast("/ip4/0.0.0.0/udp/0/quic-v1"),
364+
ma.StringCast("/ip4/0.0.0.0/udp/0/quic-v1"),
365+
&net.UDPAddr{IP: net.IPv4(1, 1, 1, 1), Port: 1})
366+
})
367+
t.Run("MultipleSpecificListeners", func(t *testing.T) {
368+
testAssociate(ma.StringCast("/ip4/127.0.0.1/udp/0/quic-v1"),
369+
ma.StringCast("/ip4/127.0.0.1/udp/0/quic-v1"),
370+
&net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 1},
371+
)
372+
})
373+
}

p2p/transport/quicreuse/reuse.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,10 @@ func (r *reuse) transportForDialLocked(association any, network string, source *
303303
return tr, nil
304304
}
305305
}
306+
// We don't have a transport with the association, use any one
307+
for _, tr := range trs {
308+
return tr, nil
309+
}
306310
}
307311
}
308312

@@ -313,6 +317,10 @@ func (r *reuse) transportForDialLocked(association any, network string, source *
313317
return tr, nil
314318
}
315319
}
320+
// We don't have a transport with the association, use any one
321+
for _, tr := range r.globalListeners {
322+
return tr, nil
323+
}
316324

317325
// Use a transport we've previously dialed from
318326
for _, tr := range r.globalDialers {
@@ -360,6 +368,33 @@ func (r *reuse) AddTransport(tr *refcountedTransport, laddr *net.UDPAddr) error
360368
return nil
361369
}
362370

371+
func (r *reuse) AssertTransportExists(tr refCountedQuicTransport) error {
372+
t, ok := tr.(*refcountedTransport)
373+
if !ok {
374+
return fmt.Errorf("invalid transport type: expected: *refcountedTransport, got: %T", tr)
375+
}
376+
laddr := t.LocalAddr().(*net.UDPAddr)
377+
if laddr.IP.IsUnspecified() {
378+
if lt, ok := r.globalListeners[laddr.Port]; ok {
379+
if lt == t {
380+
return nil
381+
}
382+
return errors.New("two global listeners on the same port")
383+
}
384+
return errors.New("transport not found")
385+
}
386+
if m, ok := r.unicast[laddr.IP.String()]; ok {
387+
if lt, ok := m[laddr.Port]; ok {
388+
if lt == t {
389+
return nil
390+
}
391+
return errors.New("two unicast listeners on same ip:port")
392+
}
393+
return errors.New("transport not found")
394+
}
395+
return errors.New("transport not found")
396+
}
397+
363398
func (r *reuse) TransportForListen(network string, laddr *net.UDPAddr) (*refcountedTransport, error) {
364399
r.mutex.Lock()
365400
defer r.mutex.Unlock()

0 commit comments

Comments
 (0)