In this example, we will attempt to predict whether a pull request will be merged. The dataset we will be using is a newer version of the one published at MSR 2014 by Gousios and Zaidman, available here. The dataset comes in CSV format.
For this task, we will be using PySpark and the Spark ML library, to demonstrate how nicely Spark integrates with the extremely rich Python Data Science/Big Data ecosystem.
We begin our exploration by importing typical libraries from the Python world:
from ggplot import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
In contrast to our previous examples, we will be using higher level components; one of them is the Spark CSV reader. This allows us to read a CSV file into a Spark Dataframe in one line; most importantly, it can automatically infer the schema (data types) from our CSV, which saves us lots of boring work!
df = sqlContext.read.csv("hdfs://bdp1:8020/pullreqs.csv",
sep=",", header=True, inferSchema=True).cache()
df = sqlContext.read.csv("../datasets/pullreqs.csv",
sep=",",
header=True,
inferSchema=True).cache()
sqlContext.registerDataFrameAsTable(df, "pullreqs")
We are also using Spark's SQLContext instead of our typical SparkContext. This allows us to run SQL queries directly on top of our CSV data; in the background, Spark distributes the processing load across all workers.
PySpark integrates with the PyData ecosystem; this allows us to use the toPandas() function, which converts a distributed Spark DataFrame into a local Pandas dataframe. There are many legitimate reasons why this is useful: for example, we can use the data for plotting, or apply a scikit-learn machine learning algorithm on top of it. In our case, we exploit the fact that Jupyter, our notebook system, knows about Pandas dataframes to print the results locally.
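As a quick illustration of this pattern (just a sketch, not needed for the rest of the analysis), we could pull a handful of rows to the driver and inspect them as a Pandas dataframe:

# Bring a 5-row sample of the distributed DataFrame to the driver as Pandas
df.select('project_name', 'lang', 'merged').limit(5).toPandas()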
df.printSchema()
root
 |-- _c0: integer (nullable = true)
 |-- pull_req_id: integer (nullable = true)
 |-- project_name: string (nullable = true)
 |-- lang: string (nullable = true)
 |-- github_id: integer (nullable = true)
 |-- created_at: integer (nullable = true)
 |-- merged_at: string (nullable = true)
 |-- closed_at: integer (nullable = true)
 |-- lifetime_minutes: integer (nullable = true)
 |-- mergetime_minutes: string (nullable = true)
 |-- merged_using: string (nullable = true)
 |-- conflict: boolean (nullable = true)
 |-- forward_links: boolean (nullable = true)
 |-- intra_branch: boolean (nullable = true)
 |-- description_length: integer (nullable = true)
 |-- num_commits: integer (nullable = true)
 |-- num_commits_open: integer (nullable = true)
 |-- num_pr_comments: integer (nullable = true)
 |-- num_issue_comments: integer (nullable = true)
 |-- num_commit_comments: integer (nullable = true)
 |-- num_comments: integer (nullable = true)
 |-- num_commit_comments_open: integer (nullable = true)
 |-- num_participants: integer (nullable = true)
 |-- files_added_open: integer (nullable = true)
 |-- files_deleted_open: integer (nullable = true)
 |-- files_modified_open: integer (nullable = true)
 |-- files_changed_open: integer (nullable = true)
 |-- src_files_open: integer (nullable = true)
 |-- doc_files_open: integer (nullable = true)
 |-- other_files_open: integer (nullable = true)
 |-- files_added: integer (nullable = true)
 |-- files_deleted: integer (nullable = true)
 |-- files_modified: integer (nullable = true)
 |-- files_changed: integer (nullable = true)
 |-- src_files: integer (nullable = true)
 |-- doc_files: integer (nullable = true)
 |-- other_files: integer (nullable = true)
 |-- src_churn_open: integer (nullable = true)
 |-- test_churn_open: integer (nullable = true)
 |-- src_churn: integer (nullable = true)
 |-- test_churn: integer (nullable = true)
 |-- new_entropy: double (nullable = true)
 |-- entropy_diff: double (nullable = true)
 |-- commits_on_files_touched: integer (nullable = true)
 |-- commits_to_hottest_file: integer (nullable = true)
 |-- hotness: double (nullable = true)
 |-- at_mentions_description: integer (nullable = true)
 |-- at_mentions_comments: integer (nullable = true)
 |-- perc_external_contribs: double (nullable = true)
 |-- sloc: integer (nullable = true)
 |-- test_lines_per_kloc: double (nullable = true)
 |-- test_cases_per_kloc: double (nullable = true)
 |-- asserts_per_kloc: double (nullable = true)
 |-- stars: integer (nullable = true)
 |-- team_size: integer (nullable = true)
 |-- workload: integer (nullable = true)
 |-- ci: string (nullable = true)
 |-- requester: string (nullable = true)
 |-- closer: string (nullable = true)
 |-- merger: string (nullable = true)
 |-- prev_pullreqs: integer (nullable = true)
 |-- requester_succ_rate: double (nullable = true)
 |-- followers: integer (nullable = true)
 |-- main_team_member: boolean (nullable = true)
 |-- social_connection: boolean (nullable = true)
 |-- prior_interaction_issue_events: integer (nullable = true)
 |-- prior_interaction_issue_comments: integer (nullable = true)
 |-- prior_interaction_pr_events: integer (nullable = true)
 |-- prior_interaction_pr_comments: integer (nullable = true)
 |-- prior_interaction_commits: integer (nullable = true)
 |-- prior_interaction_commit_comments: integer (nullable = true)
 |-- first_response: integer (nullable = true)
 |-- prior_interaction_comments: integer (nullable = true)
 |-- prior_interaction_events: integer (nullable = true)
 |-- has_ci: boolean (nullable = true)
 |-- merged: boolean (nullable = true)
sqlContext.sql("""select merged_using, count(*) as occurences
from pullreqs
group by merged_using
order by occurences desc""").show()
+--------------------+-----------+
|        merged_using|occurrences|
+--------------------+-----------+
|              github|     364528|
|   commits_in_master|     342339|
|             unknown|     138566|
|  merged_in_comments|      29273|
|commit_sha_in_com...|      23234|
|     fixes_in_commit|      18125|
+--------------------+-----------+
The above SQL query can also be defined programmatically. Behind the scenes, Spark compiles SQL into a series of DataFrame calls like the ones shown below.
df.\
groupBy(df.merged_using).\
agg({'merged_using': 'count'}).\
orderBy(desc("count(merged_using)")).\
show()
+--------------------+-------------------+
|        merged_using|count(merged_using)|
+--------------------+-------------------+
|              github|             364528|
|   commits_in_master|             342339|
|             unknown|             138566|
|  merged_in_comments|              29273|
|commit_sha_in_com...|              23234|
|     fixes_in_commit|              18125|
+--------------------+-------------------+
sqlContext.sql("""select count(distinct(project_name)) as num_projects
from pullreqs""").show()
+------------+
|num_projects|
+------------+
|        5543|
+------------+
Ok, but how many projects are there per programming language? Let's do a quick SQL query:
sqlContext.sql("""select lang, count(*) as num_projects
from (
select distinct(project_name), lang
from pullreqs
) as project_langs
group by lang
order by num_projects desc""").show()
+----------+------------+
|      lang|num_projects|
+----------+------------+
|javascript|        1726|
|    python|        1518|
|      ruby|        1086|
|      java|        1075|
|     scala|         138|
+----------+------------+
We can do exactly the same query using Dataframe transformations:
df.select(df["project_name"], df["lang"]).\
distinct().\
groupBy(df["lang"]).\
agg({"lang":"count"}).\
orderBy("count(lang)", ascending=False).\
show()
+----------+-----------+
|      lang|count(lang)|
+----------+-----------+
|javascript|       1726|
|    python|       1518|
|      ruby|       1086|
|      java|       1075|
|     scala|        138|
+----------+-----------+
r = sqlContext.sql("""select project_name, count(*) as num_prs
from pullreqs
group by project_name""").toPandas()
ggplot(aes(x = 'num_prs'), data=r) + \
geom_histogram(binwidth=5) + \
scale_x_log() + \
ylab("Num projects") + xlab("Num PRs")
How much time does it take to process a PR? To find out, we plot a boxplot of the time it takes to close a PR. We first convert our data into a Pandas Dataframe, then convert seconds to hours (dividing by 3600), and finally apply a log transformation. We do this to eliminate the effect of outliers on our plot.
As we can see, the majority (75%) of all pull requests are closed in less than $10^{4.4}$ hours (roughly 1,046 days), while more than 50% are closed within just 7 days.
df.withColumn("duration", df['closed_at'] - df['created_at']).\
select('duration').\
toPandas().\
apply(lambda x: x / 3600).\
apply(np.log).\
plot.\
box()
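If we want the actual quantile values rather than eyeballing the boxplot, a quick sketch using Spark's approxQuantile (available since Spark 2.0) would look something like this:

# Approximate median and 75th percentile of PR duration, in hours
durations = df.withColumn("duration_hours",
                          (df['closed_at'] - df['created_at']) / 3600)
median_h, q75_h = durations.approxQuantile("duration_hours", [0.5, 0.75], 0.01)
print "median: %.1f hours, 75th percentile: %.1f hours" % (median_h, q75_h)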
How many people participate in a pull request? Not many, as it turns out:
df.select('num_participants').toPandas().describe()
| | num_participants |
|---|---|
| count | 916065.000000 |
| mean | 1.332705 |
| std | 1.491926 |
| min | 0.000000 |
| 25% | 0.000000 |
| 50% | 1.000000 |
| 75% | 2.000000 |
| max | 292.000000 |
How many comments does a pull request receive? Only a few:
df.select('num_comments').toPandas().describe()
| | num_comments |
|---|---|
| count | 916065.000000 |
| mean | 3.329427 |
| std | 9.958141 |
| min | 0.000000 |
| 25% | 0.000000 |
| 50% | 1.000000 |
| 75% | 3.000000 |
| max | 1242.000000 |
It is important for our machine learning task to check whether the dataset is balanced, i.e. how many pull requests end up merged versus not merged:
df.groupBy('merged').count().toPandas()
| | merged | count |
|---|---|---|
| 0 | True | 777499 |
| 1 | False | 138566 |
We are trying to create a predictor that, given a set of features (metrics describing various aspects of a single PR), will produce a response (prediction): either True (the PR will be merged) or False (the PR will not be merged). This makes our problem a typical binary classification task.
There are various algorithms that, given a set of feature vectors (vectors/arrays of numerical values corresponding to our features), will come up with a model to predict an outcome. The ones we will be using are logistic regression, linear support vector machines, decision trees, random forests, and gradient boosted trees.
The first step in any binary classification task is to define the features and the response variable:
feature_cols = [
'intra_branch',
'description_length',
'num_commits_open',
'num_commit_comments_open',
'files_added_open',
'files_deleted_open',
'files_changed_open',
'doc_files_open',
'other_files_open',
'src_churn_open',
'test_churn_open',
'new_entropy',
'hotness',
'at_mentions_description',
'perc_external_contribs',
'test_cases_per_kloc',
'requester_succ_rate',
'followers',
'main_team_member',
'social_connection',
'prior_interaction_events',
'has_ci'
]
response = 'merged'
# We drop any possible duplicates and cache the resulting data frame
data = df.select(feature_cols).dropDuplicates().cache()
Most machine learning algorithm implementations work with numerical vectors. For this reason, we need to convert all features in our source data frame that are not of type Integer to numeric. Fortunately, we only have a few Boolean features, and these can be represented as Integers with a simple type cast, assigning 1 to True values and 0 to False. If we had factors as features (features that take values from a predefined set), we would need to use a StringIndexer on them.
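Just as an illustration of what that would look like (we do not actually need it here), a StringIndexer could map, say, the lang column to a numeric index:

from pyspark.ml.feature import StringIndexer

# Hypothetical example: map each programming language string to a numeric index
lang_indexer = StringIndexer(inputCol='lang', outputCol='lang_index')
indexed_df = lang_indexer.fit(df).transform(df)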
SparkML needs all features to be in a single vector per data point. For this, we use the VectorAssembler transformation.

SparkML uses pipelines to organize the application of transformations on data frames. In our case, we only need a pipeline that performs the VectorAssembler step, but pipelines can be arbitrarily long.
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
# All boolean columns
boolean = ['intra_branch', 'main_team_member',
'social_connection', 'has_ci']
boolean_out = map(lambda x: x + "_int", boolean)
# Update the feature_cols with information about the new column names
feature_cols = [item for item in set(feature_cols) \
if item not in set(boolean)] + \
boolean_out
# Type cast boolean columns to Integers (convert to numeric)
for x in boolean:
df = df.withColumn(x + "_int", df[x].cast(IntegerType()))
df = df.withColumn("merged_int", df['merged'].cast(IntegerType()))
# Convert feature columns to a numeric vector
assembler_features = VectorAssembler(inputCols=feature_cols,
outputCol='features')
# Construct and execute a pipeline, cache the results.
pipeline = Pipeline(stages=[assembler_features])
allData = pipeline.fit(df).transform(df).cache()
After we have our data in the desired format, we can start experimenting with machine learning algorithms. For that, we need two datasets: one to train our algorithm on and one to test the resulting model with.
(trainingData, testData) = allData.randomSplit([0.9, 0.1], seed=42)
The process of running an ML algorithm on a dataset consists of the following steps:

1. Train (fit) a model on the training data.
2. Evaluate the resulting model on the test data.
To evaluate the trained model (step 2), and since we are doing binary classification, we need to test how well our model predicts against the ground truth included in the test set. The result of step 2 is the so-called confusion matrix, which looks like this:
| | Ground truth: True | Ground truth: False |
|---|---|---|
| Prediction: True | TP | FP |
| Prediction: False | FN | TN |
Using this table we can come up with many metrics that capture several aspects of the classification performance, such as:
$$ Precision = \frac{TP}{TP + FP} $$

$$ Recall = \frac{TP}{TP + FN} $$

$$ Accuracy = \frac{TP + TN}{TP + FP + FN + TN} $$

$$ False\ positive\ rate = \frac{FP}{FP + TN} $$

When predicting, most binary classification algorithms return the probability that a result is either true or false, rather than the actual result; depending on how strict our prediction requirements are, we need to vary the threshold above which we start believing a prediction. This threshold is usually set at 0.7, so if our classifier reports a probability of 0.73 for a classification result being True, we take its value as True.
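To make the formulas above concrete, here is a tiny sketch that computes them from a set of made-up confusion matrix counts (the numbers are purely illustrative):

# Purely illustrative confusion matrix counts
TP, FP, FN, TN = 700, 120, 80, 100

precision = float(TP) / (TP + FP)
recall = float(TP) / (TP + FN)
accuracy = float(TP + TN) / (TP + FP + FN + TN)
false_positive_rate = float(FP) / (FP + TN)

print "precision=%.2f recall=%.2f accuracy=%.2f FPR=%.2f" % \
    (precision, recall, accuracy, false_positive_rate)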
By varying this threshold from 0.5 to 1, we can see how confident our classifier is in its results; we can then test those results against reality (our ground truth) to see how good they actually are. To do so, we plot the classifier's recall (or True Positive Rate) against its False Positive Rate for various values of the threshold. The resulting plot is called the Receiver Operating Characteristic (ROC) curve.
The area under each curve can be calculated; this results in a composite metric, the Area Under the (ROC) Curve (AUC), which describes the predictive power of our classifier relative to a random classifier. AUC enables us to compare classifiers with each other and is especially useful as a metric for unbalanced datasets, as in our case.
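As a side note, a minimal sketch of how such a curve could be drawn locally, assuming scikit-learn is installed and using a fitted model that outputs probabilities (for example, the random forest we train below):

from sklearn.metrics import roc_curve, auc

# Collect per-example probabilities and ground truth on the driver
preds = rfModel.transform(testData).\
    select('probability', 'merged_int').\
    toPandas()
scores = preds['probability'].apply(lambda v: float(v[1]))
fpr, tpr, _ = roc_curve(preds['merged_int'], scores)

plt.plot(fpr, tpr, label="AUC = %.3f" % auc(fpr, tpr))
plt.plot([0, 1], [0, 1], linestyle='--')  # random classifier baseline
plt.xlabel("False positive rate")
plt.ylabel("True positive rate")
plt.legend()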
For our purposes, we will be using the AUC metric.
from pyspark.ml.evaluation import BinaryClassificationEvaluator
## Calculate and print the AUC metric
def evaluate(testData, predictions):
evaluator = BinaryClassificationEvaluator(labelCol="merged_int",
rawPredictionCol="rawPrediction")
print "AUC: %f" % evaluator.evaluate(predictions)
As with other regression methods, binary logistic regression does curve fitting: it attempts to compute the coefficients $a$ and $b$ and the intercept $c$ of a linear model of the form $y = ax_1 + bx_2 + c$, where $x_1$, $x_2$ and $y$ come from the training data, in order to maximize a training criterion (the workings under the hood are very different, though).
With default settings, it does a terrible job on our dataset, with an AUC of 0.5.
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=10, regParam=0.3,
elasticNetParam=0.8 ,labelCol="merged_int")
lrModel = lr.fit(trainingData)
evaluate(testData, lrModel.transform(testData))
AUC: 0.500000
SVMs attempt to partition the dataset using hyperplanes, chosen so that the data points of each class lie as far as possible from the separating hyperplane, potentially in high-dimensional spaces.
from pyspark.ml.classification import LinearSVC
lsvc = LinearSVC(maxIter=10, regParam=0.1,labelCol="merged_int")
lsvcModel = lsvc.fit(trainingData)
evaluate(testData, lsvcModel.transform(testData))
AUC: 0.662181
A decision (classification) tree is a tree structure where intermediate nodes represent a decision that partitions the data, while leaves represent a classification class. Decision trees are easy to train and very intuitive to interpret, as they mimic human decision making. Unfortunately, they tend to overfit (especially if we allow them to grow to great depths) and are not robust against variations in the data.
On our dataset, they perform rather well.
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol="merged_int")
dtModel = dt.fit(trainingData)
evaluate(testData, dtModel.transform(testData))
AUC: 0.751664
Random forests work by building many decision trees. For building each tree, a (configurable) number of features is selected randomly, and the tree is grown up to a (configurable) depth. At prediction time, all trees are asked for their "opinion" on the input feature values, and the majority vote wins.

Random forests are very easy to train, and the default values usually lead to good performance out of the box.
from pyspark.ml.classification import RandomForestClassifier as RF
from pyspark.ml.evaluation import BinaryClassificationEvaluator
rf = RF(labelCol='merged_int', featuresCol='features',
numTrees=100, maxDepth=5)
rfModel = rf.fit(trainingData)
evaluate(testData, rfModel.transform(testData))
AUC: 0.787138
One particularly interesting property of tree ensemble methods, such as random forests, is that they provide us with an indication of which factors are most influential when doing a classification.
pd.DataFrame(data=zip(feature_cols, rfModel.featureImportances),
columns=("feature", "importance")).\
sort_values("importance", ascending=False).\
head(n = 10)
| | feature | importance |
|---|---|---|
| 1 | requester_succ_rate | 0.366819 |
| 14 | prior_interaction_events | 0.341332 |
| 19 | main_team_member_int | 0.164281 |
| 13 | num_commits_open | 0.027224 |
| 18 | intra_branch_int | 0.016054 |
| 3 | new_entropy | 0.013582 |
| 16 | src_churn_open | 0.013561 |
| 20 | social_connection_int | 0.010925 |
| 5 | perc_external_contribs | 0.009141 |
| 4 | description_length | 0.008753 |
Boosting refers to a general class of algorithms that try to create strong predictors from ensembles of weak ones (e.g. decision trees). After training a weak classifier, boosting algorithms weight the training examples so as to favour those that were misclassified by the current ensemble; new weak classifiers are therefore trained mostly on the data the ensemble currently misclassifies.

Gradient boosting is a form of boosting that uses gradient descent as its optimization method and stopping criterion.
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(maxIter=10, maxDepth=5,
labelCol="merged_int", seed=42)
gbtModel = gbt.fit(trainingData)
evaluate(testData, gbtModel.transform(testData))
AUC: 0.797577
pd.DataFrame(data=zip(feature_cols, gbtModel.featureImportances),
columns=("feature", "importance")).\
sort_values("importance", ascending=False).\
head(n = 10)
| | feature | importance |
|---|---|---|
| 1 | requester_succ_rate | 0.469043 |
| 14 | prior_interaction_events | 0.158051 |
| 19 | main_team_member_int | 0.093084 |
| 20 | social_connection_int | 0.056571 |
| 4 | description_length | 0.052702 |
| 16 | src_churn_open | 0.031374 |
| 18 | intra_branch_int | 0.025636 |
| 9 | test_cases_per_kloc | 0.022868 |
| 15 | files_changed_open | 0.021664 |
| 13 | num_commits_open | 0.014422 |
As we have seen already, most algorithms have many knobs (configuration parameters) that we can use to tune model building. These are often called hyperparameters. To tune them in a systematic manner, Spark ML offers the ParamGridBuilder tool, which accepts value ranges for all hyperparameters.
For algorithms that are not robust against overfitting (building a model that predicts the training set very well but fails to generalize), for example decision trees, it is often advisable to train them on various instances of the training set and report average precision/recall/F1/AUC scores. This process is called cross-validation. In typical random-selection $k$-fold cross-validation, the algorithm randomly splits the dataset $k$ times (usually 80%/20% into training and evaluation data), builds a model on each training split, and evaluates it on the corresponding held-out data. Cross-validation can also be used as a guard against overfitting when searching for the optimal model during hyperparameter optimization.
In the following snippet, we attempt to improve our decision tree model by tuning several parameters. As we can see, the defaults are pretty good, as we only achieve a marginal improvement.
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
paramGrid = (ParamGridBuilder()
.addGrid(dt.maxDepth, [4, 8])
.addGrid(dt.maxBins, [16, 32, 64])
.addGrid(dt.minInstancesPerNode, [1, 2],)
.build())
evaluator = BinaryClassificationEvaluator(labelCol="merged_int",
rawPredictionCol="rawPrediction")
cv = CrossValidator(estimator=dt,
estimatorParamMaps=paramGrid,
evaluator=evaluator, numFolds=3)
cvModel = cv.fit(trainingData)
evaluate(testData, cvModel.transform(testData))
AUC: 0.758221
cvModel.bestModel
DecisionTreeClassificationModel (uid=DecisionTreeClassifier_42a99d738fd535e9c533) of depth 4 with 31 nodes
The AUC metric, while comprehensive, tells us only half the truth about prediction performance. As we can see below, the best algorithm (GBT) consistently mispredicts the merged = False case, even though it does a good job with the True case. This probably happens because the classifier overfits to the majority class (merged = True in our case).
def missprediction_rate(model, testData):
predictions = model.transform(testData)
predictions = predictions.\
select(predictions.merged_int.cast("double").alias('ground_truth'), \
'prediction')
true_predictions = predictions.\
where(predictions.ground_truth == 1.0).\
count()
false_predictions = predictions.\
where(predictions.ground_truth == 0.0).\
count()
true_misspredictions = predictions.\
where(predictions.ground_truth != predictions.prediction).\
where(predictions.ground_truth == 1.0).\
count()
false_misspredictions = predictions.\
where(predictions.ground_truth != predictions.prediction).\
where(predictions.ground_truth == 0.0).\
count()
return ((float(false_misspredictions) / false_predictions) * 100),\
((float(true_misspredictions) / true_predictions) * 100)
print "SVM missprediction rate: False: %f, True: %f" % \
missprediction_rate(lsvcModel, testData)
print "RF missprediction rate: False: %f, True: %f" % \
missprediction_rate(rfModel, testData)
print "GBT missprediction rate: False: %f, True: %f" % \
missprediction_rate(gbtModel, testData)
print "CVModel missprediction rate: False: %f, True: %f" % \
missprediction_rate(cvModel, testData)
SVM missprediction rate: False: 100.000000, True: 0.001285
RF missprediction rate: False: 85.828935, True: 1.278559
GBT missprediction rate: False: 70.999928, True: 3.000437
CVModel missprediction rate: False: 73.689538, True: 2.821824
One solution to this problem is to 'penalize' the classifier during learning by removing data points from the training set. Specifically, we keep all the minority class items and subsample the majority class down to 2x the number of minority class items.
minority_class = allData.where(allData.merged_int == 0)
majority_class = allData.where(allData.merged_int != 0).\
sample(False, 0.5).\
limit(minority_class.count() * 2)
balancedAllData = minority_class.union(majority_class)
(balancedTrainingData, balancedTestData) = \
balancedAllData.randomSplit([0.9, 0.1], seed=42)
Next, we retrain the random forest on our balanced dataset. What we see is that, while the AUC is roughly the same, the misprediction rate has improved vastly for the False case, while becoming only slightly worse for the True case.
balancedRfModel = rf.fit(balancedTrainingData)
evaluate(balancedTestData, balancedRfModel.transform(balancedTestData))
print "Balanced RF missprediction rate: False: %f, True: %f" %\
missprediction_rate(balancedRfModel, balancedTestData)
AUC: 0.783390
Balanced RF missprediction rate: False: 64.615050, True: 6.347009
For completeness, we also train a GBT classifier on the balanced dataset. The results are similar to those of the RF case.
balancedGBTModel = gbt.fit(balancedTrainingData)
evaluate(balancedTestData, balancedGBTModel.transform(balancedTestData))
print "Balanced GBT missprediction rate: False: %f, True: %f" %\
missprediction_rate(balancedGBTModel, balancedTestData)
AUC: 0.795024
Balanced GBT missprediction rate: False: 54.965927, True: 8.678860
The answer to the question of whether to use the default model or the balanced one boils down to priorities: do we care about the False case, even if it is relatively rare?