From 0812df6ebf6fbbfec0784b462bfee75c18b9bc89 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Thu, 18 Jan 2024 15:09:09 -0500 Subject: [PATCH] Expose the server and create a way to enable HTTP2 (#3311) Signed-off-by: Joe Elliott --- cmd/tempo/app/app.go | 18 ++++---- cmd/tempo/app/modules.go | 76 ++++++++++++++++----------------- cmd/tempo/app/server_service.go | 13 +++++- 3 files changed, 58 insertions(+), 49 deletions(-) diff --git a/cmd/tempo/app/app.go b/cmd/tempo/app/app.go index 96b7b7fa07b..3a5f3a47305 100644 --- a/cmd/tempo/app/app.go +++ b/cmd/tempo/app/app.go @@ -66,7 +66,7 @@ var ( type App struct { cfg Config - server TempoServer + Server TempoServer InternalServer *server.Server readRings map[string]*ring.Ring @@ -95,7 +95,7 @@ func New(cfg Config) (*App, error) { app := &App{ cfg: cfg, readRings: map[string]*ring.Ring{}, - server: newTempoServer(), + Server: newTempoServer(), } usagestats.Edition("oss") @@ -194,12 +194,12 @@ func (t *App) Run() error { t.InternalServer.HTTP.Path("/ready").Methods("GET").Handler(t.readyHandler(sm)) } - t.server.HTTP().Path(addHTTPAPIPrefix(&t.cfg, api.PathBuildInfo)).Handler(t.buildinfoHandler()).Methods("GET") + t.Server.HTTP().Path(addHTTPAPIPrefix(&t.cfg, api.PathBuildInfo)).Handler(t.buildinfoHandler()).Methods("GET") - t.server.HTTP().Path("/ready").Handler(t.readyHandler(sm)) - t.server.HTTP().Path("/status").Handler(t.statusHandler()).Methods("GET") - t.server.HTTP().Path("/status/{endpoint}").Handler(t.statusHandler()).Methods("GET") - grpc_health_v1.RegisterHealthServer(t.server.GRPC(), grpcutil.NewHealthCheck(sm)) + t.Server.HTTP().Path("/ready").Handler(t.readyHandler(sm)) + t.Server.HTTP().Path("/status").Handler(t.statusHandler()).Methods("GET") + t.Server.HTTP().Path("/status/{endpoint}").Handler(t.statusHandler()).Methods("GET") + grpc_health_v1.RegisterHealthServer(t.Server.GRPC(), grpcutil.NewHealthCheck(sm)) // Let's listen for events from this manager, and log them. healthy := func() { level.Info(log.Logger).Log("msg", "Tempo started") } @@ -228,7 +228,7 @@ func (t *App) Run() error { sm.AddListener(services.NewManagerListener(healthy, stopped, serviceFailed)) // Setup signal handler. If signal arrives, we stop the manager, which stops all the services. - handler := signals.NewHandler(t.server.Log()) + handler := signals.NewHandler(t.Server.Log()) go func() { handler.Loop() sm.StopAsync() @@ -474,7 +474,7 @@ func (t *App) writeStatusEndpoints(w io.Writer) error { endpoints := []endpoint{} - err := t.server.HTTP().Walk(func(route *mux.Route, router *mux.Router, ancestors []*mux.Route) error { + err := t.Server.HTTP().Walk(func(route *mux.Route, router *mux.Router, ancestors []*mux.Route) error { e := endpoint{} pathTemplate, err := route.GetPathTemplate() diff --git a/cmd/tempo/app/modules.go b/cmd/tempo/app/modules.go index 10b72495687..04c880db2f4 100644 --- a/cmd/tempo/app/modules.go +++ b/cmd/tempo/app/modules.go @@ -106,7 +106,7 @@ func (t *App) initServer() (services.Service, error) { return svs } - return t.server.StartAndReturnService(t.cfg.Server, t.cfg.StreamOverHTTPEnabled, servicesToWaitFor) + return t.Server.StartAndReturnService(t.cfg.Server, t.cfg.StreamOverHTTPEnabled, servicesToWaitFor) } func (t *App) initInternalServer() (services.Service, error) { @@ -163,7 +163,7 @@ func (t *App) initReadRing(cfg ring.Config, name, key string) (*ring.Ring, error return nil, fmt.Errorf("failed to create ring %s: %w", name, err) } - t.server.HTTP().Handle("/"+name+"/ring", ring) + t.Server.HTTP().Handle("/"+name+"/ring", ring) t.readRings[name] = ring return ring, nil @@ -202,10 +202,10 @@ func (t *App) initOverridesAPI() (services.Service, error) { return t.HTTPAuthMiddleware.Wrap(h) } - t.server.HTTP().Path(overridesPath).Methods(http.MethodGet).Handler(wrapHandler(userConfigOverridesAPI.GetHandler)) - t.server.HTTP().Path(overridesPath).Methods(http.MethodPost).Handler(wrapHandler(userConfigOverridesAPI.PostHandler)) - t.server.HTTP().Path(overridesPath).Methods(http.MethodPatch).Handler(wrapHandler(userConfigOverridesAPI.PatchHandler)) - t.server.HTTP().Path(overridesPath).Methods(http.MethodDelete).Handler(wrapHandler(userConfigOverridesAPI.DeleteHandler)) + t.Server.HTTP().Path(overridesPath).Methods(http.MethodGet).Handler(wrapHandler(userConfigOverridesAPI.GetHandler)) + t.Server.HTTP().Path(overridesPath).Methods(http.MethodPost).Handler(wrapHandler(userConfigOverridesAPI.PostHandler)) + t.Server.HTTP().Path(overridesPath).Methods(http.MethodPatch).Handler(wrapHandler(userConfigOverridesAPI.PatchHandler)) + t.Server.HTTP().Path(overridesPath).Methods(http.MethodDelete).Handler(wrapHandler(userConfigOverridesAPI.DeleteHandler)) return userConfigOverridesAPI, nil } @@ -226,7 +226,7 @@ func (t *App) initDistributor() (services.Service, error) { t.distributor = distributor if distributor.DistributorRing != nil { - t.server.HTTP().Handle("/distributor/ring", distributor.DistributorRing) + t.Server.HTTP().Handle("/distributor/ring", distributor.DistributorRing) } return t.distributor, nil @@ -242,10 +242,10 @@ func (t *App) initIngester() (services.Service, error) { } t.ingester = ingester - tempopb.RegisterPusherServer(t.server.GRPC(), t.ingester) - tempopb.RegisterQuerierServer(t.server.GRPC(), t.ingester) - t.server.HTTP().Path("/flush").Handler(http.HandlerFunc(t.ingester.FlushHandler)) - t.server.HTTP().Path("/shutdown").Handler(http.HandlerFunc(t.ingester.ShutdownHandler)) + tempopb.RegisterPusherServer(t.Server.GRPC(), t.ingester) + tempopb.RegisterQuerierServer(t.Server.GRPC(), t.ingester) + t.Server.HTTP().Path("/flush").Handler(http.HandlerFunc(t.ingester.FlushHandler)) + t.Server.HTTP().Path("/shutdown").Handler(http.HandlerFunc(t.ingester.ShutdownHandler)) return t.ingester, nil } @@ -262,12 +262,12 @@ func (t *App) initGenerator() (services.Service, error) { t.generator = genSvc spanStatsHandler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.generator.SpanMetricsHandler)) - t.server.HTTP().Handle(path.Join(api.PathPrefixGenerator, addHTTPAPIPrefix(&t.cfg, api.PathSpanMetrics)), spanStatsHandler) + t.Server.HTTP().Handle(path.Join(api.PathPrefixGenerator, addHTTPAPIPrefix(&t.cfg, api.PathSpanMetrics)), spanStatsHandler) queryRangeHandler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.generator.QueryRangeHandler)) - t.server.HTTP().Handle(path.Join(api.PathPrefixGenerator, addHTTPAPIPrefix(&t.cfg, api.PathMetricsQueryRange)), queryRangeHandler) + t.Server.HTTP().Handle(path.Join(api.PathPrefixGenerator, addHTTPAPIPrefix(&t.cfg, api.PathMetricsQueryRange)), queryRangeHandler) - tempopb.RegisterMetricsGeneratorServer(t.server.GRPC(), t.generator) + tempopb.RegisterMetricsGeneratorServer(t.Server.GRPC(), t.generator) return t.generator, nil } @@ -314,30 +314,30 @@ func (t *App) initQuerier() (services.Service, error) { ) tracesHandler := middleware.Wrap(http.HandlerFunc(t.querier.TraceByIDHandler)) - t.server.HTTP().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathTraces)), tracesHandler) + t.Server.HTTP().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathTraces)), tracesHandler) searchHandler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.querier.SearchHandler)) - t.server.HTTP().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSearch)), searchHandler) + t.Server.HTTP().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSearch)), searchHandler) searchTagsHandler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.querier.SearchTagsHandler)) - t.server.HTTP().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSearchTags)), searchTagsHandler) + t.Server.HTTP().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSearchTags)), searchTagsHandler) searchTagsV2Handler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.querier.SearchTagsV2Handler)) - t.server.HTTP().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSearchTagsV2)), searchTagsV2Handler) + t.Server.HTTP().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSearchTagsV2)), searchTagsV2Handler) searchTagValuesHandler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.querier.SearchTagValuesHandler)) - t.server.HTTP().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSearchTagValues)), searchTagValuesHandler) + t.Server.HTTP().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSearchTagValues)), searchTagValuesHandler) searchTagValuesV2Handler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.querier.SearchTagValuesV2Handler)) - t.server.HTTP().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSearchTagValuesV2)), searchTagValuesV2Handler) + t.Server.HTTP().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSearchTagValuesV2)), searchTagValuesV2Handler) spanMetricsSummaryHandler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.querier.SpanMetricsSummaryHandler)) - t.server.HTTP().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSpanMetricsSummary)), spanMetricsSummaryHandler) + t.Server.HTTP().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSpanMetricsSummary)), spanMetricsSummaryHandler) queryRangeHandler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.querier.QueryRangeHandler)) - t.server.HTTP().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathMetricsQueryRange)), queryRangeHandler) + t.Server.HTTP().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathMetricsQueryRange)), queryRangeHandler) - return t.querier, t.querier.CreateAndRegisterWorker(t.server.HTTP()) + return t.querier, t.querier.CreateAndRegisterWorker(t.Server.HTTP()) } func (t *App) initQueryFrontend() (services.Service, error) { @@ -356,10 +356,10 @@ func (t *App) initQueryFrontend() (services.Service, error) { } // register grpc server for queriers to connect to - frontend_v1pb.RegisterFrontendServer(t.server.GRPC(), t.frontend) + frontend_v1pb.RegisterFrontendServer(t.Server.GRPC(), t.frontend) // we register the streaming querier service on both the http and grpc servers. Grafana expects // this GRPC service to be available on the HTTP server. - tempopb.RegisterStreamingQuerierServer(t.server.GRPC(), queryFrontend) + tempopb.RegisterStreamingQuerierServer(t.Server.GRPC(), queryFrontend) // wrap handlers with auth base := middleware.Merge( @@ -368,28 +368,28 @@ func (t *App) initQueryFrontend() (services.Service, error) { ) // http trace by id endpoint - t.server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathTraces), base.Wrap(queryFrontend.TraceByIDHandler)) + t.Server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathTraces), base.Wrap(queryFrontend.TraceByIDHandler)) // http search endpoints - t.server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearch), base.Wrap(queryFrontend.SearchHandler)) - t.server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearchTags), base.Wrap(queryFrontend.SearchTagsHandler)) - t.server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearchTagsV2), base.Wrap(queryFrontend.SearchTagsV2Handler)) - t.server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearchTagValues), base.Wrap(queryFrontend.SearchTagsValuesHandler)) - t.server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearchTagValuesV2), base.Wrap(queryFrontend.SearchTagsValuesV2Handler)) + t.Server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearch), base.Wrap(queryFrontend.SearchHandler)) + t.Server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearchTags), base.Wrap(queryFrontend.SearchTagsHandler)) + t.Server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearchTagsV2), base.Wrap(queryFrontend.SearchTagsV2Handler)) + t.Server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearchTagValues), base.Wrap(queryFrontend.SearchTagsValuesHandler)) + t.Server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearchTagValuesV2), base.Wrap(queryFrontend.SearchTagsValuesV2Handler)) // http metrics endpoints - t.server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathSpanMetricsSummary), base.Wrap(queryFrontend.SpanMetricsSummaryHandler)) - t.server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathMetricsQueryRange), base.Wrap(queryFrontend.QueryRangeHandler)) - t.server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathPromQueryRange), base.Wrap(queryFrontend.QueryRangeHandler)) + t.Server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathSpanMetricsSummary), base.Wrap(queryFrontend.SpanMetricsSummaryHandler)) + t.Server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathMetricsQueryRange), base.Wrap(queryFrontend.QueryRangeHandler)) + t.Server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathPromQueryRange), base.Wrap(queryFrontend.QueryRangeHandler)) // the query frontend needs to have knowledge of the blocks so it can shard search jobs t.store.EnablePolling(context.Background(), nil) // http query echo endpoint - t.server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathEcho), echoHandler()) + t.Server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathEcho), echoHandler()) // http endpoint to see usage stats data - t.server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathUsageStats), usageStatsHandler(t.cfg.UsageReport)) + t.Server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathUsageStats), usageStatsHandler(t.cfg.UsageReport)) // todo: queryFrontend should implement service.Service and take the cortex frontend a submodule return t.frontend, nil @@ -407,7 +407,7 @@ func (t *App) initCompactor() (services.Service, error) { t.compactor = compactor if t.compactor.Ring != nil { - t.server.HTTP().Handle("/compactor/ring", t.compactor.Ring) + t.Server.HTTP().Handle("/compactor/ring", t.compactor.Ring) } return t.compactor, nil @@ -447,7 +447,7 @@ func (t *App) initMemberlistKV() (services.Service, error) { t.cfg.Distributor.DistributorRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.cfg.Compactor.ShardingRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV - t.server.HTTP().Handle("/memberlist", t.MemberlistKV) + t.Server.HTTP().Handle("/memberlist", t.MemberlistKV) return t.MemberlistKV, nil } diff --git a/cmd/tempo/app/server_service.go b/cmd/tempo/app/server_service.go index 20e6dcfd084..93dd2a7acdd 100644 --- a/cmd/tempo/app/server_service.go +++ b/cmd/tempo/app/server_service.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" "strings" + "sync" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -23,6 +24,7 @@ type TempoServer interface { HTTP() *mux.Router GRPC() *grpc.Server Log() log.Logger + EnableHTTP2() StartAndReturnService(cfg server.Config, supportGRPCOnHTTP bool, servicesToWaitFor func() []services.Service) (services.Service, error) } @@ -31,7 +33,8 @@ type TempoServer interface { type tempoServer struct { mux *mux.Router // all tempo http routes are added here - externalServer *server.Server // the standard server that all HTTP/GRPC requests are served on + externalServer *server.Server // the standard server that all HTTP/GRPC requests are served on + enableHTTP2Once sync.Once } func newTempoServer() *tempoServer { @@ -53,6 +56,12 @@ func (s *tempoServer) Log() log.Logger { return s.externalServer.Log } +func (s *tempoServer) EnableHTTP2() { + s.enableHTTP2Once.Do(func() { + s.externalServer.HTTPServer.Handler = h2c.NewHandler(s.externalServer.HTTPServer.Handler, &http2.Server{}) + }) +} + func (s *tempoServer) StartAndReturnService(cfg server.Config, supportGRPCOnHTTP bool, servicesToWaitFor func() []services.Service) (services.Service, error) { var err error @@ -74,7 +83,7 @@ func (s *tempoServer) StartAndReturnService(cfg server.Config, supportGRPCOnHTTP // now that we have created the server and service let's setup our grpc/http router if necessary if supportGRPCOnHTTP { // for grpc to work we must enable h2c on the external server - s.externalServer.HTTPServer.Handler = h2c.NewHandler(s.externalServer.HTTPServer.Handler, &http2.Server{}) + s.EnableHTTP2() // recreate dskit instrumentation here cfg.DoNotAddDefaultHTTPMiddleware = false