Skip to content

Commit 3e3d6b6

Browse files
committed
Improve JSON parsing in handle function
Incomplete JSONs coming from Stream transport are now stored and completed, then processed with the callback function.
1 parent c334ed6 commit 3e3d6b6

4 files changed

Lines changed: 48 additions & 5 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ Currently, the following packages are available:
4747
hdl = OpenSIPSEventHandler(mi_connector, 'datagram', ip='127.0.0.1', port=50012)
4848
4949
def some_callback(message):
50-
# do something with the message
50+
# do something with the message (it is a JSON object)
5151
pass
5252
5353
ev: OpenSIPSEvent = None

opensips/event/__main__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,7 @@ def main():
108108
def event_handler(message):
109109
""" Event handler callback """
110110
try:
111-
message_json = json.loads(message.decode('utf-8'))
112-
print(json.dumps(message_json, indent=4))
111+
print(json.dumps(message, indent=4))
113112
except json.JSONDecodeError as e:
114113
print(f"Failed to decode JSON: {e}")
115114

opensips/event/event.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
from threading import Thread, Event
2424
from ..mi import OpenSIPSMIException
25+
from .json_helper import extract_json
2526

2627
class OpenSIPSEventException(Exception):
2728
""" Exceptions generated by OpenSIPS Events """
@@ -37,6 +38,8 @@ def __init__(self, handler, name: str, callback, expire=None):
3738
self.thread = None
3839
self.thread_stop = Event()
3940
self.thread_stop.clear()
41+
self.buf = b""
42+
self.json_queue = []
4043

4144
try:
4245
self.socket = self._handler.__new_socket__()
@@ -55,8 +58,14 @@ def handle(self, callback):
5558
""" Handles the event callbacks """
5659
while not self.thread_stop.is_set():
5760
data = self.socket.read()
58-
if data:
59-
callback(data)
61+
if not data:
62+
continue
63+
64+
self.buf += data
65+
self.json_queue, self.buf = extract_json(self.json_queue, self.buf)
66+
while self.json_queue:
67+
json_obj = self.json_queue.pop(0)
68+
callback(json_obj)
6069

6170
def unsubscribe(self):
6271
""" Unsubscribes the event """

opensips/event/json_helper.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
#!/usr/bin/env python
2+
##
3+
## This file is part of the OpenSIPS Python Package
4+
## (see https://github.com/OpenSIPS/python-opensips).
5+
##
6+
## This program is free software: you can redistribute it and/or modify
7+
## it under the terms of the GNU General Public License as published by
8+
## the Free Software Foundation, either version 3 of the License, or
9+
## (at your option) any later version.
10+
##
11+
## This program is distributed in the hope that it will be useful,
12+
## but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14+
## GNU General Public License for more details.
15+
##
16+
## You should have received a copy of the GNU General Public License
17+
## along with this program. If not, see <http://www.gnu.org/licenses/>.
18+
##
19+
20+
import json
21+
from collections import OrderedDict
22+
from typing import Tuple
23+
24+
def extract_json(json_acc: list, data: bytes) -> Tuple[list, bytes]:
25+
26+
""" Extracts JSON data from a byte stream """
27+
28+
while data:
29+
try:
30+
json_obj, idx = json.JSONDecoder(object_pairs_hook=OrderedDict).raw_decode(data.decode("utf-8"))
31+
json_acc.append(json_obj)
32+
data = data[idx:]
33+
except json.JSONDecodeError as e:
34+
break
35+
return json_acc, data

0 commit comments

Comments
 (0)