Skip to content
Snippets Groups Projects
Commit 8a6ac9d0 authored by Simon Künzel's avatar Simon Künzel
Browse files

Add Open Telemetry OTLP traces

parent 87af04e1
No related branches found
No related tags found
No related merge requests found
Pipeline #7107 passed
......@@ -130,3 +130,6 @@ INTERNAL_IP_RANGES = ["127.0.0.0/8", "192.168.155.0/24", "fd78:4d90:6fe4::/48"]
# Only for debugging. In percent, from 0 to 100. With this you need luck to make a request
# API_ROULETTE_MODE = 0
# OPEN_TELEMETRY_OLTP_ENDPOINT = "http://localhost:4318"
OPEN_TELEMETRY_TRACE_FILTER_ONLY_HTTP_HEADER = "X-Trace-Me"
......@@ -17,6 +17,11 @@ mysql-connector-python==8.4.0
psycopg[c]==3.1.19
# sqlite is part of the standard library
# Open Telemetry
opentelemetry-api==1.28.2
opentelemetry-sdk==1.28.2
opentelemetry-exporter-otlp-proto-http==1.28.2
# required for testing
coverage==7.5.1
pylint==3.2.0
......@@ -11,10 +11,38 @@ from api.miscellaneous import *
from api.version import get_api_path, API_LATEST_VERSION, API_OLDEST_ACTIVE_VERSION
_SERVER_NAME = api.config["API_SERVER_NAME"]
_API_GLOBAL_RATE_LIMITERS = create_configured_host_rate_limiters("global", api.config["API_GLOBAL_RATE_LIMIT"])
_DEFAULT_CACHE_CONTROL_MAX_AGE_SECONDS = api.config["DEFAULT_CACHE_CONTROL_MAX_AGE_SECONDS"]
def _init_open_telemetry_tracer(oltp_endpoint: str) -> "Tracer":
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExportResult
from opentelemetry.sdk.resources import Resource, SERVICE_NAME
resource = Resource(attributes={
SERVICE_NAME: "videoag-api"
})
provider = TracerProvider(resource=resource)
exporter = OTLPSpanExporter(endpoint=f"{oltp_endpoint}/v1/traces")
provider.add_span_processor(BatchSpanProcessor(exporter))
if exporter.export([]) != SpanExportResult.SUCCESS:
print(f"Warning: Test span export to {oltp_endpoint} failed. Tracing might not work")
tracer = provider.get_tracer("videoag.api")
return tracer
open_telemetry_tracer = None
if "OPEN_TELEMETRY_OLTP_ENDPOINT" in api.config:
open_telemetry_tracer = _init_open_telemetry_tracer(api.config["OPEN_TELEMETRY_OLTP_ENDPOINT"])
def check_client_int(value: int, name: str, min_value: int = MIN_VALUE_UINT32, max_value: int = MAX_VALUE_UINT32):
from api.miscellaneous.errors import ApiClientException, ERROR_REQUEST_INVALID_PARAMETER
if value < min_value:
......@@ -138,31 +166,15 @@ def api_add_route(path: str, methods: list[str],
return decorator
def api_function(track_in_diagnostics: bool = True,
allow_while_readonly: bool = False,
allow_while_disabled: bool = False,
rate_limiters: tuple[HostBasedCounterRateLimiter, ...] = _API_GLOBAL_RATE_LIMITERS):
def decorator(func):
if hasattr(func, "is_api_route") and func.is_api_route:
raise Exception("An @api_function() decorator has already been applied. Are you using multiple @api_route()? "
"Use @api_add_route(...)@api_add_route(..)@api_function() instead")
call_counter = None
call_time_counter = None
if track_in_diagnostics:
func_name: str = func.__name__
if not func_name.startswith("api_route_"): # pragma: no cover
raise RuntimeError("Api route function names must start with 'api_route_' "
"(These names are used in diagnostics data)")
func_name = func_name[len("api_route_"):]
if len(func_name) == 0:
raise RuntimeError("Api route function has no name (just 'api_route_')") # pragma: no cover
call_counter = DIAGNOSTICS_TRACKER.register_counter(f"route.{func_name}")
call_time_counter = DIAGNOSTICS_TRACKER.register_counter(f"route.{func_name}.time")
@wraps(func)
def wrapper(*args, **kwargs):
def _execute_api_route(
call_counter: DiagnosticsCounter or None,
allow_while_readonly: bool,
allow_while_disabled: bool,
rate_limiters: tuple[HostBasedCounterRateLimiter, ...],
func,
args,
kwargs
):
try:
if api.live_config.is_disabled() and not allow_while_disabled:
raise ApiClientException(ERROR_SITE_IS_DISABLED)
......@@ -184,28 +196,20 @@ def api_function(track_in_diagnostics: bool = True,
if random.random() * 100 < int(api.config["API_ROULETTE_MODE"]):
raise ApiClientException(random.choice(ALL_ERRORS_RANDOM))
start_time = time.time_ns()
result = func(*args, **kwargs)
stop_time = time.time_ns()
if call_time_counter is not None:
call_time_counter.trigger(int((stop_time - start_time) / 1000000))
if isinstance(result, Response):
resp = result
return result
elif isinstance(result, ApiResponse):
resp = result.build_response()
return result.build_response()
elif result is None:
resp = ApiResponse(None, HTTP_200_OK, None).build_response()
return ApiResponse(None, HTTP_200_OK, None).build_response()
elif isinstance(result, dict):
resp = ApiResponse(result).build_response()
return ApiResponse(result).build_response()
elif isinstance(result, tuple) and len(result) == 2:
resp = ApiResponse(result[0], result[1]).build_response()
return ApiResponse(result[0], result[1]).build_response()
else: # pragma: no cover
raise Exception(f"Api route {truncate_string(request.path)} returned result of unknown type: {str(result)}")
if call_time_counter is not None: # i.e. diagnostics are enabled
resp.headers["Server-Timing"] = f"api;dur={(stop_time - start_time) / 1000000}"
return resp
except ApiClientException as e:
return api_on_error(e.error)
except (TransactionConflictError, NoAvailableConnectionError) as e:
......@@ -217,6 +221,74 @@ def api_function(track_in_diagnostics: bool = True,
traceback.print_exception(e)
return api_on_error(ERROR_INTERNAL_SERVER_ERROR)
def _handle_api_request(
call_counter: DiagnosticsCounter or None,
allow_while_readonly: bool,
allow_while_disabled: bool,
rate_limiters: tuple[HostBasedCounterRateLimiter, ...],
func,
args,
kwargs
):
do_tracing = open_telemetry_tracer is not None
if do_tracing:
header_filter = api.config.get("OPEN_TELEMETRY_TRACE_FILTER_ONLY_HTTP_HEADER", "")
if header_filter and header_filter not in request.headers:
do_tracing = False
start_time = time.time_ns()
if do_tracing:
from opentelemetry.sdk.trace import Tracer
assert isinstance(open_telemetry_tracer, Tracer)
with open_telemetry_tracer.start_as_current_span("api-request", attributes={
"url": request.url,
"server": _SERVER_NAME,
}):
resp = _execute_api_route(call_counter, allow_while_readonly, allow_while_disabled, rate_limiters, func, args, kwargs)
else:
resp = _execute_api_route(call_counter, allow_while_readonly, allow_while_disabled, rate_limiters, func, args, kwargs)
exec_time_ms = (time.time_ns() - start_time) // 1_000_000
resp.headers["Server-Timing"] = f"api;dur={exec_time_ms}"
resp.headers["X-Server-Name"] = _SERVER_NAME
return resp
def api_function(track_in_diagnostics: bool = True,
allow_while_readonly: bool = False,
allow_while_disabled: bool = False,
rate_limiters: tuple[HostBasedCounterRateLimiter, ...] = _API_GLOBAL_RATE_LIMITERS):
def decorator(func):
if hasattr(func, "is_api_route") and func.is_api_route:
raise Exception("An @api_function() decorator has already been applied. Are you using multiple @api_route()? "
"Use @api_add_route(...)@api_add_route(..)@api_function() instead")
call_counter = None
if track_in_diagnostics:
func_name: str = func.__name__
if not func_name.startswith("api_route_"): # pragma: no cover
raise RuntimeError("Api route function names must start with 'api_route_' "
"(These names are used in diagnostics data)")
func_name = func_name[len("api_route_"):]
if len(func_name) == 0:
raise RuntimeError("Api route function has no name (just 'api_route_')") # pragma: no cover
call_counter = DIAGNOSTICS_TRACKER.register_counter(f"route.{func_name}")
@wraps(func)
def wrapper(*args, **kwargs):
return _handle_api_request(
call_counter,
allow_while_readonly,
allow_while_disabled,
rate_limiters,
func,
args,
kwargs
)
wrapper.is_api_route = True
return wrapper
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment