0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 import sys
0019
0020 from pyspark import since, SparkContext
0021 from pyspark.sql.column import _to_seq, _to_java_column
0022
# Names exported by ``from pyspark.sql.window import *``.
__all__ = [
    "Window",
    "WindowSpec",
]


def _to_java_cols(cols):
    """
    Convert a varargs tuple of columns into a JVM-side column sequence.

    ``cols`` is the ``*cols`` tuple of a caller. If it holds exactly one
    element and that element is a list, the list's items are used as the
    columns instead. This lets callers write either ``f("a", "b")`` or
    ``f(["a", "b"])``.

    Each column is converted with ``_to_java_column`` and the results are
    packed with ``_to_seq`` using the currently active SparkContext.
    """
    wraps_single_list = len(cols) == 1 and isinstance(cols[0], list)
    columns = cols[0] if wraps_single_list else cols
    active_context = SparkContext._active_spark_context
    return _to_seq(active_context, columns, _to_java_column)
0031
0032
class Window(object):
    """
    Utility functions for defining windows in DataFrames.

    For example:

    >>> # ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    >>> window = Window.orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)

    >>> # PARTITION BY country ORDER BY date RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING
    >>> window = Window.orderBy("date").partitionBy("country").rangeBetween(-3, 3)

    .. note:: When ordering is not defined, an unbounded window frame (rowFrame,
         unboundedPreceding, unboundedFollowing) is used by default. When ordering is defined,
         a growing window frame (rangeFrame, unboundedPreceding, currentRow) is used by default.

    .. versionadded:: 1.4
    """

    # Range of a signed 64-bit JVM ``long``.
    _JAVA_MIN_LONG = -(1 << 63)  # -9223372036854775808
    _JAVA_MAX_LONG = (1 << 63) - 1  # 9223372036854775807

    # Boundary values at or beyond these thresholds are normalized to the
    # unbounded sentinels below. Folding in sys.maxsize means that
    # -sys.maxsize / sys.maxsize are also treated as unbounded; on a 64-bit
    # build the thresholds are -9223372036854775807 and 9223372036854775807.
    _PRECEDING_THRESHOLD = max(-sys.maxsize, _JAVA_MIN_LONG)
    _FOLLOWING_THRESHOLD = min(sys.maxsize, _JAVA_MAX_LONG)

    unboundedPreceding = _JAVA_MIN_LONG

    unboundedFollowing = _JAVA_MAX_LONG

    currentRow = 0

    @staticmethod
    @since(1.4)
    def partitionBy(*cols):
        """
        Creates a :class:`WindowSpec` with the partitioning defined.

        :param cols: names of columns or expressions
        """
        sc = SparkContext._active_spark_context
        jspec = sc._jvm.org.apache.spark.sql.expressions.Window.partitionBy(_to_java_cols(cols))
        return WindowSpec(jspec)

    @staticmethod
    @since(1.4)
    def orderBy(*cols):
        """
        Creates a :class:`WindowSpec` with the ordering defined.

        :param cols: names of columns or expressions
        """
        sc = SparkContext._active_spark_context
        jspec = sc._jvm.org.apache.spark.sql.expressions.Window.orderBy(_to_java_cols(cols))
        return WindowSpec(jspec)

    @staticmethod
    @since(2.1)
    def rowsBetween(start, end):
        """
        Creates a :class:`WindowSpec` with the frame boundaries defined,
        from `start` (inclusive) to `end` (inclusive).

        Both `start` and `end` are relative positions from the current row.
        For example, "0" means "current row", while "-1" means the row before
        the current row, and "5" means the fifth row after the current row.

        We recommend users use ``Window.unboundedPreceding``, ``Window.unboundedFollowing``,
        and ``Window.currentRow`` to specify special boundary values, rather than using integral
        values directly.

        A row-based boundary is based on the position of the row within the partition.
        An offset indicates how many rows above or below the current row the frame for the
        current row starts or ends. For instance, given a row-based sliding frame with a lower
        bound offset of -1 and an upper bound offset of +2, the frame for the row with index 5
        would range from index 4 to index 7.

        >>> from pyspark.sql import Window
        >>> from pyspark.sql import functions as func
        >>> from pyspark.sql import SQLContext
        >>> sc = SparkContext.getOrCreate()
        >>> sqlContext = SQLContext(sc)
        >>> tup = [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")]
        >>> df = sqlContext.createDataFrame(tup, ["id", "category"])
        >>> window = Window.partitionBy("category").orderBy("id").rowsBetween(Window.currentRow, 1)
        >>> df.withColumn("sum", func.sum("id").over(window)).sort("id", "category", "sum").show()
        +---+--------+---+
        | id|category|sum|
        +---+--------+---+
        |  1|       a|  2|
        |  1|       a|  3|
        |  1|       b|  3|
        |  2|       a|  2|
        |  2|       b|  5|
        |  3|       b|  3|
        +---+--------+---+

        :param start: boundary start, inclusive.
                      The frame is unbounded if this is ``Window.unboundedPreceding``, or
                      any value less than or equal to max(-sys.maxsize, -9223372036854775808).
        :param end: boundary end, inclusive.
                    The frame is unbounded if this is ``Window.unboundedFollowing``, or
                    any value greater than or equal to min(sys.maxsize, 9223372036854775807).
        """
        # Normalize out-of-range offsets to the unbounded sentinels before
        # handing them to the JVM.
        if start <= Window._PRECEDING_THRESHOLD:
            start = Window.unboundedPreceding
        if end >= Window._FOLLOWING_THRESHOLD:
            end = Window.unboundedFollowing
        sc = SparkContext._active_spark_context
        jspec = sc._jvm.org.apache.spark.sql.expressions.Window.rowsBetween(start, end)
        return WindowSpec(jspec)

    @staticmethod
    @since(2.1)
    def rangeBetween(start, end):
        """
        Creates a :class:`WindowSpec` with the frame boundaries defined,
        from `start` (inclusive) to `end` (inclusive).

        Both `start` and `end` are offsets relative to the current row's ORDER BY value.
        For example, "0" means "current row", while "-1" means one less than the current
        row's value, and "5" means five more than the current row's value.

        We recommend users use ``Window.unboundedPreceding``, ``Window.unboundedFollowing``,
        and ``Window.currentRow`` to specify special boundary values, rather than using integral
        values directly.

        A range-based boundary is based on the actual value of the ORDER BY
        expression(s). An offset is used to alter the value of the ORDER BY expression. For
        instance, if the current ORDER BY expression has a value of 10 and the lower bound offset
        is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This, however,
        puts a number of constraints on the ORDER BY expressions: there can be only one expression
        and this expression must have a numerical data type. An exception can be made when the
        offset is unbounded, because no value modification is needed; in this case multiple and
        non-numeric ORDER BY expressions are allowed.

        >>> from pyspark.sql import Window
        >>> from pyspark.sql import functions as func
        >>> from pyspark.sql import SQLContext
        >>> sc = SparkContext.getOrCreate()
        >>> sqlContext = SQLContext(sc)
        >>> tup = [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")]
        >>> df = sqlContext.createDataFrame(tup, ["id", "category"])
        >>> window = Window.partitionBy("category").orderBy("id").rangeBetween(Window.currentRow, 1)
        >>> df.withColumn("sum", func.sum("id").over(window)).sort("id", "category").show()
        +---+--------+---+
        | id|category|sum|
        +---+--------+---+
        |  1|       a|  4|
        |  1|       a|  4|
        |  1|       b|  3|
        |  2|       a|  2|
        |  2|       b|  5|
        |  3|       b|  3|
        +---+--------+---+

        :param start: boundary start, inclusive.
                      The frame is unbounded if this is ``Window.unboundedPreceding``, or
                      any value less than or equal to max(-sys.maxsize, -9223372036854775808).
        :param end: boundary end, inclusive.
                    The frame is unbounded if this is ``Window.unboundedFollowing``, or
                    any value greater than or equal to min(sys.maxsize, 9223372036854775807).
        """
        # Normalize out-of-range offsets to the unbounded sentinels before
        # handing them to the JVM.
        if start <= Window._PRECEDING_THRESHOLD:
            start = Window.unboundedPreceding
        if end >= Window._FOLLOWING_THRESHOLD:
            end = Window.unboundedFollowing
        sc = SparkContext._active_spark_context
        jspec = sc._jvm.org.apache.spark.sql.expressions.Window.rangeBetween(start, end)
        return WindowSpec(jspec)
0197
0198
class WindowSpec(object):
    """
    Describes a window: its partitioning columns, its ordering within each
    partition, and the frame of rows an aggregate sees around the current row.

    Every builder method returns a new :class:`WindowSpec` wrapping the JVM
    object produced by the corresponding Java call. Obtain instances through
    the static methods of :class:`Window` rather than constructing one directly.

    .. versionadded:: 1.4
    """

    def __init__(self, jspec):
        # JVM-side window specification wrapped by this Python object.
        self._jspec = jspec

    @staticmethod
    def _frame_bounds(start, end):
        # Map offsets at or past the thresholds onto the unbounded sentinels.
        # The start check runs first, then the end check.
        lower = Window.unboundedPreceding if start <= Window._PRECEDING_THRESHOLD else start
        upper = Window.unboundedFollowing if end >= Window._FOLLOWING_THRESHOLD else end
        return lower, upper

    @since(1.4)
    def partitionBy(self, *cols):
        """
        Returns a new :class:`WindowSpec` that partitions rows by the given columns.

        :param cols: names of columns or expressions
        """
        java_cols = _to_java_cols(cols)
        return WindowSpec(self._jspec.partitionBy(java_cols))

    @since(1.4)
    def orderBy(self, *cols):
        """
        Returns a new :class:`WindowSpec` that orders rows within each partition
        by the given columns.

        :param cols: names of columns or expressions
        """
        java_cols = _to_java_cols(cols)
        return WindowSpec(self._jspec.orderBy(java_cols))

    @since(1.4)
    def rowsBetween(self, start, end):
        """
        Returns a new :class:`WindowSpec` with a row-based frame spanning from
        `start` to `end`, both ends inclusive.

        The offsets count rows relative to the current row: "0" is the current
        row, "-1" is the row just before it, and "5" is the fifth row after it.

        Prefer ``Window.unboundedPreceding``, ``Window.unboundedFollowing`` and
        ``Window.currentRow`` over raw integers for the special boundaries.

        :param start: boundary start, inclusive.
                      The frame is unbounded if this is ``Window.unboundedPreceding``, or
                      any value less than or equal to max(-sys.maxsize, -9223372036854775808).
        :param end: boundary end, inclusive.
                    The frame is unbounded if this is ``Window.unboundedFollowing``, or
                    any value greater than or equal to min(sys.maxsize, 9223372036854775807).
        """
        lower, upper = WindowSpec._frame_bounds(start, end)
        return WindowSpec(self._jspec.rowsBetween(lower, upper))

    @since(1.4)
    def rangeBetween(self, start, end):
        """
        Returns a new :class:`WindowSpec` with a range-based frame spanning from
        `start` to `end`, both ends inclusive.

        The offsets are relative to the current row's ORDER BY value: "0" is the
        current row, "-1" is one less than its value, and "5" is five more.

        Prefer ``Window.unboundedPreceding``, ``Window.unboundedFollowing`` and
        ``Window.currentRow`` over raw integers for the special boundaries.

        :param start: boundary start, inclusive.
                      The frame is unbounded if this is ``Window.unboundedPreceding``, or
                      any value less than or equal to max(-sys.maxsize, -9223372036854775808).
        :param end: boundary end, inclusive.
                    The frame is unbounded if this is ``Window.unboundedFollowing``, or
                    any value greater than or equal to min(sys.maxsize, 9223372036854775807).
        """
        lower, upper = WindowSpec._frame_bounds(start, end)
        return WindowSpec(self._jspec.rangeBetween(lower, upper))
0281
0282
def _test():
    """
    Run this module's doctests against a local SparkContext.

    The context is always stopped once the doctests finish, even if doctest
    collection raises. The interpreter then exits with status -1 if any
    doctest failed.
    """
    import doctest
    import pyspark.sql.window
    sc = SparkContext('local[4]', 'PythonTest')
    try:
        globs = pyspark.sql.window.__dict__.copy()
        (failure_count, test_count) = doctest.testmod(
            pyspark.sql.window, globs=globs,
            optionflags=doctest.NORMALIZE_WHITESPACE)
    finally:
        # Previously the context was never stopped, leaving the JVM-backed
        # SparkContext running when sys.exit() was reached.
        sc.stop()
    if failure_count:
        sys.exit(-1)


if __name__ == "__main__":
    _test()