|
26 | 26 | else: |
27 | 27 | import Queue |
28 | 28 |
|
29 | | -from pymongo import common |
30 | | -from pymongo import periodic_executor |
| 29 | +from pymongo import (common, |
| 30 | + helpers, |
| 31 | + periodic_executor) |
31 | 32 | from pymongo.pool import PoolOptions |
32 | 33 | from pymongo.topology_description import (updated_topology_description, |
33 | 34 | _updated_topology_description_srv_polling, |
34 | 35 | TopologyDescription, |
35 | 36 | SRV_POLLING_TOPOLOGIES, TOPOLOGY_TYPE) |
36 | | -from pymongo.errors import ServerSelectionTimeoutError, ConfigurationError |
| 37 | +from pymongo.errors import (ConnectionFailure, |
| 38 | + ConfigurationError, |
| 39 | + NetworkTimeout, |
| 40 | + NotMasterError, |
| 41 | + OperationFailure, |
| 42 | + ServerSelectionTimeoutError) |
37 | 43 | from pymongo.monitor import SrvMonitor |
38 | 44 | from pymongo.monotonic import time as _time |
39 | 45 | from pymongo.server import Server |
@@ -264,14 +270,17 @@ def _process_change(self, server_description): |
264 | 270 | Hold the lock when calling this. |
265 | 271 | """ |
266 | 272 | td_old = self._description |
267 | | - old_server_description = td_old._server_descriptions[ |
268 | | - server_description.address] |
| 273 | + sd_old = td_old._server_descriptions[server_description.address] |
| 274 | + if _is_stale_server_description(sd_old, server_description): |
| 275 | + # This is a stale isMaster response. Ignore it. |
| 276 | + return |
| 277 | + |
269 | 278 | suppress_event = ((self._publish_server or self._publish_tp) |
270 | | - and old_server_description == server_description) |
| 279 | + and sd_old == server_description) |
271 | 280 | if self._publish_server and not suppress_event: |
272 | 281 | self._events.put(( |
273 | 282 | self._listeners.publish_server_description_changed, |
274 | | - (old_server_description, server_description, |
| 283 | + (sd_old, server_description, |
275 | 284 | server_description.address, self._topology_id))) |
276 | 285 |
|
277 | 286 | self._description = updated_topology_description( |
@@ -410,25 +419,15 @@ def reset_pool(self, address): |
410 | 419 | if server: |
411 | 420 | server.pool.reset() |
412 | 421 |
|
413 | | - def reset_server(self, address, error): |
414 | | - """Clear our pool for a server and mark it Unknown. |
415 | | -
|
416 | | - Do *not* request an immediate check. |
417 | | - """ |
418 | | - with self._lock: |
419 | | - self._reset_server(address, reset_pool=True, error=error) |
420 | | - |
421 | | - def reset_server_and_request_check(self, address, error): |
| 422 | + def handle_getlasterror(self, address, error_msg): |
422 | 423 | """Clear our pool for a server, mark it Unknown, and check it soon.""" |
| 424 | + error = NotMasterError(error_msg, {'code': 10107, 'errmsg': error_msg}) |
423 | 425 | with self._lock: |
424 | | - self._reset_server(address, reset_pool=True, error=error) |
425 | | - self._request_check(address) |
426 | | - |
427 | | - def mark_server_unknown_and_request_check(self, address, error): |
428 | | - """Mark a server Unknown, and check it soon.""" |
429 | | - with self._lock: |
430 | | - self._reset_server(address, reset_pool=False, error=error) |
431 | | - self._request_check(address) |
| 426 | + server = self._servers.get(address) |
| 427 | + if server: |
| 428 | + self._process_change(ServerDescription(address, error=error)) |
| 429 | + server.pool.reset() |
| 430 | + server.request_check() |
432 | 431 |
|
433 | 432 | def update_pool(self, all_credentials): |
434 | 433 | # Remove any stale sockets and add new sockets if pool is too small. |
@@ -540,28 +539,78 @@ def _ensure_opened(self): |
540 | 539 | for server in itervalues(self._servers): |
541 | 540 | server.open() |
542 | 541 |
|
543 | | - def _reset_server(self, address, reset_pool, error): |
544 | | - """Mark a server Unknown and optionally reset it's pool. |
545 | | -
|
546 | | - Hold the lock when calling this. Does *not* request an immediate check. |
547 | | - """ |
| 542 | + def _is_stale_error(self, address, err_ctx): |
548 | 543 | server = self._servers.get(address) |
549 | | - |
550 | | - # "server" is None if another thread removed it from the topology. |
551 | | - if server: |
552 | | - if reset_pool: |
| 544 | + if server is None: |
| 545 | + # Another thread removed this server from the topology. |
| 546 | + return True |
| 547 | + |
| 548 | + if err_ctx.sock_generation != server._pool.generation: |
| 549 | + # This is an outdated error from a previous pool version. |
| 550 | + return True |
| 551 | + |
| 552 | + # topologyVersion check, ignore error when cur_tv >= error_tv: |
| 553 | + cur_tv = server.description.topology_version |
| 554 | + error = err_ctx.error |
| 555 | + error_tv = None |
| 556 | + if error and hasattr(error, 'details'): |
| 557 | + if isinstance(error.details, dict): |
| 558 | + error_tv = error.details.get('topologyVersion') |
| 559 | + |
| 560 | + return _is_stale_error_topology_version(cur_tv, error_tv) |
| 561 | + |
| 562 | + def _handle_error(self, address, err_ctx): |
| 563 | + if self._is_stale_error(address, err_ctx): |
| 564 | + return |
| 565 | + |
| 566 | + server = self._servers[address] |
| 567 | + error = err_ctx.error |
| 568 | + exc_type = type(error) |
| 569 | + if issubclass(exc_type, NetworkTimeout): |
| 570 | + # The socket has been closed. Don't reset the server. |
| 571 | + # Server Discovery And Monitoring Spec: "When an application |
| 572 | + # operation fails because of any network error besides a socket |
| 573 | + # timeout...." |
| 574 | + return |
| 575 | + elif issubclass(exc_type, NotMasterError): |
| 576 | + # As per the SDAM spec if: |
| 577 | + # - the server sees a "not master" error, and |
| 578 | + # - the server is not shutting down, and |
| 579 | + # - the server version is >= 4.2, then |
| 580 | + # we keep the existing connection pool, but mark the server type |
| 581 | + # as Unknown and request an immediate check of the server. |
| 582 | + # Otherwise, we clear the connection pool, mark the server as |
| 583 | + # Unknown and request an immediate check of the server. |
| 584 | + err_code = error.details.get('code', -1) |
| 585 | + is_shutting_down = err_code in helpers._SHUTDOWN_CODES |
| 586 | + # Mark server Unknown, clear the pool, and request check. |
| 587 | + self._process_change(ServerDescription(address, error=error)) |
| 588 | + if is_shutting_down or (err_ctx.max_wire_version <= 7): |
| 589 | + # Clear the pool. |
553 | 590 | server.reset() |
554 | | - |
555 | | - # Mark this server Unknown. |
| 591 | + server.request_check() |
| 592 | + elif issubclass(exc_type, ConnectionFailure): |
| 593 | + # "Client MUST replace the server's description with type Unknown |
| 594 | + # ... MUST NOT request an immediate check of the server." |
556 | 595 | self._process_change(ServerDescription(address, error=error)) |
| 596 | + # Clear the pool. |
| 597 | + server.reset() |
| 598 | + elif issubclass(exc_type, OperationFailure): |
| 599 | + # Do not request an immediate check since the server is likely |
| 600 | + # shutting down. |
| 601 | + if error.code in helpers._NOT_MASTER_CODES: |
| 602 | + self._process_change(ServerDescription(address, error=error)) |
| 603 | + # Clear the pool. |
| 604 | + server.reset() |
557 | 605 |
|
558 | | - def _request_check(self, address): |
559 | | - """Wake one monitor. Hold the lock when calling this.""" |
560 | | - server = self._servers.get(address) |
| 606 | + def handle_error(self, address, err_ctx): |
| 607 | + """Handle an application error. |
561 | 608 |
|
562 | | - # "server" is None if another thread removed it from the topology. |
563 | | - if server: |
564 | | - server.request_check() |
| 609 | + May reset the server to Unknown, clear the pool, and request an |
| 610 | + immediate check depending on the error and the context. |
| 611 | + """ |
| 612 | + with self._lock: |
| 613 | + self._handle_error(address, err_ctx) |
565 | 614 |
|
566 | 615 | def _request_check_all(self): |
567 | 616 | """Wake all monitors. Hold the lock when calling this.""" |
@@ -692,3 +741,30 @@ def __repr__(self): |
692 | 741 | if not self._opened: |
693 | 742 | msg = 'CLOSED ' |
694 | 743 | return '<%s %s%r>' % (self.__class__.__name__, msg, self._description) |
| 744 | + |
| 745 | + |
| 746 | +class _ErrorContext(object): |
| 747 | + """An error with context for SDAM error handling.""" |
| 748 | + def __init__(self, error, max_wire_version, sock_generation): |
| 749 | + self.error = error |
| 750 | + self.max_wire_version = max_wire_version |
| 751 | + self.sock_generation = sock_generation |
| 752 | + |
| 753 | + |
| 754 | +def _is_stale_error_topology_version(current_tv, error_tv): |
| 755 | + """Return True if the error's topologyVersion is <= current.""" |
| 756 | + if current_tv is None or error_tv is None: |
| 757 | + return False |
| 758 | + if current_tv['processId'] != error_tv['processId']: |
| 759 | + return False |
| 760 | + return current_tv['counter'] >= error_tv['counter'] |
| 761 | + |
| 762 | + |
| 763 | +def _is_stale_server_description(current_sd, new_sd): |
| 764 | + """Return True if the new topologyVersion is < current.""" |
| 765 | + current_tv, new_tv = current_sd.topology_version, new_sd.topology_version |
| 766 | + if current_tv is None or new_tv is None: |
| 767 | + return False |
| 768 | + if current_tv['processId'] != new_tv['processId']: |
| 769 | + return False |
| 770 | + return current_tv['counter'] > new_tv['counter'] |
0 commit comments