Skip to content

Commit b17206b

Browse files
committed
feat: srw
1 parent 2df8d33 commit b17206b

File tree

8 files changed

+2339
-639
lines changed

8 files changed

+2339
-639
lines changed

aws_advanced_python_wrapper/plugin_service.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@
8282
from aws_advanced_python_wrapper.plugin import CanReleaseResources
8383
from aws_advanced_python_wrapper.read_write_splitting_plugin import \
8484
ReadWriteSplittingPluginFactory
85+
from aws_advanced_python_wrapper.simple_read_write_splitting_plugin import \
86+
SimpleReadWriteSplittingPluginFactory
8587
from aws_advanced_python_wrapper.stale_dns_plugin import StaleDnsPluginFactory
8688
from aws_advanced_python_wrapper.utils.cache_map import CacheMap
8789
from aws_advanced_python_wrapper.utils.decorators import \
@@ -760,6 +762,7 @@ class PluginManager(CanReleaseResources):
760762
"host_monitoring_v2": HostMonitoringV2PluginFactory,
761763
"failover": FailoverPluginFactory,
762764
"read_write_splitting": ReadWriteSplittingPluginFactory,
765+
"srw": SimpleReadWriteSplittingPluginFactory,
763766
"fastest_response_strategy": FastestResponseStrategyPluginFactory,
764767
"stale_dns": StaleDnsPluginFactory,
765768
"custom_endpoint": CustomEndpointPluginFactory,
@@ -784,6 +787,7 @@ class PluginManager(CanReleaseResources):
784787
AuroraConnectionTrackerPluginFactory: 100,
785788
StaleDnsPluginFactory: 200,
786789
ReadWriteSplittingPluginFactory: 300,
790+
SimpleReadWriteSplittingPluginFactory: 310,
787791
FailoverPluginFactory: 400,
788792
HostMonitoringPluginFactory: 500,
789793
HostMonitoringV2PluginFactory: 510,

aws_advanced_python_wrapper/read_write_splitting_plugin.py

Lines changed: 490 additions & 160 deletions
Large diffs are not rendered by default.

aws_advanced_python_wrapper/resources/aws_advanced_python_wrapper_messages.properties

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ OpenTelemetryFactory.WrongParameterType="[OpenTelemetryFactory] Wrong parameter
286286

287287
Plugin.UnsupportedMethod=[Plugin] '{}' is not supported by this plugin.
288288

289-
PluginManager.ConfigurationProfileNotFound=PluginManager] Configuration profile '{}' not found.
289+
PluginManager.ConfigurationProfileNotFound=[PluginManager] Configuration profile '{}' not found.
290290
PluginManager.InvalidPlugin=[PluginManager] Invalid plugin requested: '{}'.
291291
PluginManager.MethodInvokedAgainstOldConnection = [PluginManager] The internal connection has changed since '{}' was created. This is likely due to failover or read-write splitting functionality. To ensure you are using the updated connection, please re-create Cursor objects after failover and/or setting readonly.
292292
PluginManager.PipelineNone=[PluginManager] A pipeline was requested but the created pipeline evaluated to None.
@@ -357,8 +357,9 @@ ReadWriteSplittingPlugin.ErrorVerifyingInitialHostSpecRole=[ReadWriteSplittingPl
357357
ReadWriteSplittingPlugin.ExceptionWhileExecutingCommand=[ReadWriteSplittingPlugin] Detected an exception while executing a command: '{}'
358358
ReadWriteSplittingPlugin.ExecutingAgainstOldConnection=[ReadWriteSplittingPlugin] Executing method against old connection: '{}'
359359
ReadWriteSplittingPlugin.FailedToConnectToReader=[ReadWriteSplittingPlugin] Failed to connect to reader host: '{}'
360+
ReadWriteSplittingPlugin.FailedToConnectToWriter=[ReadWriteSplittingPlugin] Failed to connect to writer host: '{}'
360361
ReadWriteSplittingPlugin.FailoverExceptionWhileExecutingCommand=[ReadWriteSplittingPlugin] Detected a failover exception while executing a command: '{}'
361-
ReadWriteSplittingPlugin.FallbackToWriter=[ReadWriteSplittingPlugin] Failed to switch to a reader; the current writer will be used as a fallback: '{}'
362+
ReadWriteSplittingPlugin.FallbackToCurrentConnection=[ReadWriteSplittingPlugin] Failed to switch to a reader; the current connection will be used as a fallback: '{}'
362363
ReadWriteSplittingPlugin.NoReadersAvailable=[ReadWriteSplittingPlugin] The plugin was unable to establish a reader connection to any reader instance.
363364
ReadWriteSplittingPlugin.NoReadersFound=[ReadWriteSplittingPlugin] A reader instance was requested via set_read_only, but there are no readers in the host list. The current writer will be used as a fallback: '{}'
364365
ReadWriteSplittingPlugin.NoWriterFound=[ReadWriteSplittingPlugin] No writer was found in the current host list. This may occur if the writer is not in the list of allowed hosts.
@@ -382,6 +383,9 @@ RoundRobinHostSelector.RoundRobinInvalidHostWeightPairs= [RoundRobinHostSelector
382383
WeightedRandomHostSelector.WeightedRandomInvalidHostWeightPairs= [WeightedRandomHostSelector] The provided host weight pairs have not been configured correctly. Please ensure the provided host weight pairs is a comma separated list of pairs, each pair in the format of <host>:<weight>. Weight values must be an integer greater than or equal to the default weight value of 1. Weight pair: '{}'
383384
WeightedRandomHostSelector.WeightedRandomInvalidDefaultWeight=[WeightedRandomHostSelector] The provided default weight value is not valid. Weight values must be an integer greater than or equal to 1.
384385

386+
SimpleReadWriteSplittingPlugin.MissingRequiredConfigParameter=[SimpleReadWriteSplittingPlugin] Configuration parameter {} is required.
387+
SimpleReadWriteSplittingPlugin.IncorrectConfiguration=[SimpleReadWriteSplittingPlugin] Unable to verify connections with this current configuration. Ensure a correct value is provided to the configuration parameter {}.
388+
385389
SqlAlchemyPooledConnectionProvider.PoolNone=[SqlAlchemyPooledConnectionProvider] Attempted to find or create a pool for '{}' but the result of the attempt evaluated to None.
386390
SqlAlchemyPooledConnectionProvider.UnableToCreateDefaultKey=[SqlAlchemyPooledConnectionProvider] Unable to create a default key for internal connection pools. By default, the user parameter is used, but the given user evaluated to None or the empty string (""). Please ensure you have passed a valid user in the connection properties.
387391

Lines changed: 343 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,343 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License").
4+
# You may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from __future__ import annotations
16+
17+
from time import perf_counter_ns, sleep
18+
from typing import TYPE_CHECKING, Callable, Optional
19+
20+
from aws_advanced_python_wrapper.host_availability import HostAvailability
21+
from aws_advanced_python_wrapper.read_write_splitting_plugin import (
22+
ConnectionHandler, ReadWriteSplittingConnectionManager)
23+
from aws_advanced_python_wrapper.utils.rds_url_type import RdsUrlType
24+
from aws_advanced_python_wrapper.utils.rdsutils import RdsUtils
25+
26+
if TYPE_CHECKING:
27+
from aws_advanced_python_wrapper.driver_dialect import DriverDialect
28+
from aws_advanced_python_wrapper.host_list_provider import HostListProviderService
29+
from aws_advanced_python_wrapper.pep249 import Connection
30+
from aws_advanced_python_wrapper.plugin_service import PluginService
31+
from aws_advanced_python_wrapper.utils.properties import Properties
32+
33+
from aws_advanced_python_wrapper.errors import AwsWrapperError
34+
from aws_advanced_python_wrapper.hostinfo import HostInfo, HostRole
35+
from aws_advanced_python_wrapper.plugin import PluginFactory
36+
from aws_advanced_python_wrapper.utils.messages import Messages
37+
from aws_advanced_python_wrapper.utils.properties import WrapperProperties
38+
39+
40+
class EndpointBasedConnectionHandler(ConnectionHandler):
41+
"""Endpoint based implementation of connection handling logic."""
42+
43+
def __init__(self, plugin_service: PluginService, props: Properties):
44+
srw_read_endpoint = WrapperProperties.SRW_READ_ENDPOINT.get(props)
45+
if srw_read_endpoint is None:
46+
raise AwsWrapperError(
47+
Messages.get_formatted(
48+
"SimpleReadWriteSplittingPlugin.MissingRequiredConfigParameter",
49+
WrapperProperties.SRW_READ_ENDPOINT.name,
50+
)
51+
)
52+
self._read_endpoint: str = srw_read_endpoint
53+
54+
srw_write_endpoint = WrapperProperties.SRW_WRITE_ENDPOINT.get(props)
55+
if srw_write_endpoint is None:
56+
raise AwsWrapperError(
57+
Messages.get_formatted(
58+
"SimpleReadWriteSplittingPlugin.MissingRequiredConfigParameter",
59+
WrapperProperties.SRW_WRITE_ENDPOINT.name,
60+
)
61+
)
62+
self._write_endpoint: str = srw_write_endpoint
63+
64+
self._verify_new_connections: bool = (
65+
WrapperProperties.SRW_VERIFY_NEW_CONNECTIONS.get_bool(props)
66+
)
67+
if self._verify_new_connections is True:
68+
srw_connect_retry_timeout_ms: int = (
69+
WrapperProperties.SRW_CONNECT_RETRY_TIMEOUT_MS.get_int(props)
70+
)
71+
if srw_connect_retry_timeout_ms <= 0:
72+
raise ValueError(
73+
Messages.get_formatted(
74+
"SimpleReadWriteSplittingPlugin.IncorrectConfiguration",
75+
WrapperProperties.SRW_CONNECT_RETRY_TIMEOUT_MS.name,
76+
)
77+
)
78+
self._connect_retry_timeout_ms: int = srw_connect_retry_timeout_ms
79+
80+
srw_connect_retry_interval_ms: int = (
81+
WrapperProperties.SRW_CONNECT_RETRY_INTERVAL_MS.get_int(props)
82+
)
83+
if srw_connect_retry_interval_ms <= 0:
84+
raise ValueError(
85+
Messages.get_formatted(
86+
"SimpleReadWriteSplittingPlugin.IncorrectConfiguration",
87+
WrapperProperties.SRW_CONNECT_RETRY_INTERVAL_MS.name,
88+
)
89+
)
90+
self._connect_retry_interval_ms: int = srw_connect_retry_interval_ms
91+
92+
self._verify_opened_connection_type: Optional[HostRole] = (
93+
EndpointBasedConnectionHandler._parse_connection_type(
94+
WrapperProperties.SRW_VERIFY_INITIAL_CONNECTION_TYPE.get(props)
95+
)
96+
)
97+
98+
self._plugin_service: PluginService = plugin_service
99+
self._properties: Properties = props
100+
self._rds_utils: RdsUtils = RdsUtils()
101+
self._host_list_provider_service: Optional[HostListProviderService] = None
102+
self._write_endpoint_host_info: HostInfo = self._create_host_info(
103+
self._write_endpoint, HostRole.WRITER
104+
)
105+
self._read_endpoint_host_info: HostInfo = self._create_host_info(
106+
self._read_endpoint, HostRole.READER
107+
)
108+
109+
@property
110+
def host_list_provider_service(self) -> Optional[HostListProviderService]:
111+
return self._host_list_provider_service
112+
113+
@host_list_provider_service.setter
114+
def host_list_provider_service(self, new_value: HostListProviderService) -> None:
115+
self._host_list_provider_service = new_value
116+
117+
def open_new_writer_connection(
118+
self,
119+
) -> tuple[Optional[Connection], Optional[HostInfo]]:
120+
conn: Optional[Connection] = None
121+
if self._verify_new_connections:
122+
conn = self._get_verified_connection(
123+
self._properties, self._write_endpoint_host_info, HostRole.WRITER
124+
)
125+
else:
126+
conn = self._plugin_service.connect(
127+
self._write_endpoint_host_info, self._properties, None
128+
)
129+
130+
return conn, self._write_endpoint_host_info
131+
132+
def open_new_reader_connection(
133+
self,
134+
) -> tuple[Optional[Connection], Optional[HostInfo]]:
135+
conn: Optional[Connection] = None
136+
if self._verify_new_connections:
137+
conn = self._get_verified_connection(
138+
self._properties, self._read_endpoint_host_info, HostRole.READER
139+
)
140+
else:
141+
conn = self._plugin_service.connect(
142+
self._read_endpoint_host_info, self._properties, None
143+
)
144+
145+
return conn, self._read_endpoint_host_info
146+
147+
def get_verified_initial_connection(
148+
self,
149+
host_info: HostInfo,
150+
props: Properties,
151+
is_initial_connection: bool,
152+
connect_func: Callable,
153+
) -> Connection:
154+
if not is_initial_connection or not self._verify_new_connections:
155+
return connect_func()
156+
157+
url_type: RdsUrlType = self._rds_utils.identify_rds_type(host_info.host)
158+
159+
conn: Optional[Connection] = None
160+
161+
if (
162+
url_type == RdsUrlType.RDS_WRITER_CLUSTER
163+
or self._verify_opened_connection_type == HostRole.WRITER
164+
):
165+
conn = self._get_verified_connection(
166+
props, host_info, HostRole.WRITER, connect_func
167+
)
168+
elif (
169+
url_type == RdsUrlType.RDS_READER_CLUSTER
170+
or self._verify_opened_connection_type == HostRole.READER
171+
):
172+
conn = self._get_verified_connection(
173+
props, host_info, HostRole.READER, connect_func
174+
)
175+
176+
if conn is None:
177+
conn = connect_func()
178+
179+
self._set_initial_connection_host_info(conn, host_info)
180+
return conn
181+
182+
def _set_initial_connection_host_info(self, conn: Connection, host_info: HostInfo):
183+
if self._host_list_provider_service is None:
184+
return
185+
186+
self._host_list_provider_service.initial_connection_host_info = host_info
187+
188+
def _get_verified_connection(
189+
self,
190+
props: Properties,
191+
host_info: HostInfo,
192+
role: HostRole,
193+
connect_func: Optional[Callable] = None,
194+
) -> Optional[Connection]:
195+
end_time_nano = perf_counter_ns() + (self._connect_retry_timeout_ms * 1000000)
196+
197+
candidate_conn: Optional[Connection]
198+
199+
while perf_counter_ns() < end_time_nano:
200+
candidate_conn = None
201+
202+
try:
203+
if connect_func is not None:
204+
candidate_conn = connect_func()
205+
elif host_info is not None:
206+
candidate_conn = self._plugin_service.connect(
207+
host_info, props, None
208+
)
209+
else:
210+
return None
211+
212+
if candidate_conn is None:
213+
self._delay()
214+
continue
215+
216+
actual_role = self._plugin_service.get_host_role(candidate_conn)
217+
218+
if actual_role != role:
219+
ReadWriteSplittingConnectionManager.close_connection(candidate_conn)
220+
self._delay()
221+
continue
222+
223+
return candidate_conn
224+
225+
except Exception:
226+
ReadWriteSplittingConnectionManager.close_connection(candidate_conn)
227+
self._delay()
228+
229+
return None
230+
231+
def old_reader_can_be_used(self, reader_host_info: HostInfo) -> bool:
232+
# Assume that the old reader can always be used, no topology-based information to check.
233+
return True
234+
235+
def need_connect_to_writer(self) -> bool:
236+
# SetReadOnly(true) will always connect to the read_endpoint, and not the writer.
237+
return False
238+
239+
def refresh_and_store_host_list(
240+
self, current_conn: Optional[Connection], driver_dialect: DriverDialect
241+
):
242+
# Endpoint based connections do not require a host list.
243+
return
244+
245+
def should_update_writer_with_current_conn(
246+
self, current_conn: Connection, current_host: HostInfo, writer_conn: Connection
247+
) -> bool:
248+
return (
249+
self.is_writer_host(current_host)
250+
and current_conn != writer_conn
251+
and (
252+
not self._verify_new_connections
253+
or self._plugin_service.get_host_role(current_conn) == HostRole.WRITER
254+
)
255+
)
256+
257+
def should_update_reader_with_current_conn(
258+
self, current_conn: Connection, current_host: HostInfo, reader_conn: Connection
259+
) -> bool:
260+
return (
261+
self.is_reader_host(current_host)
262+
and current_conn != reader_conn
263+
and (
264+
not self._verify_new_connections
265+
or self._plugin_service.get_host_role(current_conn) == HostRole.READER
266+
)
267+
)
268+
269+
def is_writer_host(self, current_host: HostInfo) -> bool:
270+
return (
271+
current_host.host.casefold() == self._write_endpoint.casefold()
272+
or current_host.url.casefold() == self._write_endpoint.casefold()
273+
)
274+
275+
def is_reader_host(self, current_host: HostInfo) -> bool:
276+
return (
277+
current_host.host.casefold() == self._read_endpoint.casefold()
278+
or current_host.url.casefold() == self._read_endpoint.casefold()
279+
)
280+
281+
def _create_host_info(self, endpoint, role: HostRole) -> HostInfo:
282+
endpoint = endpoint.strip()
283+
host = endpoint
284+
port = self._plugin_service.database_dialect.default_port
285+
colon_index = endpoint.rfind(":")
286+
287+
if colon_index != -1:
288+
port_str = endpoint[colon_index + 1:]
289+
if port_str.isdigit():
290+
host = endpoint[:colon_index]
291+
port = int(port_str)
292+
else:
293+
if (
294+
self._host_list_provider_service is not None
295+
and self._host_list_provider_service.initial_connection_host_info
296+
is not None
297+
and self._host_list_provider_service.initial_connection_host_info.port
298+
!= HostInfo.NO_PORT
299+
):
300+
port = (
301+
self._host_list_provider_service.initial_connection_host_info.port
302+
)
303+
304+
return HostInfo(
305+
host=host, port=port, role=role, availability=HostAvailability.AVAILABLE
306+
)
307+
308+
def _delay(self):
309+
sleep(self._connect_retry_interval_ms / 1000)
310+
311+
@staticmethod
312+
def _parse_connection_type(phase_str: Optional[str]) -> HostRole:
313+
if not phase_str:
314+
return HostRole.UNKNOWN
315+
316+
phase_upper = phase_str.lower()
317+
if phase_upper == "reader":
318+
return HostRole.READER
319+
elif phase_upper == "writer":
320+
return HostRole.WRITER
321+
else:
322+
raise ValueError(
323+
Messages.get_formatted(
324+
"SimpleReadWriteSplittingPlugin.IncorrectConfiguration",
325+
WrapperProperties.SRW_VERIFY_INITIAL_CONNECTION_TYPE.name,
326+
)
327+
)
328+
329+
330+
class SimpleReadWriteSplittingPlugin(ReadWriteSplittingConnectionManager):
331+
def __init__(self, plugin_service, props: Properties):
332+
# The simple read/write splitting plugin handles connections based on configuration parameter endpoints.
333+
connection_handler = EndpointBasedConnectionHandler(
334+
plugin_service,
335+
props,
336+
)
337+
338+
super().__init__(plugin_service, props, connection_handler)
339+
340+
341+
class SimpleReadWriteSplittingPluginFactory(PluginFactory):
342+
def get_instance(self, plugin_service, props: Properties):
343+
return SimpleReadWriteSplittingPlugin(plugin_service, props)

0 commit comments

Comments
 (0)