
How-to: CSV to Kafka with Python and confluent_kafka (part 1)

Even in modern environments, CSV is still a frequently encountered exchange format because many existing systems cannot handle newer alternatives. However, other formats are better suited to further processing in a big-data environment. This applies in particular to Avro in conjunction with Kafka. Avro is a compact data format with many features, in which the data is always accompanied by its schema. To simplify handling, the schema can also be registered in a schema registry.

Schema registry with Python

To do this in Python, the python-confluent-kafka library from the Kafka developer Confluent is a good choice.

First, the python-confluent-kafka library must be installed. At the time of writing, this fails under Windows because the librdkafka dependency cannot be resolved; officially, confluent_kafka supports only OS X and Linux.

On Debian, python-confluent-kafka can be installed directly from the Debian repository. The following command is sufficient:

apt install python-confluent-kafka


In the Python script, we must first import the required libraries:

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
import csv


After that, we can load the Avro schema,

value_schema = avro.load('/home/oliver/Dokumente/avro_schema/test.avsc')


and configure the Avro Producer:

AvroProducerConf = {
    'bootstrap.servers': 'kafka.meinkafkaserver.com:9092',
    'schema.registry.url': 'http://schema-registry.meinregistryserver.com:80',
}


The second entry gives the address of the schema registry, so that the schema can be registered there later.


With this configuration, we can create our Producer:

avroProducer = AvroProducer(AvroProducerConf, default_value_schema=value_schema)


Now we are ready to open the CSV file:

with open('/home/oliver/Dokumente/avro_daten/test.csv') as file:
    reader = csv.DictReader(file, delimiter=";")
    for row in reader:

On each pass through the loop, "row" contains a dictionary of the form {'Header name': 'Column contents'}.
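
To make the shape of these dictionaries concrete, here is a small sketch that runs without Kafka. The sample data and the column names (id, name, city) are assumptions standing in for test.csv, which is not shown in this article.

```python
import csv
import io

# Hypothetical sample data standing in for test.csv (semicolon-separated).
sample = io.StringIO("id;name;city\n1;Alice;Berlin\n2;Bob;Hamburg\n")

reader = csv.DictReader(sample, delimiter=";")
rows = list(reader)

for row in rows:
    # Every value is a str, including the numeric-looking "id" column.
    print(dict(row))
# first line printed: {'id': '1', 'name': 'Alice', 'city': 'Berlin'}
```

The header line supplies the dictionary keys, and csv.DictReader performs no type conversion of its own.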
 

We can pass this directly to the Avro producer:

        avroProducer.produce(topic='mein_topic', value=row)
        avroProducer.flush()


These two calls write the contents of "row" to Kafka in Avro format and register the schema in the schema registry. The schema is registered under the name <TOPIC_NAME>-value, i.e. "mein_topic-value" here.

The data is also checked against the schema in this process. If the Avro schema passed in value_schema does not match the data in the CSV file, an error is raised.
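
A mismatch can be spotted before anything is sent by comparing the CSV header with the field names in the schema. The following is a minimal sketch of such a pre-flight check; the schema, the sample data, and the helper header_mismatch are all hypothetical and not part of confluent_kafka. How the serializer itself reacts to each kind of mismatch is not covered here.

```python
import csv
import io
import json

# Hypothetical schema; the real one lives in test.avsc.
schema_json = """
{
  "type": "record",
  "name": "TestRecord",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "name", "type": "string"},
    {"name": "city", "type": "string"}
  ]
}
"""


def header_mismatch(schema, csv_header):
    """Return (missing, extra) column names relative to the schema fields."""
    expected = {field["name"] for field in schema["fields"]}
    actual = set(csv_header)
    return sorted(expected - actual), sorted(actual - expected)


schema = json.loads(schema_json)

# Hypothetical CSV whose third column is named "town" instead of "city".
sample = io.StringIO("id;name;town\n1;Alice;Berlin\n")
reader = csv.DictReader(sample, delimiter=";")

missing, extra = header_mismatch(schema, reader.fieldnames)
print("missing:", missing, "extra:", extra)
```

Two empty lists mean that the header and the schema use the same names.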

Because csv.DictReader returns every value as a string, this unfortunately also means that the script only works if all fields of the schema are strings.
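
Whether a given schema meets this restriction can be checked up front. The sketch below assumes a flat record schema; the schema contents and the helper non_string_fields are hypothetical. The check is deliberately strict: it also flags union types such as ["null", "string"], even though those may accept string values.

```python
import json

# Hypothetical schema with two fields that are not plain strings.
schema_json = """
{
  "type": "record",
  "name": "TestRecord",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
    {"name": "amount", "type": "double"}
  ]
}
"""


def non_string_fields(schema):
    """Return the names of record fields whose type is not plain "string"."""
    return [f["name"] for f in schema["fields"] if f["type"] != "string"]


schema = json.loads(schema_json)
print(non_string_fields(schema))  # prints ['id', 'amount']
```

An empty list suggests that the CSV rows can be passed to the producer unchanged.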


The complete script looks like this:

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
import csv

# Addresses of the Kafka broker and the schema registry
AvroProducerConf = {
    'bootstrap.servers': 'kafka.meinkafkaserver.com:9092',
    'schema.registry.url': 'http://schema-registry.meinregistryserver.com:80',
}

# Load the Avro schema and create the producer
value_schema = avro.load('/home/oliver/Dokumente/avro_schema/test.avsc')
avroProducer = AvroProducer(AvroProducerConf, default_value_schema=value_schema)

with open('/home/oliver/Dokumente/avro_daten/test.csv') as file:
    reader = csv.DictReader(file, delimiter=";")
    for row in reader:
        # Each row is a dict mapping column name to string value
        avroProducer.produce(topic='mein_topic', value=row)
        avroProducer.flush()
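
The loop can be tried out without a Kafka cluster by passing the producer in as a parameter. The following is only a sketch under stated assumptions: send_csv and RecordingProducer are hypothetical names, and RecordingProducer merely imitates the produce() and flush() calls used above. Unlike the script above, this variant calls flush() once after the loop rather than after every row.

```python
import csv
import os
import tempfile


def send_csv(path, producer, topic, delimiter=";"):
    """Produce one message per CSV row and return the number of rows sent."""
    count = 0
    with open(path, newline="") as file:
        for row in csv.DictReader(file, delimiter=delimiter):
            producer.produce(topic=topic, value=dict(row))
            count += 1
    # Wait once, at the end, until all queued messages are delivered.
    producer.flush()
    return count


class RecordingProducer:
    """Stand-in for a real producer: records messages instead of sending."""

    def __init__(self):
        self.messages = []
        self.flushed = 0

    def produce(self, topic, value):
        self.messages.append((topic, value))

    def flush(self):
        self.flushed += 1


# Dry run against a temporary CSV file with hypothetical contents.
with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False, newline="") as tmp:
    tmp.write("id;name\n1;Alice\n2;Bob\n")

fake = RecordingProducer()
sent = send_csv(tmp.name, fake, "mein_topic")
os.remove(tmp.name)
print(sent, fake.messages)
```

In real use, the AvroProducer created earlier would be passed in place of RecordingProducer. Because flush() blocks until the queued messages have been delivered, calling it once at the end is usually faster for large files than flushing after every row.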

 

Part 2 deals with conversion of the data into other data types.