Skip to content
代码片段 群组 项目
recommendation_server.py 6.94 KiB
#!/usr/bin/python
#
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import random
import time
import traceback
from concurrent import futures

import googlecloudprofiler
from google.auth.exceptions import DefaultCredentialsError
import grpc

import demo_pb2
import demo_pb2_grpc
from grpc_health.v1 import health_pb2
from grpc_health.v1 import health_pb2_grpc

from logger import getJSONLogger
from opentelemetry import trace
from opentelemetry.instrumentation.grpc import server_interceptor
from opentelemetry.instrumentation.grpc.grpcext import intercept_server
from opentelemetry.instrumentation.grpc import client_interceptor
from opentelemetry.instrumentation.grpc.grpcext import intercept_channel
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.exporter import jaeger
from opentelemetry.sdk.trace.export import BatchExportSpanProcessor
from opentelemetry.sdk.trace.export import Span, SpanExporter, SpanExportResult
from opentelemetry.exporter.jaeger import _translate_to_jaeger
from opentelemetry.exporter.jaeger.gen.jaeger import Collector as jaegerType

trace.set_tracer_provider(TracerProvider())
# create a JaegerSpanExporter
jaeger_exporter = jaeger.JaegerSpanExporter(
    service_name=os.environ.get('SERVICE_NAME', "recommendationservice"),
    # configure agent
    # agent_host_name="localhost",
    # agent_port=6831,
    # optional: configure also collector
    collector_host_name=os.environ.get('JAEGER_HOST', 'jaeger-collector'),
    collector_port=14268,
    collector_endpoint="/api/traces?format=jaeger.thrift",
    # username=xxxx, # optional
    # password=xxxx, # optional
)


def new_export(spans):
    global jaeger_exporter
    self = jaeger_exporter
    jaeger_spans = _translate_to_jaeger(spans)
    podIp = os.environ.get('POD_IP')
    podName = os.environ.get('POD_NAME')
    nodeName = os.environ.get('NODE_NAME')
    tags = [
        jaegerType.Tag(
            key="exporter", vType=jaegerType.TagType.STRING, vStr="jaeger"),
        jaegerType.Tag(
            key="float", vType=jaegerType.TagType.DOUBLE, vDouble=312.23),
        jaegerType.Tag(
            key="ip", vType=jaegerType.TagType.STRING, vStr=podIp),
        jaegerType.Tag(
            key="name", vType=jaegerType.TagType.STRING, vStr=podName),
        jaegerType.Tag(key="node_name",
                       vType=jaegerType.TagType.STRING, vStr=nodeName)
    ]

    batch = jaegerType.Batch(
        spans=jaeger_spans,
        process=jaegerType.Process(
            serviceName=self.service_name, tags=tags),
    )

    if self.collector is not None:
        self.collector.submit(batch)
    self.agent_client.emit(batch)

    return SpanExportResult.SUCCESS

jaeger_exporter.export = new_export

# create a BatchExportSpanProcessor and add the exporter to it
span_processor = BatchExportSpanProcessor(jaeger_exporter)

# add to the tracer factory
trace.get_tracer_provider().add_span_processor(span_processor)

logger = getJSONLogger('recommendationservice-server')


def initStackdriverProfiling():
    project_id = None
    try:
        project_id = os.environ["GCP_PROJECT_ID"]
    except KeyError:
        # Environment variable not set
        pass

    for retry in xrange(1, 4):
        try:
            if project_id:
                googlecloudprofiler.start(service='recommendation_server', service_version='1.0.0', verbose=0,
                                          project_id=project_id)
            else:
                googlecloudprofiler.start(service='recommendation_server', service_version='1.0.0', verbose=0)
            logger.info("Successfully started Stackdriver Profiler.")
            return
        except (BaseException) as exc:
            logger.info("Unable to start Stackdriver Profiler Python agent. " + str(exc))
            if (retry < 4):
                logger.info("Sleeping %d seconds to retry Stackdriver Profiler agent initialization" % (retry * 10))
                time.sleep(1)
            else:
                logger.warning("Could not initialize Stackdriver Profiler after retrying, giving up")
    return


class RecommendationService(demo_pb2_grpc.RecommendationServiceServicer):
    def ListRecommendations(self, request, context):
        max_responses = 5
        # fetch list of products from product catalog stub
        cat_response = product_catalog_stub.ListProducts(demo_pb2.Empty())
        product_ids = [x.id for x in cat_response.products]
        filtered_products = list(set(product_ids) - set(request.product_ids))
        num_products = len(filtered_products)
        num_return = min(max_responses, num_products)
        # sample list of indicies to return
        indices = random.sample(range(num_products), num_return)
        # fetch product ids from indices
        prod_list = [filtered_products[i] for i in indices]
        logger.info("[Recv ListRecommendations] product_ids={}".format(prod_list))
        # build and return response
        response = demo_pb2.ListRecommendationsResponse()
        response.product_ids.extend(prod_list)
        return response

    def Check(self, request, context):
        return health_pb2.HealthCheckResponse(
            status=health_pb2.HealthCheckResponse.SERVING)


if __name__ == "__main__":
    logger.info("initializing recommendationservice")

    port = os.environ.get('PORT', "8080")
    catalog_addr = os.environ.get('PRODUCT_CATALOG_SERVICE_ADDR', '')
    if catalog_addr == "":
        raise Exception('PRODUCT_CATALOG_SERVICE_ADDR environment variable not set')
    logger.info("product catalog address: " + catalog_addr)

    channel = grpc.insecure_channel(catalog_addr)
    channel = intercept_channel(channel, client_interceptor())
    product_catalog_stub = demo_pb2_grpc.ProductCatalogServiceStub(channel)

    # create gRPC server
    #server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),
    #                     interceptors=(tracer_interceptor,))
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

    # OpenTelemetry magic!
    server = intercept_server(server, server_interceptor())

    # add class to gRPC server
    service = RecommendationService()
    demo_pb2_grpc.add_RecommendationServiceServicer_to_server(service, server)
    health_pb2_grpc.add_HealthServicer_to_server(service, server)

    # start server
    logger.info("listening on port: " + port)
    server.add_insecure_port('[::]:' + port)
    server.start()

    # keep alive
    try:
        while True:
            time.sleep(10000)
    except KeyboardInterrupt:
        server.stop(0)