PySpark tutorial – a case study using Random Forest on an unbalanced dataset


I would like to demonstrate a case-study tutorial on building a predictive model that predicts whether a customer will like a certain product. The original model was built and tested on a Spark platform with real-world data, but I will be using a mock-up data set for this tutorial.

Since unbalanced data sets are very common in the real business world, this tutorial will specifically showcase some tactics that can effectively deal with this challenge using PySpark.

Concretely, this session will cover the following topics:

  • Case Scenario and Data set
  • Data pre-processing – NA replacement, one-hot encoding, pipelining, training and validation splits, etc.
  • Using Spark’s random forest classifier for binary classification.
  • Measuring performance using AUC score.
  • Different strategies to handle the problem of unbalanced dataset:
    • Down-Sampling and Up-Sampling
    • Ensemble of Down-Sampling models

The Case Scenario 

Let’s assume your manager one day approaches you and asks you to build a Product Recommendation Engine. The engine should be able to predict the probability of each product being liked by a customer, given relevant data such as the customer’s details and the product’s info. Your model will then recommend the top 5 products based on those probabilities.

Your stakeholder is the business department, which will eventually use your model for recommendations. Specifically, each sales rep will ‘consult’ your model by telling it what type of customer she is going to visit before she sets out on her trip. She will then bring along the recommended product list for the pitch, hoping that her trip will be fruitful.

The Data 

You receive the data from your friendly BI team. Thankfully, they made your life easier by crunching all the data into one nice clean csv table so that you won’t need to painfully join and merge from different tables. Peeking at the top 3 rows of the table shows you the following:

[Table: the first 3 rows of the data, with product columns (e.g. prod_price, prod_feat_1) and customer columns (e.g. cust_age, cust_region), plus the feedback column]

Some of the predictors represent properties of the product, such as product_price or product_features, whereas others contain information about the customer, e.g. customer title and age.

Meanwhile, your Big Data team has set up the Spark platform for you and ingested the table into the data lake, so you can access it easily from PySpark. We start by importing the libraries and loading the data:

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier as RF
from pyspark.ml.feature import StringIndexer, VectorIndexer, VectorAssembler, SQLTransformer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import numpy as np
import functools
from pyspark.ml.feature import OneHotEncoder

tableData = sqlContext.table('your_table_containing_products_feedback_information')

cols_select = ['prod_price',
               'prod_feat_1',
               'prod_feat_2',
               'cust_age',
               'prod_feat_3',
               'cust_region',
               'prod_type',
               'cust_sex',
               'cust_title',
               'feedback']
df = tableData.select(cols_select).dropDuplicates()
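
As a quick sanity check (my addition, not part of the original post), it can help to peek at the schema and a few rows before doing any processing:

# minimal sanity check on the loaded DataFrame
df.printSchema()            # column names and types before any casting
df.show(3, truncate=False)  # peek at the first 3 rows
print("Row count: ", df.count())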

Data Pre-processing Steps

1. Skewed responses

There are three types of responses – Positive, Neutral and Negative. The first step is to see how they are distributed:

from matplotlib import pyplot as plt
%matplotlib inline

responses = df.groupBy('feedback').count().collect()
categories = [i[0] for i in responses]
counts = [i[1] for i in responses]

ind = np.array(range(len(categories)))
width = 0.35
plt.bar(ind, counts, width=width, color='r')

plt.ylabel('counts')
plt.title('Response distribution')
plt.xticks(ind + width/2., categories)

[Bar chart: response distribution – Positive cases far outnumber Neutral and Negative ones]

The distribution looks quite skewed: there are far more ‘Positive’ cases than ‘Neutral’ and ‘Negative’ ones, and the volume of ‘Negative’ cases is extremely low.

The scarcity of ‘Negative’ cases is the most serious problem here. However, since our job is to differentiate ‘Positive’ cases from everything else, why not just combine ‘Neutral’ and ‘Negative’ into one group? So we convert all ‘Neutral’ cases to ‘Negative’.

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

binarize = lambda x: 'Negative' if x == 'Neutral' else x

udfConvertResponse = udf(binarize, StringType())
df = df.withColumn("binary_response", udfConvertResponse("feedback"))

Notice that we have created a new column called ‘binary_response’ and used it to hold the binary cases of ‘Positive’ and ‘Negative’.

However, we have not solved the unbalanced data set issue since the ‘Positive’ cases are a lot more than ‘Negative’ ones. We will look into it later on by applying some strategies like down-sampling and ensemble of sub-samplings.
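
As a side note (my addition), the same binarization can be done without a Python UDF by using Spark’s built-in when/otherwise, which avoids shipping every row through Python:

# equivalent binarization without a Python UDF (a sketch; same column names as above)
from pyspark.sql.functions import when, col

df = df.withColumn("binary_response",
                   when(col("feedback") == "Neutral", "Negative").otherwise(col("feedback")))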

2. Filling NA values and casting data types

We convert numeric cols into ‘float’ or ‘int’ depending on their values. There are also categorical cols that contain null values, so we fill those in with an ‘NA’ string as a new category, and leave the remaining cols unchanged:

cols_select = ['prod_price',
               'prod_feat_1',
               'prod_feat_2',
               'cust_age',
               'prod_feat_3',
               'cust_region',
               'prod_type',
               'cust_sex',
               'cust_title',
               'feedback',
               'binary_response']

df = df.select(df.prod_price.cast('float'), # cast numeric cols to 'float' or 'int'
               df.prod_feat_1.cast('float'),
               df.prod_feat_2.cast('float'),
               df.cust_age.cast('int'),
               *cols_select[4:])

df = df.fillna({'cust_region': 'NA', 'cust_title': 'NA', 'prod_type': 'NA'}) # fill in 'NA' as a new category for certain categorical cols
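
To double-check the imputation (a quick check I am adding here, not in the original post), you can count the remaining nulls per column:

# count remaining nulls per column (assumes `df` from above)
from pyspark.sql.functions import col, sum as spark_sum

df.select([spark_sum(col(c).isNull().cast('int')).alias(c) for c in df.columns]).show()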

3. Categorical col that has too many discrete values

We are also interested in seeing whether any categorical cols have too many levels (distinct values).

for col in df.columns[4:-2]:
    print(col, df.select(col).distinct().count())
prod_feat_3 553
cust_region 12
prod_type 35
cust_sex 2
cust_title 12

The prod_feat_3 col simply has too many levels (distinct values)! A simple way to resolve this is to group all the categories whose counts fall below a threshold into one category, namely “MinorityCategory”. Here is how we do it:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

COUNT_THRESHOLD = 150 # threshold to filter 

# create a temporary col "count" as counting for each value of "prod_feat_3"
prodFeat3Count = df.groupBy("prod_feat_3").count()
df = df.join(prodFeat3Count, "prod_feat_3", "inner")

def convertMinority(originalCol, colCount):
    if colCount > COUNT_THRESHOLD:
        return originalCol
    else:
        return 'MinorityCategory'

createNewColFromTwo = udf(convertMinority, StringType())
df = df.withColumn('prod_feat_3_reduced', createNewColFromTwo(df['prod_feat_3'], df['count']))
df = df.drop('prod_feat_3')
df = df.drop('count')
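
A quick check (my addition) confirms that the cardinality has indeed dropped:

# verify the number of distinct levels after grouping the rare categories
print('prod_feat_3_reduced', df.select('prod_feat_3_reduced').distinct().count())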

4. One-hot encoding categorical cols

For those categorical cols, we apply one-hot encoding to convert them into dummy cols:

column_vec_in = ['prod_feat_3_reduced', 'cust_region', 'prod_type', 'cust_sex', 'cust_title']
column_vec_out = ['prod_feat_3_reduced_catVec', 'cust_region_catVec', 'prod_type_catVec',
                  'cust_sex_catVec', 'cust_title_catVec']

# index each categorical col into numeric indices, then one-hot encode the indices
indexers = [StringIndexer(inputCol=x, outputCol=x+'_tmp')
            for x in column_vec_in]
encoders = [OneHotEncoder(dropLast=False, inputCol=x+"_tmp", outputCol=y)
            for x, y in zip(column_vec_in, column_vec_out)]

# interleave indexer/encoder pairs and flatten them into a single list of pipeline stages
tmp = [[i, j] for i, j in zip(indexers, encoders)]
tmp = [i for sublist in tmp for i in sublist]

Finally, we assemble all the predictors into a single ‘features’ vector and index the response col as ‘label’. We then streamline the entire process with a Pipeline, which runs all the stages sequentially for us.

# prepare labeled sets
cols_now = ['prod_price',
            'prod_feat_1',
            'prod_feat_2',
            'cust_age',
            'prod_feat_3_reduced_catVec',
            'cust_region_catVec',
            'prod_type_catVec',
            'cust_sex_catVec',
            'cust_title_catVec']
assembler_features = VectorAssembler(inputCols=cols_now, outputCol='features')
labelIndexer = StringIndexer(inputCol='binary_response', outputCol="label")
tmp += [assembler_features, labelIndexer]
pipeline = Pipeline(stages=tmp)

5. Split into training and validation sets.

This part is straightforward. We randomly select 80% as the training data, and the remaining 20% as test set or validation set.

Note: it is important to set the seed for the randomSplit() function in order to get the same split on each run. (This is crucial for the comparisons we make later on.)

allData = pipeline.fit(df).transform(df)
allData.cache()
trainingData, testData = allData.randomSplit([0.8,0.2], seed=0) # need to ensure same split for each time
print("Distribution of Pos and Neg in trainingData is: ", trainingData.groupBy("label").count().take(3))
Distribution of Pos and Neg in trainingData is:  [Row(label=1.0, count=144014), Row(label=0.0, count=520771)]

Prediction and Evaluation of AUC

Train and prediction

We use a Random Forest with numTrees = 200, train it on trainingData and predict on testData.

rf = RF(labelCol='label', featuresCol='features',numTrees=200)
fit = rf.fit(trainingData)
transformed = fit.transform(testData)
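
The transformed DataFrame now carries the model outputs. As a quick look (my addition), the columns we use next are ‘probability’ and ‘label’:

# peek at the prediction output; 'probability' is a 2-element vector [P(label=0), P(label=1)]
transformed.select('probability', 'prediction', 'label').show(3, truncate=False)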

AUC

Use the test data labels to calculate AUC score against the predicted probabilities:

from pyspark.mllib.evaluation import BinaryClassificationMetrics as metric
results = transformed.select(['probability', 'label'])

## prepare the score-label set
## probability[0] is P(label=0), i.e. P('Positive'); flip the label so that Positive -> 1.0
results_collect = results.collect()
results_list = [(float(i[0][0]), 1.0 - float(i[1])) for i in results_collect]
scoreAndLabels = sc.parallelize(results_list)

metrics = metric(scoreAndLabels)
print("The ROC score is (@numTrees=200): ", metrics.areaUnderROC)

The ROC score is (@numTrees=200): 0.6425143766095695
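
As an aside (my addition, not in the original post), collecting everything to the driver is not strictly necessary just to get the AUC; the BinaryClassificationEvaluator we imported earlier can compute it in a distributed way:

# a distributed alternative for the AUC (a sketch using the model's 'rawPrediction' column)
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='label',
                                          metricName='areaUnderROC')
print("AUC via evaluator: ", evaluator.evaluate(transformed))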

To visualize the AUC score, we can draw the ROC curve as below:

from sklearn.metrics import roc_curve, auc

y_test = [i[1] for i in results_list]
y_score = [i[0] for i in results_list]

fpr, tpr, _ = roc_curve(y_test, y_score)
roc_auc = auc(fpr, tpr)

%matplotlib inline
plt.figure()
plt.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver operating characteristic example')
plt.legend(loc="lower right")
plt.show()

[Figure: ROC curve with AUC = 0.64]

As we can see above, the area between the blue curve and the dashed diagonal measures how much better our model is than a random coin-flip guess.

The score is 0.64, which is not too impressive.

Well it could be because the features in the data set do not have enough information to train our model, and maybe we should talk with our BI team to see if it is possible to obtain additional insightful features.

But that’s not the focus of this tutorial anyway :)

To gain a better understanding of our model’s performance, we can plot the distribution of our predictions:

all_probs = transformed.select("probability").collect()
pos_probs = [i[0][0] for i in all_probs]
neg_probs = [i[0][1] for i in all_probs]

from matplotlib import pyplot as plt
%matplotlib inline

# pos
plt.hist(pos_probs, 50, normed=1, facecolor='green', alpha=0.75)
plt.xlabel('predicted_values')
plt.ylabel('Counts')
plt.title('Probabilities for positive cases')
plt.grid(True)
plt.show()

# neg
plt.hist(neg_probs, 50, normed=1, facecolor='green', alpha=0.75)
plt.xlabel('predicted_values')
plt.ylabel('Counts')
plt.title('Probabilities for negative cases')
plt.grid(True)
plt.show()

As can be seen, the predicted probabilities are highly skewed towards Positive. This is not surprising, given that roughly 79% of the data is Positive!

It is time to dig into the unbalanced data issue.

Up- and Down-Samplings

Since the data set is highly skewed – we have far more Positive training samples than Negative ones – we need to try out some strategies that counter the imbalance.

Unfortunately, the Random Forest implementation in Spark’s MLlib/ML packages doesn’t offer a ‘class weights’ parameter that we could tune, which would have taken care of the problem inside the model itself (i.e. penalizing a mis-classified minority case more heavily than a majority one). Thus we will need to manually implement some naive methods from scratch.

The simplest things we can do are up- or down-sampling. Up-sampling means randomly sampling (with replacement) extra training cases from the minority class (the negative cases here) and adding them to the training data, whereas down-sampling means randomly filtering out some of the majority cases. Both methods tend to make the training data more balanced (at the cost of bias and overfitting).

Down-sampling

Here’s one way to implement down-sampling:

from numpy.random import randint
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

RATIO_ADJUST = 2.0 ## ratio of pos to neg in the df_subsample

counts = trainingData.select('binary_response').groupBy('binary_response').count().collect()
higherBound = counts[0][1] # assumed to be the count of the majority ('Positive') class
THRESHOLD_TO_FILTER = int(RATIO_ADJUST * float(counts[1][1]) / counts[0][1] * higherBound)

randGen = lambda x: randint(0, higherBound) if x == 'Positive' else -1

udfRandGen = udf(randGen, IntegerType())
trainingData = trainingData.withColumn("randIndex", udfRandGen("binary_response"))
df_subsample = trainingData.filter(trainingData['randIndex'] < THRESHOLD_TO_FILTER)
df_subsample = df_subsample.drop('randIndex')

print("Distribution of Pos and Neg cases of the down-sampled training data are: \n", df_subsample.groupBy("label").count().take(3))
Distribution of Pos and Neg cases of the down-sampled training data are: 
 [Row(label=1.0, count=144014), Row(label=0.0, count=287482)]

Explanation – for trainingData: we randomly assign an integer ‘randIndex’ to each majority (‘Positive’) data point, then keep only the rows whose ‘randIndex’ falls below the threshold we calculated, so that far fewer ‘Positive’ points remain. We do not touch the minority class – ‘Negative’ – so the ‘count’ value in Row(label=1.0, count=144014) shown above is exactly the same as in the original trainingData.

For testData, we leave it untouched for now.

Training and validation then proceed the same way as before:

## training and prediction
rf = RF(labelCol='label', featuresCol='features',numTrees=200)
fit = rf.fit(df_subsample)
transformed = fit.transform(testData)

Results:

## results and evaluation
from pyspark.mllib.evaluation import BinaryClassificationMetrics as metric
results = transformed.select(['probability', 'label'])

results_collect = results.collect()
results_list = [(float(i[0][0]), 1.0-float(i[1])) for i in results_collect]
scoreAndLabels = sc.parallelize(results_list)

metrics = metric(scoreAndLabels)
print("The ROC score is (@numTrees=200): ", metrics.areaUnderROC)
The ROC score is (@numTrees=200):  0.6463328674547113

Awesome! Our method seems to work out and the ROC improves slightly to 0.646.

I won’t walk through the code for up-sampling because it is essentially just as straightforward – a minimal sketch follows below. It also improved the score (slightly)!
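
For completeness, here is a minimal up-sampling sketch (my addition, not the original code): it over-samples the minority ‘Negative’ class with replacement using DataFrame.sample and unions the result back with the majority rows.

## a minimal up-sampling sketch (assumes `trainingData` with the 'binary_response' col as above)
pos_df = trainingData.filter(trainingData['binary_response'] == 'Positive')
neg_df = trainingData.filter(trainingData['binary_response'] == 'Negative')

# over-sample the minority class with replacement so the two classes end up roughly balanced
ratio = float(pos_df.count()) / neg_df.count()
neg_oversampled = neg_df.sample(withReplacement=True, fraction=ratio, seed=0)
df_upsample = pos_df.unionAll(neg_oversampled)

print(df_upsample.groupBy('label').count().take(3))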

Ensemble of Down-samplings

Let’s take another look at the down-sampling method above.

Each time we subsample trainingData, we throw away some data points from the “Positive” class, and thus lose information that could potentially be used to train our model.

Therefore, we take multiple down-samplings of trainingData, each of which gives us a slightly different data set to train our model on. In the end, we ensemble, or take the average of, the prediction results from all the models trained on the different data sets, hoping to get better overall predictions.

Let’s jot the idea down in code:

from numpy.random import randint
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.mllib.evaluation import BinaryClassificationMetrics as metric

RATIO_ADJUST = 3.0 ## ratio of pos to neg in the df_subsample
TOTAL_MODELS = 10
total_results = None
final_result = None

#counts = trainingData.select('binary_response').groupBy('binary_response').count().collect()
highestBound = counts[0][1]
THRESHOLD_TO_FILTER = int(RATIO_ADJUST * float(counts[1][1]) / counts[0][1] * highestBound)
## UDF
randGen = lambda x: randint(0, highestBound) if x == 'Positive' else -1
udfRandGen = udf(randGen, IntegerType())

## ensembling
for N in range(TOTAL_MODELS):
    print("Round: ", N)
    trainingDataIndexed = trainingData.withColumn("randIndex", udfRandGen("binary_response"))
    df_subsample = trainingDataIndexed.filter(trainingDataIndexed['randIndex'] < THRESHOLD_TO_FILTER).drop('randIndex')
    ## training and prediction
    rf = RF(labelCol='label', featuresCol='features',numTrees=200)
    fit = rf.fit(df_subsample)
    transformed = fit.transform(testData)
    result_pair = transformed.select(['probability', 'label'])
    result_pair = result_pair.collect()
    this_result = np.array([float(i[0][1]) for i in result_pair])
    this_result = list(this_result.argsort().argsort() / (float(len(this_result) + 1)))

    ## sum up all the predictions, and average to get final_result
    if total_results is None:
        total_results = this_result
    else:
        total_results = [i+j for i, j in zip(this_result, total_results)]
    final_result = [i/(N+1) for i in total_results]

    results_list = [(float(i), float(j[1])) for i, j in zip(final_result, result_pair)]
    scoreAndLabels = sc.parallelize(results_list)

    metrics = metric(scoreAndLabels)
 print("The ROC score is (@numTrees=200): ", metrics.areaUnderROC)

Explanation: the algorithm is essentially repeated down-sampling: we down-sample multiple times and average the results in terms of rankings (meaning that instead of averaging raw predicted probabilities, in each round we convert the probabilities to normalized ranks and then average those ranks across rounds).

Round:  0
The ROC score is (@numTrees=200):  0.6456296366007628
Round:  1
The ROC score is (@numTrees=200):  0.6475210701955153
Round:  2
The ROC score is (@numTrees=200):  0.6488169677072237
Round:  3
The ROC score is (@numTrees=200):  0.6490333812262444
Round:  4
The ROC score is (@numTrees=200):  0.6490997896881725
Round:  5
The ROC score is (@numTrees=200):  0.648347665785477
Round:  6
The ROC score is (@numTrees=200):  0.6486544723987375
Round:  7
The ROC score is (@numTrees=200):  0.6492410064530146
Round:  8
The ROC score is (@numTrees=200):  0.6493154941849306
Round:  9
The ROC score is (@numTrees=200):  0.6483560027574977

The ensemble approach seems to give another boost to the AUC score!
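
To make the rank-averaging trick concrete (a small illustration of my own), argsort().argsort() turns each round’s scores into 0-based ranks, which are then scaled into [0, 1) before being averaged across rounds:

import numpy as np

scores = np.array([0.9, 0.1, 0.4, 0.7])
ranks = scores.argsort().argsort()            # array([3, 0, 1, 2]): the rank of each score
normalized = ranks / float(len(scores) + 1)   # [0.6, 0.0, 0.2, 0.4]: rank-based 'scores' in [0, 1)
print(normalized)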

WARNING about down- and up-sampling: if all you care about is the ROC score, which measures how well the model ranks cases rather than the actual probability of being Positive or Negative, it is OK to use subsampling methods.

However, if the actual probabilities matter to you, subsampling will distort the predicted probability distribution, and the reported probabilities may be over- or under-estimated.
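
If you do need calibrated probabilities, one common correction (my addition; this is the standard prior-correction formula for undersampling, not something from the original post) maps the score from the down-sampled model back to the original class distribution, assuming the majority class was kept with probability beta:

# hedged sketch: correct P(minority class) predicted by a model trained on down-sampled data,
# where beta is the fraction of majority-class rows that were kept during down-sampling
def correct_probability(p_s, beta):
    return beta * p_s / (beta * p_s - p_s + 1.0)

# e.g. with the counts from this tutorial, beta is roughly 287482 / 520771
print(correct_probability(0.5, 287482.0 / 520771.0))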

Other approaches to handling unbalanced datasets

Of course, these are just starting (and sometimes naive) approaches to handling unbalanced classes. There are useful blog posts and papers that discuss different strategies in greater detail.

Alternatively, you could try scikit-learn’s Random Forest alongside PySpark, which does have a class weight parameter to tune.
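
As an illustration (a sketch under the assumption that the assembled training data fits in memory on a single machine once collected), the scikit-learn version could look like this:

## scikit-learn random forest with class weighting (my addition; assumes data fits in memory)
import numpy as np
from sklearn.ensemble import RandomForestClassifier

train_rows = trainingData.select('features', 'label').collect()
X = np.array([row['features'].toArray() for row in train_rows])
y = np.array([row['label'] for row in train_rows])

# 'balanced' reweights classes inversely proportional to their frequencies
clf = RandomForestClassifier(n_estimators=200, class_weight='balanced', n_jobs=-1)
clf.fit(X, y)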

Leave a comment if you have questions or some ideas. :)

13 thoughts on “PySpark tutorial – a case study using Random Forest on an unbalanced dataset”

  1. Can you share the sample data via a link so that we can run the exercise on our own? Thanks in advance.

    1. The original data is proprietary, and the outputs shown in the tutorial are masked so that both the values and the names are fake, for demonstration purposes only.
      Therefore I’m sorry to say I don’t have a sample data set to share at the moment.

  2. Hello,
    Very informative article. Can you also share how to get the Variable Importance from RF? Thanks

    1. Hi,
      Sorry as far as I know feature importance is not implemented in PySpark for random forest.

  3. for x,y in zip(column_vec_in, column_vec_out)]
    TypeError: __init__() got an unexpected keyword argument 'outputCol' … I am facing this error while implementing the one-hot encoding technique.

    1. You should run the complete line “encoders = [OneHotEncoder(dropLast=False, inputCol=x+”_tmp”, outputCol=y)
      for x,y in zip(column_vec_in, column_vec_out)]”

      instead of part of it I guess

      1. column_vec_in = ['business_type']
        column_vec_out = ['business_type_Vec']

        indexers = [StringIndexer(inputCol=x, outputCol=x + '_tmp')
        for x in column_vec_in]

        encoders = [OneHotEncoder(dropLast=False, inputCol=x + "_tmp", outputCol=y)
        for x, y in zip(column_vec_in, column_vec_out)]
        tmp = [[i, j] for i, j in zip(indexers, encoders)]
        tmp = [i for sublist in tmp for i in sublist]

        cols_now = ['business_type_Vec']
        assembler_features = VectorAssembler(inputCols=cols_now, outputCol='features')
        labelIndexer = StringIndexer(inputCol='binary_response', outputCol="label")
        tmp += [assembler_features, labelIndexer]
        pipeline = Pipeline(stages=tmp)

        I just implemented it the way you did. How can I look into the new dataframe which also has the dummy columns?

      2. You have created the pipeline but have not yet fit it to your df. Running allData = pipeline.fit(df).transform(df)
        will fit it to your df and create the new (sparse) dataframe allData, which has the one-hot cols.

  4. Thank you for sharing this. I was trying to figure out a way to group categorical AND numerical columns together using VectorAssembler, and your code worked perfectly!

  5. Hi Weimin:

    I do have a question regarding the process you use for OneHotEncoding and building the pipeline. Specifically, can you explain what these two lines do (can’t find it on Spark documentation):

    tmp = [[i,j] for i,j in zip(indexers, encoders)]
    tmp = [i for sublist in tmp for i in sublist]

    In your code, it looks like these two lines serve as reference for the “tmp += [assembler_features, labelIndexer]”, and the ‘sublist’ is each row of the dataframe after it’s been indexed and encoded? I’m a bit confused about the order of execution of these lines, since I don’t see the .transform() on the encoder and indexer commands.

    Thanks!

    1. Hi Jenny,

      It’s a little Python trick I applied, with reference to this:

      http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python

      Basically, I am trying to make tmp a flat list instead of list of lists, so “tmp = [i for sublist in tmp for i in sublist]” will flatten it.

      1. Thanks for the explanation!

  6. I’m trying to scale this for multi-class classification.

    I have used a StringIndexer for the output column like so..
    labelIndexer = StringIndexer(inputCol='multi_response', outputCol="label")

    I seem to be always getting a binary prediction…

    Thoughts on what I might be missing?
