
Commit 47d4fcf

migrate rds failover sample to pro samples (#234)
1 parent 524c1a3 commit 47d4fcf

File tree

3 files changed: +360 additions, 0 deletions


rds-failover-test/Makefile

Lines changed: 35 additions & 0 deletions
@@ -0,0 +1,35 @@
export AWS_ACCESS_KEY_ID ?= test
export AWS_SECRET_ACCESS_KEY ?= test
export AWS_DEFAULT_REGION = us-east-1
SHELL := /bin/bash
PYTHON_BIN ?= $(shell which python3 || which python)

usage:      ## Show this help
	@fgrep -h "##" $(MAKEFILE_LIST) | fgrep -v fgrep | sed -e 's/\\$$//' | sed -e 's/##//'

install:    ## Install dependencies
	@which localstack || pip install localstack
	@which awslocal || pip install awscli-local
	@test -e .venv || ($(PYTHON_BIN) -m venv .venv; source .venv/bin/activate; pip install wheel; pip install psycopg2-binary boto3)

run:        ## Run the failover test scenario against local RDS
	source .venv/bin/activate; echo `which python3`; python3 main.py

start:      ## Start LocalStack in detached mode
	localstack start -d

stop:       ## Stop the LocalStack container
	@echo
	localstack stop

ready:      ## Wait for the LocalStack container to become ready
	@echo Waiting on the LocalStack container...
	@localstack wait -t 30 && echo LocalStack is ready to use! || (echo Gave up waiting on LocalStack, exiting. && exit 1)

logs:       ## Save the LocalStack logs to logs.txt
	@localstack logs > logs.txt

test-ci:    ## Start LocalStack, run the scenario, collect logs, and stop (CI entry point)
	make start install ready run; return_code=`echo $$?`;\
	make logs; make stop; exit $$return_code;

.PHONY: usage install start run stop ready logs test-ci
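
The `test-ci` target chains the individual targets for an unattended run. A local run can follow the same sequence step by step, for example:

```bash
make start install ready run
make logs
make stop
```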

rds-failover-test/README.md

Lines changed: 50 additions & 0 deletions
@@ -0,0 +1,50 @@
# LocalStack Demo: RDS Failover Test

Simple demo application illustrating how to run a failover test against an RDS Aurora global database on LocalStack.

## Prerequisites

* LocalStack
* Docker
* Python
* `make`
* [`awslocal`](https://github.com/localstack/awscli-local)

## Installing

To install the dependencies:

```bash
make install
```

## Starting LocalStack

Make sure that LocalStack is started:

```bash
LOCALSTACK_API_KEY=... DEBUG=1 localstack start
```
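
If LocalStack is started in the background (for example via `make start`), you can block until the container is ready before running the scenario, using the same `localstack wait` call as the Makefile's `ready` target:

```bash
localstack wait -t 30 && echo "LocalStack is ready to use!"
```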
## Running

Run the scenario Python script `main.py` as follows:

```bash
make run
```

You should see log output from the script similar to the following:

```bash
Creating global cluster 'global-cluster-1'
Creating primary DB cluster 'rds-cluster-1'
Creating secondary DB cluster 'rds-cluster-2'
Running assertions, to ensure the cluster writer has been updated
Start global DB cluster failover ...
✅ Test done - all assertions succeeded
```
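
After the run completes, you can also inspect the global cluster state yourself with `awslocal`; the cluster identifier below matches the one created by `main.py`:

```bash
awslocal rds describe-global-clusters --global-cluster-identifier global-cluster-1
```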
## License

This code is available under the Apache 2.0 license.

rds-failover-test/main.py

Lines changed: 275 additions & 0 deletions
@@ -0,0 +1,275 @@
import boto3
import time

# LocalStack edge endpoint
ENDPOINT_URL = "http://localhost:4566"

global_cluster_id = "global-cluster-1"
primary_cluster_id = "rds-cluster-1"
secondary_cluster_id = "rds-cluster-2"
secondary_cluster2_id = "rds-cluster-3"
db_instance_id_1 = "rds-inst-1-1"
db_instance_id_2 = "rds-inst-1-2"
region_1 = "us-east-1"
region_2 = "us-west-1"
region_3 = "us-west-2"


class State:
    """Cluster ARNs recorded while the scenario runs."""

    primary_cluster_arn = None
    cluster_arn_secondary = None
    cluster_arn_secondary_2 = None


def client(service, **kwargs):
    """Create a boto3 client for the given service, pointed at the LocalStack endpoint."""
    kwargs.setdefault("region_name", region_1)
    kwargs.setdefault("aws_access_key_id", "test")
    kwargs.setdefault("aws_secret_access_key", "test")
    return boto3.client(service, endpoint_url=ENDPOINT_URL, **kwargs)


def poll_condition(condition, timeout: float = None, interval: float = 0.5) -> bool:
    """Poll `condition` until it returns a truthy value, or until `timeout` seconds have elapsed."""
    remaining = timeout or 0
    while not condition():
        if timeout is not None:
            remaining -= interval
            if remaining <= 0:
                return False
        time.sleep(interval)
    return True

def create_cluster_with_instances():
    """Create a global cluster with a primary and two secondary clusters, plus DB instances"""

    db_type = "aurora-postgresql"
    engine_version = "13.7"
    print(f"Creating global cluster '{global_cluster_id}'")
    rds_client = client("rds")
    rds_client.create_global_cluster(
        GlobalClusterIdentifier=global_cluster_id,
        Engine=db_type,
        EngineVersion=engine_version,
    )

    # create primary
    instance_class = "db.r5.large"
    print(f"Creating primary DB cluster '{primary_cluster_id}'")
    result = rds_client.create_db_cluster(
        DBClusterIdentifier=primary_cluster_id,
        Engine=db_type,
        DatabaseName="test",
        EngineVersion=engine_version,
        GlobalClusterIdentifier=global_cluster_id,
    )
    State.primary_cluster_arn = result["DBCluster"]["DBClusterArn"]

    # add instance to the primary cluster
    rds_client.create_db_instance(
        DBClusterIdentifier=primary_cluster_id,
        DBInstanceIdentifier=db_instance_id_1,
        Engine=db_type,
        EngineVersion=engine_version,
        DBInstanceClass=instance_class,
    )
    # add a second instance to the primary cluster
    rds_client.create_db_instance(
        DBClusterIdentifier=primary_cluster_id,
        DBInstanceIdentifier=db_instance_id_2,
        Engine=db_type,
        EngineVersion=engine_version,
        DBInstanceClass=instance_class,
    )

    # add a secondary cluster
    print(f"Creating secondary DB cluster '{secondary_cluster_id}'")
    rds_client_2 = client("rds", region_name=region_2)
    result = rds_client_2.create_db_cluster(
        DBClusterIdentifier=secondary_cluster_id,
        Engine=db_type,
        EngineVersion=engine_version,
        GlobalClusterIdentifier=global_cluster_id,
    )
    State.cluster_arn_secondary = result["DBCluster"]["DBClusterArn"]

    # add instance to the secondary cluster
    rds_client_2.create_db_instance(
        DBClusterIdentifier=secondary_cluster_id,
        DBInstanceIdentifier=db_instance_id_1,
        Engine=db_type,
        EngineVersion=engine_version,
        DBInstanceClass=instance_class,
    )

    # describe cluster
    rds_client_2.describe_db_clusters(DBClusterIdentifier=secondary_cluster_id)

    # add another secondary cluster (headless - no instances)
    rds_client_3 = client("rds", region_name=region_3)
    result = rds_client_3.create_db_cluster(
        DBClusterIdentifier=secondary_cluster2_id,
        Engine=db_type,
        EngineVersion=engine_version,
        GlobalClusterIdentifier=global_cluster_id,
    )
    State.cluster_arn_secondary_2 = result["DBCluster"]["DBClusterArn"]

def check_global_clusters_writer_flag():
    """List the global cluster members and assert that `IsWriter` is set for the primary cluster"""

    rds_client_3 = client("rds", region_name=region_3)
    result = rds_client_3.describe_global_clusters(
        GlobalClusterIdentifier=global_cluster_id
    )
    members = result["GlobalClusters"][0]["GlobalClusterMembers"]
    assert len(members) == 3
    members_map = {m["DBClusterArn"]: m for m in members}
    assert members_map.get(State.primary_cluster_arn)["IsWriter"]
    assert not members_map.get(State.cluster_arn_secondary)["IsWriter"]
    assert not members_map.get(State.cluster_arn_secondary_2)["IsWriter"]

    assert (
        State.cluster_arn_secondary
        in members_map.get(State.primary_cluster_arn)["Readers"]
    )
    assert (
        State.cluster_arn_secondary_2
        in members_map.get(State.primary_cluster_arn)["Readers"]
    )

    assert not result["GlobalClusters"][0].get("FailoverState")

def check_db_clusters_writer_flag():
    """
    Run describe-db-clusters for the primary + secondary clusters and show that only the first instance in
    the primary cluster has the flag `IsClusterWriter` set to True; all other instances should return False
    """

    print("Running assertions, to ensure the cluster writer has been updated")

    # primary:
    rds_client = client("rds")
    describe = rds_client.describe_db_clusters(DBClusterIdentifier=primary_cluster_id)
    members_map = {
        p["DBInstanceIdentifier"]: p
        for p in describe["DBClusters"][0]["DBClusterMembers"]
    }
    # assert that first instance in primary cluster is the writer
    assert len(members_map) == 2
    assert members_map.get(db_instance_id_1)["IsClusterWriter"]
    assert not members_map.get(db_instance_id_2)["IsClusterWriter"]

    # secondary #1:
    rds_client_2 = client("rds", region_name=region_2)
    describe = rds_client_2.describe_db_clusters(
        DBClusterIdentifier=secondary_cluster_id
    )
    members_map = {
        p["DBInstanceIdentifier"]: p
        for p in describe["DBClusters"][0]["DBClusterMembers"]
    }

    assert len(members_map) == 1
    assert not members_map.get(db_instance_id_1)["IsClusterWriter"]

def run_global_cluster_failover():
    """Run the failover via failover-global-cluster to switch the primary with one secondary cluster"""

    print("Start global DB cluster failover ...")
    rds_client = client("rds")
    rds_client.failover_global_cluster(
        GlobalClusterIdentifier=global_cluster_id,
        TargetDbClusterIdentifier=State.cluster_arn_secondary,
    )

    def check_failover_started():
        res = rds_client.describe_global_clusters(
            GlobalClusterIdentifier=global_cluster_id
        )
        status = res["GlobalClusters"][0].get("FailoverState", {}).get("Status")
        return status in ("failing-over", "switching-over")

    # assert that the failover has started (status `failing-over` or `switching-over`)
    assert poll_condition(check_failover_started, timeout=40, interval=1)

    def check_failover_finished():
        res = rds_client.describe_global_clusters(
            GlobalClusterIdentifier=global_cluster_id
        )
        return not res["GlobalClusters"][0].get("FailoverState")

    # wait for failover to complete (FailoverState is cleared once the switch is done)
    assert poll_condition(check_failover_finished, timeout=40, interval=1)

def assert_global_cluster_writer_switched():
    """Assert that the describe-global-clusters + describe-db-clusters correctly show that the writer switched"""

    # assert that instances in primary cluster are no longer the writer
    rds_client = client("rds")
    describe = rds_client.describe_db_clusters(DBClusterIdentifier=primary_cluster_id)
    members_map = {
        p["DBInstanceIdentifier"]: p
        for p in describe["DBClusters"][0]["DBClusterMembers"]
    }
    assert len(members_map) == 2
    assert not members_map.get(db_instance_id_1)["IsClusterWriter"]
    assert not members_map.get(db_instance_id_2)["IsClusterWriter"]

    # assert that first instance in secondary cluster is now the writer
    rds_client_2 = client("rds", region_name=region_2)
    describe = rds_client_2.describe_db_clusters(
        DBClusterIdentifier=secondary_cluster_id
    )
    members_map = {
        p["DBInstanceIdentifier"]: p
        for p in describe["DBClusters"][0]["DBClusterMembers"]
    }
    assert len(members_map) == 1
    assert members_map.get(db_instance_id_1)["IsClusterWriter"]

    # remove secondary #2 cluster (headless) from global cluster:
    rds_client_3 = client("rds", region_name=region_3)
    describe = rds_client_3.describe_db_clusters(
        DBClusterIdentifier=secondary_cluster2_id
    )
    assert not describe["DBClusters"][0]["DBClusterMembers"]
    rds_client.remove_from_global_cluster(
        GlobalClusterIdentifier=global_cluster_id,
        DbClusterIdentifier=State.cluster_arn_secondary_2,
    )

    def check_removed_global_cluster():
        res = rds_client.describe_global_clusters(
            GlobalClusterIdentifier=global_cluster_id
        )
        return len(res["GlobalClusters"][0].get("GlobalClusterMembers")) == 2

    # assert that we now have 2 global cluster members (instead of 3 previously)
    assert poll_condition(check_removed_global_cluster, timeout=30, interval=0.5)

def main():
    # (1) create a global cluster with a primary and two secondary clusters + instances
    create_cluster_with_instances()

    # (2) list the members via describe-global-clusters and assert that `IsWriter` is set for the primary cluster
    check_global_clusters_writer_flag()

    # (3) run describe-db-clusters for the primary + secondary clusters and show that only the first instance in
    #     the primary cluster has the flag `IsClusterWriter` set to True; all other instances should return False
    check_db_clusters_writer_flag()

    # (4) run the failover via failover-global-cluster to switch the primary with one secondary cluster
    run_global_cluster_failover()

    # (5) assert that describe-global-clusters + describe-db-clusters correctly show that the writer switched
    assert_global_cluster_writer_switched()

    print("✅ Test done - all assertions succeeded")


if __name__ == "__main__":
    main()
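
The `make run` target is equivalent to activating the virtualenv created by `make install` and invoking the script directly:

```bash
source .venv/bin/activate
python3 main.py
```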

0 commit comments
