A collection of examples and tips for getting started with stream processing using Apache Kafka & Apache Flink. This will be useful for anyone looking to get set up locally in no time and seeking an introduction to fundamental concepts with some worked examples.
The below section on Flink is to accompany the Medium article.
The below content details some guides & how-tos as well as references to stream processing concepts, including:
- Practical tips for getting started with Flink, including using this local dev stack to familiarise yourself
- Examples performing typical stream processing operations and working with the data.
- Inspired by the ksqlDB exercises from the great Confluent course Introduction to Designing Events and Event Streams. For further development I recommend following the course and recreating the exercises in Flink as I have done here; you can use the examples I've provided if you get stuck.
Run the getting-started stack included in the docker-compose.yaml file to run local Flink, Kafka and Kafka tooling (Conduktor).
docker compose up -d
For running Flink SQL commands there is a sql-client service running from the docker-compose.
# Start running Flink SQL commands
docker compose exec -it sql-client bash -c "bin/sql-client.sh" # from the compose
I will expand later on running SQL scripts.
Flink is centered around the concept of tables, which the Flink docs discuss at length. In our case we're going to use dynamic tables specifically; note Flink has support for batch workloads too. The dynamic tables we'll use represent the data within the underlying Kafka topics. This concept of stream-table duality comes up frequently, where the same data is looked at either as a stream or as a table. Let's get started.
Topics should be created before tables. This is closer to real-world behavior, where the creation of topics is usually governed, and it separates the infrastructure management concern from the processing logic.
Create the topic items in the UI that is running locally on localhost:8080 (you'll need to create a local user to start). Alternatively, use the CLI:
docker compose exec kafka1 /bin/kafka-topics --create --topic items --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --partitions 3 --replication-factor 2
Create a dynamic table by running the below SQL in the SQL client:
--DDL for Flink dynamic table
CREATE TABLE items(
id BIGINT,
price DECIMAL(10, 2),
name STRING,
description STRING,
brand_id BIGINT,
tax_status_id BIGINT,
proc_time AS PROCTIME()
) WITH (
'topic' = 'items',
'key.fields' = 'id',
'properties.group.id' = 'flink_table_items', -- CG name needed for Kafka
'connector' = 'kafka',
'properties.bootstrap.servers' = 'kafka1:9092',
'format' = 'avro-confluent',
'avro-confluent.url' = 'http://schema-registry:8081',
'key.format' = 'avro-confluent',
'key.avro-confluent.url' = 'http://schema-registry:8081',
'value.format' = 'avro-confluent',
'value.fields-include' = 'EXCEPT_KEY', -- keys are set via key.fields above, so exclude them from the value
'properties.auto.offset.reset' = 'earliest' -- needed for Kafka
-- 'scan.startup.mode' = 'earliest-offset'
-- 'properties.security.protocol' = 'PLAINTEXT',
-- 'properties.sasl.mechanism' = 'PLAIN',
-- 'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="" password="";',
-- 'value.format' = 'json', -- alternative for just JSON
-- 'sink.partitioner' = 'fixed' -- Fixed will send to same partition, for demo purposes only
);
These properties are specific to this connector type. I've also left some commented out to show what else you may consider in a non-dev environment. The Flink docs were particularly useful for the schema registry properties. Table properties can also be updated later on:
ALTER TABLE items SET ('properties.group.id'='flink_table_items');
Some useful commands for interrogating tables in Flink are SHOW TABLES;, DESCRIBE items; and SHOW CREATE TABLE items; for more info. Note lowercase also works. The full properties list for altering tables is, you guessed it, in the Flink docs.
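For reference, those commands as you'd type them in the SQL client:
-- Interrogating tables
SHOW TABLES;
DESCRIBE items;
SHOW CREATE TABLE items;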
When building your processing queries you have to explicitly declare which column represents processing time; this can be added with:
ALTER TABLE items ADD proc_time AS PROCTIME();
Insert values with SQL statements.
INSERT INTO items
VALUES (
1,
CAST(19.99 AS DECIMAL(10, 2)),
'Baseball Trading Cards - Collector Edition',
'Premium Ol Slugger baseball trading cards!',
401,
778
);
Appears on the topic.
Note if this fails on the Kafka side the message is lost: the INSERT job completed but Kafka failed. To have retries you need to insert from a streaming table rather than fire and forget one valid command.
You can see the issue for failed jobs in the Job Manager log list by searching for FAILED, or more specifically in the job exception list, since an exception is thrown there and it's a nice jump to the failure. Note this doesn't surface as an error in the client as it fails gracefully. An example I had was forgetting to create the topic beforehand.
Examples of streaming tables which will retry:
-- One-time but with retries
INSERT INTO items
SELECT * FROM source_items -- populated table
WHERE id = 1; -- Or any specific condition
You can also source the items from a filesystem table. Note this will include the header row; I believe there are functions in the Java, Scala and Python libraries to ignore it, but not in the connector config.
-- with the Kafka table already created (items in this example)
-- create source filesystem table
CREATE TABLE items_from_file (
id BIGINT,
price DECIMAL(10, 2),
name STRING,
description STRING,
brand_id BIGINT,
tax_status_id BIGINT
) WITH (
'connector' = 'filesystem',
'path' = '/opt/flink/data/items_data.csv', -- local csv
'format' = 'csv',
'csv.ignore-parse-errors' = 'true',
'csv.allow-comments' = 'true'
);
-- stream from source to the topic table
INSERT INTO items
SELECT *
FROM items_from_file;
Additionally, a data-generating connector such as flink-faker is particularly useful when you just want a table with data in it.
CREATE TABLE `pageviews` (
`url` STRING,
`user_id` STRING,
`browser` STRING,
`ts` TIMESTAMP(3)
)
WITH (
'connector' = 'faker',
'rows-per-second' = '10', -- can edit with, ALTER TABLE `pageviews` SET ('rows-per-second' = '10');
'fields.url.expression' = '/#{GreekPhilosopher.name}.html',
'fields.user_id.expression' = '#{numerify ''user_##''}',
'fields.browser.expression' = '#{Options.option ''chrome'', ''firefox'', ''safari''}',
'fields.ts.expression' = '#{date.past ''5'',''1'',''SECONDS''}'
);
You're likely to want to make things more repeatable and scriptable, or you'll make a mistake/encounter an error and need to look back at commands. Running SQL scripts is useful here.
Note tables are kept in memory unless persisted; all tables, views etc. are lost when the session ends. As a result you'll want to start your SQL script with the table creations. In production you'd use a persistent catalog like Hive.
Flink doc examples.
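As an illustrative sketch (the catalog name and conf path here are hypothetical), registering a persistent Hive catalog looks roughly like:
-- Register and switch to a persistent catalog so definitions survive the session
CREATE CATALOG persistent_catalog WITH (
  'type' = 'hive',
  'hive-conf-dir' = '/opt/hive-conf' -- hypothetical path
);
USE CATALOG persistent_catalog;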
The below is how I found copying scripts into the container and running them to work nicely; change the variable SQL_HOST to match your path as needed. I used add-records.sql to add any new records to a table.
docker compose exec sql-client bash -c "mkdir -p /opt/flink/opt/scripts/"
SQL_HOST=./confluent-course-examples-and-data/data/add-records.sql
SQL_CONTAINER=/opt/flink/opt/scripts/add-records.sql
docker cp $SQL_HOST sql-client:$SQL_CONTAINER
docker compose exec sql-client /opt/flink/bin/sql-client.sh -f $SQL_CONTAINER
The view-all-items.sql script is for more than just adding data to the Kafka table (and hence the underlying topic). It's a bit more useful; it:
- Creates a table for all_items
- Creates a view from the table
- Sets the result mode explicitly. As we're running in a script (non-interactive mode) this is required; it isn't needed when working in the SQL client directly as it sets its own mode there (see the snippet after this list)
- Queries the view
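For reference, a minimal example of setting that result mode in a script (this is a standard SQL client option, with tableau being one of the available modes):
-- Set the result mode explicitly when running non-interactively
SET 'sql-client.execution.result-mode' = 'tableau';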
Use the commands shown earlier for running SQL scripts to run it.
Note this is blank if there's no data on the topic as there's nothing to view. Try inserting values.
Note the current examples don't have any time attribute fields (different from plain timestamps, these are special watermark-based columns), so you have to use proc_time or a window (window_time) when ordering query results.
Here I added PROCTIME() so I could use it to window, as I don't have event time in these events.
This metadata column can be added to the table, or added in a view by including it in the SELECT.
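For example, a minimal sketch of the view approach (the view name is mine):
-- Add the processing-time attribute in a view rather than on the table
CREATE VIEW items_with_proc_time AS
SELECT id, price, name, description, brand_id, tax_status_id, PROCTIME() AS proc_time
FROM items;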
ALTER TABLE items ADD proc_time AS PROCTIME();
SELECT *
FROM TABLE(
TUMBLE(
TABLE items,
DESCRIPTOR(proc_time),
INTERVAL '10' MINUTES
)
)
ORDER BY window_time, price;
See view-ordered-windowed.sql for this with a view created too.
In this example a topic exists carrying item-added-to-cart events. It contrasts with ksqlDB, which was being used in the examples on the course. The config is for the local stack you can spin up here; just create the topic.
Make a table for the topic item_added.
-- ksqlDB syntax
CREATE STREAM item_added (
cart_id BIGINT key,
item_id BIGINT
) WITH (
KAFKA_TOPIC = 'item_added',
VALUE_FORMAT = 'AVRO',
PARTITIONS = 6
);
-- flink translation
CREATE TABLE item_added (
cart_id BIGINT,
item_id BIGINT,
proc_time AS PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'item_added',
'properties.bootstrap.servers' = 'kafka1:9092',
'format' = 'avro-confluent',
'avro-confluent.url' = 'http://schema-registry:8081',
'key.format' = 'avro-confluent',
'key.fields' = 'cart_id',
'key.avro-confluent.url' = 'http://schema-registry:8081',
'value.format' = 'avro-confluent',
'value.fields-include' = 'EXCEPT_KEY',
'properties.group.id' = 'flink_table_item_added', -- CG name needed for Kafka
'properties.auto.offset.reset' = 'earliest' -- needed for Kafka
);
-- Now watch the item count per cart
CREATE VIEW items_per_cart AS
SELECT cart_id, COUNT(*) as items_in_cart
FROM TABLE(
TUMBLE(
TABLE item_added,
DESCRIPTOR(proc_time),
INTERVAL '5' SECONDS
)
)
GROUP BY cart_id;
SELECT * FROM items_per_cart;
Add items to the cart and watch the count increase from the events. I did this with two SQL clients open: the view on one screen, and adding items one by one on the other. Remember you need to create the table in the new client, as tables are stored in memory here.
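For example, a couple of inserts to watch the count tick up (the values here are arbitrary):
INSERT INTO item_added VALUES (200, 1001);
INSERT INTO item_added VALUES (200, 1002);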
Rather than having all the item data in the items topic as we have done, what if the data had needed combining to get there? Let's combine three different streams to make our enriched stream.
You'll likely want to run the scripts provided, swapping out the script in the below command. They're fairly simple: creating the tables, inserting data, then finally the interesting part of making the joined view. Note that uncommenting the INSERT INTO is what will populate the topic, if you want to do more than just view it.
docker compose exec sql-client bash -c "mkdir -p /opt/flink/opt/scripts/"
SQL_HOST=./confluent-course-examples-and-data/data/enriching-streams/add-split-stream-data.sql
SQL_CONTAINER=/opt/flink/opt/scripts/target-script.sql
docker cp $SQL_HOST sql-client:$SQL_CONTAINER
docker compose exec sql-client /opt/flink/bin/sql-client.sh -f $SQL_CONTAINER
- Create the four topics needed on Kafka: the three additional source topics and a sink topic for the enriched stream [brands, tax_status, unenriched_items, enriched_items]. Use your preferred method, or the Conduktor UI running on localhost:8080 if you're running the stack
- Populate the brands, tax_status and unenriched_items topics with add-split-stream-data.sql
- Then run enriched-stream-setup.sql to view the joined table in Flink
Adding more messages to the topics would cause the view to update. What is perhaps more useful is writing to the topic using INSERT INTO; we can then view from the new topic instead.
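As a rough sketch of that (the view name here is an assumption; the real statement is the commented-out INSERT INTO in enriched-stream-setup.sql):
-- Populate the sink topic's table from the joined view created by the setup script
INSERT INTO enriched_items
SELECT * FROM enriched_items_view; -- view name is an assumption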
Note the final view statement has the joins, with a 2-minute window (+/- 1 minute) on the interval join, so it will be waiting to update as we update any values on the join.
...
FROM unenriched_items
JOIN brands
ON unenriched_items.brand_id = brands.id
AND unenriched_items.proc_time BETWEEN brands.proc_time - INTERVAL '1' MINUTE
AND brands.proc_time + INTERVAL '1' MINUTE
...
;
Note if you run the script multiple times you may end up submitting that job multiple times and run out of resources.
Stop any duplicate jobs with:
SHOW JOBS; -- to see all running jobs and their IDs
STOP JOB '<job_id>'; -- to stop a specific job, ID enclosed in quotes
The job ID can be inspected using the Flink Job Manager UI running on localhost:8081.
An example of multi-event streams. We'll build a shopping cart state from deltas of adding or removing items from the cart, i.e. if I add two of an item and remove one, the cart state is one of that item (a sketch of this delta-summing idea follows the plan below).
We'll:
- Create item added stream
- Create item removed stream
- Add records to the streams
- Set up a continuous job to merge the streams and enrich each record with whether the item was added or removed
- Create a view to see the live result
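To make the end goal concrete, here's a rough sketch of the delta-summing idea, assuming a merged_cart_actions table with cart_id, item_id and an action column (the real queries are in merged-cart.sql and shopping-cart-state.sql):
-- Sketch only: +1 for each ADD, -1 for each REMOVE, per cart and item
SELECT
  cart_id,
  item_id,
  SUM(CASE WHEN action = 'ADD' THEN 1 ELSE -1 END) AS quantity
FROM merged_cart_actions
GROUP BY cart_id, item_id;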
Creating the streams: create the topics item_added, item_removed and merged_cart_actions.
Create the tables and insert mock data using add-remove-items-cart.sql.
docker compose exec sql-client bash -c "mkdir -p /opt/flink/opt/scripts/"
SQL_HOST=./confluent-course-examples-and-data/data/building-state-from-deltas/add-remove-items-cart.sql
SQL_CONTAINER=/opt/flink/opt/scripts/target.sql
docker cp $SQL_HOST sql-client:$SQL_CONTAINER
docker compose exec sql-client /opt/flink/bin/sql-client.sh -f $SQL_CONTAINER
With data in the two source topics, let's set up a continuous job to merge the streams by getting all records from each source topic and conditionally enriching them with ADD or REMOVE depending on which topic they came from.
This is done in merged-cart.sql.
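A minimal sketch of what that merge does, assuming the column names above and an action column on the merged table (the actual statement is in merged-cart.sql):
-- Tag each record with the action it represents and write to the merged table
INSERT INTO merged_cart_actions
SELECT cart_id, item_id, 'ADD' AS action FROM item_added
UNION ALL
SELECT cart_id, item_id, 'REMOVE' AS action FROM item_removed;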
You should see the enriched messages from both source topics on the merged topic, such as in the below screenshot from Conduktor.
You can also see this running Flink job visually in the Flink Job Manager UI running on localhost:8081.
Lastly, let's see the live state of the cart.
If you run the script in one go using the method described so far, you will get the changelog view.
This is because we can't use the default SQL client interactive result mode; you'll get an error telling you as much. Instead, use the table style TABLEAU when running a script. This shows the changes, like CDC, rather than the end result.
To get the final result nicely displayed you can start a SQL client in your terminal (docker compose exec -it sql-client bash -c "bin/sql-client.sh"), then sequentially paste and run each of the queries in shopping-cart-state.sql.
We've gone through some practical tips for getting started with Flink locally, running SQL queries on Kafka, and some more advanced examples of stream processing.
Hopefully this gives you an overview of some typical stream processing operations and how you can explore this for yourself. Thanks for reading.
The scripts below are ordered by when they are mentioned in this doc, or otherwise used.
view-all-items.sql - Creates a table for all_items; these are the shopping cart items used in the exercises.
add-records.sql - Example of adding more records to a table.
view-ordered-window.sql - Example of a view with ORDER BY.
add-items-to-cart.sql - Adds the item-added-to-cart events used in the worked example.
add-split-stream-data.sql - Adds split data to the source topics for joining.
enriched-stream-setup.sql - Creates a view enriching source topic data by joining on other topics.
add-remove-items-cart.sql - Adds mock data to the topics in preparation for joining them.
merged-cart.sql - Continuous job. Enriches messages from the source topics as added or removed based on which topic they came from, and sends them to the destination topic.
shopping-cart-state.sql - Creates the cart state view from the deltas on the merged topic.
streaming-analytics-event-time.sql - Examples using event time.
streaming-analytics-proc-time.sql - Different examples of how you can build a COUNT using processing time.



