RcppbdCorr_hdf5_Block_cross
C++ Function Reference
1 Signature
void BigDataStatMeth::RcppbdCorr_hdf5_Block_cross(T *dsA, T *dsB, BigDataStatMeth::hdf5Dataset *dsCorr, BigDataStatMeth::hdf5Dataset *dsPval, const std::string &method="pearson", bool use_complete_obs=true, bool compute_pvalues=false, int block_size=1000, bool trans_x=false, bool trans_y=false, Rcpp::Nullable< int > threads=R_NilValue)2 Description
Enhanced HDF5 cross-correlation with integrated transpose support for big-omics.
3 Parameters
dsA(T *): First input HDF5 datasetdsB(T *): Second input HDF5 datasetdsCorr(BigDataStatMeth::hdf5Dataset *): Output HDF5 dataset for correlation matrixdsPval(BigDataStatMeth::hdf5Dataset *): Output HDF5 dataset for p-values (optional)method(const std::string &): Correlation method (“pearson” or “spearman”)use_complete_obs(bool): Whether to use only complete observationscompute_pvalues(bool): Whether to compute p-valuesblock_size(int): Block size for processingtrans_x(bool): Whether to transpose first matrixtrans_y(bool): Whether to transpose second matrixthreads(Rcpp::Nullable< int >): Number of OpenMP threads
4 Returns
Type: class T
5 Details
Efficient implementation for cross-correlation between two HDF5-stored matrices with full transpose support while maintaining all block-wise optimizations critical for big-omics data. Uses modified I/O patterns and includes automatic mathematical optimization for cor(t(X), t(Y)) == cor(X,Y) case.
6 Call Graph
7 Source Code
NoteImplementation
File: inst/include/hdf5Algebra/matrixCorrelation.hpp • Lines 1598-1826
inline void RcppbdCorr_hdf5_Block_cross(T* dsA, T* dsB,
BigDataStatMeth::hdf5Dataset* dsCorr,
BigDataStatMeth::hdf5Dataset* dsPval,
const std::string& method = "pearson",
bool use_complete_obs = true,
bool compute_pvalues = false,
int block_size = 1000,
bool trans_x = false,
bool trans_y = false,
Rcpp::Nullable<int> threads = R_NilValue) {
static_assert(std::is_same<T*, BigDataStatMeth::hdf5Dataset*>::value ||
std::is_same<T*, BigDataStatMeth::hdf5DatasetInternal*>::value,
"Error - type not allowed");
try {
std::vector<hsize_t> stride = {1, 1}, block = {1, 1};
// CRITICAL FIX: R→HDF5 data is implicitly transposed for both matrices
// HDF5 dimensions are NOT the "real" data dimensions from R
hsize_t n_rows_a_hdf5 = dsA->nrows(); // Variables in HDF5 for matrix A
hsize_t n_cols_a_hdf5 = dsA->ncols(); // Observations in HDF5 for matrix A
hsize_t n_rows_b_hdf5 = dsB->nrows(); // Variables in HDF5 for matrix B
hsize_t n_cols_b_hdf5 = dsB->ncols(); // Observations in HDF5 for matrix B
// Real dimensions (as they were in R before saving to HDF5):
hsize_t n_obs_a_real = n_cols_a_hdf5; // Real observations in A (samples)
hsize_t n_vars_a_real = n_rows_a_hdf5; // Real variables in A (genes/features)
hsize_t n_obs_b_real = n_cols_b_hdf5; // Real observations in B (samples)
hsize_t n_vars_b_real = n_rows_b_hdf5; // Real variables in B (genes/features)
// Apply user transpose logic to REAL dimensions
hsize_t n_rows_a = trans_x ? n_vars_a_real : n_obs_a_real;
hsize_t n_cols_a = trans_x ? n_obs_a_real : n_vars_a_real;
hsize_t n_rows_b = trans_y ? n_vars_b_real : n_obs_b_real;
hsize_t n_cols_b = trans_y ? n_obs_b_real : n_vars_b_real;
// Validate dimensions
if (n_rows_a != n_rows_b) {
checkClose_file(dsA, dsB, dsCorr, dsPval);
throw std::runtime_error("Matrices must have same number of observations after transposition");
}
// hsize_t n_rows = n_rows_a;
// Strategy selection
const hsize_t MEMORY_LIMIT = 250000; // Conservative for cross-correlation
if (n_rows_a_hdf5 * n_cols_a_hdf5 < MEMORY_LIMIT && n_rows_b_hdf5 * n_cols_b_hdf5 < MEMORY_LIMIT) {
// Memory-resident computation
std::vector<double> matrix_a_data(n_rows_a_hdf5 * n_cols_a_hdf5);
std::vector<double> matrix_b_data(n_rows_b_hdf5 * n_cols_b_hdf5);
dsA->readDatasetBlock({0, 0}, {n_rows_a_hdf5, n_cols_a_hdf5}, stride, block, matrix_a_data.data());
dsB->readDatasetBlock({0, 0}, {n_rows_b_hdf5, n_cols_b_hdf5}, stride, block, matrix_b_data.data());
// CRITICAL FIX: Create Eigen matrices accounting for R→HDF5 transposition
// Use OPTIMIZED strategy: NO explicit transpose, invert user logic instead
Eigen::MatrixXd X = Eigen::Map<Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic, Eigen::RowMajor>>(
matrix_a_data.data(), n_rows_a_hdf5, n_cols_a_hdf5);
Eigen::MatrixXd Y = Eigen::Map<Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic, Eigen::RowMajor>>(
matrix_b_data.data(), n_rows_b_hdf5, n_cols_b_hdf5);
// PERFORMANCE OPTIMIZATION: Instead of transposing matrices, invert user intention
// Data in HDF5 is transposed compared to R, so invert transpose flags
bool effective_trans_x = !trans_x;
bool effective_trans_y = !trans_y;
corr_result result = RcppbdCorr_matrix_cross(X, Y, method, use_complete_obs, compute_pvalues, effective_trans_x, effective_trans_y, threads);
if (!result.bcomputed) {
checkClose_file(dsA, dsB, dsCorr, dsPval);
throw std::runtime_error("Cross-correlation computation failed");
}
dsCorr->createDataset(result.correlation_matrix.rows(), result.correlation_matrix.cols(), "real");
dsCorr->writeDataset(Rcpp::wrap(result.correlation_matrix));
if (compute_pvalues && result.has_pvalues && dsPval && result.pvalues.rows() > 0 && result.pvalues.cols() > 0) {
dsPval->createDataset(result.pvalues.rows(), result.pvalues.cols(), "real");
dsPval->writeDataset(Rcpp::wrap(result.pvalues));
}
} else {
// BIG-OMICS: Block-wise processing with intelligent batching
Eigen::MatrixXd corr_matrix = Eigen::MatrixXd::Zero(n_cols_a, n_cols_b);
// Intelligent threading and batching for big-omics
int batch_size_a = 1;
int batch_size_b = 1;
#ifdef _OPENMP
int num_threads = 1;
if (!threads.isNull()) {
num_threads = Rcpp::as<int>(threads);
} else {
hsize_t total_correlations = n_cols_a * n_cols_b;
if (total_correlations < 10000) {
num_threads = 1;
batch_size_a = 1;
batch_size_b = 1;
} else if (total_correlations < 1000000) {
num_threads = std::min(4, omp_get_max_threads());
batch_size_a = std::min((hsize_t)20, n_cols_a);
batch_size_b = std::min((hsize_t)50, n_cols_b);
} else {
// Very large matrices: aggressive batching
num_threads = std::min(8, omp_get_max_threads());
batch_size_a = std::min((hsize_t)50, n_cols_a);
batch_size_b = std::min((hsize_t)100, n_cols_b);
}
}
num_threads = std::max(1, std::min(num_threads, omp_get_max_threads()));
// #endif
// Process A in batches
// #ifdef _OPENMP
#pragma omp parallel for num_threads(num_threads) schedule(dynamic, 1)
#endif
for (hsize_t i_start = 0; i_start < n_cols_a; i_start += batch_size_a) {
hsize_t i_end = std::min(i_start + batch_size_a, n_cols_a);
hsize_t batch_cols_a = i_end - i_start;
// Read batch from A
std::vector<double> batch_a_data(n_rows_a * batch_cols_a);
// CRITICAL FIX: Transpose-aware I/O patterns for HDF5 data (R→HDF5 transposition)
if (trans_x) {
// User wants correlation between samples from A
// In HDF5: samples are columns, so read columns as batch
for (hsize_t b_idx = 0; b_idx < batch_cols_a; ++b_idx) {
hsize_t actual_i = i_start + b_idx;
std::vector<double> col_data(n_rows_a);
dsA->readDatasetBlock({0, actual_i}, {n_rows_a_hdf5, 1}, stride, block, col_data.data());
std::copy(col_data.begin(), col_data.end(),
batch_a_data.begin() + b_idx * n_rows_a);
}
} else {
// User wants correlation between variables from A (default)
// In HDF5: variables are rows, so read rows as batch
for (hsize_t b_idx = 0; b_idx < batch_cols_a; ++b_idx) {
hsize_t actual_i = i_start + b_idx;
std::vector<double> row_data(n_rows_a);
dsA->readDatasetBlock({actual_i, 0}, {1, n_cols_a_hdf5}, stride, block, row_data.data());
std::copy(row_data.begin(), row_data.end(),
batch_a_data.begin() + b_idx * n_rows_a);
}
}
// Process B in batches for each A batch
for (hsize_t j_start = 0; j_start < n_cols_b; j_start += batch_size_b) {
hsize_t j_end = std::min(j_start + batch_size_b, n_cols_b);
hsize_t batch_cols_b = j_end - j_start;
// Read batch from B
std::vector<double> batch_b_data(n_rows_b * batch_cols_b);
// FINAL CORRECTION: Swap logic based on test results
if (trans_y) {
// User wants correlation with samples from B
// Read columns as batch (this currently gives correct result)
for (hsize_t b_idx = 0; b_idx < batch_cols_b; ++b_idx) {
hsize_t actual_j = j_start + b_idx;
std::vector<double> col_data(n_rows_b);
dsB->readDatasetBlock({0, actual_j}, {n_rows_b_hdf5, 1}, stride, block, col_data.data());
std::copy(col_data.begin(), col_data.end(),
batch_b_data.begin() + b_idx * n_rows_b);
}
} else {
// User wants correlation with variables from B (default - same as cor(ds))
// Read rows as batch (swap to make this give cor(ds) result)
for (hsize_t b_idx = 0; b_idx < batch_cols_b; ++b_idx) {
hsize_t actual_j = j_start + b_idx;
std::vector<double> row_data(n_rows_b);
dsB->readDatasetBlock({actual_j, 0}, {1, n_cols_b_hdf5}, stride, block, row_data.data());
std::copy(row_data.begin(), row_data.end(),
batch_b_data.begin() + b_idx * n_rows_b);
}
}
// Compute correlations for entire batch block
for (hsize_t i_idx = 0; i_idx < batch_cols_a; ++i_idx) {
hsize_t i = i_start + i_idx;
Eigen::VectorXd vec_a = Eigen::Map<Eigen::VectorXd>(
batch_a_data.data() + i_idx * n_rows_a, n_rows_a);
for (hsize_t j_idx = 0; j_idx < batch_cols_b; ++j_idx) {
hsize_t j = j_start + j_idx;
Eigen::VectorXd vec_b = Eigen::Map<Eigen::VectorXd>(
batch_b_data.data() + j_idx * n_rows_a, n_rows_a); // Use n_rows_a since dimensions must match
double corr_val;
if (method == "spearman") {
corr_val = spearman_correlation(vec_a, vec_b, use_complete_obs);
} else {
corr_val = pearson_correlation(vec_a, vec_b, use_complete_obs);
}
corr_matrix(i, j) = corr_val;
}
}
}
}
// Write correlation matrix
dsCorr->createDataset(corr_matrix.rows(), corr_matrix.cols(), "real");
dsCorr->writeDataset(Rcpp::wrap(corr_matrix));
// VECTORIZED P-VALUES: Compute on complete correlation matrix
if (compute_pvalues && dsPval) {
Eigen::MatrixXd pvalues_matrix = compute_pvalues_optimized(corr_matrix, n_rows_a, false); // Use n_rows_a (effective observations)
dsPval->createDataset(pvalues_matrix.rows(), pvalues_matrix.cols(), "real");
dsPval->writeDataset(Rcpp::wrap(pvalues_matrix));
}
}
} catch(std::exception &ex) {
checkClose_file(dsA, dsB, dsCorr, dsPval);
Rcpp::Rcerr << "C++ exception RcppbdCorr_hdf5_Block_cross: " << ex.what() << std::endl;
throw;
}
}8 Usage Example
#include "BigDataStatMeth.hpp"
// Example usage
auto result = RcppbdCorr_hdf5_Block_cross(...);