This article explains how to integrate Neo4j with Apache Kafka using the Kafka Connect Neo4j Connector.
The Kafka Connect Neo4j Connector, which is deployed separately from the Neo4j database, allows moving data from Kafka topics into Neo4j and vice versa using Cypher templated queries or node patterns. Integrating Neo4j with Kafka makes ingesting data from, and sending data to, various sources much easier.
To demonstrate this integration, this article uses the Confluent Platform on Docker. The Kafka Connect plugin can also be used with Amazon MSK (Amazon Managed Streaming for Apache Kafka).
Installing Confluent Kafka
First, install the Confluent Platform on Docker using the Confluent all-in-one docker-compose file:
mkdir confluent
cd confluent
curl --silent --output docker-compose.yml \
https://raw.githubusercontent.com/confluentinc/cp-all-in-one/7.2.2-post/cp-all-in-one/docker-compose.yml
Start the Confluent Platform stack with the -d option to run in detached mode:
docker-compose up -d
Creating zookeeper ... done
Creating broker ... done
Creating schema-registry ... done
Creating connect ... done
Creating rest-proxy ... done
Creating ksqldb-server ... done
Creating ksqldb-cli ... done
Creating ksql-datagen ... done
Creating control-center ... done
Use docker-compose ps to display the ports used by Confluent processes:
docker-compose ps
Name Command State Ports
---------------------------------------------------------------------------------------------------------
broker /etc/confluent/docker/run Up 0.0.0.0:9092->9092/tcp, 0.0.0.0:9101->9101/tcp
connect /etc/confluent/docker/run Up 0.0.0.0:8083->8083/tcp, 9092/tcp
control-center /etc/confluent/docker/run Up 0.0.0.0:9021->9021/tcp
ksql-datagen bash -c echo Waiting for K ... Up
ksqldb-cli /bin/sh Up
ksqldb-server /etc/confluent/docker/run Up 0.0.0.0:8088->8088/tcp
rest-proxy /etc/confluent/docker/run Up 0.0.0.0:8082->8082/tcp
schema-registry /etc/confluent/docker/run Up 0.0.0.0:8081->8081/tcp
zookeeper /etc/confluent/docker/run Up 0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp
Installing the Neo4j connector
Once the Confluent containers are up and running, you can install the Kafka Connect Neo4j Connector by executing the following command in the Confluent Connect container:
confluent-hub install neo4j/kafka-connect-neo4j:5.0.0
Choose option 2 (where this tool is installed) and answer 'yes' to all of the other prompts.
Then restart Confluent. If you are running on Docker, use docker-compose restart.
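If you prefer to run everything from the host, the install and restart can be scripted with docker-compose (a sketch, assuming the connect service name from the compose file above):

```shell
# Install the connector inside the Kafka Connect container without prompts,
# then restart only that service so the new plugin is loaded.
docker-compose exec connect \
  confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:5.0.0
docker-compose restart connect
```

Once Connect is back up, the plugin should appear in the output of curl http://localhost:8083/connector-plugins.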
Configuring the Kafka Connect Neo4j Connector
Before configuring the connector, you will need to create a new topic called 'users' that will be used for this demonstration.
Connect to the Confluent Control Center on http://localhost:9021/clusters, select the default cluster and go to the Topics section:
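Alternatively, the topic can be created from the command line inside the broker container (a sketch, assuming the broker service name and ports from the compose file above):

```shell
# Create the 'users' topic with a single partition on the local broker.
docker-compose exec broker \
  kafka-topics --create \
    --topic users \
    --partitions 1 \
    --replication-factor 1 \
    --bootstrap-server localhost:9092
```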
Then add a Neo4j Sink Connector in the Connect section.
In the Connector configuration, choose the Kafka Topic called 'users' that you created earlier:
Complete the Connector configuration by adding your Neo4j connection details and the Cypher to execute when a message is added to the 'users' Topic. Here is an example:
{
  "name": "Neo4jSinkConnectorConnector_0",
  "config": {
    "name": "Neo4jSinkConnectorConnector_0",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "topics": "users",
    "neo4j.topic.pattern.node.users": "KafkaUser{!userId}",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "neo4j.server.uri": "bolt://host.docker.internal:7617",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "********",
    "neo4j.encryption.enabled": "false",
    "neo4j.database": "neo4j"
  }
}
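Instead of the Control Center UI, the same configuration can be registered through the Kafka Connect REST API (a sketch, assuming the default connect port 8083 shown earlier):

```shell
# Save the connector configuration shown above as connector.json
# (it is already wrapped as {"name": ..., "config": {...}}), then register it:
curl -s -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @connector.json

# Check the connector's status afterwards:
curl -s http://localhost:8083/connectors/Neo4jSinkConnectorConnector_0/status
```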
In this example, I used the pattern strategy, which allows you to extract nodes and relationships from a JSON message by providing an extraction pattern:
"neo4j.topic.pattern.node.<TOPIC_NAME>": "<NODE_EXTRACTION_PATTERN>"
Node pattern used in the example:
"neo4j.topic.pattern.node.users": "KafkaUser{!userId}"
The userId property, which is prefixed with !, is used as the unique ID for the KafkaUser node.
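To illustrate, with the pattern above, a hypothetical message on the 'users' topic behaves roughly as follows (by default, fields that are not part of the pattern key are copied onto the node as properties):

```
// Hypothetical incoming message on the 'users' topic:
//   {"userId": 42, "name": "Alice"}
// With the pattern KafkaUser{!userId}, the connector acts roughly like:
MERGE (u:KafkaUser {userId: 42})
SET u.name = 'Alice'
```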
You can also use the Cypher Sink ingestion strategy. For example:
"neo4j.topic.cypher.my-topic": "MERGE (p:Person{name: event.name, surname: event.surname}) MERGE (f:Family{name: event.surname}) MERGE (p)-[:BELONGS_TO]->(f)"
More details about Sink ingestion strategies can be found here:
https://neo4j.com/docs/kafka/kafka-connect/sink/
Now it is time to test the Connector! Just produce a new message in the users Topic. You can do this manually for testing purposes in the Confluent Control Center:
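You can also produce a test message from the command line through the Confluent REST Proxy (a sketch, assuming the default rest-proxy port 8082 from the ps output above and a hypothetical payload):

```shell
# Produce a single JSON message to the 'users' topic via the REST proxy.
curl -s -X POST http://localhost:8082/topics/users \
  -H "Content-Type: application/vnd.kafka.json.v2+json" \
  -d '{"records": [{"value": {"userId": 42, "name": "Alice"}}]}'
```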
A new KafkaUser node will be created in Neo4j each time a message is produced in the Kafka 'users' Topic:
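To verify, run a quick query in Neo4j Browser or cypher-shell:

```
// List the users ingested from Kafka.
MATCH (u:KafkaUser)
RETURN u.userId, u
LIMIT 10
```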
For troubleshooting, you can check the Kafka Connect container logs for possible errors:
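For example:

```shell
# Show the most recent Kafka Connect logs and filter for errors.
docker-compose logs --tail=200 connect | grep -i error
```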