Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions examples/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,32 +10,32 @@
sys.path.append(rootpath)

from mysql.replicant.commands import (
fetch_master_pos,
fetch_slave_pos,
fetch_main_pos,
fetch_subordinate_pos,
)

from mysql.replicant.errors import (
NotMasterError,
NotSlaveError,
NotMainError,
NotSubordinateError,
)

import my_deployment

print "# Executing 'show databases'"
for db in my_deployment.master.sql("show databases"):
for db in my_deployment.main.sql("show databases"):
print db["Database"]

print "# Executing 'ls'"
for line in my_deployment.master.ssh(["ls"]):
for line in my_deployment.main.ssh(["ls"]):
print line

try:
print "Master position is:", fetch_master_pos(my_deployment.master)
except NotMasterError:
print my_deployment.master.name, "is not configured as a master"
print "Main position is:", fetch_main_pos(my_deployment.main)
except NotMainError:
print my_deployment.main.name, "is not configured as a main"

for slave in my_deployment.slaves:
for subordinate in my_deployment.subordinates:
try:
print "Slave position is:", fetch_slave_pos(slave)
except NotSlaveError:
print slave.name, "not configured as a slave"
print "Subordinate position is:", fetch_subordinate_pos(subordinate)
except NotSubordinateError:
print subordinate.name, "not configured as a subordinate"
18 changes: 9 additions & 9 deletions examples/load_balancer.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class TestLoadBalancer(unittest.TestCase):
"Class to test the load balancer functions."

def setUp(self):
from my_deployment import common, master, slaves
from my_deployment import common, main, subordinates
common.sql("DROP DATABASE IF EXISTS common")
common.sql("CREATE DATABASE common")
common.sql(_CREATE_TABLE)
Expand All @@ -70,23 +70,23 @@ def tearDown(self):
common.sql("DROP DATABASE common")

def testServers(self):
from my_deployment import common, master, slaves
from my_deployment import common, main, subordinates

try:
pool_add(common, master, ['READ', 'WRITE'])
pool_add(common, main, ['READ', 'WRITE'])
except AlreadyInPoolError:
pool_set(common, master, ['READ', 'WRITE'])
pool_set(common, main, ['READ', 'WRITE'])

for slave in slaves:
for subordinate in subordinates:
try:
pool_add(common, slave, ['READ'])
pool_add(common, subordinate, ['READ'])
except AlreadyInPoolError:
pool_set(common, slave, ['READ'])
pool_set(common, subordinate, ['READ'])

for row in common.sql("SELECT * FROM nodes", db="common"):
if row['port'] == master.port:
if row['port'] == main.port:
self.assertEqual(row['type'], 'READ,WRITE')
elif row['port'] in [slave.port for slave in slaves]:
elif row['port'] in [subordinate.port for subordinate in subordinates]:
self.assertEqual(row['type'], 'READ')

if __name__ == '__main__':
Expand Down
12 changes: 6 additions & 6 deletions examples/my_deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,33 @@
ConfigManagerFile,
)

servers = [Server('master',
servers = [Server('main',
server_id=1,
sql_user=User("mysql_replicant"),
ssh_user=User("mats"),
machine=Linux(),
port=3307,
socket='/var/run/mysqld/mysqld1.sock',
),
Server('slave1', server_id=2,
Server('subordinate1', server_id=2,
sql_user=User("mysql_replicant"),
ssh_user=User("mats"),
machine=Linux(),
port=3308,
socket='/var/run/mysqld/mysqld2.sock'),
Server('slave2',
Server('subordinate2',
sql_user=User("mysql_replicant"),
ssh_user=User("mats"),
machine=Linux(),
port=3309,
socket='/var/run/mysqld/mysqld3.sock'),
Server('slave3',
Server('subordinate3',
sql_user=User("mysql_replicant"),
ssh_user=User("mats"),
machine=Linux(),
port=3310,
socket='/var/run/mysqld/mysqld4.sock')]

master = servers[0]
main = servers[0]
common = servers[0] # Where the common database is stored
slaves = servers[1:]
subordinates = servers[1:]
18 changes: 9 additions & 9 deletions examples/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,23 @@
User,
)
from mysql.replicant.roles import (
Master,
Main,
Final,
)
from mysql.replicant.commands import (
change_master,
change_main,
)

master_role = Master(User("repl_user", "xyzzy"))
final_role = Final(my_deployment.master)
main_role = Main(User("repl_user", "xyzzy"))
final_role = Final(my_deployment.main)

try:
master_role.imbue(my_deployment.master)
main_role.imbue(my_deployment.main)
except IOError, e:
print "Cannot imbue master with Master role:", e
print "Cannot imbue main with Main role:", e

for slave in my_deployment.slaves:
for subordinate in my_deployment.subordinates:
try:
final_role.imbue(slave)
final_role.imbue(subordinate)
except IOError, e:
print "Cannot imbue slave with Final role:", e
print "Cannot imbue subordinate with Final role:", e
4 changes: 2 additions & 2 deletions lib/mysql/replicant/backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ class PhysicalBackup(BackupImage):
def backup_server(self, server, database="*"):

from mysql.replicant.commands import (
fetch_master_position,
fetch_main_position,
)

datadir = server.fetch_config().get('datadir')
if database == "*":
database = [d for d in os.listdir(datadir)
if os.path.isdir(os.path.join(datadir, d))]
server.sql("FLUSH TABLES WITH READ LOCK")
position = fetch_master_position(server)
position = fetch_main_position(server)
if server.host != "localhost":
path = os.path.basename(self.url.path)
else:
Expand Down
88 changes: 44 additions & 44 deletions lib/mysql/replicant/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,26 @@ def unlock_database(server):
MASTER_USER=%s, MASTER_PASSWORD=%s"""


def change_master(slave, master, position=None):
"""Configure replication to read from a master and position."""
def change_main(subordinate, main, position=None):
"""Configure replication to read from a main and position."""
try:
user = master.repl_user
user = main.repl_user
except AttributeError:
raise _errors.NotMasterError
raise _errors.NotMainError

slave.sql("STOP SLAVE")
subordinate.sql("STOP SLAVE")
if position:
slave.sql(_CHANGE_MASTER_TO,
(master.host, master.port, user.name, user.passwd,
subordinate.sql(_CHANGE_MASTER_TO,
(main.host, main.port, user.name, user.passwd,
position.file, position.pos))
else:
slave.sql(_CHANGE_MASTER_TO_NO_POS,
(master.host, master.port, user.name, user.passwd))
slave.sql("START SLAVE")
slave.disconnect()
subordinate.sql(_CHANGE_MASTER_TO_NO_POS,
(main.host, main.port, user.name, user.passwd))
subordinate.sql("START SLAVE")
subordinate.disconnect()


def fetch_master_position(server):
def fetch_main_position(server):
"""Get the position of the next event that will be written to
the binary log"""

Expand All @@ -58,22 +58,22 @@ def fetch_master_position(server):
result = server.sql("SHOW MASTER STATUS")
return Position(result["File"], result["Position"])
except _errors.EmptyRowError:
raise _errors.NotMasterError
raise _errors.NotMainError


def fetch_slave_position(server):
"""Get the position of the next event to be read from the master.
def fetch_subordinate_position(server):
"""Get the position of the next event to be read from the main.

"""

from mysql.replicant.server import Position

try:
result = server.sql("SHOW SLAVE STATUS")
return Position(result["Relay_Master_Log_File"],
result["Exec_Master_Log_Pos"])
return Position(result["Relay_Main_Log_File"],
result["Exec_Main_Log_Pos"])
except _errors.EmptyRowError:
raise _errors.NotSlaveError
raise _errors.NotSubordinateError


_START_SLAVE_UNTIL = """START SLAVE UNTIL
Expand All @@ -82,34 +82,34 @@ def fetch_slave_position(server):
_MASTER_POS_WAIT = "SELECT MASTER_POS_WAIT(%s, %s)"


def slave_wait_for_pos(slave, position):
slave.sql(_MASTER_POS_WAIT, (position.file, position.pos))
def subordinate_wait_for_pos(subordinate, position):
subordinate.sql(_MASTER_POS_WAIT, (position.file, position.pos))


def slave_status_wait_until(server, field, pred):
def subordinate_status_wait_until(server, field, pred):
while True:
row = server.sql("SHOW SLAVE STATUS")
value = row[field]
if pred(value):
return value


def slave_wait_and_stop(slave, position):
def subordinate_wait_and_stop(subordinate, position):
"""Set up replication so that it will wait for the position to be
reached and then stop replication exactly at that binlog
position."""
slave.sql("STOP SLAVE")
slave.sql(_START_SLAVE_UNTIL, (position.file, position.pos))
slave.sql(_MASTER_POS_WAIT, (position.file, position.pos))
subordinate.sql("STOP SLAVE")
subordinate.sql(_START_SLAVE_UNTIL, (position.file, position.pos))
subordinate.sql(_MASTER_POS_WAIT, (position.file, position.pos))


def slave_wait_for_empty_relay_log(server):
def subordinate_wait_for_empty_relay_log(server):
"Wait until the relay log is empty and return."
result = server.sql("SHOW SLAVE STATUS")
fname = result["Master_Log_File"]
pos = result["Read_Master_Log_Pos"]
fname = result["Main_Log_File"]
pos = result["Read_Main_Log_Pos"]
if server.sql(_MASTER_POS_WAIT, (fname, pos)) is None:
raise _errors.SlaveNotRunningError
raise _errors.SubordinateNotRunningError


def fetch_binlog(server, binlog_files=None,
Expand Down Expand Up @@ -141,28 +141,28 @@ def fetch_binlog(server, binlog_files=None,
return iter(Popen(command + binlog_files, stdout=PIPE).stdout)


def clone(slave, source, master=None):
"""Function to create a new slave by cloning either a master or a
slave."""
def clone(subordinate, source, main=None):
"""Function to create a new subordinate by cloning either a main or a
subordinate."""

backup_name = source.host + ".tar.gz"
if master is not None:
if main is not None:
source.sql("STOP SLAVE")
lock_database(source)
if master is None:
position = fetch_master_position(source)
if main is None:
position = fetch_main_position(source)
else:
position = fetch_slave_position(source)
position = fetch_subordinate_position(source)
source.ssh("tar Pzcf " + backup_name + " /usr/var/mysql")
if master is not None:
if main is not None:
source.sql("START SLAVE")
subprocess.call(["scp", source.host + ":" + backup_name, slave.host + ":."])
slave.ssh("tar Pzxf " + backup_name + " /usr/var/mysql")
if master is None:
change_master(slave, source, position)
subprocess.call(["scp", source.host + ":" + backup_name, subordinate.host + ":."])
subordinate.ssh("tar Pzxf " + backup_name + " /usr/var/mysql")
if main is None:
change_main(subordinate, source, position)
else:
change_master(slave, master, position)
slave.sql("START SLAVE")
change_main(subordinate, main, position)
subordinate.sql("START SLAVE")


_START_SLAVE_UNTIL = "START SLAVE UNTIL MASTER_LOG_FILE=%s, MASTER_LOG_POS=%s"
Expand All @@ -172,6 +172,6 @@ def clone(slave, source, master=None):
def replicate_to_position(server, pos):
"""Run replication until it reaches 'pos'.

The function will block until the slave have reached the position."""
The function will block until the subordinate have reached the position."""
server.sql(_START_SLAVE_UNTIL, (pos.file, pos.pos))
server.sql(_MASTER_POS_WAIT, (pos.file, pos.pos))
12 changes: 6 additions & 6 deletions lib/mysql/replicant/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,19 @@ class NoOptionError(Error):
pass


class SlaveNotRunningError(Error):
"Exception raised when slave is not running but were expected to run"
class SubordinateNotRunningError(Error):
"Exception raised when subordinate is not running but were expected to run"
pass


class NotMasterError(Error):
"""Exception raised when the server is not a master and the
class NotMainError(Error):
"""Exception raised when the server is not a main and the
operation is illegal."""
pass


class NotSlaveError(Error):
"""Exception raised when the server is not a slave and the
class NotSubordinateError(Error):
"""Exception raised when the server is not a subordinate and the
operation is illegal."""
pass

Expand Down
Loading