Skip to content

Commit 4e869cf

Browse files
authored
Merge pull request #355 from opensensor/qoe-services
Implement Stream Health & QoE Monitoring (Phase 1)
2 parents a670754 + 6746252 commit 4e869cf

26 files changed

Lines changed: 1936 additions & 18 deletions

CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,7 @@ list(APPEND VIDEO_SOURCES ${FFMPEG_LEAK_DETECTOR_SOURCES})
433433

434434
# Add HLS sources
435435
file(GLOB HLS_SOURCES "src/video/hls/*.c")
436+
file(GLOB TELEMETRY_SOURCES "src/telemetry/*.c")
436437

437438
# Add unified HLS thread sources explicitly to ensure they're included
438439
set(HLS_UNIFIED_THREAD_SOURCES
@@ -462,6 +463,7 @@ set(SOURCES
462463
${VIDEO_SOURCES}
463464
${HLS_SOURCES}
464465
${HLS_UNIFIED_THREAD_SOURCES}
466+
${TELEMETRY_SOURCES}
465467
${EZXML_SOURCES}
466468
)
467469
include/telemetry/player_telemetry.h

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/**
2+
* @file player_telemetry.h
3+
* @brief Client-side playback QoE telemetry ingestion
4+
*
5+
* Accepts QoE events from the web player (TTFF, rebuffer, WebRTC RTT)
6+
* via POST /api/telemetry/player and stores them in a ring buffer for
7+
* Prometheus exposition and health reporting.
8+
*/
9+
10+
#ifndef LIGHTNVR_PLAYER_TELEMETRY_H
11+
#define LIGHTNVR_PLAYER_TELEMETRY_H
12+
13+
#include <stdint.h>
14+
#include <time.h>
15+
#include "core/config.h"
16+
17+
#define PLAYER_TELEMETRY_RING_SIZE 256
18+
19+
/**
20+
* Single player telemetry event
*
* A fixed-size record: every string is stored inline in a fixed-length
* char array, so the struct contains no pointers.
*
* Fields:
*  - stream_name: stream the event refers to (MAX_STREAM_NAME-byte buffer)
*  - session_id: 64-byte buffer naming the playback session
*  - transport: 16-byte buffer; documented values are "webrtc", "hls", "mse"
*  - ttff_ms: time-to-first-frame
*  - rebuffer_count / rebuffer_duration_ms: rebuffering figures
*  - resolution_switches: resolution switch figure
*  - webrtc_rtt_ms: WebRTC RTT
*  - timestamp: event time as a time_t
*
* NOTE(review): the *_ms suffix suggests milliseconds, and the string
* buffers are presumably NUL-terminated (63 / 15 usable characters for
* session_id / transport) -- confirm against the code that fills this in.
* NOTE(review): this header does not show whether timestamp is supplied by
* the client or stamped by the server, nor whether the counters are
* per-event deltas or session totals -- confirm before aggregating.
21+
*/
22+
typedef struct {
23+
char stream_name[MAX_STREAM_NAME];
24+
char session_id[64];
25+
char transport[16]; /* "webrtc", "hls", "mse" */
26+
double ttff_ms; /* time-to-first-frame */
27+
int rebuffer_count;
28+
double rebuffer_duration_ms;
29+
int resolution_switches;
30+
double webrtc_rtt_ms;
31+
time_t timestamp;
32+
} player_telemetry_event_t;
33+
34+
/**
35+
* Initialize the player telemetry subsystem
*
* In main.c this is called once during startup, directly after
* metrics_init() has succeeded. It returns void, so a failure to
* initialize cannot be reported to the caller.
36+
*/
37+
void player_telemetry_init(void);
38+
39+
/**
40+
* Shutdown the player telemetry subsystem
*
* In main.c this is called during cleanup, directly after
* metrics_shutdown().
* NOTE(review): what happens to player_telemetry_record() calls made after
* shutdown is not visible from this header -- confirm in the implementation.
41+
*/
42+
void player_telemetry_shutdown(void);
43+
44+
/**
45+
* Record a player telemetry event
46+
*
* Because the event is copied, the caller keeps ownership of *event and may
* reuse or discard it once the call returns. The function returns void, so
* the caller is not told whether the event was stored.
*
47+
* @param event Pointer to the event to record (copied into ring buffer)
*
* NOTE(review): presumably the ring holds PLAYER_TELEMETRY_RING_SIZE entries
* and the oldest entry is overwritten once it is full; handling of a NULL
* event is not visible here -- confirm in the implementation.
48+
*/
49+
void player_telemetry_record(const player_telemetry_event_t *event);
50+
51+
/**
52+
* Snapshot recent player telemetry events
53+
*
* Ordering is newest-to-oldest. This is the opposite of
* metrics_get_ring_data() in stream_metrics.h, which documents
* oldest-first output.
*
54+
* @param out Pre-allocated array to receive events
55+
* @param max_count Size of out array
56+
* @return Number of events copied (most recent first)
*
* NOTE(review): the result is presumably capped at both max_count and
* PLAYER_TELEMETRY_RING_SIZE; behavior for out == NULL or max_count <= 0 is
* not visible here -- confirm in the implementation.
57+
*/
58+
int player_telemetry_snapshot(player_telemetry_event_t *out, int max_count);
59+
60+
/**
61+
* Get the number of events currently stored
*
* @return Number of stored events
*
* NOTE(review): presumably never exceeds PLAYER_TELEMETRY_RING_SIZE (256)
* -- confirm in the implementation.
62+
*/
63+
int player_telemetry_count(void);
64+
65+
#endif /* LIGHTNVR_PLAYER_TELEMETRY_H */

include/telemetry/stream_metrics.h

Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
/**
2+
* @file stream_metrics.h
3+
* @brief In-process stream health metric counters and ring buffers
4+
*
5+
* Provides per-stream QoS/QoE metric collection for Prometheus exposition,
6+
* JSON health summaries, and UI sparklines. Zero external dependencies.
7+
*
8+
* Thread safety: metric slots use pthread_rwlock_t. Writer threads (recording,
9+
* HLS, stream_state) acquire write locks briefly to update counters. The HTTP
10+
* thread acquires read locks to snapshot metrics for API responses.
11+
*/
12+
13+
#ifndef LIGHTNVR_STREAM_METRICS_H
14+
#define LIGHTNVR_STREAM_METRICS_H
15+
16+
#include <stdint.h>
17+
#include <stdbool.h>
18+
#include <stdatomic.h>
19+
#include <time.h>
20+
#include <pthread.h>
21+
#include "core/config.h"
22+
23+
/* Ring buffer: 5-second sample interval, 720 samples = 1 hour of sparkline data */
24+
#define METRICS_RING_SIZE 720
25+
26+
/* Thresholds for health status classification */
27+
#define STREAM_HEALTH_FRAME_TIMEOUT_UP 5 /* seconds: no frame → not UP */
28+
#define STREAM_HEALTH_FRAME_TIMEOUT_DOWN 10 /* seconds: no frame → DOWN */
29+
#define STREAM_HEALTH_FPS_RATIO 0.5 /* current/configured < this → degraded */
30+
31+
/* Gap detection threshold */
32+
#define RECORDING_GAP_THRESHOLD_SEC 5
33+
34+
/**
35+
* Stream health status
*
* Numeric values are 0 (UP), 1 (DEGRADED) and 2 (DOWN). stream_metrics_t
* keeps one of these values in its atomic_int health_status field.
*
* The classification thresholds are the STREAM_HEALTH_* macros in this
* header: STREAM_HEALTH_FRAME_TIMEOUT_UP (5 s without a frame: not UP),
* STREAM_HEALTH_FRAME_TIMEOUT_DOWN (10 s without a frame: DOWN) and
* STREAM_HEALTH_FPS_RATIO (current/configured FPS below 0.5: degraded).
* NOTE(review): a stream whose last frame is between 5 s and 10 s old is
* presumably reported as DEGRADED -- confirm in the sampler code.
36+
*/
37+
typedef enum {
38+
STREAM_HEALTH_UP = 0, /* Receiving frames within thresholds */
39+
STREAM_HEALTH_DEGRADED, /* Receiving frames but below FPS threshold */
40+
STREAM_HEALTH_DOWN /* No frames received recently */
41+
} stream_health_status_t;
42+
43+
/**
44+
* Single ring buffer sample for sparkline rendering
*
* Each stream_metrics_t slot embeds METRICS_RING_SIZE of these samples.
*
* Unit note: bitrate_kbps is named for kilobits per second, whereas
* stream_metrics_t.current_bitrate_bps is documented as bits/s, so a value
* moved from one to the other has to be converted.
* NOTE(review): timestamp is presumably the time the sample was taken, as
* unix epoch seconds -- confirm in the sampler code.
45+
*/
46+
typedef struct {
47+
float fps;
48+
float bitrate_kbps;
49+
time_t timestamp;
50+
} metrics_ring_sample_t;
51+
52+
/**
53+
* Per-stream metric slot
54+
*
55+
* The status flags, last_frame_ts and all counters are C11 atomic types, so
* each of them can be read or updated individually without holding the lock.
* The double-typed gauges (current_fps, current_bitrate_bps,
* connection_latency_ms, configured_fps) and the bool active flag are plain,
* non-atomic fields.
* NOTE(review): cross-thread access to those plain fields presumably has to
* go through the rwlock -- confirm in the implementation.
56+
* The rwlock protects the ring buffer and multi-field updates.
*
* The slot embeds the full METRICS_RING_SIZE-entry ring, so one slot is a
* multi-kilobyte object; be careful about placing arrays of it on the stack.
* It also embeds a pthread_rwlock_t. A by-value copy of a slot (for example
* an element filled in by metrics_snapshot_all()) therefore carries a copy
* of the lock object; POSIX leaves the use of copied rwlock objects
* undefined, so only the lock inside the original slot may be used.
57+
*/
58+
typedef struct {
59+
/* Identity */
60+
char stream_name[MAX_STREAM_NAME];
61+
bool active; /* slot in use */
62+
63+
/* Health status (recomputed by sampler thread) */
64+
atomic_int health_status; /* stream_health_status_t */
65+
atomic_int stream_up; /* 1 if UP, 0 otherwise */
66+
67+
/* Gauges */
68+
atomic_int_fast64_t last_frame_ts; /* unix epoch seconds of last frame */
69+
double current_fps; /* computed FPS (updated by sampler) */
70+
double current_bitrate_bps; /* computed bitrate in bits/s (updated by sampler) */
71+
double connection_latency_ms; /* last (re)connect latency */
72+
double configured_fps; /* configured FPS for health threshold */
73+
74+
/* Monotonic counters */
75+
atomic_uint_fast64_t frames_total;
76+
atomic_uint_fast64_t frames_dropped;
77+
atomic_uint_fast64_t bytes_received;
78+
atomic_uint_fast64_t reconnects_total;
79+
atomic_uint_fast64_t uptime_seconds;
80+
81+
/* Error counters by type */
82+
atomic_uint_fast64_t error_decode;
83+
atomic_uint_fast64_t error_timeout;
84+
atomic_uint_fast64_t error_protocol;
85+
atomic_uint_fast64_t error_io;
86+
87+
/* Recording metrics */
88+
atomic_int recording_active; /* 1 if recording, 0 otherwise */
89+
atomic_uint_fast64_t recording_bytes_written;
90+
atomic_uint_fast64_t recording_segments_total;
91+
atomic_uint_fast64_t recording_gaps_total;
92+
93+
/* Ring buffer for sparkline data (protected by rwlock) */
94+
metrics_ring_sample_t ring[METRICS_RING_SIZE];
95+
int ring_head; /* next write position */
96+
int ring_count; /* number of valid samples */
97+
98+
/* Gap detection state */
99+
time_t last_segment_end_time;
100+
int expected_segment_duration;
101+
102+
/* Stream start time for uptime calculation */
103+
time_t stream_start_time;
104+
105+
/* FPS/bitrate computation state (used internally by sampler) */
106+
uint64_t prev_frames_total;
107+
uint64_t prev_bytes_received;
108+
time_t prev_sample_time;
109+
110+
/* Read-write lock for this slot */
111+
pthread_rwlock_t lock;
112+
} stream_metrics_t;
113+
114+
/**
115+
* Initialize the metrics subsystem
116+
*
* In main.c this is called with config.max_streams; a non-zero return is
* logged as an error and aborts startup.
*
117+
* @param max_streams Maximum number of stream slots to allocate
118+
* @return 0 on success, -1 on failure
*
* NOTE(review): behavior for max_streams <= 0, or for a second call without
* an intervening metrics_shutdown(), is not visible here -- confirm.
119+
*/
120+
int metrics_init(int max_streams);
121+
122+
/**
123+
* Shutdown the metrics subsystem and free resources
*
* In main.c this is called during cleanup, before
* player_telemetry_shutdown() and before initiate_shutdown().
* NOTE(review): if stream, recording or HTTP threads can still call into
* this API at that point, the implementation has to tolerate calls made
* after shutdown; pointers previously returned by metrics_get_slot() are
* presumably invalid afterwards -- confirm.
124+
*/
125+
void metrics_shutdown(void);
126+
127+
/**
128+
* Get or create a metric slot for a stream.
129+
* Returns NULL if all slots are full and the name doesn't match any existing slot.
130+
*
131+
* @param stream_name Stream name
132+
* @return Pointer to the stream's metric slot, or NULL
*
* NOTE(review): the returned pointer presumably refers to storage owned by
* the metrics subsystem (the caller must not free it) and presumably stays
* valid until metrics_release_slot() or metrics_shutdown(); the slot
* capacity is presumably the max_streams value given to metrics_init().
* Handling of a NULL or over-long stream_name is not visible here -- confirm.
133+
*/
134+
stream_metrics_t *metrics_get_slot(const char *stream_name);
135+
136+
/**
137+
* Release (deactivate) a metric slot for a stream
138+
*
* Returns void, so the caller is not told whether a slot with this name
* existed.
*
139+
* @param stream_name Stream name
*
* NOTE(review): whether the slot's counters and ring are reset when it is
* released or when it is next reused is not visible here -- confirm.
140+
*/
141+
void metrics_release_slot(const char *stream_name);
142+
143+
/**
144+
* Record a received frame (video or audio)
145+
*
* Returns void, so the caller is not told whether the stream has a slot.
*
146+
* @param stream_name Stream name
147+
* @param bytes Packet size in bytes
148+
* @param is_video true for video frames, false for audio
*
* NOTE(review): presumably updates frames_total, bytes_received and
* last_frame_ts in the stream's slot. Whether audio packets count toward
* frames_total, and how a negative bytes value is treated (the parameter is
* a signed int), is not visible here -- confirm.
149+
*/
150+
void metrics_record_frame(const char *stream_name, int bytes, bool is_video);
151+
152+
/**
153+
* Record a dropped frame
154+
*
* Returns void, so the caller is not told whether the stream has a slot.
*
155+
* @param stream_name Stream name
*
* NOTE(review): presumably increments frames_dropped in the stream's slot
* -- confirm in the implementation.
156+
*/
157+
void metrics_record_drop(const char *stream_name);
158+
159+
/**
160+
* Record a stream error
161+
*
* The four documented error types line up with the error_decode,
* error_timeout, error_protocol and error_io counters in stream_metrics_t.
*
162+
* @param stream_name Stream name
163+
* @param error_type One of: "decode", "timeout", "protocol", "io"
*
* NOTE(review): what happens for any other error_type string, or for NULL,
* is not visible here (the function returns void) -- confirm.
164+
*/
165+
void metrics_record_error(const char *stream_name, const char *error_type);
166+
167+
/**
168+
* Record a reconnection event
169+
*
* Returns void, so the caller is not told whether the stream has a slot.
*
170+
* @param stream_name Stream name
*
* NOTE(review): presumably increments reconnects_total in the stream's slot;
* the latency of the reconnect is reported separately through
* metrics_set_connection_latency() -- confirm.
171+
*/
172+
void metrics_record_reconnect(const char *stream_name);
173+
174+
/**
175+
* Record a completed recording segment (for gap detection)
176+
*
177+
* @param stream_name Stream name
178+
* @param start_time Segment start timestamp
179+
* @param end_time Segment end timestamp
180+
* @param bytes Bytes written in this segment
*
* NOTE(review): presumably updates recording_bytes_written and
* recording_segments_total, and counts a gap in recording_gaps_total when
* start_time is more than RECORDING_GAP_THRESHOLD_SEC after the slot's
* last_segment_end_time -- confirm the exact rule in the implementation.
181+
*/
182+
void metrics_record_segment_complete(const char *stream_name, time_t start_time,
183+
time_t end_time, uint64_t bytes);
184+
185+
/**
186+
* Set recording active state for a stream
187+
*
* The matching slot field, recording_active, is an atomic_int documented as
* 1 while recording and 0 otherwise.
*
188+
* @param stream_name Stream name
189+
* @param active true if recording is active
190+
*/
191+
void metrics_set_recording_active(const char *stream_name, bool active);
192+
193+
/**
194+
* Set the connection latency for the last (re)connect
195+
*
* The matching slot field, connection_latency_ms, is a plain (non-atomic)
* double.
*
196+
* @param stream_name Stream name
197+
* @param latency_ms Latency in milliseconds
*
* NOTE(review): whether negative or non-finite values are rejected is not
* visible here -- confirm.
198+
*/
199+
void metrics_set_connection_latency(const char *stream_name, double latency_ms);
200+
201+
/**
202+
* Set the configured FPS for a stream (used for health threshold computation)
203+
*
* STREAM_HEALTH_FPS_RATIO documents the threshold as current/configured
* FPS below 0.5 meaning degraded; this value is the denominator.
*
204+
* @param stream_name Stream name
205+
* @param fps Configured FPS value
*
* NOTE(review): fps <= 0 would make that ratio meaningless; how the sampler
* treats such a value is not visible here -- confirm.
206+
*/
207+
void metrics_set_configured_fps(const char *stream_name, double fps);
208+
209+
/**
210+
* Thread-safe snapshot of all active stream metrics
211+
*
* Every out_array element is a complete stream_metrics_t, including the
* METRICS_RING_SIZE-entry ring and the pthread_rwlock_t member, so the
* caller's array is large (prefer heap allocation for many streams).
*
212+
* @param out_array Pre-allocated array to receive snapshots
213+
* @param max_count Size of out_array
214+
* @return Number of active streams copied
*
* NOTE(review): the lock member inside a snapshot element is a copy and
* presumably must not be locked or destroyed by the caller (POSIX leaves
* use of copied rwlock objects undefined). What happens when more than
* max_count streams are active is not visible here -- confirm.
215+
*/
216+
int metrics_snapshot_all(stream_metrics_t *out_array, int max_count);
217+
218+
/**
219+
* Get ring buffer data for a stream's sparkline
220+
*
* A slot's ring holds METRICS_RING_SIZE samples, so an out array of
* METRICS_RING_SIZE elements is large enough to receive all of them.
* Ordering is oldest-to-newest. This is the opposite of
* player_telemetry_snapshot(), which documents most-recent-first output.
*
221+
* @param stream_name Stream name
222+
* @param out Pre-allocated array to receive samples
223+
* @param max_count Size of out array
224+
* @return Number of samples copied (oldest first)
*
* NOTE(review): the return value for an unknown stream_name is presumably
* 0, and when max_count is smaller than the number of stored samples it is
* not visible which samples are kept -- confirm.
225+
*/
226+
int metrics_get_ring_data(const char *stream_name, metrics_ring_sample_t *out, int max_count);
227+
228+
/**
229+
* Get the total number of allocated metric slots
*
* @return Number of allocated slots
*
* NOTE(review): presumably the max_streams value passed to metrics_init(),
* which makes it the natural array size for metrics_snapshot_all(); the
* value returned before init or after shutdown is not visible -- confirm.
230+
*/
231+
int metrics_get_max_streams(void);
232+
233+
#endif /* LIGHTNVR_STREAM_METRICS_H */

include/video/mp4_writer_thread.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ typedef struct {
3838
// If set, the next segment should start by writing this packet first.
3939
// This intentionally duplicates the boundary keyframe to bias toward overlap (no gaps).
4040
AVPacket *pending_video_keyframe;
41+
42+
// Stream name for telemetry instrumentation
43+
char stream_name[MAX_STREAM_NAME];
4144
} segment_info_t;
4245

4346
/**

include/web/api_handlers_metrics.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/**
2+
* @file api_handlers_metrics.h
3+
* @brief Prometheus metrics endpoint and player telemetry ingest handlers
4+
*/
5+
6+
#ifndef LIGHTNVR_API_HANDLERS_METRICS_H
7+
#define LIGHTNVR_API_HANDLERS_METRICS_H
8+
9+
#include "web/request_response.h"
10+
11+
/**
12+
* GET /api/metrics
13+
* Returns all stream health metrics in Prometheus exposition text format.
*
* @param req Incoming HTTP request (read-only)
* @param res Response object the handler fills in
*
* NOTE(review): the data presumably comes from metrics_snapshot_all() in
* telemetry/stream_metrics.h; authentication requirements for this endpoint
* are not visible from this header -- confirm.
14+
*/
15+
void handle_get_metrics(const http_request_t *req, http_response_t *res);
16+
17+
/**
18+
* POST /api/telemetry/player
19+
* Accepts client-side playback QoE events from the web player.
20+
* Returns 204 No Content.
*
* @param req Incoming HTTP request (read-only)
* @param res Response object the handler fills in
*
* NOTE(review): the request body comes from the browser and is therefore
* untrusted; parsed events are presumably stored through
* player_telemetry_record(). The status returned for a malformed body, and
* any size or rate limits, are not visible from this header -- confirm.
21+
*/
22+
void handle_post_player_telemetry(const http_request_t *req, http_response_t *res);
23+
24+
#endif /* LIGHTNVR_API_HANDLERS_METRICS_H */

src/core/main.c

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
#include "video/ffmpeg_leak_detector.h"
4444
#include "video/onvif_motion_recording.h"
4545
#include "core/curl_init.h"
46+
#include "telemetry/stream_metrics.h"
47+
#include "telemetry/player_telemetry.h"
4648

4749
// Include go2rtc headers if USE_GO2RTC is defined
4850
#ifdef USE_GO2RTC
@@ -804,6 +806,13 @@ int main(int argc, char *argv[]) {
804806
goto cleanup;
805807
}
806808

809+
// Initialize telemetry subsystem
810+
if (metrics_init(config.max_streams) != 0) {
811+
log_error("Failed to initialize metrics subsystem");
812+
goto cleanup;
813+
}
814+
player_telemetry_init();
815+
807816
// Initialize go2rtc integration if enabled
808817
#ifdef USE_GO2RTC
809818
if (!config.go2rtc_enabled) {
@@ -1204,6 +1213,11 @@ int main(int argc, char *argv[]) {
12041213
log_info("Stopping health check system...");
12051214
cleanup_health_check_system();
12061215

1216+
// Shutdown telemetry subsystem
1217+
log_info("Shutting down telemetry...");
1218+
metrics_shutdown();
1219+
player_telemetry_shutdown();
1220+
12071221
// Now that we're in the main thread (not signal handler), we can safely
12081222
// call initiate_shutdown() which uses mutexes and logging
12091223
initiate_shutdown();

0 commit comments

Comments
 (0)