import sys
import json
import warnings

if sys.version >= '3':
    basestring = str
    long = int

from py4j.java_gateway import is_instance_of

from pyspark import copy_func, since
from pyspark.context import SparkContext
from pyspark.rdd import ignore_unicode_prefix
from pyspark.sql.types import *

__all__ = ["Column"]


def _create_column_from_literal(literal):
    sc = SparkContext._active_spark_context
    return sc._jvm.functions.lit(literal)


def _create_column_from_name(name):
    sc = SparkContext._active_spark_context
    return sc._jvm.functions.col(name)


def _to_java_column(col):
    if isinstance(col, Column):
        jcol = col._jc
    elif isinstance(col, basestring):
        jcol = _create_column_from_name(col)
    else:
        raise TypeError(
            "Invalid argument, not a string or column: "
            "{0} of type {1}. "
            "For column literals, use 'lit', 'array', 'struct' or 'create_map' "
            "function.".format(col, type(col)))
    return jcol


def _to_seq(sc, cols, converter=None):
    """
    Convert a list of Column (or names) into a JVM Seq of Column.

    An optional `converter` could be used to convert items in `cols`
    into JVM Column objects.
    """
    if converter:
        cols = [converter(c) for c in cols]
    return sc._jvm.PythonUtils.toSeq(cols)


def _to_list(sc, cols, converter=None):
    """
    Convert a list of Column (or names) into a JVM (Scala) List of Column.

    An optional `converter` could be used to convert items in `cols`
    into JVM Column objects.
    """
    if converter:
        cols = [converter(c) for c in cols]
    return sc._jvm.PythonUtils.toList(cols)
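
# A minimal usage sketch (illustrative only, not executed here), assuming an
# active SparkContext `sc` and a DataFrame `df` with an `age` column: callers
# typically normalize mixed column arguments (names or Column objects) with
# `_to_java_column` before packing them into a JVM Seq, e.g.
#
#   jseq = _to_seq(sc, [df.age, "name"], _to_java_column)  # JVM Seq[Column]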


def _unary_op(name, doc="unary operator"):
    """ Create a method for given unary operator """
    def _(self):
        jc = getattr(self._jc, name)()
        return Column(jc)
    _.__doc__ = doc
    return _


def _func_op(name, doc=''):
    def _(self):
        sc = SparkContext._active_spark_context
        jc = getattr(sc._jvm.functions, name)(self._jc)
        return Column(jc)
    _.__doc__ = doc
    return _


def _bin_func_op(name, reverse=False, doc="binary function"):
    def _(self, other):
        sc = SparkContext._active_spark_context
        fn = getattr(sc._jvm.functions, name)
        jc = other._jc if isinstance(other, Column) else _create_column_from_literal(other)
        njc = fn(self._jc, jc) if not reverse else fn(jc, self._jc)
        return Column(njc)
    _.__doc__ = doc
    return _


def _bin_op(name, doc="binary operator"):
    """ Create a method for given binary operator
    """
    def _(self, other):
        jc = other._jc if isinstance(other, Column) else other
        njc = getattr(self._jc, name)(jc)
        return Column(njc)
    _.__doc__ = doc
    return _


def _reverse_op(name, doc="binary operator"):
    """ Create a method for binary operator (this object is on right side)
    """
    def _(self, other):
        jother = _create_column_from_literal(other)
        jc = getattr(jother, name)(self._jc)
        return Column(jc)
    _.__doc__ = doc
    return _
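
# A rough sketch of what these factories produce (illustrative only): with
# `__add__ = _bin_op("plus")` below, an expression such as `df.age + 1`
# delegates to the JVM Column's `plus` method, while `__neg__ =
# _func_op("negate")` routes `-df.age` through `functions.negate`:
#
#   (df.age + 1)._jc    # roughly df.age._jc.plus(1)
#   (-df.age)._jc       # roughly sc._jvm.functions.negate(df.age._jc)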


class Column(object):

    """
    A column in a DataFrame.

    :class:`Column` instances can be created by::

        # 1. Select a column out of a DataFrame

        df.colName
        df["colName"]

        # 2. Create from an expression
        df.colName + 1
        1 / df.colName

    .. versionadded:: 1.3
    """

    def __init__(self, jc):
        self._jc = jc

    # arithmetic operators
    __neg__ = _func_op("negate")
    __add__ = _bin_op("plus")
    __sub__ = _bin_op("minus")
    __mul__ = _bin_op("multiply")
    __div__ = _bin_op("divide")
    __truediv__ = _bin_op("divide")
    __mod__ = _bin_op("mod")
    __radd__ = _bin_op("plus")
    __rsub__ = _reverse_op("minus")
    __rmul__ = _bin_op("multiply")
    __rdiv__ = _reverse_op("divide")
    __rtruediv__ = _reverse_op("divide")
    __rmod__ = _reverse_op("mod")
    __pow__ = _bin_func_op("pow")
    __rpow__ = _bin_func_op("pow", reverse=True)

    # comparison operators
    __eq__ = _bin_op("equalTo")
    __ne__ = _bin_op("notEqual")
    __lt__ = _bin_op("lt")
    __le__ = _bin_op("leq")
    __ge__ = _bin_op("geq")
    __gt__ = _bin_op("gt")

    _eqNullSafe_doc = """
    Equality test that is safe for null values.

    :param other: a value or :class:`Column`

    >>> from pyspark.sql import Row
    >>> df1 = spark.createDataFrame([
    ...     Row(id=1, value='foo'),
    ...     Row(id=2, value=None)
    ... ])
    >>> df1.select(
    ...     df1['value'] == 'foo',
    ...     df1['value'].eqNullSafe('foo'),
    ...     df1['value'].eqNullSafe(None)
    ... ).show()
    +-------------+---------------+----------------+
    |(value = foo)|(value <=> foo)|(value <=> NULL)|
    +-------------+---------------+----------------+
    |         true|           true|           false|
    |         null|          false|            true|
    +-------------+---------------+----------------+
    >>> df2 = spark.createDataFrame([
    ...     Row(value = 'bar'),
    ...     Row(value = None)
    ... ])
    >>> df1.join(df2, df1["value"] == df2["value"]).count()
    0
    >>> df1.join(df2, df1["value"].eqNullSafe(df2["value"])).count()
    1
    >>> df2 = spark.createDataFrame([
    ...     Row(id=1, value=float('NaN')),
    ...     Row(id=2, value=42.0),
    ...     Row(id=3, value=None)
    ... ])
    >>> df2.select(
    ...     df2['value'].eqNullSafe(None),
    ...     df2['value'].eqNullSafe(float('NaN')),
    ...     df2['value'].eqNullSafe(42.0)
    ... ).show()
    +----------------+---------------+----------------+
    |(value <=> NULL)|(value <=> NaN)|(value <=> 42.0)|
    +----------------+---------------+----------------+
    |           false|           true|           false|
    |           false|          false|            true|
    |            true|          false|           false|
    +----------------+---------------+----------------+

    .. note:: Unlike Pandas, PySpark doesn't consider NaN values to be NULL.
        See the `NaN Semantics`_ for details.
    .. _NaN Semantics:
        https://spark.apache.org/docs/latest/sql-programming-guide.html#nan-semantics
    .. versionadded:: 2.3.0
    """
    eqNullSafe = _bin_op("eqNullSafe", _eqNullSafe_doc)

    # `and`, `or`, `not` cannot be overloaded in Python,
    # so use bitwise operators as boolean operators
    __and__ = _bin_op('and')
    __or__ = _bin_op('or')
    __invert__ = _func_op('not')
    __rand__ = _bin_op("and")
    __ror__ = _bin_op("or")
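
    # Because `and`, `or` and `not` cannot be overloaded, boolean expressions
    # are built with `&`, `|` and `~` instead, and each comparison must be
    # parenthesized because the bitwise operators bind tighter than comparisons.
    # Illustrative sketch, assuming a DataFrame `df` with `age` and `name` columns:
    #
    #   df.filter((df.age > 3) & ~(df.name == 'Tom'))
    #   df.filter((df.age < 2) | (df.name == 'Alice'))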

    def __contains__(self, item):
        raise ValueError("Cannot apply 'in' operator against a column: please use 'contains' "
                         "in a string column or 'array_contains' function for an array column.")

    # bitwise operators
    _bitwiseOR_doc = """
    Compute bitwise OR of this expression with another expression.

    :param other: a value or :class:`Column` to calculate bitwise or(|) against
                  this :class:`Column`.

    >>> from pyspark.sql import Row
    >>> df = spark.createDataFrame([Row(a=170, b=75)])
    >>> df.select(df.a.bitwiseOR(df.b)).collect()
    [Row((a | b)=235)]
    """
    _bitwiseAND_doc = """
    Compute bitwise AND of this expression with another expression.

    :param other: a value or :class:`Column` to calculate bitwise and(&) against
                  this :class:`Column`.

    >>> from pyspark.sql import Row
    >>> df = spark.createDataFrame([Row(a=170, b=75)])
    >>> df.select(df.a.bitwiseAND(df.b)).collect()
    [Row((a & b)=10)]
    """
    _bitwiseXOR_doc = """
    Compute bitwise XOR of this expression with another expression.

    :param other: a value or :class:`Column` to calculate bitwise xor(^) against
                  this :class:`Column`.

    >>> from pyspark.sql import Row
    >>> df = spark.createDataFrame([Row(a=170, b=75)])
    >>> df.select(df.a.bitwiseXOR(df.b)).collect()
    [Row((a ^ b)=225)]
    """

    bitwiseOR = _bin_op("bitwiseOR", _bitwiseOR_doc)
    bitwiseAND = _bin_op("bitwiseAND", _bitwiseAND_doc)
    bitwiseXOR = _bin_op("bitwiseXOR", _bitwiseXOR_doc)

    @since(1.3)
    def getItem(self, key):
        """
        An expression that gets an item at position ``ordinal`` out of a list,
        or gets an item by key out of a dict.

        >>> df = spark.createDataFrame([([1, 2], {"key": "value"})], ["l", "d"])
        >>> df.select(df.l.getItem(0), df.d.getItem("key")).show()
        +----+------+
        |l[0]|d[key]|
        +----+------+
        |   1| value|
        +----+------+
        """
        if isinstance(key, Column):
            warnings.warn(
                "A column as 'key' in getItem is deprecated as of Spark 3.0, and will not "
                "be supported in the future release. Use `column[key]` or `column.key` syntax "
                "instead.",
                DeprecationWarning)
        return self[key]

    @since(1.3)
    def getField(self, name):
        """
        An expression that gets a field by name in a :class:`StructType`.

        >>> from pyspark.sql import Row
        >>> df = spark.createDataFrame([Row(r=Row(a=1, b="b"))])
        >>> df.select(df.r.getField("b")).show()
        +---+
        |r.b|
        +---+
        |  b|
        +---+
        >>> df.select(df.r.a).show()
        +---+
        |r.a|
        +---+
        |  1|
        +---+
        """
        if isinstance(name, Column):
            warnings.warn(
                "A column as 'name' in getField is deprecated as of Spark 3.0, and will not "
                "be supported in the future release. Use `column[name]` or `column.name` syntax "
                "instead.",
                DeprecationWarning)
        return self[name]

    def __getattr__(self, item):
        if item.startswith("__"):
            raise AttributeError(item)
        return self[item]

    def __getitem__(self, k):
        if isinstance(k, slice):
            if k.step is not None:
                raise ValueError("slice with step is not supported.")
            return self.substr(k.start, k.stop)
        else:
            return _bin_op("apply")(self, k)
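
    # Note on slicing (illustrative only): the slice's `stop` value is passed to
    # `substr` as the substring *length*, so, assuming a string column `name`,
    #
    #   df.name[1:3]   # equivalent to df.name.substr(1, 3): the first 3 characters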

    def __iter__(self):
        raise TypeError("Column is not iterable")

    _contains_doc = """
    Contains the other element. Returns a boolean :class:`Column` based on a string match.

    :param other: string in line

    >>> df.filter(df.name.contains('o')).collect()
    [Row(age=5, name=u'Bob')]
    """
    _rlike_doc = """
    SQL RLIKE expression (LIKE with Regex). Returns a boolean :class:`Column` based on a regex
    match.

    :param other: an extended regex expression

    >>> df.filter(df.name.rlike('ice$')).collect()
    [Row(age=2, name=u'Alice')]
    """
    _like_doc = """
    SQL like expression. Returns a boolean :class:`Column` based on a SQL LIKE match.

    :param other: a SQL LIKE pattern

    See :func:`rlike` for a regex version

    >>> df.filter(df.name.like('Al%')).collect()
    [Row(age=2, name=u'Alice')]
    """
    _startswith_doc = """
    String starts with. Returns a boolean :class:`Column` based on a string match.

    :param other: string at start of line (do not use a regex `^`)

    >>> df.filter(df.name.startswith('Al')).collect()
    [Row(age=2, name=u'Alice')]
    >>> df.filter(df.name.startswith('^Al')).collect()
    []
    """
    _endswith_doc = """
    String ends with. Returns a boolean :class:`Column` based on a string match.

    :param other: string at end of line (do not use a regex `$`)

    >>> df.filter(df.name.endswith('ice')).collect()
    [Row(age=2, name=u'Alice')]
    >>> df.filter(df.name.endswith('ice$')).collect()
    []
    """

    contains = ignore_unicode_prefix(_bin_op("contains", _contains_doc))
    rlike = ignore_unicode_prefix(_bin_op("rlike", _rlike_doc))
    like = ignore_unicode_prefix(_bin_op("like", _like_doc))
    startswith = ignore_unicode_prefix(_bin_op("startsWith", _startswith_doc))
    endswith = ignore_unicode_prefix(_bin_op("endsWith", _endswith_doc))

    @ignore_unicode_prefix
    @since(1.3)
    def substr(self, startPos, length):
        """
        Return a :class:`Column` which is a substring of the column.

        :param startPos: start position (int or Column)
        :param length: length of the substring (int or Column)

        >>> df.select(df.name.substr(1, 3).alias("col")).collect()
        [Row(col=u'Ali'), Row(col=u'Bob')]
        """
        if type(startPos) != type(length):
            raise TypeError(
                "startPos and length must be the same type. "
                "Got {startPos_t} and {length_t}, respectively."
                .format(
                    startPos_t=type(startPos),
                    length_t=type(length),
                ))
        if isinstance(startPos, int):
            jc = self._jc.substr(startPos, length)
        elif isinstance(startPos, Column):
            jc = self._jc.substr(startPos._jc, length._jc)
        else:
            raise TypeError("Unexpected type: %s" % type(startPos))
        return Column(jc)
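
    # Both `startPos` and `length` must be of the same type; to compute the
    # bounds dynamically, pass a Column for each. Illustrative sketch, assuming
    # `from pyspark.sql import functions as F`:
    #
    #   df.name.substr(F.lit(1), F.lit(3))   # same result as df.name.substr(1, 3)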

    @ignore_unicode_prefix
    @since(1.5)
    def isin(self, *cols):
        """
        A boolean expression that is evaluated to true if the value of this
        expression is contained by the evaluated values of the arguments.

        >>> df[df.name.isin("Bob", "Mike")].collect()
        [Row(age=5, name=u'Bob')]
        >>> df[df.age.isin([1, 2, 3])].collect()
        [Row(age=2, name=u'Alice')]
        """
        if len(cols) == 1 and isinstance(cols[0], (list, set)):
            cols = cols[0]
        cols = [c._jc if isinstance(c, Column) else _create_column_from_literal(c) for c in cols]
        sc = SparkContext._active_spark_context
        jc = getattr(self._jc, "isin")(_to_seq(sc, cols))
        return Column(jc)

    _asc_doc = """
    Returns a sort expression based on ascending order of the column.

    >>> from pyspark.sql import Row
    >>> df = spark.createDataFrame([('Tom', 80), ('Alice', None)], ["name", "height"])
    >>> df.select(df.name).orderBy(df.name.asc()).collect()
    [Row(name=u'Alice'), Row(name=u'Tom')]
    """
    _asc_nulls_first_doc = """
    Returns a sort expression based on ascending order of the column, and null values
    appear before non-null values.

    >>> from pyspark.sql import Row
    >>> df = spark.createDataFrame([('Tom', 80), (None, 60), ('Alice', None)], ["name", "height"])
    >>> df.select(df.name).orderBy(df.name.asc_nulls_first()).collect()
    [Row(name=None), Row(name=u'Alice'), Row(name=u'Tom')]

    .. versionadded:: 2.4
    """
    _asc_nulls_last_doc = """
    Returns a sort expression based on ascending order of the column, and null values
    appear after non-null values.

    >>> from pyspark.sql import Row
    >>> df = spark.createDataFrame([('Tom', 80), (None, 60), ('Alice', None)], ["name", "height"])
    >>> df.select(df.name).orderBy(df.name.asc_nulls_last()).collect()
    [Row(name=u'Alice'), Row(name=u'Tom'), Row(name=None)]

    .. versionadded:: 2.4
    """
    _desc_doc = """
    Returns a sort expression based on the descending order of the column.

    >>> from pyspark.sql import Row
    >>> df = spark.createDataFrame([('Tom', 80), ('Alice', None)], ["name", "height"])
    >>> df.select(df.name).orderBy(df.name.desc()).collect()
    [Row(name=u'Tom'), Row(name=u'Alice')]
    """
    _desc_nulls_first_doc = """
    Returns a sort expression based on the descending order of the column, and null values
    appear before non-null values.

    >>> from pyspark.sql import Row
    >>> df = spark.createDataFrame([('Tom', 80), (None, 60), ('Alice', None)], ["name", "height"])
    >>> df.select(df.name).orderBy(df.name.desc_nulls_first()).collect()
    [Row(name=None), Row(name=u'Tom'), Row(name=u'Alice')]

    .. versionadded:: 2.4
    """
    _desc_nulls_last_doc = """
    Returns a sort expression based on the descending order of the column, and null values
    appear after non-null values.

    >>> from pyspark.sql import Row
    >>> df = spark.createDataFrame([('Tom', 80), (None, 60), ('Alice', None)], ["name", "height"])
    >>> df.select(df.name).orderBy(df.name.desc_nulls_last()).collect()
    [Row(name=u'Tom'), Row(name=u'Alice'), Row(name=None)]

    .. versionadded:: 2.4
    """

    asc = ignore_unicode_prefix(_unary_op("asc", _asc_doc))
    asc_nulls_first = ignore_unicode_prefix(_unary_op("asc_nulls_first", _asc_nulls_first_doc))
    asc_nulls_last = ignore_unicode_prefix(_unary_op("asc_nulls_last", _asc_nulls_last_doc))
    desc = ignore_unicode_prefix(_unary_op("desc", _desc_doc))
    desc_nulls_first = ignore_unicode_prefix(_unary_op("desc_nulls_first", _desc_nulls_first_doc))
    desc_nulls_last = ignore_unicode_prefix(_unary_op("desc_nulls_last", _desc_nulls_last_doc))

    _isNull_doc = """
    True if the current expression is null.

    >>> from pyspark.sql import Row
    >>> df = spark.createDataFrame([Row(name=u'Tom', height=80), Row(name=u'Alice', height=None)])
    >>> df.filter(df.height.isNull()).collect()
    [Row(height=None, name=u'Alice')]
    """
    _isNotNull_doc = """
    True if the current expression is NOT null.

    >>> from pyspark.sql import Row
    >>> df = spark.createDataFrame([Row(name=u'Tom', height=80), Row(name=u'Alice', height=None)])
    >>> df.filter(df.height.isNotNull()).collect()
    [Row(height=80, name=u'Tom')]
    """

    isNull = ignore_unicode_prefix(_unary_op("isNull", _isNull_doc))
    isNotNull = ignore_unicode_prefix(_unary_op("isNotNull", _isNotNull_doc))

    @since(1.3)
    def alias(self, *alias, **kwargs):
        """
        Returns this column aliased with a new name or names (in the case of expressions that
        return more than one column, such as explode).

        :param alias: strings of desired column names (collects all positional arguments passed)
        :param metadata: a dict of information to be stored in ``metadata`` attribute of the
            corresponding :class:`StructField <pyspark.sql.types.StructField>` (optional, keyword
            only argument)

        .. versionchanged:: 2.2
           Added optional ``metadata`` argument.

        >>> df.select(df.age.alias("age2")).collect()
        [Row(age2=2), Row(age2=5)]
        >>> df.select(df.age.alias("age3", metadata={'max': 99})).schema['age3'].metadata['max']
        99
        """

        metadata = kwargs.pop('metadata', None)
        assert not kwargs, 'Unexpected kwargs were passed: %s' % kwargs

        sc = SparkContext._active_spark_context
        if len(alias) == 1:
            if metadata:
                jmeta = sc._jvm.org.apache.spark.sql.types.Metadata.fromJson(
                    json.dumps(metadata))
                return Column(getattr(self._jc, "as")(alias[0], jmeta))
            else:
                return Column(getattr(self._jc, "as")(alias[0]))
        else:
            if metadata:
                raise ValueError('metadata can only be provided for a single column')
            return Column(getattr(self._jc, "as")(_to_seq(sc, list(alias))))

    name = copy_func(alias, sinceversion=2.0, doc=":func:`name` is an alias for :func:`alias`.")

    @ignore_unicode_prefix
    @since(1.3)
    def cast(self, dataType):
        """ Convert the column into type ``dataType``.

        >>> df.select(df.age.cast("string").alias('ages')).collect()
        [Row(ages=u'2'), Row(ages=u'5')]
        >>> df.select(df.age.cast(StringType()).alias('ages')).collect()
        [Row(ages=u'2'), Row(ages=u'5')]
        """
        if isinstance(dataType, basestring):
            jc = self._jc.cast(dataType)
        elif isinstance(dataType, DataType):
            from pyspark.sql import SparkSession
            spark = SparkSession.builder.getOrCreate()
            jdt = spark._jsparkSession.parseDataType(dataType.json())
            jc = self._jc.cast(jdt)
        else:
            raise TypeError("unexpected type: %s" % type(dataType))
        return Column(jc)

    astype = copy_func(cast, sinceversion=1.4, doc=":func:`astype` is an alias for :func:`cast`.")

    @since(1.3)
    def between(self, lowerBound, upperBound):
        """
        A boolean expression that is evaluated to true if the value of this
        expression is between the given lower and upper bounds (inclusive).

        >>> df.select(df.name, df.age.between(2, 4)).show()
        +-----+---------------------------+
        | name|((age >= 2) AND (age <= 4))|
        +-----+---------------------------+
        |Alice|                       true|
        |  Bob|                      false|
        +-----+---------------------------+
        """
        return (self >= lowerBound) & (self <= upperBound)

    @since(1.4)
    def when(self, condition, value):
        """
        Evaluates a list of conditions and returns one of multiple possible result expressions.
        If :func:`Column.otherwise` is not invoked, None is returned for unmatched conditions.

        See :func:`pyspark.sql.functions.when` for example usage.

        :param condition: a boolean :class:`Column` expression.
        :param value: a literal value, or a :class:`Column` expression.

        >>> from pyspark.sql import functions as F
        >>> df.select(df.name, F.when(df.age > 4, 1).when(df.age < 3, -1).otherwise(0)).show()
        +-----+------------------------------------------------------------+
        | name|CASE WHEN (age > 4) THEN 1 WHEN (age < 3) THEN -1 ELSE 0 END|
        +-----+------------------------------------------------------------+
        |Alice|                                                          -1|
        |  Bob|                                                           1|
        +-----+------------------------------------------------------------+
        """
        if not isinstance(condition, Column):
            raise TypeError("condition should be a Column")
        v = value._jc if isinstance(value, Column) else value
        jc = self._jc.when(condition._jc, v)
        return Column(jc)

    @since(1.4)
    def otherwise(self, value):
        """
        Evaluates a list of conditions and returns one of multiple possible result expressions.
        If :func:`Column.otherwise` is not invoked, None is returned for unmatched conditions.

        See :func:`pyspark.sql.functions.when` for example usage.

        :param value: a literal value, or a :class:`Column` expression.

        >>> from pyspark.sql import functions as F
        >>> df.select(df.name, F.when(df.age > 3, 1).otherwise(0)).show()
        +-----+-------------------------------------+
        | name|CASE WHEN (age > 3) THEN 1 ELSE 0 END|
        +-----+-------------------------------------+
        |Alice|                                    0|
        |  Bob|                                    1|
        +-----+-------------------------------------+
        """
        v = value._jc if isinstance(value, Column) else value
        jc = self._jc.otherwise(v)
        return Column(jc)

    @since(1.4)
    def over(self, window):
        """
        Define a windowing column.

        :param window: a :class:`WindowSpec`
        :return: a Column

        >>> from pyspark.sql import Window
        >>> window = Window.partitionBy("name").orderBy("age") \
                .rowsBetween(Window.unboundedPreceding, Window.currentRow)
        >>> from pyspark.sql.functions import rank, min
        >>> from pyspark.sql.functions import desc
        >>> df.withColumn("rank", rank().over(window)) \
                .withColumn("min", min('age').over(window)).sort(desc("age")).show()
        +---+-----+----+---+
        |age| name|rank|min|
        +---+-----+----+---+
        |  5|  Bob|   1|  5|
        |  2|Alice|   1|  2|
        +---+-----+----+---+
        """
        from pyspark.sql.window import WindowSpec
        if not isinstance(window, WindowSpec):
            raise TypeError("window should be WindowSpec")
        jc = self._jc.over(window._jspec)
        return Column(jc)

    def __nonzero__(self):
        raise ValueError("Cannot convert column into bool: please use '&' for 'and', '|' for 'or', "
                         "'~' for 'not' when building DataFrame boolean expressions.")
    __bool__ = __nonzero__

    def __repr__(self):
        return 'Column<%s>' % self._jc.toString().encode('utf8')


def _test():
    import doctest
    from pyspark.sql import SparkSession
    import pyspark.sql.column
    globs = pyspark.sql.column.__dict__.copy()
    spark = SparkSession.builder\
        .master("local[4]")\
        .appName("sql.column tests")\
        .getOrCreate()
    sc = spark.sparkContext
    globs['spark'] = spark
    globs['df'] = sc.parallelize([(2, 'Alice'), (5, 'Bob')]) \
        .toDF(StructType([StructField('age', IntegerType()),
                          StructField('name', StringType())]))

    (failure_count, test_count) = doctest.testmod(
        pyspark.sql.column, globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
    spark.stop()
    if failure_count:
        sys.exit(-1)


if __name__ == "__main__":
    _test()