Charmed MySQL
- By Canonical Data Platform
- Cloud
| Channel | Revision | Published | Runs on |
|---|---|---|---|
| 8.0/stable | 196 | 29 Sep 2023 | |
| 8.0/candidate | 234 | 06 May 2024 | |
| 8.0/beta | 234 | 06 May 2024 | |
| 8.0/edge | 234 | 06 May 2024 | |
juju deploy mysql --channel 8.0/stable
Deploy universal operators easily with Juju, the Universal Operator Lifecycle Manager.
Platform:
22.04
charms.mysql.v0.async_replication
-
- Last updated 29 Apr 2024
- Revision Library version 0.1
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.
"""MySQL async replication module."""
import enum
import logging
import typing
import uuid
from functools import cached_property
from charms.mysql.v0.mysql import (
MySQLFencingWritesError,
MySQLPromoteClusterToPrimaryError,
MySQLRejoinClusterError,
)
from ops import (
ActionEvent,
ActiveStatus,
BlockedStatus,
MaintenanceStatus,
Relation,
RelationDataContent,
Secret,
SecretNotFoundError,
WaitingStatus,
)
from ops.framework import Object
from tenacity import RetryError, Retrying, stop_after_attempt, wait_fixed
from typing_extensions import Optional
from constants import (
BACKUPS_PASSWORD_KEY,
BACKUPS_USERNAME,
CLUSTER_ADMIN_PASSWORD_KEY,
CLUSTER_ADMIN_USERNAME,
MONITORING_PASSWORD_KEY,
MONITORING_USERNAME,
PEER,
ROOT_PASSWORD_KEY,
ROOT_USERNAME,
SERVER_CONFIG_PASSWORD_KEY,
SERVER_CONFIG_USERNAME,
)
if typing.TYPE_CHECKING:
from charm import MySQLOperatorCharm
# Module-level logger, named after this module
logger = logging.getLogger(__name__)
# The unique Charmhub library identifier, never change it
LIBID = "4de21f1a022c4e2c87ac8e672ec16f6a"
# Library API version
# NOTE(review): presumably bumped only on breaking changes — confirm against the Charmhub library docs
LIBAPI = 0
# Library patch version
# NOTE(review): presumably bumped on every published change within the same API — confirm
LIBPATCH = 1
# Relation names whose events are observed by the classes in this module
PRIMARY_RELATION = "async-primary"
REPLICA_RELATION = "async-replica"
# Label used both when creating the shared credentials secret and when looking it up
SECRET_LABEL = "async-secret"
class ClusterSetInstanceState(typing.NamedTuple):
    """Cluster set instance state.

    Immutable triple of plain strings describing where a unit stands:
    the role of its cluster, its own role inside that cluster and the
    side of the async relation its application is on.
    """

    # role of the cluster in the cluster set: primary or replica
    cluster_role: str
    # role of the instance inside its cluster: primary or secondary
    instance_role: str
    # side of the async relation: primary or replica
    relation_side: str
class States(str, enum.Enum):
    """States for the relation.

    Members are also ``str`` instances, so they compare equal to their
    plain string values.
    """

    # relation is not initialized
    UNINITIALIZED = "uninitialized"
    # credentials are being synced
    SYNCING = "syncing"
    # cluster to be added
    INITIALIZING = "initializing"
    # replica cluster is being recovered
    RECOVERING = "recovery"
    # cluster set is ready
    READY = "ready"
    # cluster set is in a failed state
    FAILED = "failed"
class MySQLAsyncReplication(Object):
    """MySQL async replication base class.

    Groups the behaviour shared by the primary and replica sides of the async
    replication relation: the relation-broken handler and the handlers for the
    promote, fence/unfence writes and rejoin actions.
    """

    def __init__(self, charm: "MySQLOperatorCharm", relation_name: str):
        """Store the charm and observe relation-broken on both async relations.

        Args:
            charm: the charm instance that owns this object.
            relation_name: passed, together with `charm`, to the `Object` constructor.
        """
        super().__init__(charm, relation_name)
        self._charm = charm

        # relation broken is observed on all units
        self.framework.observe(
            self._charm.on[PRIMARY_RELATION].relation_broken, self.on_async_relation_broken
        )
        self.framework.observe(
            self._charm.on[REPLICA_RELATION].relation_broken, self.on_async_relation_broken
        )

    @cached_property
    def role(self) -> ClusterSetInstanceState:
        """Current cluster set role of the unit, after the relation is established.

        The value is computed on first access and cached on the instance.
        """
        is_replica = self._charm._mysql.is_cluster_replica()

        if is_replica:
            cluster_role = "replica"
        elif is_replica is False:
            cluster_role = "primary"
        else:
            # falsy but not `False`: the cluster role could not be determined
            cluster_role = "unset"

        _, instance_role = self._charm._mysql.get_member_state()

        # the relation side is derived from which async relation exists locally
        if self.model.get_relation(REPLICA_RELATION):
            relation_side = "replica"
        else:
            relation_side = "primary"

        return ClusterSetInstanceState(cluster_role, instance_role, relation_side)

    @property
    def cluster_name(self) -> str:
        """This cluster name, as stored in the application peer data."""
        return self._charm.app_peer_data["cluster-name"]

    @property
    def cluster_set_name(self) -> str:
        """Cluster set name, as stored in the application peer data."""
        return self._charm.app_peer_data["cluster-set-domain-name"]

    def get_remote_relation_data(self, relation: Relation) -> Optional[RelationDataContent]:
        """Return the remote application data of `relation`.

        Returns:
            The remote application databag, or None when the relation has no
            remote application set.
        """
        if not relation.app:
            return None
        return relation.data[relation.app]

    def _on_promote_standby_cluster(self, event: ActionEvent) -> None:
        """Promote a standby cluster to primary.

        The action fails when run on a non-leader unit, on a cluster that is
        not a replica, or when the `cluster-set-name` parameter does not match
        this cluster set name.
        """
        if not self._charm.unit.is_leader():
            event.fail("Only the leader unit can promote a standby cluster")
            return

        if not self._charm._mysql.is_cluster_replica():
            event.fail("Only a standby cluster can be promoted")
            return

        if event.params.get("cluster-set-name") != self.cluster_set_name:
            event.fail("Invalid/empty cluster set name")
            return

        # promote cluster to primary
        cluster_name = self.cluster_name
        force = event.params.get("force", False)

        try:
            self._charm._mysql.promote_cluster_to_primary(cluster_name, force)
            message = f"Cluster {cluster_name} promoted to primary"
            logger.info(message)
            event.set_results({"message": message})
            self._charm._on_update_status(None)
        except MySQLPromoteClusterToPrimaryError:
            logger.exception("Failed to promote cluster to primary")
            event.fail("Failed to promote cluster to primary")

    def _on_fence_unfence_writes_action(self, event: ActionEvent) -> None:
        """Fence or unfence writes to a cluster.

        The same handler serves both actions; which one is running is read
        from `event.handle.kind`. The action fails when the `cluster-set-name`
        parameter does not match, when this is a replica cluster, or when the
        cluster is already in the requested fencing state.
        """
        if event.params.get("cluster-set-name") != self.cluster_set_name:
            event.fail("Invalid/empty cluster set name")
            return

        if self.role.cluster_role == "replica":
            event.fail("Only a primary cluster can have writes fenced/unfence")
            return

        # name of the running operation, used only for failure reporting
        operation = "unfence" if event.handle.kind == "unfence_writes_action" else "fence"

        try:
            if (
                event.handle.kind == "fence_writes_action"
                and not self._charm._mysql.is_cluster_writes_fenced()
            ):
                logger.info("Fencing writes to the cluster")
                self._charm._mysql.fence_writes()
                event.set_results({"message": "Writes to the cluster are now fenced"})
            elif (
                event.handle.kind == "unfence_writes_action"
                and self._charm._mysql.is_cluster_writes_fenced()
            ):
                logger.info("Unfencing writes to the cluster")
                self._charm._mysql.unfence_writes()
                event.set_results({"message": "Writes to the cluster are now resumed"})
            else:
                event.fail("Writes are already fenced/unfenced")
                return
            # update status
            self._charm._on_update_status(None)
        except MySQLFencingWritesError:
            # log the traceback so that "Check logs" points at something,
            # and report the operation that actually failed
            logger.exception(f"Failed to {operation} writes")
            event.fail(f"Failed to {operation} writes. Check logs for details")

    def on_async_relation_broken(self, event):  # noqa: C901
        """Handle the async relation being broken from either side.

        On a replica (or `unset`) cluster whose unit is not being removed, wait
        until the instance is no longer part of the cluster and then reset the
        peer data flags. On a primary cluster, the leader removes the replica
        cluster named in the remote relation data from the cluster set.
        """
        if self.role.cluster_role in ("replica", "unset") and not self._charm.removing_unit:
            # The cluster being removed is a replica cluster
            # role is `unset` when the primary cluster dissolved the replica before
            # this hook execution i.e. was faster on running the handler
            self._charm.unit.status = WaitingStatus("Waiting for cluster to be dissolved")
            try:
                # hold execution until the cluster is dissolved
                # up to 30 attempts, 10 seconds apart; raising inside the attempt
                # is what requests the next retry
                for attempt in Retrying(stop=stop_after_attempt(30), wait=wait_fixed(10)):
                    with attempt:
                        if self._charm._mysql.is_instance_in_cluster(self._charm.unit_label):
                            logger.debug("Waiting for cluster to be dissolved")
                            raise Exception
            except RetryError:
                self._charm.unit.status = BlockedStatus(
                    "Replica cluster not dissolved after relation broken"
                )
                logger.warning(
                    "Replica cluster not dissolved after relation broken by the primary cluster."
                    "\n\tThis happens when the primary cluster was removed prior to removing the async relation."
                    "\n\tThis cluster can be promoted to primary with the `promote-standby-cluster` action."
                )
                return

            self._charm.unit.status = BlockedStatus("Standalone read-only unit.")
            # reset flag to allow instances rejoining the cluster
            self._charm.unit_peer_data["member-state"] = "waiting"
            del self._charm.unit_peer_data["unit-initialized"]
            if self._charm.unit.is_leader():
                self._charm.app.status = BlockedStatus("Recreate or rejoin cluster.")
                logger.info(
                    "\n\tThis is a replica cluster and will be dissolved.\n"
                    "\tThe cluster can be recreated with the `recreate-cluster` action.\n"
                    "\tAlternatively the cluster can be rejoined to the cluster set."
                )
                # reset the cluster node count flag
                del self._charm.app_peer_data["units-added-to-cluster"]
                # set flag to persist removed from cluster-set state
                self._charm.app_peer_data["removed-from-cluster-set"] = "true"

        elif self.role.cluster_role == "primary":
            # Remove the replica cluster, if this is the primary
            if self._charm.unit.is_leader():
                # only leader units can remove replica clusters
                remote_data = self.get_remote_relation_data(event.relation) or {}
                if cluster_name := remote_data.get("cluster-name"):
                    if self._charm._mysql.is_cluster_in_cluster_set(cluster_name):
                        self._charm.unit.status = MaintenanceStatus("Removing replica cluster")
                        logger.info(f"Removing replica cluster {cluster_name}")

                        # force removal when cluster is invalidated
                        force = self._charm._mysql.get_replica_cluster_status(cluster_name) in [
                            "invalidated",
                            "unknown",
                        ]

                        self._charm._mysql.remove_replica_cluster(cluster_name, force=force)
                        logger.info(f"Replica cluster {cluster_name} removed")
                        self._charm.unit.status = ActiveStatus(self._charm.active_status_message)
                    else:
                        logger.warning(
                            f"Replica cluster {cluster_name} not found in cluster set, skipping removal"
                        )
                else:
                    # Relation being broken before setup, e.g.: due to replica with user data
                    logger.warning("No cluster name found, skipping removal")
            elif self._charm.unit_peer_data.get("member-state") == "waiting":
                # set member-state to allow status update
                # needed for secondaries status update when removing due to replica with user data
                self._charm.unit_peer_data["member-state"] = "unknown"
            self._charm._on_update_status(None)

    def _on_rejoin_cluster_action(self, event: ActionEvent) -> None:
        """Rejoin cluster to cluster set action handler.

        The action fails when the `cluster-name` parameter is empty, when the
        cluster is not part of the cluster set, or when its status is anything
        other than `invalidated`.
        """
        cluster = event.params.get("cluster-name")
        if not cluster:
            message = "Invalid/empty cluster name"
            event.fail(message)
            logger.info(message)
            return

        if not self._charm._mysql.is_cluster_in_cluster_set(cluster):
            message = f"Cluster {cluster=} not found in cluster set"
            event.fail(message)
            logger.info(message)
            return

        status = self._charm._mysql.get_replica_cluster_status(cluster)
        if status != "invalidated":
            message = f"Cluster {status=}. Only `invalidated` clusters can be rejoined"
            event.fail(message)
            logger.info(message)
            return

        try:
            self._charm._mysql.rejoin_cluster(cluster)
            message = f"{cluster=} rejoined to cluster set"
            logger.info(message)
            event.set_results({"message": message})
        except MySQLRejoinClusterError:
            message = f"Failed to rejoin {cluster=} to the cluster set"
            event.fail(message)
            logger.error(message)
class MySQLAsyncReplicationPrimary(MySQLAsyncReplication):
    """MySQL async replication primary side.

    Implements the setup phase of the async replication for the primary side.
    """

    def __init__(self, charm: "MySQLOperatorCharm"):
        """Register action and primary-relation observers."""
        super().__init__(charm, PRIMARY_RELATION)

        charm_events = self._charm.on
        relation_events = charm_events[PRIMARY_RELATION]

        # Actions are observed only on the primary class to avoid duplicated
        # execution, since both classes are always instantiated
        observers = (
            # promotion action
            (charm_events.promote_standby_cluster_action, self._on_promote_standby_cluster),
            # fence writes action
            (charm_events.fence_writes_action, self._on_fence_unfence_writes_action),
            # unfence writes action
            (charm_events.unfence_writes_action, self._on_fence_unfence_writes_action),
            # rejoin invalidated cluster action
            (charm_events.rejoin_cluster_action, self._on_rejoin_cluster_action),
            # primary relation lifecycle
            (relation_events.relation_created, self._on_primary_created),
            (relation_events.relation_changed, self._on_primary_relation_changed),
        )
        for bound_event, handler in observers:
            self.framework.observe(bound_event, handler)

    def get_relation(self, relation_id: int) -> Optional[Relation]:
        """Look up the primary relation with the given id."""
        model = self.model
        return model.get_relation(PRIMARY_RELATION, relation_id)

    def get_local_relation_data(self, relation: Relation) -> Optional[RelationDataContent]:
        """Return this application's data in `relation`."""
        local_app = self.model.app
        return relation.data[local_app]

    def get_state(self, relation: Relation) -> Optional[States]:
        """Evaluate the state of `relation`, as seen from the primary side.

        Returns None when the local data is non-empty, is not flagged as
        replica and carries no `secret-id`.
        """
        if not relation:
            return States.UNINITIALIZED

        local_data = self.get_local_relation_data(relation)
        remote_data = self.get_remote_relation_data(relation) or {}

        if not local_data:
            return States.UNINITIALIZED

        if local_data.get("is-replica") == "true":
            return States.FAILED

        if not local_data.get("secret-id"):
            return None

        if not remote_data.get("endpoint"):
            # secret shared, replica has not published its endpoint yet
            return States.SYNCING

        # both sides published their data: evaluate cluster status
        replica_status = self._charm._mysql.get_replica_cluster_status(
            remote_data["cluster-name"]
        )
        if replica_status in ("ok", "invalidated"):
            return States.READY
        if replica_status == "unknown":
            return States.INITIALIZING
        return States.RECOVERING

    @property
    def idle(self) -> bool:
        """Whether the async replication is idle for all related clusters."""
        if not self.model.unit.is_leader():
            # non leader units are always idle
            return True

        idle_states = (States.READY, States.UNINITIALIZED)
        return all(
            self.get_state(relation) in idle_states
            for relation in self.model.relations[PRIMARY_RELATION]
        )

    def _get_secret(self) -> Secret:
        """Return the secret shared for async replication, creating it if missing."""
        model = self._charm.model
        try:
            # Avoid recreating the secret
            shared_secret = model.get_secret(label=SECRET_LABEL)
            if not shared_secret.id:
                # workaround for the secret id not being set with model uuid
                short_id = shared_secret.get_info().id.split(":")[1]
                shared_secret._id = f"secret://{self.model.uuid}/{short_id}"
            return shared_secret
        except SecretNotFoundError:
            pass

        peer_secret = model.get_secret(label=f"{PEER}.{self.model.app.name}.app")
        # filter out unnecessary secrets
        shared_content = {
            key: value for key, value in peer_secret.peek_content().items() if "password" in key
        }

        return model.app.add_secret(content=shared_content, label=SECRET_LABEL)

    def _on_primary_created(self, event):
        """Validate relations and share credentials with replica cluster."""
        if not self._charm.unit.is_leader():
            return

        if not self._charm.unit_initialized:
            logger.debug("Unit not initialized, deferring event")
            event.defer()
            return

        if self._charm._mysql.is_cluster_replica():
            logger.error(
                f"This is a replica cluster, cannot be related as {PRIMARY_RELATION}. Remove relation."
            )
            self._charm.unit.status = BlockedStatus(
                f"This is a replica cluster. Unrelate from the {PRIMARY_RELATION} relation"
            )
            # flag read back by `get_state` to report the relation as failed
            event.relation.data[self.model.app]["is-replica"] = "true"
            return

        self._charm.app.status = MaintenanceStatus("Setting up async replication")
        logger.info("Granting secrets access to async replication relation")
        secret = self._get_secret()
        secret_id = secret.id
        secret.grant(event.relation)

        # get workload version
        version = self._charm._mysql.get_mysql_version()

        logger.debug(f"Sharing {secret_id=} with replica cluster")
        # Set variables for credential sync and validations
        shared_fields = {
            "secret-id": secret_id,
            "cluster-name": self.cluster_name,
            "mysql-version": version,
            "cluster-set-name": self.cluster_set_name,
        }
        event.relation.data[self.model.app].update(shared_fields)

    def _on_primary_relation_changed(self, event):
        """Handle the async_primary relation being changed."""
        if not self._charm.unit.is_leader():
            return

        state = self.get_state(event.relation)

        if state == States.RECOVERING:
            # Recover replica cluster
            self._charm.unit.status = MaintenanceStatus("Replica cluster in recovery")
            return

        if state != States.INITIALIZING:
            return

        # Add replica cluster primary node
        logger.info("Creating replica cluster primary node")
        self._charm.unit.status = MaintenanceStatus("Adding replica cluster")

        remote_data = self.get_remote_relation_data(event.relation) or {}
        cluster = remote_data["cluster-name"]
        endpoint = remote_data["endpoint"]
        unit_label = remote_data["node-label"]

        logger.debug("Looking for a donor node")
        _, read_only, _ = self._charm._mysql.get_cluster_endpoints(get_ips=False)

        create_kwargs = {"instance_label": unit_label}
        if read_only:
            # first entry of the comma separated list is used as donor
            donor = read_only.split(",")[0]
            logger.debug(f"Adding replica {cluster=} with {endpoint=} using {donor=}")
            create_kwargs["donor"] = donor
        else:
            logger.debug(f"Adding replica {cluster=} with {endpoint=}. Primary is the donor")
        self._charm._mysql.create_replica_cluster(endpoint, cluster, **create_kwargs)

        event.relation.data[self.model.app]["replica-state"] = "initialized"
        logger.info("Replica cluster created")
        self._charm._on_update_status(None)
class MySQLAsyncReplicationReplica(MySQLAsyncReplication):
    """MySQL async replication replica side.

    Implements the setup phase of the async replication for the replica side.
    """

    def __init__(self, charm: "MySQLOperatorCharm"):
        """Register the replica-relation observers.

        Two handlers are attached to each of relation-created and
        relation-changed. The first pair returns early on non-leader units;
        the second pair does not check leadership.
        """
        super().__init__(charm, REPLICA_RELATION)

        # leader/primary
        self.framework.observe(
            self._charm.on[REPLICA_RELATION].relation_created, self._on_replica_created
        )
        self.framework.observe(
            self._charm.on[REPLICA_RELATION].relation_changed, self._on_replica_changed
        )

        # non-leader/secondaries
        self.framework.observe(
            self._charm.on[REPLICA_RELATION].relation_created,
            self._on_replica_non_leader_created,
        )
        self.framework.observe(
            self._charm.on[REPLICA_RELATION].relation_changed,
            self._on_replica_non_leader_changed,
        )

    @property
    def relation(self) -> Optional[Relation]:
        """The replica relation, or None when it does not exist."""
        return self.model.get_relation(REPLICA_RELATION)

    @property
    def relation_data(self) -> RelationDataContent:
        """This application's data in the replica relation."""
        return self.relation.data[self.model.app]

    @property
    def remote_relation_data(self) -> Optional[RelationDataContent]:
        """Remote application data in the replica relation.

        Returns None when the relation does not exist or has no remote
        application set.
        """
        relation = self.relation
        if not relation or not relation.app:
            return None
        return relation.data[relation.app]

    @property
    def state(self) -> Optional[States]:
        """State of the relation, on replica side.

        Returns None when the replica relation does not exist.
        """
        if not self.relation:
            return None

        if self.relation_data.get("user-data-found") == "true":
            return States.FAILED

        # remote data may be unavailable; treat it as empty, as the primary side does
        remote_data = self.remote_relation_data or {}
        if remote_data.get("secret-id") and not self.relation_data.get("endpoint"):
            # received credentials from primary cluster
            # and did not synced credentials
            return States.SYNCING

        if self.replica_initialized:
            # cluster added to cluster-set by primary cluster
            if self._charm.cluster_fully_initialized:
                return States.READY
            return States.RECOVERING
        return States.INITIALIZING

    @property
    def idle(self) -> bool:
        """Whether the async replication is idle."""
        if not self.model.unit.is_leader():
            # non leader units are always idle
            return True

        return self.state in [States.READY, None]

    @property
    def returning_cluster(self) -> bool:
        """Whether to skip checks.

        Used for skipping checks when a replica cluster was removed through broken relation.
        True when the `removed-from-cluster-set` peer flag is set and the remote
        cluster set name matches the local one.
        """
        remote_data = self.remote_relation_data or {}
        remote_cluster_set_name = remote_data.get("cluster-set-name")
        return (
            self._charm.app_peer_data.get("removed-from-cluster-set") == "true"
            and self.cluster_set_name == remote_cluster_set_name
        )

    @property
    def replica_initialized(self) -> bool:
        """Whether the replica cluster is initialized as such."""
        remote_data = self.remote_relation_data or {}
        return remote_data.get("replica-state") == "initialized"

    def _check_version(self) -> bool:
        """Check if the MySQL version is compatible with the primary cluster.

        Returns:
            False when the primary did not publish a version or when it differs
            from the local one, True when both versions are equal.
        """
        remote_data = self.remote_relation_data or {}
        remote_version = remote_data.get("mysql-version")
        if not remote_version:
            # nothing to compare against, no need to query the local server
            return False

        local_version = self._charm._mysql.get_mysql_version()
        if remote_version != local_version:
            logger.error(
                f"Primary cluster MySQL version {remote_version} is not compatible with this cluster MySQL version {local_version}"
            )
            return False

        return True

    def _obtain_secret(self) -> Secret:
        """Get secret from primary cluster, using the id published in the remote data."""
        remote_data = self.remote_relation_data or {}
        secret_id = remote_data.get("secret-id")
        return self._charm.model.get_secret(id=secret_id, label=SECRET_LABEL)

    def _async_replication_credentials(self) -> dict[str, str]:
        """Get async replication credentials from primary cluster."""
        secret = self._obtain_secret()
        return secret.peek_content()

    def _get_endpoint(self) -> str:
        """Get endpoint to be used by the primary cluster.

        This is the address in which the unit must be reachable from the primary cluster.
        Not necessarily the locally resolved address, but an ingress address.
        """
        # TODO: devise method to inform the real address
        # using unit informed address (fqdn or ip)
        return self._charm.unit_address

    def _on_replica_created(self, event):
        """Handle the async_replica relation being created on the leader unit.

        Unless this is a returning cluster, the setup is aborted when the
        cluster already holds non-system databases.
        """
        if not self._charm.unit.is_leader():
            return

        if not self._charm.unit_initialized and not self.returning_cluster:
            # avoid running too early for non returning clusters
            logger.debug("Unit not initialized, deferring event")
            event.defer()
            return

        if self.returning_cluster:
            # flag set on prior async relation broken
            # allows the relation to be created with user data so
            # rejoining to the cluster-set can be done incrementally
            # on incompatible user data, join fallbacks to clone
            logger.debug("User data check skipped")
        else:
            logger.debug("Checking for user data")
            if self._charm._mysql.get_non_system_databases():
                logger.info(
                    "\n\tUser data found, aborting async replication setup."
                    "\n\tEnsure the cluster has no user data before trying to join a cluster set."
                    "\n\tAfter removing/backing up the data, remove the relation and add it again."
                )
                self._charm.app.status = BlockedStatus(
                    "User data found, check instruction in the log"
                )
                self._charm.unit.status = BlockedStatus(
                    "User data found, aborting async replication setup"
                )
                # flag read back by `state` to report the relation as failed
                self.relation_data["user-data-found"] = "true"
                return

        self._charm.app.status = MaintenanceStatus("Setting up async replication")
        self._charm.unit.status = WaitingStatus("Awaiting sync data from primary cluster")

    def _on_replica_changed(self, event):  # noqa: C901
        """Handle the async_replica relation being changed.

        Runs on the leader only and acts on the current `state`:
        credentials sync and endpoint publication when syncing, status and
        peer data refresh when ready, and peer data flags update (followed by
        a deferral) while recovering.
        """
        if not self._charm.unit.is_leader():
            return

        state = self.state
        if state is None:
            # `state` is None when the relation no longer exists, e.g. when a
            # deferred event is re-emitted after the relation was removed
            logger.debug("Async replica relation not found, skipping relation changed")
            return
        logger.debug(f"Replica cluster {state.value=}")

        if state == States.SYNCING:
            if self.returning_cluster:
                # when running from and async relation broken
                # re-create the cluster and wait
                logger.debug("Recreating cluster prior to sync credentials")
                self._charm.create_cluster()
                # (re)set flags
                self._charm.app_peer_data.update(
                    {"removed-from-cluster-set": "", "rejoin-secondaries": "true"}
                )
                event.defer()
                return

            if not self._charm.cluster_fully_initialized:
                # cluster is not fully initialized
                # avoid race on credentials sync
                logger.debug(
                    "Cluster not fully initialized yet, waiting until all units join the cluster"
                )
                self._charm.unit.status = WaitingStatus("Waiting other units join the cluster")
                event.defer()
                return

            if not self._check_version():
                self._charm.unit.status = BlockedStatus(
                    "MySQL version mismatch with primary cluster. Check logs for details"
                )
                logger.error("MySQL version mismatch with primary cluster. Remove relation.")
                return

            logger.debug("Syncing credentials from primary cluster")
            self._charm.unit.status = MaintenanceStatus("Syncing credentials")
            self._charm.app.status = MaintenanceStatus("Setting up async replication")

            try:
                credentials = self._async_replication_credentials()
            except SecretNotFoundError:
                logger.debug("Secret not found, deferring event")
                event.defer()
                return

            # secret key -> user whose password it carries
            sync_keys = {
                SERVER_CONFIG_PASSWORD_KEY: SERVER_CONFIG_USERNAME,
                CLUSTER_ADMIN_PASSWORD_KEY: CLUSTER_ADMIN_USERNAME,
                MONITORING_PASSWORD_KEY: MONITORING_USERNAME,
                BACKUPS_PASSWORD_KEY: BACKUPS_USERNAME,
                ROOT_PASSWORD_KEY: ROOT_USERNAME,
            }

            for key, password in credentials.items():
                # sync credentials only for necessary users
                user = sync_keys.get(key)
                if user is None:
                    # shared secret carries a key with no matching user: ignore it
                    logger.debug(f"Skipping unexpected secret key {key}")
                    continue
                if user == ROOT_USERNAME:
                    # root user is only local
                    self._charm._mysql.update_user_password(user, password, host="localhost")
                else:
                    self._charm._mysql.update_user_password(user, password)
                self._charm.set_secret("app", key, password)
                logger.debug(f"Synced {user=} password")

            self._charm.unit.status = MaintenanceStatus("Dissolving replica cluster")
            logger.info("Dissolving replica cluster")
            self._charm._mysql.dissolve_cluster()
            # reset the cluster node count flag
            del self._charm.app_peer_data["units-added-to-cluster"]
            # reset force rejoin-secondaries flag
            del self._charm.app_peer_data["rejoin-secondaries"]

            if self.remote_relation_data["cluster-name"] == self.cluster_name:
                # this cluster need a new cluster name
                logger.warning(
                    "Cluster name is the same as the primary cluster. Appending generated value"
                )
                new_cluster_name = f"{self.cluster_name}{uuid.uuid4().hex[:4]}"
                self._charm.app_peer_data["cluster-name"] = new_cluster_name

            self._charm.unit.status = MaintenanceStatus("Populate endpoint")

            # this cluster name is used by the primary cluster to identify the replica cluster
            self.relation_data["cluster-name"] = self.cluster_name
            # the reachable endpoint address
            self.relation_data["endpoint"] = self._get_endpoint()
            # the node label in the replica cluster to be created
            self.relation_data["node-label"] = self._charm.unit_label

            logger.debug("Data for adding replica cluster shared with primary cluster")

            self._charm.unit.status = WaitingStatus("Waiting for primary cluster")
        elif state == States.READY:
            # update status
            logger.info("Replica cluster is ready")

            # sync cluster-set domain name across clusters
            if cluster_set_domain_name := self._charm._mysql.get_cluster_set_name():
                self._charm.app_peer_data["cluster-set-domain-name"] = cluster_set_domain_name

            # set the number of units added to the cluster for a single unit replica cluster
            # needed here since it will skip the `RECOVERING` state
            if self._charm.app.planned_units() == 1:
                self._charm.app_peer_data["units-added-to-cluster"] = "1"

            self._charm._on_update_status(None)
        elif state == States.RECOVERING:
            # recovering cluster (copying data and/or joining units)
            self._charm.app.status = MaintenanceStatus("Recovering replica cluster")
            self._charm.unit.status = WaitingStatus(
                "Waiting for recovery to complete on other units"
            )
            logger.debug("Awaiting other units to join the cluster")
            # reset the number of units added to the cluster
            # this will trigger secondaries to join the cluster
            node_count = self._charm._mysql.get_cluster_node_count()
            self._charm.app_peer_data["units-added-to-cluster"] = str(node_count)
            # set state flags to allow secondaries to join the cluster
            self._charm.unit_peer_data["member-state"] = "online"
            self._charm.unit_peer_data["member-role"] = "primary"
            event.defer()

    def _on_replica_non_leader_created(self, _):
        """Handle the async_replica relation being created for secondaries/non-leader."""
        # set waiting state to inhibit auto recovery, only when not already set
        if self._charm.unit_peer_data.get("member-state") != "waiting":
            self._charm.unit_peer_data["member-state"] = "waiting"
            self._charm.unit.status = WaitingStatus("waiting replica cluster be configured")

    def _on_replica_non_leader_changed(self, _):
        """Reset cluster secondaries to allow cluster rejoin after primary recovery."""
        # the replica state is initialized when the primary cluster finished
        # creating the replica cluster on this cluster primary/leader unit
        if (
            self.replica_initialized
            or self._charm.app_peer_data.get("rejoin-secondaries") == "true"
        ) and not self._charm._mysql.is_instance_in_cluster(self._charm.unit_label):
            logger.debug("Reset secondary unit to allow cluster rejoin")
            # reset unit flag to allow cluster rejoin after primary recovery
            # the unit will rejoin on the next peer relation changed or update status
            del self._charm.unit_peer_data["unit-initialized"]
            self._charm.unit_peer_data["member-state"] = "waiting"
            self._charm.unit.status = WaitingStatus("waiting to join the cluster")