Source code for symdet.generator_extraction.generators

"""
This program and the accompanying materials are made available under the terms of the
Eclipse Public License v2.0 which accompanies this distribution, and is available at
https://www.eclipse.org/legal/epl-v20.html
SPDX-License-Identifier: EPL-2.0

Copyright Contributors to the Zincware Project.

Generators
==========
Python module to extract generators from data.
"""
import tensorflow as tf
import numpy as np
from tqdm import tqdm
import random
from sklearn.linear_model import LinearRegression
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
from typing import Tuple


[docs]class GeneratorExtraction: """ Class to extract generators from point clouds Attributes ---------- point_cloud : tf.Tensor Point cloud on which to perform regression. delta : float Width of the hyperplane to be considered. epsilon : float Distance between points to be considered as a point pair. candidate_runs : int Number of times to generate candidates before running PCA. Tune if convergence is not found. basis : tf.Tensor Orthonormal basis of the point cloud. hyperplane_set : tf.Tensor Set of all points in the hyperplane. point_pairs : list A list of tuples where each tuple value is an index of a point. The tuple indices correspond to point pairs. i.e. if the tuple is (0, 400), then indices 0 and 400 in the hyperplane set are a pair connected by the application/s of the generators. dimension : int Dimensionality of the points. generator_candidates : list Generator candidates on which to perform PCA. constrained_generators : np.ndarray: Constrained generator candidates. """ def __init__( self, point_cloud: tf.Tensor, delta: float = 0.5, epsilon: float = 0.3, candidate_runs: int = 10, ): """ Constructor for the GeneratorExtraction class. Parameters ---------- point_cloud : tf.Tensor Point cloud on which to perform regression delta : float Width of the hyperplane to be considered epsilon : float Distance between points to be considered as a point pair. candidate_runs : int Number of times to generate candidates before running PCA. Tune if convergence is not found. """ self.point_cloud = point_cloud self.delta = delta self.epsilon = epsilon self.candidate_runs = candidate_runs self.basis: tf.Tensor self.hyperplane_set: tf.Tensor self.point_pairs: list self.dimension = self._get_dimension() self.generator_candidates = [] self.constrained_generators = [] def _get_dimension(self): """ Get the dimension of the points in the point cloud. Returns ------- dimension : int Dimension of the points in the point cloud. """ return len(self.point_cloud[0]) def _remove_redundancy(self): """ Remove the redundancy in the data with PCA. Returns ------- Updates the class state. Notes ----- Currently not implemented as it is not required. """ pass def _generate_basis_set(self, gs_precision: int): """ Build the basis set. Parameters ---------- gs_precision : int Number of decimals after 0 to consider orthogonal. Returns ------- Updates the basis set and enforces a check. """ basis = list(self._start_gs()) if self.dimension > 2: basis_candidates = np.zeros((self.dimension, self.dimension)) for i, vector in enumerate(basis_candidates): vector[i] = 1 reduced_candidates = self._eliminate_closest_vector( basis, list(basis_candidates) ) for item in reduced_candidates: basis.append(self._perform_gs(item, basis)) self.basis = tf.convert_to_tensor(basis) # set the class attribute self._gs_check(gs_precision) def _gs_check(self, gs_precision: int): """ Check to see that all basis vectors are orthogonal to one another. If this assert fails, the session will end. Parameters ---------- gs_precision : int Number of decimals after 0 to consider orthogonal. Returns ------- Will throw an exception if the assert fails. """ for basis in self.basis: np.testing.assert_almost_equal(np.linalg.norm(basis), 1) # check the normalization. for test in self.basis: if all(test == basis): continue np.testing.assert_almost_equal(np.dot(basis, test), 0, decimal=gs_precision) # check the orthogonality. def _perform_gs(self, vector: list, basis_set: list) -> np.ndarray: """ Perform the Gram-Schmidt procedure. Parameters ---------- vector : list Vector to orthonormalize basis_set : list Current basis set Returns ------- basis_vector : list Orthonormalized basis vector """ basis_vector = vector for basis_item in basis_set: basis_vector -= self._projection_operator(basis_item, basis_vector) return basis_vector / np.linalg.norm(basis_vector) def _eliminate_closest_vector( self, reference_vectors: list, test_vectors: list) -> np.ndarray: """ Remove the closest vectors in the theoretical basis set Parameters ---------- reference_vectors : list First two basis vectors test_vectors : list Vectors to test against Returns ------- basis_vectors : np.ndarray Returns the basis vectors which minimizes the sum of squares of the scalar product. """ distances = [] for vector in test_vectors: d_1 = np.dot(vector, reference_vectors[0]) d_2 = np.dot(vector, reference_vectors[1]) distances.append(d_1 ** 2 + d_2 ** 2) return np.array(test_vectors)[np.argsort(distances)][: int(self.dimension - 2)] def _start_gs(self) -> Tuple[np.ndarray, np.ndarray]: """ Pick the two first random basis vectors. Returns ------- basis_vectors : list first two randomly selected, normalized basis vectors. """ index_1 = random.randint(0, len(self.point_cloud) - 1) index_2 = random.randint(0, len(self.point_cloud) - 1) vector_1 = self.point_cloud[index_1] / np.linalg.norm(self.point_cloud[index_1]) vector_2 = self.point_cloud[index_2] - self._projection_operator( vector_1, self.point_cloud[index_2] ) vector_2 /= np.linalg.norm(vector_2) return vector_1, vector_2 @staticmethod def _projection_operator(u, v) -> np.ndarray: """ Perform the projection of u onto v. Returns ------- proj_{u}(v) : np.ndarray Returns the projection of u on v. This is required for the Gram-Schmidt process. """ return (np.dot(u, v) / np.dot(u, u)) * u def _construct_hyperplane_set(self): """ Build the hyperplane set. Returns ------- Updates the class state """ if self.dimension == 2: self.hyperplane_set = self.point_cloud else: self.hyperplane_set = [] for point in self.point_cloud: truth_table = [] for vector in self.basis[2:]: truth_table.append(np.dot(point, vector) < self.delta) if all(truth_table): self.hyperplane_set.append(point) self.hyperplane_set = tf.convert_to_tensor(self.hyperplane_set) def _identify_point_pairs(self): """ Identify point pairs within the hyperplane set. Returns ------- Updates the class state. """ self.point_pairs = [] for index, origin in enumerate(self.hyperplane_set): for pair_index, reference in enumerate(self.hyperplane_set): # Skip the same point if pair_index == index: continue else: distance = np.linalg.norm(origin - reference) if distance < self.epsilon: self.point_pairs.append((index, pair_index)) def _perform_regression(self): """ Perform regression on the point pairs and extract generator candidates. Returns ------- Updates the class state. """ self._simple_regression() if self.dimension > 2: self._constrain_generators() def _constrain_generators(self): """ If the dimension is greater than 2, the generators should be constrained by Equation 15 in the referenced paper. Returns ------- Updates the class. """ for generator in self.generator_candidates: outcome = [] for basis in self.basis[2:]: data = np.matmul( generator.reshape(self.dimension, self.dimension), basis ) outcome = np.concatenate((outcome, data)) if np.allclose(outcome, [0 for _ in range(len(outcome))], atol=1): self.constrained_generators.append(generator) def _full_regression(self): """ Perform full regression to extract the generators. Returns ------- Updates the class state. """ self._simple_regression() unconstrained_generators = np.array(self.generator_candidates).reshape( (len(self.generator_candidates), self.dimension, self.dimension) ) self.generator_candidates = [] for generator in unconstrained_generators: truth_array = [] for vector in self.basis[2:]: test = sum(generator * vector) truth_array.append(all(abs(test) < 1)) if all(truth_array): self.generator_candidates.append( np.array(generator).reshape((1, self.dimension * self.dimension)) ) def _simple_regression(self): """ In the case where additional constraints are not needed, we simply perform regression on the problem to extract generator candidates. Returns ------- Updates the class state. """ Y = [] X = [] for pair in self.point_pairs: points = [self.hyperplane_set[pair[0]], self.hyperplane_set[pair[1]]] sigma = self._compute_sigma(points) Y.append( ((points[0] - points[1]) * np.linalg.norm(points[0])) / (sigma * np.linalg.norm(points[1] - points[0])) ) X.append(points[0]) generator = [] for i in range(self.dimension): generator = np.concatenate( (generator, LinearRegression().fit(X, np.array(Y)[:, i]).coef_) ) self.generator_candidates.append(generator) def _compute_sigma(self, pair) -> int: """ Compute the directional information about the point pair. Parameters ---------- pair : list A point pair from the point cloud. Returns ------- sigma : int A direction measurement used to identify direction information in the basis set. """ return np.sign( (np.dot(pair[0], self.basis[0]) * np.dot(pair[1], self.basis[1])) - (np.dot(pair[0], self.basis[1]) * np.dot(pair[1], self.basis[0])) ) def _extract_generators(self, pca_components: int, factor: bool = True) -> Tuple[np.ndarray, np.ndarray]: """ Perform PCA on candidates and extract true generators. Parameters ---------- pca_components : int Number of pca components to use. Returns ------- pca_components : np.ndarray pca components. factor : bool If true, factor the values by the dimensionality. variance : np.ndarray The explained variance list. """ if self.dimension > 2: pca = PCA(n_components=pca_components) pca.fit(self.constrained_generators) pca = PCA(n_components=pca_components) pca.fit(self.generator_candidates) if factor: return np.sqrt(self.dimension) * pca.components_, pca.explained_variance_ratio_ else: return pca.components_, pca.explained_variance_ratio_ def _plot_results(self, std_values: list, save: bool = False): """ Plot the results of the analysis. Parameters ---------- std_values : list explained variance list to be plotted. save : bool If true, the plot will be saved. Returns ------- Plots and image and saves it if required. """ plt.plot([i for i in range(len(std_values))], std_values, "o-") plt.xlabel("No. PCA Components") plt.ylabel("Explained Variance (%)") plt.xticks(np.arange(len(std_values)), [i + 1 for i in range(len(std_values))]) plt.xlim(-0.1, len(std_values) - 0.9) plt.ylim(-0.1, 1.1) plt.grid() if save: plt.savefig("PCA_STD.svg", dpi=800, format="svg") plt.show()
[docs] def perform_generator_extraction( self, pca_components: int = 4, plot: bool = False, save: bool = False, factor: bool = True, gs_precision: int = 5) -> Tuple: """ Collect all methods and perform the generator extraction. Parameters ---------- pca_components : int Number of pca components to checked in the reduction. plot : bool If True, the outcomes will be plotted. factor : bool If true, factor the values by the dimensionality. save : bool If True, and plot is also True, the plots will be saved. gs_precision : int Number of decimals after 0 to consider orthogonal. Returns ------- generators : list Return a list of generators. std_array : list explained variance list. """ for _ in tqdm(range(self.candidate_runs), ncols=100, desc="Producing generator candidates"): try: self._remove_redundancy() self._generate_basis_set(gs_precision) self._construct_hyperplane_set() self._identify_point_pairs() self._perform_regression() except ValueError: continue generators, std_array = self._extract_generators(pca_components=pca_components, factor=factor) for i, item in enumerate(generators): print(f"Principle Component {i + 1}: Explained Variance: {std_array[i]}") print(item.reshape((self.dimension, self.dimension))) print("\n") if plot: self._plot_results(std_array, save=save) return generators, std_array