How-To: CSV to Kafka With Python and confluent_kafka (Part 1)

Even in modern environments, CSV is still a frequently encountered exchange format because many existing systems cannot handle more modern alternatives. However, other formats are better suited to further processing in a big-data environment. This applies in particular to Avro in conjunction with Kafka. Avro offers a space-saving data format with many features, in which the data schema is transferred along with the data. To simplify handling, the schema can also be registered in an associated repository, the schema registry.

Schema Repository With Python

If this needs to be accomplished with Python, the confluent-kafka library from Confluent, the company founded by Kafka's creators, is the obvious choice.

First, the library must be installed. Under Windows this fails because a dependency on the native librdkafka library cannot be resolved; officially, confluent-kafka supports only macOS and Linux anyway.

If we opt for Debian, python-confluent-kafka can be installed straight from the Debian repository. A single command suffices:

apt install python-confluent-kafka


In the Python script, we must first import the required libraries:

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
import csv


After that, we can load the Avro schema,

value_schema = avro.load('/home/oliver/Dokumente/avro_schema/test.avsc')


and configure the Avro Producer:

AvroProducerConf = {
    'bootstrap.servers': 'kafka.meinkafkaserver.com:9092',
    'schema.registry.url': 'http://schema-registry.meinregistryserver.com:80',
}


The second entry specifies the address of the schema registry, so that the schema can be registered there later.
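The contents of the test.avsc file loaded above are not shown here, so as a point of reference: a minimal Avro schema for a two-column CSV might look like the following sketch (the record and field names are assumptions, not taken from the actual file):

```json
{
  "type": "record",
  "name": "test",
  "fields": [
    {"name": "name",   "type": "string"},
    {"name": "number", "type": "string"}
  ]
}
```

Note that all fields are declared as strings; the reason for this becomes apparent further below.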


With this configuration, we can create our Producer:

avroProducer = AvroProducer(AvroProducerConf, default_value_schema=value_schema)


Now we are ready to open the CSV file:

with open('/home/oliver/Dokumente/avro_daten/test.csv') as file:
    reader = csv.DictReader(file, delimiter=";")
    for row in reader:


"row" now contains a dictionary of the form {'Header name':'Column contents'} .

We can pass this directly to the AvroProducer:

        avroProducer.produce(topic='mein_topic', value=row)
        avroProducer.flush()


This command writes the contents of "row" in Avro format to Kafka and registers the schema in the repository. The subject name used for the value schema is always <TOPIC_NAME>-value, i.e. "mein_topic-value" here.
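This naming follows the registry's default TopicNameStrategy, which derives the subject from the topic name:

```python
# Default TopicNameStrategy: value schemas are registered under
# "<topic>-value" (key schemas would go under "<topic>-key")
topic = "mein_topic"
subject = f"{topic}-value"
print(subject)  # mein_topic-value
```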

The schema is also checked in this process. If the Avro schema passed in value_schema does not match the data in the employed CSV, a corresponding error is indicated.

Since csv.DictReader returns all values as strings, this unfortunately also means that the script only works if all fields of the schema are of type string.
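As a minimal sketch of a workaround (the field name and target type here are assumptions, not taken from the source), individual columns could be cast before producing:

```python
# Cast selected columns to int before handing the row to the producer,
# assuming a hypothetical schema that declares "number" as an int
def convert_row(row, int_fields=("number",)):
    return {key: int(value) if key in int_fields else value
            for key, value in row.items()}

print(convert_row({"name": "Alice", "number": "1"}))
# {'name': 'Alice', 'number': 1}
```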

The entire script appears as follows:

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
import csv

AvroProducerConf = {
    'bootstrap.servers': 'kafka.meinkafkaserver.com:9092',
    'schema.registry.url': 'http://schema-registry.meinregistryserver.com:80',
}

value_schema = avro.load('/home/oliver/Dokumente/avro_schema/test.avsc')
avroProducer = AvroProducer(AvroProducerConf, default_value_schema=value_schema)

with open('/home/oliver/Dokumente/avro_daten/test.csv') as file:
    reader = csv.DictReader(file, delimiter=";")
    for row in reader:
        avroProducer.produce(topic='mein_topic', value=row)
        avroProducer.flush()

Part 2 deals with conversion of the data into other data types.


