Up

sparktk.models.clustering.gmm module

# vim: set encoding=utf-8

#  Copyright (c) 2016 Intel Corporation 
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#

from sparktk.loggers import log_load; log_load(__name__); del log_load
from sparktk.propobj import PropertiesObject
from sparktk import TkContext
import os

__all__ = ["train", "load", "GaussianMixtureModel"]

def train(frame,
          observation_columns,
          column_scalings,
          k=2,
          max_iterations=20,
          convergence_tol=0.01,
          seed=None):
    """
    Creates a GaussianMixtureModel by training on the given frame

    :param frame: (Frame) frame of training data
    :param observation_columns: (List(str)) names of columns containing the observations for training
    :param column_scalings: (List(float)) column scalings for each of the observation columns.  The scaling
        value is multiplied by the corresponding value in the observation column
    :param k: (Optional(int)) number of clusters
    :param max_iterations: (Optional(int)) number of iterations for which the algorithm should run
    :param convergence_tol:  (Optional(float)) Largest change in log-likelihood at which convergence is considered to have occurred
    :param seed: (Optional(int)) seed for randomness
    :return: GaussianMixtureModel

    """   
    if frame is None:
        raise ValueError("frame cannot be None")

    tc = frame._tc
    _scala_obj = get_scala_obj(tc)
    seed = int(os.urandom(2).encode('hex'),16) if seed is None else seed
    scala_columns = tc.jutils.convert.to_scala_vector_string(observation_columns)
    scala_scalings = tc.jutils.convert.to_scala_vector_double(column_scalings)
    scala_model = _scala_obj.train(frame._scala,
                                   scala_columns,
                                   scala_scalings,
                                   k,
                                   max_iterations,
                                   convergence_tol,
                                   seed)
    return GaussianMixtureModel(tc, scala_model)


def load(path, tc=TkContext.implicit):
    """load GaussianMixtureModel from given path"""
    TkContext.validate(tc)
    return tc.load(path, GaussianMixtureModel)


def get_scala_obj(tc):
    """Gets reference to the scala object"""
    return tc.sc._jvm.org.trustedanalytics.sparktk.models.clustering.gmm.GaussianMixtureModel

class Gaussian(PropertiesObject):
    """
    Gaussian sigma and mu values for a trained GaussianMixtureModel
    """
    def __init__(self, tc, scala_result):
        self._tc = tc
        self._mu = scala_result.mu()
        self._sigma = scala_result.sigma()

    @property
    def mu(self):
        """ (list[float]) The mean vector of the distribution """
        return list(self._tc.jutils.convert.from_scala_seq(self._mu))

    @property
    def sigma(self):
        """ (list[list[float]]) The covariance matrix of the distribution """
        sigma_list = self._tc.jutils.convert.from_scala_seq(self._sigma)
        return [list(self._tc.jutils.convert.from_scala_seq(s)) for s in sigma_list]


class GaussianMixtureModel(PropertiesObject):
    """
    A trained GaussianMixtureModel model

    Example
    -------

        >>> import numpy as np
        >>> frame = tc.frame.create([[2, "ab"],
        ...                          [1,"cd"],
        ...                          [7,"ef"],
        ...                          [1,"gh"],
        ...                          [9,"ij"],
        ...                          [2,"kl"],
        ...                          [0,"mn"],
        ...                          [6,"op"],
        ...                          [5,"qr"]],
        ...                         [("data", float), ("name", str)])

        >>> frame.inspect()
        [#]  data  name
        ===============
        [0]   2  ab
        [1]   1  cd
        [2]   7  ef
        [3]   1  gh
        [4]   9  ij
        [5]   2  kl
        [6]   0  mn
        [7]   6  op
        [8]   5  qr

        >>> model = tc.models.clustering.gmm.train(frame, ["data"], [1.0], 3 ,seed=1)

        >>> model.k
        3

        >>> for g in model.gaussians:
        ...     print g
        mu    = [1.1984786097160265]
        sigma = [[0.5599222134199012]]
        mu    = [6.643997733061858]
        sigma = [[2.19222016401446]]
        mu    = [6.79435719737145]
        sigma = [[2.2637494400157774]]



        >>> predicted_frame =  model.predict(frame)

        >>> predicted_frame.inspect()
        [#]  data  name  predicted_cluster
        ==================================
        [0]   9.0  ij                    0
        [1]   2.0  ab                    1
        [2]   0.0  mn                    1
        [3]   5.0  qr                    0
        [4]   7.0  ef                    0
        [5]   1.0  cd                    1
        [6]   1.0  gh                    1
        [7]   6.0  op                    0
        [8]   2.0  kl                    1

        >>> model.observation_columns
        [u'data']

        >>> model.column_scalings
        [1.0]

        >>> model.save("sandbox/gmm")

        >>> restored = tc.load("sandbox/gmm")

        >>> model.cluster_sizes(frame) == restored.cluster_sizes(frame)
        True

    """

    def __init__(self, tc, scala_model):
        self._tc = tc
        tc.jutils.validate_is_jvm_instance_of(scala_model, get_scala_obj(tc))
        self._scala = scala_model


    @staticmethod
    def _from_scala(tc, scala_model):
        """Loads a trained gaussian mixture model from a scala model"""
        return GaussianMixtureModel(tc, scala_model)

    @property
    def observation_columns(self):
        """observation columns used for model training"""
        return list(self._tc.jutils.convert.from_scala_seq(self._scala.observationColumns()))

    @property
    def column_scalings(self):
        """column containing the scalings used for model training"""
        return list(self._tc.jutils.convert.from_scala_seq(self._scala.columnScalings()))

    @property
    def k(self):
        """maximum limit for number of resulting clusters"""
        return self._scala.k()

    @property
    def max_iterations(self):
        """maximum number of iterations"""
        return self._scala.maxIterations()

    @property
    def convergence_tol(self):
        """convergence tolerance"""
        return self._scala.convergenceTol()

    @property
    def seed(self):
        """seed used during training of the model"""
        return self._scala.seed()

    @property
    def gaussians(self):
        """Gaussian object, which contains the mu and sigma values"""
        g = self._tc.jutils.convert.from_scala_seq(self._scala.gaussians())
        results = []
        for i in g:
            results.append(Gaussian(self._tc, i))
        return results

    def cluster_sizes(self, frame):
        """a map of clusters and their sizes"""
        cs = self._scala.computeGmmClusterSize(frame._scala)
        return self._tc.jutils.convert.scala_map_to_python(cs)

    def predict(self, frame, columns=None):
        """
       Predicts the labels for the observation columns in the given input frame. Creates a new frame
       with the existing columns and a new predicted column.

       Parameters
       ----------

       :param frame: (Frame) Frame used for predicting the values
       :param c: (List[str]) Names of the observation columns.
       :return: (Frame) A new frame containing the original frame's columns and a prediction column
       """
        from sparktk.frame.frame import Frame
        c = self.__columns_to_option(columns)
        return Frame(self._tc, self._scala.predict(frame._scala, c))

    def __columns_to_option(self, c):
        if c is not None:
            c = self._tc.jutils.convert.to_scala_vector_string(c)
        return self._tc.jutils.convert.to_scala_option(c)


    def save(self, path):
        """save the trained model to the given path"""
        self._scala.save(self._tc._scala_sc, path)

    def export_to_mar(self, path):
        """
        Exports the trained model as a model archive (.mar) to the specified path

        Parameters
        ----------

        :param path: (str) Path to save the trained model
        :return: (str) Full path to the saved .mar file
        """
        if isinstance(path, basestring):
            return self._scala.exportToMar(self._tc._scala_sc, path)

del PropertiesObject

Functions

def load(

path, tc=<class 'sparktk.arguments.implicit'>)

load GaussianMixtureModel from given path

def load(path, tc=TkContext.implicit):
    """load GaussianMixtureModel from given path"""
    TkContext.validate(tc)
    return tc.load(path, GaussianMixtureModel)

def train(

frame, observation_columns, column_scalings, k=2, max_iterations=20, convergence_tol=0.01, seed=None)

Creates a GaussianMixtureModel by training on the given frame

frame(Frame):frame of training data
observation_columns(List(str)):names of columns containing the observations for training
column_scalings(List(float)):column scalings for each of the observation columns. The scaling value is multiplied by the corresponding value in the observation column
k(Optional(int)):number of clusters
max_iterations(Optional(int)):number of iterations for which the algorithm should run
convergence_tol: (Optional(float)) Largest change in log-likelihood at which convergence is considered to have occurred
seed(Optional(int)):seed for randomness

Returns: GaussianMixtureModel

def train(frame,
          observation_columns,
          column_scalings,
          k=2,
          max_iterations=20,
          convergence_tol=0.01,
          seed=None):
    """
    Creates a GaussianMixtureModel by training on the given frame

    :param frame: (Frame) frame of training data
    :param observation_columns: (List(str)) names of columns containing the observations for training
    :param column_scalings: (List(float)) column scalings for each of the observation columns.  The scaling
        value is multiplied by the corresponding value in the observation column
    :param k: (Optional(int)) number of clusters
    :param max_iterations: (Optional(int)) number of iterations for which the algorithm should run
    :param convergence_tol:  (Optional(float)) Largest change in log-likelihood at which convergence is considered to have occurred
    :param seed: (Optional(int)) seed for randomness
    :return: GaussianMixtureModel

    """   
    if frame is None:
        raise ValueError("frame cannot be None")

    tc = frame._tc
    _scala_obj = get_scala_obj(tc)
    seed = int(os.urandom(2).encode('hex'),16) if seed is None else seed
    scala_columns = tc.jutils.convert.to_scala_vector_string(observation_columns)
    scala_scalings = tc.jutils.convert.to_scala_vector_double(column_scalings)
    scala_model = _scala_obj.train(frame._scala,
                                   scala_columns,
                                   scala_scalings,
                                   k,
                                   max_iterations,
                                   convergence_tol,
                                   seed)
    return GaussianMixtureModel(tc, scala_model)

Classes

class GaussianMixtureModel

A trained GaussianMixtureModel model

Example:
>>> import numpy as np
>>> frame = tc.frame.create([[2, "ab"],
...                          [1,"cd"],
...                          [7,"ef"],
...                          [1,"gh"],
...                          [9,"ij"],
...                          [2,"kl"],
...                          [0,"mn"],
...                          [6,"op"],
...                          [5,"qr"]],
...                         [("data", float), ("name", str)])

>>> frame.inspect()
[#]  data  name
===============
[0]   2  ab
[1]   1  cd
[2]   7  ef
[3]   1  gh
[4]   9  ij
[5]   2  kl
[6]   0  mn
[7]   6  op
[8]   5  qr

>>> model = tc.models.clustering.gmm.train(frame, ["data"], [1.0], 3 ,seed=1)

>>> model.k
3

>>> for g in model.gaussians:
...     print g
mu    = [1.1984786097160265]
sigma = [[0.5599222134199012]]
mu    = [6.643997733061858]
sigma = [[2.19222016401446]]
mu    = [6.79435719737145]
sigma = [[2.2637494400157774]]



>>> predicted_frame =  model.predict(frame)

>>> predicted_frame.inspect()
[#]  data  name  predicted_cluster
==================================
[0]   9.0  ij                    0
[1]   2.0  ab                    1
[2]   0.0  mn                    1
[3]   5.0  qr                    0
[4]   7.0  ef                    0
[5]   1.0  cd                    1
[6]   1.0  gh                    1
[7]   6.0  op                    0
[8]   2.0  kl                    1

>>> model.observation_columns
[u'data']

>>> model.column_scalings
[1.0]

>>> model.save("sandbox/gmm")

>>> restored = tc.load("sandbox/gmm")

>>> model.cluster_sizes(frame) == restored.cluster_sizes(frame)
True
class GaussianMixtureModel(PropertiesObject):
    """
    A trained GaussianMixtureModel model

    Example
    -------

        >>> import numpy as np
        >>> frame = tc.frame.create([[2, "ab"],
        ...                          [1,"cd"],
        ...                          [7,"ef"],
        ...                          [1,"gh"],
        ...                          [9,"ij"],
        ...                          [2,"kl"],
        ...                          [0,"mn"],
        ...                          [6,"op"],
        ...                          [5,"qr"]],
        ...                         [("data", float), ("name", str)])

        >>> frame.inspect()
        [#]  data  name
        ===============
        [0]   2  ab
        [1]   1  cd
        [2]   7  ef
        [3]   1  gh
        [4]   9  ij
        [5]   2  kl
        [6]   0  mn
        [7]   6  op
        [8]   5  qr

        >>> model = tc.models.clustering.gmm.train(frame, ["data"], [1.0], 3 ,seed=1)

        >>> model.k
        3

        >>> for g in model.gaussians:
        ...     print g
        mu    = [1.1984786097160265]
        sigma = [[0.5599222134199012]]
        mu    = [6.643997733061858]
        sigma = [[2.19222016401446]]
        mu    = [6.79435719737145]
        sigma = [[2.2637494400157774]]



        >>> predicted_frame =  model.predict(frame)

        >>> predicted_frame.inspect()
        [#]  data  name  predicted_cluster
        ==================================
        [0]   9.0  ij                    0
        [1]   2.0  ab                    1
        [2]   0.0  mn                    1
        [3]   5.0  qr                    0
        [4]   7.0  ef                    0
        [5]   1.0  cd                    1
        [6]   1.0  gh                    1
        [7]   6.0  op                    0
        [8]   2.0  kl                    1

        >>> model.observation_columns
        [u'data']

        >>> model.column_scalings
        [1.0]

        >>> model.save("sandbox/gmm")

        >>> restored = tc.load("sandbox/gmm")

        >>> model.cluster_sizes(frame) == restored.cluster_sizes(frame)
        True

    """

    def __init__(self, tc, scala_model):
        self._tc = tc
        tc.jutils.validate_is_jvm_instance_of(scala_model, get_scala_obj(tc))
        self._scala = scala_model


    @staticmethod
    def _from_scala(tc, scala_model):
        """Loads a trained gaussian mixture model from a scala model"""
        return GaussianMixtureModel(tc, scala_model)

    @property
    def observation_columns(self):
        """observation columns used for model training"""
        return list(self._tc.jutils.convert.from_scala_seq(self._scala.observationColumns()))

    @property
    def column_scalings(self):
        """column containing the scalings used for model training"""
        return list(self._tc.jutils.convert.from_scala_seq(self._scala.columnScalings()))

    @property
    def k(self):
        """maximum limit for number of resulting clusters"""
        return self._scala.k()

    @property
    def max_iterations(self):
        """maximum number of iterations"""
        return self._scala.maxIterations()

    @property
    def convergence_tol(self):
        """convergence tolerance"""
        return self._scala.convergenceTol()

    @property
    def seed(self):
        """seed used during training of the model"""
        return self._scala.seed()

    @property
    def gaussians(self):
        """Gaussian object, which contains the mu and sigma values"""
        g = self._tc.jutils.convert.from_scala_seq(self._scala.gaussians())
        results = []
        for i in g:
            results.append(Gaussian(self._tc, i))
        return results

    def cluster_sizes(self, frame):
        """a map of clusters and their sizes"""
        cs = self._scala.computeGmmClusterSize(frame._scala)
        return self._tc.jutils.convert.scala_map_to_python(cs)

    def predict(self, frame, columns=None):
        """
       Predicts the labels for the observation columns in the given input frame. Creates a new frame
       with the existing columns and a new predicted column.

       Parameters
       ----------

       :param frame: (Frame) Frame used for predicting the values
       :param c: (List[str]) Names of the observation columns.
       :return: (Frame) A new frame containing the original frame's columns and a prediction column
       """
        from sparktk.frame.frame import Frame
        c = self.__columns_to_option(columns)
        return Frame(self._tc, self._scala.predict(frame._scala, c))

    def __columns_to_option(self, c):
        if c is not None:
            c = self._tc.jutils.convert.to_scala_vector_string(c)
        return self._tc.jutils.convert.to_scala_option(c)


    def save(self, path):
        """save the trained model to the given path"""
        self._scala.save(self._tc._scala_sc, path)

    def export_to_mar(self, path):
        """
        Exports the trained model as a model archive (.mar) to the specified path

        Parameters
        ----------

        :param path: (str) Path to save the trained model
        :return: (str) Full path to the saved .mar file
        """
        if isinstance(path, basestring):
            return self._scala.exportToMar(self._tc._scala_sc, path)

Ancestors (in MRO)

Instance variables

var column_scalings

column containing the scalings used for model training

var convergence_tol

convergence tolerance

var gaussians

Gaussian object, which contains the mu and sigma values

var k

maximum limit for number of resulting clusters

var max_iterations

maximum number of iterations

var observation_columns

observation columns used for model training

var seed

seed used during training of the model

Methods

def __init__(

self, tc, scala_model)

def __init__(self, tc, scala_model):
    self._tc = tc
    tc.jutils.validate_is_jvm_instance_of(scala_model, get_scala_obj(tc))
    self._scala = scala_model

def cluster_sizes(

self, frame)

a map of clusters and their sizes

def cluster_sizes(self, frame):
    """a map of clusters and their sizes"""
    cs = self._scala.computeGmmClusterSize(frame._scala)
    return self._tc.jutils.convert.scala_map_to_python(cs)

def export_to_mar(

self, path)

Exports the trained model as a model archive (.mar) to the specified path

Parameters:
path(str):Path to save the trained model

Returns(str): Full path to the saved .mar file

def export_to_mar(self, path):
    """
    Exports the trained model as a model archive (.mar) to the specified path
    Parameters
    ----------
    :param path: (str) Path to save the trained model
    :return: (str) Full path to the saved .mar file
    """
    if isinstance(path, basestring):
        return self._scala.exportToMar(self._tc._scala_sc, path)

def predict(

self, frame, columns=None)

Predicts the labels for the observation columns in the given input frame. Creates a new frame with the existing columns and a new predicted column.

Parameters:
frame(Frame):Frame used for predicting the values
c(List[str]):Names of the observation columns.

Returns(Frame): A new frame containing the original frame's columns and a prediction column

def predict(self, frame, columns=None):
    """
   Predicts the labels for the observation columns in the given input frame. Creates a new frame
   with the existing columns and a new predicted column.
   Parameters
   ----------
   :param frame: (Frame) Frame used for predicting the values
   :param c: (List[str]) Names of the observation columns.
   :return: (Frame) A new frame containing the original frame's columns and a prediction column
   """
    from sparktk.frame.frame import Frame
    c = self.__columns_to_option(columns)
    return Frame(self._tc, self._scala.predict(frame._scala, c))

def save(

self, path)

save the trained model to the given path

def save(self, path):
    """save the trained model to the given path"""
    self._scala.save(self._tc._scala_sc, path)

def to_dict(

self)

def to_dict(self):
    d = self._properties()
    d.update(self._attributes())
    return d

def to_json(

self)

def to_json(self):
    return json.dumps(self.to_dict())