Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions internal/xds/balancer/clusterresolver/configbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,10 +265,12 @@ func priorityLocalitiesToClusterImpl(localities []xdsresource.Locality, priority
if endpoint.HealthStatus != xdsresource.EndpointHealthStatusHealthy && endpoint.HealthStatus != xdsresource.EndpointHealthStatusUnknown {
continue
}
resolverEndpoint := resolver.Endpoint{}
for _, as := range endpoint.Addresses {
resolverEndpoint.Addresses = append(resolverEndpoint.Addresses, resolver.Address{Addr: as})
}

// Create a copy of endpoint.ResolverEndpoint to avoid race.
resolverEndpoint := endpoint.ResolverEndpoint
resolverEndpoint.Addresses = make([]resolver.Address, len(endpoint.ResolverEndpoint.Addresses))
copy(resolverEndpoint.Addresses, endpoint.ResolverEndpoint.Addresses)

resolverEndpoint = hierarchy.SetInEndpoint(resolverEndpoint, []string{priorityName, localityStr})
resolverEndpoint = xdsinternal.SetLocalityIDInEndpoint(resolverEndpoint, locality.ID)
// "To provide the xds_wrr_locality load balancer information about
Expand Down
112 changes: 77 additions & 35 deletions internal/xds/balancer/clusterresolver/configbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,12 @@ func init() {
endpoints = append(endpoints, resolver.Endpoint{Addresses: []resolver.Address{{Addr: addr}}})
ends = append(ends, xdsresource.Endpoint{
HealthStatus: xdsresource.EndpointHealthStatusHealthy,
Addresses: []string{
addr,
fmt.Sprintf("addr-%d-%d-additional-1", i, j),
fmt.Sprintf("addr-%d-%d-additional-2", i, j),
ResolverEndpoint: resolver.Endpoint{
Addresses: []resolver.Address{
{Addr: addr},
{Addr: fmt.Sprintf("addr-%d-%d-additional-1", i, j)},
{Addr: fmt.Sprintf("addr-%d-%d-additional-2", i, j)},
},
},
})
}
Expand Down Expand Up @@ -315,8 +317,8 @@ func TestBuildClusterImplConfigForDNS(t *testing.T) {
Name: "pick_first",
},
}
e1 := resolver.Endpoint{Addresses: []resolver.Address{{Addr: testEndpoints[0][0].Addresses[0]}}}
e2 := resolver.Endpoint{Addresses: []resolver.Address{{Addr: testEndpoints[0][1].Addresses[0]}}}
e1 := resolver.Endpoint{Addresses: []resolver.Address{{Addr: testEndpoints[0][0].ResolverEndpoint.Addresses[0].Addr}}}
e2 := resolver.Endpoint{Addresses: []resolver.Address{{Addr: testEndpoints[0][1].ResolverEndpoint.Addresses[0].Addr}}}
wantEndpoints := []resolver.Endpoint{
hierarchy.SetInEndpoint(e1, []string{"priority-3"}),
hierarchy.SetInEndpoint(e2, []string{"priority-3"}),
Expand Down Expand Up @@ -417,14 +419,14 @@ func TestBuildClusterImplConfigForEDS(t *testing.T) {
},
}
wantEndpoints := []resolver.Endpoint{
testEndpointWithAttrs(testEndpoints[0][0].Addresses, 20, 1, "priority-2-0", &testLocalityIDs[0]),
testEndpointWithAttrs(testEndpoints[0][1].Addresses, 20, 1, "priority-2-0", &testLocalityIDs[0]),
testEndpointWithAttrs(testEndpoints[1][0].Addresses, 80, 1, "priority-2-0", &testLocalityIDs[1]),
testEndpointWithAttrs(testEndpoints[1][1].Addresses, 80, 1, "priority-2-0", &testLocalityIDs[1]),
testEndpointWithAttrs(testEndpoints[2][0].Addresses, 20, 1, "priority-2-1", &testLocalityIDs[2]),
testEndpointWithAttrs(testEndpoints[2][1].Addresses, 20, 1, "priority-2-1", &testLocalityIDs[2]),
testEndpointWithAttrs(testEndpoints[3][0].Addresses, 80, 1, "priority-2-1", &testLocalityIDs[3]),
testEndpointWithAttrs(testEndpoints[3][1].Addresses, 80, 1, "priority-2-1", &testLocalityIDs[3]),
testEndpointWithAttrs(testEndpoints[0][0].ResolverEndpoint, 20, 1, "priority-2-0", &testLocalityIDs[0]),
testEndpointWithAttrs(testEndpoints[0][1].ResolverEndpoint, 20, 1, "priority-2-0", &testLocalityIDs[0]),
testEndpointWithAttrs(testEndpoints[1][0].ResolverEndpoint, 80, 1, "priority-2-0", &testLocalityIDs[1]),
testEndpointWithAttrs(testEndpoints[1][1].ResolverEndpoint, 80, 1, "priority-2-0", &testLocalityIDs[1]),
testEndpointWithAttrs(testEndpoints[2][0].ResolverEndpoint, 20, 1, "priority-2-1", &testLocalityIDs[2]),
testEndpointWithAttrs(testEndpoints[2][1].ResolverEndpoint, 20, 1, "priority-2-1", &testLocalityIDs[2]),
testEndpointWithAttrs(testEndpoints[3][0].ResolverEndpoint, 80, 1, "priority-2-1", &testLocalityIDs[3]),
testEndpointWithAttrs(testEndpoints[3][1].ResolverEndpoint, 80, 1, "priority-2-1", &testLocalityIDs[3]),
}

if diff := cmp.Diff(gotNames, wantNames); diff != "" {
Expand Down Expand Up @@ -547,16 +549,36 @@ func TestPriorityLocalitiesToClusterImpl(t *testing.T) {
localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{
{Addresses: []string{"addr-1-1"}, HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 90},
{Addresses: []string{"addr-1-2"}, HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 10},
{
ResolverEndpoint: resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-1-1"}}},
Weight: 90,
HealthStatus: xdsresource.EndpointHealthStatusHealthy,
},
{
ResolverEndpoint: resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-1-2"}}},
Weight: 10,
HealthStatus: xdsresource.EndpointHealthStatusHealthy,
},
},
ID: clients.Locality{Zone: "test-zone-1"},
Weight: 20,
},
{
Endpoints: []xdsresource.Endpoint{
{Addresses: []string{"addr-2-1"}, HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 90},
{Addresses: []string{"addr-2-2"}, HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 10},
{
ResolverEndpoint: resolver.Endpoint{
Addresses: []resolver.Address{{Addr: "addr-2-1"}},
},
Weight: 90,
HealthStatus: xdsresource.EndpointHealthStatusHealthy,
},
{
ResolverEndpoint: resolver.Endpoint{
Addresses: []resolver.Address{{Addr: "addr-2-2"}},
},
Weight: 10,
HealthStatus: xdsresource.EndpointHealthStatusHealthy,
},
},
ID: clients.Locality{Zone: "test-zone-2"},
Weight: 80,
Expand All @@ -576,27 +598,51 @@ func TestPriorityLocalitiesToClusterImpl(t *testing.T) {
ChildPolicy: &iserviceconfig.BalancerConfig{Name: roundrobin.Name},
},
wantEndpoints: []resolver.Endpoint{
testEndpointWithAttrs([]string{"addr-1-1"}, 20, 90, "test-priority", &clients.Locality{Zone: "test-zone-1"}),
testEndpointWithAttrs([]string{"addr-1-2"}, 20, 10, "test-priority", &clients.Locality{Zone: "test-zone-1"}),
testEndpointWithAttrs([]string{"addr-2-1"}, 80, 90, "test-priority", &clients.Locality{Zone: "test-zone-2"}),
testEndpointWithAttrs([]string{"addr-2-2"}, 80, 10, "test-priority", &clients.Locality{Zone: "test-zone-2"}),
testEndpointWithAttrs(resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-1-1"}}}, 20, 90, "test-priority", &clients.Locality{Zone: "test-zone-1"}),
testEndpointWithAttrs(resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-1-2"}}}, 20, 10, "test-priority", &clients.Locality{Zone: "test-zone-1"}),
testEndpointWithAttrs(resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-2-1"}}}, 80, 90, "test-priority", &clients.Locality{Zone: "test-zone-2"}),
testEndpointWithAttrs(resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-2-2"}}}, 80, 10, "test-priority", &clients.Locality{Zone: "test-zone-2"}),
},
},
{
name: "ring_hash as child",
localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{
{Addresses: []string{"addr-1-1"}, HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 90},
{Addresses: []string{"addr-1-2"}, HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 10},
{
ResolverEndpoint: resolver.Endpoint{
Addresses: []resolver.Address{{Addr: "addr-1-1"}},
},
Weight: 90,
HealthStatus: xdsresource.EndpointHealthStatusHealthy,
},
{
ResolverEndpoint: resolver.Endpoint{
Addresses: []resolver.Address{{Addr: "addr-1-2"}},
},
Weight: 10,
HealthStatus: xdsresource.EndpointHealthStatusHealthy,
},
},
ID: clients.Locality{Zone: "test-zone-1"},
Weight: 20,
},
{
Endpoints: []xdsresource.Endpoint{
{Addresses: []string{"addr-2-1"}, HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 90},
{Addresses: []string{"addr-2-2"}, HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 10},
{
ResolverEndpoint: resolver.Endpoint{
Addresses: []resolver.Address{{Addr: "addr-2-1"}},
},
Weight: 90,
HealthStatus: xdsresource.EndpointHealthStatusHealthy,
},
{
ResolverEndpoint: resolver.Endpoint{
Addresses: []resolver.Address{{Addr: "addr-2-2"}},
},
Weight: 10,
HealthStatus: xdsresource.EndpointHealthStatusHealthy,
},
},
ID: clients.Locality{Zone: "test-zone-2"},
Weight: 80,
Expand All @@ -612,10 +658,10 @@ func TestPriorityLocalitiesToClusterImpl(t *testing.T) {
},
},
wantEndpoints: []resolver.Endpoint{
testEndpointWithAttrs([]string{"addr-1-1"}, 20, 90, "test-priority", &clients.Locality{Zone: "test-zone-1"}),
testEndpointWithAttrs([]string{"addr-1-2"}, 20, 10, "test-priority", &clients.Locality{Zone: "test-zone-1"}),
testEndpointWithAttrs([]string{"addr-2-1"}, 80, 90, "test-priority", &clients.Locality{Zone: "test-zone-2"}),
testEndpointWithAttrs([]string{"addr-2-2"}, 80, 10, "test-priority", &clients.Locality{Zone: "test-zone-2"}),
testEndpointWithAttrs(resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-1-1"}}}, 20, 90, "test-priority", &clients.Locality{Zone: "test-zone-1"}),
testEndpointWithAttrs(resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-1-2"}}}, 20, 10, "test-priority", &clients.Locality{Zone: "test-zone-1"}),
testEndpointWithAttrs(resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-2-1"}}}, 80, 90, "test-priority", &clients.Locality{Zone: "test-zone-2"}),
testEndpointWithAttrs(resolver.Endpoint{Addresses: []resolver.Address{{Addr: "addr-2-2"}}}, 80, 10, "test-priority", &clients.Locality{Zone: "test-zone-2"}),
},
},
}
Expand All @@ -635,11 +681,7 @@ func TestPriorityLocalitiesToClusterImpl(t *testing.T) {
}
}

func testEndpointWithAttrs(addrStrs []string, localityWeight, endpointWeight uint32, priority string, lID *clients.Locality) resolver.Endpoint {
endpoint := resolver.Endpoint{}
for _, a := range addrStrs {
endpoint.Addresses = append(endpoint.Addresses, resolver.Address{Addr: a})
}
func testEndpointWithAttrs(endpoint resolver.Endpoint, localityWeight, endpointWeight uint32, priority string, lID *clients.Locality) resolver.Endpoint {
path := []string{priority}
if lID != nil {
path = append(path, xdsinternal.LocalityString(*lID))
Expand Down
71 changes: 61 additions & 10 deletions internal/xds/xdsclient/tests/eds_watchers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"google.golang.org/grpc/internal/xds/clients"
"google.golang.org/grpc/internal/xds/xdsclient"
"google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
"google.golang.org/grpc/resolver"
"google.golang.org/protobuf/types/known/wrapperspb"

v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
Expand Down Expand Up @@ -176,7 +177,12 @@ func (s) TestEDSWatch(t *testing.T) {
update: xdsresource.EndpointsUpdate{
Localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{{Addresses: []string{fmt.Sprintf("%s:%d", edsHost1, edsPort1)}, Weight: 1}},
Endpoints: []xdsresource.Endpoint{{
ResolverEndpoint: resolver.Endpoint{
Addresses: []resolver.Address{{Addr: fmt.Sprintf("%s:%d", edsHost1, edsPort1)}},
},
Weight: 1,
}},
ID: clients.Locality{
Region: "region-1",
Zone: "zone-1",
Expand All @@ -199,7 +205,12 @@ func (s) TestEDSWatch(t *testing.T) {
update: xdsresource.EndpointsUpdate{
Localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{{Addresses: []string{fmt.Sprintf("%s:%d", edsHost1, edsPort1)}, Weight: 1}},
Endpoints: []xdsresource.Endpoint{{
ResolverEndpoint: resolver.Endpoint{
Addresses: []resolver.Address{{Addr: fmt.Sprintf("%s:%d", edsHost1, edsPort1)}},
},
Weight: 1,
}},
ID: clients.Locality{
Region: "region-1",
Zone: "zone-1",
Expand Down Expand Up @@ -338,7 +349,12 @@ func (s) TestEDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
update: xdsresource.EndpointsUpdate{
Localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{{Addresses: []string{fmt.Sprintf("%s:%d", edsHost1, edsPort1)}, Weight: 1}},
Endpoints: []xdsresource.Endpoint{{
ResolverEndpoint: resolver.Endpoint{
Addresses: []resolver.Address{{Addr: fmt.Sprintf("%s:%d", edsHost1, edsPort1)}},
},
Weight: 1,
}},
ID: clients.Locality{
Region: "region-1",
Zone: "zone-1",
Expand All @@ -354,7 +370,12 @@ func (s) TestEDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
update: xdsresource.EndpointsUpdate{
Localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{{Addresses: []string{fmt.Sprintf("%s:%d", edsHost2, edsPort2)}, Weight: 1}},
Endpoints: []xdsresource.Endpoint{{
ResolverEndpoint: resolver.Endpoint{
Addresses: []resolver.Address{{Addr: fmt.Sprintf("%s:%d", edsHost2, edsPort2)}},
},
Weight: 1,
}},
ID: clients.Locality{
Region: "region-1",
Zone: "zone-1",
Expand All @@ -376,7 +397,12 @@ func (s) TestEDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
update: xdsresource.EndpointsUpdate{
Localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{{Addresses: []string{fmt.Sprintf("%s:%d", edsHost1, edsPort1)}, Weight: 1}},
Endpoints: []xdsresource.Endpoint{{
ResolverEndpoint: resolver.Endpoint{
Addresses: []resolver.Address{{Addr: fmt.Sprintf("%s:%d", edsHost1, edsPort1)}},
},
Weight: 1,
}},
ID: clients.Locality{
Region: "region-1",
Zone: "zone-1",
Expand All @@ -392,7 +418,12 @@ func (s) TestEDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
update: xdsresource.EndpointsUpdate{
Localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{{Addresses: []string{fmt.Sprintf("%s:%d", edsHost2, edsPort2)}, Weight: 1}},
Endpoints: []xdsresource.Endpoint{{
ResolverEndpoint: resolver.Endpoint{
Addresses: []resolver.Address{{Addr: fmt.Sprintf("%s:%d", edsHost2, edsPort2)}},
},
Weight: 1,
}},
ID: clients.Locality{
Region: "region-1",
Zone: "zone-1",
Expand Down Expand Up @@ -590,7 +621,12 @@ func (s) TestEDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) {
update: xdsresource.EndpointsUpdate{
Localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{{Addresses: []string{fmt.Sprintf("%s:%d", edsHost1, edsPort1)}, Weight: 1}},
Endpoints: []xdsresource.Endpoint{{
ResolverEndpoint: resolver.Endpoint{
Addresses: []resolver.Address{{Addr: fmt.Sprintf("%s:%d", edsHost1, edsPort1)}},
},
Weight: 1,
}},
ID: clients.Locality{
Region: "region-1",
Zone: "zone-1",
Expand Down Expand Up @@ -681,7 +717,12 @@ func (s) TestEDSWatch_ResourceCaching(t *testing.T) {
update: xdsresource.EndpointsUpdate{
Localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{{Addresses: []string{fmt.Sprintf("%s:%d", edsHost1, edsPort1)}, Weight: 1}},
Endpoints: []xdsresource.Endpoint{{
ResolverEndpoint: resolver.Endpoint{
Addresses: []resolver.Address{{Addr: fmt.Sprintf("%s:%d", edsHost1, edsPort1)}},
},
Weight: 1,
}},
ID: clients.Locality{
Region: "region-1",
Zone: "zone-1",
Expand Down Expand Up @@ -813,7 +854,12 @@ func (s) TestEDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) {
update: xdsresource.EndpointsUpdate{
Localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{{Addresses: []string{fmt.Sprintf("%s:%d", edsHost1, edsPort1)}, Weight: 1}},
Endpoints: []xdsresource.Endpoint{{
ResolverEndpoint: resolver.Endpoint{
Addresses: []resolver.Address{{Addr: fmt.Sprintf("%s:%d", edsHost1, edsPort1)}},
},
Weight: 1,
}},
ID: clients.Locality{
Region: "region-1",
Zone: "zone-1",
Expand Down Expand Up @@ -976,7 +1022,12 @@ func (s) TestEDSWatch_PartialValid(t *testing.T) {
update: xdsresource.EndpointsUpdate{
Localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{{Addresses: []string{fmt.Sprintf("%s:%d", edsHost1, edsPort1)}, Weight: 1}},
Endpoints: []xdsresource.Endpoint{{
ResolverEndpoint: resolver.Endpoint{
Addresses: []resolver.Address{{Addr: fmt.Sprintf("%s:%d", edsHost1, edsPort1)}},
},
Weight: 1,
}},
ID: clients.Locality{
Region: "region-1",
Zone: "zone-1",
Expand Down
10 changes: 8 additions & 2 deletions internal/xds/xdsclient/tests/federation_watchers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"google.golang.org/grpc/internal/xds/clients"
"google.golang.org/grpc/internal/xds/xdsclient"
"google.golang.org/grpc/internal/xds/xdsclient/xdsresource"
"google.golang.org/grpc/resolver"

v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
Expand Down Expand Up @@ -291,8 +292,13 @@ func (s) TestFederation_EndpointsResourceContextParamOrder(t *testing.T) {
update: xdsresource.EndpointsUpdate{
Localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{{Addresses: []string{"localhost:666"}, Weight: 1}},
Weight: 1,
Endpoints: []xdsresource.Endpoint{{
ResolverEndpoint: resolver.Endpoint{
Addresses: []resolver.Address{{Addr: "localhost:666"}},
},
Weight: 1,
}},
Weight: 1,
ID: clients.Locality{
Region: "region-1",
Zone: "zone-1",
Expand Down
Loading