Skip to content

Commit c805000

Browse files
authored
Revert "remove suggested clusterId functionality (#1476)" (#1493)
1 parent adab1e1 commit c805000

File tree

7 files changed

+525
-21
lines changed

7 files changed

+525
-21
lines changed

docs/using-the-jdbc-driver/using-plugins/UsingTheFailover2Plugin.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ In addition to the parameters that you can configure for the underlying driver,
6262
| `failoverTimeoutMs` | Integer | No | Maximum allowed time in milliseconds to attempt reconnecting to a new writer or reader instance after a cluster failover is initiated. | `300000` |
6363
| `clusterTopologyHighRefreshRateMs` | Integer | No | Interval of time in milliseconds to wait between attempts to update cluster topology after the writer has come back online following a failover event. It corresponds to the increased monitoring rate described earlier. Usually, the topology monitoring component uses this increased monitoring rate for 30s after a new writer was detected. | `100` |
6464
| `failoverReaderHostSelectorStrategy` | String | No | Strategy used to select a reader node during failover. For more information on the available reader selection strategies, see this [table](../ReaderSelectionStrategies.md). | `random` |
65-
| `clusterId` | String | If using multiple database clusters, yes; otherwise, no | A unique identifier for the cluster. Connections with the same cluster id share a cluster topology cache. This parameter is optional and defaults to `1`. When supporting multiple database clusters, this parameter becomes mandatory. Each connection string must include the `clusterId` parameter with a value that can be any number or string. However, all connection strings associated with the same database cluster must use identical `clusterId` values, while connection strings belonging to different database clusters must specify distinct values. Examples of value: `1`, `2`, `1234`, `abc-1`, `abc-2`. | `1` |
65+
| `clusterId` | String | No | A unique identifier for the cluster. Connections with the same cluster id share a cluster topology cache. | None |
6666
| `telemetryFailoverAdditionalTopTrace` | Boolean | No | Allows the driver to produce an additional telemetry span associated with failover. Such span helps to facilitate telemetry analysis in AWS CloudWatch. | `false` |
6767
| `skipFailoverOnInterruptedThread` | Boolean | No | Enable to skip failover if the current thread is interrupted. This may leave the Connection in an invalid state so the Connection should be disposed. | `false` |
6868

wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/RdsHostListProvider.java

Lines changed: 144 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,10 @@ public class RdsHostListProvider implements DynamicHostListProvider {
7272
+ "after which it will be updated during the next interaction with the connection.");
7373

7474
public static final AwsWrapperProperty CLUSTER_ID = new AwsWrapperProperty(
75-
"clusterId", "1",
75+
"clusterId", "",
7676
"A unique identifier for the cluster. "
7777
+ "Connections with the same cluster id share a cluster topology cache. "
78-
+ "If unspecified, a cluster id is '1'.");
78+
+ "If unspecified, a cluster id is automatically created for AWS RDS clusters.");
7979

8080
public static final AwsWrapperProperty CLUSTER_INSTANCE_HOST_PATTERN =
8181
new AwsWrapperProperty(
@@ -90,8 +90,11 @@ public class RdsHostListProvider implements DynamicHostListProvider {
9090
protected static final RdsUtils rdsHelper = new RdsUtils();
9191
protected static final ConnectionUrlParser connectionUrlParser = new ConnectionUrlParser();
9292
protected static final int defaultTopologyQueryTimeoutMs = 5000;
93-
protected final FullServicesContainer servicesContainer;
93+
protected static final long suggestedClusterIdRefreshRateNano = TimeUnit.MINUTES.toNanos(10);
94+
protected static final CacheMap<String, String> suggestedPrimaryClusterIdCache = new CacheMap<>();
95+
protected static final CacheMap<String, Boolean> primaryClusterIdCache = new CacheMap<>();
9496

97+
protected final FullServicesContainer servicesContainer;
9598
protected final HostListProviderService hostListProviderService;
9699
protected final String originalUrl;
97100
protected final String topologyQuery;
@@ -109,6 +112,10 @@ public class RdsHostListProvider implements DynamicHostListProvider {
109112
protected String clusterId;
110113
protected HostSpec clusterInstanceTemplate;
111114

115+
// A primary clusterId is a clusterId that is based off of a cluster endpoint URL
116+
// (rather than a GUID or a value provided by the user).
117+
protected boolean isPrimaryClusterId;
118+
112119
protected volatile boolean isInitialized = false;
113120

114121
protected Properties properties;
@@ -155,7 +162,8 @@ protected void init() throws SQLException {
155162
this.initialHostSpec = this.initialHostList.get(0);
156163
this.hostListProviderService.setInitialConnectionHostSpec(this.initialHostSpec);
157164

158-
this.clusterId = CLUSTER_ID.getString(this.properties);
165+
this.clusterId = UUID.randomUUID().toString();
166+
this.isPrimaryClusterId = false;
159167
this.refreshRateNano =
160168
TimeUnit.MILLISECONDS.toNanos(CLUSTER_TOPOLOGY_REFRESH_RATE_MS.getInteger(properties));
161169

@@ -176,8 +184,34 @@ protected void init() throws SQLException {
176184
validateHostPatternSetting(this.clusterInstanceTemplate.getHost());
177185

178186
this.rdsUrlType = rdsHelper.identifyRdsType(this.initialHostSpec.getHost());
179-
this.isInitialized = true;
180187

188+
final String clusterIdSetting = CLUSTER_ID.getString(this.properties);
189+
if (!StringUtils.isNullOrEmpty(clusterIdSetting)) {
190+
this.clusterId = clusterIdSetting;
191+
} else if (rdsUrlType == RdsUrlType.RDS_PROXY) {
192+
// Each proxy is associated with a single cluster, so it's safe to use RDS Proxy Url as cluster
193+
// identification
194+
this.clusterId = this.initialHostSpec.getUrl();
195+
} else if (rdsUrlType.isRds()) {
196+
final ClusterSuggestedResult clusterSuggestedResult =
197+
getSuggestedClusterId(this.initialHostSpec.getHostAndPort());
198+
if (clusterSuggestedResult != null && !StringUtils.isNullOrEmpty(clusterSuggestedResult.clusterId)) {
199+
this.clusterId = clusterSuggestedResult.clusterId;
200+
this.isPrimaryClusterId = clusterSuggestedResult.isPrimaryClusterId;
201+
} else {
202+
final String clusterRdsHostUrl =
203+
rdsHelper.getRdsClusterHostUrl(this.initialHostSpec.getHost());
204+
if (!StringUtils.isNullOrEmpty(clusterRdsHostUrl)) {
205+
this.clusterId = this.clusterInstanceTemplate.isPortSpecified()
206+
? String.format("%s:%s", clusterRdsHostUrl, this.clusterInstanceTemplate.getPort())
207+
: clusterRdsHostUrl;
208+
this.isPrimaryClusterId = true;
209+
primaryClusterIdCache.put(this.clusterId, true, suggestedClusterIdRefreshRateNano);
210+
}
211+
}
212+
}
213+
214+
this.isInitialized = true;
181215
} finally {
182216
lock.unlock();
183217
}
@@ -198,8 +232,25 @@ protected void init() throws SQLException {
198232
protected FetchTopologyResult getTopology(final Connection conn, final boolean forceUpdate) throws SQLException {
199233
init();
200234

235+
final String suggestedPrimaryClusterId = suggestedPrimaryClusterIdCache.get(this.clusterId);
236+
237+
// Change clusterId by accepting a suggested one
238+
if (!StringUtils.isNullOrEmpty(suggestedPrimaryClusterId)
239+
&& !this.clusterId.equals(suggestedPrimaryClusterId)) {
240+
241+
final String oldClusterId = this.clusterId;
242+
this.clusterId = suggestedPrimaryClusterId;
243+
this.isPrimaryClusterId = true;
244+
this.clusterIdChanged(oldClusterId);
245+
}
246+
201247
final List<HostSpec> storedHosts = this.getStoredTopology();
202248

249+
// This clusterId is a primary one and is about to create a new entry in the cache.
250+
// When a primary entry is created it needs to be suggested for other (non-primary) entries.
251+
// Remember a flag to do suggestion after cache is updated.
252+
final boolean needToSuggest = storedHosts == null && this.isPrimaryClusterId;
253+
203254
if (storedHosts == null || forceUpdate) {
204255

205256
// need to re-fetch topology
@@ -215,6 +266,9 @@ protected FetchTopologyResult getTopology(final Connection conn, final boolean f
215266

216267
if (!Utils.isNullOrEmpty(hosts)) {
217268
this.servicesContainer.getStorageService().set(this.clusterId, new Topology(hosts));
269+
if (needToSuggest) {
270+
this.suggestPrimaryCluster(hosts);
271+
}
218272
return new FetchTopologyResult(false, hosts);
219273
}
220274
}
@@ -227,6 +281,78 @@ protected FetchTopologyResult getTopology(final Connection conn, final boolean f
227281
}
228282
}
229283

284+
protected void clusterIdChanged(final String oldClusterId) throws SQLException {
285+
// do nothing
286+
}
287+
288+
protected ClusterSuggestedResult getSuggestedClusterId(final String url) {
289+
Map<String, Topology> entries = this.servicesContainer.getStorageService().getEntries(Topology.class);
290+
if (entries == null) {
291+
return null;
292+
}
293+
294+
for (final Entry<String, Topology> entry : entries.entrySet()) {
295+
final String key = entry.getKey(); // clusterId
296+
final List<HostSpec> hosts = entry.getValue().getHosts();
297+
final boolean isPrimaryCluster = primaryClusterIdCache.get(key, false,
298+
suggestedClusterIdRefreshRateNano);
299+
if (key.equals(url)) {
300+
return new ClusterSuggestedResult(url, isPrimaryCluster);
301+
}
302+
if (hosts == null) {
303+
continue;
304+
}
305+
for (final HostSpec host : hosts) {
306+
if (host.getHostAndPort().equals(url)) {
307+
LOGGER.finest(() -> Messages.get("RdsHostListProvider.suggestedClusterId",
308+
new Object[] {key, url}));
309+
return new ClusterSuggestedResult(key, isPrimaryCluster);
310+
}
311+
}
312+
}
313+
return null;
314+
}
315+
316+
protected void suggestPrimaryCluster(final @NonNull List<HostSpec> primaryClusterHosts) {
317+
if (Utils.isNullOrEmpty(primaryClusterHosts)) {
318+
return;
319+
}
320+
321+
final Set<String> primaryClusterHostUrls = new HashSet<>();
322+
for (final HostSpec hostSpec : primaryClusterHosts) {
323+
primaryClusterHostUrls.add(hostSpec.getUrl());
324+
}
325+
326+
Map<String, Topology> entries = this.servicesContainer.getStorageService().getEntries(Topology.class);
327+
if (entries == null) {
328+
return;
329+
}
330+
331+
for (final Entry<String, Topology> entry : entries.entrySet()) {
332+
final String clusterId = entry.getKey();
333+
final List<HostSpec> clusterHosts = entry.getValue().getHosts();
334+
final boolean isPrimaryCluster = primaryClusterIdCache.get(clusterId, false,
335+
suggestedClusterIdRefreshRateNano);
336+
final String suggestedPrimaryClusterId = suggestedPrimaryClusterIdCache.get(clusterId);
337+
if (isPrimaryCluster
338+
|| !StringUtils.isNullOrEmpty(suggestedPrimaryClusterId)
339+
|| Utils.isNullOrEmpty(clusterHosts)) {
340+
continue;
341+
}
342+
343+
// The entry is non-primary
344+
for (final HostSpec host : clusterHosts) {
345+
if (primaryClusterHostUrls.contains(host.getUrl())) {
346+
// Instance on this cluster matches with one of the instance on primary cluster
347+
// Suggest the primary clusterId to this entry
348+
suggestedPrimaryClusterIdCache.put(clusterId, this.clusterId,
349+
suggestedClusterIdRefreshRateNano);
350+
break;
351+
}
352+
}
353+
}
354+
}
355+
230356
/**
231357
* Obtain a cluster topology from database.
232358
*
@@ -388,8 +514,8 @@ protected String getHostEndpoint(final String nodeName) {
388514
* Clear topology cache for all clusters.
389515
*/
390516
public static void clearAll() {
391-
// nothing to clear
392-
// TODO: consider to remove
517+
primaryClusterIdCache.clear();
518+
suggestedPrimaryClusterIdCache.clear();
393519
}
394520

395521
/**
@@ -548,4 +674,15 @@ public String getClusterId() throws UnsupportedOperationException, SQLException
548674
init();
549675
return this.clusterId;
550676
}
677+
678+
public static class ClusterSuggestedResult {
679+
680+
public String clusterId;
681+
public boolean isPrimaryClusterId;
682+
683+
public ClusterSuggestedResult(final String clusterId, final boolean isPrimaryClusterId) {
684+
this.clusterId = clusterId;
685+
this.isPrimaryClusterId = isPrimaryClusterId;
686+
}
687+
}
551688
}

wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/monitoring/ClusterTopologyMonitor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ public interface ClusterTopologyMonitor extends Monitor {
2828

2929
boolean canDispose();
3030

31+
void setClusterId(final String clusterId);
32+
3133
List<HostSpec> forceRefresh(final boolean writerImportant, final long timeoutMs)
3234
throws SQLException, TimeoutException;
3335

wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/monitoring/ClusterTopologyMonitorImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,11 @@ public boolean canDispose() {
163163
return true;
164164
}
165165

166+
@Override
167+
public void setClusterId(String clusterId) {
168+
this.clusterId = clusterId;
169+
}
170+
166171
@Override
167172
public List<HostSpec> forceRefresh(final boolean shouldVerifyWriter, final long timeoutMs)
168173
throws SQLException, TimeoutException {

0 commit comments

Comments
 (0)