RcppbdCorr_hdf5_Block_cross

C++ Function Reference

1 Signature

void BigDataStatMeth::RcppbdCorr_hdf5_Block_cross(T *dsA, T *dsB, BigDataStatMeth::hdf5Dataset *dsCorr, BigDataStatMeth::hdf5Dataset *dsPval, const std::string &method="pearson", bool use_complete_obs=true, bool compute_pvalues=false, int block_size=1000, bool trans_x=false, bool trans_y=false, Rcpp::Nullable< int > threads=R_NilValue)

2 Description

Enhanced HDF5 cross-correlation with integrated transpose support for big-omics.

3 Parameters

  • dsA (T *): First input HDF5 dataset
  • dsB (T *): Second input HDF5 dataset
  • dsCorr (BigDataStatMeth::hdf5Dataset *): Output HDF5 dataset for correlation matrix
  • dsPval (BigDataStatMeth::hdf5Dataset *): Output HDF5 dataset for p-values (optional)
  • method (const std::string &): Correlation method (“pearson” or “spearman”)
  • use_complete_obs (bool): Whether to use only complete observations
  • compute_pvalues (bool): Whether to compute p-values
  • block_size (int): Block size for processing
  • trans_x (bool): Whether to transpose first matrix
  • trans_y (bool): Whether to transpose second matrix
  • threads (Rcpp::Nullable< int >): Number of OpenMP threads

4 Returns

Type: class T

5 Details

Efficient implementation for cross-correlation between two HDF5-stored matrices with full transpose support while maintaining all block-wise optimizations critical for big-omics data. Uses modified I/O patterns and includes automatic mathematical optimization for cor(t(X), t(Y)) == cor(X,Y) case.

6 Call Graph

Function dependencies

7 Source Code

File: inst/include/hdf5Algebra/matrixCorrelation.hppLines 1598-1826

inline void RcppbdCorr_hdf5_Block_cross(T* dsA, T* dsB,
                                            BigDataStatMeth::hdf5Dataset* dsCorr,
                                            BigDataStatMeth::hdf5Dataset* dsPval,
                                            const std::string& method = "pearson",
                                            bool use_complete_obs = true,
                                            bool compute_pvalues = false,
                                            int block_size = 1000,
                                            bool trans_x = false,
                                            bool trans_y = false,
                                            Rcpp::Nullable<int> threads = R_NilValue) {
        
        static_assert(std::is_same<T*, BigDataStatMeth::hdf5Dataset*>::value ||
                      std::is_same<T*, BigDataStatMeth::hdf5DatasetInternal*>::value,
                      "Error - type not allowed");
        
        try {
            std::vector<hsize_t> stride = {1, 1}, block = {1, 1};
            
            // CRITICAL FIX: R→HDF5 data is implicitly transposed for both matrices
            // HDF5 dimensions are NOT the "real" data dimensions from R
            hsize_t n_rows_a_hdf5 = dsA->nrows();   // Variables in HDF5 for matrix A
            hsize_t n_cols_a_hdf5 = dsA->ncols();   // Observations in HDF5 for matrix A
            hsize_t n_rows_b_hdf5 = dsB->nrows();   // Variables in HDF5 for matrix B
            hsize_t n_cols_b_hdf5 = dsB->ncols();   // Observations in HDF5 for matrix B
            
            // Real dimensions (as they were in R before saving to HDF5):
            hsize_t n_obs_a_real = n_cols_a_hdf5;     // Real observations in A (samples)
            hsize_t n_vars_a_real = n_rows_a_hdf5;    // Real variables in A (genes/features)
            hsize_t n_obs_b_real = n_cols_b_hdf5;     // Real observations in B (samples)
            hsize_t n_vars_b_real = n_rows_b_hdf5;    // Real variables in B (genes/features)
            
            // Apply user transpose logic to REAL dimensions
            hsize_t n_rows_a = trans_x ? n_vars_a_real : n_obs_a_real;
            hsize_t n_cols_a = trans_x ? n_obs_a_real : n_vars_a_real;
            hsize_t n_rows_b = trans_y ? n_vars_b_real : n_obs_b_real;
            hsize_t n_cols_b = trans_y ? n_obs_b_real : n_vars_b_real;
            
            // Validate dimensions
            if (n_rows_a != n_rows_b) {
                checkClose_file(dsA, dsB, dsCorr, dsPval);
                throw std::runtime_error("Matrices must have same number of observations after transposition");
            }
            
            // hsize_t n_rows = n_rows_a;
            
            // Strategy selection
            const hsize_t MEMORY_LIMIT = 250000; // Conservative for cross-correlation
            
            if (n_rows_a_hdf5 * n_cols_a_hdf5 < MEMORY_LIMIT && n_rows_b_hdf5 * n_cols_b_hdf5 < MEMORY_LIMIT) {
                
                // Memory-resident computation
                std::vector<double> matrix_a_data(n_rows_a_hdf5 * n_cols_a_hdf5);
                std::vector<double> matrix_b_data(n_rows_b_hdf5 * n_cols_b_hdf5);
                
                dsA->readDatasetBlock({0, 0}, {n_rows_a_hdf5, n_cols_a_hdf5}, stride, block, matrix_a_data.data());
                dsB->readDatasetBlock({0, 0}, {n_rows_b_hdf5, n_cols_b_hdf5}, stride, block, matrix_b_data.data());
                
                // CRITICAL FIX: Create Eigen matrices accounting for R→HDF5 transposition
                // Use OPTIMIZED strategy: NO explicit transpose, invert user logic instead
                Eigen::MatrixXd X = Eigen::Map<Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic, Eigen::RowMajor>>(
                    matrix_a_data.data(), n_rows_a_hdf5, n_cols_a_hdf5);
                Eigen::MatrixXd Y = Eigen::Map<Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic, Eigen::RowMajor>>(
                    matrix_b_data.data(), n_rows_b_hdf5, n_cols_b_hdf5);
                
                // PERFORMANCE OPTIMIZATION: Instead of transposing matrices, invert user intention
                // Data in HDF5 is transposed compared to R, so invert transpose flags
                bool effective_trans_x = !trans_x;
                bool effective_trans_y = !trans_y;
                
                corr_result result = RcppbdCorr_matrix_cross(X, Y, method, use_complete_obs, compute_pvalues, effective_trans_x, effective_trans_y, threads);
                
                if (!result.bcomputed) {
                    checkClose_file(dsA, dsB, dsCorr, dsPval);
                    throw std::runtime_error("Cross-correlation computation failed");
                }
                
                dsCorr->createDataset(result.correlation_matrix.rows(), result.correlation_matrix.cols(), "real");
                dsCorr->writeDataset(Rcpp::wrap(result.correlation_matrix));
                
                if (compute_pvalues && result.has_pvalues && dsPval && result.pvalues.rows() > 0 && result.pvalues.cols() > 0) {
                    dsPval->createDataset(result.pvalues.rows(), result.pvalues.cols(), "real");
                    dsPval->writeDataset(Rcpp::wrap(result.pvalues));
                }
                
                
            } else {
                
                // BIG-OMICS: Block-wise processing with intelligent batching
                Eigen::MatrixXd corr_matrix = Eigen::MatrixXd::Zero(n_cols_a, n_cols_b);
                
                // Intelligent threading and batching for big-omics
                
                int batch_size_a = 1;
                int batch_size_b = 1;
                
        #ifdef _OPENMP
                int num_threads = 1;
                if (!threads.isNull()) {
                    num_threads = Rcpp::as<int>(threads);
                } else {
                    hsize_t total_correlations = n_cols_a * n_cols_b;
                    
                    if (total_correlations < 10000) {
                        num_threads = 1;
                        batch_size_a = 1;
                        batch_size_b = 1;
                    } else if (total_correlations < 1000000) {
                        num_threads = std::min(4, omp_get_max_threads());
                        batch_size_a = std::min((hsize_t)20, n_cols_a);
                        batch_size_b = std::min((hsize_t)50, n_cols_b);
                    } else {
                        // Very large matrices: aggressive batching
                        num_threads = std::min(8, omp_get_max_threads());
                        batch_size_a = std::min((hsize_t)50, n_cols_a);
                        batch_size_b = std::min((hsize_t)100, n_cols_b);
                    }
                }
                num_threads = std::max(1, std::min(num_threads, omp_get_max_threads()));
        // #endif
                
                // Process A in batches
        // #ifdef _OPENMP
        #pragma omp parallel for num_threads(num_threads) schedule(dynamic, 1)
        #endif
                for (hsize_t i_start = 0; i_start < n_cols_a; i_start += batch_size_a) {
                    hsize_t i_end = std::min(i_start + batch_size_a, n_cols_a);
                    hsize_t batch_cols_a = i_end - i_start;
                    
                    // Read batch from A
                    std::vector<double> batch_a_data(n_rows_a * batch_cols_a);
                    
                    // CRITICAL FIX: Transpose-aware I/O patterns for HDF5 data (R→HDF5 transposition)
                    if (trans_x) {
                        // User wants correlation between samples from A
                        // In HDF5: samples are columns, so read columns as batch
                        for (hsize_t b_idx = 0; b_idx < batch_cols_a; ++b_idx) {
                            hsize_t actual_i = i_start + b_idx;
                            std::vector<double> col_data(n_rows_a);
                            dsA->readDatasetBlock({0, actual_i}, {n_rows_a_hdf5, 1}, stride, block, col_data.data());
                            std::copy(col_data.begin(), col_data.end(), 
                                      batch_a_data.begin() + b_idx * n_rows_a);
                        }
                    } else {
                        // User wants correlation between variables from A (default)
                        // In HDF5: variables are rows, so read rows as batch
                        for (hsize_t b_idx = 0; b_idx < batch_cols_a; ++b_idx) {
                            hsize_t actual_i = i_start + b_idx;
                            std::vector<double> row_data(n_rows_a);
                            dsA->readDatasetBlock({actual_i, 0}, {1, n_cols_a_hdf5}, stride, block, row_data.data());
                            std::copy(row_data.begin(), row_data.end(), 
                                      batch_a_data.begin() + b_idx * n_rows_a);
                        }
                    }
                    
                    // Process B in batches for each A batch
                    for (hsize_t j_start = 0; j_start < n_cols_b; j_start += batch_size_b) {
                        hsize_t j_end = std::min(j_start + batch_size_b, n_cols_b);
                        hsize_t batch_cols_b = j_end - j_start;
                        
                        // Read batch from B
                        std::vector<double> batch_b_data(n_rows_b * batch_cols_b);
                        
                        // FINAL CORRECTION: Swap logic based on test results
                        if (trans_y) {
                            // User wants correlation with samples from B
                            // Read columns as batch (this currently gives correct result)
                            for (hsize_t b_idx = 0; b_idx < batch_cols_b; ++b_idx) {
                                hsize_t actual_j = j_start + b_idx;
                                std::vector<double> col_data(n_rows_b);
                                dsB->readDatasetBlock({0, actual_j}, {n_rows_b_hdf5, 1}, stride, block, col_data.data());
                                std::copy(col_data.begin(), col_data.end(), 
                                          batch_b_data.begin() + b_idx * n_rows_b);
                            }
                        } else {
                            // User wants correlation with variables from B (default - same as cor(ds))
                            // Read rows as batch (swap to make this give cor(ds) result)
                            for (hsize_t b_idx = 0; b_idx < batch_cols_b; ++b_idx) {
                                hsize_t actual_j = j_start + b_idx;
                                std::vector<double> row_data(n_rows_b);
                                dsB->readDatasetBlock({actual_j, 0}, {1, n_cols_b_hdf5}, stride, block, row_data.data());
                                std::copy(row_data.begin(), row_data.end(), 
                                          batch_b_data.begin() + b_idx * n_rows_b);
                            }
                        }
                        
                        // Compute correlations for entire batch block
                        for (hsize_t i_idx = 0; i_idx < batch_cols_a; ++i_idx) {
                            hsize_t i = i_start + i_idx;
                            
                            Eigen::VectorXd vec_a = Eigen::Map<Eigen::VectorXd>(
                                batch_a_data.data() + i_idx * n_rows_a, n_rows_a);
                            
                            for (hsize_t j_idx = 0; j_idx < batch_cols_b; ++j_idx) {
                                hsize_t j = j_start + j_idx;
                                
                                Eigen::VectorXd vec_b = Eigen::Map<Eigen::VectorXd>(
                                    batch_b_data.data() + j_idx * n_rows_a, n_rows_a);  // Use n_rows_a since dimensions must match
                                
                                double corr_val;
                                if (method == "spearman") {
                                    corr_val = spearman_correlation(vec_a, vec_b, use_complete_obs);
                                } else {
                                    corr_val = pearson_correlation(vec_a, vec_b, use_complete_obs);
                                }
                                
                                corr_matrix(i, j) = corr_val;
                            }
                        }
                    }
                }
                
                // Write correlation matrix
                dsCorr->createDataset(corr_matrix.rows(), corr_matrix.cols(), "real");
                dsCorr->writeDataset(Rcpp::wrap(corr_matrix));
                
                // VECTORIZED P-VALUES: Compute on complete correlation matrix
                if (compute_pvalues && dsPval) {
                    Eigen::MatrixXd pvalues_matrix = compute_pvalues_optimized(corr_matrix, n_rows_a, false);  // Use n_rows_a (effective observations)
                    dsPval->createDataset(pvalues_matrix.rows(), pvalues_matrix.cols(), "real");
                    dsPval->writeDataset(Rcpp::wrap(pvalues_matrix));
                }
            }
            
        } catch(std::exception &ex) {
            checkClose_file(dsA, dsB, dsCorr, dsPval);
            Rcpp::Rcerr << "C++ exception RcppbdCorr_hdf5_Block_cross: " << ex.what() << std::endl;
            throw;
        }
    }

8 Usage Example

#include "BigDataStatMeth.hpp"

// Example usage
auto result = RcppbdCorr_hdf5_Block_cross(...);