From 6dd839209d188f758b159244f2ad0576867e4cd4 Mon Sep 17 00:00:00 2001 From: Arjan Singh Bal <46515553+arjan-bal@users.noreply.github.com> Date: Wed, 26 Mar 2025 22:30:16 +0530 Subject: [PATCH 1/2] delegatingresolver: Stop calls into delegates once the parent resolver is closed (#8195) --- .../delegatingresolver/delegatingresolver.go | 40 +++- .../delegatingresolver_ext_test.go | 212 ++++++++++++++++++ 2 files changed, 244 insertions(+), 8 deletions(-) diff --git a/internal/resolver/delegatingresolver/delegatingresolver.go b/internal/resolver/delegatingresolver/delegatingresolver.go index a6c647013388..7b93f692be01 100644 --- a/internal/resolver/delegatingresolver/delegatingresolver.go +++ b/internal/resolver/delegatingresolver/delegatingresolver.go @@ -44,15 +44,19 @@ var ( // // It implements the [resolver.Resolver] interface. type delegatingResolver struct { - target resolver.Target // parsed target URI to be resolved - cc resolver.ClientConn // gRPC ClientConn - targetResolver resolver.Resolver // resolver for the target URI, based on its scheme - proxyResolver resolver.Resolver // resolver for the proxy URI; nil if no proxy is configured - proxyURL *url.URL // proxy URL, derived from proxy environment and target + target resolver.Target // parsed target URI to be resolved + cc resolver.ClientConn // gRPC ClientConn + proxyURL *url.URL // proxy URL, derived from proxy environment and target mu sync.Mutex // protects all the fields below targetResolverState *resolver.State // state of the target resolver proxyAddrs []resolver.Address // resolved proxy addresses; empty if no proxy is configured + + // childMu serializes calls into child resolvers. It also protects access to + // the following fields. + childMu sync.Mutex + targetResolver resolver.Resolver // resolver for the target URI, based on its scheme + proxyResolver resolver.Resolver // resolver for the proxy URI; nil if no proxy is configured } // nopResolver is a resolver that does nothing. 
@@ -111,6 +115,10 @@ func New(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOpti logger.Infof("Proxy URL detected : %s", r.proxyURL) } + // Resolver updates from one child may trigger calls into the other. Block + // updates until the children are initialized. + r.childMu.Lock() + defer r.childMu.Unlock() // When the scheme is 'dns' and target resolution on client is not enabled, // resolution should be handled by the proxy, not the client. Therefore, we // bypass the target resolver and store the unresolved target address. @@ -165,11 +173,15 @@ func (r *delegatingResolver) proxyURIResolver(opts resolver.BuildOptions) (resol } func (r *delegatingResolver) ResolveNow(o resolver.ResolveNowOptions) { + r.childMu.Lock() + defer r.childMu.Unlock() r.targetResolver.ResolveNow(o) r.proxyResolver.ResolveNow(o) } func (r *delegatingResolver) Close() { + r.childMu.Lock() + defer r.childMu.Unlock() r.targetResolver.Close() r.targetResolver = nil @@ -267,11 +279,17 @@ func (r *delegatingResolver) updateProxyResolverState(state resolver.State) erro err := r.updateClientConnStateLocked() // Another possible approach was to block until updates are received from // both resolvers. But this is not used because calling `New()` triggers - // `Build()` for the first resolver, which calls `UpdateState()`. And the + // `Build()` for the first resolver, which calls `UpdateState()`. And the // second resolver hasn't sent an update yet, so it would cause `New()` to // block indefinitely. 
if err != nil { - r.targetResolver.ResolveNow(resolver.ResolveNowOptions{}) + go func() { + r.childMu.Lock() + defer r.childMu.Unlock() + if r.targetResolver != nil { + r.targetResolver.ResolveNow(resolver.ResolveNowOptions{}) + } + }() } return err } @@ -291,7 +309,13 @@ func (r *delegatingResolver) updateTargetResolverState(state resolver.State) err r.targetResolverState = &state err := r.updateClientConnStateLocked() if err != nil { - r.proxyResolver.ResolveNow(resolver.ResolveNowOptions{}) + go func() { + r.childMu.Lock() + defer r.childMu.Unlock() + if r.proxyResolver != nil { + r.proxyResolver.ResolveNow(resolver.ResolveNowOptions{}) + } + }() } return nil } diff --git a/internal/resolver/delegatingresolver/delegatingresolver_ext_test.go b/internal/resolver/delegatingresolver/delegatingresolver_ext_test.go index 6f60153c7dd2..148b44816614 100644 --- a/internal/resolver/delegatingresolver/delegatingresolver_ext_test.go +++ b/internal/resolver/delegatingresolver/delegatingresolver_ext_test.go @@ -19,6 +19,8 @@ package delegatingresolver_test import ( + "context" + "errors" "net/http" "net/url" "testing" @@ -548,3 +550,213 @@ func (s) TestDelegatingResolverForMutipleProxyAddress(t *testing.T) { t.Fatalf("Unexpected state from delegating resolver. Diff (-got +want):\n%v", diff) } } + +// Tests that delegatingresolver doesn't panic when the channel closes the +// resolver while it's handling an update from its child. The test closes the +// delegating resolver, verifies the target resolver is closed and blocks the +// proxy resolver from being closed. The test sends an update from the proxy +// resolver and verifies that the target resolver's ResolveNow method is not +// called after the channel returns an error.
+func (s) TestDelegatingResolverUpdateStateDuringClose(t *testing.T) { + const envProxyAddr = "proxytest.com" + + hpfe := func(req *http.Request) (*url.URL, error) { + return &url.URL{ + Scheme: "https", + Host: envProxyAddr, + }, nil + } + originalhpfe := delegatingresolver.HTTPSProxyFromEnvironment + delegatingresolver.HTTPSProxyFromEnvironment = hpfe + defer func() { + delegatingresolver.HTTPSProxyFromEnvironment = originalhpfe + }() + + // Manual resolver to control the target resolution. + targetResolver := manual.NewBuilderWithScheme("test") + targetResolverCalled := make(chan struct{}) + targetResolver.ResolveNowCallback = func(resolver.ResolveNowOptions) { + close(targetResolverCalled) + } + targetResolverCloseCalled := make(chan struct{}) + targetResolver.CloseCallback = func() { + close(targetResolverCloseCalled) + t.Log("Target resolver is closed.") + } + + target := targetResolver.Scheme() + ":///" + "ignored" + // Set up a manual DNS resolver to control the proxy address resolution. + proxyResolver := setupDNS(t) + + unblockProxyResolverClose := make(chan struct{}) + proxyResolver.CloseCallback = func() { + <-unblockProxyResolverClose + t.Log("Proxy resolver is closed.") + } + + tcc, _, _ := createTestResolverClientConn(t) + tcc.UpdateStateF = func(resolver.State) error { + return errors.New("test error") + } + dr, err := delegatingresolver.New(resolver.Target{URL: *testutils.MustParseURL(target)}, tcc, resolver.BuildOptions{}, targetResolver, false) + if err != nil { + t.Fatalf("Failed to create delegating resolver: %v", err) + } + + targetResolver.UpdateState(resolver.State{ + Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: "1.1.1.1"}}}}, + }) + + // Closing the delegating resolver will block until the test writes to the + // unblockProxyResolverClose channel. 
+ go dr.Close() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + select { + case <-targetResolverCloseCalled: + case <-ctx.Done(): + t.Fatalf("Context timed out waiting for target resolver's Close method to be called.") + } + + // Updating the channel will result in an error being returned. Since the + // target resolver's Close method is already called, the delegating resolver + // must not call "ResolveNow" on it. + go proxyResolver.UpdateState(resolver.State{ + Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: "1.1.1.1"}}}}, + }) + unblockProxyResolverClose <- struct{}{} + + select { + case <-targetResolverCalled: + t.Fatalf("targetResolver.ResolveNow() called unexpectedly.") + case <-time.After(defaultTestShortTimeout): + } +} + +// Tests that calling cc.UpdateState in a blocking manner from a child resolver +// while handling a ResolveNow call doesn't result in a deadlock. The test uses +// a fake ClientConn that returns an error when calling cc.UpdateState. The test +// makes the proxy resolver update the resolver state. The test verifies that +// the delegating resolver calls ResolveNow on the target resolver when the +// ClientConn returns an error. +func (s) TestDelegatingResolverUpdateStateFromResolveNow(t *testing.T) { + const envProxyAddr = "proxytest.com" + + hpfe := func(req *http.Request) (*url.URL, error) { + return &url.URL{ + Scheme: "https", + Host: envProxyAddr, + }, nil + } + originalhpfe := delegatingresolver.HTTPSProxyFromEnvironment + delegatingresolver.HTTPSProxyFromEnvironment = hpfe + defer func() { + delegatingresolver.HTTPSProxyFromEnvironment = originalhpfe + }() + + // Manual resolver to control the target resolution. + targetResolver := manual.NewBuilderWithScheme("test") + targetResolverCalled := make(chan struct{}) + targetResolver.ResolveNowCallback = func(resolver.ResolveNowOptions) { + // Updating the resolver state should not deadlock. 
+ targetResolver.CC().UpdateState(resolver.State{ + Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: "1.1.1.1"}}}}, + }) + close(targetResolverCalled) + } + + target := targetResolver.Scheme() + ":///" + "ignored" + // Set up a manual DNS resolver to control the proxy address resolution. + proxyResolver := setupDNS(t) + + tcc, _, _ := createTestResolverClientConn(t) + tcc.UpdateStateF = func(resolver.State) error { + return errors.New("test error") + } + _, err := delegatingresolver.New(resolver.Target{URL: *testutils.MustParseURL(target)}, tcc, resolver.BuildOptions{}, targetResolver, false) + if err != nil { + t.Fatalf("Failed to create delegating resolver: %v", err) + } + + targetResolver.UpdateState(resolver.State{ + Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: "1.1.1.1"}}}}, + }) + + // Updating the channel will result in an error being returned. The + // delegating resolver should call "ResolveNow" on the target resolver. + proxyResolver.UpdateState(resolver.State{ + Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: "1.1.1.1"}}}}, + }) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + select { + case <-targetResolverCalled: + case <-ctx.Done(): + t.Fatalf("context timed out waiting for targetResolver.ResolveNow() to be called.") + } +} + +// Tests that calling cc.UpdateState in a blocking manner from child resolvers +// doesn't result in deadlocks. +func (s) TestDelegatingResolverResolveNow(t *testing.T) { + const envProxyAddr = "proxytest.com" + + hpfe := func(req *http.Request) (*url.URL, error) { + return &url.URL{ + Scheme: "https", + Host: envProxyAddr, + }, nil + } + originalhpfe := delegatingresolver.HTTPSProxyFromEnvironment + delegatingresolver.HTTPSProxyFromEnvironment = hpfe + defer func() { + delegatingresolver.HTTPSProxyFromEnvironment = originalhpfe + }() + + // Manual resolver to control the target resolution.
+ targetResolver := manual.NewBuilderWithScheme("test") + targetResolverCalled := make(chan struct{}) + targetResolver.ResolveNowCallback = func(resolver.ResolveNowOptions) { + // Updating the resolver state should not deadlock. + targetResolver.CC().UpdateState(resolver.State{ + Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: "1.1.1.1"}}}}, + }) + close(targetResolverCalled) + } + + target := targetResolver.Scheme() + ":///" + "ignored" + // Set up a manual DNS resolver to control the proxy address resolution. + proxyResolver := setupDNS(t) + proxyResolverCalled := make(chan struct{}) + proxyResolver.ResolveNowCallback = func(resolver.ResolveNowOptions) { + // Updating the resolver state should not deadlock. + proxyResolver.CC().UpdateState(resolver.State{ + Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: "1.1.1.1"}}}}, + }) + close(proxyResolverCalled) + } + + tcc, _, _ := createTestResolverClientConn(t) + dr, err := delegatingresolver.New(resolver.Target{URL: *testutils.MustParseURL(target)}, tcc, resolver.BuildOptions{}, targetResolver, false) + if err != nil { + t.Fatalf("Failed to create delegating resolver: %v", err) + } + + // Call ResolveNow on the delegatingResolver and verify both children + // receive the ResolveNow call. 
+ dr.ResolveNow(resolver.ResolveNowOptions{}) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + select { + case <-targetResolverCalled: + case <-ctx.Done(): + t.Fatalf("context timed out waiting for targetResolver.ResolveNow() to be called.") + } + select { + case <-proxyResolverCalled: + case <-ctx.Done(): + t.Fatalf("context timed out waiting for proxyResolver.ResolveNow() to be called.") + } +} From 720552e72e253af08b8fa906b4b5fbf4ded2e720 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Wed, 26 Mar 2025 23:03:40 +0530 Subject: [PATCH 2/2] Resolve merge issues --- .../delegatingresolver/delegatingresolver_ext_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/resolver/delegatingresolver/delegatingresolver_ext_test.go b/internal/resolver/delegatingresolver/delegatingresolver_ext_test.go index 148b44816614..18ae17ef4ae7 100644 --- a/internal/resolver/delegatingresolver/delegatingresolver_ext_test.go +++ b/internal/resolver/delegatingresolver/delegatingresolver_ext_test.go @@ -659,7 +659,7 @@ func (s) TestDelegatingResolverUpdateStateFromResolveNow(t *testing.T) { targetResolverCalled := make(chan struct{}) targetResolver.ResolveNowCallback = func(resolver.ResolveNowOptions) { // Updating the resolver state should not deadlock. - targetResolver.CC().UpdateState(resolver.State{ + targetResolver.CC.UpdateState(resolver.State{ Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: "1.1.1.1"}}}}, }) close(targetResolverCalled) @@ -719,7 +719,7 @@ func (s) TestDelegatingResolverResolveNow(t *testing.T) { targetResolverCalled := make(chan struct{}) targetResolver.ResolveNowCallback = func(resolver.ResolveNowOptions) { // Updating the resolver state should not deadlock. 
- targetResolver.CC().UpdateState(resolver.State{ + targetResolver.CC.UpdateState(resolver.State{ Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: "1.1.1.1"}}}}, }) close(targetResolverCalled) @@ -731,7 +731,7 @@ func (s) TestDelegatingResolverResolveNow(t *testing.T) { proxyResolverCalled := make(chan struct{}) proxyResolver.ResolveNowCallback = func(resolver.ResolveNowOptions) { // Updating the resolver state should not deadlock. - proxyResolver.CC().UpdateState(resolver.State{ + proxyResolver.CC.UpdateState(resolver.State{ Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: "1.1.1.1"}}}}, }) close(proxyResolverCalled)