import copy
import os
import matplotlib.cm as cm
import matplotlib.pyplot as plt
import mdtraj as md
import numpy as np
from analyze_foldamers.parameters.angle_distributions import *
from cg_openmm.cg_model.cgmodel import CGModel
from cg_openmm.utilities.iotools import write_pdbfile_without_topology
from openmm import unit
from scipy.optimize import brute, fmin, minimize, minimize_scalar
from sklearn.cluster import DBSCAN, OPTICS, KMeans
from sklearn.metrics import silhouette_samples, silhouette_score
from sklearn_extra.cluster import KMedoids
[docs]def get_representative_structures(
file_list, cgmodel,
frame_start=0, frame_stride=1, frame_end=-1,
output_format="pdb", output_dir="cluster_output", homopolymer_sym=False):
"""
Using the similarity matrix from RMSD distances, determine a representative structure for each
file in file_list
:param file_list: A list of PDB or DCD files to read and concatenate
:type file_list: List( str )
:param cgmodel: A CGModel() class object
:type cgmodel: class
:param frame_start: First frame in pdb trajectory file to use for clustering.
:type frame_start: int
:param frame_stride: Advance by this many frames when reading trajectories.
:type frame_stride: int
:param frame_end: Last frame in trajectory file to use for clustering.
:type frame_end: int
:param homopolymer_sym: if there is end-to-end symmetry, scan forwards and backwards sequences for lowest rmsd (default=False)
:type homopolymer_sym: boolean
"""
if not os.path.exists(output_dir):
os.mkdir(output_dir)
if type(file_list) == str:
# If a single file, make into list:
file_list = file_list.split()
file_list_out = []
for file in file_list:
distances, traj_all = get_rmsd_matrix(file, cgmodel, frame_start, frame_stride, frame_end,
homopolymer_sym=homopolymer_sym)
# Compute medoid based on similarity scores:
medoid_index = np.exp(-distances/distances.std()).sum(axis=1).argmax()
medoid_xyz = traj_all[medoid_index].xyz[0]
positions = medoid_xyz * unit.nanometer
file_name = str(f"{file[:-4]}_sim.{output_format}")
file_list_out.append(file_name)
if output_format=="dcd":
dcdtraj = md.Trajectory(
xyz=positions.value_in_unit(unit.nanometer),
topology=md.Topology.from_openmm(cgmodel.topology),
)
md.Trajectory.save_dcd(dcdtraj,file_name)
else:
cgmodel.positions = positions
write_pdbfile_without_topology(cgmodel, file_name)
return file_list_out
[docs]def get_cluster_medoid_positions_KMedoids(
file_list, cgmodel, n_clusters=2,
frame_start=0, frame_stride=1, frame_end=-1,
output_format="pdb", output_dir="cluster_output",
output_cluster_traj=False, plot_silhouette=True, plot_rmsd_hist=True,
filter=False, filter_ratio=0.25, filter_brute_step=0.05,
homopolymer_sym=False):
"""
Given PDB or DCD trajectory files and coarse grained model as input, this function performs K-medoids clustering on the poses in trajectory, and returns a list of the coordinates for the medoid pose of each cluster.
:param file_list: A list of PDB or DCD files to read and concatenate
:type file_list: List( str )
:param cgmodel: A CGModel() class object
:type cgmodel: class
:param n_clusters: The number of clusters for K-medoids algorithm.
:type n_clusters: int
:param frame_start: First frame in pdb trajectory file to use for clustering.
:type frame_start: int
:param frame_stride: Advance by this many frames when reading pdb trajectories.
:type frame_stride: int
:param frame_end: Last frame in trajectory file to use for clustering.
:type frame_end: int
:param output_format: file format extension to write medoid coordinates to (default="pdb"), dcd also supported
:type output_format: str
:param output_dir: path to which cluster medoid structures and silhouette plots will be saved
:type output_dir: str
:param ouput_cluster_traj: option to output the trajectory of each cluster along with each medoid
:type ouput_cluster_traj: boolean
:param plot_silhouette: option to create silhouette plot of clustering results (default=True)
:type plot_silhouette: boolean
:param plot_rmsd_hist: option to plot a histogram of pairwise rmsd values (post-filtering)
:type plot_rmsd_hist: boolean
:param filter: option to apply neighborhood radius filtering to remove low-density data (default=False)
:type filter: boolean
:param filter_ratio: fraction of data points which pass through the neighborhood radius filter (default=0.25)
:type filter_ratio: float
:param filter_brute_step: step size in distance units for brute force filter radius optimization (final optimization searches between intervals) (default=0.05)
:type filter_brute_step: float
:param homopolymer_sym: if there is end-to-end symmetry, scan forwards and backwards sequences for lowest rmsd (default=False)
:type homopolymer_sym: boolean
:returns:
- medoid_positions ( np.array( float * unit.angstrom ( n_clusters x num_particles x 3 ) ) ) - A 3D numpy array of poses corresponding to the medoids of all trajectory clusters.
- cluster_sizes ( List ( int ) ) - A list of number of members in each cluster
- cluster_rmsd( np.array ( float ) ) - A 1D numpy array of rmsd (in cluster distance space) of samples to cluster centers
- silhouette_avg - ( float ) - average silhouette score across all clusters
- labels ( np.array ) - labels of frames taken from the original trajectory
- original_indices ( np.array ) - original indices of labels in the overall trajectory fed into this function
"""
if not os.path.exists(output_dir):
os.mkdir(output_dir)
top_from_pdb = None
if cgmodel is None:
top_from_pdb = file_list[0]
distances, traj_all, original_indices = get_rmsd_matrix(
file_list, cgmodel, frame_start, frame_stride, frame_end,
return_original_indices=True, homopolymer_sym=homopolymer_sym)
if filter:
# Filter distances:
distances, dense_indices, filter_ratio_actual, original_indices = \
filter_distances(distances,
filter_ratio=filter_ratio, filter_brute_step=filter_brute_step,
return_original_indices=True, original_indices=original_indices
)
if plot_rmsd_hist:
distances_row = np.reshape(distances, (distances.shape[0]*distances.shape[1],1))
# Remove the diagonal 0 elements:
distances_row = distances_row[distances_row != 0]
figure = plt.figure()
n_out, bin_edges_out, patch = plt.hist(
distances_row, bins=1000,density=True)
plt.xlabel('rmsd')
plt.ylabel('probability density')
plt.savefig(f'{output_dir}/distances_rmsd_hist.pdf')
plt.close()
# Cluster with sklearn-extra KMedoids
kmedoids = KMedoids(n_clusters=n_clusters,metric='precomputed').fit(distances)
# Get labels
labels = kmedoids.labels_
# Get medoid indices
medoid_indices = kmedoids.medoid_indices_
# Get medoid coordinates:
medoid_xyz = np.zeros([n_clusters,traj_all.n_atoms,3])
for k in range(n_clusters):
medoid_xyz[k,:,:] = traj_all[medoid_indices[k]].xyz[0]
# Write medoids to file
write_medoids_to_file(cgmodel, medoid_xyz, output_dir, output_format, top_from_pdb = top_from_pdb)
medoid_positions = medoid_xyz * unit.nanometer
# Write clusters to file
if output_cluster_traj:
write_clusters_to_file(labels, traj_all, output_dir, output_format)
# Get indices of frames in each cluster:
cluster_indices = {}
cluster_sizes = []
for k in range(n_clusters):
cluster_indices[k] = np.argwhere(labels==k)[:,0]
cluster_sizes.append(len(cluster_indices[k]))
# Assign intra-cluster distances to medoids
dist_to_medoids = {}
for k in range(n_clusters):
dist_to_medoids[k] = distances[cluster_indices[k],medoid_indices[k]]
# Compute cluster rmsd of samples to medoid within each cluster
cluster_rmsd = np.zeros(n_clusters)
for k in range(n_clusters):
for i in range(len(cluster_indices[k])):
cluster_rmsd[k] += np.power(dist_to_medoids[k][i],2)
cluster_rmsd[k] /= len(cluster_indices[k])
cluster_rmsd[k] = np.sqrt(cluster_rmsd[k])
# Get silhouette scores
silhouette_avg = silhouette_score(distances, kmedoids.labels_)
silhouette_sample_values = silhouette_samples(distances, kmedoids.labels_)
if plot_silhouette:
# Plot silhouette analysis
plotfile=f"{output_dir}/silhouette_kmedoids_ncluster_{n_clusters}.pdf"
make_silhouette_plot(
kmedoids, silhouette_sample_values, silhouette_avg,
n_clusters, cluster_rmsd, cluster_sizes, plotfile
)
return medoid_positions, cluster_sizes, cluster_rmsd, silhouette_avg, labels, original_indices
[docs]def get_cluster_medoid_positions_DBSCAN(
file_list, cgmodel, min_samples=5, eps=0.5,
frame_start=0, frame_stride=1, frame_end=-1, output_format="pdb",
output_dir="cluster_output", output_cluster_traj=False, plot_silhouette=True,
plot_rmsd_hist=True, filter=True, filter_ratio=0.25, filter_brute_step=0.05,
core_points_only=True, homopolymer_sym=False):
"""
Given PDB or DCD trajectory files and coarse grained model as input, this function performs DBSCAN clustering on the poses in the trajectory, and returns a list of the coordinates for the medoid pose of each cluster.
:param file_list: A list of PDB or DCD files to read and concatenate
:type file_list: List( str )
:param cgmodel: A CGModel() class object
:type cgmodel: class
:param min_samples: minimum of number of samples in neighborhood of a point to be considered a core point (includes point itself)
:type min_samples: int
:param eps: DBSCAN parameter neighborhood distance cutoff
:type eps: float
:param frame_start: First frame in trajectory file to use for clustering.
:type frame_start: int
:param frame_stride: Advance by this many frames when reading trajectories.
:type frame_stride: int
:param frame_end: Last frame in trajectory file to use for clustering.
:type frame_end: int
:param output_format: file format extension to write medoid coordinates to (default="pdb"), dcd also supported
:type output_format: str
:param output_dir: directory to write clustering medoid and plot files
:type output_dir: str
:param plot_silhouette: option to create silhouette plot(default=True)
:type plot_silhouette: boolean
:param filter: option to apply neighborhood radius filtering to remove low-density data (default=True)
:type filter: boolean
:param filter_ratio: fraction of data points which pass through the neighborhood radius filter (default=0.25)
:type filter_ratio: float
:param filter_brute_step: step size in distance units for brute force filter radius optimization (final optimization searches between intervals) (default=0.05)
:type filter_brute_step: float
:param core_points_only: use only core points to calculate medoid structures (default=True)
:type core_points_only: boolean
:param homopolymer_sym: if there is end-to-end symmetry, scan forwards and backwards sequences for lowest rmsd (default=False)
:type homopolymer_sym: boolean
:returns:
- medoid_positions ( np.array( float * unit.angstrom ( n_clusters x num_particles x 3 ) ) ) - A 3D numpy array of poses corresponding to the medoids of all trajectory clusters.
- cluster_sizes ( List ( int ) ) - A list of number of members in each cluster
- cluster_rmsd( np.array ( float ) ) - A 1D numpy array of rmsd (in cluster distance space) of samples to cluster centers
- n_noise ( int ) - number of points classified as noise
- silhouette_avg - ( float ) - average silhouette score across all clusters
- labels ( np.array ) - labels of frames taken from the original trajectory
- original_indices ( np.array ) - original indices of labels in the overall trajectory fed into this function
"""
if not os.path.exists(output_dir):
os.mkdir(output_dir)
top_from_pdb = None
if cgmodel is None:
top_from_pdb = file_list[0]
distances, traj_all, original_indices = get_rmsd_matrix(
file_list, cgmodel, frame_start, frame_stride, frame_end,
return_original_indices=True, homopolymer_sym=homopolymer_sym)
if filter:
# Filter distances:
distances, dense_indices, filter_ratio_actual, original_indices = \
filter_distances(distances,
filter_ratio=filter_ratio, filter_brute_step=filter_brute_step,
return_original_indices=True, original_indices=original_indices
)
traj_all = traj_all[dense_indices]
if plot_rmsd_hist:
# Plot rmsd histogram:
distances_row = np.reshape(distances, (distances.shape[0]*distances.shape[1],1))
# Remove the diagonal 0 elements:
distances_row = distances_row[distances_row != 0]
figure = plt.figure()
n_out, bin_edges_out, patch = plt.hist(
distances_row, bins=1000,density=True)
plt.xlabel('rmsd')
plt.ylabel('probability density')
plt.savefig(f'{output_dir}/distances_rmsd_hist.pdf')
plt.close()
# Cluster with sklearn DBSCAN
dbscan = DBSCAN(min_samples=min_samples,eps=eps,metric='precomputed').fit(distances)
# The produces a cluster labels from 0 to n_clusters-1, and assigns -1 to noise points
# Get labels
labels = dbscan.labels_
# Get core sample indices:
core_sample_indices = dbscan.core_sample_indices_
# Number of clusters:
n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
# Number of noise points:
n_noise = list(labels).count(-1)
# Get indices of frames in each cluster:
cluster_indices = {}
cluster_indices_core = {}
cluster_sizes = []
cluster_sizes_core = []
for k in range(n_clusters):
cluster_indices[k] = np.argwhere(labels==k)[:,0]
cluster_indices_core[k] = []
for elem in cluster_indices[k]:
if elem in core_sample_indices:
cluster_indices_core[k].append(elem)
cluster_sizes.append(len(cluster_indices[k]))
cluster_sizes_core.append(len(cluster_indices_core[k]))
# Get indices of frames classified as noise:
noise_indices = np.argwhere(labels==-1)[:,0]
# Find the structure closest to each center (medoid):
# OPTICS/DBSCAN does not have a built-in function to transform to cluster-distance space,
# as the centroids of the clusters are not physically meaningful in general. However, as
# RMSD between structures is our only clustering feature, the cluster centers (regions of
# high density) will likely be representative structures of each cluster.
# Following the protocol outlined in MDTraj example:
# http://mdtraj.org/1.9.3/examples/centroids.html
# Create distance matrices within each cluster:
distances_k = {}
if core_points_only:
for k in range(n_clusters):
distances_k[k] = np.zeros((cluster_sizes_core[k],cluster_sizes_core[k]))
for i in range(cluster_sizes_core[k]):
for j in range(cluster_sizes_core[k]):
distances_k[k][i,j] = distances[cluster_indices_core[k][i],cluster_indices_core[k][j]]
# Compute medoid based on similarity scores:
medoid_index = [] # Global index
intra_cluster_medoid_index = [] # Index within cluster
for k in range(n_clusters):
intra_cluster_medoid_index.append(
np.exp(-distances_k[k]/distances_k[k].std()).sum(axis=1).argmax()
)
# Here we need to use the global sample index to find the medoid structure:
medoid_index.append(cluster_indices_core[k][intra_cluster_medoid_index[k]])
else:
for k in range(n_clusters):
distances_k[k] = np.zeros((cluster_sizes[k],cluster_sizes[k]))
for i in range(cluster_sizes[k]):
for j in range(cluster_sizes[k]):
distances_k[k][i,j] = distances[cluster_indices[k][i],cluster_indices[k][j]]
# Compute medoid based on similarity scores:
medoid_index = [] # Global index
intra_cluster_medoid_index = [] # Index within cluster
for k in range(n_clusters):
intra_cluster_medoid_index.append(
np.exp(-distances_k[k]/distances_k[k].std()).sum(axis=1).argmax()
)
# Here we need to use the global sample index to find the medoid structure:
medoid_index.append(cluster_indices[k][intra_cluster_medoid_index[k]])
medoid_xyz = np.zeros([n_clusters,traj_all.n_atoms,3])
for k in range(n_clusters):
medoid_xyz[k,:,:] = traj_all[medoid_index[k]].xyz[0]
# Write medoids to file
write_medoids_to_file(cgmodel, medoid_xyz, output_dir, output_format, top_from_pdb = top_from_pdb)
medoid_positions = medoid_xyz * unit.nanometer
if output_cluster_traj:
write_clusters_to_file(labels, traj_all, output_dir, output_format)
# Compute intra-cluster rmsd of samples to medoid based on structure rmsd
cluster_rmsd = np.zeros(n_clusters)
for k in range(n_clusters):
cluster_rmsd[k] = np.sqrt(((distances_k[k][intra_cluster_medoid_index[k]]**2).sum())/len(cluster_indices[k]))
# Get silhouette scores
try:
silhouette_sample_values = silhouette_samples(distances, labels)
silhouette_avg = np.mean(silhouette_sample_values[labels!=-1])
if plot_silhouette:
# Plot silhouette analysis
plotfile = f"{output_dir}/silhouette_dbscan_min_sample_{min_samples}_eps_{eps}.pdf"
make_silhouette_plot(
dbscan, silhouette_sample_values, silhouette_avg,
n_clusters, cluster_rmsd, cluster_sizes, plotfile
)
except ValueError:
print("There are either no clusters, or no noise points identified. Try adjusting DBSCAN min_samples, eps parameters.")
silhouette_avg = None
return medoid_positions, cluster_sizes, cluster_rmsd, n_noise, silhouette_avg, labels, original_indices
[docs]def get_cluster_medoid_positions_OPTICS(
file_list, cgmodel, min_samples=5, xi=0.05,
frame_start=0, frame_stride=1, frame_end=-1, output_format="pdb", output_dir="cluster_output", output_cluster_traj = False,
plot_silhouette=True, plot_rmsd_hist=True, filter=True, filter_ratio=0.25, filter_brute_step=0.05,
homopolymer_sym=False):
"""
Given PDB or DCD trajectory files and coarse grained model as input, this function performs OPTICS clustering on the poses in the trajectory, and returns a list of the coordinates for the medoid pose of each cluster.
:param file_list: A list of PDB or DCD files to read and concatenate
:type file_list: List( str )
:param cgmodel: A CGModel() class object
:type cgmodel: class
:param min_samples: minimum of number of samples in neighborhood of a point to be considered a core point (includes point itself)
:type min_samples: int
:param xi: OPTICS parameter for minimum slope on reachability plot signifying a cluster boundary
:type xi: float
:param frame_start: First frame in trajectory file to use for clustering.
:type frame_start: int
:param frame_stride: Advance by this many frames when reading trajectories.
:type frame_stride: int
:param frame_end: Last frame in trajectory file to use for clustering.
:type frame_end: int
:param output_format: file format extension to write medoid coordinates to (default="pdb"), dcd also supported
:type output_format: str
:param output_dir: directory to write clustering medoid and plot files
:type output_dir: str
:param plot_silhouette: option to create silhouette plot(default=True)
:type plot_silhouette: boolean
:param filter: option to apply neighborhood radius filtering to remove low-density data (default=True)
:type filter: boolean
:param filter_ratio: fraction of data points which pass through the neighborhood radius filter (default=0.25)
:type filter_ratio: float
:param filter_brute_step: step size in distance units for brute force filter radius optimization (final optimization searches between intervals) (default=0.05)
:type filter_brute_step: float
:param homopolymer_sym: if there is end-to-end symmetry, scan forwards and backwards sequences for lowest rmsd (default=False)
:type homopolymer_sym: boolean
:returns:
- medoid_positions ( np.array( float * unit.angstrom ( n_clusters x num_particles x 3 ) ) ) - A 3D numpy array of poses corresponding to the medoids of all trajectory clusters.
- cluster_sizes ( List ( int ) ) - A list of number of members in each cluster
- cluster_rmsd( np.array ( float ) ) - A 1D numpy array of rmsd (in cluster distance space) of samples to cluster centers
- n_noise ( int ) - number of points classified as noise
- silhouette_avg - ( float ) - average silhouette score across all clusters
"""
if not os.path.exists(output_dir):
os.mkdir(output_dir)
top_from_pdb = None
if cgmodel is None:
top_from_pdb = file_list[0]
distances, traj_all, original_indices = get_rmsd_matrix(
file_list, cgmodel, frame_start, frame_stride, frame_end,
return_original_indices=True, homopolymer_sym=homopolymer_sym)
if filter:
# Filter distances:
distances, dense_indices, filter_ratio_actual, original_indices = \
filter_distances(distances,
filter_ratio=filter_ratio, filter_brute_step=filter_brute_step,
return_original_indices=True, original_indices=original_indices
)
traj_all = traj_all[dense_indices]
if plot_rmsd_hist:
# Plot rmsd histogram:
distances_row = np.reshape(distances, (distances.shape[0]*distances.shape[1],1))
# Remove the diagonal 0 elements:
distances_row = distances_row[distances_row != 0]
figure = plt.figure()
n_out, bin_edges_out, patch = plt.hist(
distances_row, bins=1000,density=True)
plt.xlabel('rmsd')
plt.ylabel('probability density')
plt.savefig(f'{output_dir}/distances_rmsd_hist.pdf')
plt.close()
# Cluster with sklearn OPTICS
optic = OPTICS(min_samples=min_samples,xi=xi,cluster_method='xi',metric='precomputed').fit(distances)
# The produces a cluster labels from 0 to n_clusters-1, and assigns -1 to noise points
# Get labels
labels = optic.labels_
# Number of clusters:
n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
# Number of noise points:
n_noise = list(labels).count(-1)
# Get indices of frames in each cluster:
cluster_indices = {}
cluster_sizes = []
for k in range(n_clusters):
cluster_indices[k] = np.argwhere(labels==k)[:,0]
cluster_sizes.append(len(cluster_indices[k]))
# Get indices of frames classified as noise:
noise_indices = np.argwhere(labels==-1)[:,0]
# Find the structure closest to each center (medoid):
# OPTICS/DBSCAN does not have a built-in function to transform to cluster-distance space,
# as the centroids of the clusters are not physically meaningful in general. However, as
# RMSD between structures is our only clustering feature, the cluster centers (regions of
# high density) will likely be representative structures of each cluster.
# Following the protocol outlined in MDTraj example:
# http://mdtraj.org/1.9.3/examples/centroids.html
# Create distance matrices within each cluster:
distances_k = {}
for k in range(n_clusters):
distances_k[k] = np.zeros((cluster_sizes[k],cluster_sizes[k]))
for i in range(cluster_sizes[k]):
for j in range(cluster_sizes[k]):
distances_k[k][i,j] = distances[cluster_indices[k][i],cluster_indices[k][j]]
# Compute medoid based on similarity scores:
medoid_index = [] # Global index
intra_cluster_medoid_index = [] # Index within cluster
for k in range(n_clusters):
intra_cluster_medoid_index.append(
np.exp(-distances_k[k] / distances_k[k].std()).sum(axis=1).argmax()
)
# Here we need to use the global sample index to find the medoid structure:
medoid_index.append(cluster_indices[k][intra_cluster_medoid_index[k]])
medoid_xyz = np.zeros([n_clusters,traj_all.n_atoms,3])
for k in range(n_clusters):
medoid_xyz[k,:,:] = traj_all[medoid_index[k]].xyz[0]
# Write medoids to file
write_medoids_to_file(cgmodel, medoid_xyz, output_dir, output_format, top_from_pdb=top_from_pdb)
medoid_positions = medoid_xyz * unit.nanometer
if output_cluster_traj:
write_clusters_to_file(labels, traj_all, output_dir, output_format)
# Compute intra-cluster rmsd of samples to medoid based on structure rmsd
cluster_rmsd = np.zeros(n_clusters)
for k in range(n_clusters):
cluster_rmsd[k] = np.sqrt(((distances_k[k][intra_cluster_medoid_index[k]]**2).sum())/len(cluster_indices[k]))
# Get silhouette scores
try:
silhouette_sample_values = silhouette_samples(distances, labels)
silhouette_avg = np.mean(silhouette_sample_values[labels!=-1])
if plot_silhouette:
# Plot silhouette analysis
plotfile = f"{output_dir}/silhouette_optics_min_sample_{min_samples}_xi_{xi}.pdf"
make_silhouette_plot(
optic, silhouette_sample_values, silhouette_avg,
n_clusters, cluster_rmsd, cluster_sizes, plotfile
)
except ValueError:
print("There are either no clusters, or no noise points identified. Try adjusting OPTICS min_samples, xi parameters")
silhouette_avg = None
return medoid_positions, cluster_sizes, cluster_rmsd, n_noise, silhouette_avg, labels, original_indices
[docs]def filter_distances(distances, filter_ratio=0.25, return_original_indices=False, original_indices=None,
filter_brute_step=0.05):
"""
Function for filtering out data points with few neighbors within a cutoff radius
:param distances: square matrix of pairwise RMSD distances
:type distances: 2d numpy array
:param filter_ratio: desired fraction of data remaining after neighborhood radius filtering
:type filter_ratio: float
:param filter_brute_step: step size in distance units for brute force filter radius optimization (final optimization searches between intervals) (default=0.05)
:type filter_brute_step: float
:returns:
- distances_filtered (2d numpy array) - distance matrix of data points satisfying filter parameters
- neighbors_dense (1d numpy array) - indices of the original dataset which satisfy filter parameters
- filter_ratio (float) - fraction of data remaining after filtering
"""
filter_ratio_target = filter_ratio
# Run a series of 1d optimizations, with increasing number of number of neighbor criteria until
# we achieve the desired filter ratio
# This avoids bad convergence when non-integer number of neighbors has no effect on the objective function.
def get_filter_ratio(x0, density_cutoff):
# Function to minimize
cutoff_radius = x0
neighbors = np.zeros((len(distances[:,0])))
for i in range(len(distances[:,0])):
neighbors[i] = (distances[:,i]<=cutoff_radius).sum()-1
# Excludes the self data point
neighbors_dense = np.argwhere(neighbors>=density_cutoff)[:,0]
filter_ratio = len(neighbors_dense)/len(neighbors)
return (filter_ratio-filter_ratio_target)**2
# Optimize cutoff_radius, density_cutoff parameters to get desired filter ratio
# A value of 0.05 is reasonable for rmsd distances, 75 is reasonable for torsion n-dimensional euclidean distances
# Bounds for brute force minimization (neither gradient or stochastic methods are reliable here)
# Here we set the lower bound to 1 brute step (finisher miniimization can find a solution below it if too large)
bounds = (filter_brute_step,np.max(distances))
# Starting number of neighbors within radius:
density_cutoff = 1
convergence = False
# Save all the results in case we don't meet the specified tolerance:
saved_results = {}
f_vals = []
# Construct vector of brute force radii to test:
brute_range = [slice(bounds[0],bounds[1],filter_brute_step)]
while convergence == False:
results = brute(
get_filter_ratio, brute_range, args=(density_cutoff,),
full_output=True,finish=fmin,
)
saved_results[density_cutoff] = results
f_vals.append(results[1])
cutoff_radius = results[0]
if results[1] <= 1E-6 and results[1] != filter_ratio_target:
# Second check is to tell if all data was filtered out (filter_ratio of 0)
convergence = True
else:
density_cutoff += 1
if density_cutoff > 15:
break
f_vals_array = np.asarray(f_vals)
# If filtering tolerance not met, use the best value:
if convergence == False:
opt_index = np.argmin(f_vals_array)
density_cutoff = opt_index+1
cutoff_radius = saved_results[density_cutoff][1]
# Apply filtering parameters:
neighbors = np.zeros((len(distances[:,0])))
for i in range(len(distances[:,0])):
neighbors[i] = (distances[:,i]<=cutoff_radius).sum()-1
# Excludes the self data point
neighbors_dense = np.argwhere(neighbors>=density_cutoff)[:,0]
# Need to select 1 dimension at a time:
distances_filtered = distances[:,neighbors_dense]
distances_filtered = distances_filtered[neighbors_dense,:]
if return_original_indices:
original_indices = original_indices[neighbors_dense]
filter_ratio_actual = len(neighbors_dense)/len(neighbors)
print(f"filtered {(1-filter_ratio_actual)*100} % of data using:")
print(f"cutoff radius = {cutoff_radius}")
print(f"number neighbors cutoff: {density_cutoff}")
if return_original_indices:
return distances_filtered, neighbors_dense, filter_ratio_actual, original_indices
return distances_filtered, neighbors_dense, filter_ratio_actual
[docs]def make_cluster_distance_plots(n_clusters,cluster_fit,dist_to_centroids,plotfile):
"""Internal function for creating cluster distance plots"""
n_sub = 0
for k in range(n_clusters):
n_sub += k
nrow = int(np.ceil(np.sqrt(n_sub)))
ncol = int(np.ceil(n_sub/nrow))
fig2, axs2 = plt.subplots(nrow,ncol,figsize=(8,10))
subplot_id = 1
for k in range(n_clusters):
for j in range(k+1,n_clusters):
row = int(np.ceil(subplot_id/ncol))-1
col = int((subplot_id-1)%ncol)
# axs subplot object is only subscriptable in dimensions it has multiple entries in
if nrow > 1 and ncol > 1:
# Color data by cluster assignment:
for c in range(n_clusters):
color = cm.nipy_spectral(float(c)/n_clusters)
axs2[row,col].plot(
dist_to_centroids[np.argwhere(cluster_fit.labels_==c),k],
dist_to_centroids[np.argwhere(cluster_fit.labels_==c),j],
'.',
markeredgecolor=color,
markerfacecolor=color,
label=f'k={c}',
)
axs2[row,col].set_xlabel(f'Distance to centroid {k}',
{'fontsize': 8})
axs2[row,col].set_ylabel(f'Distance to centroid {j}',
{'fontsize': 8})
if row == 0 and col == 0:
# Add legend:
axs2[row,col].legend()
if ncol > 1 and nrow == 1:
for c in range(n_clusters):
color = cm.nipy_spectral(float(c)/n_clusters)
axs2[col].plot(
dist_to_centroids[np.argwhere(cluster_fit.labels_==c),k],
dist_to_centroids[np.argwhere(cluster_fit.labels_==c),j],
'.',
markeredgecolor=color,
markerfacecolor=color,
label=f'k={c}',
)
axs2[col].set_xlabel(f'Distance to centroid {k}',
{'fontsize': 8})
axs2[col].set_ylabel(f'Distance to centroid {j}',
{'fontsize': 8})
if row == 0 and col == 0:
# Add legend:
axs2[col].legend()
if ncol == 1 and nrow == 1:
for c in range(n_clusters):
color = cm.nipy_spectral(float(c)/n_clusters)
axs2.plot(
dist_to_centroids[np.argwhere(cluster_fit.labels_==c),k],
dist_to_centroids[np.argwhere(cluster_fit.labels_==c),j],
'.',
markeredgecolor=color,
markerfacecolor=color,
label=f'k={c}',
)
axs2.set_xlabel(f'Distance to centroid {k}',
{'fontsize': 8})
axs2.set_ylabel(f'Distance to centroid {j}',
{'fontsize': 8})
# Add legend:
axs2.legend()
subplot_id += 1
plt.tight_layout()
plt.savefig(plotfile)
[docs]def get_rmsd_matrix(file_list, cgmodel, frame_start, frame_stride, frame_end,
return_original_indices=False, homopolymer_sym=False):
"""Internal function for reading trajectory files and computing rmsd"""
# Load files as {replica number: replica trajectory}
rep_traj = {}
if return_original_indices:
original_indices = []
if type(file_list) == str:
# If a single file, make into list:
file_list = file_list.split()
for i in range(len(file_list)):
if file_list[0][-3:] == 'dcd':
rep_traj[i] = md.load(file_list[i],top=md.Topology.from_openmm(cgmodel.topology))
else:
rep_traj[i] = md.load(file_list[i])
if return_original_indices:
if len(original_indices) == 0:
start = 0
else:
start = original_indices[-1][-1]
original_indices.append(start + np.arange(rep_traj[i].n_frames))
# Combine all trajectories, selecting specified frames
if frame_end == -1:
frame_end = rep_traj[0].n_frames
if frame_start == -1:
frame_start == frame_end
for i in range(len(file_list)):
if i == 0:
traj_all = rep_traj[i][frame_start:frame_end:frame_stride]
else:
traj_all = traj_all.join(rep_traj[i][frame_start:frame_end:frame_stride])
if return_original_indices:
original_indices[i] = original_indices[i][frame_start:frame_end:frame_stride]
# For homopolymers with end-to-end symmetry, we need to check the forward and backward
# sequences for the lowest RMSD.
if homopolymer_sym:
# ***Note: this assumes that the particles are indexed by monomer, and in the same
# order for each monomer.
# We check if the model is a linear homopolymer, since there may be multiple backbone
# beads per monomer.
#------------------------#
# Forward sequence #
#------------------------#
# Align structures with first frame as reference:
for i in range(1,traj_all.n_frames):
md.Trajectory.superpose(traj_all[i],traj_all[0])
# This rewrites to traj_all
# Compute pairwise rmsd:
distances_forward = np.empty((traj_all.n_frames, traj_all.n_frames))
for i in range(traj_all.n_frames):
distances_forward[i] = md.rmsd(traj_all, traj_all, i)
#------------------------#
# Reverse sequence #
#------------------------#
# We need to use the particle type list of the original model,
# and reconstruct monomer by monomer from the opposite end.
n_particles = rep_traj[0].n_atoms
particle_type_list = []
particle_indices_forward = []
particle_indices_reverse = []
for p in range(n_particles):
particle_indices_forward.append(p)
particle_type_list.append(cgmodel.get_particle_type_name(p))
# Check if linear chain with multiple beads per monomer:
mono = cgmodel.monomer_types
if len(mono) > 1:
print(f'Error: cannot apply end-to-end symmetry with multiple monomer types')
exit()
mono = mono[0]
mono_bond_start = mono['start']
mono_bond_end = mono['end']
mono_bond_list = mono['bond_list']
mono_particle_sequence = mono['particle_sequence']
#***TODO: add rigorous check for linear topology here.
n_particle_unique = len(set(particle_type_list))
n_particles_per_mono = len(mono_particle_sequence)
n_mono = int(n_particles/n_particles_per_mono)
if n_particle_unique == 1:
particle_indices_reverse = particle_indices_forward[::-1]
else:
for m in range(n_mono):
for p in range(n_particles_per_mono):
particle_indices_reverse.append(n_particles - n_particles_per_mono*(m+1) + p)
positions_rev = np.empty((traj_all.n_frames, traj_all.n_atoms, 3))
# Reassign the particle coordinates by the reverse index (coordinate rows)
for i in range(traj_all.n_frames):
for j in range(traj_all.n_atoms):
positions_rev[i,particle_indices_reverse[j],:] = traj_all[i].xyz[0][particle_indices_forward[j]]
# Make a new MDTraj object for the reverse positions.
# (coordinates seemingly cannot be modified in an existing one)
traj_reverse = md.Trajectory(
xyz=positions_rev,
topology=md.Topology.from_openmm(cgmodel.topology),
)
# Re-superpose with the flipped coordinates.
# Here the reference frame is the same as the original forward direction:
for i in range(traj_reverse.n_frames):
md.Trajectory.superpose(traj_reverse[i],traj_all[0])
# Compute pairwise rmsd between the reverse and forward structures:
# (reverse to reverse will be the same as forward to forward)
distances_reverse = np.empty((traj_all.n_frames, traj_reverse.n_frames))
for i in range(traj_all.n_frames):
distances_reverse[i] = md.rmsd(traj_reverse, traj_all, i)
# Second argument is the reference traj to measure to.
# Now, take the minimum distances:
distances = np.empty((traj_all.n_frames, traj_all.n_frames))
n_reversed = 0
for i in range(distances_reverse.shape[0]):
for j in range(distances_reverse.shape[1]):
if distances_forward[i,j] < distances_reverse[i,j]:
distances[i,j] = distances_forward[i,j]
else:
distances[i,j] = distances_reverse[i,j]
n_reversed += 1
print(f'{n_reversed} reverse distances used')
else:
# Align structures with first frame as reference:
for i in range(1,traj_all.n_frames):
md.Trajectory.superpose(traj_all[i],traj_all[0])
# This rewrites to traj_all
# Compute pairwise rmsd:
distances = np.empty((traj_all.n_frames, traj_all.n_frames))
for i in range(traj_all.n_frames):
distances[i] = md.rmsd(traj_all, traj_all, i)
if return_original_indices:
original_indices = np.concatenate(original_indices)
return distances, traj_all, original_indices.reshape(-1)
return distances, traj_all
def write_clusters_to_file(labels, traj_all, output_dir, output_format):
""""""
# Write ouput directory
if not os.path.exists(output_dir):
os.mkdir(output_dir)
clusters = np.unique(labels)
for k in clusters:
cluster_indices = np.argwhere(labels == k)
if k == -1:
k = "noise"
file_name = str(f"{output_dir}/cluster_{k}.{output_format}")
cluster_traj = traj_all.slice(cluster_indices.reshape(-1))
cluster_traj.save(file_name)
[docs]def write_medoids_to_file(cgmodel, medoid_positions, output_dir, output_format, top_from_pdb = None):
"""Internal function for writing medoid coordinates to file"""
# Write medoids to file
if not os.path.exists(output_dir):
os.mkdir(output_dir)
n_clusters = medoid_positions.shape[0]
for k in range(n_clusters):
positions = medoid_positions[k] * unit.nanometer
file_name = str(f"{output_dir}/medoid_{k}.{output_format}")
if cgmodel is None:
# Case for when no CG model is provided
temp_traj = md.load(top_from_pdb)
top = temp_traj.topology
traj = md.Trajectory(
xyz=positions.value_in_unit(unit.nanometer),
topology = top
)
traj.save_pdb(file_name)
else:
if output_format=="dcd":
dcdtraj = md.Trajectory(
xyz=positions.value_in_unit(unit.nanometer),
topology=md.Topology.from_openmm(cgmodel.topology),
)
md.Trajectory.save_dcd(dcdtraj,file_name)
else:
cgmodel.positions = positions
write_pdbfile_without_topology(cgmodel, file_name)
return
[docs]def make_silhouette_plot(
cluster_fit, silhouette_sample_values, silhouette_avg,
n_clusters, cluster_rmsd, cluster_sizes, plotfile):
"""Internal function for creating silhouette plot"""
fig1, ax1 = plt.subplots(1,1,figsize=(8,6))
y_lower = 10
for k in range(n_clusters):
kth_cluster_sil_values = silhouette_sample_values[cluster_fit.labels_==k]
kth_cluster_sil_values.sort()
y_upper = y_lower + cluster_sizes[k]
color = cm.nipy_spectral(float(k)/n_clusters)
ax1.fill_betweenx(
np.arange(y_lower, y_upper),0,
kth_cluster_sil_values,
facecolor=color, edgecolor=color, alpha=0.7)
ax1.text(-0.05, y_lower + 0.5 * cluster_sizes[k], str(k))
y_lower = y_upper + 10
ax1.set_xlabel("Silhouette coefficient")
ax1.set_ylabel("Cluster label")
ax1.axvline(x=silhouette_avg, color="red", linestyle="--")
ax1.set_yticks([]) # Clear y ticks/labels
ax1.set_xticks([-0.1, 0, 0.2, 0.4, 0.6, 0.8, 1])
xlim = ax1.get_xlim()
ylim = ax1.get_ylim()
for k in range(n_clusters):
ax1.text(xlim[0]+0.75*(xlim[1]-xlim[0]), (0.9-(k/25))*(ylim[1]-ylim[0]),
f"Cluster {k} RMSD: {cluster_rmsd[k]:.3f}",
{'fontsize': 8},
horizontalalignment='left')
plt.tight_layout()
plt.savefig(plotfile)
return
[docs]def concatenate_trajectories(pdb_file_list, combined_pdb_file="combined.pdb"):
"""
Given a list of PDB files, this function reads their coordinates, concatenates them, and saves the combined coordinates to a new file (useful for clustering with MSMBuilder).
:param pdb_file_list: A list of PDB files to read and concatenate
:type pdb_file_list: List( str )
:param combined_pdb_file: The name of file/path in which to save a combined version of the PDB files, default = "combined.pdb"
:type combined_pdb_file: str
:returns:
- combined_pdb_file ( str ) - The name/path for a file within which the concatenated coordinates will be written.
"""
traj_list = []
for pdb_file in pdb_file_list:
traj = md.load(pdb_file)
traj_list.append(traj)
return combined_pdb_file
[docs]def align_structures(reference_traj, target_traj):
"""
Given a reference trajectory, this function performs a structural alignment for a second input trajectory, with respect to the reference.
:param reference_traj: The trajectory to use as a reference for alignment.
:type reference_traj: `MDTraj() trajectory <http://mdtraj.org/1.6.2/api/generated/mdtraj.Trajectory.html>`_
:param target_traj: The trajectory to align with the reference.
:type target_traj: `MDTraj() trajectory <http://mdtraj.org/1.6.2/api/generated/mdtraj.Trajectory.html>`_
:returns:
- aligned_target_traj ( `MDTraj() trajectory <http://mdtraj.org/1.6.2/api/generated/mdtraj.Trajectory.html>`_ ) - The coordinates for the aligned trajectory.
"""
aligned_target_traj = target_traj.superpose(reference_traj)
return aligned_target_traj