Wikimedia’s Event Data Platform – Event Intake
By Andrew Otto, Staff Site Reliability Engineer
In our previous post in this series, we described how Wikimedia manages and distributes event schemas. Now that we’ve got that nailed down, let’s look into how we use them to ensure events conform to their schemas.
Kafka has client implementations in many languages, but those implementations don’t do any schema validation. If we were using Avro with Confluent’s Schema Registry, we’d need custom Serializers (e.g. KafkaAvroSerializer or the Python AvroProducer) that know how to contact a specific Schema Registry service before they can produce data. Fortunately for us, we don’t need to do any special serialization, as we don’t need a schema to serialize JSON data. However, we’d still like to enforce that each Kafka topic only ever contains events that validate against a specific schema.
Even though Kafka clients exist in many languages, the quality and freshness of those client implementations vary. One of Wikimedia’s main Kafka use cases is producing event data from MediaWiki, which is written in PHP. Our MediaWiki application servers handle around 15,000 requests per second. Kafka is pretty efficient at producing messages, but the bootstrap and connection time for a new Kafka producer isn’t quite as fast. Because it is written in PHP, MediaWiki would have to create and connect a new Kafka producer client for each of these requests, which would add serious overhead and latency to the application servers handling them.
We also need to intake events from external sources, like browsers and mobile apps, which don’t have access to internal Kafka clusters.
Sometimes you need to produce data to Kafka without a Kafka client. Confluent has a Kafka-HTTP REST Proxy to handle situations like these. Wikimedia has EventGate.
EventGate is a NodeJS library and HTTP service that receives event data in HTTP POST requests, validates the event, and then produces it. The validate and produce logic are pluggable, but the original intention was for EventGate to validate events against JSONSchemas looked up from $schema URIs, and produce them to Kafka.
When used as an HTTP service, the API is very simple:
The POST body should be a JSON serialized array of events (or just a single event object) to validate and produce. It is expected that each event contains information that the provided validate and produce logic can use to do their jobs. The provided EventValidator implementation of validate expects that events have a schema URI field (default $schema) that can be used to look up the event’s schema for validation. The provided Kafka produce functionality expects that a configured field in the event contains the destination ‘stream’ name, which will be used to determine the Kafka topic to which the event should be produced.
Note that the API itself does not include information about what topic an event should be produced to, or what schema the body of events should be validated with. This is intentional. Even though EventGate was built to use with Kafka, we wanted to keep the main functionality generic and pluggable, and so the API had to be generic too. We also wanted to be able to batch events of multiple types and destinations in a single HTTP request, so an API parameter that specified e.g. a single schema would not help if the POSTed events could all have different schemas.
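To make the request shape concrete, here is a minimal Python sketch that builds (but does not send) a batch POST containing two events with different schemas and destination streams. The URL, the /v1/events path, the meta.stream field name, and the example schema URIs and stream names are all assumptions chosen for illustration, not values taken from a real deployment.

```python
import json
import urllib.request

# Hypothetical endpoint: host, port, and path are assumptions for illustration.
EVENTGATE_URL = "http://localhost:8192/v1/events"


def build_batch_request(events, url=EVENTGATE_URL):
    """Build (but do not send) an HTTP POST carrying a JSON array of events."""
    body = json.dumps(events).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Each event carries its own schema URI and destination stream, so one
# request can mix event types. "meta.stream" is an assumed field name for
# the configured stream field described above.
events = [
    {
        "$schema": "/example/page_view/1.0.0",
        "meta": {"stream": "example.page_view"},
        "page": "Main_Page",
    },
    {
        "$schema": "/example/button_click/1.0.0",
        "meta": {"stream": "example.button_click"},
        "button": "save",
    },
]

request = build_batch_request(events)
# Passing `request` to urllib.request.urlopen() would actually send the batch.
```

Because the schema and destination travel inside each event, nothing in the request line or headers needs to change when the batch mixes event types.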
The provided implementations in the main EventGate repository work well. They solve the problem of finding event schemas at runtime and using them to validate events, and they can produce to Kafka. But they are missing a crucial requirement: the ability to restrict the types of events that make it into a Kafka topic.
Confluent’s REST Proxy doesn’t do this either. Instead, it relies on you setting specific topic configurations that specify the name of a ‘strategy’ implementation that (by default) maps from the topic name to the name (subject) of a schema in the schema registry. Your producer implementation must know how to look up topic configurations and interact with the schema registry. This is all fine if you are using Java, as Confluent expects you to. But what if you are using a different language? You’ll now not only need a good Kafka client, but also an implementation of e.g. io.confluent.kafka.serializers.subject.TopicNameStrategy in your language.
Wikimedia solves this by creating what we call ‘stream configuration’, and using that to map from stream names (which are just an abstraction of Kafka topic names) to configuration about those streams, including the title of the JSONSchema that is allowed. JSONSchemas have a title, and we expect that all schema versions of the same lineage have the same title. The configuration for a stream has a schema_title setting. When our EventGate validate function gets an event to validate, it extracts both the $schema URI and the stream name from the event. The stream name is used to look up stream configuration, from which the schema_title setting is compared with the schema’s title. If they match, and if the event validates against its JSONSchema, it is allowed to be produced.
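The check described above can be sketched in a few lines of Python. This is an illustration only, not Wikimedia's implementation: the in-memory STREAM_CONFIG and SCHEMAS dictionaries, the meta.stream field name, and the EventRejected exception are all hypothetical, and the 'required' field test is a stand-in for full JSONSchema validation.

```python
class EventRejected(Exception):
    """Raised when an event may not be produced to its stream (hypothetical)."""


# Hypothetical stream configuration: stream name -> settings.
STREAM_CONFIG = {
    "example.page_view": {"schema_title": "example/page_view"},
}

# Hypothetical schema store: $schema URI -> (heavily simplified) schema.
SCHEMAS = {
    "/example/page_view/1.0.0": {
        "title": "example/page_view",
        "required": ["$schema", "meta", "page"],
    },
    "/example/button_click/1.0.0": {
        "title": "example/button_click",
        "required": ["$schema", "meta", "button"],
    },
}


def check_event(event, stream_config=STREAM_CONFIG, schemas=SCHEMAS):
    """Return the stream name if the event is allowed, else raise EventRejected."""
    schema_uri = event["$schema"]
    stream = event["meta"]["stream"]  # assumed location of the stream name

    config = stream_config.get(stream)
    if config is None:
        raise EventRejected(f"stream {stream!r} is not configured")

    schema = schemas.get(schema_uri)
    if schema is None:
        raise EventRejected(f"schema {schema_uri!r} not found")

    # The stream only accepts schemas of one lineage, identified by title.
    if schema["title"] != config["schema_title"]:
        raise EventRejected(
            f"schema title {schema['title']!r} is not allowed in stream {stream!r}"
        )

    # Stand-in for real JSONSchema validation: only checks required fields.
    missing = [field for field in schema["required"] if field not in event]
    if missing:
        raise EventRejected(f"event is missing required fields: {missing}")

    return stream
```

In this sketch, a page_view event sent to example.page_view passes, while a button_click event that names example.page_view as its stream is rejected on the title comparison before any validation runs.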
We also use stream configuration for other things, including arbitrary configuration of remote producer clients. This allows product engineers and analysts to change certain behaviors of data producers, like the sampling rate, via configuration rather than code deployment.
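As a rough illustration of configuration-driven producer behavior, the sketch below has a client decide whether to send an event based on a sampling rate read from stream configuration. The sample_rate key, the stream names, and the should_send helper are hypothetical names invented for this example; they are not the actual settings Wikimedia uses.

```python
import random

# Hypothetical stream configuration as a remote producer client might see it.
STREAM_CONFIG = {
    "example.page_view": {"sample_rate": 0.1},    # send roughly 10% of events
    "example.button_click": {"sample_rate": 1.0}, # send everything
}


def should_send(stream, stream_config=STREAM_CONFIG, rng=random.random):
    """Decide whether to send an event, using the stream's configured rate.

    A stream with no sample_rate setting is treated as unsampled (rate 1.0).
    `rng` is injectable so the decision can be made deterministic in tests.
    """
    rate = stream_config.get(stream, {}).get("sample_rate", 1.0)
    return rng() < rate
```

Changing the rate in configuration changes producer behavior without touching or redeploying the client code, which is the point of keeping this setting out of the application.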
Wikimedia’s stream configuration is useful for us, but its implementation would have been difficult to make generic for outside users, so we did not include it in the main EventGate library. We also had the need for some other Wikimedia specific customizations of EventGate. Since EventGate is pluggable, our implementation is in a separate repository that uses EventGate as a dependency.
Pros and cons
I’ve talked a lot about how Confluent does things, and why we’ve done things differently. However, I’d like to acknowledge that Confluent is building software for use by many different organizations, which is not easy to do. The Schema Registry and all of the custom code needed to use it do have one huge advantage: you can use a Kafka client to produce messages directly to Kafka, and still get all the automated schema validation done for you in your application code. Kafka clients have some amazing features (exactly-once semantics, transactions, etc.) that are not available if you are using an HTTP proxy. The downside is that you need Confluent-specific code available in your language of choice.
Wikimedia’s polyglot and distributed nature makes doing things the Confluent way difficult. EventGate and our decentralized schema model work for us, and we think there are other organizations out there that might find our approach useful. See the EventGate README for more on how to use EventGate.
EventGate, jsonschema-tools, and everything else that Wikimedia creates is always 100% Free Open Source software. We’d love any thoughts and contributions you might have, so please reach out with questions or issues or pull requests.
Wikimedia’s Event Data Platform would not be possible without the numerous Open Source projects it is built on. I’d like to take a moment to thank a few of the critical ones.
- Thanks to Confluent for all their work in the world of Event Driven Architectures. Their blog is amazing.
- Thanks to Evgeny Poberezkin and all the other contributors who work on AJV.
- Thanks to James Messinger and collaborators for json-schema-ref-parser as well as Martin Hansen for json-schema-merge-allof, both critical components of jsonschema-tools.
- Many thanks to all the incredible people working on Apache Kafka. Open Source is amazing.
About this post
This is the final part of a three-part series. Read part 1; read part 2.
Featured image credit: A stile at the bottom of Arreton Down, Editor5807, CC BY 3.0