
sparktk.models.clustering.kmeans module

# vim: set encoding=utf-8

#  Copyright (c) 2016 Intel Corporation 
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#

from sparktk.loggers import log_load; log_load(__name__); del log_load

from sparktk.propobj import PropertiesObject
from sparktk import TkContext

__all__ = ["train", "load", "KMeansModel"]

def train(frame, columns, k=2, scalings=None, max_iterations=20, convergence_tolerance=1e-4, seed=None, init_mode="k-means||"):
    """
    Creates a KMeansModel by training on the given frame

    :param frame: (Frame) frame of training data
    :param columns: (List[str]) names of columns containing the observations for training
    :param k: (Optional(int)) number of clusters
    :param scalings: (Optional(List[float])) column scalings for each of the observation columns. The scaling value is multiplied by
     the corresponding value in the observation column
    :param max_iterations: (Optional(int)) number of iterations for which the algorithm should run
    :param convergence_tolerance: (Optional(float)) distance threshold within which we consider k-means to have converged. Default is 1e-4.
     If all centers move less than this Euclidean distance, we stop iterating one run
    :param seed: (Optional(long)) seed for randomness
    :param init_mode: (Optional(str)) the initialization technique for the algorithm. It can be either "random" to choose
     random points as initial clusters or "k-means||" to use a parallel variant of k-means++. Default is "k-means||"
    :return: (KMeansModel) trained KMeans model

    """
    if frame is None:
        raise ValueError("frame cannot be None")
    tc = frame._tc
    _scala_obj = get_scala_obj(tc)
    if isinstance(columns, basestring):
        columns = [columns]
    scala_columns = tc.jutils.convert.to_scala_vector_string(columns)
    if scalings:
        scala_scalings = tc.jutils.convert.to_scala_vector_double(scalings)
        scala_scalings = tc.jutils.convert.to_scala_option(scala_scalings)
    else:
        scala_scalings = tc.jutils.convert.to_scala_option(None)

    seed = seed if seed is None else long(seed)
    scala_seed = tc.jutils.convert.to_scala_option(seed)
    scala_model = _scala_obj.train(frame._scala, scala_columns, k, scala_scalings, max_iterations, convergence_tolerance, init_mode, scala_seed)
    return KMeansModel(tc, scala_model)


def load(path, tc=TkContext.implicit):
    """load KMeansModel from given path"""
    TkContext.validate(tc)
    return tc.load(path, KMeansModel)


def get_scala_obj(tc):
    """Gets reference to the scala object"""
    return tc.sc._jvm.org.trustedanalytics.sparktk.models.clustering.kmeans.KMeansModel


class KMeansModel(PropertiesObject):
    """
    A trained KMeans model

    Example
    -------

        >>> frame = tc.frame.create([[2, "ab"],
        ...                          [1,"cd"],
        ...                          [7,"ef"],
        ...                          [1,"gh"],
        ...                          [9,"ij"],
        ...                          [2,"kl"],
        ...                          [0,"mn"],
        ...                          [6,"op"],
        ...                          [5,"qr"]],
        ...                         [("data", float), ("name", str)])

        >>> model = tc.models.clustering.kmeans.train(frame, ["data"], 3, seed=5)

        >>> model.k
        3

        >>> sizes = model.compute_sizes(frame)

        >>> sizes
        [2, 2, 5]

        >>> wsse = model.compute_wsse(frame)

        >>> wsse
        5.3

        >>> predicted_frame = model.predict(frame)

        >>> predicted_frame.inspect()
        [#]  data  name  cluster
        ========================
        [0]   2.0  ab          1
        [1]   1.0  cd          1
        [2]   7.0  ef          0
        [3]   1.0  gh          1
        [4]   9.0  ij          0
        [5]   2.0  kl          1
        [6]   0.0  mn          1
        [7]   6.0  op          2
        [8]   5.0  qr          2


        >>> model.add_distance_columns(predicted_frame)

        >>> predicted_frame.inspect()
        [#]  data  name  cluster  distance0  distance1  distance2
        =========================================================
        [0]   2.0  ab          1       36.0       0.64      12.25
        [1]   1.0  cd          1       49.0       0.04      20.25
        [2]   7.0  ef          0        1.0      33.64       2.25
        [3]   1.0  gh          1       49.0       0.04      20.25
        [4]   9.0  ij          0        1.0      60.84      12.25
        [5]   2.0  kl          1       36.0       0.64      12.25
        [6]   0.0  mn          1       64.0       1.44      30.25
        [7]   6.0  op          2        4.0      23.04       0.25
        [8]   5.0  qr          2        9.0      14.44       0.25

        >>> model.columns
        [u'data']

        >>> model.scalings  # None


        >>> centroids = model.centroids

        >>> model.save("sandbox/kmeans1")

        >>> restored = tc.load("sandbox/kmeans1")

        >>> restored.centroids == centroids
        True

        >>> restored_sizes = restored.compute_sizes(predicted_frame)

        >>> restored_sizes == sizes
        True


        >>> predicted_frame2 = restored.predict(frame)

        >>> predicted_frame2.inspect()
        [#]  data  name  cluster
        ========================
        [0]   2.0  ab          1
        [1]   1.0  cd          1
        [2]   7.0  ef          0
        [3]   1.0  gh          1
        [4]   9.0  ij          0
        [5]   2.0  kl          1
        [6]   0.0  mn          1
        [7]   6.0  op          2
        [8]   5.0  qr          2

        >>> canonical_path = model.export_to_mar("sandbox/Kmeans.mar")


    """

    def __init__(self, tc, scala_model):
        self._tc = tc
        tc.jutils.validate_is_jvm_instance_of(scala_model, get_scala_obj(tc))
        self._scala = scala_model

    @staticmethod
    def _from_scala(tc, scala_model):
        return KMeansModel(tc, scala_model)

    @property
    def columns(self):
        return list(self._tc.jutils.convert.from_scala_seq(self._scala.columns()))

    @property
    def scalings(self):
        s = self._tc.jutils.convert.from_scala_option(self._scala.scalings())
        if s:
            return list(self._tc.jutils.convert.from_scala_seq(s))
        return None

    @property
    def k(self):
        return self._scala.k()

    @property
    def max_iterations(self):
        return self._scala.maxIterations()

    @property
    def initialization_mode(self):
        return self._scala.initializationMode()

    @property
    def centroids(self):
        return [list(item) for item in list(self._scala.centroidsAsArrays())]

    def compute_sizes(self, frame, columns=None):
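        """Computes the number of rows in each cluster for the given frame. Returns (List[int]) the cluster sizes."""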
        c = self.__columns_to_option(columns)
        return [int(n) for n in self._scala.computeClusterSizes(frame._scala, c)]

    def compute_wsse(self, frame, columns=None):
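        """Computes the within-set sum of squared errors (WSSE) of the model on the given frame."""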
        c = self.__columns_to_option(columns)
        return self._scala.computeWsse(frame._scala, c)

    def predict(self, frame, columns=None):
        """
        Predicts the labels for the observation columns in the given input frame. Creates a new frame
        with the existing columns and a new predicted column.

        Parameters
        ----------

        :param frame: (Frame) Frame used for predicting the values
        :param columns: (Optional(List[str])) Names of the observation columns; defaults to the columns used to train the model
        :return: (Frame) A new frame containing the original frame's columns and a prediction column
        """
        c = self.__columns_to_option(columns)
        from sparktk.frame.frame import Frame
        return Frame(self._tc, self._scala.predict(frame._scala, c))

    def add_distance_columns(self, frame, columns=None):
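        """Adds a distance column to the given frame for each cluster, holding each row's squared distance to that cluster's centroid."""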
        c = self.__columns_to_option(columns)
        self._scala.addDistanceColumns(frame._scala, c)

    def __columns_to_option(self, columns):
        if isinstance(columns, basestring):
            columns = [columns]
        if columns is not None:
            columns = self._tc.jutils.convert.to_scala_vector_string(columns)
        return self._tc.jutils.convert.to_scala_option(columns)

    def save(self, path):
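        """Saves the model to the given path (a non-string path is silently ignored)."""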
        if isinstance(path, basestring):
            self._scala.save(self._tc._scala_sc, path)

    def export_to_mar(self, path):
        """
        Exports the trained model as a model archive (.mar) to the specified path

        Parameters
        ----------

        :param path: (str) Path to save the trained model
        :return: (str) Full path to the saved .mar file
        """
        if isinstance(path, basestring):
            return self._scala.exportToMar(self._tc._scala_sc, path)

del PropertiesObject

Functions

def load(path, tc=TkContext.implicit)

load KMeansModel from given path

def load(path, tc=TkContext.implicit):
    """load KMeansModel from given path"""
    TkContext.validate(tc)
    return tc.load(path, KMeansModel)
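
A usage sketch, assuming an active TkContext tc and a model previously saved at the hypothetical path "sandbox/kmeans1"; the generic tc.load shown in the class example below is equivalent:

>>> restored = tc.models.clustering.kmeans.load("sandbox/kmeans1")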

def train(frame, columns, k=2, scalings=None, max_iterations=20, convergence_tolerance=1e-4, seed=None, init_mode='k-means||')

Creates a KMeansModel by training on the given frame

frame(Frame):frame of training data
columns(List[str]):names of columns containing the observations for training
k(Optional(int)):number of clusters
scalings(Optional(List[float])):column scalings for each of the observation columns. The scaling value is multiplied by the corresponding value in the observation column
max_iterations(Optional(int)):number of iterations for which the algorithm should run
convergence_tolerance(Optional(float)):distance threshold within which we consider k-means to have converged. Default is 1e-4. If all centers move less than this Euclidean distance, we stop iterating one run
seed(Optional(long)):seed for randomness
init_mode(Optional(str)):the initialization technique for the algorithm. It can be either "random" to choose random points as initial clusters or "k-means||" to use a parallel variant of k-means++. Default is "k-means||"

Returns(KMeansModel): trained KMeans model

def train(frame, columns, k=2, scalings=None, max_iterations=20, convergence_tolerance=1e-4, seed=None, init_mode="k-means||"):
    """
    Creates a KMeansModel by training on the given frame

    :param frame: (Frame) frame of training data
    :param columns: (List[str]) names of columns containing the observations for training
    :param k: (Optional(int)) number of clusters
    :param scalings: (Optional(List[float])) column scalings for each of the observation columns. The scaling value is multiplied by
     the corresponding value in the observation column
    :param max_iterations: (Optional(int)) number of iterations for which the algorithm should run
    :param convergence_tolerance: (Optional(float)) distance threshold within which we consider k-means to have converged. Default is 1e-4.
     If all centers move less than this Euclidean distance, we stop iterating one run
    :param seed: (Optional(long)) seed for randomness
    :param init_mode: (Optional(str)) the initialization technique for the algorithm. It can be either "random" to choose
     random points as initial clusters or "k-means||" to use a parallel variant of k-means++. Default is "k-means||"
    :return: (KMeansModel) trained KMeans model

    """
    if frame is None:
        raise ValueError("frame cannot be None")
    tc = frame._tc
    _scala_obj = get_scala_obj(tc)
    if isinstance(columns, basestring):
        columns = [columns]
    scala_columns = tc.jutils.convert.to_scala_vector_string(columns)
    if scalings:
        scala_scalings = tc.jutils.convert.to_scala_vector_double(scalings)
        scala_scalings = tc.jutils.convert.to_scala_option(scala_scalings)
    else:
        scala_scalings = tc.jutils.convert.to_scala_option(None)

    seed = seed if seed is None else long(seed)
    scala_seed = tc.jutils.convert.to_scala_option(seed)
    scala_model = _scala_obj.train(frame._scala, scala_columns, k, scala_scalings, max_iterations, convergence_tolerance, init_mode, scala_seed)
    return KMeansModel(tc, scala_model)
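
A sketch exercising the optional parameters (column names and values are hypothetical; scalings pairs one multiplier with each observation column):

>>> model = tc.models.clustering.kmeans.train(frame,
...                                           columns=["x", "y"],
...                                           k=5,
...                                           scalings=[1.0, 0.5],
...                                           max_iterations=50,
...                                           convergence_tolerance=1e-6,
...                                           seed=1234,
...                                           init_mode="random")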

Classes

class KMeansModel

A trained KMeans model

Example:
>>> frame = tc.frame.create([[2, "ab"],
...                          [1,"cd"],
...                          [7,"ef"],
...                          [1,"gh"],
...                          [9,"ij"],
...                          [2,"kl"],
...                          [0,"mn"],
...                          [6,"op"],
...                          [5,"qr"]],
...                         [("data", float), ("name", str)])

>>> model = tc.models.clustering.kmeans.train(frame, ["data"], 3, seed=5)

>>> model.k
3

>>> sizes = model.compute_sizes(frame)

>>> sizes
[2, 2, 5]

>>> wsse = model.compute_wsse(frame)

>>> wsse
5.3

>>> predicted_frame = model.predict(frame)

>>> predicted_frame.inspect()
[#]  data  name  cluster
========================
[0]   2.0  ab          1
[1]   1.0  cd          1
[2]   7.0  ef          0
[3]   1.0  gh          1
[4]   9.0  ij          0
[5]   2.0  kl          1
[6]   0.0  mn          1
[7]   6.0  op          2
[8]   5.0  qr          2


>>> model.add_distance_columns(predicted_frame)

>>> predicted_frame.inspect()
[#]  data  name  cluster  distance0  distance1  distance2
=========================================================
[0]   2.0  ab          1       36.0       0.64      12.25
[1]   1.0  cd          1       49.0       0.04      20.25
[2]   7.0  ef          0        1.0      33.64       2.25
[3]   1.0  gh          1       49.0       0.04      20.25
[4]   9.0  ij          0        1.0      60.84      12.25
[5]   2.0  kl          1       36.0       0.64      12.25
[6]   0.0  mn          1       64.0       1.44      30.25
[7]   6.0  op          2        4.0      23.04       0.25
[8]   5.0  qr          2        9.0      14.44       0.25

>>> model.columns
[u'data']

>>> model.scalings  # None


>>> centroids = model.centroids

>>> model.save("sandbox/kmeans1")

>>> restored = tc.load("sandbox/kmeans1")

>>> restored.centroids == centroids
True

>>> restored_sizes = restored.compute_sizes(predicted_frame)

>>> restored_sizes == sizes
True


>>> predicted_frame2 = restored.predict(frame)

>>> predicted_frame2.inspect()
[#]  data  name  cluster
========================
[0]   2.0  ab          1
[1]   1.0  cd          1
[2]   7.0  ef          0
[3]   1.0  gh          1
[4]   9.0  ij          0
[5]   2.0  kl          1
[6]   0.0  mn          1
[7]   6.0  op          2
[8]   5.0  qr          2

>>> canonical_path = model.export_to_mar("sandbox/Kmeans.mar")

class KMeansModel(PropertiesObject):
    """
    A trained KMeans model

    Example
    -------

        >>> frame = tc.frame.create([[2, "ab"],
        ...                          [1,"cd"],
        ...                          [7,"ef"],
        ...                          [1,"gh"],
        ...                          [9,"ij"],
        ...                          [2,"kl"],
        ...                          [0,"mn"],
        ...                          [6,"op"],
        ...                          [5,"qr"]],
        ...                         [("data", float), ("name", str)])

        >>> model = tc.models.clustering.kmeans.train(frame, ["data"], 3, seed=5)

        >>> model.k
        3

        >>> sizes = model.compute_sizes(frame)

        >>> sizes
        [2, 2, 5]

        >>> wsse = model.compute_wsse(frame)

        >>> wsse
        5.3

        >>> predicted_frame = model.predict(frame)

        >>> predicted_frame.inspect()
        [#]  data  name  cluster
        ========================
        [0]   2.0  ab          1
        [1]   1.0  cd          1
        [2]   7.0  ef          0
        [3]   1.0  gh          1
        [4]   9.0  ij          0
        [5]   2.0  kl          1
        [6]   0.0  mn          1
        [7]   6.0  op          2
        [8]   5.0  qr          2


        >>> model.add_distance_columns(predicted_frame)

        >>> predicted_frame.inspect()
        [#]  data  name  cluster  distance0  distance1  distance2
        =========================================================
        [0]   2.0  ab          1       36.0       0.64      12.25
        [1]   1.0  cd          1       49.0       0.04      20.25
        [2]   7.0  ef          0        1.0      33.64       2.25
        [3]   1.0  gh          1       49.0       0.04      20.25
        [4]   9.0  ij          0        1.0      60.84      12.25
        [5]   2.0  kl          1       36.0       0.64      12.25
        [6]   0.0  mn          1       64.0       1.44      30.25
        [7]   6.0  op          2        4.0      23.04       0.25
        [8]   5.0  qr          2        9.0      14.44       0.25

        >>> model.columns
        [u'data']

        >>> model.scalings  # None


        >>> centroids = model.centroids

        >>> model.save("sandbox/kmeans1")

        >>> restored = tc.load("sandbox/kmeans1")

        >>> restored.centroids == centroids
        True

        >>> restored_sizes = restored.compute_sizes(predicted_frame)

        >>> restored_sizes == sizes
        True


        >>> predicted_frame2 = restored.predict(frame)

        >>> predicted_frame2.inspect()
        [#]  data  name  cluster
        ========================
        [0]   2.0  ab          1
        [1]   1.0  cd          1
        [2]   7.0  ef          0
        [3]   1.0  gh          1
        [4]   9.0  ij          0
        [5]   2.0  kl          1
        [6]   0.0  mn          1
        [7]   6.0  op          2
        [8]   5.0  qr          2

        >>> canonical_path = model.export_to_mar("sandbox/Kmeans.mar")


    """

    def __init__(self, tc, scala_model):
        self._tc = tc
        tc.jutils.validate_is_jvm_instance_of(scala_model, get_scala_obj(tc))
        self._scala = scala_model

    @staticmethod
    def _from_scala(tc, scala_model):
        return KMeansModel(tc, scala_model)

    @property
    def columns(self):
        return list(self._tc.jutils.convert.from_scala_seq(self._scala.columns()))

    @property
    def scalings(self):
        s = self._tc.jutils.convert.from_scala_option(self._scala.scalings())
        if s:
            return list(self._tc.jutils.convert.from_scala_seq(s))
        return None

    @property
    def k(self):
        return self._scala.k()

    @property
    def max_iterations(self):
        return self._scala.maxIterations()

    @property
    def initialization_mode(self):
        return self._scala.initializationMode()

    @property
    def centroids(self):
        return [list(item) for item in list(self._scala.centroidsAsArrays())]

    def compute_sizes(self, frame, columns=None):
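        """Computes the number of rows in each cluster for the given frame. Returns (List[int]) the cluster sizes."""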
        c = self.__columns_to_option(columns)
        return [int(n) for n in self._scala.computeClusterSizes(frame._scala, c)]

    def compute_wsse(self, frame, columns=None):
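        """Computes the within-set sum of squared errors (WSSE) of the model on the given frame."""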
        c = self.__columns_to_option(columns)
        return self._scala.computeWsse(frame._scala, c)

    def predict(self, frame, columns=None):
        """
        Predicts the labels for the observation columns in the given input frame. Creates a new frame
        with the existing columns and a new predicted column.

        Parameters
        ----------

        :param frame: (Frame) Frame used for predicting the values
        :param columns: (Optional(List[str])) Names of the observation columns; defaults to the columns used to train the model
        :return: (Frame) A new frame containing the original frame's columns and a prediction column
        """
        c = self.__columns_to_option(columns)
        from sparktk.frame.frame import Frame
        return Frame(self._tc, self._scala.predict(frame._scala, c))

    def add_distance_columns(self, frame, columns=None):
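        """Adds a distance column to the given frame for each cluster, holding each row's squared distance to that cluster's centroid."""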
        c = self.__columns_to_option(columns)
        self._scala.addDistanceColumns(frame._scala, c)

    def __columns_to_option(self, columns):
        if isinstance(columns, basestring):
            columns = [columns]
        if columns is not None:
            columns = self._tc.jutils.convert.to_scala_vector_string(columns)
        return self._tc.jutils.convert.to_scala_option(columns)

    def save(self, path):
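        """Saves the model to the given path (a non-string path is silently ignored)."""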
        if isinstance(path, basestring):
            self._scala.save(self._tc._scala_sc, path)

    def export_to_mar(self, path):
        """
        Exports the trained model as a model archive (.mar) to the specified path

        Parameters
        ----------

        :param path: (str) Path to save the trained model
        :return: (str) Full path to the saved .mar file
        """
        if isinstance(path, basestring):
            return self._scala.exportToMar(self._tc._scala_sc, path)

Ancestors (in MRO)

  • KMeansModel
  • sparktk.propobj.PropertiesObject
  • __builtin__.object

Instance variables

var centroids: the trained cluster centers, as a list of coordinate lists

var columns: names of the columns the model was trained on

var initialization_mode: the initialization technique used ("random" or "k-means||")

var k: the number of clusters

var max_iterations: the iteration limit used during training

var scalings: the column scalings applied during training, or None

Methods

def __init__(self, tc, scala_model)

def __init__(self, tc, scala_model):
    self._tc = tc
    tc.jutils.validate_is_jvm_instance_of(scala_model, get_scala_obj(tc))
    self._scala = scala_model

def add_distance_columns(self, frame, columns=None)

def add_distance_columns(self, frame, columns=None):
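    """Adds a distance column to the given frame for each cluster, holding each row's squared distance to that cluster's centroid."""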
    c = self.__columns_to_option(columns)
    self._scala.addDistanceColumns(frame._scala, c)
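
From the class example above, the added distance columns hold squared Euclidean distances to each centroid. For instance, row "ab" (data=2.0) against centroid 1 (value 1.2, the mean of its five members):

>>> round((2.0 - 1.2) ** 2, 2)
0.64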

def compute_sizes(self, frame, columns=None)

def compute_sizes(self, frame, columns=None):
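    """Computes the number of rows in each cluster for the given frame. Returns (List[int]) the cluster sizes."""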
    c = self.__columns_to_option(columns)
    return [int(n) for n in self._scala.computeClusterSizes(frame._scala, c)]
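
Observation columns default to those used to train the model; they can also be named explicitly. A sketch against the class example:

>>> model.compute_sizes(frame, columns=["data"])
[2, 2, 5]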

def compute_wsse(self, frame, columns=None)

def compute_wsse(self, frame, columns=None):
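    """Computes the within-set sum of squared errors (WSSE) of the model on the given frame."""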
    c = self.__columns_to_option(columns)
    return self._scala.computeWsse(frame._scala, c)
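
The class example's WSSE of 5.3 can be reproduced by hand: each row contributes its squared distance to the nearest centroid. The centroid values 8.0, 1.2 and 5.5 are inferred from the example's distance columns; a plain-Python check:

>>> data = [2.0, 1.0, 7.0, 1.0, 9.0, 2.0, 0.0, 6.0, 5.0]
>>> centroids = [8.0, 1.2, 5.5]
>>> round(sum(min((x - c) ** 2 for c in centroids) for x in data), 1)
5.3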

def export_to_mar(self, path)

Exports the trained model as a model archive (.mar) to the specified path

Parameters:
path(str):Path to save the trained model

Returns(str): Full path to the saved .mar file

def export_to_mar(self, path):
    """
    Exports the trained model as a model archive (.mar) to the specified path
    Parameters
    ----------
    :param path: (str) Path to save the trained model
    :return: (str) Full path to the saved .mar file
    """
    if isinstance(path, basestring):
        return self._scala.exportToMar(self._tc._scala_sc, path)

def predict(self, frame, columns=None)

Predicts the labels for the observation columns in the given input frame. Creates a new frame with the existing columns and a new predicted column.

Parameters:
frame(Frame):Frame used for predicting the values
columns(Optional(List[str])):Names of the observation columns; defaults to the columns used to train the model

Returns(Frame): A new frame containing the original frame's columns and a prediction column

def predict(self, frame, columns=None):
    """
   Predicts the labels for the observation columns in the given input frame. Creates a new frame
   with the existing columns and a new predicted column.
   Parameters
   ----------
   :param frame: (Frame) Frame used for predicting the values
   :param c: (List[str]) Names of the observation columns.
   :return: (Frame) A new frame containing the original frame's columns and a prediction column
   """
    c = self.__columns_to_option(columns)
    from sparktk.frame.frame import Frame
    return Frame(self._tc, self._scala.predict(frame._scala, c))
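
When columns is omitted, the columns used for training are applied, so for the class example the following two calls are equivalent (the frame must supply as many observation columns as the model was trained on):

>>> predicted = model.predict(frame)
>>> predicted = model.predict(frame, columns=["data"])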

def save(self, path)

def save(self, path):
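    """Saves the model to the given path (a non-string path is silently ignored)."""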
    if isinstance(path, basestring):
        self._scala.save(self._tc._scala_sc, path)
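
Note the isinstance guard: a non-string path makes save a silent no-op rather than an error. A typical round trip (path hypothetical):

>>> model.save("sandbox/kmeans2")
>>> restored = tc.load("sandbox/kmeans2")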

def to_dict(self)

def to_dict(self):
    d = self._properties()
    d.update(self._attributes())
    return d

def to_json(self)

def to_json(self):
    return json.dumps(self.to_dict())