0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 """
0019 An example demonstrating BucketedRandomProjectionLSH.
0020 Run with:
0021 bin/spark-submit examples/src/main/python/ml/bucketed_random_projection_lsh_example.py
0022 """
0023 from __future__ import print_function
0024
0025
0026 from pyspark.ml.feature import BucketedRandomProjectionLSH
0027 from pyspark.ml.linalg import Vectors
0028 from pyspark.sql.functions import col
0029
0030 from pyspark.sql import SparkSession
0031
if __name__ == "__main__":
    # Script entry point: build two small DataFrames of 2-D dense vectors,
    # fit a BucketedRandomProjectionLSH model on the first one, and print the
    # hashed rows, an approximate similarity join, and an approximate
    # nearest-neighbour lookup.
    spark = SparkSession \
        .builder \
        .appName("BucketedRandomProjectionLSHExample") \
        .getOrCreate()

    # Everything after session creation runs under try/finally so the session
    # is stopped even when one of the steps below raises.
    try:
        # Dataset A: the four points (+/-1, +/-1), with ids 0-3.
        dataA = [(0, Vectors.dense([1.0, 1.0]),),
                 (1, Vectors.dense([1.0, -1.0]),),
                 (2, Vectors.dense([-1.0, -1.0]),),
                 (3, Vectors.dense([-1.0, 1.0]),)]
        dfA = spark.createDataFrame(dataA, ["id", "features"])

        # Dataset B: the four unit points on the axes, with ids 4-7.
        dataB = [(4, Vectors.dense([1.0, 0.0]),),
                 (5, Vectors.dense([-1.0, 0.0]),),
                 (6, Vectors.dense([0.0, 1.0]),),
                 (7, Vectors.dense([0.0, -1.0]),)]
        dfB = spark.createDataFrame(dataB, ["id", "features"])

        # Query vector used for the nearest-neighbour search below.
        key = Vectors.dense([1.0, 0.0])

        # The estimator reads the "features" column and writes its output to
        # "hashes"; the model is fitted on dfA only.
        brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes",
                                          bucketLength=2.0, numHashTables=3)
        model = brp.fit(dfA)

        # Feature transformation: show dfA with the added "hashes" column.
        print("The hashed dataset where hashed values are stored in the column 'hashes':")
        model.transform(dfA).show()

        # Approximate similarity join of dfA and dfB with threshold 1.5; the
        # distance is written to the "EuclideanDistance" column, and only the
        # two ids plus that distance are displayed.
        print("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
        model.approxSimilarityJoin(dfA, dfB, 1.5, distCol="EuclideanDistance")\
            .select(col("datasetA.id").alias("idA"),
                    col("datasetB.id").alias("idB"),
                    col("EuclideanDistance")).show()

        # Approximate nearest-neighbour search: ask for 2 rows of dfA for `key`.
        print("Approximately searching dfA for 2 nearest neighbors of the key:")
        model.approxNearestNeighbors(dfA, key, 2).show()
    finally:
        spark.stop()