"""
Calculates phase diagrams from sets of Phases.
"""
import numbers
from typing import Iterable, Sequence
import numpy as np
import pandas as pd
from scipy.constants import Boltzmann, eV
from sklearn.cluster import AgglomerativeClustering
from .phases import Phase, AbstractLinePhase
from .refine import Refiner, default_refiners, _find_one_point as find_one_point # noqa: F401
kB = Boltzmann / eV
__all__ = ["calc_phase_diagram", "get_transitions", "cluster_T_c", "cluster_T_c_mu"]
_PHASE_UNIT_SEP = "_"
def _join_phase_unit(phase: pd.Series, unit: pd.Series) -> pd.Series:
"""Encode ``(phase, unit)`` pairs as ``f"{phase}{_PHASE_UNIT_SEP}{unit}"`` strings.
Inverse is :func:`_split_phase_unit`. The two functions round-trip as long as
the unit-side string is a valid integer literal — the split uses ``rsplit``
with ``n=1``, so underscores inside ``phase`` are preserved.
"""
return phase.astype(str) + _PHASE_UNIT_SEP + unit.astype(str)
def _split_phase_unit(combined: pd.Series) -> tuple[pd.Series, pd.Series]:
"""Inverse of :func:`_join_phase_unit`. Returns ``(phase, unit)``."""
parts = combined.str.rsplit(_PHASE_UNIT_SEP, n=1)
phase = parts.map(lambda x: x[0])
unit = parts.map(lambda x: int(x[1]))
return phase, unit
def _split_stable(df):
udf = df.query("not stable").reset_index(drop=True)
udf["border"] = False
sdf = df.query("stable").reset_index(drop=True)
sdf["border"] = False
sdf["refined"] = "no"
return sdf, udf
def _border_edges(df, min_c, max_c):
"""Mark T extremes as border and emit synthetic +-inf mu edges (2D only)."""
df.loc[df["T"] == df["T"].min(), "border"] = True
df.loc[df["T"] == df["T"].max(), "border"] = True
left = df.loc[df["mu"] == df["mu"].min()][["phase", "T"]].copy()
left["mu"] = -np.inf
left["c"] = min_c
left["border"] = True
left["stable"] = True
right = df.loc[df["mu"] == df["mu"].max()][["phase", "T"]].copy()
right["mu"] = +np.inf
right["c"] = max_c
right["border"] = True
right["stable"] = True
return left, right
def refine_phase_diagram(
df: pd.DataFrame,
phases,
min_c: float = 0,
max_c: float = 1,
refiners: Sequence[Refiner] | None = None,
) -> pd.DataFrame:
"""Add additional points to a coarse phase diagram by searching for exact transitions.
Args:
df: dataframe of sampled phase points (with ``stable`` column)
phases: mapping of phase name to :class:`Phase`
min_c, max_c: concentration bounds used for the synthetic ``mu=+-inf`` border points
refiners: sequence of :class:`landau.refine.Refiner` to apply. If
``None``, picks defaults based on which of ``T``/``mu`` are sampled.
"""
sdf, udf = _split_stable(df)
data = [sdf, udf]
multiple_mus = len(sdf["mu"].unique()) > 1
multiple_ts = len(sdf["T"].unique()) > 1
if multiple_mus and multiple_ts:
data.extend(_border_edges(sdf, min_c, max_c))
if refiners is None:
refiners = default_refiners(sdf)
for r in refiners:
out = r.run(sdf, phases)
if not out.empty:
data.append(out)
return pd.concat(data, ignore_index=True)
def guess_mu_range(phases: Iterable[Phase], T: float, samples: int, tolerance: float = 1e-2):
"""Guess chemical potential window from the ideal solution.
Searches numerically for chemical potentials which stabilize
concentrations close to 0 and 1 and then use the concentrations
encountered along the way to numerically invert the c(mu) mapping.
Using an even c grid with mu(c) then yields a decent sampling of mu
space so that the final phase diagram is described everywhere equally.
Args:
phases: list of phases to consider
T: temperature at which to estimate mu(c)
samples: how many mu samples to return
Returns:
array of chemical potentials that likely cover the whole concentration space
"""
# TODO: this can be used immediately also for the actual phase diagram
# calculation: keep track of which phase is the most likely
import scipy.optimize as so
import scipy.interpolate as si
import numpy as np
# semigrand canonical "average" concentration
# use this to avoid discontinuities and be phase agnostic
def c(mu):
phis = np.array([p.semigrand_potential(T, mu) for p in phases])
conc = np.array([p.concentration(T, mu) for p in phases])
phis -= phis.min(axis=0)
beta = 1 / (kB * T)
prob = np.exp(-beta * phis)
prob /= prob.sum(axis=0)
ci = (prob * conc).sum(axis=0)
return ci
mu0 = so.brute(lambda x: +c(x[0]), ranges=[(-10, 10)], Ns=200)[0]
mu1 = so.brute(lambda x: -c(x[0]), ranges=[(-10, 10)], Ns=200)[0]
if mu0 == mu1:
if tolerance > 1e-7:
return guess_mu_range(phases, T, samples, tolerance/10)
raise ValueError(
"chemical potential range degenerate! Check that phases that not all phases have the same fixed "
"concentration!"
)
mm = np.linspace(mu0, mu1, samples)
cc = c(mm)
c0 = min(cc) + tolerance
c1 = max(cc) - tolerance
# At very low T, c(mu) is a step function and cc may have repeated values;
# sort and deduplicate before passing to interp1d which requires monotone x.
sort_idx = np.argsort(cc)
cc_s, mm_s = cc[sort_idx], mm[sort_idx]
_, unique_idx = np.unique(cc_s, return_index=True)
return si.interp1d(cc_s[unique_idx], mm_s[unique_idx])(np.linspace(c0, c1, samples)), c0, c1
[docs]
def calc_phase_diagram(
phases: Iterable[Phase],
Ts: Iterable[float] | float,
mu: Iterable[float] | float | int,
refine: bool = True,
keep_unstable: bool = False,
):
"""
Calculate phase diagram at given sampling points.
Args:
phases (iterable of Phases)
Ts (iterable of floats): sampling points in temperature
mu (iterable of floats): sampling points in chemical potential; if int
guess sampling points with guess_mu_range at max(Ts)
refine (bool): add additional sampling points at exact phase transitions
keep_unstable (bool): only keep entries of stable phases, otherwise keep entries of all phases at all sampling points
Returns:
dataframe of phase points
"""
if not isinstance(Ts, Iterable):
Ts = [Ts]
phases = {p.name: p for p in phases}
if isinstance(mu, numbers.Integral) and mu != 0:
# we would often pass mu=0 to calculate a fixed mu, temperature only diagram and it'd be a bit annoying to pass
# mu=0.0 all the time, so we special case as above
try:
mu, min_c, max_c = guess_mu_range(phases.values(), max(Ts), int(mu))
except ValueError:
if all(isinstance(p, AbstractLinePhase) for p in phases.values()):
raise ValueError(
"Cannot guess chemical potential range of line phases with all the same concentration!"
) from None
raise
elif refine:
min_c, max_c = None, None
def get(s, T):
phi = s.semigrand_potential(T, mu)
return {"T": T, "phase": s.name, "phi": phi, "mu": mu, "c": s.concentration(T, mu)}
pdf = pd.DataFrame([get(s, T) for s in phases.values() for T in Ts])
pdf = pdf.explode(["mu", "phi", "c"]).infer_objects().reset_index(drop=True)
pdf["stable"] = False
pdf.loc[pdf.groupby(["T", "mu"], group_keys=False).phi.idxmin(), "stable"] = True
if refine:
min_c = pdf.c.min()
max_c = pdf.c.max()
pdf = refine_phase_diagram(pdf, phases, min_c=min_c, max_c=max_c)
pdf["f"] = pdf.phi + pdf.mu * pdf.c
def sub(dd):
dd = dd.query("-inf<mu<inf")
if dd.empty:
return dd.f
c_min = dd.c.min()
c_max = dd.c.max()
c_span = c_max - c_min
lo_thr = c_min + 0.01 * c_span
hi_thr = c_max - 0.01 * c_span
f0, f1 = np.inf, np.inf
for _, g in dd.groupby("phase"):
if g["c"].min() <= lo_thr:
f0 = min(f0, g.loc[g["c"].idxmin(), "f"])
if g["c"].max() >= hi_thr:
f1 = min(f1, g.loc[g["c"].idxmax(), "f"])
if not np.isfinite(f0):
f0 = dd.loc[dd["c"].idxmin(), "f"]
if not np.isfinite(f1):
f1 = dd.loc[dd["c"].idxmax(), "f"]
return dd.f - (f0 * (1 - dd.c) + f1 * dd.c)
fex = pdf.groupby("T", group_keys=False).apply(sub, include_groups=False)
if len(Ts) > 1:
pdf["f_excess"] = fex
else:
# thank you pandas, this saved me -10min of my life.
pdf["f_excess"] = fex.T
if not keep_unstable:
pdf = pdf.query("stable")
return pdf
def reduce(dd):
dd = dd.sort_values("c")
return pd.Series(
{
"transition": "-".join(dd.phase.tolist()),
"c": dd.c.tolist(),
"phase": dd.phase.tolist(),
}
)
def _rescale_T(t):
tmin, tmax = t.min(), t.max()
if tmin != tmax:
return (t - tmin) / (tmax - tmin)
return t
def _agglomerative_clusterer(distance_threshold):
return AgglomerativeClustering(
n_clusters=None,
distance_threshold=distance_threshold,
linkage="single",
)
[docs]
def cluster_T_c(dd, *, distance_threshold) -> pd.Series:
"""Cluster points by (T, c) coordinates; used by cluster_phase.
Args:
dd: DataFrame with columns 'T' and 'c'.
distance_threshold: Agglomerative clustering cut-off in normalised (T, c) space.
Smaller values split more aggressively; larger values merge more. 0.5 was
hand-tuned for 2D T-c diagrams — lower it (e.g. 0.2) when disconnected
stable segments within one phase are incorrectly merged.
"""
if dd.empty:
return pd.Series(dtype=np.intp)
t = _rescale_T(dd["T"])
ids = _agglomerative_clusterer(distance_threshold).fit_predict(np.transpose([t, dd["c"]]))
return pd.Series(ids, index=dd.index)
[docs]
def cluster_T_c_mu(dd, *, distance_threshold) -> pd.Series:
"""Cluster points by (T, c, mu); rows with mu=±inf get their own distinct labels.
Used by get_transitions. The mu=±inf rows are border edges from _border_edges
and must not be fed to AgglomerativeClustering.
Args:
dd: DataFrame with columns 'T', 'c', and 'mu'.
distance_threshold: Agglomerative clustering cut-off in normalised (T, c, mu) space.
See :func:`cluster_T_c` for tuning guidance.
"""
if dd.empty:
return pd.Series(dtype=np.intp)
t = _rescale_T(dd["T"])
ids = pd.Series(np.zeros(len(dd), dtype=np.intp), index=dd.index)
F = np.isfinite(dd["mu"])
if F.any() and F.sum() >= 2:
ids.loc[F] = _agglomerative_clusterer(distance_threshold).fit_predict(
np.transpose([t.loc[F], dd["c"].loc[F], dd["mu"].loc[F]])
)
m = ids.max()
ids.loc[dd["mu"] == +np.inf] = m + 1
ids.loc[dd["mu"] == -np.inf] = m + 2
return ids
def cluster(dd, use_mu=True, *, distance_threshold):
"""Thin dispatch to cluster_T_c or cluster_T_c_mu; prefer calling those directly."""
if use_mu:
return cluster_T_c_mu(dd, distance_threshold=distance_threshold)
else:
return cluster_T_c(dd, distance_threshold=distance_threshold)
[docs]
def get_transitions(df):
"""
Identify "continuous" two-phase transition lines in mu/T space, i.e. transitions between the same two phases and along which mu/T are continuous.
Useful for plotting below, but potentially also to augment the existing refining routines and
acquire additional Free energies from calphy/etc. to improve the diagram.
"""
bdf = df.query("border")
# go from a table of mu/c/T points that are on the phase boundaries to a table where the two points that are at the same mu/T are grouped together
# use this information to add 'transition' column; handles also the case where border points are at mu=+-inf, there we have only one point
tdf = bdf.groupby(["mu", "T"])[["c", "phase"]].apply(reduce, include_groups=False)
# immediately explode again to go back to our familiar representation, but now with the added 'transition' column
tdf = tdf.reset_index().explode(["c", "phase"]).infer_objects().reset_index(drop=True)
# cluster points that are assigned as one transition, because the same transition can appear multiple times in "disconnected" manner in a phase
# diagram, e.g. a solid solution in contact with the melt interrupted by a higher melting intermetallic
if not tdf.empty:
res = tdf.groupby("transition", group_keys=False).apply(
lambda g: cluster_T_c_mu(g, distance_threshold=0.5), # 0.5 hand-tuned
include_groups=False,
)
if isinstance(res, pd.DataFrame):
# sometimes pandas returns a DataFrame instead of a Series when only one group exists
res = res.stack().reset_index(level=0, drop=True)
tdf["transition_unit"] = res
tdf["border_segment"] = _join_phase_unit(tdf["transition"], tdf["transition_unit"])
else:
tdf["transition_unit"] = []
tdf["border_segment"] = []
return tdf