Normalization script
Create a normalization script to take raw IoT data and output the data in the data structure for readings on the platform.
Output data structure#
Your normalization script must transform the raw IoT data to the following format:
```
{
  "readings": [
    {
      // IoT reading data goes here
      "_ts": "",
      "_tsMetadata": {
        "_sourceId": ""
      }
    }
  ]
}
```

| Property | Type | Description | Required |
|---|---|---|---|
| "readings" | Array of Objects | An array of reading objects. Each reading object contains the reading data plus a timestamp and metadata. | Required |
| "_ts" | String | A timestamp that you create for the reading, for example the ISO 8601 string "2023-05-14T05:53:32.000Z". | Required |
| "_tsMetadata" | Object | An object that contains a "_sourceId" property. | Required |
| "_sourceId" | String | The source ID of the sensor that took the reading. | Required |
Input and output data examples#
Raw IoT input data#
{ "type": "text", "messageId": "ID:activemq-39563-1689584032552-5:4:-1:1:1", "correlationId": null, "destination": { "type": "queue", "name": "Consumer.<ANY_STRING>.VirtualTopic.lnt_RfmbfrED" }, "replyTo": null, "priority": 4, "expiration": 0, "timestamp": 1689584797284, "redelivered": false, "properties": { "ActiveMQ.MQTT.QoS": { "type": "integer", "boolean": null, "byte": null, "short": null, "integer": 0, "long": null, "float": null, "double": null, "string": null } }, "payloadText": "{\"from\":\"BLE\",\"mac\":\"30:ae:7b:e1:f5:28\",\"to\":\"GATEWAY\",\"time\":1689584797,\"deviceCode\":\"01012354-ad7d-4a0a-91db-a08435b7c9e3\", \"type\":\"reportAttribute\",\"data\":{\"value\":{\"device_list\":[{\"data\":\"0201061AFF5B07050305730073005E27D80085018E04AA023A079B0C000011079ECADC240EE5A9E093F3A3B50100406E09094869626F75414952\",\"modelstr\":\"HibouAIRe\",\"scan_rssi\":-51,\"Board_id\":\"5d9f693f-6f95-4ea5-a610-5fe7bf061f40\",\"ble_addr\":\"E8:E5:ED:BE:5E:D9\",\"addr_type\":1,\"scan_time\":1684043612,\"CO2\":0,\"HibouType\":\"PM\",\"dev_name\":\"HibouAIR\",\"ALS\":115,\"PM1.0\":\"68.200\",\"PM2.5\":\"185.000\",\"Pressure\":\"1007.800\",\"PM10\":\"322.700\",\"Temperature\":\"21.600\",\"Humidity\":\"38.900\",\"VOC\":1166,\"connectable\":1}]},\"attribute\":\"mod.device_list\",\"mac\":\"30:ae:7b:e1:f5:28\"},\"count\":1,\"name\":\"kafka-telemetry-1_sXrwGTdXah_timeseries\",\"id\":\"1-1\",\"csv\":\"execution7.csv\"}"}Normalized data output#
{ "readings": [ { "data": "0201061AFF5B07050305730073005E27D80085018E04AA023A079B0C000011079ECADC240EE5A9E093F3A3B50100406E09094869626F75414952", "modelstr": "HibouAIRe", "scan_rssi": -51, "Board_id": "5d9f693f-6f95-4ea5-a610-5fe7bf061f40", "ble_addr": "E8:E5:ED:BE:5E:D9", "addr_type": 1, "scan_time": 1684043612, "CO2": 0, "HibouType": "PM", "dev_name": "HibouAIR", "ALS": 115, "PM1.0": "68.200", "PM2.5": "185.000", "Pressure": "1007.800", "PM10": "322.700", "Temperature": "21.600", "Humidity": "38.900", "VOC": 1166, "connectable": 1, "_ts": "2023-05-14T05:53:32.000Z", "_tsMetadata": { "_sourceId": "5d9f693f-6f95-4ea5-a610-5fe7bf061f40" } } ]}Creating a normalization script#
The following procedure creates the script that transforms the raw IoT input data to the normalized output in the previous example:
Important: Only use the `var` keyword to create variables; do not use `const` or `let`.
Important: Do not use console logging functions in your script.
Note: You can use arrow functions.
To skip to the final script, see Script example.
1. Create a `functions` object to contain your `constructReadings` and `normalize` methods.
```
var functions = { }
```
2. In the `functions` object, create a `normalize` method that takes a string message as its parameter.
```
... normalize: function(strMessage) { }
```
3. In the `normalize` method, parse the string message parameter.
```
... var message = JSON.parse(strMessage); var payloadText = message.payloadText; var payload = JSON.parse(payloadText); var data = payload.data.value.device_list;
```
4. Create a `readings` array to push normalized readings to.
```
... var readings = [];
```
5. Construct a readings object for each device using a `constructReadings` method that you define later, then push the object to the `readings` array.
```
... data.forEach(d => { var readingsObj = {...d, ...this.constructReadings(d.Board_id, d.scan_time)}; readings.push(readingsObj) });
```
6. Declare a `constructReadings` method that takes the reading's source id and timestamp as parameters.
```
... constructReadings: (sourceId, ts) => { }
```
7. In the `constructReadings` method, create a new Date object based on the timestamp parameter multiplied by 1000.
```
... var date = new Date(ts*1000);
```
8. In the `constructReadings` function, construct a readings object with `"_ts"`, `"_tsMetadata"`, and `"_tsMetadata._sourceId"` properties, then return that object.
```
... var readingsObj = { "_ts": date.toISOString(), "_tsMetadata":{ "_sourceId":sourceId } }; return readingsObj;
```
Script example#
/**
 * Normalization script: converts a raw IoT message into the platform's
 * readings structure. Uses only `var` declarations and no console logging.
 */
var functions = {
  /**
   * Parses a raw message and returns the normalized readings as a JSON string.
   *
   * @param {string} strMessage - JSON string of the raw message. Its
   *   "payloadText" property holds the JSON-encoded device payload.
   * @returns {string} JSON string of the form {"readings": [...]}, with one
   *   reading per entry in payload.data.value.device_list. Each reading is the
   *   device entry plus "_ts" and "_tsMetadata".
   * @throws {SyntaxError} If the message or its payload text is not valid JSON.
   */
  normalize: function(strMessage) {
    var message = JSON.parse(strMessage);
    var payload = JSON.parse(message.payloadText);
    // If the first parse yields a string, the payload was JSON-encoded twice:
    // parse once more to get the object.
    if (typeof payload === 'string') {
      payload = JSON.parse(payload);
    }
    var devices = payload.data.value.device_list;

    // Use `this` when normalize is called as a method of an object that has
    // constructReadings; otherwise (for example when the method is invoked as a
    // detached function reference) fall back to the `functions` object itself.
    var helpers = (this && typeof this.constructReadings === 'function') ? this : functions;

    var readings = devices.map(d => ({
      ...d,
      ...helpers.constructReadings(d.Board_id, d.scan_time)
    }));

    return JSON.stringify({readings});
  },

  /**
   * Builds the timestamp and metadata properties for one reading.
   *
   * @param {string} sourceId - Value stored as "_tsMetadata._sourceId".
   * @param {number} ts - Timestamp; it is multiplied by 1000 before being
   *   passed to Date, so it is treated as seconds rather than milliseconds.
   * @returns {{_ts: string, _tsMetadata: {_sourceId: string}}} The "_ts" value
   *   is the ISO 8601 string produced by Date.prototype.toISOString.
   */
  constructReadings: (sourceId, ts) => {
    var date = new Date(ts * 1000);
    return {
      "_ts": date.toISOString(),
      "_tsMetadata": {
        "_sourceId": sourceId
      }
    };
  }
};
Normalization script invocation#
// Sample raw message object used to exercise the normalization script. Its "payloadText" property carries the device payload as a JSON-encoded string.
let s = {"type":"text","messageId":"ID:activemq-39563-1689584032552-5:4:-1:1:1","correlationId":null,"destination":{"type":"queue","name":"Consumer.<ANY_STRING>.VirtualTopic.lnt_RfmbfrED"},"replyTo":null,"priority":4,"expiration":0,"timestamp":1689584797284,"redelivered":false,"properties":{"ActiveMQ.MQTT.QoS":{"type":"integer","boolean":null,"byte":null,"short":null,"integer":0,"long":null,"float":null,"double":null,"string":null}},"payloadText":"{\"from\":\"BLE\",\"mac\":\"30:ae:7b:e1:f5:28\",\"to\":\"GATEWAY\",\"time\":1689584797,\"deviceCode\":\"01012354-ad7d-4a0a-91db-a08435b7c9e3\",\"type\":\"reportAttribute\",\"data\":{\"value\":{\"device_list\":[{\"data\":\"0201061AFF5B07050305730073005E27D80085018E04AA023A079B0C000011079ECADC240EE5A9E093F3A3B50100406E09094869626F75414952\",\"modelstr\":\"HibouAIRe\",\"scan_rssi\":-51,\"Board_id\":\"5d9f693f-6f95-4ea5-a610-5fe7bf061f40\",\"ble_addr\":\"E8:E5:ED:BE:5E:D9\",\"addr_type\":1,\"scan_time\":1684043612,\"CO2\":0,\"HibouType\":\"PM\",\"dev_name\":\"HibouAIR\",\"ALS\":115,\"PM1.0\":\"68.200\",\"PM2.5\":\"185.000\",\"Pressure\":\"1007.800\",\"PM10\":\"322.700\",\"Temperature\":\"21.600\",\"Humidity\":\"38.900\",\"VOC\":1166,\"connectable\":1}]},\"attribute\":\"mod.device_list\",\"mac\":\"30:ae:7b:e1:f5:28\"},\"count\":1,\"name\":\"kafka-telemetry-1_sXrwGTdXah_timeseries\",\"id\":\"1-1\",\"csv\":\"execution7.csv\"}","payloadMap":null,"payloadBytes":null};
// The script expects a string, so serialize the sample message first,
// then normalize it and print the resulting JSON string.
let serialized = JSON.stringify(s);
let res = functions.normalize(serialized);
console.log(res);