Skip to content

Commit fbd12a4

Browse files
committed
Working on parallel stream
1 parent 5985e7e commit fbd12a4

2 files changed

Lines changed: 109 additions & 13 deletions

File tree

stream/parallelstream.py

Lines changed: 105 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
1+
from itertools import tee
2+
from multiprocessing import Process, Queue, RLock
3+
import multiprocessing as mp
4+
import queue
15

2-
from stream import Stream
6+
from threading import Thread
7+
from time import sleep
8+
9+
from stream.stream import Stream
310
from stream.iterators import IteratorUtils
411

512

@@ -8,31 +15,116 @@ class ParallelUtils:
815
@staticmethod
916
def splitted(iterable, pre, offset):
1017
for i in range(pre):
11-
if next(iterable) is None:
18+
try:
19+
next(iterable)
20+
except:
1221
return
1322
while True:
14-
elem = next(iterable)
15-
if elem is None:
23+
try:
24+
elem = next(iterable)
25+
yield elem
26+
except:
1627
return
17-
yield elem
28+
1829
for i in range(offset - 1):
19-
if next(iterable) is None:
30+
try:
31+
next(iterable)
32+
except:
2033
return
2134

35+
@staticmethod
36+
def _split(iterable):
37+
while True:
38+
try:
39+
elem = next(iterable)
40+
yield elem
41+
except:
42+
return
43+
2244
@staticmethod
2345
def split(iterable, count):
24-
iters = []
25-
for i in range(count):
26-
iters.append(ParallelUtils.splitted(iterable, i, count))
27-
return iters
46+
elements = list(iterable)
47+
chunks = [[] for _ in range(count)]
48+
chunk_size = int(len(elements) / count)
49+
50+
for index, elem in enumerate(elements):
51+
chunk = int(index / chunk_size)
52+
chunks[chunk].append(elem)
53+
54+
return [Stream(chunk) for chunk in chunks]
55+
56+
57+
class StreamThread(Thread):
58+
59+
def __init__(self, source):
60+
# Call the Thread class's init function
61+
Thread.__init__(self)
62+
63+
self._queue = queue.Queue()
64+
self._stream = source
65+
self._result = None
66+
self._terminate = False
67+
self.lock = RLock()
68+
69+
def _onThread(self, function, *args, **kwargs):
70+
self._queue.put((function, args, kwargs))
71+
72+
def _map(self, *args):
73+
self._stream.map(args[0])
74+
75+
def map(self, mapper):
76+
self._onThread(self._map, mapper, None)
77+
78+
def _reduce(self, *args):
79+
self._result = self._stream.reduce(args[0], args[1])
80+
self._terminate = True
81+
82+
def reduce(self, accumulator, identity=None):
83+
self._onThread(self._reduce, *(accumulator, identity), None)
84+
85+
def run(self):
86+
while not self._terminate:
87+
func, args, kwargs = self._queue.get()
88+
if args:
89+
if kwargs:
90+
func(*args, **kwargs)
91+
else:
92+
func(*args)
93+
else:
94+
func()
95+
96+
def getResult(self):
97+
return self._result
2898

2999

30100
class ParallelStream(Stream):
31101

32-
PROCESS = 4
102+
PROCESS = 8
33103

34104
def __init__(self, iterable):
35-
self.__iterables = ParallelUtils.split(iterable, self.PROCESS)
105+
self.__streams = [StreamThread(
106+
_stream) for _stream in ParallelUtils.split(iterable, self.PROCESS)]
107+
108+
for _stream in self.__streams:
109+
_stream.start()
110+
111+
def map(self, mapper):
112+
113+
for _stream in self.__streams:
114+
_stream.map(mapper)
115+
116+
return self
117+
118+
def reduce(self, accumulator, identity=None):
119+
120+
for _stream in self.__streams:
121+
_stream.reduce(accumulator, identity)
122+
123+
for _stream in self.__streams:
124+
_stream.join()
125+
126+
results = [_stream.getResult().get() for _stream in self.__streams]
127+
return Stream(results).reduce(accumulator, identity)
36128

37129
def get(self):
38-
return self.__iterables[0]
130+
return self.__streams

stream/stream.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,10 @@ def randint(lower, upper):
115115
def __init__(self, iterable):
116116
self.__iterable = iterable
117117

118+
def parallel(self):
119+
from .parallelstream import ParallelStream
120+
return ParallelStream(self.__iterable)
121+
118122
def filter(self, predicate):
119123
'''
120124
Returns a stream consisting of the elements of this stream, additionally performing the provided action on each element as elements are consumed from the resulting stream.

0 commit comments

Comments
 (0)