// Copyright 2018 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package main import ( "fmt" "net" "os" "time" "github.com/sirupsen/logrus" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/reflection" "google.golang.org/grpc/status" "go.opentelemetry.io/otel/api/global" "go.opentelemetry.io/otel/api/kv" "go.opentelemetry.io/otel/exporters/trace/jaeger" grpcotel "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc" sdktrace "go.opentelemetry.io/otel/sdk/trace" pb "github.com/GoogleCloudPlatform/microservices-demo/src/shippingservice/genproto" healthpb "google.golang.org/grpc/health/grpc_health_v1" ) const ( defaultPort = "50051" ) var log *logrus.Logger func init() { log = logrus.New() log.Level = logrus.DebugLevel log.Formatter = &logrus.JSONFormatter{ FieldMap: logrus.FieldMap{ logrus.FieldKeyTime: "timestamp", logrus.FieldKeyLevel: "severity", logrus.FieldKeyMsg: "message", }, TimestampFormat: time.RFC3339Nano, } log.Out = os.Stdout } func main() { if os.Getenv("DISABLE_TRACING") == "" { log.Info("Tracing enabled.") initTracer(log) } else { log.Info("Tracing disabled.") } port := defaultPort if value, ok := os.LookupEnv("PORT"); ok { port = value } port = fmt.Sprintf(":%s", port) lis, err := net.Listen("tcp", port) if err != nil { log.Fatalf("failed to listen: %v", err) } var srv *grpc.Server if os.Getenv("DISABLE_STATS") == "" { log.Info("Stats enabled.") srv = grpc.NewServer( grpc.UnaryInterceptor(grpcotel.UnaryServerInterceptor(global.Tracer(""))), grpc.StreamInterceptor(grpcotel.StreamServerInterceptor(global.Tracer(""))), ) } else { log.Info("Stats disabled.") srv = grpc.NewServer() } svc := &server{} pb.RegisterShippingServiceServer(srv, svc) healthpb.RegisterHealthServer(srv, svc) log.Infof("Shipping Service listening on port %s", port) // Register reflection service on gRPC server. reflection.Register(srv) if err := srv.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } } // server controls RPC service responses. type server struct{} // Check is for health checking. func (s *server) Check(ctx context.Context, req *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) { return &healthpb.HealthCheckResponse{Status: healthpb.HealthCheckResponse_SERVING}, nil } func (s *server) Watch(req *healthpb.HealthCheckRequest, ws healthpb.Health_WatchServer) error { return status.Errorf(codes.Unimplemented, "health check via Watch not implemented") } // GetQuote produces a shipping quote (cost) in USD. func (s *server) GetQuote(ctx context.Context, in *pb.GetQuoteRequest) (*pb.GetQuoteResponse, error) { log.Info("[GetQuote] received request") defer log.Info("[GetQuote] completed request") // 1. Our quote system requires the total number of items to be shipped. count := 0 for _, item := range in.Items { count += int(item.Quantity) } // 2. Generate a quote based on the total number of items to be shipped. quote := CreateQuoteFromCount(count) // 3. Generate a response. return &pb.GetQuoteResponse{ CostUsd: &pb.Money{ CurrencyCode: "USD", Units: int64(quote.Dollars), Nanos: int32(quote.Cents * 10000000)}, }, nil } // ShipOrder mocks that the requested items will be shipped. // It supplies a tracking ID for notional lookup of shipment delivery status. func (s *server) ShipOrder(ctx context.Context, in *pb.ShipOrderRequest) (*pb.ShipOrderResponse, error) { log.Info("[ShipOrder] received request") defer log.Info("[ShipOrder] completed request") // 1. Create a Tracking ID baseAddress := fmt.Sprintf("%s, %s, %s", in.Address.StreetAddress, in.Address.City, in.Address.State) id := CreateTrackingId(baseAddress) // 2. Generate a response. return &pb.ShipOrderResponse{ TrackingId: id, }, nil } func initTracer(log logrus.FieldLogger) func() { svcAddr := os.Getenv("JAEGER_SERVICE_ADDR") podIp := os.Getenv("POD_IP") podName := os.Getenv("POD_NAME") nodeName := os.Getenv("NODE_NAME") serviceName := os.Getenv("SERVICE_NAME") if svcAddr == "" { log.Info("jaeger initialization disabled.") } endPoint := fmt.Sprintf("http://%s", svcAddr) flush, err := jaeger.InstallNewPipeline( jaeger.WithCollectorEndpoint(endPoint), jaeger.WithProcess(jaeger.Process{ ServiceName: serviceName, Tags: []kv.KeyValue{ kv.String("exporter", "jaeger"), kv.Float64("float", 312.23), kv.String("ip", podIp), kv.String("name", podName), kv.String("node_name", nodeName), }, }), jaeger.WithSDK(&sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}), ) if err != nil { log.Fatal(err) } log.Info("jaeger initialization completed.") return func() { flush() } }