Source code for handyspark.util

from math import isnan, isinf
import pandas as pd
from pyspark.ml.linalg import DenseVector
from pyspark.rdd import RDD
from pyspark.sql import functions as F, DataFrame
from pyspark.sql.types import ArrayType, DoubleType, StructType, StructField
from pyspark.mllib.common import _java2py, _py2java
import traceback

def none2default(value, default):
    return value if value is not None else default

def none2zero(value):
    return none2default(value, 0)

def ensure_list(value):
    if value is None:
        return []
    if isinstance(value, (list, tuple)):
        return value
    else:
        return [value]
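
# Usage sketch for the helpers above (illustrative only, not part of the
# original module; plain Python, no Spark required):
#
#   >>> none2default(None, 'default')
#   'default'
#   >>> none2zero(None)
#   0
#   >>> ensure_list('col_a')
#   ['col_a']
#   >>> ensure_list(('col_a', 'col_b'))
#   ('col_a', 'col_b')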

def check_columns(df, colnames):
    if colnames is not None:
        available = df.columns
        colnames = ensure_list(colnames)
        colnames = [col if isinstance(col, str) else col.colname for col in colnames]
        diff = set(colnames).difference(set(available))
        assert not len(diff), "DataFrame does not have {} column(s)".format(str(list(diff))[1:-1])
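
# Usage sketch (illustrative only; assumes an active SparkSession named `spark`):
#
#   >>> df = spark.createDataFrame([(1, 'a')], ['id', 'label'])
#   >>> check_columns(df, 'id')              # passes silently
#   >>> check_columns(df, ['id', 'label'])   # passes silently
#   >>> check_columns(df, 'missing')         # raises AssertionError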

class bcolors:
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

class HandyException(Exception):
    def __init__(self, *args, **kwargs):
        try:
            # Summary is a boolean argument
            # If True, it prints the exception summary
            # This way, we can avoid printing the summary all
            # the way along the exception "bubbling up"
            summary = kwargs['summary']
            if summary:
                print(HandyException.exception_summary())
        except KeyError:
            pass

    @staticmethod
    def colortext(text, color_code):
        return color_code + text + (bcolors.ENDC if text[-4:] != bcolors.ENDC else '')

    @staticmethod
    def errortext(text):
        # Makes the exception summary both BOLD and RED (FAIL)
        return HandyException.colortext(HandyException.colortext(text, bcolors.FAIL), bcolors.BOLD)

    @staticmethod
    def exception_summary():
        # Gets the error stack
        msg = traceback.format_exc()
        try:
            # Builds the "frame" around the text
            top = HandyException.errortext('-' * 75 + '\nHANDY EXCEPTION SUMMARY\n')
            bottom = HandyException.errortext('-' * 75)
            # Gets the information about the error and makes it BOLD and RED
            info = list(filter(lambda t: len(t) and t[0] != '\t', msg.split('\n')[::-1]))
            error = HandyException.errortext('Error\t: {}'.format(info[0]))
            # Figure out where the error happened - location (file/notebook), line and function
            idx = [t.strip()[:4] for t in info].index('File')
            where = [v.strip() for v in info[idx].strip().split(',')]
            location, line, func = where[0][5:], where[1][5:], where[2][3:]
            # If it is a notebook, figures out the cell
            if 'ipython-input' in location:
                location = 'IPython - In [{}]'.format(location.split('-')[2])
            # If it is a pyspark error, just go with it
            if 'pyspark' in error:
                new_msg = '\n{}\n{}\n{}'.format(top, error, bottom)
            # Otherwise, build the summary
            else:
                new_msg = '\n{}\nLocation: {}\nLine\t: {}\nFunction: {}\n{}\n{}'.format(top, location, line, func, error, bottom)
            return new_msg
        except Exception as e:
            # If we managed to raise an exception while trying to format the original exception...
            # Oh, well...
            return 'This is awkward... \n{}'.format(str(e))
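
# Usage sketch (illustrative only): passing summary=True prints the colored
# summary once, instead of repeating it as the exception bubbles up:
#
#   try:
#       1 / 0
#   except Exception:
#       raise HandyException('Something went wrong', summary=True)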

def get_buckets(rdd, buckets):
    """Extracted from the pyspark.rdd.RDD.histogram function
    """
    if buckets < 1:
        raise ValueError("number of buckets must be >= 1")

    # filter out non-comparable elements
    def comparable(x):
        if x is None:
            return False
        if type(x) is float and isnan(x):
            return False
        return True

    filtered = rdd.filter(comparable)

    # faster than stats()
    def minmax(a, b):
        return min(a[0], b[0]), max(a[1], b[1])

    try:
        minv, maxv = filtered.map(lambda x: (x, x)).reduce(minmax)
    except TypeError as e:
        if " empty " in str(e):
            raise ValueError("can not generate buckets from empty RDD")
        raise

    if minv == maxv or buckets == 1:
        return [minv, maxv], [filtered.count()]

    try:
        inc = (maxv - minv) / buckets
    except TypeError:
        raise TypeError("Can not generate buckets with non-number in RDD")
    if isinf(inc):
        raise ValueError("Can not generate buckets with infinite value")

    # keep them as integer if possible
    inc = int(inc)
    if inc * buckets != maxv - minv:
        inc = (maxv - minv) * 1.0 / buckets

    buckets = [i * inc + minv for i in range(buckets)]
    buckets.append(maxv)  # fix accumulated error
    return buckets
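
# Usage sketch (illustrative only; assumes an active SparkContext named `sc`;
# None and NaN values are filtered out before the bucket edges are computed):
#
#   >>> rdd = sc.parallelize([1.0, 2.0, 5.0, None, float('nan'), 10.0])
#   >>> get_buckets(rdd, 3)
#   [1.0, 4.0, 7.0, 10.0]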

def dense_to_array(sdf, colname, new_colname):
    """Casts a Vector column into a new Array column.
    """
    sql_ctx = sdf.sql_ctx
    # Gets type of original column
    coltype = sdf.notHandy().select(colname).dtypes[0][1]
    # If it is indeed a vector...
    if coltype == 'vector':
        idx = sdf.columns.index(colname)
        schema = StructType(sdf.schema.fields + [StructField(new_colname, ArrayType(DoubleType()), True)])
        res = sql_ctx.createDataFrame(sdf.rdd.map(tuple)
                                      .map(lambda t: t + (DenseVector(t[idx]).values.tolist(),)),
                                      schema=schema)
    # Otherwise just copy the original column into a new one
    else:
        res = sdf.withColumn(new_colname, F.col(colname))
    # Makes it a HandyFrame
    if isinstance(res, DataFrame):
        res = res.toHandy()
    return res
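
# Usage sketch (illustrative only; assumes an active SparkSession named `spark`
# and handyspark's `toHandy()` extension; column names are hypothetical):
#
#   >>> from pyspark.ml.linalg import Vectors
#   >>> hdf = spark.createDataFrame([(Vectors.dense([1.0, 2.0]),)], ['features']).toHandy()
#   >>> dense_to_array(hdf, 'features', 'features_arr').select('features_arr').take(1)
#   [Row(features_arr=[1.0, 2.0])]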

def disassemble(sdf, colname, new_colnames=None):
    """Disassembles a Vector/Array column into multiple columns
    """
    array_col = '_{}'.format(colname)
    # Gets type of original column
    coltype = sdf.notHandy().select(colname).schema.fields[0].dataType.typeName()
    # If it is a vector or array...
    if coltype in ['vectorudt', 'array']:
        # Makes the conversion from vector to array (or not :-))
        tdf = dense_to_array(sdf, colname, array_col)
        # Checks the MIN size of the arrays in the dataset
        # If there are arrays with multiple sizes, it can still safely
        # convert up to that size
        size = tdf.notHandy().select(F.min(F.size(array_col))).take(1)[0][0]
        # If no new names were given, just uses the original name and
        # a sequence number as suffix
        if new_colnames is None:
            new_colnames = ['{}_{}'.format(colname, i) for i in range(size)]
        assert len(new_colnames) == size, \
            "There must be {} column names, only {} found!".format(size, len(new_colnames))
        # Uses `getItem` to disassemble the array into multiple columns
        res = tdf.select(*sdf.columns,
                         *(F.col(array_col).getItem(i).alias(n)
                           for i, n in zip(range(size), new_colnames)))
    # Otherwise just copy the original column into a new one
    else:
        if new_colnames is None:
            new_colnames = [colname]
        res = sdf.withColumn(new_colnames[0], F.col(colname))
    # Makes it a HandyFrame
    if isinstance(res, DataFrame):
        res = res.toHandy()
    return res
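
# Usage sketch (illustrative only; continues the hypothetical `hdf` above;
# with no names given, the new columns get a sequence number as suffix):
#
#   >>> disassemble(hdf, 'features').select('features_0', 'features_1').take(1)
#   [Row(features_0=1.0, features_1=2.0)]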

def get_jvm_class(cl):
    """Builds the JVM class name from a Python class
    """
    return 'org.apache.{}.{}'.format(cl.__module__[2:], cl.__name__)
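
# Usage sketch (illustrative only): the 'py' prefix of the module is stripped,
# so a pyspark class maps onto its Scala counterpart under org.apache:
#
#   >>> from pyspark.ml.classification import LogisticRegression
#   >>> get_jvm_class(LogisticRegression)
#   'org.apache.spark.ml.classification.LogisticRegression'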

def call_scala_method(py_class, scala_method, df, *args):
    """Given a Python class, calls a method from its Scala equivalent
    """
    sc = df.sql_ctx._sc
    # Gets the Java class from the JVM, given the name built from the Python class
    java_class = getattr(sc._jvm, get_jvm_class(py_class))
    # Converts all columns into doubles and accesses the underlying Java DataFrame
    jdf = df.select(*(F.col(col).astype('double') for col in df.columns))._jdf
    # Creates a Java object from both the Java class and the DataFrame
    java_obj = java_class(jdf)
    # Converts the remaining args from Python to Java as well
    args = [_py2java(sc, a) for a in args]
    # Gets the method from the Java object and passes the arguments to it to get results
    java_res = getattr(java_obj, scala_method)(*args)
    # Converts the results from Java back to Python
    res = _java2py(sc, java_res)
    # If the result is an RDD, it could be the case its elements are still
    # serialized tuples from Scala...
    if isinstance(res, RDD):
        try:
            # Takes the first element from the result, to check what it is
            first = res.take(1)[0]
            # If it is a dictionary, we need to check its value
            if isinstance(first, dict):
                first = list(first.values())[0]
            # If the value is a Scala tuple, we need to deserialize it
            if first.startswith('scala.Tuple'):
                serde = sc._jvm.org.apache.spark.mllib.api.python.SerDe
                # We assume it is a Tuple2 and deserialize it
                java_res = serde.fromTuple2RDD(java_res)
                # Finally, we convert the deserialized result from Java to Python
                res = _java2py(sc, java_res)
        except IndexError:
            pass
    return res
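
# Usage sketch (illustrative only; `SomePyClass` and `someMethod` are
# hypothetical - the Scala counterpart is assumed to expose a constructor
# taking a DataFrame and a method with that name):
#
#   >>> res = call_scala_method(SomePyClass, 'someMethod', hdf, 0.5)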

def counts_to_df(value_counts, colnames, n_points):
    """DO NOT USE IT!
    """
    pdf = pd.DataFrame(value_counts
                       .to_frame('count')
                       .reset_index()
                       .apply(lambda row: dict({'count': row['count']},
                                               **dict(zip(colnames, row['index'].toArray()))),
                              axis=1)
                       .values
                       .tolist())
    pdf['count'] /= pdf['count'].sum()
    proportions = pdf['count'] / pdf['count'].min()
    factor = int(n_points / proportions.sum())
    pdf = pd.concat([pdf[colnames], (proportions * factor).astype(int)], axis=1)
    combinations = pdf.apply(lambda row: row.to_dict(), axis=1).values.tolist()
    return pd.DataFrame([dict(v) for c in combinations for v in int(c.pop('count')) * [list(c.items())]])