From 4acda082d67385b251a4299f4d043e4aa9bd2779 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Wed, 29 Apr 2026 23:02:59 +0300 Subject: [PATCH 1/4] node: switch to net/http server Let's see how it goes. Signed-off-by: Roman Khimov --- cmd/neofs-node/config.go | 9 ++-- cmd/neofs-node/grpc.go | 96 ++++++++++++++++++++-------------------- cmd/neofs-node/netmap.go | 4 +- 3 files changed, 54 insertions(+), 55 deletions(-) diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index fc815bace0..ec7ac16d50 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -7,6 +7,7 @@ import ( "io/fs" "math/big" "net" + "net/http" "os" "os/signal" "path/filepath" @@ -233,8 +234,9 @@ func (c *cfg) CurrentEpoch() uint64 { return c.networkState.CurrentEpoch() } type cfgGRPC struct { mu sync.Mutex + gs *grpc.Server listeners []net.Listener - servers []*grpc.Server + servers []*http.Server // serviceRegistrators stores functions that register gRPC service // implementations into a gRPC server. @@ -247,10 +249,7 @@ func (g *cfgGRPC) registerService(f func(*grpc.Server)) { g.mu.Lock() defer g.mu.Unlock() - g.serviceRegistrators = append(g.serviceRegistrators, f) - for _, srv := range g.servers { - f(srv) - } + f(g.gs) } type cfgMeta struct { diff --git a/cmd/neofs-node/grpc.go b/cmd/neofs-node/grpc.go index 20008e933c..31e08e2e63 100644 --- a/cmd/neofs-node/grpc.go +++ b/cmd/neofs-node/grpc.go @@ -9,6 +9,7 @@ import ( "io" "math" "net" + "net/http" "os" "time" @@ -19,7 +20,6 @@ import ( "go.uber.org/zap" "golang.org/x/net/netutil" "google.golang.org/grpc" - "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/resolver" ) @@ -106,11 +106,11 @@ func initGRPC(c *cfg) { // at the time of shutdown (including those created by reload). c.onShutdown(func() { c.cfgGRPC.mu.Lock() - srvs := make([]*grpc.Server, len(c.cfgGRPC.servers)) + srvs := make([]*http.Server, len(c.cfgGRPC.servers)) copy(srvs, c.cfgGRPC.servers) c.cfgGRPC.mu.Unlock() for _, srv := range srvs { - stopGRPC("NeoFS Public API", srv, c.log) + srv.Close() } }) } @@ -157,18 +157,6 @@ func buildGRPCServers(c *cfg, maxRecvMsgSizeOpt grpc.ServerOption) error { if len(c.appCfg.GRPC) == 0 { return errors.New("could not listen to any gRPC endpoints") } - for _, sc := range c.appCfg.GRPC { - srv, lis, err := buildSingleGRPCServer(c, sc, maxRecvMsgSizeOpt) - if err != nil { - return err - } - c.cfgGRPC.listeners = append(c.cfgGRPC.listeners, lis) - c.cfgGRPC.servers = append(c.cfgGRPC.servers, srv) - } - return nil -} - -func buildSingleGRPCServer(c *cfg, sc grpcconfig.GRPC, maxRecvMsgSizeOpt grpc.ServerOption) (*grpc.Server, net.Listener, error) { serverOpts := []grpc.ServerOption{ grpc.MaxSendMsgSize(maxMsgSize), grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ @@ -185,30 +173,43 @@ func buildSingleGRPCServer(c *cfg, sc grpcconfig.GRPC, maxRecvMsgSizeOpt grpc.Se serverOpts = append(serverOpts, maxRecvMsgSizeOpt) } + c.cfgGRPC.gs = grpc.NewServer(serverOpts...) + + for _, sc := range c.appCfg.GRPC { + srv, lis, err := buildSingleGRPCServer(c, c.cfgGRPC.gs, sc, maxRecvMsgSizeOpt) + if err != nil { + return err + } + c.cfgGRPC.listeners = append(c.cfgGRPC.listeners, lis) + c.cfgGRPC.servers = append(c.cfgGRPC.servers, srv) + } + return nil +} + +func buildSingleGRPCServer(c *cfg, h http.Handler, sc grpcconfig.GRPC, maxRecvMsgSizeOpt grpc.ServerOption) (*http.Server, net.Listener, error) { + var http2protos http.Protocols + + http2protos.SetHTTP2(true) + http2protos.SetUnencryptedHTTP2(true) + + var srv = &http.Server{ + Handler: h, + Protocols: &http2protos, + } + tlsCfg := sc.TLS if tlsCfg.Key != "" { certFile, keyFile := tlsCfg.Certificate, tlsCfg.Key - if _, err := tls.LoadX509KeyPair(certFile, keyFile); err != nil { + crt, err := tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { c.log.Error("could not read certificate from file", zap.Error(err)) return nil, nil, err } - - // read certificate from disk on each handshake to pick up renewals automatically. - creds := credentials.NewTLS(&tls.Config{ - GetConfigForClient: func(*tls.ClientHelloInfo) (*tls.Config, error) { - cert, err := tls.LoadX509KeyPair(certFile, keyFile) - if err != nil { - return nil, fmt.Errorf("reload TLS certificate: %w", err) - } - return &tls.Config{ - Certificates: []tls.Certificate{cert}, - }, nil - }, - }) - - serverOpts = append(serverOpts, grpc.Creds(creds)) + srv.TLSConfig = &tls.Config{ + Certificates: []tls.Certificate{crt}, + } } lis, err := net.Listen("tcp", sc.Endpoint) @@ -221,7 +222,7 @@ func buildSingleGRPCServer(c *cfg, sc grpcconfig.GRPC, maxRecvMsgSizeOpt grpc.Se lis = netutil.LimitListener(lis, connLimit) } - return grpc.NewServer(serverOpts...), lis, nil + return srv, lis, nil } // reloadGRPC performs a fine-grained reload: only gRPC servers whose @@ -239,7 +240,7 @@ func reloadGRPC(c *cfg, oldCfg grpcConfigSnapshot) error { defer c.cfgGRPC.mu.Unlock() type serverEntry struct { - srv *grpc.Server + srv *http.Server lis net.Listener snap grpcServerSnapshot } @@ -254,11 +255,11 @@ func reloadGRPC(c *cfg, oldCfg grpcConfigSnapshot) error { } } - newServers := make([]*grpc.Server, 0, len(newCfg)) + newServers := make([]*http.Server, 0, len(newCfg)) newListeners := make([]net.Listener, 0, len(newCfg)) // freshServers/freshListeners hold only newly created servers that need // service registration and must start serving. - var freshServers []*grpc.Server + var freshServers []*http.Server var freshListeners []net.Listener for _, newSnap := range newCfg { @@ -269,10 +270,10 @@ func reloadGRPC(c *cfg, oldCfg grpcConfigSnapshot) error { newListeners = append(newListeners, old.lis) continue } - stopGRPC("NeoFS Public API", old.srv, c.log) + old.srv.Close() } - srv, lis, err := buildSingleGRPCServer(c, newSnap.GRPC, maxRecvMsgSizeOpt) + srv, lis, err := buildSingleGRPCServer(c, c.cfgGRPC.gs, newSnap.GRPC, maxRecvMsgSizeOpt) if err != nil { c.log.Error("failed to start gRPC server", zap.String("endpoint", newSnap.Endpoint), zap.Error(err)) @@ -286,7 +287,7 @@ func reloadGRPC(c *cfg, oldCfg grpcConfigSnapshot) error { // stop servers that were removed from the config entirely for _, entry := range oldByEndpoint { - stopGRPC("NeoFS Public API", entry.srv, c.log) + entry.srv.Close() } if len(newServers) == 0 { @@ -296,20 +297,15 @@ func reloadGRPC(c *cfg, oldCfg grpcConfigSnapshot) error { c.cfgGRPC.servers = newServers c.cfgGRPC.listeners = newListeners - for _, reg := range c.cfgGRPC.serviceRegistrators { - for _, srv := range freshServers { - reg(srv) - } - } - serveGRPCList(c, freshServers, freshListeners) + serveGRPCList(c, c.cfgGRPC.gs, freshServers, freshListeners) return nil } func serveGRPC(c *cfg) { - serveGRPCList(c, c.cfgGRPC.servers, c.cfgGRPC.listeners) + serveGRPCList(c, c.cfgGRPC.gs, c.cfgGRPC.servers, c.cfgGRPC.listeners) } -func serveGRPCList(c *cfg, servers []*grpc.Server, listeners []net.Listener) { +func serveGRPCList(c *cfg, gs *grpc.Server, servers []*http.Server, listeners []net.Listener) { for i := range servers { srv := servers[i] lis := listeners[i] @@ -325,7 +321,13 @@ func serveGRPCList(c *cfg, servers []*grpc.Server, listeners []net.Listener) { zap.Stringer("endpoint", lis.Addr()), ) - if err := srv.Serve(lis); err != nil { + var err error + if srv.TLSConfig == nil { + err = srv.Serve(lis) + } else { + err = srv.ServeTLS(lis, "", "") + } + if err != nil { c.log.Error("gRPC server failed", zap.Stringer("endpoint", lis.Addr()), zap.Error(err)) } }) diff --git a/cmd/neofs-node/netmap.go b/cmd/neofs-node/netmap.go index dd647e3b4c..b5b9d3db63 100644 --- a/cmd/neofs-node/netmap.go +++ b/cmd/neofs-node/netmap.go @@ -154,9 +154,7 @@ func initNetmapService(c *cfg) { server := netmapService.New(&c.key.PrivateKey, c) - for _, srv := range c.cfgGRPC.servers { - protonetmap.RegisterNetmapServiceServer(srv, server) - } + protonetmap.RegisterNetmapServiceServer(c.cfgGRPC.gs, server) addNewEpochNotificationHandler(c, func(ev event.Event) { c.cfgNetmap.state.setCurrentEpoch(ev.(netmapEvent.NewEpoch).EpochNumber()) From e60fc42808862b064544bce04b67fbc49da49e5d Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Wed, 13 May 2026 16:52:40 +0300 Subject: [PATCH 2/4] node: mux HTTP requests Signed-off-by: Roman Khimov --- cmd/neofs-node/config.go | 1 + cmd/neofs-node/grpc.go | 19 +++++++++++++++++-- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index ec7ac16d50..b1cc603c7a 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -235,6 +235,7 @@ type cfgGRPC struct { mu sync.Mutex gs *grpc.Server + mux *http.ServeMux listeners []net.Listener servers []*http.Server diff --git a/cmd/neofs-node/grpc.go b/cmd/neofs-node/grpc.go index 31e08e2e63..7072664a43 100644 --- a/cmd/neofs-node/grpc.go +++ b/cmd/neofs-node/grpc.go @@ -11,6 +11,7 @@ import ( "net" "net/http" "os" + "strings" "time" "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config" @@ -174,9 +175,10 @@ func buildGRPCServers(c *cfg, maxRecvMsgSizeOpt grpc.ServerOption) error { } c.cfgGRPC.gs = grpc.NewServer(serverOpts...) + c.cfgGRPC.mux = http.NewServeMux() for _, sc := range c.appCfg.GRPC { - srv, lis, err := buildSingleGRPCServer(c, c.cfgGRPC.gs, sc, maxRecvMsgSizeOpt) + srv, lis, err := buildSingleGRPCServer(c, muxHandler{c.cfgGRPC.gs, c.cfgGRPC.mux}, sc, maxRecvMsgSizeOpt) if err != nil { return err } @@ -186,6 +188,19 @@ func buildGRPCServers(c *cfg, maxRecvMsgSizeOpt grpc.ServerOption) error { return nil } +type muxHandler struct { + gs *grpc.Server + mux *http.ServeMux +} + +func (h muxHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.ProtoMajor == 2 && strings.HasPrefix(r.Header.Get("Content-Type"), "application/grpc") { + h.gs.ServeHTTP(w, r) + } else { + h.mux.ServeHTTP(w, r) + } +} + func buildSingleGRPCServer(c *cfg, h http.Handler, sc grpcconfig.GRPC, maxRecvMsgSizeOpt grpc.ServerOption) (*http.Server, net.Listener, error) { var http2protos http.Protocols @@ -273,7 +288,7 @@ func reloadGRPC(c *cfg, oldCfg grpcConfigSnapshot) error { old.srv.Close() } - srv, lis, err := buildSingleGRPCServer(c, c.cfgGRPC.gs, newSnap.GRPC, maxRecvMsgSizeOpt) + srv, lis, err := buildSingleGRPCServer(c, muxHandler{c.cfgGRPC.gs, c.cfgGRPC.mux}, newSnap.GRPC, maxRecvMsgSizeOpt) if err != nil { c.log.Error("failed to start gRPC server", zap.String("endpoint", newSnap.Endpoint), zap.Error(err)) From 5f5fc833b8580697b4f91141518ce3d822afdcd9 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Mon, 18 May 2026 10:24:23 +0300 Subject: [PATCH 3/4] object: serve an additional HTTP-based GET API Accept protobufs, reply with protobufs. TODO: * deal with TTL/new requests (create new ones signed by container node) * create new requests for EC parts * create and handle them client-side Signed-off-by: Roman Khimov --- cmd/neofs-node/object.go | 2 + pkg/core/client/client.go | 3 + pkg/network/cache/clients.go | 37 ++++ pkg/services/object/get.go | 14 ++ pkg/services/object/put/service_test.go | 5 + pkg/services/object/server.go | 216 ++++++++++++++++++++++++ pkg/services/object/server_test.go | 9 + 7 files changed, 286 insertions(+) diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index 6f5a0e00f6..73dbf5145e 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -352,6 +352,8 @@ func initObjectService(c *cfg) { server := objectService.New(objSvc, mNumber, c.cfgObject.pool.search, fsChain, storage, c.metaService, c.key.PrivateKey, c.metricsCollector, aclChecker, aclSvc, coreConstructor) os.server = server + c.cfgGRPC.mux.HandleFunc("/get", server.GetHTTP) + svcDesc := protoobject.ObjectService_ServiceDesc svcDesc.Methods = slices.Clone(protoobject.ObjectService_ServiceDesc.Methods) diff --git a/pkg/core/client/client.go b/pkg/core/client/client.go index ee66a20a3c..1edb7ea050 100644 --- a/pkg/core/client/client.go +++ b/pkg/core/client/client.go @@ -3,6 +3,7 @@ package client import ( "context" "io" + "net/http" "github.com/nspcc-dev/neofs-sdk-go/client" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" @@ -38,4 +39,6 @@ type MultiAddressClient interface { // ForAnyGRPCConn executes op over gRPC connections to given multi-address // endpoint-by-endpoint until success. ForAnyGRPCConn(context.Context, func(context.Context, *grpc.ClientConn) error) error + + ForAnyHTTPClient(context.Context, func(context.Context, *http.Client, string) error) error } diff --git a/pkg/network/cache/clients.go b/pkg/network/cache/clients.go index 9864045180..230c7cf944 100644 --- a/pkg/network/cache/clients.go +++ b/pkg/network/cache/clients.go @@ -8,6 +8,7 @@ import ( "io" "iter" "maps" + "net/http" "slices" "sync" "time" @@ -177,6 +178,7 @@ func (x *Clients) syncWithNetmapSN(ctx context.Context, sn netmap.NodeInfo) erro func (x *Clients) initConnections(ctx context.Context, pub []byte, addrs iter.Seq[string]) (*connections, error) { m := make(map[string]*client.Client) + mh := make(map[string]*http.Client) l := x.log.With(zap.String("public key", hex.EncodeToString(pub))) for s := range addrs { l.Info("initializing connection to the SN...", zap.String("address", s)) @@ -190,12 +192,14 @@ func (x *Clients) initConnections(ctx context.Context, pub []byte, addrs iter.Se } l.Info("connection to the SN successfully initialized", zap.String("address", s)) m[s] = c + mh[s] = new(http.Client) } var hexKey = hex.EncodeToString(pub) return &connections{ log: x.log.With(zap.String("SN public key", hexKey)), nodeID: hexKey, m: m, + mh: mh, }, nil } @@ -260,6 +264,7 @@ type connections struct { mtx sync.RWMutex m map[string]*client.Client // keys are multiaddrs + mh map[string]*http.Client // keys are multiaddrs } func (x *connections) closeAll() { @@ -278,6 +283,12 @@ func (x *connections) all(f func(ma string, c *client.Client) bool) { x.mtx.RUnlock() } +func (x *connections) allHTTP(yield func(ma string, c *http.Client) bool) { + x.mtx.RLock() + maps.All(x.mh)(yield) + x.mtx.RUnlock() +} + func (x *connections) forAny(ctx context.Context, f func(context.Context, *client.Client) error) error { var firstErr error for ma, c := range x.all { @@ -295,12 +306,38 @@ func (x *connections) forAny(ctx context.Context, f func(context.Context, *clien return newMultiEndpointError(x.nodeID, firstErr) } +func (x *connections) forAnyHTTP(ctx context.Context, f func(context.Context, *http.Client, string) error) error { + var firstErr error + for ma, c := range x.allHTTP { + // FIXME: pending removal in #3982. + var a network.Address + if err := a.FromString(ma); err != nil { + return fmt.Errorf("parse network address %q: %w", ma, err) + } + err := f(ctx, c, a.URIAddr()) + if err == nil { + return nil + } + if !isTempError(err) { + return newEndpointError(ma, err) + } + if firstErr == nil { + firstErr = newEndpointError(ma, err) + } + } + return newMultiEndpointError(x.nodeID, firstErr) +} + func (x *connections) ForAnyGRPCConn(ctx context.Context, f func(context.Context, *grpc.ClientConn) error) error { return x.forAny(ctx, func(ctx context.Context, c *client.Client) error { return f(ctx, c.Conn()) }) } +func (x *connections) ForAnyHTTPClient(ctx context.Context, f func(context.Context, *http.Client, string) error) error { + return x.forAnyHTTP(ctx, f) +} + func (x *connections) ObjectPutInit(ctx context.Context, hdr object.Object, signer user.Signer, opts client.PrmObjectPutInit) (client.ObjectWriter, error) { var res client.ObjectWriter return res, x.forAny(ctx, func(ctx context.Context, c *client.Client) error { diff --git a/pkg/services/object/get.go b/pkg/services/object/get.go index 85f3169d4d..1aeb535b31 100644 --- a/pkg/services/object/get.go +++ b/pkg/services/object/get.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "io" + "net/http" iprotobuf "github.com/nspcc-dev/neofs-node/internal/protobuf" "github.com/nspcc-dev/neofs-node/internal/protobuf/protoscan" @@ -25,6 +26,19 @@ type getStreamProgress struct { readPayload int } +func continueHTTP(ctx context.Context, w io.Writer, req *protoobject.GetRequest, buf []byte, conn *http.Client, pref string) error { + httpReq, err := http.NewRequestWithContext(ctx, "POST", pref+"/get", bytes.NewBuffer(buf)) + if err != nil { + return err + } + r, err := conn.Do(httpReq) + if err != nil { + return err + } + _, err = io.Copy(w, r.Body) + return err +} + // returns: // - nil on completed object transmission // - [object.SplitInfoError]/nil on split info response and unset/set raw flag in request diff --git a/pkg/services/object/put/service_test.go b/pkg/services/object/put/service_test.go index 6aeb7c06ec..ff074f350d 100644 --- a/pkg/services/object/put/service_test.go +++ b/pkg/services/object/put/service_test.go @@ -9,6 +9,7 @@ import ( "fmt" "io" "math" + "net/http" "slices" "strconv" "sync" @@ -1129,6 +1130,10 @@ func (m *serviceClient) ForAnyGRPCConn(context.Context, func(context.Context, *g panic("unimplemented") } +func (m *serviceClient) ForAnyHTTPClient(context.Context, func(context.Context, *http.Client, string) error) error { + panic("unimplemented") +} + type testPayloadStream Streamer func (x *testPayloadStream) Write(p []byte) (int, error) { diff --git a/pkg/services/object/server.go b/pkg/services/object/server.go index 619da94f57..4f7de9183a 100644 --- a/pkg/services/object/server.go +++ b/pkg/services/object/server.go @@ -11,6 +11,7 @@ import ( "fmt" "hash" "io" + "net/http" "slices" "strings" "sync" @@ -54,6 +55,7 @@ import ( grpccodes "google.golang.org/grpc/codes" "google.golang.org/grpc/mem" grpcstatus "google.golang.org/grpc/status" + "google.golang.org/protobuf/encoding/protowire" "google.golang.org/protobuf/proto" ) @@ -1031,6 +1033,7 @@ func (s *Server) sendStatusGetResponse(stream protoobject.ObjectService_GetServe type getStream struct { base protoobject.ObjectService_GetServer + w io.Writer srv *Server reqInfo aclsvc.RequestInfo @@ -1075,6 +1078,135 @@ func (s *getStream) WriteChunk(chunk []byte) error { return nil } +func (s *Server) GetHTTP(w http.ResponseWriter, r *http.Request) { + var ( + err error + recheckEACL bool + t = time.Now() + ) + defer func() { s.pushOpExecResult(stat.MethodObjectGet, err, t) }() + + var req = new(protoobject.GetRequest) + var buf = make([]byte, 16*1024) + + n, err := r.Body.Read(buf) + if err != nil || n == len(buf) { + w.WriteHeader(500) + return + } + buf = buf[:n] + err = proto.Unmarshal(buf, req) + if err != nil { + w.WriteHeader(400) + return + } + + needSignResp := needSignGetResponse(req) + + if err = icrypto.VerifyRequestSignatures(req); err != nil { + w.WriteHeader(403) + return + } + + if s.fsChain.LocalNodeUnderMaintenance() { + w.WriteHeader(500) + return + } + + reqInfo, err := s.reqInfoProc.GetRequestToInfo(req) + if err != nil { + if !errors.Is(err, apistatus.Error) { + var bad = new(apistatus.BadRequest) + bad.SetMessage(err.Error()) + err = bad // defer + } + w.WriteHeader(500) + return + } + if !s.aclChecker.CheckBasicACL(reqInfo) { + err = basicACLErr(reqInfo) // needed for defer + w.WriteHeader(403) + return + } + err = s.aclChecker.CheckEACL(req, reqInfo) + if err != nil { + if !errors.Is(err, aclsvc.ErrNotMatched) { + err = eACLErr(reqInfo, err) // needed for defer + w.WriteHeader(403) + return + } + recheckEACL = true + } + + p, err := convertGetPrmHTTP(s.signer, reqInfo.Container, req, buf, &getStream{ + w: w, + srv: s, + reqInfo: reqInfo, + recheckEACL: recheckEACL, + signResponse: needSignResp, + }) + if err != nil { + if !errors.Is(err, apistatus.Error) { + var bad = new(apistatus.BadRequest) + bad.SetMessage(err.Error()) + err = bad // defer + } + w.WriteHeader(500) + return + } + + // TODO: consider optimization + // We could acquire ~256K buffer (like for chunks) if storage would try to read it full. + // Then small objects would fit into a single buffer, and for large ones it'd be possible to + // encode the first chunk response using the heading buffer. + hdrRespBuf, hdrBuf := getBufferForHeadResponse() + defer hdrRespBuf.Free() + + hdrLen := -1 + var stream io.ReadCloser + defer func() { + if stream != nil { + stream.Close() + } + }() + + p.WithBuffer(hdrBuf, func(ln int, s io.ReadCloser) { hdrLen, stream = ln, s }) + + err = s.handlers.Get(r.Context(), p) + if err != nil { + w.WriteHeader(500) + return + } + + if hdrLen < 0 { + w.WriteHeader(500) + return + } + + idf, sigf, hdrf, err := iobject.GetNonPayloadFieldBounds(hdrBuf[:hdrLen]) + if err != nil { + w.WriteHeader(500) + return + } + + if recheckEACL { // previous check didn't match, but we have a header now. + err = s.aclChecker.CheckEACL(hdrBuf[hdrf.ValueFrom:hdrf.To], reqInfo) + if err != nil && !errors.Is(err, aclsvc.ErrNotMatched) { // Not matched -> follow basic ACL. + err = eACLErr(reqInfo, err) // defer + w.WriteHeader(403) + return + } + } + + pldFldOff := max(idf.To, sigf.To, hdrf.To) + + err = s.copyHTTPStream(w, hdrBuf, hdrLen, stream, pldFldOff) // defer + if err != nil { + w.WriteHeader(500) + return + } +} + func (s *Server) Get(req *protoobject.GetRequest, gStream protoobject.ObjectService_GetServer) error { var ( err error @@ -1344,6 +1476,90 @@ func convertGetPrm(signer ecdsa.PrivateKey, cnr container.Container, req *protoo return p, nil } +type httpWriter struct { + io.Writer +} + +func (w httpWriter) WriteHeader(hdr *object.Object) error { + var b [32]byte + + pref := protowire.AppendTag(b[:0], 3, protowire.BytesType) + hdrBin := hdr.CutPayload().Marshal() + pref = protowire.AppendVarint(pref, uint64(len(hdrBin))) + + _, err := w.Write(pref) + if err != nil { + return err + } + _, err = w.Write(hdrBin) + if err != nil { + return err + } + + pref = protowire.AppendTag(b[:0], 4, protowire.BytesType) + pref = protowire.AppendVarint(pref, hdr.PayloadSize()) + + _, err = w.Write(pref) + if err != nil { + return err + } + return nil +} + +func (w httpWriter) WriteChunk(data []byte) error { + _, err := w.Write(data) + return err +} + +// converts original request into parameters accepted by the internal handler. +// Note that the stream is untouched within this call, errors are not reported +// into it. +func convertGetPrmHTTP(signer ecdsa.PrivateKey, cnr container.Container, req *protoobject.GetRequest, buf []byte, stream *getStream) (getsvc.Prm, error) { + if req.MetaHeader != nil { + req.MetaHeader.Ttl -= 1 + } + buf, err := proto.MarshalOptions{}.MarshalAppend(buf[:0], req) + if err != nil { + return getsvc.Prm{}, err + } + body := req.GetBody() + ma := body.GetAddress() + if ma == nil { // includes nil body + return getsvc.Prm{}, errors.New("missing object address") + } + + var addr oid.Address + if err := addr.FromProtoMessage(ma); err != nil { + return getsvc.Prm{}, fmt.Errorf("invalid object address: %w", err) + } + + cp, err := objutil.CommonPrmFromRequest(req) + if err != nil { + return getsvc.Prm{}, err + } + + var p getsvc.Prm + p.SetCommonParameters(cp) + p.WithAddress(addr) + p.WithContainer(cnr) + p.WithRawFlag(body.Raw) + p.SetObjectWriter(httpWriter{stream.w}) + if cp.LocalOnly() { + return p, nil + } + + p.SetRequestForwarder(func(ctx context.Context, c client.MultiAddressClient) error { + return c.ForAnyHTTPClient(ctx, func(ctx context.Context, conn *http.Client, pref string) error { + return continueHTTP(ctx, stream.w, req, buf, conn, pref) // TODO: log error + }) + }) + return p, nil +} + +func (s *Server) copyHTTPStream(w io.Writer, hdrBuf []byte, hdrLen int, stream io.Reader, pldFldOff int) error { + return nil +} + type getProxyContext struct { req *protoobject.GetRequest reqOID oid.ID diff --git a/pkg/services/object/server_test.go b/pkg/services/object/server_test.go index 0ef9805384..67150687fd 100644 --- a/pkg/services/object/server_test.go +++ b/pkg/services/object/server_test.go @@ -10,6 +10,7 @@ import ( "errors" "fmt" "io" + "net/http" "path/filepath" "sync/atomic" "testing" @@ -833,6 +834,10 @@ func (emptyRemoteNode) ForAnyGRPCConn(context.Context, func(context.Context, *gr return errors.New("any transport error") } +func (emptyRemoteNode) ForAnyHTTPClient(context.Context, func(context.Context, *http.Client, string) error) error { + return errors.New("any transport error") +} + type mockGRPCConn struct { unimplementedConn conn *grpc.ClientConn @@ -841,3 +846,7 @@ type mockGRPCConn struct { func (x *mockGRPCConn) ForAnyGRPCConn(ctx context.Context, f func(context.Context, *grpc.ClientConn) error) error { return f(ctx, x.conn) } + +func (x *mockGRPCConn) ForAnyHTTPClient(context.Context, func(context.Context, *http.Client, string) error) error { + return errors.New("unimplemented") +} From be143196696070965ae97155eb0677b47dc9acd0 Mon Sep 17 00:00:00 2001 From: Andrey Butusov Date: Thu, 21 May 2026 23:25:36 +0300 Subject: [PATCH 4/4] object: complete HTTP GET response streaming and forwarder - implement copyHTTPStream. - GetHTTP: read body with io.ReadAll, emit apistatus.Status protobuf in non-200 responses, map errors to 4xx/5xx. - continueHTTP: normalize peer URI scheme, set Content-Type, surface non-200 upstream status. - network/cache: h2c http.Client for inter-SN forwarder. Signed-off-by: Andrey Butusov --- pkg/network/cache/clients.go | 16 +++- pkg/services/object/get.go | 16 ++++ pkg/services/object/server.go | 137 +++++++++++++++++++++++++++------- 3 files changed, 143 insertions(+), 26 deletions(-) diff --git a/pkg/network/cache/clients.go b/pkg/network/cache/clients.go index 230c7cf944..86a5054209 100644 --- a/pkg/network/cache/clients.go +++ b/pkg/network/cache/clients.go @@ -3,11 +3,13 @@ package cache import ( "bytes" "context" + "crypto/tls" "encoding/hex" "fmt" "io" "iter" "maps" + "net" "net/http" "slices" "sync" @@ -25,6 +27,7 @@ import ( apireputation "github.com/nspcc-dev/neofs-sdk-go/reputation" "github.com/nspcc-dev/neofs-sdk-go/user" "go.uber.org/zap" + "golang.org/x/net/http2" "google.golang.org/grpc" "google.golang.org/grpc/backoff" "google.golang.org/grpc/codes" @@ -176,6 +179,17 @@ func (x *Clients) syncWithNetmapSN(ctx context.Context, sn netmap.NodeInfo) erro return nil } +func newHTTPClient() *http.Client { + return &http.Client{ + Transport: &http2.Transport{ + AllowHTTP: true, + DialTLSContext: func(ctx context.Context, network, addr string, _ *tls.Config) (net.Conn, error) { + return (&net.Dialer{}).DialContext(ctx, network, addr) + }, + }, + } +} + func (x *Clients) initConnections(ctx context.Context, pub []byte, addrs iter.Seq[string]) (*connections, error) { m := make(map[string]*client.Client) mh := make(map[string]*http.Client) @@ -192,7 +206,7 @@ func (x *Clients) initConnections(ctx context.Context, pub []byte, addrs iter.Se } l.Info("connection to the SN successfully initialized", zap.String("address", s)) m[s] = c - mh[s] = new(http.Client) + mh[s] = newHTTPClient() } var hexKey = hex.EncodeToString(pub) return &connections{ diff --git a/pkg/services/object/get.go b/pkg/services/object/get.go index 1aeb535b31..953011a4ff 100644 --- a/pkg/services/object/get.go +++ b/pkg/services/object/get.go @@ -8,6 +8,7 @@ import ( "fmt" "io" "net/http" + "strings" iprotobuf "github.com/nspcc-dev/neofs-node/internal/protobuf" "github.com/nspcc-dev/neofs-node/internal/protobuf/protoscan" @@ -27,14 +28,29 @@ type getStreamProgress struct { } func continueHTTP(ctx context.Context, w io.Writer, req *protoobject.GetRequest, buf []byte, conn *http.Client, pref string) error { + switch { + case strings.HasPrefix(pref, "http://"), strings.HasPrefix(pref, "https://"): + case strings.HasPrefix(pref, "grpcs://"): + pref = "https://" + strings.TrimPrefix(pref, "grpcs://") + case strings.HasPrefix(pref, "grpc://"): + pref = "http://" + strings.TrimPrefix(pref, "grpc://") + default: + pref = "http://" + pref + } httpReq, err := http.NewRequestWithContext(ctx, "POST", pref+"/get", bytes.NewBuffer(buf)) if err != nil { return err } + httpReq.Header.Set("Content-Type", "application/protobuf") + httpReq.ContentLength = int64(len(buf)) r, err := conn.Do(httpReq) if err != nil { return err } + defer r.Body.Close() + if r.StatusCode != http.StatusOK { + return fmt.Errorf("forwarded peer returned status %s", r.Status) + } _, err = io.Copy(w, r.Body) return err } diff --git a/pkg/services/object/server.go b/pkg/services/object/server.go index 4f7de9183a..6ed720d706 100644 --- a/pkg/services/object/server.go +++ b/pkg/services/object/server.go @@ -13,6 +13,7 @@ import ( "io" "net/http" "slices" + "strconv" "strings" "sync" "time" @@ -1078,6 +1079,21 @@ func (s *getStream) WriteChunk(chunk []byte) error { return nil } +func writeHTTPGetError(w http.ResponseWriter, httpCode int, err error) { + w.Header().Set("Content-Type", "application/protobuf") + if err != nil { + if st := util.ToStatus(err); st != nil { + if b, mErr := proto.Marshal(st); mErr == nil { + w.Header().Set("Content-Length", strconv.Itoa(len(b))) + w.WriteHeader(httpCode) + _, _ = w.Write(b) + return + } + } + } + w.WriteHeader(httpCode) +} + func (s *Server) GetHTTP(w http.ResponseWriter, r *http.Request) { var ( err error @@ -1087,29 +1103,37 @@ func (s *Server) GetHTTP(w http.ResponseWriter, r *http.Request) { defer func() { s.pushOpExecResult(stat.MethodObjectGet, err, t) }() var req = new(protoobject.GetRequest) - var buf = make([]byte, 16*1024) - n, err := r.Body.Read(buf) - if err != nil || n == len(buf) { - w.WriteHeader(500) + buf, err := io.ReadAll(io.LimitReader(r.Body, 64*1024)) + if err != nil { + writeHTTPGetError(w, 500, err) return } - buf = buf[:n] - err = proto.Unmarshal(buf, req) - if err != nil { - w.WriteHeader(400) + if len(buf) == 0 { + bad := new(apistatus.BadRequest) + bad.SetMessage("empty request body") + err = bad + writeHTTPGetError(w, 400, err) + return + } + if err = proto.Unmarshal(buf, req); err != nil { + bad := new(apistatus.BadRequest) + bad.SetMessage("malformed request: " + err.Error()) + err = bad + writeHTTPGetError(w, 400, err) return } needSignResp := needSignGetResponse(req) if err = icrypto.VerifyRequestSignatures(req); err != nil { - w.WriteHeader(403) + writeHTTPGetError(w, 403, err) return } if s.fsChain.LocalNodeUnderMaintenance() { - w.WriteHeader(500) + err = new(apistatus.NodeUnderMaintenance) + writeHTTPGetError(w, 503, err) return } @@ -1120,19 +1144,19 @@ func (s *Server) GetHTTP(w http.ResponseWriter, r *http.Request) { bad.SetMessage(err.Error()) err = bad // defer } - w.WriteHeader(500) + writeHTTPGetError(w, 500, err) return } if !s.aclChecker.CheckBasicACL(reqInfo) { err = basicACLErr(reqInfo) // needed for defer - w.WriteHeader(403) + writeHTTPGetError(w, 403, err) return } err = s.aclChecker.CheckEACL(req, reqInfo) if err != nil { if !errors.Is(err, aclsvc.ErrNotMatched) { err = eACLErr(reqInfo, err) // needed for defer - w.WriteHeader(403) + writeHTTPGetError(w, 403, err) return } recheckEACL = true @@ -1151,7 +1175,7 @@ func (s *Server) GetHTTP(w http.ResponseWriter, r *http.Request) { bad.SetMessage(err.Error()) err = bad // defer } - w.WriteHeader(500) + writeHTTPGetError(w, 500, err) return } @@ -1174,18 +1198,25 @@ func (s *Server) GetHTTP(w http.ResponseWriter, r *http.Request) { err = s.handlers.Get(r.Context(), p) if err != nil { - w.WriteHeader(500) + httpCode := 500 + if errors.Is(err, apistatus.ErrObjectNotFound) { + httpCode = 404 + } else if errors.Is(err, apistatus.ErrObjectAccessDenied) { + httpCode = 403 + } + writeHTTPGetError(w, httpCode, err) return } if hdrLen < 0 { - w.WriteHeader(500) + err = errors.New("internal: empty get response") + writeHTTPGetError(w, 500, err) return } idf, sigf, hdrf, err := iobject.GetNonPayloadFieldBounds(hdrBuf[:hdrLen]) if err != nil { - w.WriteHeader(500) + writeHTTPGetError(w, 500, err) return } @@ -1193,7 +1224,7 @@ func (s *Server) GetHTTP(w http.ResponseWriter, r *http.Request) { err = s.aclChecker.CheckEACL(hdrBuf[hdrf.ValueFrom:hdrf.To], reqInfo) if err != nil && !errors.Is(err, aclsvc.ErrNotMatched) { // Not matched -> follow basic ACL. err = eACLErr(reqInfo, err) // defer - w.WriteHeader(403) + writeHTTPGetError(w, 403, err) return } } @@ -1515,13 +1546,6 @@ func (w httpWriter) WriteChunk(data []byte) error { // Note that the stream is untouched within this call, errors are not reported // into it. func convertGetPrmHTTP(signer ecdsa.PrivateKey, cnr container.Container, req *protoobject.GetRequest, buf []byte, stream *getStream) (getsvc.Prm, error) { - if req.MetaHeader != nil { - req.MetaHeader.Ttl -= 1 - } - buf, err := proto.MarshalOptions{}.MarshalAppend(buf[:0], req) - if err != nil { - return getsvc.Prm{}, err - } body := req.GetBody() ma := body.GetAddress() if ma == nil { // includes nil body @@ -1548,7 +1572,28 @@ func convertGetPrmHTTP(signer ecdsa.PrivateKey, cnr container.Container, req *pr return p, nil } + var onceResign sync.Once + meta := req.GetMetaHeader() + if meta == nil { + return getsvc.Prm{}, errors.New("missing meta header") + } + p.SetRequestForwarder(func(ctx context.Context, c client.MultiAddressClient) error { + var resignErr error + onceResign.Do(func() { + req.MetaHeader = &protosession.RequestMetaHeader{ + Ttl: meta.GetTtl() - 1, + Origin: meta, + } + req.VerifyHeader, resignErr = neofscrypto.SignRequestWithBuffer(neofsecdsa.Signer(signer), req, nil) + if resignErr != nil { + return + } + buf, resignErr = proto.MarshalOptions{}.MarshalAppend(buf[:0], req) + }) + if resignErr != nil { + return resignErr + } return c.ForAnyHTTPClient(ctx, func(ctx context.Context, conn *http.Client, pref string) error { return continueHTTP(ctx, stream.w, req, buf, conn, pref) // TODO: log error }) @@ -1557,6 +1602,48 @@ func convertGetPrmHTTP(signer ecdsa.PrivateKey, cnr container.Container, req *pr } func (s *Server) copyHTTPStream(w io.Writer, hdrBuf []byte, hdrLen int, stream io.Reader, pldFldOff int) error { + var pref [16]byte + p := protowire.AppendTag(pref[:0], 3, protowire.BytesType) + p = protowire.AppendVarint(p, uint64(pldFldOff)) + if _, err := w.Write(p); err != nil { + return err + } + if _, err := w.Write(hdrBuf[:pldFldOff]); err != nil { + return err + } + + if pldFldOff >= hdrLen { + p = protowire.AppendTag(pref[:0], 4, protowire.BytesType) + p = protowire.AppendVarint(p, 0) + _, err := w.Write(p) + return err + } + + num, typ, tagSz := protowire.ConsumeTag(hdrBuf[pldFldOff:hdrLen]) + if tagSz < 0 || num != 4 || typ != protowire.BytesType { + return fmt.Errorf("bad payload field tag at offset %d", pldFldOff) + } + payloadSize, lenSz := protowire.ConsumeVarint(hdrBuf[pldFldOff+tagSz : hdrLen]) + if lenSz < 0 { + return errors.New("bad payload length varint") + } + payloadValueStart := pldFldOff + tagSz + lenSz + + p = protowire.AppendTag(pref[:0], 4, protowire.BytesType) + p = protowire.AppendVarint(p, payloadSize) + if _, err := w.Write(p); err != nil { + return err + } + if payloadValueStart < hdrLen { + if _, err := w.Write(hdrBuf[payloadValueStart:hdrLen]); err != nil { + return err + } + } + if stream != nil { + if _, err := io.Copy(w, stream); err != nil { + return err + } + } return nil }