Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 """
0019 An example demonstrating MinHashLSH.
0020 Run with:
0021   bin/spark-submit examples/src/main/python/ml/min_hash_lsh_example.py
0022 """
0023 from __future__ import print_function
0024 
0025 # $example on$
0026 from pyspark.ml.feature import MinHashLSH
0027 from pyspark.ml.linalg import Vectors
0028 from pyspark.sql.functions import col
0029 # $example off$
0030 from pyspark.sql import SparkSession
0031 
0032 if __name__ == "__main__":
0033     spark = SparkSession \
0034         .builder \
0035         .appName("MinHashLSHExample") \
0036         .getOrCreate()
0037 
0038     # $example on$
0039     dataA = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),),
0040              (1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),),
0041              (2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),)]
0042     dfA = spark.createDataFrame(dataA, ["id", "features"])
0043 
0044     dataB = [(3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]),),
0045              (4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),),
0046              (5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)]
0047     dfB = spark.createDataFrame(dataB, ["id", "features"])
0048 
0049     key = Vectors.sparse(6, [1, 3], [1.0, 1.0])
0050 
0051     mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
0052     model = mh.fit(dfA)
0053 
0054     # Feature Transformation
0055     print("The hashed dataset where hashed values are stored in the column 'hashes':")
0056     model.transform(dfA).show()
0057 
0058     # Compute the locality sensitive hashes for the input rows, then perform approximate
0059     # similarity join.
0060     # We could avoid computing hashes by passing in the already-transformed dataset, e.g.
0061     # `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
0062     print("Approximately joining dfA and dfB on distance smaller than 0.6:")
0063     model.approxSimilarityJoin(dfA, dfB, 0.6, distCol="JaccardDistance")\
0064         .select(col("datasetA.id").alias("idA"),
0065                 col("datasetB.id").alias("idB"),
0066                 col("JaccardDistance")).show()
0067 
0068     # Compute the locality sensitive hashes for the input rows, then perform approximate nearest
0069     # neighbor search.
0070     # We could avoid computing hashes by passing in the already-transformed dataset, e.g.
0071     # `model.approxNearestNeighbors(transformedA, key, 2)`
0072     # It may return less than 2 rows when not enough approximate near-neighbor candidates are
0073     # found.
0074     print("Approximately searching dfA for 2 nearest neighbors of the key:")
0075     model.approxNearestNeighbors(dfA, key, 2).show()
0076 
0077     # $example off$
0078 
0079     spark.stop()