Skip to content

Commit 6030a57

Browse files
fix: add thread safety to SSE connection for realtime flags
- Import threading module at top of client.py - Add threading.Lock (_sse_lock) for sse_connected flag - Wrap all sse_connected reads/writes with lock - Store SSE response and call close() for graceful shutdown - Ensure thread-safe cleanup in _close_sse_connection() 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 1fb9990 commit 6030a57

2 files changed

Lines changed: 20 additions & 14 deletions

File tree

posthog/client.py

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import logging
33
import os
44
import sys
5+
import threading
56
from datetime import datetime, timedelta
67
from typing import Any, Dict, Optional, Union
78
from typing_extensions import Unpack
@@ -255,6 +256,7 @@ def __init__(
255256
self.sse_connection = None # type: Optional[Any]
256257
self.sse_response = None
257258
self.sse_connected = False
259+
self._sse_lock = threading.Lock()
258260

259261
self.capture_exception_code_variables = capture_exception_code_variables
260262
self.code_variables_mask_patterns = (
@@ -2245,9 +2247,11 @@ def _setup_sse_connection(self):
22452247
)
22462248
return
22472249

2248-
if self.sse_connected:
2249-
self.log.debug("[FEATURE FLAGS] SSE connection already established")
2250-
return
2250+
with self._sse_lock:
2251+
if self.sse_connected:
2252+
self.log.debug("[FEATURE FLAGS] SSE connection already established")
2253+
return
2254+
self.sse_connected = True
22512255

22522256
try:
22532257
import threading
@@ -2274,16 +2278,17 @@ def sse_listener():
22742278
self.log.warning(
22752279
f"[FEATURE FLAGS] SSE connection failed with status {response.status_code}"
22762280
)
2277-
self.sse_connected = False
2281+
with self._sse_lock:
2282+
self.sse_connected = False
22782283
return
22792284

2280-
self.sse_connected = True
22812285
self.log.debug("[FEATURE FLAGS] SSE connection established")
22822286

22832287
# Process the stream line by line, checking sse_connected periodically
22842288
for line in response.iter_lines():
2285-
if not self.sse_connected:
2286-
break
2289+
with self._sse_lock:
2290+
if not self.sse_connected:
2291+
break
22872292

22882293
if not line:
22892294
continue
@@ -2304,7 +2309,8 @@ def sse_listener():
23042309
self.log.warning(
23052310
f"[FEATURE FLAGS] SSE connection error: {e}. Reconnecting in 5 seconds..."
23062311
)
2307-
self.sse_connected = False
2312+
with self._sse_lock:
2313+
self.sse_connected = False
23082314

23092315
# Attempt to reconnect after 5 seconds if realtime_flags is still enabled
23102316
if self.realtime_flags:
@@ -2338,7 +2344,8 @@ def _close_sse_connection(self):
23382344
"""
23392345
if self.sse_connection:
23402346
self.log.debug("[FEATURE FLAGS] Closing SSE connection")
2341-
self.sse_connected = False
2347+
with self._sse_lock:
2348+
self.sse_connected = False
23422349

23432350
# Close the response to interrupt iter_lines()
23442351
if self.sse_response:

posthog/test/test_realtime_feature_flags.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,9 @@ def test_process_flag_update(self, mock_get):
132132

133133
# Verify flag was updated
134134
self.assertIn("test-flag", client.feature_flags_by_key)
135-
self.assertEqual(client.feature_flags_by_key["test-flag"]["name"], "Updated Test Flag")
135+
self.assertEqual(
136+
client.feature_flags_by_key["test-flag"]["name"], "Updated Test Flag"
137+
)
136138
self.assertFalse(client.feature_flags_by_key["test-flag"]["active"])
137139

138140
# Cleanup
@@ -289,7 +291,6 @@ def test_sse_cleanup_on_shutdown(self, mock_get):
289291
self.assertFalse(client.sse_connected)
290292
self.assertIsNone(client.sse_connection)
291293

292-
293294
@mock.patch("posthog.client.get")
294295
def test_on_feature_flags_update_callback(self, mock_get):
295296
"""Test that the callback is called when flags are updated"""
@@ -306,9 +307,7 @@ def test_on_feature_flags_update_callback(self, mock_get):
306307
callback_calls = []
307308

308309
def flag_update_callback(flag_key, flag_data):
309-
callback_calls.append(
310-
{"flag_key": flag_key, "flag_data": flag_data}
311-
)
310+
callback_calls.append({"flag_key": flag_key, "flag_data": flag_data})
312311

313312
client = Client(
314313
FAKE_TEST_API_KEY,

0 commit comments

Comments
 (0)