Schema Validation in Atlas Stream Processing: MongoDB

Rofl Facts
6 min read · Dec 3, 2023

Schemas from relational databases will be familiar to analysts, database administrators, and developers. Schemas, however, are an essential component of nearly any application and appear in many data systems. Schemas are used in everything from databases to CSV files to help organise data so that it can be accessed and reused. Good schema design is essential to keeping developers efficient and able to swiftly produce engaging apps.

In this blog post, we will demonstrate how our recently announced Atlas Stream Processing embraces MongoDB’s distinctive approach to schemas.

Many traditional databases enforce inflexible schemas; MongoDB takes a different approach. MongoDB uses flexible schemas, in which enforcement of the schema is optional. This makes it simpler for developers to adapt as application requirements change and grow.

Schemas describe the structure of your data rather than the actual data values. A well-designed schema ensures that the structure of the data matches what a developer expects for a given field. Data can be of any type, including integers, strings, booleans, objects, and others. This structure is essential for processing data, or sharing it between systems, without errors.

Data in the streaming world is multi-sourced, with wide variations in schemas between providers. Sources include IoT devices such as remote sensors and connected cars, as well as change streams and write-ahead logs that describe database activity.

Schemas with MongoDB

Because of the flexibility of MongoDB’s document model, data can be arranged in the ways that best represent it in the application, removing the requirement for a rigidly specified schema. This is especially important in stream processing, where every message can have a different schema. A stream can consist of a variety of devices and versions that have evolved over time with varying levels of compatibility. With the document model, messages can be represented in many ways without being constrained by schema registries or serialisation format specifications.

Consider a weather station that provides sensor readings. With flexible schemas, multiple reporting formats are possible. The example below shows the same measurements represented in four different ways:

{
  name: 'Station1201',
  measurements: [
    { 'temp': 88, 'u': 'F' },
    { 'press': 12, 'u': 'bar' }
  ]
}

{
  name: 'Station1201',
  firmware: 'v12.00.001',
  measurements: {
    'temp': 88,
    'tempunits': 'F',
    'press': 12,
    'pressunits': 'bar'
  }
}

{
  name: 'Station1201',
  measurements: { 'temp': 88, 'u': 'F' }
}

{
  name: 'Station1201',
  measurements: { 'press': 12, 'u': 'bar' }
}
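To show why this flexibility matters downstream, here is a small sketch (plain JavaScript, for illustration only; the helper name is ours, not an Atlas Stream Processing API) that reads a temperature out of any of the four message shapes above:

```javascript
// Sketch: extracting a temperature reading from any of the four message
// shapes above, whether measurements is an array of readings or one object.
function readTemp(msg) {
  const m = msg.measurements;
  if (Array.isArray(m)) {
    // Shape 1: an array of { temp, u } / { press, u } readings
    const r = m.find(x => "temp" in x);
    return r ? { temp: r.temp, units: r.u } : null;
  }
  if ("temp" in m) {
    // Shapes 2 and 3: a single object, units under either tempunits or u
    return { temp: m.temp, units: m.tempunits ?? m.u };
  }
  return null; // Shape 4: pressure only, no temperature reported
}
```

Application code can tolerate all four schema variants at once, instead of rejecting every message that does not match one registered shape.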

Atlas Stream Processing can continue processing data despite schema changes, avoids modifying the structure of a message, and uses the MongoDB Query API to handle missing fields or changed data types natively. This gives the developer many options for managing compatibility and schema evolution. As long as messages still contain the necessary information, they can continue to be processed even as the schema changes and fields are added or removed; such changes would break compatibility under typically strict schemas. The $cond operator can be used to assign default values so that processing can proceed even when required fields are missing.
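As a sketch of this defaulting idea (the field name tempunits and the default 'F' are assumptions for illustration), the comment below shows one way such a $cond expression might be written, with its plain-JavaScript equivalent underneath:

```javascript
// Sketch: supplying a default for a missing field. In a pipeline this might
// look like (field name and default are assumed for illustration):
//   { $addFields: { tempunits: { $cond: {
//       if: { $eq: [{ $type: "$tempunits" }, "missing"] },
//       then: "F",
//       else: "$tempunits" } } } }
// The plain-JavaScript equivalent of that expression:
function withDefaultTempUnits(doc) {
  const tempunits = doc.tempunits === undefined ? "F" : doc.tempunits;
  return { ...doc, tempunits };
}

// A reading without units falls back to the default; one with units is untouched.
console.log(withDefaultTempUnits({ name: "Station1201", temp: 88 }));
console.log(withDefaultTempUnits({ name: "Station1201", temp: 31, tempunits: "C" }));
```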

Validating schemas in Atlas Stream Processing

The $validate operator in Atlas Stream Processing can be used to validate a document’s structure. The MQL query operators available in db.collection.find() can be used with $validate to match and filter particular documents. Additionally, developers can use $jsonSchema to annotate and validate JSON documents against the json-schema.org draft-04 specification. Another advantage of $validate is that documents that don’t match the $jsonSchema or MQL filter operators can, instead of simply being deleted and possibly lost, have the validationAction set to route them to a dead letter queue (DLQ). validationAction options include writing to a log file, sending to a DLQ, and discarding the message. This makes it possible to preserve important messages and, if needed, have them handled by other applications. Consider the following sample messages from a fleet of solar panel devices:

{
  device_id: 'device_1',
  group_id: 2,
  timestamp: '2023-08-24T16:46:12.048+00:00',
  max_watts: 250,
  event_type: 0,
  obs: {
    watts: 121,
    temp: 14
  }
}

{
  device_id: 'device_8',
  group_id: 2,
  timestamp: '2023-08-25T19:40:51.845+00:00',
  max_watts: 250,
  event_type: 1,
  event_details: 'Network error',
  _ts: ISODate("2023-08-25T19:40:51.845Z"),
  _stream_meta: {
    sourceType: 'sampleData',
    timestamp: ISODate("2023-08-25T19:40:51.845Z")
  }
}

The maintenance team has determined that the solar panel identified as {device_id: 'device_8'} is a test device. The following is a straightforward $validate that uses the $ne query operator to eliminate all documents containing {device_id: 'device_8'}, clearing the test device’s documents out of the processing pipeline. Developers commonly have to deal with signals that aren’t relevant to the processing pipeline, and $validate makes this easy to do.

{
  $validate: {
    validator: {
      $expr: {
        $ne: ["$device_id", "device_8"]
      }
    },
    validationAction: "discard"
  }
}
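The effect of this stage can be sketched in plain JavaScript. The routing below is an illustration of the validationAction semantics described above, not Atlas Stream Processing internals:

```javascript
// Sketch: what $validate with a predicate and a validationAction amounts to.
// Documents passing the predicate continue through the pipeline; failures
// are either dropped or preserved in a dead letter queue.
function validate(docs, predicate, validationAction) {
  const passed = [];
  const dlq = [];
  for (const doc of docs) {
    if (predicate(doc)) {
      passed.push(doc);
    } else if (validationAction === "dlq") {
      dlq.push(doc); // preserved for later inspection instead of being lost
    } // "discard": failures are simply dropped
  }
  return { passed, dlq };
}

const docs = [{ device_id: "device_1" }, { device_id: "device_8" }];
const result = validate(docs, d => d.device_id !== "device_8", "discard");
// result.passed now holds only the device_1 document; the test device is dropped.
```

Swapping "discard" for "dlq" would instead capture the device_8 document for other applications to handle.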

By validating against the json-schema draft-04 specification, $jsonSchema can be used to make sure the solar panel message structure is correct for processing. The example below shows how $validate verifies the existence of required fields, verifies that a particular field does not exist, checks data types (int, string, object, etc.), matches regex patterns, and enforces minimum and maximum numeric ranges. If any of these schema requirements are violated, the message is forwarded to the DLQ. By using $jsonSchema, developers can ensure that a message satisfies pipeline requirements without worrying about newly added, removed, or modified fields. This makes it possible to process a wide range of schemas with true flexibility.

{
  $validate: {
    validator: {
      $jsonSchema: {
        required: ["device_id", "timestamp", "obs", "event_type"],
        not: {
          required: ["event_details"]
        },
        properties: {
          device_id: {
            bsonType: "string",
            pattern: "^device_\\d+",
            description: "device_id is required and must be like device_#"
          },
          obs: {
            bsonType: "object",
            required: ["watts", "temp"],
            properties: {
              watts: {
                bsonType: "int",
                minimum: 0,
                maximum: 250,
                description: "'obs.watts' is required and cannot be less than 0 or more than 250"
              },
              temp: {
                bsonType: "int",
                description: "'obs.temp' must be an integer"
              }
            }
          },
          event_type: {
            bsonType: "int",
            minimum: 0,
            maximum: 1
          },
          timestamp: {
            bsonType: "string",
            description: "'timestamp' must be a string"
          }
        }
      }
    },
    validationAction: "dlq"
  }
}
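To make these rules concrete, here is a plain-JavaScript rendering of the same checks (a hand-rolled subset of draft-04 semantics for illustration, not the engine’s implementation):

```javascript
// Sketch: the checks the $jsonSchema above performs, written out by hand.
function validTelemetry(doc) {
  // required: device_id, timestamp, obs, event_type
  if (!["device_id", "timestamp", "obs", "event_type"].every(k => k in doc)) return false;
  // not: { required: ["event_details"] } — the field must be absent
  if ("event_details" in doc) return false;
  // device_id must be a string matching ^device_\d+
  if (typeof doc.device_id !== "string" || !/^device_\d+/.test(doc.device_id)) return false;
  // obs.watts: int in [0, 250]; obs.temp: int
  const { watts, temp } = doc.obs;
  if (!Number.isInteger(watts) || watts < 0 || watts > 250) return false;
  if (!Number.isInteger(temp)) return false;
  // event_type: int in [0, 1]
  if (!Number.isInteger(doc.event_type) || doc.event_type < 0 || doc.event_type > 1) return false;
  // timestamp must be a string
  return typeof doc.timestamp === "string";
}
```

Applied to the two sample messages earlier, the device_1 reading passes, while the device_8 message fails: it carries event_details and is missing obs.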

By employing logical operators like $and, it is also possible to combine query operators and $jsonSchema in a single $validate. This offers a simple and effective way to perform field-level comparisons and verify that messages have the expected structure before further processing, such as a tumbling window, which carries out aggregations over time.

{
  $validate: {
    validator: {
      $and: [
        {
          $expr: {
            $ne: ["$device_id", "device_8"]
          }
        },
        {
          $jsonSchema: {
            required: ["device_id", "timestamp", "obs", "event_type"],
            not: {
              required: ["event_details"]
            },
            properties: {
              device_id: {
                bsonType: "string",
                pattern: "^device_\\d+",
                description: "device_id is required and must be like device_#"
              },
              obs: {
                bsonType: "object",
                required: ["watts", "temp"],
                properties: {
                  watts: {
                    bsonType: "int",
                    minimum: 0,
                    maximum: 250,
                    description: "'obs.watts' is required and cannot be less than 0 or more than 250"
                  },
                  temp: {
                    bsonType: "int",
                    description: "'obs.temp' must be an integer"
                  }
                }
              },
              event_type: {
                bsonType: "int",
                minimum: 0,
                maximum: 1
              },
              timestamp: {
                bsonType: "string",
                description: "'timestamp' must be a string"
              }
            }
          }
        }
      ]
    },
    validationAction: "dlq"
  }
}
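Since tumbling windows come up here, a brief sketch of what one does may help. The pipeline syntax in the comment is an assumption for illustration; the JavaScript below simply models the behaviour of fixed-size, non-overlapping time windows:

```javascript
// Sketch: what a tumbling window does. In a stream processing pipeline this
// might be expressed (syntax assumed for illustration) as:
//   { $tumblingWindow: {
//       interval: { size: NumberInt(60), unit: "second" },
//       pipeline: [{ $group: { _id: "$device_id", avgWatts: { $avg: "$obs.watts" } } }]
//   } }
// Plain-JavaScript equivalent: bucket events into fixed 60-second windows,
// then average watts per device within each window.
function tumblingAvgWatts(events, windowSeconds = 60) {
  const windows = new Map(); // key: windowStart|device_id
  for (const e of events) {
    const windowStart = Math.floor(e.ts / windowSeconds) * windowSeconds;
    const key = `${windowStart}|${e.device_id}`;
    const agg = windows.get(key) ?? { windowStart, device_id: e.device_id, sum: 0, n: 0 };
    agg.sum += e.obs.watts;
    agg.n += 1;
    windows.set(key, agg);
  }
  return [...windows.values()].map(w => ({
    windowStart: w.windowStart,
    device_id: w.device_id,
    avgWatts: w.sum / w.n,
  }));
}
```

Placing the combined $validate before a window like this ensures the aggregation only ever sees well-formed, non-test-device documents.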

The processing and integration of data in an application depends on schemas. At the same time, schemas inevitably grow more complicated and harder to maintain as organisations and applications change. Atlas Stream Processing removes much of this complexity from schema management, decreasing the amount of code necessary to process streaming data while making it faster to iterate, easier to check data for accuracy, and simpler to gain visibility into your data.
