This tutorial walks through building a predictive model that predicts whether a customer will like a given product. The original model was built and tested on Spark with real-world data, but for this tutorial I will use a mock-up data set.

Since unbalanced data sets are very common in the real business world, this tutorial will specifically showcase some tactics that can effectively deal with this challenge using PySpark.

Concretely, this session will cover the following topics:

- Case scenario and data set
- Data pre-processing: NA replacement, one-hot encoding, pipelining, training/validation splits, etc.
- Using the MLlib random forest classifier for binary classification
- Measuring performance with the AUC score
- Strategies for handling an unbalanced data set:
  - Down-sampling and up-sampling
  - An ensemble of down-sampled models

**The Case Scenario**

Let’s assume your manager approaches you one day and asks you to build a product recommendation engine. Given relevant data such as the customer’s details and the product’s info, the engine should predict the probability of each product being liked by that customer. Your model will then recommend the top 5 products based on those probabilities.

Your stakeholder is the business department, which will eventually use your model for recommendations. Specifically, each sales rep will ‘consult’ your model by telling it what type of customer she is going to visit before she sets off on her trip. She will then bring along the recommended product list for the pitch, hoping that her trip will be fruitful.

**The Data**

You receive the data from your friendly *BI team*. Thankfully, they have made your life easier by crunching all the data into one clean CSV table, so you won’t need to painfully join and merge different tables. Peeking at the top 3 rows of the table shows you the following:

Some of the predictors represent properties of the product, such as *product_price* or *product_features*, whereas others contain information about the customer, e.g. *customer title* or *age*.

Meanwhile, your *Big Data team* has set up the Spark platform for you and ingested the table into the data lake, so you can access it easily in PySpark. We start by importing the libraries and loading the data:

```python
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier as RF
from pyspark.ml.feature import StringIndexer, VectorIndexer, VectorAssembler, SQLTransformer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import numpy as np
import functools

tableData = sqlContext.table('your_table_containing_products_feedback_information')
cols_select = ['prod_price', 'prod_feat_1', 'prod_feat_2', 'cust_age', 'prod_feat_3',
               'cust_region', 'prod_type', 'cust_sex', 'cust_title', 'feedback']
df = tableData.select(cols_select).dropDuplicates()
```

## Data Pre-processing Steps

### 1. Skewed responses

There are three types of responses – **Positive, Neutral and Negative**. As a first step, let’s see how they are distributed:

```python
from matplotlib import pyplot as plt
%matplotlib inline

responses = df.groupBy('feedback').count().collect()
categories = [i[0] for i in responses]
counts = [i[1] for i in responses]

ind = np.array(range(len(categories)))
width = 0.35
plt.bar(ind, counts, width=width, color='r')
plt.ylabel('counts')
plt.title('Response distribution')
plt.xticks(ind + width/2., categories)
```

The distribution is quite skewed: there are far more ‘Positive’ cases than ‘Neutral’ and ‘Negative’ ones, and the volume of ‘Negative’ cases is extremely low.

The shortage of ‘Negative’ cases is the most serious problem here. However, since our job is to differentiate ‘Positive’ cases from everything else, why not combine ‘Neutral’ and ‘Negative’ into one group? So we choose to convert all ‘Neutral’ cases to ‘Negative’.

```python
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

binarize = lambda x: 'Negative' if x == 'Neutral' else x
udfValueToCategory = udf(binarize, StringType())
df = df.withColumn("binary_response", udfValueToCategory("feedback"))
```

Notice we have created a new column called ‘binary_response’ that holds the binary cases ‘Positive’ and ‘Negative’.

However, this does not solve the unbalanced data set issue, since ‘Positive’ cases still far outnumber ‘Negative’ ones. We will address that later by applying strategies such as *down-sampling* and an *ensemble of sub-samplings*.

### 2. Filling NA values and casting data types

We convert the numeric cols to ‘float’ or ‘int’ depending on their values. Some categorical cols contain null values, so we fill those in with an ‘NA’ string as a new category, and leave the remaining cols unchanged:

```python
cols_select = ['prod_price', 'prod_feat_1', 'prod_feat_2', 'cust_age', 'prod_feat_3',
               'cust_region', 'prod_type', 'cust_sex', 'cust_title', 'feedback', 'binary_response']
df = df.select(df.prod_price.cast('float'),  # cast numeric cols to 'float' or 'int'
               df.prod_feat_1.cast('float'),
               df.prod_feat_2.cast('float'),
               df.cust_age.cast('int'),
               *cols_select[4:])
df = df.fillna({'cust_region': 'NA', 'cust_title': 'NA', 'prod_type': 'NA'})  # fill 'NA' entries for certain cols
```

### 3. Categorical col that has too many discrete values

We also want to check whether any categorical cols have too many levels (distinct values):

```python
for col in df.columns[4:-2]:
    print(col, df.select(col).distinct().count())
```

```
prod_feat_3  553
cust_region  12
prod_type    35
cust_sex     2
cust_title   12
```

The *prod_feat_3* col simply has too many levels (discrete values)! A simple way to resolve this is to group all the categories whose counts fall below a threshold into a single category, say “MinorityCategory”. Here is how we do it:

```python
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

COUNT_THRESHOLD = 150  # threshold to filter

# create a temporary col "count" holding the count for each value of "prod_feat_3"
prodFeat3Count = df.groupBy("prod_feat_3").count()
df = df.join(prodFeat3Count, "prod_feat_3", "inner")

def convertMinority(originalCol, colCount):
    if colCount > COUNT_THRESHOLD:
        return originalCol
    else:
        return 'MinorityCategory'

createNewColFromTwo = udf(convertMinority, StringType())
df = df.withColumn('prod_feat_3_reduced', createNewColFromTwo(df['prod_feat_3'], df['count']))
df = df.drop('prod_feat_3')
df = df.drop('count')
```

### 4. One-hot encoding categorical cols

For those categorical cols, we will apply one-hot encoding method to convert them into dummy cols:

```python
column_vec_in = ['prod_feat_3_reduced', 'cust_region', 'prod_type', 'cust_sex', 'cust_title']
column_vec_out = ['prod_feat_3_reduced_catVec', 'cust_region_catVec', 'prod_type_catVec',
                  'cust_sex_catVec', 'cust_title_catVec']

indexers = [StringIndexer(inputCol=x, outputCol=x+'_tmp') for x in column_vec_in]
encoders = [OneHotEncoder(dropLast=False, inputCol=x+"_tmp", outputCol=y)
            for x, y in zip(column_vec_in, column_vec_out)]
tmp = [[i, j] for i, j in zip(indexers, encoders)]
tmp = [i for sublist in tmp for i in sublist]
```

Finally, we can group all the predictors into a ‘features’ vector and index the response col as ‘label’. We then streamline the entire process using a ‘Pipeline’, which runs all of these stages sequentially for us.

```python
# prepare labeled sets
cols_now = ['prod_price', 'prod_feat_1', 'prod_feat_2', 'cust_age',
            'prod_feat_3_reduced_catVec', 'cust_region_catVec', 'prod_type_catVec',
            'cust_sex_catVec', 'cust_title_catVec']
assembler_features = VectorAssembler(inputCols=cols_now, outputCol='features')
labelIndexer = StringIndexer(inputCol='binary_response', outputCol="label")
tmp += [assembler_features, labelIndexer]
pipeline = Pipeline(stages=tmp)
```

### 5. Split into training and validation sets.

This part is straightforward: we randomly select 80% of the data as the training set and the remaining 20% as the test (validation) set.

Notice: it is important to set a seed for the randomSplit() function so that we get the same split on each run. (This is crucial for the comparisons in the subsequent tests.)

```python
allData = pipeline.fit(df).transform(df)
allData.cache()
trainingData, testData = allData.randomSplit([0.8, 0.2], seed=0)  # need to ensure the same split each time
print("Distribution of Pos and Neg in trainingData is: ", trainingData.groupBy("label").count().take(3))
```

```
Distribution of Pos and Neg in trainingData is: [Row(label=1.0, count=144014), Row(label=0.0, count=520771)]
```

## Prediction and Evaluation of AUC

### Train and prediction

We use a random forest with numTrees = 200, training on trainingData and predicting on testData.

```python
rf = RF(labelCol='label', featuresCol='features', numTrees=200)
fit = rf.fit(trainingData)
transformed = fit.transform(testData)
```

### AUC

Use the test data labels to calculate AUC score against the predicted probabilities:

```python
from pyspark.mllib.evaluation import BinaryClassificationMetrics as metric

results = transformed.select(['probability', 'label'])  # prepare score-label set
results_collect = results.collect()
results_list = [(float(i[0][0]), 1.0 - float(i[1])) for i in results_collect]
scoreAndLabels = sc.parallelize(results_list)

metrics = metric(scoreAndLabels)
print("The ROC score is (@numTrees=200): ", metrics.areaUnderROC)
```

```
The ROC score is (@numTrees=200): 0.6425143766095695
```

To visualize the AUC score, we can draw the ROC curve as below:

```python
from sklearn.metrics import roc_curve, auc

y_test = [i[1] for i in results_list]
y_score = [i[0] for i in results_list]
fpr, tpr, _ = roc_curve(y_test, y_score)
roc_auc = auc(fpr, tpr)

%matplotlib inline
plt.figure()
plt.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver operating characteristic example')
plt.legend(loc="lower right")
plt.show()
```

As we can see above, the area between the blue curve and the dashed line measures how much our model has gained over a random coin flip.

The score is 0.64, which is not too encouraging.

It could be that the features in the data set simply don’t carry enough information to train our model; maybe we should talk with our BI team about obtaining additional, more insightful features.

But that’s not the focus of this tutorial anyway :)

To gain a better understanding of our model’s performance, we can plot the distribution of our predictions:

```python
from matplotlib import pyplot as plt
%matplotlib inline

all_probs = transformed.select("probability").collect()
pos_probs = [i[0][0] for i in all_probs]
neg_probs = [i[0][1] for i in all_probs]

# positive cases
plt.hist(pos_probs, 50, normed=1, facecolor='green', alpha=0.75)
plt.xlabel('predicted_values')
plt.ylabel('Counts')
plt.title('Probabilities for positive cases')
plt.grid(True)
plt.show()

# negative cases
plt.hist(neg_probs, 50, normed=1, facecolor='green', alpha=0.75)
plt.xlabel('predicted_values')
plt.ylabel('Counts')
plt.title('Probabilities for negative cases')
plt.grid(True)
plt.show()
```

As can be seen, the predicted probabilities are highly skewed towards Positive. This is not surprising, given that around 79% of the data points are positive!

It is time to dig into the unbalanced data issue.

## Up- and Down-Samplings

Since the data set is highly skewed – we have far more Positive training samples than Negative ones – we need to try some strategies that counter the imbalance.

Unfortunately, the random forest implementation in Spark’s MLlib package doesn’t expose a ‘**class weights**’ parameter that we could tune, which would have handled the problem inside the model itself (by penalizing misclassifications of the minority class more heavily than those of the majority class). So we will need to implement some naive methods from scratch.

The simplest approaches are up- and down-sampling. Up-sampling means randomly sampling (with replacement) additional training cases from the minority class (here, the negative cases) and adding them to the training data, whereas down-sampling means randomly filtering out some of the majority cases. Both tend to make the training data more balanced (though at the cost of bias and overfitting).

### Down-sampling

Here’s one way to implement down-sampling:

```python
from numpy.random import randint
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

RATIO_ADJUST = 2.0  # ratio of pos to neg in the df_subsample

counts = trainingData.select('binary_response').groupBy('binary_response').count().collect()
higherBound = counts[0][1]
TRESHOLD_TO_FILTER = int(RATIO_ADJUST * float(counts[1][1]) / counts[0][1] * higherBound)

randGen = lambda x: randint(0, higherBound) if x == 'Positive' else -1
udfRandGen = udf(randGen, IntegerType())
trainingData = trainingData.withColumn("randIndex", udfRandGen("binary_response"))
df_subsample = trainingData.filter(trainingData['randIndex'] < TRESHOLD_TO_FILTER)
df_subsample = df_subsample.drop('randIndex')

print("Distribution of Pos and Neg cases of the down-sampled training data are: \n",
      df_subsample.groupBy("label").count().take(3))
```

```
Distribution of Pos and Neg cases of the down-sampled training data are:
[Row(label=1.0, count=144014), Row(label=0.0, count=287482)]
```

**Explanation – for trainingData:** we randomly assign an int ‘randIndex’ to each majority-class data point, then filter out those whose ‘randIndex’ is above a calculated threshold, so that far fewer data points from the majority class – ‘Positive’ – remain. We don’t touch the minority class – ‘Negative’ – which is why the ‘count’ value in Row(label=1.0, count=**144014**) above is exactly the same as before for trainingData.

**For testData**: we leave it untouched for now.

We train and validate the same way as before:

```python
# training and prediction
rf = RF(labelCol='label', featuresCol='features', numTrees=200)
fit = rf.fit(df_subsample)
transformed = fit.transform(testData)
```

Results:

```python
# results and evaluation
from pyspark.mllib.evaluation import BinaryClassificationMetrics as metric

results = transformed.select(['probability', 'label'])
results_collect = results.collect()
results_list = [(float(i[0][0]), 1.0 - float(i[1])) for i in results_collect]
scoreAndLabels = sc.parallelize(results_list)

metrics = metric(scoreAndLabels)
print("The ROC score is (@numTrees=200): ", metrics.areaUnderROC)
```

Awesome! Our method seems to work, and the ROC score improves slightly to 0.646.

*I won’t paste the code for up-sampling, since it’s essentially straightforward. It also improved the score (slightly)!*

### Ensemble of Down-samplings

Let’s take another look at the down-sampling method above.

Each time we subsample trainingData, we throw away some data points belonging to the “Positive” class, and thus miss out on information that could potentially be used to train our model.

Therefore, we take multiple down-samplings of trainingData, each of which gives us a slightly different data set to train our model on. In the end, we ensemble, or take the average of, the predictions from all the models trained on the different data sets, hopefully getting better overall predictions.

Let’s jot down the idea in code:

```python
from numpy.random import randint
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.mllib.evaluation import BinaryClassificationMetrics as metric

RATIO_ADJUST = 3.0  # ratio of pos to neg in the df_subsample
TOTAL_MODELS = 10
total_results = None
final_result = None

# counts = trainingData.select('binary_response').groupBy('binary_response').count().collect()
highestBound = counts[0][1]
TRESHOLD_TO_FILTER = int(RATIO_ADJUST * float(counts[1][1]) / counts[0][1] * highestBound)

## UDF
randGen = lambda x: randint(0, highestBound) if x == 'Positive' else -1
udfRandGen = udf(randGen, IntegerType())

## ensembling
for N in range(TOTAL_MODELS):
    print("Round: ", N)
    trainingDataIndexed = trainingData.withColumn("randIndex", udfRandGen("binary_response"))
    df_subsample = trainingDataIndexed.filter(trainingDataIndexed['randIndex'] < TRESHOLD_TO_FILTER).drop('randIndex')

    ## training and prediction
    rf = RF(labelCol='label', featuresCol='features', numTrees=200)
    fit = rf.fit(df_subsample)
    transformed = fit.transform(testData)
    result_pair = transformed.select(['probability', 'label'])
    result_pair = result_pair.collect()

    ## convert raw probabilities into normalised ranks
    this_result = np.array([float(i[0][1]) for i in result_pair])
    this_result = list(this_result.argsort().argsort() / (float(len(this_result) + 1)))

    ## sum up all the predictions, and average to get final_result
    if total_results is None:
        total_results = this_result
    else:
        total_results = [i + j for i, j in zip(this_result, total_results)]
    final_result = [i / (N + 1) for i in total_results]

    results_list = [(float(i), float(j[1])) for i, j in zip(final_result, result_pair)]
    scoreAndLabels = sc.parallelize(results_list)
    metrics = metric(scoreAndLabels)
    print("The ROC score is (@numTrees=200): ", metrics.areaUnderROC)
```

**Explanation**: the algorithm is very similar to down-sampling: we simply down-sample multiple times and average the results in terms of rankings. That is, instead of the raw predicted probabilities, each round we rank the probabilities first and then average the ranks across rounds.
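To make the rank-averaging step concrete, here is a small standalone sketch (with made-up probabilities) of what `argsort().argsort()` does and how the ranks get averaged across rounds:

```python
import numpy as np

# hypothetical predicted probabilities for 5 test cases from 3 down-sampled models
model_probs = [
    np.array([0.90, 0.40, 0.70, 0.20, 0.85]),
    np.array([0.80, 0.35, 0.75, 0.30, 0.95]),
    np.array([0.85, 0.50, 0.60, 0.10, 0.90]),
]

n = len(model_probs[0])
# argsort().argsort() converts raw scores into 0-based ranks;
# dividing by (n + 1) normalises the ranks into [0, 1)
ranks = [p.argsort().argsort() / float(n + 1) for p in model_probs]

# the ensemble score of each case is its average rank across the models
final_scores = np.mean(ranks, axis=0)
print(final_scores.argmax())  # index 4: the case every model ranks near the top
```

Averaging ranks instead of raw probabilities makes the ensemble insensitive to each model's probability calibration; only the ordering of the cases matters.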

```
Round: 0   The ROC score is (@numTrees=200): 0.6456296366007628
Round: 1   The ROC score is (@numTrees=200): 0.6475210701955153
Round: 2   The ROC score is (@numTrees=200): 0.6488169677072237
Round: 3   The ROC score is (@numTrees=200): 0.6490333812262444
Round: 4   The ROC score is (@numTrees=200): 0.6490997896881725
Round: 5   The ROC score is (@numTrees=200): 0.648347665785477
Round: 6   The ROC score is (@numTrees=200): 0.6486544723987375
Round: 7   The ROC score is (@numTrees=200): 0.6492410064530146
Round: 8   The ROC score is (@numTrees=200): 0.6493154941849306
Round: 9   The ROC score is (@numTrees=200): 0.6483560027574977
```

The ensemble approach seems to give another boost to the AUC score!

**WARNING** about down- or up-sampling: if all you care about is the **ROC score**, which measures only the probability **ranking** of the cases rather than the **actual probability** of being Positive or Negative, it is fine to use subsampling methods.

*However, if the actual probability matters to you, subsampling will distort the predicted probability distribution, and the probabilities may be over- or under-estimated.*
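As a quick illustration of that distortion, here is a small sketch (with synthetic data and scikit-learn’s logistic regression, purely for demonstration) showing how training on a balanced subsample pulls the predicted base rate away from the true one:

```python
import numpy as np
from sklearn.linear_model import LogisticRegression

rng = np.random.RandomState(0)
# synthetic data: features carry no signal, and ~80% of labels are positive
X = rng.randn(5000, 3)
y = (rng.rand(5000) < 0.8).astype(int)

full = LogisticRegression().fit(X, y)

# down-sample the positives so both classes are equally represented
pos_idx = np.where(y == 1)[0]
neg_idx = np.where(y == 0)[0]
keep = np.concatenate([rng.choice(pos_idx, size=len(neg_idx), replace=False), neg_idx])
balanced = LogisticRegression().fit(X[keep], y[keep])

# the full model's average prediction tracks the true ~0.8 base rate,
# while the down-sampled model's is pulled towards 0.5
print(full.predict_proba(X)[:, 1].mean())
print(balanced.predict_proba(X)[:, 1].mean())
```

The ranking of the cases is largely unchanged, which is why the ROC score survives subsampling while the probabilities themselves do not.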

## Other approaches to handling an unbalanced data set

Of course, these are just starting (and sometimes naive) approaches for handling unbalanced classes. There are useful blog posts and papers that discuss different strategies in greater detail.

Alternatively, you could try the sklearn package’s random forest from PySpark, which does have a class-weight parameter to tune.
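As a sketch of what that would look like (run locally with scikit-learn, on made-up data for illustration only):

```python
import numpy as np
from sklearn.ensemble import RandomForestClassifier

rng = np.random.RandomState(0)
# toy unbalanced data: roughly 80% positive (label 1), 20% negative (label 0)
X = rng.randn(1000, 5)
y = (rng.rand(1000) < 0.8).astype(int)

# class_weight='balanced' re-weights each class inversely to its frequency,
# so misclassifying the rare class is penalised more heavily
clf = RandomForestClassifier(n_estimators=100, class_weight='balanced', random_state=0)
clf.fit(X, y)
probs = clf.predict_proba(X)[:, 1]
```

The catch, of course, is that sklearn does not distribute training across the cluster, so this only works when the (possibly subsampled) training data fits on one machine.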

Leave a comment if you have questions or some ideas. :)

Can you share the sample data via a link so that we can run the exercise on our own? Thanks in advance.


The original data is our proprietary data, and the outputs shown in the tutorial are masked, so both the values and the names are fake, for demonstration purposes only.

So I’m sorry to say I don’t have a sample data set to share at the moment.


Hello,

Very informative article. Can you also share how to get the variable importance from the RF? Thanks.


Hi,

Sorry, as far as I know feature importance is not implemented for random forests in PySpark.


Facing this error while implementing the one-hot encoding technique:

```
for x,y in zip(column_vec_in, column_vec_out)]
TypeError: __init__() got an unexpected keyword argument 'outputCol'
```


You should run the complete line:

```python
encoders = [OneHotEncoder(dropLast=False, inputCol=x+"_tmp", outputCol=y)
            for x, y in zip(column_vec_in, column_vec_out)]
```

instead of part of it, I guess.


```python
column_vec_in = ['business_type']
column_vec_out = ['business_type_Vec']

indexers = [StringIndexer(inputCol=x, outputCol=x + '_tmp')
            for x in column_vec_in]
encoders = [OneHotEncoder(dropLast=False, inputCol=x + "_tmp", outputCol=y)
            for x, y in zip(column_vec_in, column_vec_out)]
tmp = [[i, j] for i, j in zip(indexers, encoders)]
tmp = [i for sublist in tmp for i in sublist]

cols_now = ['business_type_Vec']
assembler_features = VectorAssembler(inputCols=cols_now, outputCol='features')
labelIndexer = StringIndexer(inputCol='binary_response', outputCol="label")
tmp += [assembler_features, labelIndexer]
pipeline = Pipeline(stages=tmp)
```

I implemented it just as you did. How can I look into the new dataframe, which also has the dummy columns?


You have created the pipeline but have not yet fit it on your df. Running:

```python
allData = pipeline.fit(df).transform(df)
```

will fit it on your df and create the new sparse dataframe allData, which has the one-hot cols.


Thank you for sharing this. I was trying to figure out a way to group categorical AND numerical columns together using VectorAssembler, and your code worked perfectly!


Hi Weimin:

I do have a question regarding the process you use for one-hot encoding and building the pipeline. Specifically, can you explain what these two lines do (I can’t find them in the Spark documentation)?

```python
tmp = [[i,j] for i,j in zip(indexers, encoders)]
tmp = [i for sublist in tmp for i in sublist]
```

In your code, it looks like these two lines feed into the “tmp += [assembler_features, labelIndexer]” line, and the ‘sublist’ is each row of the dataframe after it’s been indexed and encoded? I’m a bit confused about the order of execution of these lines, since I don’t see the .transform() on the encoder and indexer commands.

Thanks!


Hi Jenny,

It’s a little Python trick I applied, with reference to this:

http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python

Basically, I am trying to make tmp a flat list instead of a list of lists, so “tmp = [i for sublist in tmp for i in sublist]” flattens it.


Thanks for the explanation!


I’m trying to scale this for multi-class classification.

I have used a StringIndexer for the output column like so:

```python
labelIndexer = StringIndexer(inputCol='multi_response', outputCol="label")
```

I seem to be always getting a binary prediction…

Thoughts on what I might be missing?


Thanks Weimin, this is a very helpful example. I’ve been trying to follow your down-sampling logic for the THRESHOLD_TO_FILTER calculation. To get the desired results, I had to modify your code slightly, from:

```python
highestBound = counts[0][1]
TRESHOLD_TO_FILTER = int(RATIO_ADJUST * float(counts[1][1]) / counts[0][1] * highestBound)
```

to:

```python
higherBound = counts[0][1]
TRESHOLD_TO_FILTER = int(adjust_ratio * float(counts[0][1]) / counts[1][1] * higherBound)
```

Does this change agree with your data, or am I misinterpreting your logic?

Thanks!
