Skip to content

Commit

Permalink
Expose the server and create a way to enable HTTP2 (#3311)
Browse files Browse the repository at this point in the history
Signed-off-by: Joe Elliott <number101010@gmail.com>
  • Loading branch information
joe-elliott authored Jan 18, 2024
1 parent 23c6ea4 commit 0812df6
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 49 deletions.
18 changes: 9 additions & 9 deletions cmd/tempo/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ var (
type App struct {
cfg Config

server TempoServer
Server TempoServer
InternalServer *server.Server

readRings map[string]*ring.Ring
Expand Down Expand Up @@ -95,7 +95,7 @@ func New(cfg Config) (*App, error) {
app := &App{
cfg: cfg,
readRings: map[string]*ring.Ring{},
server: newTempoServer(),
Server: newTempoServer(),
}

usagestats.Edition("oss")
Expand Down Expand Up @@ -194,12 +194,12 @@ func (t *App) Run() error {
t.InternalServer.HTTP.Path("/ready").Methods("GET").Handler(t.readyHandler(sm))
}

t.server.HTTP().Path(addHTTPAPIPrefix(&t.cfg, api.PathBuildInfo)).Handler(t.buildinfoHandler()).Methods("GET")
t.Server.HTTP().Path(addHTTPAPIPrefix(&t.cfg, api.PathBuildInfo)).Handler(t.buildinfoHandler()).Methods("GET")

t.server.HTTP().Path("/ready").Handler(t.readyHandler(sm))
t.server.HTTP().Path("/status").Handler(t.statusHandler()).Methods("GET")
t.server.HTTP().Path("/status/{endpoint}").Handler(t.statusHandler()).Methods("GET")
grpc_health_v1.RegisterHealthServer(t.server.GRPC(), grpcutil.NewHealthCheck(sm))
t.Server.HTTP().Path("/ready").Handler(t.readyHandler(sm))
t.Server.HTTP().Path("/status").Handler(t.statusHandler()).Methods("GET")
t.Server.HTTP().Path("/status/{endpoint}").Handler(t.statusHandler()).Methods("GET")
grpc_health_v1.RegisterHealthServer(t.Server.GRPC(), grpcutil.NewHealthCheck(sm))

// Let's listen for events from this manager, and log them.
healthy := func() { level.Info(log.Logger).Log("msg", "Tempo started") }
Expand Down Expand Up @@ -228,7 +228,7 @@ func (t *App) Run() error {
sm.AddListener(services.NewManagerListener(healthy, stopped, serviceFailed))

// Setup signal handler. If signal arrives, we stop the manager, which stops all the services.
handler := signals.NewHandler(t.server.Log())
handler := signals.NewHandler(t.Server.Log())
go func() {
handler.Loop()
sm.StopAsync()
Expand Down Expand Up @@ -474,7 +474,7 @@ func (t *App) writeStatusEndpoints(w io.Writer) error {

endpoints := []endpoint{}

err := t.server.HTTP().Walk(func(route *mux.Route, router *mux.Router, ancestors []*mux.Route) error {
err := t.Server.HTTP().Walk(func(route *mux.Route, router *mux.Router, ancestors []*mux.Route) error {
e := endpoint{}

pathTemplate, err := route.GetPathTemplate()
Expand Down
76 changes: 38 additions & 38 deletions cmd/tempo/app/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (t *App) initServer() (services.Service, error) {
return svs
}

return t.server.StartAndReturnService(t.cfg.Server, t.cfg.StreamOverHTTPEnabled, servicesToWaitFor)
return t.Server.StartAndReturnService(t.cfg.Server, t.cfg.StreamOverHTTPEnabled, servicesToWaitFor)
}

func (t *App) initInternalServer() (services.Service, error) {
Expand Down Expand Up @@ -163,7 +163,7 @@ func (t *App) initReadRing(cfg ring.Config, name, key string) (*ring.Ring, error
return nil, fmt.Errorf("failed to create ring %s: %w", name, err)
}

t.server.HTTP().Handle("/"+name+"/ring", ring)
t.Server.HTTP().Handle("/"+name+"/ring", ring)
t.readRings[name] = ring

return ring, nil
Expand Down Expand Up @@ -202,10 +202,10 @@ func (t *App) initOverridesAPI() (services.Service, error) {
return t.HTTPAuthMiddleware.Wrap(h)
}

t.server.HTTP().Path(overridesPath).Methods(http.MethodGet).Handler(wrapHandler(userConfigOverridesAPI.GetHandler))
t.server.HTTP().Path(overridesPath).Methods(http.MethodPost).Handler(wrapHandler(userConfigOverridesAPI.PostHandler))
t.server.HTTP().Path(overridesPath).Methods(http.MethodPatch).Handler(wrapHandler(userConfigOverridesAPI.PatchHandler))
t.server.HTTP().Path(overridesPath).Methods(http.MethodDelete).Handler(wrapHandler(userConfigOverridesAPI.DeleteHandler))
t.Server.HTTP().Path(overridesPath).Methods(http.MethodGet).Handler(wrapHandler(userConfigOverridesAPI.GetHandler))
t.Server.HTTP().Path(overridesPath).Methods(http.MethodPost).Handler(wrapHandler(userConfigOverridesAPI.PostHandler))
t.Server.HTTP().Path(overridesPath).Methods(http.MethodPatch).Handler(wrapHandler(userConfigOverridesAPI.PatchHandler))
t.Server.HTTP().Path(overridesPath).Methods(http.MethodDelete).Handler(wrapHandler(userConfigOverridesAPI.DeleteHandler))

return userConfigOverridesAPI, nil
}
Expand All @@ -226,7 +226,7 @@ func (t *App) initDistributor() (services.Service, error) {
t.distributor = distributor

if distributor.DistributorRing != nil {
t.server.HTTP().Handle("/distributor/ring", distributor.DistributorRing)
t.Server.HTTP().Handle("/distributor/ring", distributor.DistributorRing)
}

return t.distributor, nil
Expand All @@ -242,10 +242,10 @@ func (t *App) initIngester() (services.Service, error) {
}
t.ingester = ingester

tempopb.RegisterPusherServer(t.server.GRPC(), t.ingester)
tempopb.RegisterQuerierServer(t.server.GRPC(), t.ingester)
t.server.HTTP().Path("/flush").Handler(http.HandlerFunc(t.ingester.FlushHandler))
t.server.HTTP().Path("/shutdown").Handler(http.HandlerFunc(t.ingester.ShutdownHandler))
tempopb.RegisterPusherServer(t.Server.GRPC(), t.ingester)
tempopb.RegisterQuerierServer(t.Server.GRPC(), t.ingester)
t.Server.HTTP().Path("/flush").Handler(http.HandlerFunc(t.ingester.FlushHandler))
t.Server.HTTP().Path("/shutdown").Handler(http.HandlerFunc(t.ingester.ShutdownHandler))
return t.ingester, nil
}

Expand All @@ -262,12 +262,12 @@ func (t *App) initGenerator() (services.Service, error) {
t.generator = genSvc

spanStatsHandler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.generator.SpanMetricsHandler))
t.server.HTTP().Handle(path.Join(api.PathPrefixGenerator, addHTTPAPIPrefix(&t.cfg, api.PathSpanMetrics)), spanStatsHandler)
t.Server.HTTP().Handle(path.Join(api.PathPrefixGenerator, addHTTPAPIPrefix(&t.cfg, api.PathSpanMetrics)), spanStatsHandler)

queryRangeHandler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.generator.QueryRangeHandler))
t.server.HTTP().Handle(path.Join(api.PathPrefixGenerator, addHTTPAPIPrefix(&t.cfg, api.PathMetricsQueryRange)), queryRangeHandler)
t.Server.HTTP().Handle(path.Join(api.PathPrefixGenerator, addHTTPAPIPrefix(&t.cfg, api.PathMetricsQueryRange)), queryRangeHandler)

tempopb.RegisterMetricsGeneratorServer(t.server.GRPC(), t.generator)
tempopb.RegisterMetricsGeneratorServer(t.Server.GRPC(), t.generator)

return t.generator, nil
}
Expand Down Expand Up @@ -314,30 +314,30 @@ func (t *App) initQuerier() (services.Service, error) {
)

tracesHandler := middleware.Wrap(http.HandlerFunc(t.querier.TraceByIDHandler))
t.server.HTTP().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathTraces)), tracesHandler)
t.Server.HTTP().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathTraces)), tracesHandler)

searchHandler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.querier.SearchHandler))
t.server.HTTP().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSearch)), searchHandler)
t.Server.HTTP().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSearch)), searchHandler)

searchTagsHandler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.querier.SearchTagsHandler))
t.server.HTTP().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSearchTags)), searchTagsHandler)
t.Server.HTTP().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSearchTags)), searchTagsHandler)

searchTagsV2Handler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.querier.SearchTagsV2Handler))
t.server.HTTP().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSearchTagsV2)), searchTagsV2Handler)
t.Server.HTTP().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSearchTagsV2)), searchTagsV2Handler)

searchTagValuesHandler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.querier.SearchTagValuesHandler))
t.server.HTTP().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSearchTagValues)), searchTagValuesHandler)
t.Server.HTTP().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSearchTagValues)), searchTagValuesHandler)

searchTagValuesV2Handler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.querier.SearchTagValuesV2Handler))
t.server.HTTP().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSearchTagValuesV2)), searchTagValuesV2Handler)
t.Server.HTTP().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSearchTagValuesV2)), searchTagValuesV2Handler)

spanMetricsSummaryHandler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.querier.SpanMetricsSummaryHandler))
t.server.HTTP().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSpanMetricsSummary)), spanMetricsSummaryHandler)
t.Server.HTTP().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSpanMetricsSummary)), spanMetricsSummaryHandler)

queryRangeHandler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.querier.QueryRangeHandler))
t.server.HTTP().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathMetricsQueryRange)), queryRangeHandler)
t.Server.HTTP().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathMetricsQueryRange)), queryRangeHandler)

return t.querier, t.querier.CreateAndRegisterWorker(t.server.HTTP())
return t.querier, t.querier.CreateAndRegisterWorker(t.Server.HTTP())
}

func (t *App) initQueryFrontend() (services.Service, error) {
Expand All @@ -356,10 +356,10 @@ func (t *App) initQueryFrontend() (services.Service, error) {
}

// register grpc server for queriers to connect to
frontend_v1pb.RegisterFrontendServer(t.server.GRPC(), t.frontend)
frontend_v1pb.RegisterFrontendServer(t.Server.GRPC(), t.frontend)
// we register the streaming querier service on both the http and grpc servers. Grafana expects
// this GRPC service to be available on the HTTP server.
tempopb.RegisterStreamingQuerierServer(t.server.GRPC(), queryFrontend)
tempopb.RegisterStreamingQuerierServer(t.Server.GRPC(), queryFrontend)

// wrap handlers with auth
base := middleware.Merge(
Expand All @@ -368,28 +368,28 @@ func (t *App) initQueryFrontend() (services.Service, error) {
)

// http trace by id endpoint
t.server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathTraces), base.Wrap(queryFrontend.TraceByIDHandler))
t.Server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathTraces), base.Wrap(queryFrontend.TraceByIDHandler))

// http search endpoints
t.server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearch), base.Wrap(queryFrontend.SearchHandler))
t.server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearchTags), base.Wrap(queryFrontend.SearchTagsHandler))
t.server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearchTagsV2), base.Wrap(queryFrontend.SearchTagsV2Handler))
t.server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearchTagValues), base.Wrap(queryFrontend.SearchTagsValuesHandler))
t.server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearchTagValuesV2), base.Wrap(queryFrontend.SearchTagsValuesV2Handler))
t.Server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearch), base.Wrap(queryFrontend.SearchHandler))
t.Server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearchTags), base.Wrap(queryFrontend.SearchTagsHandler))
t.Server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearchTagsV2), base.Wrap(queryFrontend.SearchTagsV2Handler))
t.Server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearchTagValues), base.Wrap(queryFrontend.SearchTagsValuesHandler))
t.Server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearchTagValuesV2), base.Wrap(queryFrontend.SearchTagsValuesV2Handler))

// http metrics endpoints
t.server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathSpanMetricsSummary), base.Wrap(queryFrontend.SpanMetricsSummaryHandler))
t.server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathMetricsQueryRange), base.Wrap(queryFrontend.QueryRangeHandler))
t.server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathPromQueryRange), base.Wrap(queryFrontend.QueryRangeHandler))
t.Server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathSpanMetricsSummary), base.Wrap(queryFrontend.SpanMetricsSummaryHandler))
t.Server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathMetricsQueryRange), base.Wrap(queryFrontend.QueryRangeHandler))
t.Server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathPromQueryRange), base.Wrap(queryFrontend.QueryRangeHandler))

// the query frontend needs to have knowledge of the blocks so it can shard search jobs
t.store.EnablePolling(context.Background(), nil)

// http query echo endpoint
t.server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathEcho), echoHandler())
t.Server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathEcho), echoHandler())

// http endpoint to see usage stats data
t.server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathUsageStats), usageStatsHandler(t.cfg.UsageReport))
t.Server.HTTP().Handle(addHTTPAPIPrefix(&t.cfg, api.PathUsageStats), usageStatsHandler(t.cfg.UsageReport))

// todo: queryFrontend should implement service.Service and take the cortex frontend a submodule
return t.frontend, nil
Expand All @@ -407,7 +407,7 @@ func (t *App) initCompactor() (services.Service, error) {
t.compactor = compactor

if t.compactor.Ring != nil {
t.server.HTTP().Handle("/compactor/ring", t.compactor.Ring)
t.Server.HTTP().Handle("/compactor/ring", t.compactor.Ring)
}

return t.compactor, nil
Expand Down Expand Up @@ -447,7 +447,7 @@ func (t *App) initMemberlistKV() (services.Service, error) {
t.cfg.Distributor.DistributorRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.cfg.Compactor.ShardingRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV

t.server.HTTP().Handle("/memberlist", t.MemberlistKV)
t.Server.HTTP().Handle("/memberlist", t.MemberlistKV)

return t.MemberlistKV, nil
}
Expand Down
13 changes: 11 additions & 2 deletions cmd/tempo/app/server_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"net/http"
"strings"
"sync"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand All @@ -23,6 +24,7 @@ type TempoServer interface {
HTTP() *mux.Router
GRPC() *grpc.Server
Log() log.Logger
EnableHTTP2()

StartAndReturnService(cfg server.Config, supportGRPCOnHTTP bool, servicesToWaitFor func() []services.Service) (services.Service, error)
}
Expand All @@ -31,7 +33,8 @@ type TempoServer interface {
type tempoServer struct {
mux *mux.Router // all tempo http routes are added here

externalServer *server.Server // the standard server that all HTTP/GRPC requests are served on
externalServer *server.Server // the standard server that all HTTP/GRPC requests are served on
enableHTTP2Once sync.Once
}

func newTempoServer() *tempoServer {
Expand All @@ -53,6 +56,12 @@ func (s *tempoServer) Log() log.Logger {
return s.externalServer.Log
}

func (s *tempoServer) EnableHTTP2() {
s.enableHTTP2Once.Do(func() {
s.externalServer.HTTPServer.Handler = h2c.NewHandler(s.externalServer.HTTPServer.Handler, &http2.Server{})
})
}

func (s *tempoServer) StartAndReturnService(cfg server.Config, supportGRPCOnHTTP bool, servicesToWaitFor func() []services.Service) (services.Service, error) {
var err error

Expand All @@ -74,7 +83,7 @@ func (s *tempoServer) StartAndReturnService(cfg server.Config, supportGRPCOnHTTP
// now that we have created the server and service let's setup our grpc/http router if necessary
if supportGRPCOnHTTP {
// for grpc to work we must enable h2c on the external server
s.externalServer.HTTPServer.Handler = h2c.NewHandler(s.externalServer.HTTPServer.Handler, &http2.Server{})
s.EnableHTTP2()

// recreate dskit instrumentation here
cfg.DoNotAddDefaultHTTPMiddleware = false
Expand Down

0 comments on commit 0812df6

Please sign in to comment.