What’s the benefit of flight delay prediction? For passengers, it sets a more accurate expectation about flight time, allowing them to plan their time accordingly. For airline companies, it shows where they can minimize flight delays, thereby reducing expenses and increasing customer satisfaction. Sounds good, right?
In this post we will show how you can use InsightEdge to do exactly that and achieve real-time flight delay predictions.
We will create a solution based on a decision tree algorithm described by Carol McDonald in her MapR blog post.
InsightEdge Architecture
For real-time predictions we will use Spark Streaming combined with Apache Kafka, which will simulate an endless, continuous data flow. For the prediction part, we will use Spark MLlib with a decision tree algorithm. The streamed data will be processed by the decision tree model, and the results will be saved to the InsightEdge data grid for future use.
Our solution consists of two parts (Spark jobs):
- Model Training
- Flight Delay Prediction
Let’s take a look at these two jobs in detail. All the code and instructions can be found on GitHub.
‘Model Training’ Spark Job
The model training job is a one-time job that performs the initial model training and stores the model in the data grid, so it can be used by the second job. In this post, we won’t go into too much detail about machine learning algorithms and decision tree model training. If you’d like, you can familiarize yourself with the topic with the help of Carol McDonald’s blog post mentioned earlier.
The first Spark job consists of three simple steps:
1. Load the data (the same data set used in Carol McDonald’s blog post), split it into a training and a testing part, and save the testing part for use by the second job:
from pyspark import SparkContext

flight_data_file = ...
sc = SparkContext(appName="Flight prediction model training")
text_rdd = sc.textFile(flight_data_file)
# split the data 70/30 into a training part and a testing part
splits = text_rdd.randomSplit([0.7, 0.3])
(training_rdd, test_rdd) = (splits[0], splits[1])
# save the testing part as a single file for the prediction job
test_rdd.coalesce(1, True).saveAsTextFile(...)
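The raw lines are later turned into flight records with the Utils.parse_flight helper from the demo sources. A minimal sketch of the idea, in which the column positions and field names are assumptions (only the attributes referenced later in this post are shown):

from collections import namedtuple

# Hypothetical sketch of the parser: the real demo handles the full column list of the data set.
Flight = namedtuple("Flight", ["day_of_month", "carrier", "origin", "destination", "delay_minutes"])

def parse_flight(line):
    cols = line.split(",")
    return Flight(day_of_month=int(cols[0]),
                  carrier=cols[1],
                  origin=cols[2],
                  destination=cols[3],
                  delay_minutes=float(cols[4]))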
2. During the second job we will convert flight data into LabeledPoint objects, so we need to store integer representations of the origin, destination, and carrier in the data grid:
all_flights_rdd = text_rdd.map(lambda r: Utils.parse_flight(r))

# build a {value: index} dictionary for each categorical feature
carrier_mapping = dict(all_flights_rdd.map(lambda flight: flight.carrier).distinct().zipWithIndex().collect())
origin_mapping = dict(all_flights_rdd.map(lambda flight: flight.origin).distinct().zipWithIndex().collect())
destination_mapping = dict(all_flights_rdd.map(lambda flight: flight.destination).distinct().zipWithIndex().collect())

# store the mappings in the data grid so the prediction job can reuse them
sqlc = SQLContext(sc)
save_mapping(carrier_mapping, DF_SUFFIX + ".CarrierMap", sqlc)
save_mapping(origin_mapping, DF_SUFFIX + ".OriginMap", sqlc)
save_mapping(destination_mapping, DF_SUFFIX + ".DestinationMap", sqlc)
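save_mapping is a small helper from the demo sources. A minimal sketch of what such a helper could look like, assuming each mapping is stored as a simple (name, idx) data frame written with the InsightEdge data source format (the IE_FORMAT string used later in this post):

from pyspark.sql import Row

def save_mapping(mapping, collection_name, sqlc):
    # Hypothetical sketch: persist a {value: index} dictionary in the data grid
    # as a two-column data frame so the prediction job can read it back.
    rows = [Row(name=k, idx=int(v)) for k, v in mapping.items()]
    sqlc.createDataFrame(rows).write.format(IE_FORMAT).mode("overwrite").save(collection_name)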
3. Train a model and save it to the data grid:
# convert every training record into a LabeledPoint (delayed / not delayed)
training_data = training_rdd.map(Utils.parse_flight).map(
    lambda flight: Utils.create_labeled_point(flight, carrier_mapping, origin_mapping, destination_mapping))

# decision tree hyperparameters
classes_count = 2
impurity = "gini"
max_depth = 9
max_bins = 7000

model = DecisionTree.trainClassifier(training_data, classes_count, categorical_features_info, impurity, max_depth, max_bins)
Utils.save_model_to_grid(model, sc)
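One argument of this call, categorical_features_info, is defined in the demo sources: it is the dictionary MLlib expects, mapping the index of each categorical feature in the feature vector to the number of values it can take. A plausible sketch, where the feature indices 3, 4 and 5 are assumptions about how Utils.create_labeled_point lays out the vector:

# Hypothetical sketch: feature index -> number of categories.
# The indices are assumptions about the feature vector layout.
categorical_features_info = {
    3: len(carrier_mapping),
    4: len(origin_mapping),
    5: len(destination_mapping),
}

Note that max_bins has to be at least as large as the number of categories of the largest categorical feature, which is why it is set to a fairly high value (7000) here.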
‘Flight Delay Prediction’ Spark Job
The second Spark job loads the model and mappings from the grid, reads data from the Kafka stream, and uses the model to make predictions. The predictions are stored in the grid along with the flight data.
Here is the second Spark job in three easy steps:
1. Load the model and mappings from the data grid:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.mllib.tree import DecisionTreeModel

sc = SparkContext(appName="Flight delay prediction job")
# rebuild the decision tree model saved in the grid by the first job
model = DecisionTreeModel(Utils.load_model_from_grid(sc))

sqlc = SQLContext(sc)
carrier_mapping = load_mapping(DF_SUFFIX + ".CarrierMap", sqlc)
origin_mapping = load_mapping(DF_SUFFIX + ".OriginMap", sqlc)
destination_mapping = load_mapping(DF_SUFFIX + ".DestinationMap", sqlc)
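load_mapping is the counterpart of the save_mapping helper from the first job; a minimal sketch, under the same assumption that each mapping was stored as a (name, idx) data frame:

def load_mapping(collection_name, sqlc):
    # Hypothetical sketch: read the two-column data frame back from the grid
    # and rebuild the {value: index} dictionary used to encode categorical features.
    df = sqlc.read.format(IE_FORMAT).load(collection_name)
    return dict(df.rdd.map(lambda row: (row.name, row.idx)).collect())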
2. Open the Kafka stream and extract the lines with flight data:
# micro-batches of 3 seconds; zkQuorum and topic are the ZooKeeper connection string and the Kafka topic name
ssc = StreamingContext(sc, 3)
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
# each Kafka message is a (key, value) pair; the flight data is in the value
lines = kvs.map(lambda x: x[1])
3. For each batch of lines (an RDD), parse the flights, make predictions, and save them to the data grid:
def predict_and_save(rdd):
    if not rdd.isEmpty():
        parsed_flights = rdd.map(Utils.parse_flight)
        labeled_points = parsed_flights.map(
            lambda flight: Utils.create_labeled_point(flight, carrier_mapping, origin_mapping, destination_mapping))
        # predict a delay class for every flight in the batch
        predictions = model.predict(labeled_points.map(lambda x: x.features))
        # store the actual label, the prediction and the flight data in the grid
        labels_and_predictions = labeled_points.map(lambda lp: lp.label).zip(predictions).zip(parsed_flights).map(to_row())
        df = sqlc.createDataFrame(labels_and_predictions)
        df.write.format(IE_FORMAT).mode("append").save(DF_SUFFIX + ".FlightWithPrediction")

lines.foreachRDD(predict_and_save)

# start the streaming job and keep it running
ssc.start()
ssc.awaitTermination()
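The to_row helper also comes from the demo sources; it is called once and returns a function that turns each ((label, prediction), flight) tuple into a data frame row. A minimal sketch of the idea, keeping only a few flight fields (the actual demo stores the richer set of fields shown in the results section below):

from pyspark.sql import Row

def to_row():
    # Hypothetical sketch: field names and the selected flight attributes are assumptions.
    def build(record):
        ((label, prediction), flight) = record
        return Row(label=float(label),
                   prediction=float(prediction),
                   carrier=flight.carrier,
                   origin=flight.origin,
                   destination=flight.destination)
    return build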
Running the Demo and Examining Results
To run the demo we need to perform the following steps:
- Start up InsightEdge
- Start up Kafka and create a topic
- Submit Model Training job
- Submit Flight Prediction job
- Push the test data into Kafka’s topic
You can find detailed instructions here to help you run the demo.
After all the steps have been completed, we can examine what was stored in the data grid. Each stored record contains the following fields:
- Day – day of the month
- Origin – origin airport
- Destination – destination airport
- Distance – distance between airports in miles
- Carrier – airline company
- Actual_delay_minutes – actual flight delay in minutes
- Prediction – whether our model made a correct or incorrect prediction
Since we store the prediction results alongside the actual flight delays, we can see the ratio of correct to incorrect predictions:
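For example, assuming the rows keep both the actual label and the predicted class (as in the to_row sketch above), a quick check could look like this from any Spark shell or notebook connected to the grid:

# Hypothetical sketch: compute the share of correct predictions stored in the grid.
df = sqlc.read.format(IE_FORMAT).load(DF_SUFFIX + ".FlightWithPrediction")
total = df.count()
correct = df.filter(df["label"] == df["prediction"]).count()
if total > 0:
    print("correct predictions: %d of %d (%.1f%%)" % (correct, total, 100.0 * correct / total))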
What’s Next?
In this post we built a simple real-time prediction application using Spark ML combined with Spark Streaming on top of InsightEdge. We haven’t built the perfect solution just yet, and there is always room to improve it, e.g.:
- You may want to take a look at other ML algorithms or tune the existing algorithm to achieve a better prediction rate.
- Over time this model might become outdated. In order to keep it up to date, we will need to come up with a model update strategy. There are two possible solutions you can use:
- Incremental algorithms: A model built with such an algorithm updates itself every time it encounters new data (see the sketch after this list).
- Periodical model retraining: Here the solution is to store incoming data, periodically perform model retraining, and substitute the existing model with the updated one.
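MLlib does not ship a streaming (incremental) decision tree, but it does provide incremental learners such as StreamingLogisticRegressionWithSGD. Purely to illustrate the first option, here is a sketch that swaps the decision tree for a streaming logistic regression and keeps training it on every micro-batch; labeled_point_stream (a DStream of LabeledPoint built the same way as labeled_points above) and num_features (the size of the feature vector) are hypothetical:

from pyspark.mllib.classification import StreamingLogisticRegressionWithSGD

# Hypothetical sketch: the model updates itself with every micro-batch it sees.
model = StreamingLogisticRegressionWithSGD(numIterations=10)
model.setInitialWeights([0.0] * num_features)  # num_features is an assumption
model.trainOn(labeled_point_stream)

# predictions can be made on the same (or another) stream of labeled points
predictions = model.predictOnValues(
    labeled_point_stream.map(lambda lp: (lp.label, lp.features)))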