Source code for handyspark.sql.schema

import numpy as np
import datetime
from operator import itemgetter
from pyspark.sql.types import StructType

_mapping = {str: 'string',
            bool: 'boolean',
            int: 'integer',
            float: 'float',
            datetime.datetime: 'timestamp',
            np.bool: 'boolean',
            np.int8: 'byte',
            np.int16: 'short',
            np.int32: 'integer',
            np.int64: 'long',
            np.float32: 'float',
            np.float64: 'double',
            np.ndarray: 'array',
            object: 'string',
            list: 'array',
            tuple: 'array',
            dict: 'map'}

def _resolve_coltype(coltype, keys):
    """Return the ``_mapping`` key that matches *coltype*, or ``None`` if none does."""
    if isinstance(coltype, np.dtype):
        # Object columns are treated as strings.
        if coltype == np.dtype('O'):
            return str
        # Resolve a dtype through its own scalar type first. The equality
        # search below relies on NumPy coercing each key to a dtype, so e.g.
        # np.dtype('int64') can match the earlier ``int`` key ('integer')
        # instead of ``np.int64`` ('long'), and np.dtype('float64') can match
        # ``float`` ('float') instead of ``np.float64`` ('double').
        if coltype.type in _mapping:
            return coltype.type

    # Equality-based lookup, kept as-is for everything that is not resolved
    # above (it also works for unhashable inputs).
    if coltype not in keys:
        return None
    # NOTE(review): NumPy appears to treat arbitrary Python classes as object
    # dtype in comparisons, so plain classes such as datetime.datetime, list or
    # dict may take this branch and resolve to ``str`` too - confirm intent.
    if coltype == np.dtype('O'):
        return str
    return keys[keys.index(coltype)]


def generate_schema(colnames, coltypes, nullables=None):
    """Build a Spark ``StructType`` schema from Python/numpy column types.

    Parameters
    ----------
    colnames: list of string
        Column names, one per field of the schema.
    coltypes: list of type
        Python/numpy types (or numpy dtypes), one per column. Each one must
        match a key of ``_mapping``.
    nullables: list of boolean, optional
        Nullability flag for each column. Defaults to ``True`` for every
        column.

    Returns
    -------
    schema: StructType
        Spark DataFrame schema corresponding to Python/numpy types.

    Raises
    ------
    AssertionError
        If ``colnames`` and ``coltypes`` differ in length, if ``nullables``
        has fewer entries than ``colnames``, or if any type is not supported.
    """
    # Raised explicitly instead of via ``assert`` so the checks are not
    # stripped under ``python -O``; the exception type callers see is unchanged.
    if len(colnames) != len(coltypes):
        raise AssertionError("You must specify types for all columns.")

    if nullables is None:
        nullables = [True] * len(colnames)
    else:
        # zip() stops at its shortest input, so a short ``nullables`` would
        # silently drop the trailing columns from the schema. Extra entries
        # are ignored, as before.
        nullables = [flag for _, flag in zip(colnames, nullables)]
        if len(nullables) != len(colnames):
            raise AssertionError("You must specify nullability for all columns.")

    keys = list(_mapping)
    invalid_types = []
    new_types = []
    for coltype in coltypes:
        resolved = _resolve_coltype(coltype, keys)
        if resolved is None:
            invalid_types.append(coltype)
        else:
            new_types.append(resolved)

    if invalid_types:
        raise AssertionError(
            "Invalid type(s) specified: {}".format(str(invalid_types)))

    fields = [{"metadata": {},
               "name": name,
               "nullable": nullable,
               "type": _mapping[typ]}
              for name, typ, nullable in zip(colnames, new_types, nullables)]
    return StructType.fromJson({"type": "struct", "fields": fields})