Source code for maayanlab_bioinformatics.normalization.quantile_legacy
import logging
import numpy as np
import pandas as pd
from functools import singledispatch
[docs]
@singledispatch
def quantile_normalize(mat):
''' Perform quantile normalization on the values of a matrix
In the case of a pd.DataFrame, preserve the index on the output frame.
See: https://en.wikipedia.org/wiki/Quantile_normalization
'''
logging.warn('Unrecognized type: ' + type(mat).__name__)
return quantile_normalize_np(mat)
[docs]
@quantile_normalize.register
def quantile_normalize_np(mat: np.ndarray):
# sort vector in np (reuse in np)
sorted_vec = np.sort(mat, axis=0)
# rank vector in np (no dict necessary)
rank = sorted_vec.mean(axis=1)
# construct quantile normalized matrix
return np.array([
[
rank[i]
for i in np.searchsorted(sorted_vec[:, c], mat[:, c])
] for c in range(mat.shape[1])
]).T
[docs]
@quantile_normalize.register
def quantile_normalize_pd(mat: pd.DataFrame):
return pd.DataFrame(
quantile_normalize_np(mat.values),
index=mat.index,
columns=mat.columns,
)
[docs]
def quantile_normalize_h5(in_mat, out_mat, tmp=None):
import os, tempfile, h5py
'''
Maximum memory required (3 * in_mat.shape[1] * sizeof(dtype))
Storage required 4 * in_mat.size
- input matrix
- transposed copy
- sorted & transposed copy
- output matrix
'''
assert isinstance(in_mat, h5py.Dataset)
assert isinstance(out_mat, h5py.Dataset)
assert in_mat.shape == out_mat.shape
# transpose + sort
tmp_f = tempfile.mktemp() if tmp is None else tmp
tmp_h5 = h5py.File(tmp_f, 'w')
tmp_T_mat = tmp_h5.create_dataset('tmp_T', shape=(in_mat.shape[1], in_mat.shape[0]), dtype=in_mat.dtype)
tmp_T_sorted_mat = tmp_h5.create_dataset('tmp_T_sorted', shape=(in_mat.shape[1], in_mat.shape[0]), dtype=in_mat.dtype)
sorted_col_vec_agg_rank = np.zeros(in_mat.shape[0])
for col in range(in_mat.shape[1]):
# this single read is potentially expensive but the two writes are cheap
col_vec = in_mat[:, col]
tmp_T_mat[col, :] = col_vec
sorted_col_vec = np.sort(col_vec)
tmp_T_sorted_mat[col, :] = sorted_col_vec
sorted_col_vec_agg_rank += sorted_col_vec
# setup rank matrix
sorted_col_vec_agg_rank /= in_mat.shape[1]
# construct output matrix
for c in range(in_mat.shape[1]):
# this write is potentially expensive but the reads are cheap
out_mat[:, c] = [
sorted_col_vec_agg_rank[i]
for i in np.searchsorted(tmp_T_sorted_mat[c, :], tmp_T_mat[c, :])
]
# close and remove tmp file
tmp_h5.close()
os.remove(tmp_f)
return out_mat