Skip to content

Commit 7c13667

Browse files
committed
PYTHON-1898 Close events executor faster after the topology is freed
1 parent ddac30d commit 7c13667

File tree

2 files changed

+48
-13
lines changed

2 files changed

+48
-13
lines changed

pymongo/topology.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ def __init__(self, topology_settings):
7474

7575
# Create events queue if there are publishers.
7676
self._events = None
77-
self._events_thread = None
77+
self.__events_executor = None
7878

7979
if self._publish_server or self._publish_tp:
8080
self._events = Queue.Queue(maxsize=100)
@@ -127,7 +127,7 @@ def target():
127127
# We strongly reference the executor and it weakly references
128128
# the queue via this closure. When the topology is freed, stop
129129
# the executor soon.
130-
weak = weakref.ref(self._events)
130+
weak = weakref.ref(self._events, executor.close)
131131
self.__events_executor = executor
132132
executor.open()
133133

test/test_monitor.py

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,29 +21,64 @@
2121
sys.path[0:0] = [""]
2222

2323
from pymongo.periodic_executor import _EXECUTORS
24-
from test import client_context, unittest, IntegrationTest
25-
from test.utils import single_client, one, connected, wait_until
24+
25+
from test import unittest, IntegrationTest
26+
from test.utils import (connected,
27+
ServerAndTopologyEventListener,
28+
single_client,
29+
wait_until)
2630

2731

2832
def unregistered(ref):
2933
gc.collect()
3034
return ref not in _EXECUTORS
3135

3236

37+
def get_executors(client):
38+
executors = []
39+
for server in client._topology._servers.values():
40+
executors.append(server._monitor._executor)
41+
executors.append(client._kill_cursors_executor)
42+
executors.append(client._topology._Topology__events_executor)
43+
return [e for e in executors if e is not None]
44+
45+
46+
def create_client():
47+
listener = ServerAndTopologyEventListener()
48+
client = single_client(event_listeners=[listener])
49+
connected(client)
50+
return client
51+
52+
3353
class TestMonitor(IntegrationTest):
34-
def test_atexit_hook(self):
35-
client = single_client(client_context.host, client_context.port)
36-
executor = one(client._topology._servers.values())._monitor._executor
37-
connected(client)
54+
def test_cleanup_executors_on_client_del(self):
55+
client = create_client()
56+
executors = get_executors(client)
57+
self.assertEqual(len(executors), 3)
3858

39-
# The executor stores a weakref to itself in _EXECUTORS.
40-
ref = one([r for r in _EXECUTORS.copy() if r() is executor])
59+
# Each executor stores a weakref to itself in _EXECUTORS.
60+
executor_refs = [
61+
(r, r()._name) for r in _EXECUTORS.copy() if r() in executors]
4162

42-
del executor
63+
del executors
4364
del client
4465

45-
wait_until(partial(unregistered, ref), 'unregister executor',
46-
timeout=5)
66+
for ref, name in executor_refs:
67+
wait_until(partial(unregistered, ref),
68+
'unregister executor: %s' % (name,),
69+
timeout=5)
70+
71+
def test_cleanup_executors_on_client_close(self):
72+
client = create_client()
73+
executors = get_executors(client)
74+
self.assertEqual(len(executors), 3)
75+
76+
client.close()
77+
78+
for executor in executors:
79+
wait_until(lambda: executor._stopped,
80+
'closed executor: %s' % (executor._name,),
81+
timeout=5)
4782

4883

4984
if __name__ == "__main__":

0 commit comments

Comments
 (0)