#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import sys

from pyspark import since, SparkContext
from pyspark.sql.column import _to_seq, _to_java_column

__all__ = ["Window", "WindowSpec"]

def _to_java_cols(cols):
    # Accept either varargs columns (f("a", "b")) or a single list of
    # columns (f(["a", "b"])), and convert to a Java column sequence.
    sc = SparkContext._active_spark_context
    if len(cols) == 1 and isinstance(cols[0], list):
        cols = cols[0]
    return _to_seq(sc, cols, _to_java_column)


class Window(object):
    """
    Utility functions for defining a window in DataFrames.

    For example:

    >>> # ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    >>> window = Window.orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)

    >>> # PARTITION BY country ORDER BY date RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING
    >>> window = Window.orderBy("date").partitionBy("country").rangeBetween(-3, 3)

    .. note:: When ordering is not defined, an unbounded window frame (rowFrame,
         unboundedPreceding, unboundedFollowing) is used by default. When ordering is defined,
         a growing window frame (rangeFrame, unboundedPreceding, currentRow) is used by default.
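
    For instance, with the default growing frame, an aggregate over an ordered
    window behaves like a running total (a sketch; ``date`` is a hypothetical
    column name and an active SparkContext is assumed):

    >>> # equivalent to ORDER BY date RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    >>> running = Window.orderBy("date")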

    .. versionadded:: 1.4
    """

    _JAVA_MIN_LONG = -(1 << 63)  # -9223372036854775808
    _JAVA_MAX_LONG = (1 << 63) - 1  # 9223372036854775807
    _PRECEDING_THRESHOLD = max(-sys.maxsize, _JAVA_MIN_LONG)
    _FOLLOWING_THRESHOLD = min(sys.maxsize, _JAVA_MAX_LONG)

    unboundedPreceding = _JAVA_MIN_LONG

    unboundedFollowing = _JAVA_MAX_LONG

    currentRow = 0

    @staticmethod
    @since(1.4)
    def partitionBy(*cols):
        """
        Creates a :class:`WindowSpec` with the partitioning defined.
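
        For example (a sketch; an active SparkContext is assumed and
        ``country`` is a hypothetical column name):

        >>> spec = Window.partitionBy("country")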
        """
        sc = SparkContext._active_spark_context
        jspec = sc._jvm.org.apache.spark.sql.expressions.Window.partitionBy(_to_java_cols(cols))
        return WindowSpec(jspec)

    @staticmethod
    @since(1.4)
    def orderBy(*cols):
        """
        Creates a :class:`WindowSpec` with the ordering defined.
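
        For example (a sketch; an active SparkContext is assumed and
        ``date`` is a hypothetical column name):

        >>> spec = Window.orderBy("date")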
        """
        sc = SparkContext._active_spark_context
        jspec = sc._jvm.org.apache.spark.sql.expressions.Window.orderBy(_to_java_cols(cols))
        return WindowSpec(jspec)

    @staticmethod
    @since(2.1)
    def rowsBetween(start, end):
        """
        Creates a :class:`WindowSpec` with the frame boundaries defined,
        from `start` (inclusive) to `end` (inclusive).

        Both `start` and `end` are relative positions from the current row.
        For example, "0" means "current row", while "-1" means the row before
        the current row, and "5" means the fifth row after the current row.

        We recommend users use ``Window.unboundedPreceding``, ``Window.unboundedFollowing``,
        and ``Window.currentRow`` to specify special boundary values, rather than using integral
        values directly.

        A row-based boundary is based on the position of the row within the partition.
        An offset indicates the number of rows above or below the current row at which
        the frame for the current row starts or ends. For instance, given a row-based
        sliding frame with a lower bound offset of -1 and an upper bound offset of +2,
        the frame for the row with index 5 would range from index 4 to index 7.

        >>> from pyspark.sql import Window
        >>> from pyspark.sql import functions as func
        >>> from pyspark.sql import SQLContext
        >>> sc = SparkContext.getOrCreate()
        >>> sqlContext = SQLContext(sc)
        >>> tup = [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")]
        >>> df = sqlContext.createDataFrame(tup, ["id", "category"])
        >>> window = Window.partitionBy("category").orderBy("id").rowsBetween(Window.currentRow, 1)
        >>> df.withColumn("sum", func.sum("id").over(window)).sort("id", "category", "sum").show()
        +---+--------+---+
        | id|category|sum|
        +---+--------+---+
        |  1|       a|  2|
        |  1|       a|  3|
        |  1|       b|  3|
        |  2|       a|  2|
        |  2|       b|  5|
        |  3|       b|  3|
        +---+--------+---+

        :param start: boundary start, inclusive.
                      The frame is unbounded if this is ``Window.unboundedPreceding``, or
                      any value less than or equal to max(-sys.maxsize, -9223372036854775808).
        :param end: boundary end, inclusive.
                    The frame is unbounded if this is ``Window.unboundedFollowing``, or
                    any value greater than or equal to min(sys.maxsize, 9223372036854775807).
        """
        if start <= Window._PRECEDING_THRESHOLD:
            start = Window.unboundedPreceding
        if end >= Window._FOLLOWING_THRESHOLD:
            end = Window.unboundedFollowing
        sc = SparkContext._active_spark_context
        jspec = sc._jvm.org.apache.spark.sql.expressions.Window.rowsBetween(start, end)
        return WindowSpec(jspec)

    @staticmethod
    @since(2.1)
    def rangeBetween(start, end):
        """
        Creates a :class:`WindowSpec` with the frame boundaries defined,
        from `start` (inclusive) to `end` (inclusive).

        Both `start` and `end` are relative to the current row's value. For example,
        "0" means "current row", while "-1" means one unit before the current row's
        value, and "5" means five units after the current row's value.

        We recommend users use ``Window.unboundedPreceding``, ``Window.unboundedFollowing``,
        and ``Window.currentRow`` to specify special boundary values, rather than using integral
        values directly.

        A range-based boundary is based on the actual value of the ORDER BY
        expression(s). An offset is used to alter the value of the ORDER BY expression:
        for instance, if the current ORDER BY expression has a value of 10 and the lower
        bound offset is -3, the resulting lower bound for the current row will be
        10 - 3 = 7. This, however, places a number of constraints on the ORDER BY
        expressions: there can be only one expression, and this expression must have a
        numerical data type. An exception is made when the offset is unbounded, because
        no value modification is needed; in this case multiple and non-numeric ORDER BY
        expressions are allowed.

        >>> from pyspark.sql import Window
        >>> from pyspark.sql import functions as func
        >>> from pyspark.sql import SQLContext
        >>> sc = SparkContext.getOrCreate()
        >>> sqlContext = SQLContext(sc)
        >>> tup = [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")]
        >>> df = sqlContext.createDataFrame(tup, ["id", "category"])
        >>> window = Window.partitionBy("category").orderBy("id").rangeBetween(Window.currentRow, 1)
        >>> df.withColumn("sum", func.sum("id").over(window)).sort("id", "category").show()
        +---+--------+---+
        | id|category|sum|
        +---+--------+---+
        |  1|       a|  4|
        |  1|       a|  4|
        |  1|       b|  3|
        |  2|       a|  2|
        |  2|       b|  5|
        |  3|       b|  3|
        +---+--------+---+

        :param start: boundary start, inclusive.
                      The frame is unbounded if this is ``Window.unboundedPreceding``, or
                      any value less than or equal to max(-sys.maxsize, -9223372036854775808).
        :param end: boundary end, inclusive.
                    The frame is unbounded if this is ``Window.unboundedFollowing``, or
                    any value greater than or equal to min(sys.maxsize, 9223372036854775807).
        """
        if start <= Window._PRECEDING_THRESHOLD:
            start = Window.unboundedPreceding
        if end >= Window._FOLLOWING_THRESHOLD:
            end = Window.unboundedFollowing
        sc = SparkContext._active_spark_context
        jspec = sc._jvm.org.apache.spark.sql.expressions.Window.rangeBetween(start, end)
        return WindowSpec(jspec)


class WindowSpec(object):
    """
    A window specification that defines the partitioning, ordering,
    and frame boundaries.

    Use the static methods in :class:`Window` to create a :class:`WindowSpec`.
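
    For example (a sketch; an active SparkContext is assumed and the
    column names are hypothetical):

    >>> # PARTITION BY category ORDER BY id
    >>> spec = Window.partitionBy("category").orderBy("id")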

    .. versionadded:: 1.4
    """

    def __init__(self, jspec):
        self._jspec = jspec

    @since(1.4)
    def partitionBy(self, *cols):
        """
        Defines the partitioning columns in a :class:`WindowSpec`.

        :param cols: names of columns or expressions
        """
        return WindowSpec(self._jspec.partitionBy(_to_java_cols(cols)))

    @since(1.4)
    def orderBy(self, *cols):
        """
        Defines the ordering columns in a :class:`WindowSpec`.

        :param cols: names of columns or expressions
        """
        return WindowSpec(self._jspec.orderBy(_to_java_cols(cols)))

    @since(1.4)
    def rowsBetween(self, start, end):
        """
        Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive).

        Both `start` and `end` are relative positions from the current row.
        For example, "0" means "current row", while "-1" means the row before
        the current row, and "5" means the fifth row after the current row.

        We recommend users use ``Window.unboundedPreceding``, ``Window.unboundedFollowing``,
        and ``Window.currentRow`` to specify special boundary values, rather than using integral
        values directly.
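
        For example, a frame covering the two rows before the current row up to
        the current row (a sketch; an active SparkContext is assumed and ``id``
        is a hypothetical column name):

        >>> spec = Window.orderBy("id").rowsBetween(-2, Window.currentRow)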

        :param start: boundary start, inclusive.
                      The frame is unbounded if this is ``Window.unboundedPreceding``, or
                      any value less than or equal to max(-sys.maxsize, -9223372036854775808).
        :param end: boundary end, inclusive.
                    The frame is unbounded if this is ``Window.unboundedFollowing``, or
                    any value greater than or equal to min(sys.maxsize, 9223372036854775807).
        """
        if start <= Window._PRECEDING_THRESHOLD:
            start = Window.unboundedPreceding
        if end >= Window._FOLLOWING_THRESHOLD:
            end = Window.unboundedFollowing
        return WindowSpec(self._jspec.rowsBetween(start, end))

    @since(1.4)
    def rangeBetween(self, start, end):
        """
        Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive).

        Both `start` and `end` are relative to the current row's value. For example,
        "0" means "current row", while "-1" means one unit before the current row's
        value, and "5" means five units after the current row's value.

        We recommend users use ``Window.unboundedPreceding``, ``Window.unboundedFollowing``,
        and ``Window.currentRow`` to specify special boundary values, rather than using integral
        values directly.
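
        For example, a frame covering values from three units below the current
        row's value up to the current value (a sketch; an active SparkContext is
        assumed and ``id`` is a hypothetical column name):

        >>> spec = Window.orderBy("id").rangeBetween(-3, Window.currentRow)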

        :param start: boundary start, inclusive.
                      The frame is unbounded if this is ``Window.unboundedPreceding``, or
                      any value less than or equal to max(-sys.maxsize, -9223372036854775808).
        :param end: boundary end, inclusive.
                    The frame is unbounded if this is ``Window.unboundedFollowing``, or
                    any value greater than or equal to min(sys.maxsize, 9223372036854775807).
        """
        if start <= Window._PRECEDING_THRESHOLD:
            start = Window.unboundedPreceding
        if end >= Window._FOLLOWING_THRESHOLD:
            end = Window.unboundedFollowing
        return WindowSpec(self._jspec.rangeBetween(start, end))


def _test():
    import doctest
    import pyspark.sql.window
    # Start a local SparkContext so the doctests can talk to the JVM.
    SparkContext('local[4]', 'PythonTest')
    globs = pyspark.sql.window.__dict__.copy()
    (failure_count, test_count) = doctest.testmod(
        pyspark.sql.window, globs=globs,
        optionflags=doctest.NORMALIZE_WHITESPACE)
    if failure_count:
        sys.exit(-1)


if __name__ == "__main__":
    _test()