diff --git a/examples/fib_asyncio.py b/examples/fib_asyncio.py index 376e46f8..86ecfeb1 100644 --- a/examples/fib_asyncio.py +++ b/examples/fib_asyncio.py @@ -1,28 +1,17 @@ -from streamz import Stream import asyncio -from tornado.platform.asyncio import AsyncIOMainLoop -AsyncIOMainLoop().install() +from streamz import Stream -source = Stream() +source = Stream(asynchronous=True) s = source.sliding_window(2).map(sum) -L = s.sink_to_list() # store result in a list - -s.rate_limit(0.5).sink(source.emit) # pipe output back to input -s.rate_limit(1.0).sink(lambda x: print(L)) # print state of L every second - -source.emit(0) # seed with initial values -source.emit(1) - +L = s.sink_to_list() # store result in a list -def run_asyncio_loop(): - loop = asyncio.get_event_loop() - try: - loop.run_forever() - except KeyboardInterrupt: - pass - finally: - loop.close() +s.rate_limit('1s').sink(lambda x: print(L)) # print state of L every second +s.rate_limit('500ms').connect(source) # pipe output back to input +source.emit(1) # seed with initial value, does not block thread due to Future return -run_asyncio_loop() +try: + asyncio.get_event_loop().run_forever() +except (KeyboardInterrupt, asyncio.CancelledError): + pass diff --git a/examples/fib_thread.py b/examples/fib_thread.py index 827e9c0e..80852db4 100644 --- a/examples/fib_thread.py +++ b/examples/fib_thread.py @@ -1,12 +1,13 @@ from streamz import Stream -from tornado.ioloop import IOLoop source = Stream() s = source.sliding_window(2).map(sum) -L = s.sink_to_list() # store result in a list +L = s.sink_to_list() # store result in a list -s.rate_limit('500ms').sink(source.emit) # pipe output back to input s.rate_limit('1s').sink(lambda x: print(L)) # print state of L every second +s.rate_limit('500ms').connect(source) # pipe output back to input -source.emit(0) # seed with initial values -source.emit(1) +try: + source.emit(1) # seed with initial value, blocks thread due to cycle in stream +except KeyboardInterrupt: + pass diff --git a/examples/fib_tornado.py b/examples/fib_tornado.py index 86756621..f3bedb23 100644 --- a/examples/fib_tornado.py +++ b/examples/fib_tornado.py @@ -4,12 +4,14 @@ source = Stream(asynchronous=True) s = source.sliding_window(2).map(sum) -L = s.sink_to_list() # store result in a list +L = s.sink_to_list() # store result in a list -s.rate_limit('500ms').sink(source.emit) # pipe output back to input s.rate_limit('1s').sink(lambda x: print(L)) # print state of L every second +s.rate_limit('500ms').connect(source) # pipe output back to input -source.emit(0) # seed with initial values -source.emit(1) +source.emit(1) # seed with initial value, does not block thread due to Future return -IOLoop.current().start() +try: + IOLoop.current().start() +except KeyboardInterrupt: + pass diff --git a/examples/network_wordcount.py b/examples/network_wordcount.py index e3500f7b..7d4ce7eb 100644 --- a/examples/network_wordcount.py +++ b/examples/network_wordcount.py @@ -18,4 +18,9 @@ ) s.start() -time.sleep(600) + +try: + while True: + time.sleep(600) +except KeyboardInterrupt: + pass diff --git a/examples/scrape.py b/examples/scrape.py index fda6ca5b..f687835c 100644 --- a/examples/scrape.py +++ b/examples/scrape.py @@ -1,26 +1,23 @@ -from __future__ import print_function - -from time import sleep import sys - -from BeautifulSoup import BeautifulSoup # Python 2 only, sorry. +from urllib.parse import urlparse import requests -from streamz import Stream import toolz -import urlparse +from bs4 import BeautifulSoup +from streamz import Stream -def links_of_page((content, page)): - uri = urlparse.urlparse(page) +def links_of_page(content_page): + (content, page) = content_page + uri = urlparse(page) domain = '%s://%s' % (uri.scheme, uri.netloc) try: - soup = BeautifulSoup(content) + soup = BeautifulSoup(content, features="html.parser") except: return [] else: - links = [link.get('href') for link in soup.findAll('a')] + links = [link.get('href') for link in soup.find_all('a')] return [domain + link for link in links if link @@ -41,8 +38,8 @@ def topk_dict(d, k=10): .map(lambda x: x.content)) links = (content.zip(pages) .map(links_of_page) - .concat()) -links.sink(source.emit) + .flatten()) +links.connect(source) """ from nltk.corpus import stopwords @@ -60,8 +57,7 @@ def topk_dict(d, k=10): """ if len(sys.argv) > 1: - source.emit(sys.argv[1]) - - - -# + try: + source.emit(sys.argv[1]) + except KeyboardInterrupt: + pass diff --git a/streamz/core.py b/streamz/core.py index ae7a27f9..91be98e3 100644 --- a/streamz/core.py +++ b/streamz/core.py @@ -1637,7 +1637,10 @@ class flatten(Stream): def update(self, x, who=None, metadata=None): L = [] items = chain(x) - item = next(items) + try: + item = next(items) + except StopIteration: + return L for item_next in items: y = self._emit(item) item = item_next diff --git a/streamz/sources.py b/streamz/sources.py index 777f9181..78f352e0 100644 --- a/streamz/sources.py +++ b/streamz/sources.py @@ -3,6 +3,8 @@ import queue import os import time +from inspect import isawaitable + from tornado import gen import weakref @@ -252,7 +254,9 @@ async def handle_stream(self, stream, address): while not self.source.stopped: try: data = await stream.read_until(self.source.delimiter) - await self.source._emit(data) + result = self.source._emit(data) + if isawaitable(result): + await result except StreamClosedError: break diff --git a/streamz/tests/test_core.py b/streamz/tests/test_core.py index 96ab9a24..0183bbeb 100644 --- a/streamz/tests/test_core.py +++ b/streamz/tests/test_core.py @@ -814,6 +814,17 @@ def test_flatten(iterators): assert L == [1, 2, 3, 4, 5, 6, 7, 8] +def test_flatten_empty(): + source = Stream() + L = source.flatten().sink_to_list() + + source.emit([1, 2]) + source.emit([]) + source.emit([3, 4]) + + assert L == [1, 2, 3, 4] + + def test_unique(): source = Stream() L = source.unique().sink_to_list() diff --git a/streamz/tests/test_sources.py b/streamz/tests/test_sources.py index 8210a8d8..de844360 100644 --- a/streamz/tests/test_sources.py +++ b/streamz/tests/test_sources.py @@ -4,7 +4,7 @@ from flaky import flaky import pytest from streamz import Source -from streamz.utils_test import wait_for, await_for, gen_test +from streamz.utils_test import free_port, wait_for, await_for, gen_test import socket @@ -47,6 +47,37 @@ def test_tcp(): sock2.close() +@flaky(max_runs=3, min_passes=1) +def test_tcp_word_count_example(): + port = free_port() + s = Source.from_tcp(port) + out = s.map(bytes.split).flatten().frequencies().sink_to_list() + s.start() + wait_for(lambda: s.server is not None, 2, period=0.02) + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.connect(("localhost", port)) + sock.send(b'data\n') + + with (socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock, + socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock2): + sock.connect(("localhost", port)) + sock2.connect(("localhost", port)) + sock.send(b'data\n') + # regression test a bug in from_tcp where a second packet from + # the same socket is dropped due to the socket handler dying + sock.send(b'data\n') + sock2.send(b'data2\n') + + expected = [{b"data": 1}, {b"data": 2}, {b"data": 3}, {b"data": 3, b"data2": 1}] + + def fail_func(): + assert out == expected + + wait_for(lambda: out == expected, 2, fail_func=fail_func, period=0.01) + + + @flaky(max_runs=3, min_passes=1) @gen_test(timeout=60) def test_tcp_async(): diff --git a/streamz/utils_test.py b/streamz/utils_test.py index 6ff76ad7..d9ac1e7d 100644 --- a/streamz/utils_test.py +++ b/streamz/utils_test.py @@ -4,6 +4,7 @@ import logging import os import shutil +import socket import tempfile from time import time, sleep @@ -14,6 +15,13 @@ from .core import _io_loops, Stream +def free_port(): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(('localhost', 0)) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + return s.getsockname()[1] + + @contextmanager def tmpfile(extension=''): extension = '.' + extension.lstrip('.')