vendor: google.golang.org/grpc v1.76.0, google.golang.org/protobuf v1.36.10

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
This commit is contained in:
Sebastiaan van Stijn
2025-11-11 10:40:43 +01:00
parent 65bb1bb21f
commit e65995d896
28 changed files with 540 additions and 103 deletions

4
go.mod
View File

@@ -109,8 +109,8 @@ require (
golang.org/x/text v0.30.0
golang.org/x/time v0.14.0
google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5
google.golang.org/grpc v1.75.0
google.golang.org/protobuf v1.36.9
google.golang.org/grpc v1.76.0
google.golang.org/protobuf v1.36.10
gotest.tools/v3 v3.5.2
pgregory.net/rapid v1.2.0
resenje.org/singleflight v0.4.3

15
go.sum
View File

@@ -115,6 +115,8 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk
github.com/cloudflare/cfssl v1.6.4 h1:NMOvfrEjFfC63K3SGXgAnFdsgkmiq4kATme5BfcqrO8=
github.com/cloudflare/cfssl v1.6.4/go.mod h1:8b3CQMxfWPAeom3zBnGJ6sd+G1NkL5TXqmDXacb+1J0=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443 h1:aQ3y1lwWyqYPiWZThqv1aFbZMiM9vblcSArJRf2Irls=
github.com/cncf/xds/go v0.0.0-20250501225837-2ac532fd4443/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8=
github.com/cockroachdb/datadriven v1.0.2 h1:H9MtNqVoVhvd9nCBwOyDjUEdZCREqbIdCJD93PBm/jA=
github.com/cockroachdb/datadriven v1.0.2/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU=
github.com/codahale/rfc6979 v0.0.0-20141003034818-6a90f24967eb h1:EDmT6Q9Zs+SbUoc7Ik9EfrFqcylYqgPZ9ANSbTAntnE=
@@ -201,7 +203,12 @@ github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25Kn
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/go-control-plane v0.13.4 h1:zEqyPVyku6IvWCFwux4x9RxkLOMUL+1vC9xUFv5l2/M=
github.com/envoyproxy/go-control-plane/envoy v1.32.4 h1:jb83lalDRZSpPWW2Z7Mck/8kXZ5CQAFYVjQcdVIr83A=
github.com/envoyproxy/go-control-plane/envoy v1.32.4/go.mod h1:Gzjc5k8JcJswLjAx1Zm+wSYE20UrLtt7JZMWiWQXQEw=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/envoyproxy/protoc-gen-validate v1.2.1 h1:DEo3O99U8j4hBFwbJfrz9VtgcDfUKS7KJ7spH3d86P8=
github.com/envoyproxy/protoc-gen-validate v1.2.1/go.mod h1:d/C80l/jxXLdfEIhX1W2TmLfsJ31lvEjwamM4DxlWXU=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
@@ -802,8 +809,8 @@ google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQ
google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc=
google.golang.org/grpc v1.75.0 h1:+TW+dqTd2Biwe6KKfhE5JpiYIBWq865PhKGSXiivqt4=
google.golang.org/grpc v1.75.0/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr2ecQ=
google.golang.org/grpc v1.76.0 h1:UnVkv1+uMLYXoIz6o7chp59WfQUYA2ex/BXQ9rHZu7A=
google.golang.org/grpc v1.76.0/go.mod h1:Ju12QI8M6iQJtbcsV+awF5a4hfJMLi4X0JLo94ULZ6c=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
@@ -813,8 +820,8 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.36.9 h1:w2gp2mA27hUeUzj9Ex9FBjsBm40zfaDtEWow293U7Iw=
google.golang.org/protobuf v1.36.9/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE=
google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

View File

@@ -33,17 +33,21 @@ guidelines, there may be valid reasons to do so, but it should be rare.
## Guidelines for Pull Requests
How to get your contributions merged smoothly and quickly:
Please read the following carefully to ensure your contributions can be merged
smoothly and quickly.
### PR Contents
- Create **small PRs** that are narrowly focused on **addressing a single
concern**. We often receive PRs that attempt to fix several things at the same
time, and if one part of the PR has a problem, that will hold up the entire
PR.
- For **speculative changes**, consider opening an issue and discussing it
first. If you are suggesting a behavioral or API change, consider starting
with a [gRFC proposal](https://github.com/grpc/proposal). Many new features
that are not bug fixes will require cross-language agreement.
- If your change does not address an **open issue** with an **agreed
resolution**, consider opening an issue and discussing it first. If you are
suggesting a behavioral or API change, consider starting with a [gRFC
proposal](https://github.com/grpc/proposal). Many new features that are not
bug fixes will require cross-language agreement.
- If you want to fix **formatting or style**, consider whether your changes are
an obvious improvement or might be considered a personal preference. If a
@@ -56,16 +60,6 @@ How to get your contributions merged smoothly and quickly:
often written as "iff". Please do not make spelling correction changes unless
you are certain they are misspellings.
- Provide a good **PR description** as a record of **what** change is being made
and **why** it was made. Link to a GitHub issue if it exists.
- Maintain a **clean commit history** and use **meaningful commit messages**.
PRs with messy commit histories are difficult to review and won't be merged.
Before sending your PR, ensure your changes are based on top of the latest
`upstream/master` commits, and avoid rebasing in the middle of a code review.
You should **never use `git push -f`** unless absolutely necessary during a
review, as it can interfere with GitHub's tracking of comments.
- **All tests need to be passing** before your change can be merged. We
recommend you run tests locally before creating your PR to catch breakages
early on:
@@ -81,15 +75,80 @@ How to get your contributions merged smoothly and quickly:
GitHub, which will trigger a GitHub Actions run that you can use to verify
everything is passing.
- If you are adding a new file, make sure it has the **copyright message**
- Note that there are two GitHub actions checks that need not be green:
1. We test the freshness of the generated proto code we maintain via the
`vet-proto` check. If the source proto files are updated, but our repo is
not updated, an optional checker will fail. This will be fixed by our team
in a separate PR and will not prevent the merge of your PR.
2. We run a checker that will fail if there is any change in dependencies of
an exported package via the `dependencies` check. If new dependencies are
added that are not appropriate, we may not accept your PR (see below).
- If you are adding a **new file**, make sure it has the **copyright message**
template at the top as a comment. You can copy the message from an existing
file and update the year.
- The grpc package should only depend on standard Go packages and a small number
of exceptions. **If your contribution introduces new dependencies**, you will
need a discussion with gRPC-Go maintainers. A GitHub action check will run on
every PR, and will flag any transitive dependency changes from any public
package.
need a discussion with gRPC-Go maintainers.
### PR Descriptions
- **PR titles** should start with the name of the component being addressed, or
the type of change. Examples: transport, client, server, round_robin, xds,
cleanup, deps.
- Read and follow the **guidelines for PR titles and descriptions** here:
https://google.github.io/eng-practices/review/developer/cl-descriptions.html
*particularly* the sections "First Line" and "Body is Informative".
Note: your PR description will be used as the git commit message in a
squash-and-merge if your PR is approved. We may make changes to this as
necessary.
- **Does this PR relate to an open issue?** On the first line, please use the
tag `Fixes #<issue>` to ensure the issue is closed when the PR is merged. Or
use `Updates #<issue>` if the PR is related to an open issue, but does not fix
it. Consider filing an issue if one does not already exist.
- PR descriptions *must* conclude with **release notes** as follows:
```
RELEASE NOTES:
* <component>: <summary>
```
This need not match the PR title.
The summary must:
* be something that gRPC users will understand.
* clearly explain the feature being added, the issue being fixed, or the
behavior being changed, etc. If fixing a bug, be clear about how the bug
can be triggered by an end-user.
* begin with a capital letter and use complete sentences.
* be as short as possible to describe the change being made.
If a PR is *not* end-user visible -- e.g. a cleanup, testing change, or
GitHub-related, use `RELEASE NOTES: n/a`.
### PR Process
- Please **self-review** your code changes before sending your PR. This will
prevent simple, obvious errors from causing delays.
- Maintain a **clean commit history** and use **meaningful commit messages**.
PRs with messy commit histories are difficult to review and won't be merged.
Before sending your PR, ensure your changes are based on top of the latest
`upstream/master` commits, and avoid rebasing in the middle of a code review.
You should **never use `git push -f`** unless absolutely necessary during a
review, as it can interfere with GitHub's tracking of comments.
- Unless your PR is trivial, you should **expect reviewer comments** that you
will need to address before merging. We'll label the PR as `Status: Requires
@@ -98,5 +157,3 @@ How to get your contributions merged smoothly and quickly:
`stale`, and we will automatically close it after 7 days if we don't hear back
from you. Please feel free to ping issues or bugs if you do not get a response
within a week.
- Exceptions to the rules can be made if there's a compelling reason to do so.

View File

@@ -82,14 +82,8 @@ func (lb *lbBalancer) processServerList(l *lbpb.ServerList) {
}
md := metadata.Pairs(lbTokenKey, s.LoadBalanceToken)
ip := net.IP(s.IpAddress)
ipStr := ip.String()
if ip.To4() == nil {
// Add square brackets to ipv6 addresses, otherwise net.Dial() and
// net.SplitHostPort() will return too many colons error.
ipStr = fmt.Sprintf("[%s]", ipStr)
}
addr := imetadata.Set(resolver.Address{Addr: fmt.Sprintf("%s:%d", ipStr, s.Port)}, md)
ipStr := net.IP(s.IpAddress).String()
addr := imetadata.Set(resolver.Address{Addr: net.JoinHostPort(ipStr, fmt.Sprintf("%d", s.Port))}, md)
if lb.logger.V(2) {
lb.logger.Infof("Server list entry:|%d|, ipStr:|%s|, port:|%d|, load balancer token:|%v|", i, ipStr, s.Port, s.LoadBalanceToken)
}

View File

@@ -169,7 +169,7 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
addrs = state.ResolverState.Addresses
if cfg.ShuffleAddressList {
addrs = append([]resolver.Address{}, addrs...)
rand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
internal.RandShuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
}
}

View File

@@ -283,7 +283,7 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
newAddrs = state.ResolverState.Addresses
if cfg.ShuffleAddressList {
newAddrs = append([]resolver.Address{}, newAddrs...)
internal.RandShuffle(len(endpoints), func(i, j int) { endpoints[i], endpoints[j] = endpoints[j], endpoints[i] })
internal.RandShuffle(len(newAddrs), func(i, j int) { newAddrs[i], newAddrs[j] = newAddrs[j], newAddrs[i] })
}
}
@@ -351,6 +351,13 @@ func (b *pickfirstBalancer) ExitIdle() {
b.mu.Lock()
defer b.mu.Unlock()
if b.state == connectivity.Idle {
// Move the balancer into CONNECTING state immediately. This is done to
// avoid staying in IDLE if a resolver update arrives before the first
// SubConn reports CONNECTING.
b.updateBalancerState(balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
b.startFirstPassLocked()
}
}
@@ -604,7 +611,7 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
if !b.addressList.seekTo(sd.addr) {
// This should not fail as we should have only one SubConn after
// entering READY. The SubConn should be present in the addressList.
b.logger.Errorf("Address %q not found address list in %v", sd.addr, b.addressList.addresses)
b.logger.Errorf("Address %q not found address list in %v", sd.addr, b.addressList.addresses)
return
}
if !b.healthCheckingEnabled {

View File

@@ -456,7 +456,7 @@ func (cc *ClientConn) validateTransportCredentials() error {
func (cc *ClientConn) channelzRegistration(target string) {
parentChannel, _ := cc.dopts.channelzParent.(*channelz.Channel)
cc.channelz = channelz.RegisterChannel(parentChannel, target)
cc.addTraceEvent("created")
cc.addTraceEvent(fmt.Sprintf("created for target %q", target))
}
// chainUnaryClientInterceptors chains all unary client interceptors into one.

View File

@@ -46,9 +46,25 @@ func (c *codecV2) Marshal(v any) (data mem.BufferSlice, err error) {
return nil, fmt.Errorf("proto: failed to marshal, message is %T, want proto.Message", v)
}
// Important: if we remove this Size call then we cannot use
// UseCachedSize in MarshalOptions below.
size := proto.Size(vv)
// MarshalOptions with UseCachedSize allows reusing the result from the
// previous Size call. This is safe here because:
//
// 1. We just computed the size.
// 2. We assume the message is not being mutated concurrently.
//
// Important: If the proto.Size call above is removed, using UseCachedSize
// becomes unsafe and may lead to incorrect marshaling.
//
// For more details, see the doc of UseCachedSize:
// https://pkg.go.dev/google.golang.org/protobuf/proto#MarshalOptions
marshalOptions := proto.MarshalOptions{UseCachedSize: true}
if mem.IsBelowBufferPoolingThreshold(size) {
buf, err := proto.Marshal(vv)
buf, err := marshalOptions.Marshal(vv)
if err != nil {
return nil, err
}
@@ -56,7 +72,7 @@ func (c *codecV2) Marshal(v any) (data mem.BufferSlice, err error) {
} else {
pool := mem.DefaultBufferPool()
buf := pool.Get(size)
if _, err := (proto.MarshalOptions{}).MarshalAppend((*buf)[:0], vv); err != nil {
if _, err := marshalOptions.MarshalAppend((*buf)[:0], vv); err != nil {
pool.Put(buf)
return nil, err
}

View File

@@ -83,6 +83,7 @@ func (b *Unbounded) Load() {
default:
}
} else if b.closing && !b.closed {
b.closed = true
close(b.c)
}
}

View File

@@ -194,7 +194,7 @@ func (r RefChannelType) String() string {
// If channelz is not turned ON, this will simply log the event descriptions.
func AddTraceEvent(l grpclog.DepthLoggerV2, e Entity, depth int, desc *TraceEvent) {
// Log only the trace description associated with the bottom most entity.
d := fmt.Sprintf("[%s]%s", e, desc.Desc)
d := fmt.Sprintf("[%s] %s", e, desc.Desc)
switch desc.Severity {
case CtUnknown, CtInfo:
l.InfoDepth(depth+1, d)

View File

@@ -68,4 +68,10 @@ var (
// trust. For more details, see:
// https://github.com/grpc/proposal/blob/master/A87-mtls-spiffe-support.md
XDSSPIFFEEnabled = boolFromEnv("GRPC_EXPERIMENTAL_XDS_MTLS_SPIFFE", false)
// XDSHTTPConnectEnabled is true if gRPC should parse custom Metadata
// configuring use of an HTTP CONNECT proxy via xDS from cluster resources.
// For more details, see:
// https://github.com/grpc/proposal/blob/master/A86-xds-http-connect.md
XDSHTTPConnectEnabled = boolFromEnv("GRPC_EXPERIMENTAL_XDS_HTTP_CONNECT", false)
)

View File

@@ -80,25 +80,11 @@ func (cs *CallbackSerializer) ScheduleOr(f func(ctx context.Context), onFailure
func (cs *CallbackSerializer) run(ctx context.Context) {
defer close(cs.done)
// TODO: when Go 1.21 is the oldest supported version, this loop and Close
// can be replaced with:
//
// context.AfterFunc(ctx, cs.callbacks.Close)
for ctx.Err() == nil {
select {
case <-ctx.Done():
// Do nothing here. Next iteration of the for loop will not happen,
// since ctx.Err() would be non-nil.
case cb := <-cs.callbacks.Get():
cs.callbacks.Load()
cb.(func(context.Context))(ctx)
}
}
// Close the buffer when the context is canceled
// to prevent new callbacks from being added.
context.AfterFunc(ctx, cs.callbacks.Close)
// Close the buffer to prevent new callbacks from being added.
cs.callbacks.Close()
// Run all pending callbacks.
// Run all callbacks.
for cb := range cs.callbacks.Get() {
cs.callbacks.Load()
cb.(func(context.Context))(ctx)

View File

@@ -277,11 +277,13 @@ func (ht *serverHandlerTransport) writeStatus(s *ServerStream, st *status.Status
if err == nil { // transport has not been closed
// Note: The trailer fields are compressed with hpack after this call returns.
// No WireLength field is set here.
s.hdrMu.Lock()
for _, sh := range ht.stats {
sh.HandleRPC(s.Context(), &stats.OutTrailer{
Trailer: s.trailer.Copy(),
})
}
s.hdrMu.Unlock()
}
ht.Close(errors.New("finished writing status"))
return err

View File

@@ -556,6 +556,19 @@ func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr)
// Make the slice of certain predictable size to reduce allocations made by append.
hfLen := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te
hfLen += len(authData) + len(callAuthData)
registeredCompressors := t.registeredCompressors
if callHdr.PreviousAttempts > 0 {
hfLen++
}
if callHdr.SendCompress != "" {
hfLen++
}
if registeredCompressors != "" {
hfLen++
}
if _, ok := ctx.Deadline(); ok {
hfLen++
}
headerFields := make([]hpack.HeaderField, 0, hfLen)
headerFields = append(headerFields, hpack.HeaderField{Name: ":method", Value: "POST"})
headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme})
@@ -568,7 +581,6 @@ func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr)
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(callHdr.PreviousAttempts)})
}
registeredCompressors := t.registeredCompressors
if callHdr.SendCompress != "" {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
// Include the outgoing compressor name when compressor is not registered
@@ -1499,13 +1511,6 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
case "grpc-message":
grpcMessage = decodeGrpcMessage(hf.Value)
case ":status":
if hf.Value == "200" {
httpStatusErr = ""
statusCode := 200
httpStatusCode = &statusCode
break
}
c, err := strconv.ParseInt(hf.Value, 10, 32)
if err != nil {
se := status.New(codes.Internal, fmt.Sprintf("transport: malformed http-status: %v", err))
@@ -1513,7 +1518,19 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
return
}
statusCode := int(c)
if statusCode >= 100 && statusCode < 200 {
if endStream {
se := status.New(codes.Internal, fmt.Sprintf(
"protocol error: informational header with status code %d must not have END_STREAM set", statusCode))
t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
}
return
}
httpStatusCode = &statusCode
if statusCode == 200 {
httpStatusErr = ""
break
}
httpStatusErr = fmt.Sprintf(
"unexpected HTTP status code received from server: %d (%s)",

View File

@@ -1353,10 +1353,10 @@ func (t *http2Server) closeStream(s *ServerStream, rst bool, rstCode http2.ErrCo
// called to interrupt the potential blocking on other goroutines.
s.cancel()
oldState := s.swapState(streamDone)
if oldState == streamDone {
return
}
// We can't return early even if the stream's state is "done" as the state
// might have been set by the `finishStream` method. Deleting the stream via
// `finishStream` can get blocked on flow control.
s.swapState(streamDone)
t.deleteStream(s, eosReceived)
t.controlBuf.put(&cleanupStream{

View File

@@ -0,0 +1,112 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package clients provides implementations of the clients to interact with
// xDS and LRS servers.
//
// # xDS Client
//
// The xDS client allows applications to:
// - Create client instances with in-memory configurations.
// - Register watches for named resources.
// - Receive resources via the ADS (Aggregated Discovery Service) stream.
//
// This enables applications to dynamically discover and configure resources
// such as listeners, routes, clusters, and endpoints from an xDS management
// server.
//
// # LRS Client
//
// The LRS (Load Reporting Service) client allows applications to report load
// data to an LRS server via the LRS stream. This data can be used for
// monitoring, traffic management, and other purposes.
//
// # Experimental
//
// NOTICE: This package is EXPERIMENTAL and may be changed or removed
// in a later release.
package clients
// ServerIdentifier holds identifying information for connecting to an xDS
// management or LRS server.
type ServerIdentifier struct {
// ServerURI is the target URI of the server.
ServerURI string
// Extensions can be populated with arbitrary data to be passed to the
// TransportBuilder and/or xDS Client's ResourceType implementations.
// This field can be used to provide additional configuration or context
// specific to the user's needs.
//
// The xDS and LRS clients do not interpret the contents of this field.
// It is the responsibility of the user's custom TransportBuilder and/or
// ResourceType implementations to handle and interpret these extensions.
//
// For example, a custom TransportBuilder might use this field to
// configure a specific security credentials.
//
// Extensions may be any type that is comparable, as they are used as map
// keys internally. If Extensions are not able to be used as a map key,
// the client may panic.
//
// See: https://go.dev/ref/spec#Comparison_operators
//
// Any equivalent extensions in all ServerIdentifiers present in a single
// client's configuration should have the same value. Not following this
// restriction may result in excess resource usage.
Extensions any
}
// Node represents the identity of the xDS client, allowing xDS and LRS servers
// to identify the source of xDS requests.
type Node struct {
// ID is a string identifier of the application.
ID string
// Cluster is the name of the cluster the application belongs to.
Cluster string
// Locality is the location of the application including region, zone,
// sub-zone.
Locality Locality
// Metadata provides additional context about the application by associating
// arbitrary key-value pairs with it.
Metadata any
// UserAgentName is the user agent name of application.
UserAgentName string
// UserAgentVersion is the user agent version of application.
UserAgentVersion string
}
// Locality represents the location of the xDS client application.
type Locality struct {
// Region is the region of the xDS client application.
Region string
// Zone is the area within a region.
Zone string
// SubZone is the further subdivision within a zone.
SubZone string
}
// MetricsReporter is used by the XDSClient to report metrics.
type MetricsReporter interface {
// ReportMetric reports a metric. The metric will be one of the predefined
// set of types depending on the client (XDSClient or LRSClient).
//
// Each client will produce different metrics. Please see the client's
// documentation for a list of possible metrics events.
ReportMetric(metric any)
}

View File

@@ -0,0 +1,53 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package clients
import (
"context"
)
// TransportBuilder provides the functionality to create a communication
// channel to an xDS or LRS server.
type TransportBuilder interface {
// Build creates a new Transport instance to the server based on the
// provided ServerIdentifier.
Build(serverIdentifier ServerIdentifier) (Transport, error)
}
// Transport provides the functionality to communicate with an xDS or LRS
// server using streaming calls.
type Transport interface {
// NewStream creates a new streaming call to the server for the specific
// RPC method name. The returned Stream interface can be used to send and
// receive messages on the stream.
NewStream(context.Context, string) (Stream, error)
// Close closes the Transport.
Close()
}
// Stream provides methods to send and receive messages on a stream. Messages
// are represented as a byte slice.
type Stream interface {
// Send sends the provided message on the stream.
Send([]byte) error
// Recv blocks until the next message is received on the stream.
Recv() ([]byte, error)
}

View File

@@ -14,12 +14,17 @@
* limitations under the License.
*/
// Package xds contains methods to Get/Set handshake cluster names. It is separated
// out from the top level /internal package to avoid circular dependencies.
// Package xds contains functions, structs, and utilities for working with
// handshake cluster names, as well as shared components used by xds balancers
// and resolvers. It is separated from the top-level /internal package to
// avoid circular dependencies.
package xds
import (
"fmt"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/internal/xds/clients"
"google.golang.org/grpc/resolver"
)
@@ -40,3 +45,60 @@ func GetXDSHandshakeClusterName(attr *attributes.Attributes) (string, bool) {
name, ok := v.(string)
return name, ok
}
// LocalityString generates a string representation of clients.Locality in the
// format specified in gRFC A76.
func LocalityString(l clients.Locality) string {
return fmt.Sprintf("{region=%q, zone=%q, sub_zone=%q}", l.Region, l.Zone, l.SubZone)
}
// IsLocalityEqual allows the values to be compared by Attributes.Equal.
func IsLocalityEqual(l clients.Locality, o any) bool {
ol, ok := o.(clients.Locality)
if !ok {
return false
}
return l.Region == ol.Region && l.Zone == ol.Zone && l.SubZone == ol.SubZone
}
// LocalityFromString converts a string representation of clients.locality as
// specified in gRFC A76, into a LocalityID struct.
func LocalityFromString(s string) (ret clients.Locality, _ error) {
_, err := fmt.Sscanf(s, "{region=%q, zone=%q, sub_zone=%q}", &ret.Region, &ret.Zone, &ret.SubZone)
if err != nil {
return clients.Locality{}, fmt.Errorf("%s is not a well formatted locality ID, error: %v", s, err)
}
return ret, nil
}
type localityKeyType string
const localityKey = localityKeyType("grpc.xds.internal.address.locality")
// GetLocalityID returns the locality ID of addr.
func GetLocalityID(addr resolver.Address) clients.Locality {
path, _ := addr.BalancerAttributes.Value(localityKey).(clients.Locality)
return path
}
// SetLocalityID sets locality ID in addr to l.
func SetLocalityID(addr resolver.Address, l clients.Locality) resolver.Address {
addr.BalancerAttributes = addr.BalancerAttributes.WithValue(localityKey, l)
return addr
}
// SetLocalityIDInEndpoint sets locality ID in endpoint to l.
func SetLocalityIDInEndpoint(endpoint resolver.Endpoint, l clients.Locality) resolver.Endpoint {
endpoint.Attributes = endpoint.Attributes.WithValue(localityKey, l)
return endpoint
}
// ResourceTypeMapForTesting maps TypeUrl to corresponding ResourceType.
var ResourceTypeMapForTesting map[string]any
// UnknownCSMLabels are TelemetryLabels emitted from CDS if CSM Telemetry Label
// data is not present in the CDS Resource.
var UnknownCSMLabels = map[string]string{
"csm.service_name": "unknown",
"csm.service_namespace_name": "unknown",
}

View File

@@ -549,6 +549,8 @@ type clientStream struct {
sentLast bool // sent an end stream
receivedFirstMsg bool // set after the first message is received
methodConfig *MethodConfig
ctx context.Context // the application's context, wrapped by stats/tracing
@@ -1144,11 +1146,16 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
if statusErr := a.transportStream.Status().Err(); statusErr != nil {
return statusErr
}
// Received no msg and status OK for non-server streaming rpcs.
if !cs.desc.ServerStreams && !cs.receivedFirstMsg {
return status.Error(codes.Internal, "cardinality violation: received no response message from non-server-streaming RPC")
}
return io.EOF // indicates successful end of stream.
}
return toRPCErr(err)
}
cs.receivedFirstMsg = true
if a.trInfo != nil {
a.mu.Lock()
if a.trInfo.tr != nil {
@@ -1177,7 +1184,7 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
} else if err != nil {
return toRPCErr(err)
}
return status.Errorf(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
return status.Error(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
}
func (a *csAttempt) finish(err error) {
@@ -1359,6 +1366,7 @@ type addrConnStream struct {
transport transport.ClientTransport
ctx context.Context
sentLast bool
receivedFirstMsg bool
desc *StreamDesc
codec baseCodec
sendCompressorV0 Compressor
@@ -1484,10 +1492,15 @@ func (as *addrConnStream) RecvMsg(m any) (err error) {
if statusErr := as.transportStream.Status().Err(); statusErr != nil {
return statusErr
}
// Received no msg and status OK for non-server streaming rpcs.
if !as.desc.ServerStreams && !as.receivedFirstMsg {
return status.Error(codes.Internal, "cardinality violation: received no response message from non-server-streaming RPC")
}
return io.EOF // indicates successful end of stream.
}
return toRPCErr(err)
}
as.receivedFirstMsg = true
if as.desc.ServerStreams {
// Subsequent messages should be received by subsequent RecvMsg calls.
@@ -1501,7 +1514,7 @@ func (as *addrConnStream) RecvMsg(m any) (err error) {
} else if err != nil {
return toRPCErr(err)
}
return status.Errorf(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
return status.Error(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
}
func (as *addrConnStream) finish(err error) {

View File

@@ -19,4 +19,4 @@
package grpc
// Version is the current grpc version.
const Version = "1.75.0"
const Version = "1.76.0"

View File

@@ -72,9 +72,10 @@ type (
EditionFeatures EditionFeatures
}
FileL2 struct {
Options func() protoreflect.ProtoMessage
Imports FileImports
Locations SourceLocations
Options func() protoreflect.ProtoMessage
Imports FileImports
OptionImports func() protoreflect.FileImports
Locations SourceLocations
}
// EditionFeatures is a frequently-instantiated struct, so please take care
@@ -126,12 +127,9 @@ func (fd *File) ParentFile() protoreflect.FileDescriptor { return fd }
func (fd *File) Parent() protoreflect.Descriptor { return nil }
func (fd *File) Index() int { return 0 }
func (fd *File) Syntax() protoreflect.Syntax { return fd.L1.Syntax }
// Not exported and just used to reconstruct the original FileDescriptor proto
func (fd *File) Edition() int32 { return int32(fd.L1.Edition) }
func (fd *File) Name() protoreflect.Name { return fd.L1.Package.Name() }
func (fd *File) FullName() protoreflect.FullName { return fd.L1.Package }
func (fd *File) IsPlaceholder() bool { return false }
func (fd *File) Name() protoreflect.Name { return fd.L1.Package.Name() }
func (fd *File) FullName() protoreflect.FullName { return fd.L1.Package }
func (fd *File) IsPlaceholder() bool { return false }
func (fd *File) Options() protoreflect.ProtoMessage {
if f := fd.lazyInit().Options; f != nil {
return f()
@@ -150,6 +148,16 @@ func (fd *File) Format(s fmt.State, r rune) { descfmt.FormatD
func (fd *File) ProtoType(protoreflect.FileDescriptor) {}
func (fd *File) ProtoInternal(pragma.DoNotImplement) {}
// The next two are not part of the FileDescriptor interface. They are just used to reconstruct
// the original FileDescriptor proto.
func (fd *File) Edition() int32 { return int32(fd.L1.Edition) }
func (fd *File) OptionImports() protoreflect.FileImports {
if f := fd.lazyInit().OptionImports; f != nil {
return f()
}
return emptyFiles
}
func (fd *File) lazyInit() *FileL2 {
if atomic.LoadUint32(&fd.once) == 0 {
fd.lazyInitOnce()
@@ -182,9 +190,9 @@ type (
L2 *EnumL2 // protected by fileDesc.once
}
EnumL1 struct {
eagerValues bool // controls whether EnumL2.Values is already populated
EditionFeatures EditionFeatures
Visibility int32
eagerValues bool // controls whether EnumL2.Values is already populated
}
EnumL2 struct {
Options func() protoreflect.ProtoMessage
@@ -219,6 +227,11 @@ func (ed *Enum) ReservedNames() protoreflect.Names { return &ed.lazyInit()
func (ed *Enum) ReservedRanges() protoreflect.EnumRanges { return &ed.lazyInit().ReservedRanges }
func (ed *Enum) Format(s fmt.State, r rune) { descfmt.FormatDesc(s, r, ed) }
func (ed *Enum) ProtoType(protoreflect.EnumDescriptor) {}
// This is not part of the EnumDescriptor interface. It is just used to reconstruct
// the original FileDescriptor proto.
func (ed *Enum) Visibility() int32 { return ed.L1.Visibility }
func (ed *Enum) lazyInit() *EnumL2 {
ed.L0.ParentFile.lazyInit() // implicitly initializes L2
return ed.L2
@@ -244,13 +257,13 @@ type (
L2 *MessageL2 // protected by fileDesc.once
}
MessageL1 struct {
Enums Enums
Messages Messages
Extensions Extensions
IsMapEntry bool // promoted from google.protobuf.MessageOptions
IsMessageSet bool // promoted from google.protobuf.MessageOptions
Enums Enums
Messages Messages
Extensions Extensions
EditionFeatures EditionFeatures
Visibility int32
IsMapEntry bool // promoted from google.protobuf.MessageOptions
IsMessageSet bool // promoted from google.protobuf.MessageOptions
}
MessageL2 struct {
Options func() protoreflect.ProtoMessage
@@ -319,6 +332,11 @@ func (md *Message) Messages() protoreflect.MessageDescriptors { return &md.L
func (md *Message) Extensions() protoreflect.ExtensionDescriptors { return &md.L1.Extensions }
func (md *Message) ProtoType(protoreflect.MessageDescriptor) {}
func (md *Message) Format(s fmt.State, r rune) { descfmt.FormatDesc(s, r, md) }
// This is not part of the MessageDescriptor interface. It is just used to reconstruct
// the original FileDescriptor proto.
func (md *Message) Visibility() int32 { return md.L1.Visibility }
func (md *Message) lazyInit() *MessageL2 {
md.L0.ParentFile.lazyInit() // implicitly initializes L2
return md.L2

View File

@@ -284,6 +284,13 @@ func (ed *Enum) unmarshalSeed(b []byte, sb *strs.Builder, pf *File, pd protorefl
case genid.EnumDescriptorProto_Value_field_number:
numValues++
}
case protowire.VarintType:
v, m := protowire.ConsumeVarint(b)
b = b[m:]
switch num {
case genid.EnumDescriptorProto_Visibility_field_number:
ed.L1.Visibility = int32(v)
}
default:
m := protowire.ConsumeFieldValue(num, typ, b)
b = b[m:]
@@ -365,6 +372,13 @@ func (md *Message) unmarshalSeed(b []byte, sb *strs.Builder, pf *File, pd protor
md.unmarshalSeedOptions(v)
}
prevField = num
case protowire.VarintType:
v, m := protowire.ConsumeVarint(b)
b = b[m:]
switch num {
case genid.DescriptorProto_Visibility_field_number:
md.L1.Visibility = int32(v)
}
default:
m := protowire.ConsumeFieldValue(num, typ, b)
b = b[m:]

View File

@@ -134,6 +134,7 @@ func (fd *File) unmarshalFull(b []byte) {
var enumIdx, messageIdx, extensionIdx, serviceIdx int
var rawOptions []byte
var optionImports []string
fd.L2 = new(FileL2)
for len(b) > 0 {
num, typ, n := protowire.ConsumeTag(b)
@@ -157,6 +158,8 @@ func (fd *File) unmarshalFull(b []byte) {
imp = PlaceholderFile(path)
}
fd.L2.Imports = append(fd.L2.Imports, protoreflect.FileImport{FileDescriptor: imp})
case genid.FileDescriptorProto_OptionDependency_field_number:
optionImports = append(optionImports, sb.MakeString(v))
case genid.FileDescriptorProto_EnumType_field_number:
fd.L1.Enums.List[enumIdx].unmarshalFull(v, sb)
enumIdx++
@@ -178,6 +181,23 @@ func (fd *File) unmarshalFull(b []byte) {
}
}
fd.L2.Options = fd.builder.optionsUnmarshaler(&descopts.File, rawOptions)
if len(optionImports) > 0 {
var imps FileImports
var once sync.Once
fd.L2.OptionImports = func() protoreflect.FileImports {
once.Do(func() {
imps = make(FileImports, len(optionImports))
for i, path := range optionImports {
imp, _ := fd.builder.FileRegistry.FindFileByPath(path)
if imp == nil {
imp = PlaceholderFile(path)
}
imps[i] = protoreflect.FileImport{FileDescriptor: imp}
}
})
return &imps
}
}
}
func (ed *Enum) unmarshalFull(b []byte, sb *strs.Builder) {

View File

@@ -52,7 +52,7 @@ import (
const (
Major = 1
Minor = 36
Patch = 9
Patch = 10
PreRelease = ""
)

View File

@@ -152,6 +152,28 @@ func (o FileOptions) New(fd *descriptorpb.FileDescriptorProto, r Resolver) (prot
imp := &f.L2.Imports[i]
imps.importPublic(imp.Imports())
}
if len(fd.GetOptionDependency()) > 0 {
optionImports := make(filedesc.FileImports, len(fd.GetOptionDependency()))
for i, path := range fd.GetOptionDependency() {
imp := &optionImports[i]
f, err := r.FindFileByPath(path)
if err == protoregistry.NotFound {
// We always allow option imports to be unresolvable.
f = filedesc.PlaceholderFile(path)
} else if err != nil {
return nil, errors.New("could not resolve import %q: %v", path, err)
}
imp.FileDescriptor = f
if imps[imp.Path()] {
return nil, errors.New("already imported %q", path)
}
imps[imp.Path()] = true
}
f.L2.OptionImports = func() protoreflect.FileImports {
return &optionImports
}
}
// Handle source locations.
f.L2.Locations.File = f

View File

@@ -29,6 +29,7 @@ func (r descsByName) initEnumDeclarations(eds []*descriptorpb.EnumDescriptorProt
e.L2.Options = func() protoreflect.ProtoMessage { return opts }
}
e.L1.EditionFeatures = mergeEditionFeatures(parent, ed.GetOptions().GetFeatures())
e.L1.Visibility = int32(ed.GetVisibility())
for _, s := range ed.GetReservedName() {
e.L2.ReservedNames.List = append(e.L2.ReservedNames.List, protoreflect.Name(s))
}
@@ -70,6 +71,7 @@ func (r descsByName) initMessagesDeclarations(mds []*descriptorpb.DescriptorProt
return nil, err
}
m.L1.EditionFeatures = mergeEditionFeatures(parent, md.GetOptions().GetFeatures())
m.L1.Visibility = int32(md.GetVisibility())
if opts := md.GetOptions(); opts != nil {
opts = proto.Clone(opts).(*descriptorpb.MessageOptions)
m.L2.Options = func() protoreflect.ProtoMessage { return opts }

View File

@@ -70,16 +70,27 @@ func ToFileDescriptorProto(file protoreflect.FileDescriptor) *descriptorpb.FileD
if syntax := file.Syntax(); syntax != protoreflect.Proto2 && syntax.IsValid() {
p.Syntax = proto.String(file.Syntax().String())
}
desc := file
if fileImportDesc, ok := file.(protoreflect.FileImport); ok {
desc = fileImportDesc.FileDescriptor
}
if file.Syntax() == protoreflect.Editions {
desc := file
if fileImportDesc, ok := file.(protoreflect.FileImport); ok {
desc = fileImportDesc.FileDescriptor
}
if editionsInterface, ok := desc.(interface{ Edition() int32 }); ok {
p.Edition = descriptorpb.Edition(editionsInterface.Edition()).Enum()
}
}
type hasOptionImports interface {
OptionImports() protoreflect.FileImports
}
if opts, ok := desc.(hasOptionImports); ok {
if optionImports := opts.OptionImports(); optionImports.Len() > 0 {
optionDeps := make([]string, optionImports.Len())
for i := range optionImports.Len() {
optionDeps[i] = optionImports.Get(i).Path()
}
p.OptionDependency = optionDeps
}
}
return p
}
@@ -123,6 +134,14 @@ func ToDescriptorProto(message protoreflect.MessageDescriptor) *descriptorpb.Des
for i, names := 0, message.ReservedNames(); i < names.Len(); i++ {
p.ReservedName = append(p.ReservedName, string(names.Get(i)))
}
type hasVisibility interface {
Visibility() int32
}
if vis, ok := message.(hasVisibility); ok {
if visibility := vis.Visibility(); visibility > 0 {
p.Visibility = descriptorpb.SymbolVisibility(visibility).Enum()
}
}
return p
}
@@ -216,6 +235,14 @@ func ToEnumDescriptorProto(enum protoreflect.EnumDescriptor) *descriptorpb.EnumD
for i, names := 0, enum.ReservedNames(); i < names.Len(); i++ {
p.ReservedName = append(p.ReservedName, string(names.Get(i)))
}
type hasVisibility interface {
Visibility() int32
}
if vis, ok := enum.(hasVisibility); ok {
if visibility := vis.Visibility(); visibility > 0 {
p.Visibility = descriptorpb.SymbolVisibility(visibility).Enum()
}
}
return p
}

7
vendor/modules.txt vendored
View File

@@ -1614,8 +1614,8 @@ google.golang.org/genproto/googleapis/api/monitoredres
google.golang.org/genproto/googleapis/rpc/code
google.golang.org/genproto/googleapis/rpc/errdetails
google.golang.org/genproto/googleapis/rpc/status
# google.golang.org/grpc v1.75.0
## explicit; go 1.23.0
# google.golang.org/grpc v1.76.0
## explicit; go 1.24.0
google.golang.org/grpc
google.golang.org/grpc/attributes
google.golang.org/grpc/backoff
@@ -1682,6 +1682,7 @@ google.golang.org/grpc/internal/syscall
google.golang.org/grpc/internal/transport
google.golang.org/grpc/internal/transport/networktype
google.golang.org/grpc/internal/xds
google.golang.org/grpc/internal/xds/clients
google.golang.org/grpc/keepalive
google.golang.org/grpc/mem
google.golang.org/grpc/metadata
@@ -1693,7 +1694,7 @@ google.golang.org/grpc/serviceconfig
google.golang.org/grpc/stats
google.golang.org/grpc/status
google.golang.org/grpc/tap
# google.golang.org/protobuf v1.36.9
# google.golang.org/protobuf v1.36.10
## explicit; go 1.23
google.golang.org/protobuf/cmd/protoc-gen-go/internal_gengo
google.golang.org/protobuf/compiler/protogen