Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 """
0019 An example demonstrating BucketedRandomProjectionLSH.
0020 Run with:
0021   bin/spark-submit examples/src/main/python/ml/bucketed_random_projection_lsh_example.py
0022 """
0023 from __future__ import print_function
0024 
0025 # $example on$
0026 from pyspark.ml.feature import BucketedRandomProjectionLSH
0027 from pyspark.ml.linalg import Vectors
0028 from pyspark.sql.functions import col
0029 # $example off$
0030 from pyspark.sql import SparkSession
0031 
if __name__ == "__main__":
    # Session used for everything below; it is stopped at the end of the script.
    spark = (SparkSession.builder
             .appName("BucketedRandomProjectionLSHExample")
             .getOrCreate())

    # $example on$
    # First dataset: ids 0-3, one 2-d dense feature vector per row.
    pointsA = [[1.0, 1.0], [1.0, -1.0], [-1.0, -1.0], [-1.0, 1.0]]
    dataA = [(rowId, Vectors.dense(xy)) for rowId, xy in enumerate(pointsA)]
    dfA = spark.createDataFrame(dataA, ["id", "features"])

    # Second dataset: ids 4-7, same two-column layout as dfA.
    pointsB = [[1.0, 0.0], [-1.0, 0.0], [0.0, 1.0], [0.0, -1.0]]
    dataB = [(rowId, Vectors.dense(xy)) for rowId, xy in enumerate(pointsB, 4)]
    dfB = spark.createDataFrame(dataB, ["id", "features"])

    # Query vector for the nearest-neighbor search further down.
    key = Vectors.dense([1.0, 0.0])

    # Configure the LSH estimator (3 hash tables, bucket length 2.0) and fit it on dfA.
    brp = BucketedRandomProjectionLSH(
        inputCol="features",
        outputCol="hashes",
        bucketLength=2.0,
        numHashTables=3)
    model = brp.fit(dfA)

    # Feature Transformation
    print("The hashed dataset where hashed values are stored in the column 'hashes':")
    hashedA = model.transform(dfA)
    hashedA.show()

    # Approximate similarity join. The raw (un-hashed) rows are passed in, so the
    # locality sensitive hashes are computed for them as part of the call.
    # Hashing could be skipped by handing over already-transformed datasets, e.g.
    # `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
    print("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
    joined = model.approxSimilarityJoin(dfA, dfB, 1.5, distCol="EuclideanDistance")
    idA = col("datasetA.id").alias("idA")
    idB = col("datasetB.id").alias("idB")
    joined.select(idA, idB, col("EuclideanDistance")).show()

    # Approximate nearest neighbor search. As above, the hashes are computed for
    # the input rows as part of the call.
    # Hashing could be skipped by handing over an already-transformed dataset, e.g.
    # `model.approxNearestNeighbors(transformedA, key, 2)`
    print("Approximately searching dfA for 2 nearest neighbors of the key:")
    neighbors = model.approxNearestNeighbors(dfA, key, 2)
    neighbors.show()
    # $example off$

    spark.stop()