Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 import sys
0019 import json
0020 import warnings
0021 
# Python 3 has no ``basestring``/``long`` builtins; alias them so the rest of
# the module can use a single spelling on both major versions.
# ``sys.version_info`` is compared numerically, whereas the ``sys.version``
# string compares lexicographically (e.g. "10.0" < "3").
if sys.version_info[0] >= 3:
    basestring = str
    long = int
0025 
0026 from py4j.java_gateway import is_instance_of
0027 
0028 from pyspark import copy_func, since
0029 from pyspark.context import SparkContext
0030 from pyspark.rdd import ignore_unicode_prefix
0031 from pyspark.sql.types import *
0032 
0033 __all__ = ["Column"]
0034 
0035 
def _create_column_from_literal(literal):
    """Pass `literal` to the JVM ``functions.lit`` of the active SparkContext and return the result."""
    jvm_functions = SparkContext._active_spark_context._jvm.functions
    return jvm_functions.lit(literal)
0039 
0040 
def _create_column_from_name(name):
    """Pass `name` to the JVM ``functions.col`` of the active SparkContext and return the result."""
    jvm_functions = SparkContext._active_spark_context._jvm.functions
    return jvm_functions.col(name)
0044 
0045 
def _to_java_column(col):
    """
    Resolve `col` to its JVM column object.

    A :class:`Column` yields its wrapped ``_jc``; a string is treated as a
    column name and looked up through :func:`_create_column_from_name`.

    :raises TypeError: if `col` is neither a :class:`Column` nor a string.
    """
    if isinstance(col, Column):
        return col._jc
    if isinstance(col, basestring):
        return _create_column_from_name(col)
    raise TypeError(
        "Invalid argument, not a string or column: "
        "{0} of type {1}. "
        "For column literals, use 'lit', 'array', 'struct' or 'create_map' "
        "function.".format(col, type(col)))
0058 
0059 
def _to_seq(sc, cols, converter=None):
    """
    Turn a list of Column (or names) into a JVM Seq of Column.

    When a truthy `converter` is given, it is applied to every item of
    `cols` before the list is handed to ``PythonUtils.toSeq``.
    """
    items = [converter(c) for c in cols] if converter else cols
    return sc._jvm.PythonUtils.toSeq(items)
0070 
0071 
def _to_list(sc, cols, converter=None):
    """
    Turn a list of Column (or names) into a JVM (Scala) List of Column.

    When a truthy `converter` is given, it is applied to every item of
    `cols` before the list is handed to ``PythonUtils.toList``.
    """
    items = [converter(c) for c in cols] if converter else cols
    return sc._jvm.PythonUtils.toList(items)
0082 
0083 
def _unary_op(name, doc="unary operator"):
    """
    Build a method that calls the no-argument JVM column method `name`.

    The generated method wraps the result in a new :class:`Column` and
    carries `doc` as its docstring.
    """
    def _(self):
        jvm_method = getattr(self._jc, name)
        return Column(jvm_method())
    _.__doc__ = doc
    return _
0091 
0092 
def _func_op(name, doc=''):
    """
    Build a method that applies the JVM ``functions.<name>`` function to
    this column's ``_jc`` and wraps the result in a new :class:`Column`.

    The generated method carries `doc` as its docstring.
    """
    def _(self):
        jvm_functions = SparkContext._active_spark_context._jvm.functions
        jvm_fn = getattr(jvm_functions, name)
        return Column(jvm_fn(self._jc))
    _.__doc__ = doc
    return _
0100 
0101 
def _bin_func_op(name, reverse=False, doc="binary function"):
    """
    Build a method that applies the two-argument JVM ``functions.<name>``.

    A non-Column operand is first turned into a literal column. With
    `reverse` set, the operand is passed first and this column second.
    The generated method carries `doc` as its docstring.
    """
    def _(self, other):
        jvm_fn = getattr(SparkContext._active_spark_context._jvm.functions, name)
        if isinstance(other, Column):
            operand = other._jc
        else:
            operand = _create_column_from_literal(other)
        args = (operand, self._jc) if reverse else (self._jc, operand)
        return Column(jvm_fn(*args))
    _.__doc__ = doc
    return _
0111 
0112 
def _bin_op(name, doc="binary operator"):
    """
    Build a method that calls the one-argument JVM column method `name`.

    A :class:`Column` operand is unwrapped to its ``_jc``; any other value
    is passed through unchanged. The generated method carries `doc` as its
    docstring.
    """
    def _(self, other):
        if isinstance(other, Column):
            operand = other._jc
        else:
            operand = other
        jvm_method = getattr(self._jc, name)
        return Column(jvm_method(operand))
    _.__doc__ = doc
    return _
0122 
0123 
def _reverse_op(name, doc="binary operator"):
    """
    Build a method for a binary operator where this column is the
    right-hand operand.

    The left operand is turned into a literal column, and the JVM method
    `name` is invoked on it with this column's ``_jc`` as the argument.
    The generated method carries `doc` as its docstring.
    """
    def _(self, other):
        left = _create_column_from_literal(other)
        jvm_method = getattr(left, name)
        return Column(jvm_method(self._jc))
    _.__doc__ = doc
    return _
0133 
0134 
0135 class Column(object):
0136 
0137     """
0138     A column in a DataFrame.
0139 
0140     :class:`Column` instances can be created by::
0141 
0142         # 1. Select a column out of a DataFrame
0143 
0144         df.colName
0145         df["colName"]
0146 
0147         # 2. Create from an expression
0148         df.colName + 1
0149         1 / df.colName
0150 
0151     .. versionadded:: 1.3
0152     """
0153 
0154     def __init__(self, jc):
0155         self._jc = jc
0156 
    # arithmetic operators
    # Each binding is a method generated by one of the module-level factories;
    # the string is the name of the JVM method/function that gets invoked.
    __neg__ = _func_op("negate")
    __add__ = _bin_op("plus")
    __sub__ = _bin_op("minus")
    __mul__ = _bin_op("multiply")
    # __div__ is the Python 2 division hook, __truediv__ the Python 3 one.
    __div__ = _bin_op("divide")
    __truediv__ = _bin_op("divide")
    __mod__ = _bin_op("mod")
    # Reflected variants (this column is the right-hand operand). "plus" and
    # "multiply" reuse _bin_op on this column; the others go through
    # _reverse_op, which turns the left operand into a literal column first.
    __radd__ = _bin_op("plus")
    __rsub__ = _reverse_op("minus")
    __rmul__ = _bin_op("multiply")
    __rdiv__ = _reverse_op("divide")
    __rtruediv__ = _reverse_op("divide")
    __rmod__ = _reverse_op("mod")
    __pow__ = _bin_func_op("pow")
    __rpow__ = _bin_func_op("pow", reverse=True)

    # comparison operators
    # These return a new Column (an expression), not a Python bool.
    # NOTE: on Python 3, defining __eq__ without __hash__ makes instances
    # unhashable.
    __eq__ = _bin_op("equalTo")
    __ne__ = _bin_op("notEqual")
    __lt__ = _bin_op("lt")
    __le__ = _bin_op("leq")
    __ge__ = _bin_op("geq")
    __gt__ = _bin_op("gt")
0181 
0182     _eqNullSafe_doc = """
0183     Equality test that is safe for null values.
0184 
0185     :param other: a value or :class:`Column`
0186 
0187     >>> from pyspark.sql import Row
0188     >>> df1 = spark.createDataFrame([
0189     ...     Row(id=1, value='foo'),
0190     ...     Row(id=2, value=None)
0191     ... ])
0192     >>> df1.select(
0193     ...     df1['value'] == 'foo',
0194     ...     df1['value'].eqNullSafe('foo'),
0195     ...     df1['value'].eqNullSafe(None)
0196     ... ).show()
0197     +-------------+---------------+----------------+
0198     |(value = foo)|(value <=> foo)|(value <=> NULL)|
0199     +-------------+---------------+----------------+
0200     |         true|           true|           false|
0201     |         null|          false|            true|
0202     +-------------+---------------+----------------+
0203     >>> df2 = spark.createDataFrame([
0204     ...     Row(value = 'bar'),
0205     ...     Row(value = None)
0206     ... ])
0207     >>> df1.join(df2, df1["value"] == df2["value"]).count()
0208     0
0209     >>> df1.join(df2, df1["value"].eqNullSafe(df2["value"])).count()
0210     1
0211     >>> df2 = spark.createDataFrame([
0212     ...     Row(id=1, value=float('NaN')),
0213     ...     Row(id=2, value=42.0),
0214     ...     Row(id=3, value=None)
0215     ... ])
0216     >>> df2.select(
0217     ...     df2['value'].eqNullSafe(None),
0218     ...     df2['value'].eqNullSafe(float('NaN')),
0219     ...     df2['value'].eqNullSafe(42.0)
0220     ... ).show()
0221     +----------------+---------------+----------------+
0222     |(value <=> NULL)|(value <=> NaN)|(value <=> 42.0)|
0223     +----------------+---------------+----------------+
0224     |           false|           true|           false|
0225     |           false|          false|            true|
0226     |            true|          false|           false|
0227     +----------------+---------------+----------------+
0228 
0229     .. note:: Unlike Pandas, PySpark doesn't consider NaN values to be NULL.
0230        See the `NaN Semantics`_ for details.
0231     .. _NaN Semantics:
0232        https://spark.apache.org/docs/latest/sql-programming-guide.html#nan-semantics
0233     .. versionadded:: 2.3.0
0234     """
0235     eqNullSafe = _bin_op("eqNullSafe", _eqNullSafe_doc)
0236 
    # `and`, `or`, `not` cannot be overloaded in Python,
    # so use bitwise operators as boolean operators
    # (`&` -> JVM "and", `|` -> JVM "or", `~` -> JVM functions "not").
    __and__ = _bin_op('and')
    __or__ = _bin_op('or')
    __invert__ = _func_op('not')
    # Reflected variants reuse the same JVM methods on this column.
    __rand__ = _bin_op("and")
    __ror__ = _bin_op("or")
0244 
0245     # container operators
0246     def __contains__(self, item):
0247         raise ValueError("Cannot apply 'in' operator against a column: please use 'contains' "
0248                          "in a string column or 'array_contains' function for an array column.")
0249 
0250     # bitwise operators
0251     _bitwiseOR_doc = """
0252     Compute bitwise OR of this expression with another expression.
0253 
0254     :param other: a value or :class:`Column` to calculate bitwise or(|) against
0255                   this :class:`Column`.
0256 
0257     >>> from pyspark.sql import Row
0258     >>> df = spark.createDataFrame([Row(a=170, b=75)])
0259     >>> df.select(df.a.bitwiseOR(df.b)).collect()
0260     [Row((a | b)=235)]
0261     """
0262     _bitwiseAND_doc = """
0263     Compute bitwise AND of this expression with another expression.
0264 
0265     :param other: a value or :class:`Column` to calculate bitwise and(&) against
0266                   this :class:`Column`.
0267 
0268     >>> from pyspark.sql import Row
0269     >>> df = spark.createDataFrame([Row(a=170, b=75)])
0270     >>> df.select(df.a.bitwiseAND(df.b)).collect()
0271     [Row((a & b)=10)]
0272     """
0273     _bitwiseXOR_doc = """
0274     Compute bitwise XOR of this expression with another expression.
0275 
0276     :param other: a value or :class:`Column` to calculate bitwise xor(^) against
0277                   this :class:`Column`.
0278 
0279     >>> from pyspark.sql import Row
0280     >>> df = spark.createDataFrame([Row(a=170, b=75)])
0281     >>> df.select(df.a.bitwiseXOR(df.b)).collect()
0282     [Row((a ^ b)=225)]
0283     """
0284 
0285     bitwiseOR = _bin_op("bitwiseOR", _bitwiseOR_doc)
0286     bitwiseAND = _bin_op("bitwiseAND", _bitwiseAND_doc)
0287     bitwiseXOR = _bin_op("bitwiseXOR", _bitwiseXOR_doc)
0288 
0289     @since(1.3)
0290     def getItem(self, key):
0291         """
0292         An expression that gets an item at position ``ordinal`` out of a list,
0293         or gets an item by key out of a dict.
0294 
0295         >>> df = spark.createDataFrame([([1, 2], {"key": "value"})], ["l", "d"])
0296         >>> df.select(df.l.getItem(0), df.d.getItem("key")).show()
0297         +----+------+
0298         |l[0]|d[key]|
0299         +----+------+
0300         |   1| value|
0301         +----+------+
0302         """
0303         if isinstance(key, Column):
0304             warnings.warn(
0305                 "A column as 'key' in getItem is deprecated as of Spark 3.0, and will not "
0306                 "be supported in the future release. Use `column[key]` or `column.key` syntax "
0307                 "instead.",
0308                 DeprecationWarning)
0309         return self[key]
0310 
0311     @since(1.3)
0312     def getField(self, name):
0313         """
0314         An expression that gets a field by name in a StructField.
0315 
0316         >>> from pyspark.sql import Row
0317         >>> df = spark.createDataFrame([Row(r=Row(a=1, b="b"))])
0318         >>> df.select(df.r.getField("b")).show()
0319         +---+
0320         |r.b|
0321         +---+
0322         |  b|
0323         +---+
0324         >>> df.select(df.r.a).show()
0325         +---+
0326         |r.a|
0327         +---+
0328         |  1|
0329         +---+
0330         """
0331         if isinstance(name, Column):
0332             warnings.warn(
0333                 "A column as 'name' in getField is deprecated as of Spark 3.0, and will not "
0334                 "be supported in the future release. Use `column[name]` or `column.name` syntax "
0335                 "instead.",
0336                 DeprecationWarning)
0337         return self[name]
0338 
0339     def __getattr__(self, item):
0340         if item.startswith("__"):
0341             raise AttributeError(item)
0342         return self[item]
0343 
0344     def __getitem__(self, k):
0345         if isinstance(k, slice):
0346             if k.step is not None:
0347                 raise ValueError("slice with step is not supported.")
0348             return self.substr(k.start, k.stop)
0349         else:
0350             return _bin_op("apply")(self, k)
0351 
0352     def __iter__(self):
0353         raise TypeError("Column is not iterable")
0354 
0355     # string methods
0356     _contains_doc = """
0357     Contains the other element. Returns a boolean :class:`Column` based on a string match.
0358 
0359     :param other: string in line
0360 
0361     >>> df.filter(df.name.contains('o')).collect()
0362     [Row(age=5, name=u'Bob')]
0363     """
0364     _rlike_doc = """
0365     SQL RLIKE expression (LIKE with Regex). Returns a boolean :class:`Column` based on a regex
0366     match.
0367 
0368     :param other: an extended regex expression
0369 
0370     >>> df.filter(df.name.rlike('ice$')).collect()
0371     [Row(age=2, name=u'Alice')]
0372     """
0373     _like_doc = """
0374     SQL like expression. Returns a boolean :class:`Column` based on a SQL LIKE match.
0375 
0376     :param other: a SQL LIKE pattern
0377 
0378     See :func:`rlike` for a regex version
0379 
0380     >>> df.filter(df.name.like('Al%')).collect()
0381     [Row(age=2, name=u'Alice')]
0382     """
0383     _startswith_doc = """
0384     String starts with. Returns a boolean :class:`Column` based on a string match.
0385 
0386     :param other: string at start of line (do not use a regex `^`)
0387 
0388     >>> df.filter(df.name.startswith('Al')).collect()
0389     [Row(age=2, name=u'Alice')]
0390     >>> df.filter(df.name.startswith('^Al')).collect()
0391     []
0392     """
0393     _endswith_doc = """
0394     String ends with. Returns a boolean :class:`Column` based on a string match.
0395 
0396     :param other: string at end of line (do not use a regex `$`)
0397 
0398     >>> df.filter(df.name.endswith('ice')).collect()
0399     [Row(age=2, name=u'Alice')]
0400     >>> df.filter(df.name.endswith('ice$')).collect()
0401     []
0402     """
0403 
0404     contains = ignore_unicode_prefix(_bin_op("contains", _contains_doc))
0405     rlike = ignore_unicode_prefix(_bin_op("rlike", _rlike_doc))
0406     like = ignore_unicode_prefix(_bin_op("like", _like_doc))
0407     startswith = ignore_unicode_prefix(_bin_op("startsWith", _startswith_doc))
0408     endswith = ignore_unicode_prefix(_bin_op("endsWith", _endswith_doc))
0409 
0410     @ignore_unicode_prefix
0411     @since(1.3)
0412     def substr(self, startPos, length):
0413         """
0414         Return a :class:`Column` which is a substring of the column.
0415 
0416         :param startPos: start position (int or Column)
0417         :param length:  length of the substring (int or Column)
0418 
0419         >>> df.select(df.name.substr(1, 3).alias("col")).collect()
0420         [Row(col=u'Ali'), Row(col=u'Bob')]
0421         """
0422         if type(startPos) != type(length):
0423             raise TypeError(
0424                 "startPos and length must be the same type. "
0425                 "Got {startPos_t} and {length_t}, respectively."
0426                 .format(
0427                     startPos_t=type(startPos),
0428                     length_t=type(length),
0429                 ))
0430         if isinstance(startPos, int):
0431             jc = self._jc.substr(startPos, length)
0432         elif isinstance(startPos, Column):
0433             jc = self._jc.substr(startPos._jc, length._jc)
0434         else:
0435             raise TypeError("Unexpected type: %s" % type(startPos))
0436         return Column(jc)
0437 
0438     @ignore_unicode_prefix
0439     @since(1.5)
0440     def isin(self, *cols):
0441         """
0442         A boolean expression that is evaluated to true if the value of this
0443         expression is contained by the evaluated values of the arguments.
0444 
0445         >>> df[df.name.isin("Bob", "Mike")].collect()
0446         [Row(age=5, name=u'Bob')]
0447         >>> df[df.age.isin([1, 2, 3])].collect()
0448         [Row(age=2, name=u'Alice')]
0449         """
0450         if len(cols) == 1 and isinstance(cols[0], (list, set)):
0451             cols = cols[0]
0452         cols = [c._jc if isinstance(c, Column) else _create_column_from_literal(c) for c in cols]
0453         sc = SparkContext._active_spark_context
0454         jc = getattr(self._jc, "isin")(_to_seq(sc, cols))
0455         return Column(jc)
0456 
0457     # order
0458     _asc_doc = """
0459     Returns a sort expression based on ascending order of the column.
0460 
0461     >>> from pyspark.sql import Row
0462     >>> df = spark.createDataFrame([('Tom', 80), ('Alice', None)], ["name", "height"])
0463     >>> df.select(df.name).orderBy(df.name.asc()).collect()
0464     [Row(name=u'Alice'), Row(name=u'Tom')]
0465     """
0466     _asc_nulls_first_doc = """
0467     Returns a sort expression based on ascending order of the column, and null values
0468     return before non-null values.
0469 
0470     >>> from pyspark.sql import Row
0471     >>> df = spark.createDataFrame([('Tom', 80), (None, 60), ('Alice', None)], ["name", "height"])
0472     >>> df.select(df.name).orderBy(df.name.asc_nulls_first()).collect()
0473     [Row(name=None), Row(name=u'Alice'), Row(name=u'Tom')]
0474 
0475     .. versionadded:: 2.4
0476     """
0477     _asc_nulls_last_doc = """
0478     Returns a sort expression based on ascending order of the column, and null values
0479     appear after non-null values.
0480 
0481     >>> from pyspark.sql import Row
0482     >>> df = spark.createDataFrame([('Tom', 80), (None, 60), ('Alice', None)], ["name", "height"])
0483     >>> df.select(df.name).orderBy(df.name.asc_nulls_last()).collect()
0484     [Row(name=u'Alice'), Row(name=u'Tom'), Row(name=None)]
0485 
0486     .. versionadded:: 2.4
0487     """
0488     _desc_doc = """
0489     Returns a sort expression based on the descending order of the column.
0490 
0491     >>> from pyspark.sql import Row
0492     >>> df = spark.createDataFrame([('Tom', 80), ('Alice', None)], ["name", "height"])
0493     >>> df.select(df.name).orderBy(df.name.desc()).collect()
0494     [Row(name=u'Tom'), Row(name=u'Alice')]
0495     """
0496     _desc_nulls_first_doc = """
0497     Returns a sort expression based on the descending order of the column, and null values
0498     appear before non-null values.
0499 
0500     >>> from pyspark.sql import Row
0501     >>> df = spark.createDataFrame([('Tom', 80), (None, 60), ('Alice', None)], ["name", "height"])
0502     >>> df.select(df.name).orderBy(df.name.desc_nulls_first()).collect()
0503     [Row(name=None), Row(name=u'Tom'), Row(name=u'Alice')]
0504 
0505     .. versionadded:: 2.4
0506     """
0507     _desc_nulls_last_doc = """
0508     Returns a sort expression based on the descending order of the column, and null values
0509     appear after non-null values.
0510 
0511     >>> from pyspark.sql import Row
0512     >>> df = spark.createDataFrame([('Tom', 80), (None, 60), ('Alice', None)], ["name", "height"])
0513     >>> df.select(df.name).orderBy(df.name.desc_nulls_last()).collect()
0514     [Row(name=u'Tom'), Row(name=u'Alice'), Row(name=None)]
0515 
0516     .. versionadded:: 2.4
0517     """
0518 
0519     asc = ignore_unicode_prefix(_unary_op("asc", _asc_doc))
0520     asc_nulls_first = ignore_unicode_prefix(_unary_op("asc_nulls_first", _asc_nulls_first_doc))
0521     asc_nulls_last = ignore_unicode_prefix(_unary_op("asc_nulls_last", _asc_nulls_last_doc))
0522     desc = ignore_unicode_prefix(_unary_op("desc", _desc_doc))
0523     desc_nulls_first = ignore_unicode_prefix(_unary_op("desc_nulls_first", _desc_nulls_first_doc))
0524     desc_nulls_last = ignore_unicode_prefix(_unary_op("desc_nulls_last", _desc_nulls_last_doc))
0525 
0526     _isNull_doc = """
0527     True if the current expression is null.
0528 
0529     >>> from pyspark.sql import Row
0530     >>> df = spark.createDataFrame([Row(name=u'Tom', height=80), Row(name=u'Alice', height=None)])
0531     >>> df.filter(df.height.isNull()).collect()
0532     [Row(height=None, name=u'Alice')]
0533     """
0534     _isNotNull_doc = """
0535     True if the current expression is NOT null.
0536 
0537     >>> from pyspark.sql import Row
0538     >>> df = spark.createDataFrame([Row(name=u'Tom', height=80), Row(name=u'Alice', height=None)])
0539     >>> df.filter(df.height.isNotNull()).collect()
0540     [Row(height=80, name=u'Tom')]
0541     """
0542 
0543     isNull = ignore_unicode_prefix(_unary_op("isNull", _isNull_doc))
0544     isNotNull = ignore_unicode_prefix(_unary_op("isNotNull", _isNotNull_doc))
0545 
0546     @since(1.3)
0547     def alias(self, *alias, **kwargs):
0548         """
0549         Returns this column aliased with a new name or names (in the case of expressions that
0550         return more than one column, such as explode).
0551 
0552         :param alias: strings of desired column names (collects all positional arguments passed)
0553         :param metadata: a dict of information to be stored in ``metadata`` attribute of the
0554             corresponding :class:`StructField <pyspark.sql.types.StructField>` (optional, keyword
0555             only argument)
0556 
0557         .. versionchanged:: 2.2
0558            Added optional ``metadata`` argument.
0559 
0560         >>> df.select(df.age.alias("age2")).collect()
0561         [Row(age2=2), Row(age2=5)]
0562         >>> df.select(df.age.alias("age3", metadata={'max': 99})).schema['age3'].metadata['max']
0563         99
0564         """
0565 
0566         metadata = kwargs.pop('metadata', None)
0567         assert not kwargs, 'Unexpected kwargs where passed: %s' % kwargs
0568 
0569         sc = SparkContext._active_spark_context
0570         if len(alias) == 1:
0571             if metadata:
0572                 jmeta = sc._jvm.org.apache.spark.sql.types.Metadata.fromJson(
0573                     json.dumps(metadata))
0574                 return Column(getattr(self._jc, "as")(alias[0], jmeta))
0575             else:
0576                 return Column(getattr(self._jc, "as")(alias[0]))
0577         else:
0578             if metadata:
0579                 raise ValueError('metadata can only be provided for a single column')
0580             return Column(getattr(self._jc, "as")(_to_seq(sc, list(alias))))
0581 
0582     name = copy_func(alias, sinceversion=2.0, doc=":func:`name` is an alias for :func:`alias`.")
0583 
0584     @ignore_unicode_prefix
0585     @since(1.3)
0586     def cast(self, dataType):
0587         """ Convert the column into type ``dataType``.
0588 
0589         >>> df.select(df.age.cast("string").alias('ages')).collect()
0590         [Row(ages=u'2'), Row(ages=u'5')]
0591         >>> df.select(df.age.cast(StringType()).alias('ages')).collect()
0592         [Row(ages=u'2'), Row(ages=u'5')]
0593         """
0594         if isinstance(dataType, basestring):
0595             jc = self._jc.cast(dataType)
0596         elif isinstance(dataType, DataType):
0597             from pyspark.sql import SparkSession
0598             spark = SparkSession.builder.getOrCreate()
0599             jdt = spark._jsparkSession.parseDataType(dataType.json())
0600             jc = self._jc.cast(jdt)
0601         else:
0602             raise TypeError("unexpected type: %s" % type(dataType))
0603         return Column(jc)
0604 
0605     astype = copy_func(cast, sinceversion=1.4, doc=":func:`astype` is an alias for :func:`cast`.")
0606 
0607     @since(1.3)
0608     def between(self, lowerBound, upperBound):
0609         """
0610         A boolean expression that is evaluated to true if the value of this
0611         expression is between the given columns.
0612 
0613         >>> df.select(df.name, df.age.between(2, 4)).show()
0614         +-----+---------------------------+
0615         | name|((age >= 2) AND (age <= 4))|
0616         +-----+---------------------------+
0617         |Alice|                       true|
0618         |  Bob|                      false|
0619         +-----+---------------------------+
0620         """
0621         return (self >= lowerBound) & (self <= upperBound)
0622 
0623     @since(1.4)
0624     def when(self, condition, value):
0625         """
0626         Evaluates a list of conditions and returns one of multiple possible result expressions.
0627         If :func:`Column.otherwise` is not invoked, None is returned for unmatched conditions.
0628 
0629         See :func:`pyspark.sql.functions.when` for example usage.
0630 
0631         :param condition: a boolean :class:`Column` expression.
0632         :param value: a literal value, or a :class:`Column` expression.
0633 
0634         >>> from pyspark.sql import functions as F
0635         >>> df.select(df.name, F.when(df.age > 4, 1).when(df.age < 3, -1).otherwise(0)).show()
0636         +-----+------------------------------------------------------------+
0637         | name|CASE WHEN (age > 4) THEN 1 WHEN (age < 3) THEN -1 ELSE 0 END|
0638         +-----+------------------------------------------------------------+
0639         |Alice|                                                          -1|
0640         |  Bob|                                                           1|
0641         +-----+------------------------------------------------------------+
0642         """
0643         if not isinstance(condition, Column):
0644             raise TypeError("condition should be a Column")
0645         v = value._jc if isinstance(value, Column) else value
0646         jc = self._jc.when(condition._jc, v)
0647         return Column(jc)
0648 
0649     @since(1.4)
0650     def otherwise(self, value):
0651         """
0652         Evaluates a list of conditions and returns one of multiple possible result expressions.
0653         If :func:`Column.otherwise` is not invoked, None is returned for unmatched conditions.
0654 
0655         See :func:`pyspark.sql.functions.when` for example usage.
0656 
0657         :param value: a literal value, or a :class:`Column` expression.
0658 
0659         >>> from pyspark.sql import functions as F
0660         >>> df.select(df.name, F.when(df.age > 3, 1).otherwise(0)).show()
0661         +-----+-------------------------------------+
0662         | name|CASE WHEN (age > 3) THEN 1 ELSE 0 END|
0663         +-----+-------------------------------------+
0664         |Alice|                                    0|
0665         |  Bob|                                    1|
0666         +-----+-------------------------------------+
0667         """
0668         v = value._jc if isinstance(value, Column) else value
0669         jc = self._jc.otherwise(v)
0670         return Column(jc)
0671 
0672     @since(1.4)
0673     def over(self, window):
0674         """
0675         Define a windowing column.
0676 
0677         :param window: a :class:`WindowSpec`
0678         :return: a Column
0679 
0680         >>> from pyspark.sql import Window
0681         >>> window = Window.partitionBy("name").orderBy("age") \
0682                 .rowsBetween(Window.unboundedPreceding, Window.currentRow)
0683         >>> from pyspark.sql.functions import rank, min
0684         >>> from pyspark.sql.functions import desc
0685         >>> df.withColumn("rank", rank().over(window)) \
0686                 .withColumn("min", min('age').over(window)).sort(desc("age")).show()
0687         +---+-----+----+---+
0688         |age| name|rank|min|
0689         +---+-----+----+---+
0690         |  5|  Bob|   1|  5|
0691         |  2|Alice|   1|  2|
0692         +---+-----+----+---+
0693         """
0694         from pyspark.sql.window import WindowSpec
0695         if not isinstance(window, WindowSpec):
0696             raise TypeError("window should be WindowSpec")
0697         jc = self._jc.over(window._jspec)
0698         return Column(jc)
0699 
0700     def __nonzero__(self):
0701         raise ValueError("Cannot convert column into bool: please use '&' for 'and', '|' for 'or', "
0702                          "'~' for 'not' when building DataFrame boolean expressions.")
0703     __bool__ = __nonzero__
0704 
0705     def __repr__(self):
0706         return 'Column<%s>' % self._jc.toString().encode('utf8')
0707 
0708 
def _test():
    """
    Run this module's doctests against a local SparkSession.

    The doctest globals are a copy of the module namespace plus ``spark``
    (the session) and ``df`` (a two-row DataFrame with ``age`` and ``name``
    columns). The session is stopped even if setup or the doctest run raises.
    The process exits with status -1 when any doctest fails.
    """
    import doctest
    from pyspark.sql import SparkSession
    import pyspark.sql.column
    globs = pyspark.sql.column.__dict__.copy()
    spark = SparkSession.builder\
        .master("local[4]")\
        .appName("sql.column tests")\
        .getOrCreate()
    try:
        sc = spark.sparkContext
        globs['spark'] = spark
        globs['df'] = sc.parallelize([(2, 'Alice'), (5, 'Bob')]) \
            .toDF(StructType([StructField('age', IntegerType()),
                              StructField('name', StringType())]))

        (failure_count, test_count) = doctest.testmod(
            pyspark.sql.column, globs=globs,
            optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
    finally:
        # Always release the session, including when fixture creation or the
        # doctest run itself raises.
        spark.stop()
    if failure_count:
        sys.exit(-1)
0730 
0731 
# Run the module's doctests when this file is executed directly as a script.
if __name__ == "__main__":
    _test()