"""Hyper-distributions."""
from qiflib.core.secrets import Secrets
from qiflib.core.channel import Channel
from numpy import array, arange, zeros
from numpy import delete as npdelete
class Hyper:
    """Hyper-distribution.

    To create an instance of this class it is necessary to have an
    instance of :py:class:`.Channel`. When a :py:class:`.Hyper` is
    created, the constructor generates the joint, outer and inner
    distributions and then reduces the hyper-distribution.

    Parameters
    ----------
    channel : core.Channel
        Channel object.

    Attributes
    ----------
    channel : core.Channel
        Channel object.
    joint : numpy.ndarray
        Matrix of joint distribution (one row per secret, one column per
        channel output).
    outer : numpy.ndarray
        Outer distribution (one probability per remaining posterior).
    inners : numpy.ndarray
        Matrix of inner distributions (one column per remaining posterior).
    num_posteriors : int
        Number of posterior distributions that remain after reducing the
        hyper-distribution, i.e., after removing the columns whose outer
        probability is zero and merging the columns whose inner
        distributions are equal.

    Raises
    ------
    TypeError
        If ``channel`` is not a :py:class:`.Channel` instance.
    """

    def __init__(self, channel):
        self._check_types(channel)
        self.channel = channel
        self._refresh()

    def update_prior(self, prior):
        """Update the prior distribution on set of secrets.

        The number of secrets must match the current number of rows of
        the channel. The joint, outer and inner distributions are
        recomputed from the updated channel.

        Parameters
        ----------
        prior : list, numpy.ndarray
            Prior distribution on the set of secrets. prior[i] is the
            probability of secret named labels[i] being the real secret.
        """
        self.channel.update_prior(prior)
        self._refresh()

    def _refresh(self):
        """Recompute joint, outer, inners and num_posteriors from the channel."""
        self.joint = self._generate_joint_distribution()
        self.outer, self.inners = self._generate_posteriors()
        self._reduce_hyper()
        self.num_posteriors = len(self.outer)

    def _check_types(self, channel):
        """Raise TypeError unless ``channel`` is a Channel instance."""
        # isinstance avoids building a throwaway Channel just to obtain its
        # type, and also accepts subclasses of Channel.
        if not isinstance(channel, Channel):
            raise TypeError(
                "The parameter 'channel' must be a core.channel.Channel object"
            )

    def _generate_joint_distribution(self):
        """Return the joint matrix: each channel column scaled by the prior.

        Column ``i`` of the result is ``prior * channel.matrix[:, i]``.
        """
        prior = self.channel.secrets.prior
        channel_t = self.channel.matrix.T
        columns = [
            prior * channel_t[i] for i in range(self.channel.num_outputs)
        ]
        return array(columns).T

    def _generate_posteriors(self):
        """Return ``(outer, inners)`` computed from ``self.joint``.

        ``outer[i]`` is the sum of column ``i`` of the joint matrix. Each
        column with a positive sum is divided by that sum to give the
        corresponding inner distribution; the remaining columns are left
        untouched (they are discarded later by ``_reduce_hyper``).
        """
        # Work on a transposed copy so every posterior is a row and
        # self.joint itself is never modified.
        joint_t = self.joint.T.copy()
        outer = []
        for i in range(self.channel.num_outputs):
            total = joint_t[i].sum()
            outer.append(total)
            if total > 0:
                joint_t[i] = joint_t[i] / total
        return array(outer), joint_t.T

    def _reduce_hyper(self):
        """Reduce the hyper-distribution produced by _generate_posteriors.

        Columns whose outer probability is below ``epsilon`` are removed,
        and columns whose inner distributions are equal (every entry
        differs by less than ``epsilon``) are merged into the first of
        them, which accumulates their outer probabilities.

        The pairwise comparison has time complexity O(n*m^2), where n is
        the number of secrets and m is the number of outputs.
        """
        epsilon = 1e-6

        # Drop inners that have (numerically) zero probability of occurring.
        # Boolean indexing returns new arrays, so the in-place additions
        # below never touch an array owned by someone else.
        keep = ~(self.outer < epsilon)
        self.outer = self.outer[keep]
        self.inners = self.inners[:, keep]

        num_inners = self.inners.shape[1]
        merged = zeros(num_inners, dtype=bool)
        for i in range(num_inners):
            # A column already absorbed by an earlier one must not absorb
            # others: it is about to be deleted, so any mass added to it
            # would be lost.
            if merged[i]:
                continue
            for j in range(i + 1, num_inners):
                if merged[j]:
                    continue
                difference = abs(self.inners[:, i] - self.inners[:, j])
                if (difference < epsilon).all():
                    merged[j] = True
                    self.outer[i] += self.outer[j]  # Merge inner j into inner i

        self.outer = self.outer[~merged]
        self.inners = self.inners[:, ~merged]