Up

sparktk.graph.graph module

# vim: set encoding=utf-8

#  Copyright (c) 2016 Intel Corporation 
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#

import logging
logger = logging.getLogger('sparktk')
from graphframes.graphframe import GraphFrame, _from_java_gf
from py4j.protocol import Py4JJavaError
from pyspark.sql.utils import IllegalArgumentException
from sparktk import TkContext
from sparktk.frame.frame import Frame
from sparktk.arguments import require_type


# import constructors for the API's sake (not actually dependencies of the Graph)
from sparktk.graph.constructors.create import create
from sparktk.graph.constructors.import_orientdb_graph import import_orientdb_graph

# Public names of this module: `create` and `import_orientdb_graph` are the constructors
# imported above, while `Graph` and `load` are defined further down in this file.
__all__ = ["create",
           "Graph",
           "import_orientdb_graph",
           "load"]


class Graph(object):
    """
    sparktk Graph

    Represents a graph with a frame defining vertices and another frame defining edges.  It is implemented as a very
    thin wrapper of the spark GraphFrame (https://github.com/graphframes/graphframes) onto which additional methods
    are available.

    A vertices frame defines the vertices for the graph and must have a schema with a column
    named "id" which provides a unique vertex ID.  All other columns are treated as vertex properties.
    If a column is also found named "vertex_type", it will be used as a special label to denote the type
    of vertex, for example, when interfacing with logic (such as a graph DB) which expects a
    specific vertex type.

    An edge frame defines the edges of the graph; schema must have columns named "src" and "dst" which provide the
    vertex ids of the edge.  All other columns are treated as edge properties.  If a column is also found named
    "edge_type", it will be used as a special label to denote the type of edge, for example, when interfacing with logic
    (such as a graph DB) which expects a specific edge type.

    Unlike sparktk Frames, this Graph is immutable (it cannot be changed).  The vertices and edges may be extracted as
    sparktk Frame objects, but those frames then take on a life of their own apart from this graph object.  Those frames
    could be transformed and used to build a new graph object if necessary.

    The underlying spark GraphFrame is available as the 'graphframe' property of this object.


    Examples
    --------

        >>> viewers = tc.frame.create([['fred', 0],
        ...                            ['wilma', 0],
        ...                            ['pebbles', 1],
        ...                            ['betty', 0],
        ...                            ['barney', 0],
        ...                            ['bamm bamm', 1]],
        ...                           schema= [('id', str), ('kids', int)])

        >>> titles = ['Croods', 'Jurassic Park', '2001', 'Ice Age', 'Land Before Time']

        >>> movies = tc.frame.create([[t] for t in titles], schema=[('id', str)])

        >>> vertices = viewers.copy()

        >>> vertices.append(movies)

        >>> vertices.inspect(20)
        [##]  id                kids
        ============================
        [0]   fred                 0
        [1]   wilma                0
        [2]   pebbles              1
        [3]   betty                0
        [4]   barney               0
        [5]   bamm bamm            1
        [6]   Croods            None
        [7]   Jurassic Park     None
        [8]   2001              None
        [9]   Ice Age           None
        [10]  Land Before Time  None

        >>> edges = tc.frame.create([['fred','Croods',5],
        ...                          ['fred','Jurassic Park',5],
        ...                          ['fred','2001',2],
        ...                          ['fred','Ice Age',4],
        ...                          ['wilma','Jurassic Park',3],
        ...                          ['wilma','2001',5],
        ...                          ['wilma','Ice Age',4],
        ...                          ['pebbles','Croods',4],
        ...                          ['pebbles','Land Before Time',3],
        ...                          ['pebbles','Ice Age',5],
        ...                          ['betty','Croods',5],
        ...                          ['betty','Jurassic Park',3],
        ...                          ['betty','Land Before Time',4],
        ...                          ['betty','Ice Age',3],
        ...                          ['barney','Croods',5],
        ...                          ['barney','Jurassic Park',5],
        ...                          ['barney','Land Before Time',3],
        ...                          ['barney','Ice Age',5],
        ...                          ['bamm bamm','Croods',5],
        ...                          ['bamm bamm','Land Before Time',3]],
        ...                         schema = ['src', 'dst', 'rating'])

        >>> edges.inspect(20)
        [##]  src        dst               rating
        =========================================
        [0]   fred       Croods                 5
        [1]   fred       Jurassic Park          5
        [2]   fred       2001                   2
        [3]   fred       Ice Age                4
        [4]   wilma      Jurassic Park          3
        [5]   wilma      2001                   5
        [6]   wilma      Ice Age                4
        [7]   pebbles    Croods                 4
        [8]   pebbles    Land Before Time       3
        [9]   pebbles    Ice Age                5
        [10]  betty      Croods                 5
        [11]  betty      Jurassic Park          3
        [12]  betty      Land Before Time       4
        [13]  betty      Ice Age                3
        [14]  barney     Croods                 5
        [15]  barney     Jurassic Park          5
        [16]  barney     Land Before Time       3
        [17]  barney     Ice Age                5
        [18]  bamm bamm  Croods                 5
        [19]  bamm bamm  Land Before Time       3

        >>> graph = tc.graph.create(vertices, edges)

        >>> graph
        Graph(v:[id: string, kids: int], e:[src: string, dst: string, rating: int])


        >>> graph.save("sandbox/old_movie_graph")

        >>> restored = tc.load("sandbox/old_movie_graph")

        >>> restored
        Graph(v:[id: string, kids: int], e:[src: string, dst: string, rating: int])


    """
    def __init__(self, tc, source_or_vertices_frame, edges_frame=None):
        """
        Builds the Graph from a pair of Frames, or wraps an existing graph source

        Parameters
        ----------
        :param tc: TkContext through which the JVM is reached
        :param source_or_vertices_frame: a vertices Frame, a scala Graph, a python GraphFrame or a scala GraphFrame
        :param edges_frame: edges Frame; checked with require_type to be a Frame when a vertices Frame is
                            given, and to be None for every other kind of source

        Raises TypeError when the source is none of the supported kinds, and ValueError when the JVM
        rejects the construction (see the _create_scala_graph_* helpers).
        """
        self._tc = tc
        self._scala = None

        # (note that the Scala code will validate appropriate frame schemas)

        if isinstance(source_or_vertices_frame, Frame):
            # Python Vertices and Edges Frames
            vertices_frame = source_or_vertices_frame
            require_type(Frame,
                         edges_frame,
                         'edges_frame',
                         "Providing a vertices frame requires also providing an edges frame")
            self._scala = self._create_scala_graph_from_scala_frames(self._tc,
                                                                     vertices_frame._scala,
                                                                     edges_frame._scala)
        else:
            source = source_or_vertices_frame
            require_type(None,
                         edges_frame,
                         'edges_frame',
                         'If edges_frames is provided, then a valid vertex frame must be provided as the first arg, instead of type %s' % type(source))
            if self._is_scala_graph(source):
                # Scala Graph: already in the form this wrapper stores
                self._scala = source
            elif isinstance(source, GraphFrame):
                # python GraphFrame: unwrap its JVM object first
                scala_graphframe = source._jvm_graph
                self._scala = self._create_scala_graph_from_scala_graphframe(self._tc, scala_graphframe)
            elif self._is_scala_graphframe(source):
                # scala GraphFrame
                self._scala = self._create_scala_graph_from_scala_graphframe(self._tc, source)
            else:
                raise TypeError("Cannot create from source type %s" % type(source))

    def __repr__(self):
        """Returns the string representation produced by the underlying scala Graph"""
        return self._scala.toString()

    @staticmethod
    def _get_scala_graph_class(tc):
        """Gets reference to the sparktk scala Graph class"""
        return tc.sc._jvm.org.trustedanalytics.sparktk.graph.Graph

    @staticmethod
    def _get_scala_graphframe_class(tc):
        """Gets reference to the scala GraphFrame class"""
        return tc.sc._jvm.org.graphframes.GraphFrame

    @staticmethod
    def _create_scala_graph_from_scala_graphframe(tc, scala_graphframe):
        """Constructs a scala Graph around the given scala GraphFrame; JVM errors are re-raised as ValueError"""
        try:
            # reuse the class lookup helper rather than repeating the JVM package path here
            return Graph._get_scala_graph_class(tc)(scala_graphframe)
        except (Py4JJavaError, IllegalArgumentException) as e:
            raise ValueError(str(e))

    @staticmethod
    def _create_scala_graph_from_scala_frames(tc, scala_vertices_frame, scala_edges_frame):
        """Constructs a scala Graph from scala vertices and edges frames; JVM errors are re-raised as ValueError"""
        try:
            return tc.sc._jvm.org.trustedanalytics.sparktk.graph.internal.constructors.FromFrames.create(scala_vertices_frame, scala_edges_frame)
        except (Py4JJavaError, IllegalArgumentException) as e:
            raise ValueError(str(e))

    # this one for the Loader:
    @staticmethod
    def _from_scala(tc, scala_graph):
        """creates a python Graph for the given scala Graph"""
        return Graph(tc, scala_graph)

    def _is_scala_graph(self, item):
        """Tells whether item is a JVM instance of the sparktk scala Graph class"""
        return self._tc._jutils.is_jvm_instance_of(item, self._get_scala_graph_class(self._tc))

    def _is_scala_graphframe(self, item):
        """Tells whether item is a JVM instance of the scala GraphFrame class"""
        return self._tc._jutils.is_jvm_instance_of(item, self._get_scala_graphframe_class(self._tc))

    ##########################################################################
    # API
    ##########################################################################

    @property
    def graphframe(self):
        """The underlying graphframe object which exposes several methods and properties"""
        return _from_java_gf(self._scala.graphFrame(), self._tc.sql_context)

    def create_vertices_frame(self):
        """Creates a frame representing the vertices stored in this graph"""
        # Frame is already imported at module level (and relied upon by __init__)
        return Frame(self._tc, self._scala.graphFrame().vertices())

    def create_edges_frame(self):
        """Creates a frame representing the edges stored in this graph"""
        return Frame(self._tc, self._scala.graphFrame().edges())

    # Graph Operations
    # (importing inside the class body binds each operation function as a method of Graph)
    from sparktk.graph.ops.connected_components import connected_components
    from sparktk.graph.ops.clustering_coefficient import clustering_coefficient
    from sparktk.graph.ops.degrees import degrees
    from sparktk.graph.ops.export_to_orientdb import export_to_orientdb
    from sparktk.graph.ops.global_clustering_coefficient import global_clustering_coefficient
    from sparktk.graph.ops.label_propagation import label_propagation
    from sparktk.graph.ops.loopy_belief_propagation import loopy_belief_propagation
    from sparktk.graph.ops.page_rank import page_rank
    from sparktk.graph.ops.save import save
    from sparktk.graph.ops.triangle_count import triangle_count
    from sparktk.graph.ops.vertex_count import vertex_count
    from sparktk.graph.ops.weighted_degrees import weighted_degrees

def load(path, tc=TkContext.implicit):
    """
    Load a Graph from the given path

    :param path: location handed through to tc.load
    :param tc: TkContext, checked with TkContext.validate before use
    :return: whatever tc.load yields for the Graph type
    """
    TkContext.validate(tc)
    loaded_graph = tc.load(path, Graph)
    return loaded_graph

Functions

def create(

source_or_vertices_frame, edges_frame=None, tc=<class 'sparktk.arguments.implicit'>)

Create a sparktk Graph from two sparktk Frames (or some other source)

Parameters:
source_or_vertices_frame: a graph source or a vertices frame. Valid sources include: a python GraphFrame, a spark (scala) GraphFrame, or a scala Graph. Otherwise, if a vertices frame is provided, then the edges_frame arg must also be supplied. A vertices frame defines the vertices for the graph and must have a schema with a column named "id" which provides a unique vertex ID. All other columns are treated as vertex properties. If a column is also found named "vertex_type", it will be used as a special label to denote the type of vertex, for example, when interfacing with logic (such as a graph DB) which expects a specific vertex type.
edges_frame (valid only if the source_or_vertices_frame arg is a vertices Frame): An edge frame defines the edges of the graph; the schema must have columns named "src" and "dst" which provide the vertex ids of the edge. All other columns are treated as edge properties. If a column is also found named "edge_type", it will be used as a special label to denote the type of edge, for example, when interfacing with logic (such as a graph DB) which expects a specific edge type.
def create(source_or_vertices_frame, edges_frame=None, tc=TkContext.implicit):
    """
    Build a sparktk Graph, either from two sparktk Frames or from an existing graph source

    Parameters
    ----------
    :param source_or_vertices_frame: either a graph source or a vertices frame.
                        Accepted graph sources are a python GraphFrame, a scala (spark) GraphFrame, or a
                        scala Graph.  When a vertices frame is passed instead, the edges_frame arg has to be
                        supplied as well.
                        The vertices frame describes the vertices of the graph; its schema must contain a
                        column named "id" holding a unique vertex ID, and every other column is taken as a
                        vertex property.  A column named "vertex_type", if present, serves as a special
                        label giving the type of the vertex, for example, when interfacing with logic (such
                        as a graph DB) which expects a specific vertex type.

    :param edges_frame: (valid only if the source_or_vertices_frame arg is a vertices Frame) The edges frame
                        describes the edges of the graph; its schema must contain columns named "src" and
                        "dst" holding the vertex ids of the edge, and every other column is taken as an edge
                        property.  A column named "edge_type", if present, serves as a special label giving
                        the type of the edge, for example, when interfacing with logic (such as a graph DB)
                        which expects a specific edge type.

    :param tc: TkContext, checked with TkContext.validate before use

    :return: the new Graph
    """
    TkContext.validate(tc)
    # imported at call time rather than at the top of the module
    from sparktk.graph.graph import Graph
    new_graph = Graph(tc, source_or_vertices_frame, edges_frame)
    return new_graph

def import_orientdb_graph(

db_url, user_name, password, root_password, tc=<class 'sparktk.arguments.implicit'>)

Import graph from OrientDB to spark-tk as spark-tk graph (Spark GraphFrame)

Parameters
----------
:param db_url: (str) OrientDB URI
:param user_name: (str) the database username
:param password: (str) the database password
:param root_password: (str) OrientDB server password

Example
-------

   >>> v = tc.frame.create([("a", "Alice", 34,"F"),
    ...     ("b", "Bob", 36,"M"),
    ...     ("c", "Charlie", 30,"M"),
    ...     ("d", "David", 29,"M"),
    ...     ("e", "Esther", 32,"F"),
    ...     ("f", "Fanny", 36,"F"),
    ...     ], ["id", "name", "age","gender"])

    >>> e = tc.frame.create([("a", "b", "friend"),
    ...     ("b", "c", "follow"),
    ...     ("c", "b", "follow"),
    ...     ("f", "c", "follow"),
    ...     ("e", "f", "follow"),
    ...     ("e", "d", "friend"),
    ...     ("d", "a", "friend"),
    ...     ("a", "e", "friend")
    ...     ], ["src", "dst", "relationship"])

    >>> sparktk_graph = tc.graph.create(v,e)

    >>> db = "test_db"

    >>> sparktk_graph.export_to_orientdb(db_url="remote:hostname:2424/%s" % db,user_name= "admin",password = "admin",root_password = "orientdb_server_root_password",vertex_type_column_name= "gender",edge_type_column_name="relationship")

    >>> imported_gf = tc.graph.import_orientdb_graph(db_url="remote:hostname:2424/%s" % db,user_name= "admin",password = "admin",root_password = "orientdb_server_root_password")

    >>> imported_gf.graphframe.vertices.show()

    +-------+------+---+---+
    |   name|gender| id|age|
    +-------+------+---+---+
    |    Bob|     M|  b| 36|
    |  David|     M|  d| 29|
    |Charlie|     M|  c| 30|
    |  Alice|     F|  a| 34|
    | Esther|     F|  e| 32|
    |  Fanny|     F|  f| 36|
    +-------+------+---+---+

    >>> imported_gf.graphframe.edges.show()

    +---+------------+---+
    |dst|relationship|src|
    +---+------------+---+
    |  f|      follow|  e|
    |  b|      follow|  c|
    |  c|      follow|  b|
    |  c|      follow|  f|
    |  b|      friend|  a|
    |  a|      friend|  d|
    |  d|      friend|  e|
    |  e|      friend|  a|
    +---+------------+---+

def import_orientdb_graph(db_url, user_name, password, root_password, tc=TkContext.implicit):
    """
    Import graph from OrientDB to spark-tk as spark-tk graph (Spark GraphFrame)

    Parameters
    ----------
    :param db_url: (str) OrientDB URI
    :param user_name: (str) the database username
    :param password: (str) the database password
    :param root_password: (str) OrientDB server password
    :param tc: TkContext, checked with TkContext.validate before use
    :return: (Graph) sparktk Graph wrapping the imported scala graph

    Example
    -------

        >>> v = tc.frame.create([("a", "Alice", 34,"F"),
        ...     ("b", "Bob", 36,"M"),
        ...     ("c", "Charlie", 30,"M"),
        ...     ("d", "David", 29,"M"),
        ...     ("e", "Esther", 32,"F"),
        ...     ("f", "Fanny", 36,"F"),
        ...     ], ["id", "name", "age","gender"])

        >>> e = tc.frame.create([("a", "b", "friend"),
        ...     ("b", "c", "follow"),
        ...     ("c", "b", "follow"),
        ...     ("f", "c", "follow"),
        ...     ("e", "f", "follow"),
        ...     ("e", "d", "friend"),
        ...     ("d", "a", "friend"),
        ...     ("a", "e", "friend")
        ...     ], ["src", "dst", "relationship"])

        >>> sparktk_graph = tc.graph.create(v,e)

        >>> db = "test_db"

        >>> sparktk_graph.export_to_orientdb(db_url="remote:hostname:2424/%s" % db,user_name= "admin",password = "admin",root_password = "orientdb_server_root_password",vertex_type_column_name= "gender",edge_type_column_name="relationship")

        >>> imported_gf = tc.graph.import_orientdb_graph(db_url="remote:hostname:2424/%s" % db,user_name= "admin",password = "admin",root_password = "orientdb_server_root_password")

        >>> imported_gf.graphframe.vertices.show()

        +-------+------+---+---+
        |   name|gender| id|age|
        +-------+------+---+---+
        |    Bob|     M|  b| 36|
        |  David|     M|  d| 29|
        |Charlie|     M|  c| 30|
        |  Alice|     F|  a| 34|
        | Esther|     F|  e| 32|
        |  Fanny|     F|  f| 36|
        +-------+------+---+---+

        >>> imported_gf.graphframe.edges.show()

        +---+------------+---+
        |dst|relationship|src|
        +---+------------+---+
        |  f|      follow|  e|
        |  b|      follow|  c|
        |  c|      follow|  b|
        |  c|      follow|  f|
        |  b|      friend|  a|
        |  a|      friend|  d|
        |  d|      friend|  e|
        |  e|      friend|  a|
        +---+------------+---+

    """
    TkContext.validate(tc)
    # JVM-side importer object, then the scala SparkContext it is called with
    importer = tc.sc._jvm.org.trustedanalytics.sparktk.graph.internal.constructors.fromorientdb.ImportFromOrientdb
    scala_sc = tc.jutils.get_scala_sc()
    scala_graph = importer.importOrientdbGraph(scala_sc, db_url, user_name, password, root_password)
    # imported at call time rather than at the top of the module
    from sparktk.graph.graph import Graph
    return Graph(tc, scala_graph)

def load(

path, tc=<class 'sparktk.arguments.implicit'>)

load Graph from given path

def load(path, tc=TkContext.implicit):
    """
    Load a Graph from the given path

    :param path: location handed through to tc.load
    :param tc: TkContext, checked with TkContext.validate before use
    :return: whatever tc.load yields for the Graph type
    """
    TkContext.validate(tc)
    loaded_graph = tc.load(path, Graph)
    return loaded_graph

Classes

class Graph

sparktk Graph

Represents a graph with a frame defining vertices and another frame defining edges. It is implemented as a very thin wrapper of the spark GraphFrame (https://github.com/graphframes/graphframes) onto which additional methods are available.

A vertices frame defines the vertices for the graph and must have a schema with a column named "id" which provides a unique vertex ID. All other columns are treated as vertex properties. If a column is also found named "vertex_type", it will be used as a special label to denote the type of vertex, for example, when interfacing with logic (such as a graph DB) which expects a specific vertex type.

An edge frame defines the edges of the graph; the schema must have columns named "src" and "dst" which provide the vertex ids of the edge. All other columns are treated as edge properties. If a column is also found named "edge_type", it will be used as a special label to denote the type of edge, for example, when interfacing with logic (such as a graph DB) which expects a specific edge type.

Unlike sparktk Frames, this Graph is immutable (it cannot be changed). The vertices and edges may be extracted as sparktk Frame objects, but those frames then take on a life of their own apart from this graph object. Those frames could be transformed and used to build a new graph object if necessary.

The underlying spark GraphFrame is available as the 'graphframe' property of this object.

Examples:
>>> viewers = tc.frame.create([['fred', 0],
...                            ['wilma', 0],
...                            ['pebbles', 1],
...                            ['betty', 0],
...                            ['barney', 0],
...                            ['bamm bamm', 1]],
...                           schema= [('id', str), ('kids', int)])

>>> titles = ['Croods', 'Jurassic Park', '2001', 'Ice Age', 'Land Before Time']

>>> movies = tc.frame.create([[t] for t in titles], schema=[('id', str)])

>>> vertices = viewers.copy()

>>> vertices.append(movies)

>>> vertices.inspect(20)
[##]  id                kids
============================
[0]   fred                 0
[1]   wilma                0
[2]   pebbles              1
[3]   betty                0
[4]   barney               0
[5]   bamm bamm            1
[6]   Croods            None
[7]   Jurassic Park     None
[8]   2001              None
[9]   Ice Age           None
[10]  Land Before Time  None

>>> edges = tc.frame.create([['fred','Croods',5],
...                          ['fred','Jurassic Park',5],
...                          ['fred','2001',2],
...                          ['fred','Ice Age',4],
...                          ['wilma','Jurassic Park',3],
...                          ['wilma','2001',5],
...                          ['wilma','Ice Age',4],
...                          ['pebbles','Croods',4],
...                          ['pebbles','Land Before Time',3],
...                          ['pebbles','Ice Age',5],
...                          ['betty','Croods',5],
...                          ['betty','Jurassic Park',3],
...                          ['betty','Land Before Time',4],
...                          ['betty','Ice Age',3],
...                          ['barney','Croods',5],
...                          ['barney','Jurassic Park',5],
...                          ['barney','Land Before Time',3],
...                          ['barney','Ice Age',5],
...                          ['bamm bamm','Croods',5],
...                          ['bamm bamm','Land Before Time',3]],
...                         schema = ['src', 'dst', 'rating'])

>>> edges.inspect(20)
[##]  src        dst               rating
=========================================
[0]   fred       Croods                 5
[1]   fred       Jurassic Park          5
[2]   fred       2001                   2
[3]   fred       Ice Age                4
[4]   wilma      Jurassic Park          3
[5]   wilma      2001                   5
[6]   wilma      Ice Age                4
[7]   pebbles    Croods                 4
[8]   pebbles    Land Before Time       3
[9]   pebbles    Ice Age                5
[10]  betty      Croods                 5
[11]  betty      Jurassic Park          3
[12]  betty      Land Before Time       4
[13]  betty      Ice Age                3
[14]  barney     Croods                 5
[15]  barney     Jurassic Park          5
[16]  barney     Land Before Time       3
[17]  barney     Ice Age                5
[18]  bamm bamm  Croods                 5
[19]  bamm bamm  Land Before Time       3

>>> graph = tc.graph.create(vertices, edges)

>>> graph
Graph(v:[id: string, kids: int], e:[src: string, dst: string, rating: int])


>>> graph.save("sandbox/old_movie_graph")

>>> restored = tc.load("sandbox/old_movie_graph")

>>> restored
Graph(v:[id: string, kids: int], e:[src: string, dst: string, rating: int])
class Graph(object):
    """
    sparktk Graph

    Represents a graph with a frame defining vertices and another frame defining edges.  It is implemented as a very
    thin wrapper of the spark GraphFrame (https://github.com/graphframes/graphframes) onto which additional methods
    are available.

    A vertices frame defines the vertices for the graph and must have a schema with a column
    named "id" which provides a unique vertex ID.  All other columns are treated as vertex properties.
    If a column is also found named "vertex_type", it will be used as a special label to denote the type
    of vertex, for example, when interfacing with logic (such as a graph DB) which expects a
    specific vertex type.

    An edge frame defines the edges of the graph; schema must have columns named "src" and "dst" which provide the
    vertex ids of the edge.  All other columns are treated as edge properties.  If a column is also found named
    "edge_type", it will be used as a special label to denote the type of edge, for example, when interfacing with logic
    (such as a graph DB) which expects a specific edge type.

    Unlike sparktk Frames, this Graph is immutable (it cannot be changed).  The vertices and edges may be extracted as
    sparktk Frame objects, but those frames then take on a life of their own apart from this graph object.  Those frames
    could be transformed and used to build a new graph object if necessary.

    The underlying spark GraphFrame is available as the 'graphframe' property of this object.


    Examples
    --------

        >>> viewers = tc.frame.create([['fred', 0],
        ...                            ['wilma', 0],
        ...                            ['pebbles', 1],
        ...                            ['betty', 0],
        ...                            ['barney', 0],
        ...                            ['bamm bamm', 1]],
        ...                           schema= [('id', str), ('kids', int)])

        >>> titles = ['Croods', 'Jurassic Park', '2001', 'Ice Age', 'Land Before Time']

        >>> movies = tc.frame.create([[t] for t in titles], schema=[('id', str)])

        >>> vertices = viewers.copy()

        >>> vertices.append(movies)

        >>> vertices.inspect(20)
        [##]  id                kids
        ============================
        [0]   fred                 0
        [1]   wilma                0
        [2]   pebbles              1
        [3]   betty                0
        [4]   barney               0
        [5]   bamm bamm            1
        [6]   Croods            None
        [7]   Jurassic Park     None
        [8]   2001              None
        [9]   Ice Age           None
        [10]  Land Before Time  None

        >>> edges = tc.frame.create([['fred','Croods',5],
        ...                          ['fred','Jurassic Park',5],
        ...                          ['fred','2001',2],
        ...                          ['fred','Ice Age',4],
        ...                          ['wilma','Jurassic Park',3],
        ...                          ['wilma','2001',5],
        ...                          ['wilma','Ice Age',4],
        ...                          ['pebbles','Croods',4],
        ...                          ['pebbles','Land Before Time',3],
        ...                          ['pebbles','Ice Age',5],
        ...                          ['betty','Croods',5],
        ...                          ['betty','Jurassic Park',3],
        ...                          ['betty','Land Before Time',4],
        ...                          ['betty','Ice Age',3],
        ...                          ['barney','Croods',5],
        ...                          ['barney','Jurassic Park',5],
        ...                          ['barney','Land Before Time',3],
        ...                          ['barney','Ice Age',5],
        ...                          ['bamm bamm','Croods',5],
        ...                          ['bamm bamm','Land Before Time',3]],
        ...                         schema = ['src', 'dst', 'rating'])

        >>> edges.inspect(20)
        [##]  src        dst               rating
        =========================================
        [0]   fred       Croods                 5
        [1]   fred       Jurassic Park          5
        [2]   fred       2001                   2
        [3]   fred       Ice Age                4
        [4]   wilma      Jurassic Park          3
        [5]   wilma      2001                   5
        [6]   wilma      Ice Age                4
        [7]   pebbles    Croods                 4
        [8]   pebbles    Land Before Time       3
        [9]   pebbles    Ice Age                5
        [10]  betty      Croods                 5
        [11]  betty      Jurassic Park          3
        [12]  betty      Land Before Time       4
        [13]  betty      Ice Age                3
        [14]  barney     Croods                 5
        [15]  barney     Jurassic Park          5
        [16]  barney     Land Before Time       3
        [17]  barney     Ice Age                5
        [18]  bamm bamm  Croods                 5
        [19]  bamm bamm  Land Before Time       3

        >>> graph = tc.graph.create(vertices, edges)

        >>> graph
        Graph(v:[id: string, kids: int], e:[src: string, dst: string, rating: int])


        >>> graph.save("sandbox/old_movie_graph")

        >>> restored = tc.load("sandbox/old_movie_graph")

        >>> restored
        Graph(v:[id: string, kids: int], e:[src: string, dst: string, rating: int])


    """
    def __init__(self, tc, source_or_vertices_frame, edges_frame=None):
        """
        Builds the Graph from a pair of Frames, or wraps an existing graph source

        Parameters
        ----------
        :param tc: TkContext through which the JVM is reached
        :param source_or_vertices_frame: a vertices Frame, a scala Graph, a python GraphFrame or a scala GraphFrame
        :param edges_frame: edges Frame; checked with require_type to be a Frame when a vertices Frame is
                            given, and to be None for every other kind of source

        Raises TypeError when the source is none of the supported kinds, and ValueError when the JVM
        rejects the construction (see the _create_scala_graph_* helpers).
        """
        self._tc = tc
        self._scala = None

        # (note that the Scala code will validate appropriate frame schemas)

        if isinstance(source_or_vertices_frame, Frame):
            # Python Vertices and Edges Frames
            vertices_frame = source_or_vertices_frame
            require_type(Frame,
                         edges_frame,
                         'edges_frame',
                         "Providing a vertices frame requires also providing an edges frame")
            self._scala = self._create_scala_graph_from_scala_frames(self._tc,
                                                                     vertices_frame._scala,
                                                                     edges_frame._scala)
        else:
            source = source_or_vertices_frame
            require_type(None,
                         edges_frame,
                         'edges_frame',
                         'If edges_frames is provided, then a valid vertex frame must be provided as the first arg, instead of type %s' % type(source))
            if self._is_scala_graph(source):
                # Scala Graph: already in the form this wrapper stores
                self._scala = source
            elif isinstance(source, GraphFrame):
                # python GraphFrame: unwrap its JVM object first
                scala_graphframe = source._jvm_graph
                self._scala = self._create_scala_graph_from_scala_graphframe(self._tc, scala_graphframe)
            elif self._is_scala_graphframe(source):
                # scala GraphFrame
                self._scala = self._create_scala_graph_from_scala_graphframe(self._tc, source)
            else:
                raise TypeError("Cannot create from source type %s" % type(source))

    def __repr__(self):
        """Returns the string representation produced by the underlying scala Graph"""
        return self._scala.toString()

    @staticmethod
    def _get_scala_graph_class(tc):
        """Gets reference to the sparktk scala Graph class"""
        return tc.sc._jvm.org.trustedanalytics.sparktk.graph.Graph

    @staticmethod
    def _get_scala_graphframe_class(tc):
        """Gets reference to the scala GraphFrame class"""
        return tc.sc._jvm.org.graphframes.GraphFrame

    @staticmethod
    def _create_scala_graph_from_scala_graphframe(tc, scala_graphframe):
        """Constructs a scala Graph around the given scala GraphFrame; JVM errors are re-raised as ValueError"""
        try:
            # reuse the class lookup helper rather than repeating the JVM package path here
            return Graph._get_scala_graph_class(tc)(scala_graphframe)
        except (Py4JJavaError, IllegalArgumentException) as e:
            raise ValueError(str(e))

    @staticmethod
    def _create_scala_graph_from_scala_frames(tc, scala_vertices_frame, scala_edges_frame):
        """Constructs a scala Graph from scala vertices and edges frames; JVM errors are re-raised as ValueError"""
        try:
            return tc.sc._jvm.org.trustedanalytics.sparktk.graph.internal.constructors.FromFrames.create(scala_vertices_frame, scala_edges_frame)
        except (Py4JJavaError, IllegalArgumentException) as e:
            raise ValueError(str(e))

    # this one for the Loader:
    @staticmethod
    def _from_scala(tc, scala_graph):
        """creates a python Graph for the given scala Graph"""
        return Graph(tc, scala_graph)

    def _is_scala_graph(self, item):
        """Tells whether item is a JVM instance of the sparktk scala Graph class"""
        return self._tc._jutils.is_jvm_instance_of(item, self._get_scala_graph_class(self._tc))

    def _is_scala_graphframe(self, item):
        """Tells whether item is a JVM instance of the scala GraphFrame class"""
        return self._tc._jutils.is_jvm_instance_of(item, self._get_scala_graphframe_class(self._tc))

    ##########################################################################
    # API
    ##########################################################################

    @property
    def graphframe(self):
        """The underlying graphframe object which exposes several methods and properties"""
        return _from_java_gf(self._scala.graphFrame(), self._tc.sql_context)

    def create_vertices_frame(self):
        """Creates a frame representing the vertices stored in this graph"""
        # Frame is already imported at module level (and relied upon by __init__)
        return Frame(self._tc, self._scala.graphFrame().vertices())

    def create_edges_frame(self):
        """Creates a frame representing the edges stored in this graph"""
        return Frame(self._tc, self._scala.graphFrame().edges())

    # Graph Operations
    # (importing inside the class body binds each operation function as a method of Graph)
    from sparktk.graph.ops.connected_components import connected_components
    from sparktk.graph.ops.clustering_coefficient import clustering_coefficient
    from sparktk.graph.ops.degrees import degrees
    from sparktk.graph.ops.export_to_orientdb import export_to_orientdb
    from sparktk.graph.ops.global_clustering_coefficient import global_clustering_coefficient
    from sparktk.graph.ops.label_propagation import label_propagation
    from sparktk.graph.ops.loopy_belief_propagation import loopy_belief_propagation
    from sparktk.graph.ops.page_rank import page_rank
    from sparktk.graph.ops.save import save
    from sparktk.graph.ops.triangle_count import triangle_count
    from sparktk.graph.ops.vertex_count import vertex_count
    from sparktk.graph.ops.weighted_degrees import weighted_degrees

Ancestors (in MRO)

  • Graph
  • __builtin__.object

Instance variables

var graphframe

The underlying graphframe object which exposes several methods and properties

Methods

def __init__(

self, tc, source_or_vertices_frame, edges_frame=None)

def __init__(self, tc, source_or_vertices_frame, edges_frame=None):
    """
    Wraps a Scala graph built from one of several kinds of source.

    :param tc: context object, kept on the instance as ``_tc`` (presumably a TkContext)
    :param source_or_vertices_frame: a vertices Frame (``edges_frame`` is then required), a Scala graph,
                                     a python GraphFrame, or a Scala GraphFrame
    :param edges_frame: (Optional(Frame)) edges Frame; only accepted together with a vertices Frame

    Raises TypeError when the source is none of the supported kinds.
    """
    self._tc = tc
    self._scala = None
    # (note that the Scala code will validate appropriate frame schemas)
    if isinstance(source_or_vertices_frame, Frame):
        # A python vertices Frame must be paired with a python edges Frame
        require_type(Frame,
                     edges_frame,
                     'edges_frame',
                     "Providing a vertices frame requires also providing an edges frame")
        self._scala = self._create_scala_graph_from_scala_frames(self._tc,
                                                                 source_or_vertices_frame._scala,
                                                                 edges_frame._scala)
        return

    # Any other kind of source must come alone, without an edges frame
    source = source_or_vertices_frame
    edges_message = 'If edges_frames is provided, then a valid vertex frame must be provided as the first arg, instead of type %s' % type(source)
    require_type(None, edges_frame, 'edges_frame', edges_message)

    if self._is_scala_graph(source):
        # Already a Scala Graph: adopt it as-is
        self._scala = source
        return

    if isinstance(source, GraphFrame):
        # python GraphFrame: unwrap its JVM counterpart
        scala_graphframe = source._jvm_graph
    elif self._is_scala_graphframe(source):
        # scala GraphFrame: usable directly
        scala_graphframe = source
    else:
        raise TypeError("Cannot create from source type %s" % type(source))
    self._scala = self._create_scala_graph_from_scala_graphframe(self._tc, scala_graphframe)

def clustering_coefficient(

self)

The clustering coefficient of a vertex provides a measure of how tightly clustered that vertex's neighborhood is.

Formally:

.. math::

cc(v) = \frac{ \| \{ (u,v,w) \in V^3: \ \{u,v\}, \{u, w\}, \{v,w \} \in E \} \| }{\| \{ (u,v,w) \in V^3: \ \{v, u \}, \{v, w\} \in E \} \|}

For further reading on clustering coefficients, see http://en.wikipedia.org/wiki/Clustering_coefficient.

This method returns a frame with each vertex id associated with its local clustering coefficient

Parameters:

Returns(Frame): Frame containing the vertex id's and their clustering coefficient

Examples:
>>> vertex_schema = [('id', int)]
>>> edge_schema = [('src', int), ('dst', int)]

>>> vertex_rows = [ [1], [2], [3], [4], [5] ]
>>> edge_rows = [ [1, 2], [1, 3], [2, 3], [1, 4], [4, 5] ]
>>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
>>> edge_frame = tc.frame.create(edge_rows, edge_schema)

>>> graph = tc.graph.create(vertex_frame, edge_frame)

>>> result = graph.clustering_coefficient()
>>> result.inspect()
[#]  id  clustering_coefficient
===============================
[0]   1          0.333333333333
[1]   2                     1.0
[2]   3                     1.0
[3]   4                     0.0
[4]   5                     0.0
def clustering_coefficient(self):
    r"""
    The clustering coefficient of a vertex provides a measure of how
    tightly clustered that vertex's neighborhood is.

    Formally:

    .. math::

       cc(v)  = \frac{ \| \{ (u,v,w) \in V^3: \ \{u,v\}, \{u, w\}, \{v,w \} \in
           E \} \| }{\| \{ (u,v,w) \in V^3: \ \{v, u \}, \{v, w\} \in E \} \|}

    For further reading on clustering
    coefficients, see http://en.wikipedia.org/wiki/Clustering_coefficient.

    This method returns a frame with each vertex id associated with its local
    clustering coefficient

    Parameters
    ----------

    :return: (Frame) Frame containing the vertex id's and their clustering coefficient

    Examples
    --------

        >>> vertex_schema = [('id', int)]
        >>> edge_schema = [('src', int), ('dst', int)]

        >>> vertex_rows = [ [1], [2], [3], [4], [5] ]
        >>> edge_rows = [ [1, 2], [1, 3], [2, 3], [1, 4], [4, 5] ]
        >>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
        >>> edge_frame = tc.frame.create(edge_rows, edge_schema)

        >>> graph = tc.graph.create(vertex_frame, edge_frame)

        >>> result = graph.clustering_coefficient()
        >>> result.inspect()
        [#]  id  clustering_coefficient
        ===============================
        [0]   1          0.333333333333
        [1]   2                     1.0
        [2]   3                     1.0
        [3]   4                     0.0
        [4]   5                     0.0

    """
    # NOTE: the docstring above is a raw string so that the LaTeX backslashes
    # (notably \frac, whose \f would otherwise become a form feed) survive intact.
    from sparktk.frame.frame import Frame
    return Frame(self._tc, self._scala.clusteringCoefficient())

def connected_components(

self)

Connected components groups all the vertices in a particular graph by whether or not there is a path between them. This method returns a frame with the vertices and their corresponding component

Parameters:

Returns(Frame): Frame containing the vertex id's and their components

Examples:
>>> vertex_schema = [('id', int)]
>>> edge_schema = [('src', int), ('dst', int)]

>>> vertex_rows = [ [1], [2], [3], [4], [5] ]
>>> edge_rows = [ [1, 2], [1, 3], [2, 3], [4, 5] ]
>>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
>>> edge_frame = tc.frame.create(edge_rows, edge_schema)

>>> graph = tc.graph.create(vertex_frame, edge_frame)

>>> result = graph.connected_components()
>>> result.inspect() 
[#]  id  component
==================
[0]   1          1
[1]   2          1
[2]   3          1
[3]   4          4
[4]   5          4
def connected_components(self):
    """
    Connected components groups all the vertices in a particular graph
    by whether or not there is a path between them. This method returns
    a frame with the vertices and their corresponding component

    Parameters
    ----------

    :return: (Frame) Frame containing the vertex id's and their components

    Examples
    --------

        >>> vertex_schema = [('id', int)]
        >>> edge_schema = [('src', int), ('dst', int)]

        >>> vertex_rows = [ [1], [2], [3], [4], [5] ]
        >>> edge_rows = [ [1, 2], [1, 3], [2, 3], [4, 5] ]
        >>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
        >>> edge_frame = tc.frame.create(edge_rows, edge_schema)

        >>> graph = tc.graph.create(vertex_frame, edge_frame)

        >>> result = graph.connected_components()
        >>> result.inspect()
        [#]  id  component
        ==================
        [0]   1          1
        [1]   2          1
        [2]   3          1
        [3]   4          4
        [4]   5          4

    """
    from sparktk.frame.frame import Frame
    scala_components = self._scala.connectedComponents()
    return Frame(self._tc, scala_components)

def create_edges_frame(

self)

Creates a frame representing the edges stored in this graph

def create_edges_frame(self):
    """Builds a new frame from the edges of this graph's underlying graphFrame"""
    from sparktk.frame.frame import Frame
    scala_edges = self._scala.graphFrame().edges()
    return Frame(self._tc, scala_edges)

def create_vertices_frame(

self)

Creates a frame representing the vertices stored in this graph

def create_vertices_frame(self):
    """Builds a new frame from the vertices of this graph's underlying graphFrame"""
    from sparktk.frame.frame import Frame
    scala_vertices = self._scala.graphFrame().vertices()
    return Frame(self._tc, scala_vertices)

def degrees(

self, degree_option='undirected')

Degree Calculation

A fundamental quantity in graph analysis is the degree of a vertex: The degree of a vertex is the number of edges adjacent to it.

For a directed edge relation, a vertex has both an out-degree (the number of edges leaving the vertex) and an in-degree (the number of edges entering the vertex).

Parameters:
degree_option(String):Either in, out or undirected. String describing the direction of edges

Returns(Frame): Frame containing the vertex id's and their degrees

Examples:
>>> vertex_schema = [('id', int)]
>>> edge_schema = [('src', int), ('dst', int)]

>>> vertex_rows = [ [1], [2], [3], [4], [5] ]
>>> edge_rows = [ [1, 2], [1, 3], [2, 3], [1, 4], [4, 5] ]
>>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
>>> edge_frame = tc.frame.create(edge_rows, edge_schema)

>>> graph = tc.graph.create(vertex_frame, edge_frame)

>>> result = graph.degrees(degree_option="out")
>>> result.inspect() 
[#]  id  degree
===============
[0]   1       3
[1]   2       1
[2]   3       0
[3]   4       1
[4]   5       0


>>> result = graph.degrees(degree_option="in")
>>> result.inspect()
[#]  id  degree
===============
[0]   1       0
[1]   2       1
[2]   3       2
[3]   4       1
[4]   5       1
def degrees(self, degree_option='undirected'):
    """
    **Degree Calculation**

    A fundamental quantity in graph analysis is the degree of a vertex:
    The degree of a vertex is the number of edges adjacent to it.

    For a directed edge relation, a vertex has both an out-degree (the number of
    edges leaving the vertex) and an in-degree (the number of edges entering the
    vertex).

    Parameters
    ----------

    :param degree_option: (String) Either in, out or undirected. String describing the direction of edges

    :return: (Frame) Frame containing the vertex id's and their degrees

    Examples
    --------

        >>> vertex_schema = [('id', int)]
        >>> edge_schema = [('src', int), ('dst', int)]

        >>> vertex_rows = [ [1], [2], [3], [4], [5] ]
        >>> edge_rows = [ [1, 2], [1, 3], [2, 3], [1, 4], [4, 5] ]
        >>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
        >>> edge_frame = tc.frame.create(edge_rows, edge_schema)

        >>> graph = tc.graph.create(vertex_frame, edge_frame)

        >>> result = graph.degrees(degree_option="out")
        >>> result.inspect()
        [#]  id  degree
        ===============
        [0]   1       3
        [1]   2       1
        [2]   3       0
        [3]   4       1
        [4]   5       0


        >>> result = graph.degrees(degree_option="in")
        >>> result.inspect()
        [#]  id  degree
        ===============
        [0]   1       0
        [1]   2       1
        [2]   3       2
        [3]   4       1
        [4]   5       1

    """
    from sparktk.frame.frame import Frame
    scala_degrees = self._scala.degree(degree_option)
    return Frame(self._tc, scala_degrees)

def export_to_orientdb(

self, db_url, user_name, password, root_password, vertex_type_column_name=None, edge_type_column_name=None, batch_size=1000)

Exports the Spark-tk Graph (GraphFrame) to OrientDB. The API creates an OrientDB database with the given database name, URL and credentials, then exports the vertex and edge dataframe schemas to OrientDB based on the given vertex_type_column_name and edge_type_column_name. If either of them is None, it is exported to the base type class.

Parameters:

db_url(str):OrientDB URI
user_name(str):the database username
password(str):the database password
root_password(str):OrientDB server password
vertex_type_column_name(Optional(str)):column name from the vertex data frame specified to be the vertex type
edge_type_column_name(Optional(str)):column name from the edge data frame specified to be the edge type
batch_size(int):batch size for graph ETL to OrientDB database

Example:
>>> v = tc.frame.create([("a", "Alice", 34,"F"),
...     ("b", "Bob", 36,"M"),
...     ("c", "Charlie", 30,"M"),
...     ("d", "David", 29,"M"),
...     ("e", "Esther", 32,"F"),
...     ("f", "Fanny", 36,"F"),
...     ], ["id", "name", "age","gender"])

>>> e = tc.frame.create([("a", "b", "friend"),
...     ("b", "c", "follow"),
...     ("c", "b", "follow"),
...     ("f", "c", "follow"),
...     ("e", "f", "follow"),
...     ("e", "d", "friend"),
...     ("d", "a", "friend"),
...     ("a", "e", "friend")
...     ], ["src", "dst", "relationship"])

>>> sparktk_graph = tc.graph.create(v,e)

>>> db = "test_db"

>>> result = sparktk_graph.export_to_orientdb(db_url="remote:hostname:2424/%s" % db,user_name= "admin",password = "admin",root_password = "orientdb_server_root_password",vertex_type_column_name= "gender",edge_type_column_name="relationship")

>>> result
db_uri                    = remote:hostname:2424/test_db
edge_types                = {u'follow': 4L, u'friend': 4L}
exported_edges_summary    = {u'Total Exported Edges Count': 8L, u'Failure Count': 0L}
exported_vertices_summary = {u'Total Exported Vertices Count': 6L, u'Failure Count': 0L}
vertex_types              = {u'M': 3L, u'F': 3L}
def export_to_orientdb(self, db_url, user_name, password, root_password,vertex_type_column_name=None, edge_type_column_name=None,batch_size=1000):
    """
    Exports the Spark-tk Graph (GraphFrame) to OrientDB. The API creates an OrientDB database with the given
    database name, URL and credentials, then exports the vertex and edge dataframe schemas to OrientDB based on
    the given vertex_type_column_name and edge_type_column_name. If either of them is None, it is exported to
    the base type class.

    Parameters
    ----------

    :param db_url: (str) OrientDB URI
    :param user_name: (str) the database username
    :param password: (str) the database password
    :param root_password: (str) OrientDB server password
    :param vertex_type_column_name: (Optional(str)) column name from the vertex data frame specified to be the vertex type
    :param edge_type_column_name: (Optional(str)) column name from the edge data frame specified to be the edge type
    :param batch_size: (int) batch size for graph ETL to OrientDB database

    :return: (ExportToOrientdbReturn) result of the export; the example below shows the fields it prints

    Example
    -------

        >>> v = tc.frame.create([("a", "Alice", 34,"F"),
        ...     ("b", "Bob", 36,"M"),
        ...     ("c", "Charlie", 30,"M"),
        ...     ("d", "David", 29,"M"),
        ...     ("e", "Esther", 32,"F"),
        ...     ("f", "Fanny", 36,"F"),
        ...     ], ["id", "name", "age","gender"])

        >>> e = tc.frame.create([("a", "b", "friend"),
        ...     ("b", "c", "follow"),
        ...     ("c", "b", "follow"),
        ...     ("f", "c", "follow"),
        ...     ("e", "f", "follow"),
        ...     ("e", "d", "friend"),
        ...     ("d", "a", "friend"),
        ...     ("a", "e", "friend")
        ...     ], ["src", "dst", "relationship"])

        >>> sparktk_graph = tc.graph.create(v,e)

        >>> db = "test_db"

        >>> result = sparktk_graph.export_to_orientdb(db_url="remote:hostname:2424/%s" % db,user_name= "admin",password = "admin",root_password = "orientdb_server_root_password",vertex_type_column_name= "gender",edge_type_column_name="relationship")

        >>> result
        db_uri                    = remote:hostname:2424/test_db
        edge_types                = {u'follow': 4L, u'friend': 4L}
        exported_edges_summary    = {u'Total Exported Edges Count': 8L, u'Failure Count': 0L}
        exported_vertices_summary = {u'Total Exported Vertices Count': 6L, u'Failure Count': 0L}
        vertex_types              = {u'M': 3L, u'F': 3L}
    """
    # The two optional column names are handed to Scala as Option values
    to_scala_option = self._tc._jutils.convert.to_scala_option
    scala_vertex_type = to_scala_option(vertex_type_column_name)
    scala_edge_type = to_scala_option(edge_type_column_name)
    scala_result = self._scala.exportToOrientdb(db_url,
                                                user_name,
                                                password,
                                                root_password,
                                                scala_vertex_type,
                                                scala_edge_type,
                                                batch_size)
    return ExportToOrientdbReturn(self._tc, scala_result)

def global_clustering_coefficient(

self)

The clustering coefficient of a graph provides a measure of how tightly clustered an undirected graph is.

More formally:

.. math::

cc(G)  = \frac{ \| \{ (u,v,w) \in V^3: \ \{u,v\}, \{u, w\}, \{v,w \} \in E \} \| }{\| \{ (u,v,w) \in V^3: \ \{u,v\}, \{u, w\} \in E \} \|}
Parameters:

Returns(Double): The global clustering coefficient of the graph

Examples:
The clustering coefficient on a graph with some triangles will be
greater than 0

>>> vertex_schema = [('id', int)]
>>> vertex_rows = [ [1], [2], [3], [4], [5] ]
>>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)

>>> edge_schema = [('src', int), ('dst', int)]
>>> edge_rows = [ [1, 2], [1, 3], [2, 3], [1, 4], [4, 5] ]
>>> edge_frame = tc.frame.create(edge_rows, edge_schema)

>>> graph = tc.graph.create(vertex_frame, edge_frame)

>>> graph.global_clustering_coefficient()
0.5

The clustering coefficient on a graph with no triangles (a tree) is
0

>>> vertex_rows_star = [ [1], [2], [3], [4]]
>>> vertex_frame_star = tc.frame.create(vertex_rows_star, vertex_schema)

>>> edge_rows_star = [ [1, 2], [1, 3], [1, 4]]
>>> edge_frame_star = tc.frame.create(edge_rows_star, edge_schema)

>>> graph_star = tc.graph.create(vertex_frame_star, edge_frame_star)

>>> graph_star.global_clustering_coefficient()
0.0
def global_clustering_coefficient(self):
    r"""
    The clustering coefficient of a graph provides a measure of how tightly
    clustered an undirected graph is.

    More formally:

    .. math::

        cc(G)  = \frac{ \| \{ (u,v,w) \in V^3: \ \{u,v\}, \{u, w\}, \{v,w \} \in
            E \} \| }{\| \{ (u,v,w) \in V^3: \ \{u,v\}, \{u, w\} \in E \} \|}


    Parameters
    ----------

    :return: (Double) The global clustering coefficient of the graph

    Examples
    --------

        The clustering coefficient on a graph with some triangles will be
        greater than 0

        >>> vertex_schema = [('id', int)]
        >>> vertex_rows = [ [1], [2], [3], [4], [5] ]
        >>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)

        >>> edge_schema = [('src', int), ('dst', int)]
        >>> edge_rows = [ [1, 2], [1, 3], [2, 3], [1, 4], [4, 5] ]
        >>> edge_frame = tc.frame.create(edge_rows, edge_schema)

        >>> graph = tc.graph.create(vertex_frame, edge_frame)

        >>> graph.global_clustering_coefficient()
        0.5

        The clustering coefficient on a graph with no triangles (a tree) is
        0

        >>> vertex_rows_star = [ [1], [2], [3], [4]]
        >>> vertex_frame_star = tc.frame.create(vertex_rows_star, vertex_schema)

        >>> edge_rows_star = [ [1, 2], [1, 3], [1, 4]]
        >>> edge_frame_star = tc.frame.create(edge_rows_star, edge_schema)

        >>> graph_star = tc.graph.create(vertex_frame_star, edge_frame_star)

        >>> graph_star.global_clustering_coefficient()
        0.0

    """
    # NOTE: the docstring above is a raw string so that the LaTeX backslashes
    # (notably \frac, whose \f would otherwise become a form feed) survive intact.
    return self._scala.globalClusteringCoefficient()

def label_propagation(

self, max_iterations)

Parameters:

Assigns label based off of proximity to different vertices. The labels are initially 1 unique label per vertex (the vertex id), and as the algorithm runs some of these get erased

Note this algorithm is neither guaranteed to converge, nor guaranteed to converge to the correct value.

This calls graph frames label propagation which can be found at

http://graphframes.github.io/api/scala/index.html#org.graphframes.lib.LabelPropagation

Returns(Frame): Frame containing the vertex id's and the community they are a member of

Examples:
>>> vertex_schema = [('id', int)]
>>> vertex_rows = [ [1], [2], [3], [4], [5] ]

>>> edge_rows = [ [1, 2], [1, 3], [2, 3], [1, 4], [4, 5] ]
>>> edge_schema = [('src', int), ('dst', int)]

>>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
>>> edge_frame = tc.frame.create(edge_rows, edge_schema)

>>> graph = tc.graph.create(vertex_frame, edge_frame)

>>> result = graph.label_propagation(10)
>>> result.inspect()
[#]  id  label
==============
[0]   1      1
[1]   2      2
[2]   3      2
[3]   4      2
[4]   5      1
def label_propagation(self, max_iterations):
    """
    Assigns label based off of proximity to different vertices. The labels
    are initially 1 unique label per vertex (the vertex id), and as the
    algorithm runs some of these get erased

    Note this algorithm is neither guaranteed to converge, nor guaranteed to
    converge to the correct value.

    This calls graph frames label propagation which can be found at

    http://graphframes.github.io/api/scala/index.html#org.graphframes.lib.LabelPropagation

    Parameters
    ----------

    :param max_iterations: (int) Iteration limit handed to the underlying label propagation run

    :return: (Frame) Frame containing the vertex id's and the community they are a member of

    Examples
    --------

        >>> vertex_schema = [('id', int)]
        >>> vertex_rows = [ [1], [2], [3], [4], [5] ]

        >>> edge_rows = [ [1, 2], [1, 3], [2, 3], [1, 4], [4, 5] ]
        >>> edge_schema = [('src', int), ('dst', int)]

        >>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
        >>> edge_frame = tc.frame.create(edge_rows, edge_schema)

        >>> graph = tc.graph.create(vertex_frame, edge_frame)

        >>> result = graph.label_propagation(10)
        >>> result.inspect()
        [#]  id  label
        ==============
        [0]   1      1
        [1]   2      2
        [2]   3      2
        [3]   4      2
        [4]   5      1

    """
    from sparktk.frame.frame import Frame
    scala_labels = self._scala.labelPropagation(max_iterations)
    return Frame(self._tc, scala_labels)

def loopy_belief_propagation(

self, prior, edge_weight, max_iterations=10)

Performs loopy belief propagation on a graph representing a Potts model. This optimizes based off of user provided priors.

Parameters:
prior(String):The name of the column of space delimited string of floats representing the prior distribution on a vertex
edge_weight(String):The name of the column of weight value on edges
max_iterations: The number of iterations to run for
Examples:
>>> vertex_schema = [('id', int), ('label', float), ("prior_val", str), ("was_labeled", int)]
>>> vertex_rows = [ [1, 1, "0.7 0.3", 1], [2, 1, "0.7 0.3", 1], [3, 5, "0.7 0.3", 0], [4, 5, "0.7 0.3", 0], [5, 5, "0.7 0.3", 1] ]

>>> edge_schema = [('src', int), ('dst', int), ('weight', int)]
>>> edge_rows = [ [1, 2, 2], [1, 3, 1], [2, 3, 1], [1, 4, 1], [4, 5, 1] ]

>>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
>>> edge_frame = tc.frame.create(edge_rows, edge_schema)

>>> graph = tc.graph.create(vertex_frame, edge_frame)
>>> vertex_frame.inspect()
[#]  id  label  prior_val  was_labeled
======================================
[0]   1    1.0  0.7 0.3              1
[1]   2    1.0  0.7 0.3              1
[2]   3    5.0  0.7 0.3              0
[3]   4    5.0  0.7 0.3              0
[4]   5    5.0  0.7 0.3              1

>>> result = graph.loopy_belief_propagation("prior_val", "weight", 2)
>>> result.inspect() 
[#]  id  label  prior_val  was_labeled
======================================
[0]   1    1.0  0.7 0.3              1
[1]   2    1.0  0.7 0.3              1
[2]   3    5.0  0.7 0.3              0
[3]   4    5.0  0.7 0.3              0
[4]   5    5.0  0.7 0.3              1
<BLANKLINE>
[#]  posterior
==============================================
[0]  [0.9883347610773112,0.011665238922688819]
[1]  [0.9743014865548763,0.025698513445123698]
[2]  [0.9396772870897875,0.06032271291021254]
[3]  [0.9319529856190276,0.06804701438097235]
[4]  [0.8506957305238876,0.1493042694761125]
def loopy_belief_propagation(self, prior, edge_weight, max_iterations=10):
    """
    Performs loopy belief propagation on a graph representing a Potts model. This optimizes based off of
    user provided priors.

    Parameters
    ----------

    :param prior: (String) The name of the column of space delimited string of floats representing the prior distribution on a vertex
    :param edge_weight: (String) The name of the column of weight value on edges
    :param max_iterations: The number of iterations to run for

    :return: (Frame) Frame of the vertices; in the example below it carries an added posterior column

    Examples
    --------

        >>> vertex_schema = [('id', int), ('label', float), ("prior_val", str), ("was_labeled", int)]
        >>> vertex_rows = [ [1, 1, "0.7 0.3", 1], [2, 1, "0.7 0.3", 1], [3, 5, "0.7 0.3", 0], [4, 5, "0.7 0.3", 0], [5, 5, "0.7 0.3", 1] ]

        >>> edge_schema = [('src', int), ('dst', int), ('weight', int)]
        >>> edge_rows = [ [1, 2, 2], [1, 3, 1], [2, 3, 1], [1, 4, 1], [4, 5, 1] ]

        >>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
        >>> edge_frame = tc.frame.create(edge_rows, edge_schema)

        >>> graph = tc.graph.create(vertex_frame, edge_frame)
        >>> vertex_frame.inspect()
        [#]  id  label  prior_val  was_labeled
        ======================================
        [0]   1    1.0  0.7 0.3              1
        [1]   2    1.0  0.7 0.3              1
        [2]   3    5.0  0.7 0.3              0
        [3]   4    5.0  0.7 0.3              0
        [4]   5    5.0  0.7 0.3              1

        >>> result = graph.loopy_belief_propagation("prior_val", "weight", 2)
        >>> result.inspect()
        [#]  id  label  prior_val  was_labeled
        ======================================
        [0]   1    1.0  0.7 0.3              1
        [1]   2    1.0  0.7 0.3              1
        [2]   3    5.0  0.7 0.3              0
        [3]   4    5.0  0.7 0.3              0
        [4]   5    5.0  0.7 0.3              1
        <BLANKLINE>
        [#]  posterior
        ==============================================
        [0]  [0.9883347610773112,0.011665238922688819]
        [1]  [0.9743014865548763,0.025698513445123698]
        [2]  [0.9396772870897875,0.06032271291021254]
        [3]  [0.9319529856190276,0.06804701438097235]
        [4]  [0.8506957305238876,0.1493042694761125]

    """
    # NOTE: the <BLANKLINE> marker in the example keeps the posterior table part of
    # the expected output; a truly empty line would end the doctest expectation early.
    from sparktk.frame.frame import Frame
    return Frame(self._tc, self._scala.loopyBeliefPropagation(prior, edge_weight, max_iterations))

def page_rank(

self, convergence_tolerance=None, reset_probability=None, max_iterations=None)

Page Rank

Page Rank is a popular statistic that ranks vertices based off of connectivity in the global graph

Exactly 1 of convergence_tolerance and max_iterations must be set (termination criteria)

Parameters:

convergence_tolerance(Float):If the difference between successive iterations is less than this, the algorithm terminates. Mutually exclusive with max_iterations
reset_probability(Float):Value for the reset probability in the page rank algorithm
max_iterations(Int):Maximum number of iterations the page rank should run before terminating. Mutually exclusive with convergence_tolerance

Returns(Frame): Frame containing the vertex id's and their page rank

Examples:
>>> vertex_schema = [('id', int)]
>>> edge_schema = [('src', int), ('dst', int)]

>>> vertex_rows = [ [1], [2], [3], [4], [5] ]
>>> edge_rows = [ [1, 2], [1, 3], [2, 3], [1, 4], [4, 5] ]
>>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
>>> edge_frame = tc.frame.create(edge_rows, edge_schema)

>>> graph = tc.graph.create(vertex_frame, edge_frame)

>>> result = graph.page_rank(max_iterations=20)
>>> result.inspect()
[#]  id  pagerank
=================
[0]   1      0.15
[1]   2    0.1925
[2]   3  0.356125
[3]   4    0.1925
[4]   5  0.313625
def page_rank(self, convergence_tolerance=None, reset_probability=None, max_iterations=None):
    """
    **Page Rank**

    Page Rank is a popular statistic that ranks vertices based off of
    connectivity in the global graph

    Exactly 1 of convergence_tolerance and max_iterations must be set (termination criteria)

    Parameters
    ----------

    :param convergence_tolerance: (Float) If the difference between successive iterations is less than this, the algorithm terminates. Mutually exclusive with max_iterations
    :param reset_probability: (Float) Value for the reset probability in the page rank algorithm
    :param max_iterations: (Int) Maximum number of iterations the page rank should run before terminating. Mutually exclusive with convergence_tolerance

    :return: (Frame) Frame containing the vertex id's and their page rank

    Examples
    --------

        >>> vertex_schema = [('id', int)]
        >>> edge_schema = [('src', int), ('dst', int)]

        >>> vertex_rows = [ [1], [2], [3], [4], [5] ]
        >>> edge_rows = [ [1, 2], [1, 3], [2, 3], [1, 4], [4, 5] ]
        >>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
        >>> edge_frame = tc.frame.create(edge_rows, edge_schema)

        >>> graph = tc.graph.create(vertex_frame, edge_frame)

        >>> result = graph.page_rank(max_iterations=20)
        >>> result.inspect()
        [#]  id  pagerank
        =================
        [0]   1      0.15
        [1]   2    0.1925
        [2]   3  0.356125
        [3]   4    0.1925
        [4]   5  0.313625

    """
    from sparktk.frame.frame import Frame
    # Each optional argument crosses to Scala as an Option; note the Scala call
    # takes them in a different order than this method's signature.
    to_scala_option = self._tc.jutils.convert.to_scala_option
    scala_ranks = self._scala.pageRank(to_scala_option(max_iterations),
                                       to_scala_option(reset_probability),
                                       to_scala_option(convergence_tolerance))
    return Frame(self._tc, scala_ranks)

def save(

self, path)

Persists the graph to the given file path

def save(self, path):
    """Saves this graph at the given file path by delegating to the underlying Scala graph"""
    scala_graph = self._scala
    scala_graph.save(path)

def triangle_count(

self)

Counts the number of triangles each vertex is a part of

Parameters:

Returns(Frame): Frame containing the vertex id's and the count of the number of triangles they are in

Examples:
>>> vertex_schema = [('id', int)]
>>> edge_schema = [('src', int), ('dst', int)]

>>> vertex_rows = [ [1], [2], [3], [4], [5] ]
>>> edge_rows = [ [1, 2], [1, 3], [2, 3], [1, 4], [4, 5] ]
>>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
>>> edge_frame = tc.frame.create(edge_rows, edge_schema)

>>> graph = tc.graph.create(vertex_frame, edge_frame)

>>> result = graph.triangle_count()
>>> result.inspect()
[#]  count  id 
==============
[0]      1   1
[1]      1   2
[2]      1   3
[3]      0   4
[4]      0   5
def triangle_count(self):
    """
    Counts the number of triangles each vertex is a part of

    Parameters
    ----------

    :return: (Frame) Frame containing the vertex id's and the count of the number of triangles they are in

    Examples
    --------

        >>> vertex_schema = [('id', int)]
        >>> edge_schema = [('src', int), ('dst', int)]

        >>> vertex_rows = [ [1], [2], [3], [4], [5] ]
        >>> edge_rows = [ [1, 2], [1, 3], [2, 3], [1, 4], [4, 5] ]
        >>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
        >>> edge_frame = tc.frame.create(edge_rows, edge_schema)

        >>> graph = tc.graph.create(vertex_frame, edge_frame)

        >>> result = graph.triangle_count()
        >>> result.inspect()
        [#]  count  id 
        ==============
        [0]      1   1
        [1]      1   2
        [2]      1   3
        [3]      0   4
        [4]      0   5


    """
    from sparktk.frame.frame import Frame
    scala_counts = self._scala.triangleCount()
    return Frame(self._tc, scala_counts)

def vertex_count(

self)

Returns the number of rows in the vertices frame in this graph

Example:
>>> from graphframes import examples

>>> gf = examples.Graphs(tc.sql_context).friends()

>>> from sparktk.graph.graph import Graph

>>> g = Graph(tc, gf)

>>> g.vertex_count()
6


>>> v = tc.frame.create([("a", "Alice", 34),
...     ("b", "Bob", 36),
...     ("c", "Charlie", 30),
...     ("d", "David", 29),
...     ("e", "Esther", 32),
...     ("f", "Fanny", 36),
...     ], ["id", "name", "age"])

>>> e = tc.frame.create([("a", "b", "friend"),
...     ("b", "c", "follow"),
...     ("c", "b", "follow"),
...     ("f", "c", "follow"),
...     ("e", "f", "follow"),
...     ("e", "d", "friend"),
...     ("d", "a", "friend"),
...     ("a", "e", "friend")
...     ], ["src", "dst", "relationship"])

>>> g2 = tc.graph.create(v, e)

>>> g2.vertex_count()
6
def vertex_count(self):

    """
    Gives the number of rows in this graph's vertices frame, as a python int

    Example
    -------

        >>> from graphframes import examples

        >>> gf = examples.Graphs(tc.sql_context).friends()

        >>> from sparktk.graph.graph import Graph

        >>> g = Graph(tc, gf)

        >>> g.vertex_count()
        6


        >>> v = tc.frame.create([("a", "Alice", 34),
        ...     ("b", "Bob", 36),
        ...     ("c", "Charlie", 30),
        ...     ("d", "David", 29),
        ...     ("e", "Esther", 32),
        ...     ("f", "Fanny", 36),
        ...     ], ["id", "name", "age"])

        >>> e = tc.frame.create([("a", "b", "friend"),
        ...     ("b", "c", "follow"),
        ...     ("c", "b", "follow"),
        ...     ("f", "c", "follow"),
        ...     ("e", "f", "follow"),
        ...     ("e", "d", "friend"),
        ...     ("d", "a", "friend"),
        ...     ("a", "e", "friend")
        ...     ], ["src", "dst", "relationship"])

        >>> g2 = tc.graph.create(v, e)

        >>> g2.vertex_count()
        6

    """
    # The Scala side supplies the count; int() converts it to a python int
    scala_count = self._scala.vertexCount()
    return int(scala_count)

def weighted_degrees(

self, edge_weight, degree_option='undirected', default_weight=0.0)

Degree Calculation

A fundamental quantity in graph analysis is the degree of a vertex: The degree of a vertex is the number of edges adjacent to it.

For a directed edge relation, a vertex has both an out-degree (the number of edges leaving the vertex) and an in-degree (the number of edges entering the vertex).

In the presence of edge weights, vertices can have weighted degrees: The weighted degree of a vertex is the sum of weights of edges adjacent to it. Analogously, the weighted in-degree of a vertex is the sum of the weights of the edges entering it, and the weighted out-degree is the sum of the weights of the edges leaving the vertex. If a property is missing or empty on a particular vertex, the default weight is used.

Parameters:
edge_weight(String):Name of the property that contains an edge weight
degree_option(String):Either in, out or undirected. String describing the direction of edges
default_weight(Numeric):Default weight value if a vertex has no value for the edge weight property

Returns(Frame): Frame containing the vertex ids and their weights

Examples:
>>> vertex_schema = [('id', int), ('label', float)]
>>> edge_schema = [('src', int), ('dst', int), ('weight', int)]

>>> vertex_rows = [ [1, 1], [2, 1], [3, 5], [4, 5], [5, 5] ]
>>> edge_rows = [ [1, 2, 2], [1, 3, 1], [2, 3, 1], [1, 4, 1], [4, 5, 1] ]
>>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
>>> edge_frame = tc.frame.create(edge_rows, edge_schema)

>>> graph = tc.graph.create(vertex_frame, edge_frame)

>>> result = graph.weighted_degrees(edge_weight="weight", degree_option="out")
>>> result.inspect() 
[#]  id  degree
===============
[0]   1       4
[1]   2       1
[2]   3       0
[3]   4       1
[4]   5       0


>>> result = graph.weighted_degrees(edge_weight="weight", degree_option="in")
>>> result.inspect()
[#]  id  degree
===============
[0]   1       0
[1]   2       2
[2]   3       2
[3]   4       1
[4]   5       1
def weighted_degrees(self, edge_weight, degree_option='undirected', default_weight=0.0):
    """
    **Degree Calculation**

    The degree of a vertex, a fundamental quantity in graph analysis, is the
    number of edges adjacent to that vertex.

    When the edge relation is directed, each vertex has an out-degree (how many
    edges leave the vertex) as well as an in-degree (how many edges enter the
    vertex).

    When edges carry weights, vertices can have weighted degrees: the weighted
    degree of a vertex is the sum of the weights of the edges adjacent to it.
    Likewise, the weighted in-degree is the sum of the weights of the edges
    entering the vertex, and the weighted out-degree is the sum of the weights
    of the edges leaving it. If a property is missing or empty on a particular
    vertex, the default weight is used.

    Parameters
    ----------

    :param edge_weight: (String) Name of the property that contains an edge weight
    :param degree_option: (String) Either in, out or undirected. String describing the direction of edges
    :param default_weight: (Numeric) Default weight value if a vertex has no value for the edge weight property

    :return: (Frame) Frame containing the vertex ids and their weights

    Examples
    --------

        >>> vertex_schema = [('id', int), ('label', float)]
        >>> edge_schema = [('src', int), ('dst', int), ('weight', int)]

        >>> vertex_rows = [ [1, 1], [2, 1], [3, 5], [4, 5], [5, 5] ]
        >>> edge_rows = [ [1, 2, 2], [1, 3, 1], [2, 3, 1], [1, 4, 1], [4, 5, 1] ]
        >>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
        >>> edge_frame = tc.frame.create(edge_rows, edge_schema)

        >>> graph = tc.graph.create(vertex_frame, edge_frame)

        >>> result = graph.weighted_degrees(edge_weight="weight", degree_option="out")
        >>> result.inspect() 
        [#]  id  degree
        ===============
        [0]   1       4
        [1]   2       1
        [2]   3       0
        [3]   4       1
        [4]   5       0


        >>> result = graph.weighted_degrees(edge_weight="weight", degree_option="in")
        >>> result.inspect()
        [#]  id  degree
        ===============
        [0]   1       0
        [1]   2       2
        [2]   3       2
        [3]   4       1
        [4]   5       1

    """
    from sparktk.frame.frame import Frame

    # The scala graph performs the computation; its result is wrapped in a
    # python Frame bound to this graph's TkContext.
    scala_result = self._scala.weightedDegree(edge_weight,
                                              degree_option,
                                              default_weight)
    return Frame(self._tc, scala_result)