By David Causse, Senior Software Engineer
What is the difference?
Like plants, Wikidata entities can be small, with just a stem and a leaf, or large, with many branches and dense foliage. To know how a plant evolves as it grows, you need to capture its state as it changes; for plants, time-lapse photography offers a solution. To propagate the changes made to a Wikidata entity to the triplestore, a similar approach is used: take a snapshot of the entity after every edit and send it as a whole to the triplestore.
But as the entity grows, the size of the data that must be sent to the triplestore grows as well. This is inefficient because individual changes made to Wikidata entities are small: generally, only one leaf is added, removed, or changed in a single edit. The new WDQS Streaming Updater makes it possible to identify what has changed on the entity after an edit and ship only this information to the triplestore.
A Patch for RDF
Spotting differences between two snapshots can be challenging. Fortunately, the semantics of an RDF document can be expressed using set theory, and its relative complement operation could not find a better use case.
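To make the set-theory idea concrete, here is a minimal sketch that treats each snapshot of an entity as a set of triples and computes the patch as two relative complements. The function names and the tuple representation are illustrative assumptions, not the updater's actual code.

```python
from typing import Set, Tuple

# Assumption: a triple is a plain (subject, predicate, object) tuple of strings.
Triple = Tuple[str, str, str]


def rdf_diff(old: Set[Triple], new: Set[Triple]) -> Tuple[Set[Triple], Set[Triple]]:
    """Return (added, removed) between two snapshots of the same entity."""
    added = new - old    # relative complement of old in new
    removed = old - new  # relative complement of new in old
    return added, removed


def apply_patch(graph: Set[Triple], added: Set[Triple], removed: Set[Triple]) -> Set[Triple]:
    """Apply a patch to a graph and return the resulting graph."""
    return (graph - removed) | added
```

Applying the patch computed from two snapshots to the older snapshot yields the newer one, which is the property the rest of the pipeline relies on.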
We, the Search Platform Team, thought that it would be trivial to capture the difference between two versions of the same entity. We evaluated some previous attempts at defining a patching technique (TurtlePatch and RDF-patch) for RDF stores and whether they would meet our requirements:
- No need to fetch prior information or to converse with the RDF store before applying the mutations (this would be a major impediment to increasing throughput)
- Only send the actual triples that are added or removed instead of sending the whole entity graph
Unfortunately, none of the techniques we evaluated could meet our requirements when it comes to RDF blank nodes. In the context of the Wikibase RDF model, blank nodes are used to denote the concept of SomeValue (mistakenly referred to as an unknown value in the UI). In the RDF specification, blank nodes are locally scoped to the file or RDF store where they are used, and so do not have persistent or portable identifiers. Because blank nodes are thus not referenceable, the triples using them cannot be deleted explicitly.
Since the triplestore we use does not allow tracking the identity of the blank nodes, we decided to experiment with skolemization, the approach suggested by both RDF 1.1 and the patching techniques we evaluated. Unfortunately, there was no reasonable way to do this without introducing a breaking change, so we attempted to ease the transition by introducing the wikibase:isSomeValue() custom function. We also provided a test service at https://query-preview.wikidata.org for users and bot owners to anticipate any potential problems for their use cases.
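As a rough illustration of skolemization, the sketch below replaces every blank node label with a stable IRI so that the triple can later be deleted explicitly. The `.well-known/genid/` path follows the RDF 1.1 recommendation for Skolem IRIs; the `example.org` base, the `scope` parameter, and the hash-based naming scheme are assumptions made for this example and may differ from what Wikibase actually does.

```python
import hashlib

# Hypothetical base IRI; only the ".well-known/genid/" path comes from RDF 1.1.
SKOLEM_PREFIX = "http://example.org/.well-known/genid/"


def skolemize(triples, scope):
    """Replace blank nodes ("_:label") with deterministic Skolem IRIs.

    `scope` is an assumed stable context (for example an entity id) so that
    the same blank node always maps to the same IRI.
    """
    def term(value):
        if value.startswith("_:"):
            digest = hashlib.sha1(f"{scope}/{value[2:]}".encode("utf-8")).hexdigest()
            return f"<{SKOLEM_PREFIX}{digest}>"
        return value

    return {(term(s), p, term(o)) for (s, p, o) in triples}
```

Because the resulting IRIs are referenceable, a triple that used to contain a blank node can appear in the "removed" side of a patch like any other triple.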
The joy of the playground
Conceptually, what we had to build seemed relatively simple: an updater that would consume a stream of changes made to the entities, fetch the RDF of each entity, generate a diff against the previously seen version, and apply it to the triplestore.
However, complications arose as we got into the details, and this led us to evaluate Flink, a stateful streaming processor, to do the job.
A novice Flink developer discovering all the cool stuff that Flink can do (Curriculum Vitae, Marble Run by Alex Schmid, photo credit Gwen and James Anderson)
Building diffs requires reconstructing the revision history of a Wikidata entity, and the MediaWiki event bus can assist us with several topics:
- Revision create
- Page delete/suppress
- Page undelete
Because events may not be well ordered in their individual topics, and also because we read from multiple Kafka topics with varying throughputs, we cannot simply process them as we read them. Instead, we have to reorder them so that the events can be processed in the same order seen in the revision history.
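The reordering described above can be sketched outside of Flink as a buffer driven by a watermark: events are held until the watermark passes their event time, then released in order, and events that arrive behind the watermark are set aside as late. The class and field names here are hypothetical, and this is a simplified stand-in for what Flink's windows and timers provide, not the updater's implementation.

```python
import heapq
from dataclasses import dataclass, field


@dataclass(order=True)
class Event:
    event_time: int                   # ordering key (e.g. epoch millis)
    revision: int                     # tie-breaker for equal timestamps
    entity: str = field(compare=False)
    kind: str = field(compare=False)  # e.g. "revision-create", "page-delete"


class ReorderBuffer:
    """Buffers events and releases them in event-time order."""

    def __init__(self):
        self._heap = []
        self._watermark = float("-inf")
        self.late = []  # stand-in for a side output of late events

    def add(self, event):
        if event.event_time <= self._watermark:
            # Arrived after the watermark already passed its timestamp.
            self.late.append(event)
        else:
            heapq.heappush(self._heap, event)

    def advance_watermark(self, watermark):
        """Move the watermark forward and return the events that are now ready."""
        self._watermark = max(self._watermark, watermark)
        ready = []
        while self._heap and self._heap[0].event_time <= self._watermark:
            ready.append(heapq.heappop(self._heap))
        return ready
```

Note that every buffered event stays in memory until the watermark advances, which is the cost that becomes visible during a backfill.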
With events properly ordered we can then determine what operation we want to perform:
- Fully import an entity
- Find the diff between the new revision of an entity and the previously seen revision
- Delete the entity
These operations require keeping state about what was previously seen. With that state, we can also detect some discrepancies. For instance:
- Trying to delete an entity that was never seen before
- Trying to diff with a revision older than the previously seen one
- Plus everything else that we can detect from the state we store and the data available in the events
Similar to what we do for late events, we capture such discrepancies in a side-output for later analysis and reconciliation.
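The decision logic and the discrepancy checks above can be sketched as a small function over per-entity state. The event kind names, the return values, and the use of a plain dictionary as state are assumptions for illustration; in Flink this would live in keyed state, with discrepancies routed to a side output.

```python
def decide(state, entity, kind, revision):
    """Pick the operation for an ordered event.

    `state` maps an entity id to the last revision seen for it.
    Returns (operation, detail), where operation is one of
    "full_import", "diff", "delete" or "discrepancy".
    """
    last = state.get(entity)

    if kind == "page-delete":
        if last is None:
            return ("discrepancy", "delete of an entity never seen before")
        del state[entity]
        return ("delete", None)

    # "revision-create" and "page-undelete" both carry a revision to load.
    if last is None:
        state[entity] = revision
        return ("full_import", None)
    if revision <= last:
        return ("discrepancy", f"revision {revision} is not newer than {last}")
    state[entity] = revision
    return ("diff", last)  # diff the new revision against revision `last`
```

In this sketch an undelete arrives after the state was cleared by the delete, so it naturally falls into the full import branch.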
Generally speaking, writing a Flink application is quite straightforward once you fully understand its core concept of watermarks.
The reality of the crowd
When building a Flink application you will rapidly learn that you cannot claim victory without running a backfill test. Simply put, backfilling means reading historical events stored in the input queues. What we learned here is that Flink is optimized for throughput and will consume events as fast as it can, putting all the components of the pipeline under maximum load.
If the components are not fast enough to handle the throughput, Flink can apply back-pressure to instruct the source components to slow down. Back-pressure is not something you want to see in normal conditions (real-time speed), but it is a necessary evil when backfilling—especially when the Kafka throughput is high compared to what your pipeline can handle. It allows Flink to keep events outside of the pipeline when they don’t have a chance of getting processed in a reasonable amount of time.
It is a bit like deciding to start waiting in a 200-meter line at the entrance of the Louvre a few dozen minutes before they close the doors. You may be a bit disappointed to realize that you waited for nothing and will have to come back the next day.
This is basically what happened during the first backfill test we did. By using windows to re-order the events, we stopped the propagation of the back-pressure. Events are added to the window state, and this is very cheap compared to the rest of the pipeline. Unfortunately, these windows have to be fired at some point. Since we have to deal with idle sources, the watermark may jump ahead in time very quickly, causing too many timers to fire at the same time and moving millions of events to the next part of the pipeline, which is the slowest.
This caused a vicious cycle: when all the timers were applied in one go, they blocked the checkpoint barriers for too long, causing the pipeline to fail. With the timers being held in the saved checkpoints, there was no way to recover from them; it was too late.
The solution was to reconnect the flow between operators (removing the need to buffer and create timers) so that the back-pressure could be propagated to the source. The idea was simply to partially re-order the events. Since most events (95%) are well ordered and deletions are relatively rare, we could advance most of the events without buffering them. Unfortunately, for this, we had to give up some nice high-level features like windows and write our own process function. We would like to thank Ververica for their help on this project by providing training and support.
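One possible way to picture partial re-ordering is sketched below: an event whose parent revision matches the last revision forwarded for that entity goes straight through, and only out-of-order events are buffered until the gap is filled. Using the parent revision as the ordering signal, along with the class and method names, is an assumption for this example; the actual process function is not described here and would also need timeouts for events whose parent never arrives.

```python
class PartialReorder:
    """Forwards well-ordered events immediately and buffers only the rest."""

    def __init__(self):
        self.last_seen = {}  # entity -> last forwarded revision
        self.pending = {}    # (entity, parent revision) -> buffered revision
        self.late = []       # stand-in for a side output

    def process(self, entity, revision, parent):
        """Return the revisions that can be forwarded now, in order."""
        last = self.last_seen.get(entity)
        if last is not None and revision <= last:
            # Older than what was already forwarded: cannot be re-ordered.
            self.late.append((entity, revision))
            return []
        if last is not None and parent != last:
            # A revision is missing in between: wait for it.
            self.pending[(entity, parent)] = revision
            return []
        ready = [revision]
        # Release any buffered revisions that chain onto this one.
        while (entity, ready[-1]) in self.pending:
            ready.append(self.pending.pop((entity, ready[-1])))
        self.last_seen[entity] = ready[-1]
        return ready
```

Since the common case returns immediately without touching a buffer or a timer, a slow downstream operator slows this one down too, and the back-pressure can reach the source.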
So… what is the difference?
Dealing with small RDF patches provides several advantages. The size of the payload sent to the triplestore is a lot smaller than with a full reconciliation technique—which sends the whole entity graph. It also allows you to batch more edits together. Our testing shows that between 5% and 20% of the triples can be compressed, thanks to batching.
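To see why batching can reduce the number of triples sent, consider merging consecutive patches: a triple added by one edit and removed by a later one (or the reverse) cancels out and never needs to reach the triplestore. The following is a minimal sketch with assumed names, where each patch is an `(added, removed)` pair of triple sets listed in the order the edits happened.

```python
def merge_patches(patches):
    """Merge ordered (added, removed) patches into a single patch."""
    added, removed = set(), set()
    for patch_added, patch_removed in patches:
        for triple in patch_removed:
            if triple in added:
                added.discard(triple)    # added earlier in the batch: cancels out
            else:
                removed.add(triple)
        for triple in patch_added:
            if triple in removed:
                removed.discard(triple)  # removed earlier in the batch: cancels out
            else:
                added.add(triple)
    return added, removed
```

For example, merging `({a, b}, {c})` followed by `({c}, {a})` leaves a single addition of `b`, so one triple is sent instead of five.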
Detecting some discrepancies and inconsistencies becomes possible, too. When applying the RDF patch, the triplestore is able to tell us how many mutations were actually performed compared to how many were expected.
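The comparison between expected and actual mutations can be illustrated with an in-memory set standing in for the triplestore. This is only a sketch under that assumption; the function name is hypothetical and the real counts are reported by the triplestore itself.

```python
def apply_patch_checked(graph, added, removed):
    """Apply a patch in place and return (expected, actual) mutation counts."""
    expected = len(added) + len(removed)
    # Only triples that were really inserted or really deleted count as mutations.
    actual = len(added - graph) + len(removed & graph)
    graph.difference_update(removed)
    graph.update(added)
    return expected, actual
```

When the two numbers differ, the store and the updater disagree about the prior state of the entity, for instance because the same patch was applied twice.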
Performance-wise, we saw a noticeable increase in throughput (up to nine times faster in the best-case scenario) with an average of 3600 edits per minute, while our initial goal was only 1000 edits per minute.
Other metrics on the Blazegraph servers saw favorable shifts as well:
- Read operations decreased from around 1000 operations per second to 60, since they were no longer required
- Load decreased from 1.2 to 0.4 on average, since there is a lot less data to process during updates
Working with smaller patches gives us more flexibility in tuning the throughput. In the current system, the payload sent to the triplestore can be very large, and we had to limit the number of entities updated in a single batch because of that. Did you know that you can list the longest pages of a wiki using Special:LongPages? Can you guess the size of the longest item on Wikidata? Now that we have smaller payloads (ten times smaller on average), we can trade latency for throughput or vice versa by batching more or fewer patches together depending on our needs.
So… yes it makes a big difference!
The good news is that we finally had a fully functioning stream of RDF changes that could be applied to the triplestore much faster than the current system; the bad news is that it was far from being done, as we still had to find a home for Flink in our infrastructure.