First_level_SvdBlock_decomposition_hdf5
C++ Function Reference
1 Signature
void BigDataStatMeth::First_level_SvdBlock_decomposition_hdf5(T *dsA, std::string strGroupName, int k, int q, int nev, bool bcenter, bool bscale, double dthreshold, Rcpp::Nullable< int > threads=R_NilValue)2 Parameters
dsA(T *)strGroupName(std::string)k(int)q(int)nev(int)bcenter(bool)bscale(bool)dthreshold(double)threads(Rcpp::Nullable< int >)
3 Returns
Type: class T
4 Call Graph
5 Source Code
NoteImplementation
File: inst/include/hdf5Algebra/matrixSvdBlock.hpp • Lines 238-467
inline void First_level_SvdBlock_decomposition_hdf5( T* dsA, std::string strGroupName, int k, int q, int nev, bool bcenter, bool bscale,
double dthreshold, Rcpp::Nullable<int> threads = R_NilValue)
{
static_assert(std::is_same<T*, BigDataStatMeth::hdf5Dataset* >::value ||
std::is_same<T*, BigDataStatMeth::hdf5DatasetInternal* >::value,
"Error - type not allowed");
BigDataStatMeth::hdf5DatasetInternal* normalizedData = nullptr;
BigDataStatMeth::hdf5Dataset* unlimDataset = nullptr;
try{
std::vector<svdPositions> paralPos;
int M, p, n, irows, icols, normalsize;
// int ithreads;
bool transp = false;
Rcpp::Nullable<int> wsize = R_NilValue;
irows = dsA->ncols();
icols = dsA->nrows();
// ithreads = get_number_threads(threads, R_NilValue);
// Work with transposed matrix
if( irows >= icols ) {
transp = true;
n = icols;
p = irows;
normalsize = n;
} else {
n = irows;
p = icols;
normalsize = p;
}
Eigen::MatrixXd datanormal = Eigen::MatrixXd::Zero(2, normalsize);
get_HDF5_mean_sd_by_column(dsA, datanormal, true, true, wsize);
M = pow(k, q);
if(M>p)
throw std::runtime_error("k^q must not be greater than the number of columns in the matrix");
//.. 2025-02-10 ..// double block_size = std::floor((double)icols/(double)M);
double block_size = std::round((double)p/(double)M);
if (bcenter==true || bscale==true) { // Create dataset to store normalized data
normalizedData = new BigDataStatMeth::hdf5DatasetInternal (dsA->getFullPath(), strGroupName, "normalmatrix", true);
// normalizedData->createDataset( irows, icols, "real");
normalizedData->createDataset( irows, icols, "real");
//.Caldria revisar... .// normalizedData->createDataset( n, p, "real");
}
// Get all the offsets and counts inside the file to write and read
paralPos = prepareForParallelization( dsA, M, k, transp, block_size, strGroupName + "/A");
#pragma omp parallel num_threads( get_number_threads(threads, R_NilValue) ) shared (normalizedData)
{
// Get data from M blocks in initial matrix
#pragma omp for schedule (dynamic) ordered
for( int i = 0; i< M ; i++)
{
Eigen::MatrixXd restmp;
// 1.- Get SVD from all blocks
// a) Get all blocks from initial matrix
Eigen::MatrixXd X;
std::vector<double> vdX( paralPos[i].count[0] * paralPos[i].count[1] );
#pragma omp critical(accessFile)
{
dsA->readDatasetBlock( {paralPos[i].totOffset[0], paralPos[i].totOffset[1]}, {paralPos[i].count[0], paralPos[i].count[1]}, paralPos[i].stride, paralPos[i].block, vdX.data() );
}
if(transp==false){
X = Eigen::Map<Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic, Eigen::ColMajor>> (vdX.data(), paralPos[i].count[1], paralPos[i].count[0] );
} else {
X = Eigen::Map<Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic, Eigen::RowMajor>> (vdX.data(), paralPos[i].count[0], paralPos[i].count[1] );
}
// Normalize data
if (bcenter==true || bscale==true)
{
std::vector<hsize_t> offset_tmp = {paralPos[i].totOffset[1], paralPos[i].totOffset[0]};
std::vector<hsize_t> count_tmp = {paralPos[i].count[1], paralPos[i].count[0]};
if(transp == true) {
offset_tmp = {paralPos[i].totOffset[0], paralPos[i].totOffset[1]};
count_tmp = {paralPos[i].count[0], paralPos[i].count[1]};
}
if( transp == false) {
// X = RcppNormalize_Data(X, bcenter, bscale, transp, datanormal.block(0, offset_tmp[1], 2, count_tmp[1]));
X = RcppNormalizeColwise(X, bcenter, bscale);
#pragma omp critical(accessFile)
{
normalizedData->writeDatasetBlock( Rcpp::wrap(X), offset_tmp, count_tmp, paralPos[i].stride, paralPos[i].block, false);
}
} else {
X = RcppNormalize_Data(X, bcenter, bscale, transp, datanormal.block(0, offset_tmp[0], 2, count_tmp[0]));
count_tmp = {(unsigned long long)X.cols(), (unsigned long long)X.rows()};
offset_tmp = {paralPos[i].totOffset[1], paralPos[i].totOffset[0]};
#pragma omp critical(accessFile)
{
normalizedData->writeDatasetBlock( Rcpp::wrap(X.transpose()), offset_tmp, count_tmp, paralPos[i].stride, paralPos[i].block, false);
}
}
}
// Rcpp::Rcout<<"\nPeta a l'step 1? - 1";
{
// b) SVD for each block
svdeig retsvd;
retsvd = RcppbdSVD_lapack(X, false, false, false);
int size_d = (retsvd.d).size();
int nzeros = 0;
if( (retsvd.d)[size_d - 1] <= dthreshold ){
nzeros = 1;
for( int j = (size_d - 2); ( j>1 && (retsvd.d)[i] <= dthreshold ); j-- ) {
nzeros++;
}
}
// c) U*d
// Create diagonal matrix from svd decomposition d
int isize = (retsvd.d).size() - nzeros;
if( isize < 2 ) {
isize = 2;
}
Eigen::MatrixXd d = Eigen::MatrixXd::Zero(isize, isize);
d.diagonal() = (retsvd.d).head(isize);
Eigen::MatrixXd u = (retsvd.u).block(0, 0, (retsvd.u).rows(), isize);
// if( u.size() < MAXELEMSINBLOCK ) {
if (static_cast<hsize_t>(u.size()) < MAXELEMSINBLOCK) {
restmp = u*d;
} else{
restmp = Rcpp_block_matrix_mul( u, d, R_NilValue);
}
}
paralPos[i].write_count = {(hsize_t)restmp.rows(), (hsize_t)restmp.cols()};
// d) Write results to hdf5 file
#pragma omp ordered
{
#pragma omp critical(accessFile)
{
if( i%(M/k) == 0 || ( (i%(M/k) > 0 && !exists_HDF5_element(dsA->getFileptr(), paralPos[i].strDatasetName )) ) )
{
unlimDataset = new BigDataStatMeth::hdf5DatasetInternal(dsA->getFullPath(), paralPos[i].strDatasetName, true );
unlimDataset->createUnlimitedDataset(paralPos[i].write_count[0], paralPos[i].write_count[1], "real");
delete unlimDataset; unlimDataset = nullptr;
paralPos[i].write_offset = 0;
}
unlimDataset = new BigDataStatMeth::hdf5DatasetInternal(dsA->getFullPath(), paralPos[i].strDatasetName, true );
unlimDataset->openDataset();
// Extend dataset before to put the data
if((i%(M/k)) != 0 && paralPos[i].write_offset > 0) {
unlimDataset->extendUnlimitedDataset(0, paralPos[i].write_count[1] );
}
// std::vector<double> vtmpc(restmp.data(), restmp.data() + restmp.rows() * restmp.cols());
unlimDataset->writeDatasetBlock( Rcpp::wrap(restmp), {0, paralPos[i].write_offset}, paralPos[i].write_count, paralPos[i].stride, paralPos[i].block, false);
delete unlimDataset; unlimDataset = nullptr;
if( i<M-1 )
paralPos[i+1].write_offset = paralPos[i].write_offset + paralPos[i].write_count[1];
}
}
}
}
if (bcenter==true || bscale==true) {
delete normalizedData; normalizedData = nullptr;
}
} catch( H5::FileIException& error ) {
checkClose_file(dsA, unlimDataset, normalizedData);
Rcpp::Rcerr<<"\nc++ exception First_level_SvdBlock_decomposition_hdf5 (File IException)\n";
return void();
} catch( H5::DataSetIException& error ) {
checkClose_file(dsA, unlimDataset, normalizedData);
Rcpp::Rcerr<<"\nc++ exception First_level_SvdBlock_decomposition_hdf5 (DataSet IException)\n";
return void();
} catch( H5::GroupIException& error ) {
checkClose_file(dsA, unlimDataset, normalizedData);
Rcpp::Rcerr<<"\nc++ exception First_level_SvdBlock_decomposition_hdf5 (Group IException)\n";
return void();
} catch( H5::DataTypeIException& error ) {
checkClose_file(dsA, unlimDataset, normalizedData);
Rcpp::Rcerr<<"\nc++ exception First_level_SvdBlock_decomposition_hdf5 (DataType IException)\n";
return void();
} catch( H5::DataSpaceIException& error ) {
checkClose_file(dsA, unlimDataset, normalizedData);
Rcpp::Rcerr<<"\nc++ exception First_level_SvdBlock_decomposition_hdf5 (DataSpace IException)\n";
return void();
} catch(std::exception &ex) {
checkClose_file(dsA, unlimDataset, normalizedData);
Rcpp::Rcerr<<"c++ exception First_level_SvdBlock_decomposition_hdf5 \n"<< ex.what();
return void();
} catch (...) {
checkClose_file(dsA, unlimDataset, normalizedData);
Rcpp::Rcerr<<"\nC++ exception First_level_SvdBlock_decomposition_hdf5 (unknown reason)";
return void();
}
return void();
}6 Usage Example
#include "BigDataStatMeth.hpp"
// Example usage
auto result = First_level_SvdBlock_decomposition_hdf5(...);