
Avro objects

Blog Post created by teemu_hyle Employee on Jun 4, 2018

This article describes how to create Apache Avro™ objects from JSON files.
The example uses Apache Avro™ version 1.8.2 (http://avro.apache.org/).

 

The use case behind this is sending Avro objects to messaging systems such as Kafka.
This example creates Avro objects from JSON files and sends them via a connector.
The same logic could be implemented in custom connectors with the Dell Boomi Connector SDK.

 

Download Avro tools

Download avro-tools-1.8.2.jar from http://avro.apache.org/releases.html#Download

 

Create Custom Library

Upload avro-tools-1.8.2.jar via the Account Libraries tab on the account menu's Setup page.
Create a Custom Library with avro-tools-1.8.2.jar and deploy it to your Atom.

Define Avro schema

The following schema example from http://avro.apache.org/ is used.
A JSON profile can be created in Boomi for this schema, and the Avro schema could be validated at runtime, but this is not necessary.

user.avsc

{
    "namespace": "example.avro",
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "favorite_number", "type": ["int", "null"]},
        {"name": "favorite_color", "type": ["string", "null"]}
    ]
}

Input JSON files

This example uses two JSON files placed on an SFTP server.

user1.json

{"name": "Alyssa", "favorite_number": {"int": 256}, "favorite_color": null}

user2.json

{"name": "Ben", "favorite_number": {"int": 7}, "favorite_color": {"string": "red"}}
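Note the wrapped values in the input files: Avro's JSON encoding represents a non-null union branch as a single-key object naming the branch type, so a nullable int is written as {"int": 256} rather than 256, while the null branch is a bare null. A minimal sketch of that wrapping rule in plain Java (the class and helper names are illustrative, not part of the Avro API):

```java
public class UnionJsonDemo {
    // Illustrative helper: Avro's JSON encoding writes a non-null union
    // branch as {"<branchType>": <value>} and the null branch as plain null.
    static String wrapUnion(String branchType, Object value) {
        if (value == null) {
            return "null";
        }
        boolean quote = value instanceof String;
        String v = quote ? "\"" + value + "\"" : value.toString();
        return "{\"" + branchType + "\": " + v + "}";
    }

    public static void main(String[] args) {
        System.out.println(wrapUnion("int", 256));      // {"int": 256}
        System.out.println(wrapUnion("string", "red")); // {"string": "red"}
        System.out.println(wrapUnion("string", null));  // null
    }
}
```

Feeding the unwrapped form (e.g. "favorite_number": 256) to the JSON decoder in the script below would fail, because the decoder cannot tell which union branch is meant.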

Create Process

The following process reads the input JSON files and the Avro schema. The Avro schema is stored in a Dynamic Process Property.
A Data Process step with Custom Scripting is used to create the Avro object from the JSON.
One Avro object is created for the multiple JSON files.
In this example the Avro object is sent to an SFTP server, but it could equally be sent to a messaging system such as Kafka.

Data Process / Custom Scripting (Avro)

import java.util.Properties;
import java.util.Scanner;
import java.io.InputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import com.boomi.execution.ExecutionUtil;
import org.apache.avro.Schema;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;

// Avro schema
String schemaStr = ExecutionUtil.getDynamicProcessProperty("AVRO_SCHEMA");
Schema.Parser schemaParser = new Schema.Parser();
Schema schema = schemaParser.parse(schemaStr);

// Avro writer
DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
dataFileWriter.create(schema, baos);

for( int i = 0; i < dataContext.getDataCount(); i++ ) {
    InputStream is = dataContext.getStream(i);
    Properties props = dataContext.getProperties(i);

    // Input JSON document
    Scanner s = new Scanner(is).useDelimiter("\\A");
    String jsonString = s.hasNext() ? s.next() : "";

    // JSON to Avro record
    DecoderFactory decoderFactory = DecoderFactory.get();
    Decoder decoder = decoderFactory.jsonDecoder(schema, jsonString);
    DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
    GenericRecord genericRecord = reader.read(null, decoder);

    dataFileWriter.append(genericRecord);
}

dataFileWriter.close();

// Output document (Avro)
dataContext.storeStream(new ByteArrayInputStream(baos.toByteArray()), new Properties());

Test run and validation

The process was tested and the Avro object validated using a "Deserializer".

Below is the code for the "Deserializer" used to validate the Avro object.

import org.apache.avro.Schema;
import org.apache.avro.io.DatumReader;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericDatumReader;
import java.io.File;

class Deserialize {
    public static void main(String[] args) {
        try {
            Schema schema = new Schema.Parser().parse(new File(args[0]));
            File file = new File(args[1]);

            System.out.println("Avro Deserializer");
            System.out.println("------------------------------");
            DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
            DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(file, datumReader);
            GenericRecord record = null;
            while (dataFileReader.hasNext()) {
                record = dataFileReader.next(record);
                System.out.println(record);
            }
            dataFileReader.close();
            System.out.println("------------------------------");
        } catch (Exception e) {
            System.out.println(e);
        }
    }
}
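To compile and run the "Deserializer" outside Boomi, the avro-tools jar can double as the Avro classpath. The commands below are a sketch; user.avsc and user.avro stand in for the schema file and the Avro file downloaded from the SFTP server:

```shell
# Compile the Deserializer against the Avro classes bundled in avro-tools
javac -cp avro-tools-1.8.2.jar Deserialize.java

# Run it with the schema file and the Avro container file
# (on Windows, use ';' instead of ':' as the classpath separator)
java -cp .:avro-tools-1.8.2.jar Deserialize user.avsc user.avro

# avro-tools can also dump the records directly, without custom code
java -jar avro-tools-1.8.2.jar tojson user.avro
```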
