
How-to: CSV to Kafka with Python and confluent_kafka (part 2)

In the first part of this blog, the aim was to serialize a CSV file to Avro as simply as possible, store the result in Kafka, and register the schema in the associated schema registry.

To do this, we opened the CSV file with csv.DictReader, so each row is already available as a dictionary that we can use to assemble our data set manually. Let us suppose that column 1 contains a float, column 2 an int, and column 3 a string value:
 

data_set = {"column_name_1": float(row["column_name_1"]),
            "column_name_2": int(row["column_name_2"]),
            "column_name_3": row["column_name_3"]}

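As a reminder, csv.DictReader yields one dictionary per row, keyed by the column names from the header line, and every value is read as a string. A minimal sketch (the CSV content here is only an assumption for illustration):

import csv
import io

# Hypothetical CSV content with a semicolon delimiter
csv_text = "column_name_1;column_name_2;column_name_3\n3.14;42;hello\n"

reader = csv.DictReader(io.StringIO(csv_text), delimiter=";")
for row in reader:
    print(row)
    # e.g. {'column_name_1': '3.14', 'column_name_2': '42', 'column_name_3': 'hello'}
    # every value is a string, which is why we convert manually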

Now we have a functioning Producer again.
 

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
import csv

AvroProducerConf = {'bootstrap.servers': 'kafka.meinkafkaserver.com:9092',
                    'schema.registry.url': 'http://schema-registry.meinregistryserver.com:80'}

value_schema = avro.load('/home/oliver/Dokumente/avro_schema/test.avsc')
avroProducer = AvroProducer(AvroProducerConf, default_value_schema=value_schema)

topic = 'mein_topic'  # hypothetical topic name, adjust as needed

with open('/home/oliver/Dokumente/avro_daten/test.csv') as file:
    reader = csv.DictReader(file, delimiter=";")
    for row in reader:
        data_set = {"column_name_1": float(row["column_name_1"]),
                    "column_name_2": int(row["column_name_2"]),
                    "column_name_3": row["column_name_3"]}
        avroProducer.produce(topic=topic, value=data_set)
        avroProducer.flush()

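For reference, test.avsc could look roughly as follows for the three example columns (the field names and types here are assumptions). With confluent_kafka the same schema can also be defined inline via avro.loads:

from confluent_kafka import avro

# Hypothetical schema matching the three example columns
value_schema = avro.loads("""
{
  "namespace": "example.avro",
  "type": "record",
  "name": "test",
  "fields": [
    {"name": "column_name_1", "type": "double"},
    {"name": "column_name_2", "type": "int"},
    {"name": "column_name_3", "type": ["string", "null"]}
  ]
}
""")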

Unfortunately, this script has to be adjusted for every individual schema, even though all the necessary information is already contained in the schema itself.

So let us simply parse the schema, which is ultimately just JSON. For this, we use the json library

import json


and extract the data type of each field:

data_type = dict()

# Parse the schema to get the data type of each column
schema_path = '/home/oliver/Dokumente/avro_schema/test.avsc'
with open(schema_path) as json_file:
    schema = json.load(json_file)  # do not shadow the json module
    for field in schema['fields']:
        if len(field['type'][0]) > 1:
            # 'type' is a list, e.g. ["string", "null"]: take the first entry
            data_type[field['name']] = field['type'][0]
        else:
            # 'type' is a plain string, e.g. "string"
            data_type[field['name']] = field['type']

 

The check len(field['type'][0]) > 1 determines whether 'type' is a plain string or a list. The data type of a NOT NULL column is specified as a string, e.g. {"name": "column_01", "type": "string"}, whereas the data type of a NULL column is specified as a list, e.g. {"name": "column_02", "type": ["string", "null"]}. If 'type' is a string, its first element is a single character (length 1); if it is a list, its first element is the full type name (length greater than 1).
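
To make the check concrete, here it is applied to a hypothetical two-field schema (illustration only):

fields = [
    {"name": "column_01", "type": "string"},            # NOT NULL: 'type' is a string
    {"name": "column_02", "type": ["string", "null"]},  # NULL: 'type' is a list
]

for field in fields:
    print(field['name'], len(field['type'][0]) > 1)
    # column_01 False -> field['type'][0] is just the character 's'
    # column_02 True  -> field['type'][0] is the full type name "string"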


Now, we can check the data type of a column and perform conversion accordingly. To do this, we again open our CSV file as usual:

with open('/home/oliver/Dokumente/avro_daten/test.csv') as file:
    reader = csv.DictReader(file, delimiter=";")


Using reader.fieldnames, we can retrieve a list of all header entries.

headers = reader.fieldnames


We then proceed column by column:

for row in reader:
    for header in headers:
        if str(data_type[header]).lower() == 'double':
            row[header] = float(row[header])
        # check further data types if necessary …
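
The same pattern extends to the other primitive Avro types. A sketch of what such an extension could look like (which types you need depends on your schema, and the boolean conversion rule is an assumption about how booleans appear in the CSV):

for row in reader:
    for header in headers:
        avro_type = str(data_type[header]).lower()
        if avro_type in ('double', 'float'):
            row[header] = float(row[header])
        elif avro_type in ('int', 'long'):
            row[header] = int(row[header])
        elif avro_type == 'boolean':
            row[header] = row[header].lower() in ('true', '1')
        # string values can be passed through unchanged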


The result can again be passed to Kafka:

avroProducer.produce(topic=topic, value=row)
avroProducer.flush()


The entire script accordingly appears as follows:

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
import csv
import json

AvroProducerConf = {'bootstrap.servers': 'kafka.meinkafkaserver.com:9092',
                    'schema.registry.url': 'http://schema-registry.meinregistryserver.com:80'}

schema_path = '/home/oliver/Dokumente/avro_schema/test.avsc'
value_schema = avro.load(schema_path)
avroProducer = AvroProducer(AvroProducerConf, default_value_schema=value_schema)

topic = 'mein_topic'  # hypothetical topic name, adjust as needed

data_type = dict()
# Parse the schema to get the data type of each column
with open(schema_path) as json_file:
    schema = json.load(json_file)  # do not shadow the json module
    for field in schema['fields']:
        if len(field['type'][0]) > 1:
            data_type[field['name']] = field['type'][0]
        else:
            data_type[field['name']] = field['type']

with open('/home/oliver/Dokumente/avro_daten/test.csv') as file:
    reader = csv.DictReader(file, delimiter=";")
    headers = reader.fieldnames
    for row in reader:
        for header in headers:
            if str(data_type[header]).lower() == 'double':
                row[header] = float(row[header])
            # check further data types if necessary …

        avroProducer.produce(topic=topic, value=row)

avroProducer.flush()

 

As one can see, the confluent-kafka Python client is a simple yet powerful way of serializing data to Kafka.