Average mutation functional effects for a condition¶
Import Python modules.
We use polyclonal
for the plotting:
import itertools
import math
import altair as alt
import dms_variants.utils
import pandas as pd
import polyclonal
import polyclonal.plot
This notebook is parameterized by papermill
.
The next cell is tagged as parameters
to get the passed parameters.
# this cell is tagged parameters for `papermill` parameterization
site_numbering_map_csv = None
mutation_annotations_csv = None
func_effects_csv = None
latent_effects_csv = None
latent_html = None
functional_html = None
params = None
# Parameters
params = {
"avg_method": "median",
"floor_for_effect_std": -3,
"per_selection_tooltips": True,
"plot_kwargs": {
"alphabet": [
"R",
"K",
"H",
"D",
"E",
"Q",
"N",
"S",
"T",
"Y",
"W",
"F",
"A",
"I",
"L",
"M",
"V",
"G",
"P",
"C",
"*",
],
"addtl_slider_stats": {
"times_seen": 2,
"effect_std": 3,
"nt changes to codon": 3,
},
"addtl_slider_stats_as_max": ["effect_std", "nt changes to codon"],
"addtl_slider_stats_hide_not_filter": ["nt changes to codon"],
"heatmap_max_at_least": 2,
"heatmap_min_at_least": -2,
"init_floor_at_zero": False,
"init_site_statistic": "mean",
"site_zoom_bar_color_col": "region",
"slider_binding_range_kwargs": {
"times_seen": {"step": 1, "min": 1, "max": 20},
"n_selections": {"step": 1},
"nt changes to codon": {"step": 1, "min": 1, "max": 3},
},
},
"selections": ["Lib1-240125-293-SA26", "Lib2-240125-293-SA26"],
}
mutation_annotations_csv = "results/mutation_annotations/mutation_annotations.csv"
site_numbering_map_csv = "data/site_numbering_map.csv"
func_effects_csv = "results/func_effects/averages/293_SA26_entry_func_effects.csv"
latent_effects_csv = "results/func_effects/averages/293_SA26_entry_latent_effects.csv"
functional_html = "results/func_effects/averages/293_SA26_entry_func_effects.html"
latent_html = "results/func_effects/averages/293_SA26_entry_latent_effects.html"
Read the input data:
site_numbering_map = pd.read_csv(site_numbering_map_csv).rename(
columns={"reference_site": "site"}
)
assert site_numbering_map[["site", "sequential_site"]].notnull().all().all()
addtl_site_cols = [
c for c in site_numbering_map.columns if c != "site" and c.endswith("site")
]
if mutation_annotations_csv:
mutation_annotations = pd.read_csv(mutation_annotations_csv)
selections = params["selections"]
func_effects = pd.concat(
[
pd.read_csv(f"results/func_effects/by_selection/{s}_func_effects.csv").assign(
selection=s
)
for s in selections
],
ignore_index=True,
).assign(times_seen=lambda x: x["times_seen"].astype("Int64"))
Correlations among selections¶
Compute the correlations in the mutation effects in terms of functional scores and latent-phenotype effects across selections:
# We compute for several times seen values, get those:
try:
init_times_seen = params["plot_kwargs"]["addtl_slider_stats"]["times_seen"]
except KeyError:
print("No times seen in params, using a value of 3")
init_times_seen = 3
func_effects_tidy = func_effects.assign(
mutation=lambda x: x["wildtype"] + x["site"].astype(str) + x["mutant"]
).melt(
id_vars=["selection", "mutation", "times_seen"],
value_vars=["latent_phenotype_effect", "functional_effect"],
var_name="phenotype",
value_name="effect",
)
# do analysis for each "times_seen"
func_effects_tidy = pd.concat(
[
func_effects_tidy.query("times_seen >= @t").assign(min_times_seen=t)
for t in [1, init_times_seen, 2 * init_times_seen]
]
)
corrs = (
dms_variants.utils.tidy_to_corr(
df=func_effects_tidy,
sample_col="selection",
label_col="mutation",
value_col="effect",
group_cols=["phenotype", "min_times_seen"],
)
.assign(r2=lambda x: x["correlation"] ** 2)
.drop(columns="correlation")
.assign(
min_times_seen=lambda x: "min times seen " + x["min_times_seen"].astype(str)
)
)
for phenotype, phenotype_corr in corrs.groupby("phenotype"):
corr_chart = (
alt.Chart(phenotype_corr)
.encode(
alt.X("selection_1", title=None),
alt.Y("selection_2", title=None),
column=alt.Column("min_times_seen", title=None),
color=alt.Color("r2", scale=alt.Scale(zero=True)),
tooltip=[
alt.Tooltip(c, format=".3g") if c == "r2" else c
for c in ["phenotype", "selection_1", "selection_2", "r2"]
],
)
.mark_rect(stroke="black")
.properties(width=alt.Step(15), height=alt.Step(15), title=phenotype)
.configure_axis(labelLimit=500)
)
display(corr_chart)
Make a scatter plot:
print(f"Correlation scatter plot for times_seen filter of {init_times_seen}")
corr_panels = []
for sel1, sel2 in itertools.combinations(sorted(func_effects["selection"].unique()), 2):
df = func_effects.assign(
mutation=lambda x: x["wildtype"] + x["site"].astype(str) + x["mutant"]
).query("times_seen >= @init_times_seen")
corr_df = (
df.query("selection == @sel1")[["mutation", "functional_effect"]]
.rename(columns={"functional_effect": sel1})
.merge(
df.query("selection == @sel2")[["mutation", "functional_effect"]].rename(
columns={"functional_effect": sel2}
),
validate="one_to_one",
)
)
n = len(corr_df)
r = corr_df[[sel1, sel2]].corr().values[1, 0]
corr_panels.append(
alt.Chart(corr_df)
.encode(
alt.X(sel1, scale=alt.Scale(nice=False, padding=4)),
alt.Y(sel2, scale=alt.Scale(nice=False, padding=4)),
tooltip=[
"mutation",
alt.Tooltip(sel1, format=".3f"),
alt.Tooltip(sel2, format=".3f"),
],
)
.mark_circle(color="black", size=30, opacity=0.25)
.properties(
width=160,
height=160,
title=alt.TitleParams(
f"R = {r:.2f}, N = {n}", fontSize=11, fontWeight="normal", dy=2
),
)
)
ncols = 4
corr_rows = []
for irow in range(int(math.ceil(len(corr_panels) / ncols))):
corr_rows.append(
alt.hconcat(
*[
corr_panels[irow * ncols + icol]
for icol in range(min(ncols, len(corr_panels[irow * ncols :])))
]
)
)
alt.vconcat(*corr_rows).configure_axis(grid=False)
Correlation scatter plot for times_seen filter of 2
Average functional effects¶
We average the functional effects across selections. The resulting file has the average functional effect for each selection (either mean or median), the mean times seen across selections, the number of selections each mutation is seen in, and the effect (times seen) for each individual selection.
avg_method = params["avg_method"]
assert avg_method in {"mean", "median"}, avg_method
func_effects_by_phenotype = func_effects.melt(
id_vars=["site", "wildtype", "mutant", "times_seen", "selection"],
value_vars=["functional_effect", "latent_phenotype_effect"],
var_name="phenotype",
value_name="effect",
)
if "floor_for_effect_std" in params:
floor_for_effect_std = params["floor_for_effect_std"]
print(f"For computing effect std, first floor at {floor_for_effect_std=}")
else:
floor_for_effect_std = None
func_effects_by_phenotype["effect_floored"] = func_effects_by_phenotype["effect"].clip(
lower=floor_for_effect_std
)
avg_func_effects = (
func_effects_by_phenotype.groupby(
["phenotype", "site", "wildtype", "mutant"], as_index=False
)
.aggregate(
effect=pd.NamedAgg("effect", avg_method),
effect_std=pd.NamedAgg("effect_floored", "std"),
times_seen=pd.NamedAgg("times_seen", "sum"),
n_selections=pd.NamedAgg("site", "count"),
)
.assign(
times_seen=lambda x: (x["times_seen"] / x["n_selections"]).where(
x["mutant"] != x["wildtype"],
pd.NA,
)
)
)
assert not set(selections).intersection(avg_func_effects.columns)
# add per-selection effects (times_seen)
avg_func_effects = avg_func_effects.merge(
(
func_effects_by_phenotype.assign(
effect_times_seen=lambda x: (
x["effect"].map(lambda e: f"{e:.2f}")
+ (" (" + x["times_seen"].astype(str) + ")").where(
x["mutant"] != x["wildtype"],
"",
)
)
)
.pivot_table(
index=[
"site",
"wildtype",
"mutant",
"phenotype",
],
values="effect_times_seen",
columns="selection",
aggfunc=lambda s: ",".join(s),
)
.reset_index()
),
on=["phenotype", "site", "wildtype", "mutant"],
validate="one_to_one",
)
for phenotype, csv_file in [
("functional_effect", func_effects_csv),
("latent_phenotype_effect", latent_effects_csv),
]:
print(f"Writing {phenotype} to {csv_file}")
(
avg_func_effects.query("phenotype == @phenotype")[
[
"site",
"wildtype",
"mutant",
"effect",
"effect_std",
"times_seen",
"n_selections",
]
].to_csv(csv_file, index=False, float_format="%.4g")
)
For computing effect std, first floor at floor_for_effect_std=-3
Writing functional_effect to results/func_effects/averages/293_SA26_entry_func_effects.csv Writing latent_phenotype_effect to results/func_effects/averages/293_SA26_entry_latent_effects.csv
Make plots¶
Set up keyword arguments to https://jbloomlab.github.io/polyclonal/polyclonal.plot.html#polyclonal.plot.lineplot_and_heatmap if they are not already specified:
plot_kwargs = params["plot_kwargs"]
if "addtl_slider_stats" not in plot_kwargs:
plot_kwargs["addtl_slider_stats"] = {}
if "times_seen" not in plot_kwargs["addtl_slider_stats"]:
plot_kwargs["addtl_slider_stats"]["times_seen"] = 3
if "effect_std" not in plot_kwargs["addtl_slider_stats"]:
plot_kwargs["addtl_slider_stats"]["effect_std"] = avg_func_effects[
"effect_std"
].max()
if "addtl_slider_stats_as_max" not in plot_kwargs:
plot_kwargs["addtl_slider_stats_as_max"] = ["effect_std"]
else:
plot_kwargs["addtl_slider_stats_as_max"].append("effect_std")
elif "addtl_slider_stats_as_max" not in plot_kwargs:
raise ValueError(
"You specified `effect_std` in `addtl_slider_stats` but did not add it to "
"`addtl_slider_stats_as_max`. If you really do not want `effect_std` in "
"`addtl_slider_stats_as_max`, then specify that list without it."
)
if "n_selections" not in plot_kwargs["addtl_slider_stats"]:
plot_kwargs["addtl_slider_stats"]["n_selections"] = len(selections) // 2 + 1
if "site_zoom_bar_color_col" in plot_kwargs:
if plot_kwargs["site_zoom_bar_color_col"] in avg_func_effects.columns:
pass
elif plot_kwargs["site_zoom_bar_color_col"] in site_numbering_map.columns:
avg_func_effects = avg_func_effects.merge(
site_numbering_map[["site", plot_kwargs["site_zoom_bar_color_col"]]],
on="site",
validate="many_to_one",
how="left",
)
if mutation_annotations_csv:
if not {"site", "mutant"}.issubset(mutation_annotations.columns):
raise ValueError(f"{mutation_annotations.columns=} lacks 'site', 'mutant'")
if set(mutation_annotations.columns).intersection(avg_func_effects.columns) != {
"site",
"mutant",
}:
raise ValueError(
f"{mutation_annotations.columns=} shares columns with {avg_func_effects.columns=}"
)
avg_func_effects = avg_func_effects.merge(
mutation_annotations,
on=["site", "mutant"],
how="left",
validate="many_to_one",
)
for col in mutation_annotations.columns:
if col not in {"site", "mutant"}:
avg_func_effects[col] = avg_func_effects[col].where(
avg_func_effects["wildtype"] != avg_func_effects["mutant"], pd.NA
)
if "addtl_tooltip_stats" not in plot_kwargs:
plot_kwargs["addtl_tooltip_stats"] = []
for c in ["effect_std"] + addtl_site_cols:
if c not in plot_kwargs["addtl_tooltip_stats"]:
plot_kwargs["addtl_tooltip_stats"].append(c)
if "sequential_site" not in avg_func_effects.columns:
avg_func_effects = avg_func_effects.merge(
site_numbering_map[["site", *addtl_site_cols]],
on="site",
validate="many_to_one",
how="left",
)
if any(avg_func_effects["site"] != avg_func_effects["sequential_site"]):
if "sequential_site" not in plot_kwargs["addtl_tooltip_stats"]:
plot_kwargs["addtl_tooltip_stats"].append("sequential_site")
if params["per_selection_tooltips"]:
assert set(selections).issubset(avg_func_effects.columns)
plot_kwargs["addtl_tooltip_stats"] += [
s for s in selections if s not in plot_kwargs["addtl_tooltip_stats"]
]
if "alphabet" not in plot_kwargs:
plot_kwargs["alphabet"] = [
a
for a in polyclonal.alphabets.biochem_order_aas(polyclonal.AAS_WITHSTOP_WITHGAP)
if a in set(avg_func_effects["mutant"])
]
if "sites" not in plot_kwargs:
plot_kwargs["sites"] = site_numbering_map.sort_values("sequential_site")[
"site"
].tolist()
First plot standard deviation in functional effects measurements across libraries versus effect. This is useful to look at if you have imposed a filter on the standard deviation on the functional effects to filter variants with large variation.
times_seen_slider = alt.param(
value=plot_kwargs["addtl_slider_stats"]["times_seen"],
bind=alt.binding_range(
name="minimum times_seen",
min=1,
max=avg_func_effects["times_seen"].quantile(0.9),
),
)
effect_std_slider = alt.param(
value=plot_kwargs["addtl_slider_stats"]["effect_std"],
bind=alt.binding_range(
name="maximum effect_std",
min=0,
max=avg_func_effects["effect_std"].max(),
),
)
std_df = avg_func_effects.rename(
columns={c: c.replace(".", "_") for c in avg_func_effects.columns}
)
std_chart = (
alt.Chart(std_df)
.add_params(effect_std_slider, times_seen_slider)
.transform_filter(alt.datum["times_seen"] >= times_seen_slider)
.transform_calculate(
above_max_effect_std=alt.datum["effect_std"] > effect_std_slider
)
.encode(
alt.X("effect_std", title=f"effect_std (with {floor_for_effect_std=})"),
alt.Y("effect"),
alt.Color(
"above_max_effect_std:N",
scale=alt.Scale(domain=[False, True]),
legend=alt.Legend(orient="bottom", symbolOpacity=1),
),
tooltip=[
alt.Tooltip(c, format=".3g") if avg_func_effects[c].dtype == float else c
for c in std_df.columns
],
column="phenotype",
)
.mark_circle(opacity=0.2, strokeOpacity=1, stroke="black", strokeWidth=0.5)
.resolve_scale(x="independent", y="independent")
.properties(width=250, height=250)
.configure_axis(grid=False)
)
std_chart
Now make the plot of mutation effects:
for phenotype, plotfile in [
("functional_effect", functional_html),
("latent_phenotype_effect", latent_html),
]:
print(f"\n\nPlotting {phenotype} and saving to {plotfile}")
df = avg_func_effects.query("phenotype == @phenotype")
chart = polyclonal.plot.lineplot_and_heatmap(
data_df=df,
stat_col="effect",
category_col="phenotype",
**plot_kwargs,
)
chart.save(plotfile)
display(chart)
Plotting functional_effect and saving to results/func_effects/averages/293_SA26_entry_func_effects.html
Plotting latent_phenotype_effect and saving to results/func_effects/averages/293_SA26_entry_latent_effects.html