
Conversation

@dwskoog (Contributor) commented Jan 9, 2026

I found two bugs in the main code while refreshing the examples to work under the current API:

  • Source.from_tcp could kill the TCP socket because it assumed the result of self.source._emit was always awaitable.
    • I added a test that pushes two inputs over the same socket to guard against a regression.
  • core.flatten in 0.6.4 silently skipped empty iterables, but the change to handle iterables without a length made it raise a spurious StopIteration under the same condition (a minimal reproduction is sketched after this list).
    • I added a test that feeds an empty list to flatten to demonstrate the behavior.
    • This behavior change broke our production unit tests when I finally got a moment to upgrade to 0.6.5.
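For reference, a minimal reproduction of the flatten regression looks roughly like this (a sketch against the streamz 0.6.x API, not the exact test added in this PR):

from streamz import Stream

source = Stream()
out = source.flatten().sink_to_list()

source.emit([1, 2])   # elements pass through one at a time: out == [1, 2]
source.emit([])       # 0.6.4 skipped this quietly; 0.6.5 raised a spurious StopIteration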

Example refreshes:

  • I rebuilt the URL scraper example to run on Python 3 with BeautifulSoup4, which is how I found the flatten change; when I then updated my company's product code to 0.6.5, I immediately hit the StopIteration fault in our test suite.
  • I reworked all of the Fibonacci examples so they look similar and produce the same output.

Unit tests:

  • I added the standard hack from the Python docs for finding an open port and used it in test_tcp_word_count_example to make the test less flaky (the pattern is sketched below).
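The pattern is the usual bind-to-port-0 trick; roughly the following (a sketch only, and the helper name is illustrative rather than the one used in the test):

import socket

def find_open_port():
    # Bind to port 0 so the OS hands back a free ephemeral port, then release
    # it immediately so the example's TCP server can bind to it.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("127.0.0.1", 0))
        return s.getsockname()[1]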

Running the network word count example showed an attempt to await a list
object inside the Tornado framework. I tracked it down to
EmitServer.handle_stream assuming the result of source._emit was always
awaitable. Pivoting on inspect.isawaitable there makes the example work
as written.
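The shape of that fix, as a sketch (the EmitServer name comes from the example; the constructor and delimiter handling here are assumptions, not the exact patch):

import inspect
from tornado.iostream import StreamClosedError
from tornado.tcpserver import TCPServer

class EmitServer(TCPServer):
    def __init__(self, source, delimiter=b"\n", **kwargs):
        super().__init__(**kwargs)
        self.source = source
        self.delimiter = delimiter

    async def handle_stream(self, stream, address):
        try:
            while True:
                data = await stream.read_until(self.delimiter)
                result = self.source._emit(data)
                # A synchronous stream hands back the finished result directly;
                # only await when _emit actually returned an awaitable.
                if inspect.isawaitable(result):
                    await result
        except StreamClosedError:
            pass  # client hung up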

I added a try/except block to the example to trap the keyboard interrupt
and suppress the stack spew.
* Change the examples to only seed with the value 1
  * The thread example blocks on emit due to the cycle in the stream,
    so it can only seed with one value.
* Take all explicit references to Tornado out of the asyncio variant
* Suppress the stack spew from KeyboardInterrupt (a sketch of the guard follows this list)
* Ensure all variants produce the same output
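The interrupt handling is just a guard around each example's entry point, along these lines (names are illustrative):

def main():
    ...  # build the Fibonacci stream, seed it with 1, and let it run

if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        pass  # exit quietly on Ctrl-C instead of printing a traceback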
Using BeautifulSoup4, and making sure to filter out empty lists before
feeding them to flatten, the URL scraper example runs again (see the sketch below).
Trap the StopIteration that erupts from update when passed an empty
iterable so that the stream can continue.
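A rough sketch of what the refreshed scraper looks like (requests for the fetch and the links helper are illustrative assumptions, not the exact example code):

import requests
from bs4 import BeautifulSoup
from streamz import Stream

def links(body):
    # Parse the page with BeautifulSoup4 and collect the href targets.
    soup = BeautifulSoup(body, "html.parser")
    return [a["href"] for a in soup.find_all("a", href=True)]

source = Stream()
# filter(len) drops empty link lists before they reach flatten
source.map(links).filter(len).flatten().sink(print)
source.emit(requests.get("https://example.com").text)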
@martindurant (Member) commented

Thank you, and I have not forgotten #493 either.

There is some code to ensure that if any of the nodes are asynchronous, then all of them should be. So _emit not returning an awaitable - that happens in the case that a node cannot be async? Is there an argument that streamz should standardise on async nodes only?

@dwskoog (Contributor, Author) commented Jan 12, 2026

Thank you, and I have not forgotten #493 either.

There is some code to ensure that if any of the nodes are asynchronous, then all of them should be. So _emit not returning an awaitable - that happens in the case that a node cannot be async?

The from_tcp and _emit fix I included came from the network word count example. It was the most expedient way of getting the example working.

From going back through the code around Source/Stream creation, it seems that the example as written guarantees that the resultant stream is synchronous:

s = Stream.from_tcp(9999)
s.map(bytes.split).flatten().frequencies().sink(print)

Source's constructor passes ensure_io_loop=True:

super().__init__(ensure_io_loop=True, **kwargs)

and streamz/streamz/core.py (lines 258 to 259 at 8c73290) then marks the stream as not asynchronous:

if ensure_io_loop and not self.loop:
    self._set_asynchronous(False)

I believe that is why the _emit call was not returning an awaitable, and the server ended up killing the socket when it tried to await the result anyway. As far as I understand the deeper plumbing of Stream, the actual work was still executing in an event loop, just in a different thread, so _emit returned the end result rather than a Task representing the work and its eventual result.
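That is easy to check from the example itself (a sketch; it assumes constructing the from_tcp source does not bind the port until start() is called, and reads the asynchronous flag that _set_asynchronous sets):

from streamz import Stream

s = Stream.from_tcp(9999)
# ensure_io_loop=True with no loop in the calling thread spins the loop up in a
# helper thread and marks the stream as not asynchronous, so _emit returns
# results rather than awaitables.
print(s.asynchronous)   # False for the example as written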

Is there an argument that streamz should standardise on async nodes only?

I think as a framework there is probably value in both threaded and asyncio support. The asyncio module/design appears to be fairly polarizing in the larger Python ecosystem. A significant number of projects that I run into in the wild forswear it entirely while others are asyncio-only. Our historical code is non-asyncio/multiprocessing and our more current work is all asyncio-native. We've definitely had difficulties supporting both.
