Getting Started with PySpark
Create a SparkSession
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
import pyspark.sql.functions as F
Load some data
df = spark.read.load("DEX03s - 2019-10-07.csv",
format="csv", sep=",", inferSchema="true", header="true")
Find columns that are more than 90% null
threshold = df.count() * .90
null_counts = df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).collect()[0].asDict()
to_drop = [k for k, v in null_counts.items() if v >= threshold]
Drop null columns
cleanDF = df.drop(*to_drop)
display(cleanDF)   # display() is a Databricks notebook helper; cleanDF.show() works anywhere
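As a sanity check, the same drop-columns-by-null-fraction pattern can be tried on a tiny hand-built DataFrame; the column names and values below are invented purely for illustration:
toy = spark.createDataFrame(
    [(1, None, "a"), (2, None, "b"), (3, None, None)],
    schema="id int, mostly_null string, mixed string",
)
toy_nulls = toy.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in toy.columns]).collect()[0].asDict()
print(toy_nulls)                                                      # {'id': 0, 'mostly_null': 3, 'mixed': 1}
print([k for k, v in toy_nulls.items() if v >= toy.count() * .90])    # ['mostly_null']
toy.drop('mostly_null').show()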
Create a subset of records
subsetDF = cleanDF.limit(100).select("COMMENT_DESC")
# Classification labels and the search terms that map to each one
error_terms = { 'zip': ['ZIP'], 'moved': ['MOVED'], 'apt': ['APT'], 'box': ['P O BOX'], 'street': ['STREET','ADDRESS'] }
print(subsetDF.count())
subsetDF.show()
The intent here is to derive a new column from COMMENT_DESC using a custom UDF. An earlier version of this didn't work, and the problem was not that functions can't be used inside a UDF (they can); it failed on null comments and used errorString.find(searchterm) as a boolean, which is wrong because find() returns -1 (truthy) when the term is missing and 0 (falsy) when it matches at the start of the string. The version below checks for None first and compares the result of find() against 0. A UDF-free alternative using built-in column functions is sketched at the end of the page.
from pyspark.sql.functions import count, col
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def getErrorType(errorString):
    # Null comments have nothing to classify
    if errorString is None:
        return 'empty'
    error_terms = { 'moved': ['MOVED'], 'zip': ['ZIP'], 'apt': ['APT'], 'box': ['P O BOX'], 'street': ['STREET','ADDRESS'] }
    for key, searchlist in error_terms.items():
        for searchterm in searchlist:
            # Debug output; inside a UDF this goes to the executor logs, not the notebook
            print("{} - {}".format(errorString, searchterm))
            # find() returns -1 when the term is absent, so compare against 0
            if errorString.find(searchterm) >= 0:
                return key
    return 'unknown'
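Since getErrorType is ordinary Python, it can be sanity-checked on a few sample strings before being wrapped in a UDF (the strings below are invented):
print(getErrorType(None))              # empty
print(getErrorType("BAD ZIP CODE"))    # zip
print(getErrorType("CUSTOMER MOVED"))  # moved
print(getErrorType("no match here"))   # unknown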
myudf = udf(getErrorType, StringType())
subsetDF = ( cleanDF
    .select("COMMENT_DESC")
    .withColumn('ERROR_CLASSIFICATION', myudf( cleanDF['COMMENT_DESC'] ) )
)
print(subsetDF.count())
subsetDF.show()
classification_totals = ( subsetDF
    .select("ERROR_CLASSIFICATION")
    .groupby("ERROR_CLASSIFICATION")
    .agg(count("*").alias("count"))
    .sort(col("count").desc())
)
classification_totals.show()
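For reference, the same classification can be expressed without a Python UDF by chaining built-in column functions, which keeps the work inside Spark and avoids Python serialization overhead. This is a sketch that assumes the same search terms and column names as above; no_udf is just an illustrative name:
from pyspark.sql.functions import when, col

no_udf = ( cleanDF
    .select("COMMENT_DESC")
    .withColumn(
        'ERROR_CLASSIFICATION',
        when(col("COMMENT_DESC").isNull(), 'empty')
        .when(col("COMMENT_DESC").contains('MOVED'), 'moved')
        .when(col("COMMENT_DESC").contains('ZIP'), 'zip')
        .when(col("COMMENT_DESC").contains('APT'), 'apt')
        .when(col("COMMENT_DESC").contains('P O BOX'), 'box')
        .when(col("COMMENT_DESC").contains('STREET') | col("COMMENT_DESC").contains('ADDRESS'), 'street')
        .otherwise('unknown')
    )
)
no_udf.show()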