Source code for qiflib.core.hyper

"""Hyper-distributions."""

from qiflib.core.secrets import Secrets
from qiflib.core.channel import Channel
from numpy import array, arange, zeros
from numpy import delete as npdelete

class Hyper:
    """Hyper-distribution.

    To create an instance of this class it is necessary to have an instance
    of the :py:class:`.Channel` class. Once an instance of
    :py:class:`.Hyper` is created, the constructor generates the joint,
    outer and inner distributions.

    Parameters
    ----------
    channel : core.Channel
        Channel object.

    Attributes
    ----------
    channel : core.Channel
        Channel object.

    joint : numpy.ndarray
        Matrix of joint distribution.

    outer : numpy.ndarray
        Outer distribution.

    inners : numpy.ndarray
        Matrix of inner distributions; column ``j`` is the inner
        distribution associated with ``outer[j]``.

    num_posteriors : int
        Number of posterior distributions left after reducing the
        hyper-distribution, i.e., after removing the columns whose outer
        probability is (numerically) zero and merging the columns whose
        inner distributions coincide.

    Raises
    ------
    TypeError
        If ``channel`` is not a :py:class:`.Channel` instance.
    """

    def __init__(self, channel):
        self._check_types(channel)
        self.channel = channel
        self._rebuild()

    def update_prior(self, prior):
        """Update the prior distribution on the set of secrets.

        The number of secrets must match the current number of rows of the
        channel. The joint, outer and inner distributions are regenerated
        from the updated channel.

        Parameters
        ----------
        prior : list, numpy.ndarray
            Prior distribution on the set of secrets. prior[i] is the
            probability of the secret named labels[i] being the real secret.
        """
        self.channel.update_prior(prior)
        self._rebuild()

    def _rebuild(self):
        """Recompute joint, outer, inners and num_posteriors from the channel."""
        self.joint = self._generate_joint_distribution()
        self.outer, self.inners = self._generate_posteriors()
        self._reduce_hyper()
        self.num_posteriors = len(self.outer)

    def _check_types(self, channel):
        """Raise TypeError unless ``channel`` is a Channel instance."""
        # isinstance avoids building a throwaway Channel just to obtain its
        # type, and it also accepts subclasses of Channel.
        if not isinstance(channel, Channel):
            raise TypeError('The parameter \'channel\' must be a core.channel.Channel object')

    def _generate_joint_distribution(self):
        """Return the joint matrix: joint[x][y] = prior[x] * matrix[x][y]."""
        prior = self.channel.secrets.prior
        columns = self.channel.matrix.T
        # Each channel column is weighted element-wise by the prior; the
        # weighted columns are stacked as rows and transposed back.
        weighted = [prior * columns[y] for y in range(self.channel.num_outputs)]
        return array(weighted).T

    def _generate_posteriors(self):
        """Split the joint matrix into the outer and the inner distributions.

        Returns
        -------
        outer : numpy.ndarray
            Sum of every column of the joint matrix.

        inners : numpy.ndarray
            Joint matrix with every column of positive sum divided by that
            sum. Columns whose sum is not positive are left unchanged.
        """
        outer = self.joint.sum(axis=0)
        # Work on a float copy so the division cannot be truncated when the
        # joint matrix happens to have an integer dtype.
        inners = self.joint.astype(float)
        occurring = outer > 0
        inners[:, occurring] = inners[:, occurring] / outer[occurring]
        return outer, inners

    def _reduce_hyper(self):
        """Reduce the hyper-distribution generated by _generate_posteriors.

        Columns whose outer probability is below 1e-6 are removed, and
        columns whose inner distributions are equal (entry-wise, within
        1e-6) are merged into the first of them by adding up their outer
        probabilities. Each column is merged at most once and a column that
        has already been merged never absorbs another one, so the total
        outer probability is preserved.

        This algorithm has time complexity of O(n*m^2), where n is the
        number of secrets and m is the number of outputs of the channel.
        """
        epsilon = 10**(-6)

        # Drop the inners that have (numerically) zero probability of
        # occurring. Boolean-mask indexing is used instead of numpy.delete
        # because old NumPy versions cast a boolean mask given to delete
        # into the integer indices 0 and 1.
        zero_prob = self.outer < epsilon
        self.outer = self.outer[~zero_prob]
        self.inners = self.inners[:, ~zero_prob]

        num_inners = self.inners.shape[1]
        merged = zeros(num_inners, dtype=bool)
        for i in range(num_inners):
            if merged[i]:
                # Column i was absorbed by an earlier column and will be
                # removed; anything added to it would be lost.
                continue
            for j in range(i + 1, num_inners):
                if merged[j]:
                    # Already accounted for; merging it again would count
                    # its outer probability twice.
                    continue
                if (abs(self.inners[:, i] - self.inners[:, j]) < epsilon).all():
                    merged[j] = True
                    self.outer[i] += self.outer[j]  # Merge inner j into inner i

        self.outer = self.outer[~merged]
        self.inners = self.inners[:, ~merged]