Motivation

Many users and customers want to integrate Kafka and other streaming solutions with Neo4j, either to ingest data into the graph from other sources, or to send change data capture (CDC) events to an event log for later consumption.

This extension was developed to satisfy these use cases and more to come.

The project is composed of several parts:

  • Neo4j Streams Procedure: a procedure to send a payload to a topic

  • Neo4j Streams Producer: a transaction event handler that sends change data to a Kafka topic

  • Neo4j Streams Consumer: a Neo4j application that ingests data from Kafka topics into Neo4j via templated Cypher statements

  • Kafka-Connect Plugin: a plugin for the Confluent Platform that allows you to ingest data into Neo4j from Kafka topics via Cypher queries.

Installation

Copy the neo4j-streams jar into $NEO4J_HOME/plugins and configure the relevant connections.

The minimal setup in your neo4j.conf is:

kafka.zookeeper.connect=localhost:2181
kafka.bootstrap.servers=localhost:9092

For each module there are additional configs that are explained in the individual sections.

Build locally

Build the project by running the following command:

mvn clean install

  1. Copy <project_dir>/target/neo4j-streams-<VERSION>.jar into $NEO4J_HOME/plugins

  2. Restart Neo4j

Neo4j Streams Producer

The Neo4j Streams Producer is a transaction event handler that sends database change data to a Kafka topic.

Configuration

You can set the following configuration values in your neo4j.conf; the defaults are shown below.

neo4j.conf
kafka.zookeeper.connect=localhost:2181
kafka.bootstrap.servers=localhost:9092
kafka.acks=1
kafka.num.partitions=1
kafka.retries=2
kafka.batch.size=16384
kafka.buffer.memory=33554432
kafka.reindex.batch.size=1000
kafka.session.timeout.ms=15000
kafka.connection.timeout.ms=10000
kafka.replication=1
kafka.linger.ms=1
kafka.transactional.id=

streams.source.topic.nodes.<TOPIC_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>=<PATTERN>
streams.source.enable=<true/false, default=true>

Note: to use Kafka transactions, set kafka.transactional.id and kafka.acks appropriately.
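For example, a minimal transactional producer setup could look like the following (the transactional id value here is only illustrative; Kafka requires acks=all when transactions are enabled):

neo4j.conf
kafka.transactional.id=neo4j-producer-1
kafka.acks=all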

See the Apache Kafka documentation for details on these settings.

Patterns

Nodes

To control which nodes are sent to Kafka, and which of their properties, you can define node patterns in the configuration.

You can choose labels and properties for inclusion or exclusion, with * meaning all.

Patterns are separated by semicolons ;.

The basic syntax is:

Label{*};Label1{prop1, prop2};Label3{-prop1,-prop2}
Pattern               Meaning
Label{*}              all nodes with this label, with all their properties, go to the related topic
Label1:Label2         nodes with both labels are sent to the related topic
Label{prop1,prop2}    only prop1 and prop2 of nodes with this label are sent to the related topic
Label{-prop1,-prop2}  for nodes with label Label, properties prop1 and prop2 are excluded
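For instance, combined with the topic configuration shown above (topic, label, and property names are illustrative), the following would publish all Person nodes with every property and Customer nodes with only name and email:

neo4j.conf
streams.source.topic.nodes.people=Person{*};Customer{name,email}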

Relationships

To control which relationships are sent to Kafka, and which of their properties, you can define relationship patterns in the configuration.

You can choose relationship types and properties for inclusion or exclusion, with * meaning all.

Patterns are separated by semicolons ;.

The basic syntax is:

KNOWS{*};MEET{prop1, prop2};ANSWER{-prop1,-prop2}
Pattern               Meaning
KNOWS{*}              all relationships with this type, with all their properties, go to the related topic
KNOWS{prop1,prop2}    only prop1 and prop2 of relationships with this type are sent to the related topic
KNOWS{-prop1,-prop2}  for relationships with type KNOWS, properties prop1 and prop2 are excluded
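For instance (topic and property names are illustrative), the following would publish KNOWS relationships while excluding an internal_score property:

neo4j.conf
streams.source.topic.relationships.friendships=KNOWS{-internal_score}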

Transaction Event Handler

The transaction event handler is the core of the Streams Producer and allows you to stream database changes.

Events

The Producer streams three kinds of events:

  • created: when a node/relationship/property is created

  • updated: when a node/relationship/property is updated

  • deleted: when a node/relationship/property is deleted

Created

The following are examples of node and relationship creation events:

{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "created",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "1004",
    "type": "node",
    "after": {
      "labels": ["Person"],
      "properties": {
        "last_name": "Kretchmar",
        "email": "annek@noanswer.org",
        "first_name": "Anne Marie"
      }
    }
  }
}
{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "created",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "123",
    "type": "relationship",
    "label": "KNOWS",
    "start": {
      "labels": ["Person"],
      "id": "123"
    },
    "end": {
      "labels": ["Person"],
      "id": "456"
    },
    "after": {
      "properties": {
        "since": "2018-04-05T12:34:00[Europe/Berlin]"
      }
    }
  }
}
Updated

The following are examples of node and relationship update events:

{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "updated",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "1004",
    "type": "node",
    "before": {
      "labels": ["Person", "Tmp"],
      "properties": {
        "email": "annek@noanswer.org",
        "last_name": "Kretchmar",
        "first_name": "Anne"
      }
    },
    "after": {
      "labels": ["Person"],
      "properties": {
        "last_name": "Kretchmar",
        "email": "annek@noanswer.org",
        "first_name": "Anne Marie",
        "geo":[0.123, 46.2222, 32.11111]
      }
    }
  }
}
{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "updated",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "123",
    "type": "relationship",
    "label": "KNOWS",
    "start": {
      "labels": ["Person"],
      "id": "123"
    },
    "end": {
      "labels": ["Person"],
      "id": "456"
    },
    "before": {
      "properties": {
        "since": "2018-04-05T12:34:00[Europe/Berlin]"
      }
    },
    "after": {
      "properties": {
        "since": "2018-04-05T12:34:00[Europe/Berlin]",
        "to": "2019-04-05T23:00:00[Europe/Berlin]"
      }
    }
  }
}
Deleted

The following are examples of node and relationship deletion events:

{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "deleted",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "1004",
    "type": "node",
    "before": {
      "labels": ["Person"],
      "properties": {
        "last_name": "Kretchmar",
        "email": "annek@noanswer.org",
        "first_name": "Anne Marie",
        "geo":[0.123, 46.2222, 32.11111]
      }
    }
  }
}
{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "deleted",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "123",
    "type": "relationship",
    "label": "KNOWS",
    "start": {
      "labels": ["Person"],
      "id": "123"
    },
    "end": {
      "labels": ["Person"],
      "id": "456"
    },
    "before": {
      "properties": {
        "since": "2018-04-05T12:34:00[Europe/Berlin]",
        "to": "2019-04-05T23:00:00[Europe/Berlin]"
      }
    }
  }
}

Meta

The meta field contains the metadata related to the transaction event:

Field            Type                                   Description
timestamp        Number                                 The timestamp related to the transaction event
username         String                                 The username that generated the transaction
tx_id            Number                                 The transaction id provided by the Neo4j transaction manager
tx_events_count  Number                                 The number of events included in the transaction (e.g. 2 node updates, 1 relationship creation)
tx_event_id      Number                                 The id of the event inside the transaction
operation        enum["created", "updated", "deleted"]  The operation type
source           Object                                 Contains the information about the source

Source
Field     Type    Description
hostname  String  The hostname of the source Neo4j instance

Payload

The payload field contains the data related to the event:

Field   Type                          Description
id      Number                        The id of the graph entity
type    enum["node", "relationship"]  The type of the graph entity
before  Object                        The data before the transaction event
after   Object                        The data after the transaction event

Payload: before and after

We must distinguish two cases:

Nodes
Field       Type       Description
labels      String[]   List of labels attached to the node
properties  Map<K, V>  List of properties attached to the node; the key is the property name

Relationships
Field       Type       Description
label       String     The relationship type
properties  Map<K, V>  List of properties attached to the relationship; the key is the property name
start       Object     The starting node of the relationship
end         Object     The ending node of the relationship

Relationships: start and end
Field   Type      Description
id      Number    The id of the node
labels  String[]  List of labels attached to the node

Schema

Field   Type    Description
before  Object  The schema before the transaction event
after   Object  The schema after the transaction event

Schema: before and after

We must distinguish two cases:

Nodes
Field        Type       Description
constraints  Object[]   List of constraints attached to the node
properties   Map<K, V>  List of properties attached to the node

Constraints

Nodes and Relationships can have a list of constraints attached to them:

Table 1. Node Constraints
Field       Type      Description
label       String    The label attached to the constraint
type        enum["NODE_KEY", "UNIQUENESS", "RELATIONSHIP_PROPERTY_EXISTENCE", "NODE_PROPERTY_EXISTENCE"]   The constraint type
properties  String[]  List of node properties involved in the constraint

Table 2. Relationship Constraints
Field       Type       Description
properties  Map<K, V>  List of properties attached to the relationship
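For example, based on Table 1 above, a single node constraint entry would have roughly this shape (values are illustrative only):

{
  "label": "Person",
  "type": "UNIQUENESS",
  "properties": ["email"]
}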

Neo4j Streams Consumer

The Neo4j Streams Consumer is the Kafka Sink that ingests data directly into Neo4j.

How it works

It works with templated Cypher queries stored in configuration properties with the following format:

streams.sink.topic.cypher.<TOPIC_NAME>=<CYPHER_QUERY>

Each Cypher template must refer to an event object that will be injected by the Sink.

Here is an example. For this event:

{
  "id": 42,
  "properties": {
    "title": "Answer to anything",
    "description": "It depends."
  }
}
neo4j.conf
streams.sink.topic.cypher.my-topic=MERGE (n:Label {id: event.id}) \
    ON CREATE SET n += event.properties

Under the hood, the Sink injects the event object as a parameter in this way:

UNWIND {events} AS event
MERGE (n:Label {id: event.id})
    ON CREATE SET n += event.properties

Where {events} is a JSON list; continuing with the example above, a possible full representation could be:

:params events => [{id:"alice@example.com",properties:{name:"Alice",age:32}},
    {id:"bob@example.com",properties:{name:"Bob",age:42}}]

UNWIND {events} AS event
MERGE (n:Label {id: event.id})
    ON CREATE SET n += event.properties

Configuration

You can set the following Kafka configuration values in your neo4j.conf; the defaults are shown below.

neo4j.conf
kafka.zookeeper.connect=localhost:2181
kafka.bootstrap.servers=localhost:9092
kafka.auto.offset.reset=earliest
kafka.group.id=neo4j

streams.sink.polling.interval=<The time, in milliseconds, spent waiting in poll if data is not available in the buffer. default=Long.MAX_VALUE>
streams.sink.topic.cypher.<TOPIC_NAME>=<CYPHER_QUERY>
streams.sink.enable=<true/false, default=true>

See the Apache Kafka documentation for details on these settings.

Procedures

The Streams project ships with a set of procedures.

General configuration

You can enable/disable the procedures by changing this setting in your neo4j.conf:

neo4j.conf
streams.procedures.enable=<true/false, default=true>

streams.publish

This procedure allows custom message streaming from Neo4j to the configured environment by using the underlying configured Producer.

Usage:

CALL streams.publish('my-topic', 'Hello World from Neo4j!')

The message retrieved from the Consumer is the following:

{"payload":"Hello world from Neo4j!"}

If you use a local docker (compose) setup, you can check for these messages with:

docker exec -it kafka kafka-console-consumer --topic my-topic --bootstrap-server kafka:9092

Input Parameters:

Variable Name  Type    Description
topic          String  The topic where you want to publish the data
payload        Object  The data that you want to stream

You can send any kind of data in the payload: nodes, relationships, paths, lists, maps, scalar values, and nested versions thereof.

For nodes and relationships, if the topic is defined in the patterns provided by the configuration, their properties will be filtered according to that configuration.
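For example, you can publish a map instead of a plain string (topic name and values are illustrative); nodes, relationships, or paths can be passed as the payload in the same way:

CALL streams.publish('my-topic', {id: 1, name: 'Alice'})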

streams.consume

This procedure allows you to consume messages from a given topic.

Usage:

CALL streams.consume('my-topic', {<config>}) YIELD event RETURN event

Example: imagine you have a producer that publishes events like this: {"name": "Andrea", "surname": "Santurbano"}. You can create user nodes in this way:

CALL streams.consume('my-topic', {<config>}) YIELD event
CREATE (p:Person{firstName: event.data.name, lastName: event.data.surname})

Input Parameters:

Variable Name  Type    Description
topic          String  The topic from which you want to consume the data
config         Object  The configuration parameters

Available configuration parameters

Variable Name  Type    Description
timeout        Long    The value passed to the Kafka Consumer#poll method, in milliseconds (default 1000)
from           String  The Kafka auto.offset.reset configuration parameter
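For example, to read a topic from the beginning with a longer poll timeout (values are illustrative):

CALL streams.consume('my-topic', {timeout: 5000, from: 'earliest'}) YIELD event
RETURN count(event)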

Execute with Docker

Below you'll find a lightweight Docker Compose file that allows you to test the application in your local environment.

Prerequisites:

  • Docker

  • Docker Compose

Refer to the official documentation for instructions on how to install and configure Docker and Docker Compose.

Launch it locally

Below is a compose file that allows you to spin up Neo4j, Kafka, and Zookeeper in order to test the application.

Before starting, change the volume directory according to your environment; the Neo4j Streams jar must be placed inside the <plugins> directory:

volumes:
    - $HOME/neo4j/3.4/plugins:/plugins

From the same directory where the compose file is, you can launch this command:

$ docker-compose up -d

Please note that the Neo4j Docker image uses a naming convention: you can override every neo4j.conf property by prefixing it with NEO4J_ and applying the following transformations:

  • a single underscore is converted into a double underscore: _ → __

  • a dot is converted into a single underscore: . → _

Example:

  • dbms.memory.heap.max_size=8G → NEO4J_dbms_memory_heap_max__size: 8G

  • dbms.logs.debug.level=DEBUG → NEO4J_dbms_logs_debug_level: DEBUG
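Applying the same convention to the Kafka settings from the minimal setup gives the environment entries used in the compose file below:

    environment:
      NEO4J_kafka_zookeeper_connect: zookeeper:12181
      NEO4J_kafka_bootstrap_servers: kafka:19092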

Producer

If you are testing the producer, you can run a consumer that subscribes to the topic neo4j by executing this command:

$ docker exec kafka kafka-console-consumer --bootstrap-server kafka:19092 --topic neo4j --from-beginning

Then directly from the Neo4j browser you can generate some random data with this query:

UNWIND range(1,100) as id
CREATE (p:Person {id:id, name: "Name " + id, age: id % 3}) WITH collect(p) as people
UNWIND people as p1
UNWIND range(1,10) as friend
WITH p1, people[(p1.id + friend) % size(people)] as p2
CREATE (p1)-[:KNOWS {years: abs(p2.id - p1.id)}]->(p2)

And if you go back to your consumer you’ll see something like this:

{"key":"neo4j","value":{"meta":{"timestamp":1542047038549,"username":"neo4j","txId":12,"txEventId":107,"txEventsCount":110,"operation":"created","source":{"hostname":"neo4j"}},"payload":{"id":"99","start":{"id":"9","labels":["Person"]},"end":{"id":"0","labels":["Person"]},"before":null,"after":{"properties":{"years":9}},"label":"KNOWS","type":"relationship"},"schema":{"properties":[],"constraints":null}}}
{"key":"neo4j","value":{"meta":{"timestamp":1542047038549,"username":"neo4j","txId":12,"txEventId":108,"txEventsCount":110,"operation":"created","source":{"hostname":"neo4j"}},"payload":{"id":"96","start":{"id":"9","labels":["Person"]},"end":{"id":"7","labels":["Person"]},"before":null,"after":{"properties":{"years":2}},"label":"KNOWS","type":"relationship"},"schema":{"properties":[],"constraints":null}}}
{"key":"neo4j","value":{"meta":{"timestamp":1542047038549,"username":"neo4j","txId":12,"txEventId":109,"txEventsCount":110,"operation":"created","source":{"hostname":"neo4j"}},"payload":{"id":"97","start":{"id":"9","labels":["Person"]},"end":{"id":"8","labels":["Person"]},"before":null,"after":{"properties":{"years":1}},"label":"KNOWS","type":"relationship"},"schema":{"properties":[],"constraints":null}}}

Consumer

If you are using the Sink, you can define your topic/Cypher query combination as follows:

    environment:
      NEO4J_streams_sink_topic_neo4j:
        "WITH event.value.payload AS payload, event.value.meta AS meta
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Question' THEN [1] ELSE [] END |
          MERGE (n:Question{neo_id: toInteger(payload.id)}) ON CREATE
            SET n += payload.after.properties
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Answer' THEN [1] ELSE [] END |
          MERGE (n:Answer{neo_id: toInteger(payload.id)}) ON CREATE
            SET n += payload.after.properties
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'User' THEN [1] ELSE [] END |
          MERGE (n:User{neo_id: toInteger(payload.id)}) ON CREATE
            SET n += payload.after.properties
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Tag' THEN [1] ELSE [] END |
          MERGE (n:Tag{neo_id: toInteger(payload.id)}) ON CREATE
            SET n += payload.after.properties
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'ANSWERS' THEN [1] ELSE [] END |
          MERGE (s:Answer{neo_id: toInteger(payload.start.id)})
          MERGE (e:Question{neo_id: toInteger(payload.end.id)})
          CREATE (s)-[:ANSWERS{neo_id: toInteger(payload.id)}]->(e)
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'TAGGED' THEN [1] ELSE [] END |
          MERGE (s:Question{neo_id: toInteger(payload.start.id)})
          MERGE (e:Tag{neo_id: toInteger(payload.end.id)})
          CREATE (s)-[:TAGGED{neo_id: toInteger(payload.id)}]->(e)
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'PROVIDED' THEN [1] ELSE [] END |
          MERGE (s:User{neo_id: toInteger(payload.start.id)})
          MERGE (e:Answer{neo_id: toInteger(payload.end.id)})
          CREATE (s)-[:PROVIDED{neo_id: toInteger(payload.id)}]->(e)
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'ASKED' THEN [1] ELSE [] END |
          MERGE (s:User{neo_id: toInteger(payload.start.id)})
          MERGE (e:Question{neo_id: toInteger(payload.end.id)})
          CREATE (s)-[:ASKED{neo_id: toInteger(payload.id)}]->(e)
        )"
docker-compose.yml
version: '3'
services:
  neo4j:
    image: neo4j:3.4
    hostname: neo4j
    container_name: neo4j
    ports:
    - "7474:7474"
    - "7687:7687"
    depends_on:
    - kafka
    volumes:
    - $HOME/neo4j/3.4/plugins:/plugins
    environment:
      NEO4J_AUTH: neo4j/streams
      NEO4J_dbms_logs_debug_level: DEBUG
      # KAFKA related configuration
      NEO4J_kafka_zookeeper_connect: zookeeper:12181
      NEO4J_kafka_bootstrap_servers: kafka:19092

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper
    container_name: zookeeper
    ports:
    - "12181:12181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 12181

  kafka:
    image: confluentinc/cp-kafka:latest
    hostname: kafka
    container_name: kafka
    ports:
    - "19092:19092"
    depends_on:
    - zookeeper
    environment:
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:12181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:19092

Kafka Connect


Kafka Connect, an open source component of Apache Kafka, is a framework for connecting Kafka with external systems such as databases, key-value stores, search indexes, and file systems.

The Neo4j Streams project provides a Kafka Connect plugin that can be installed into the Confluent Platform enabling:

  • Ingest data from Kafka topics directly into Neo4j via templated Cypher queries;

  • Stream Neo4j transaction events (coming soon).

Docker compose file

Inside the directory /kafka-connect-neo4j/docker you’ll find a compose file that allows you to start the whole testing environment:

docker-compose.yml
---
version: '2'
services:
  neo4j:
    image: neo4j:3.4-enterprise
    hostname: neo4j
    container_name: neo4j
    ports:
    - "7474:7474"
    - "7687:7687"
    environment:
      NEO4J_kafka_zookeeper_connect: zookeeper:2181
      NEO4J_kafka_bootstrap_servers: broker:9093
      NEO4J_AUTH: neo4j/connect
      NEO4J_dbms_memory_heap_max__size: 8G
      NEO4J_ACCEPT_LICENSE_AGREEMENT: "yes"

  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    container_name: zookeeper
    ports:
    - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka
    hostname: broker
    container_name: broker
    depends_on:
    - zookeeper
    ports:
    - "9092:9092"
    expose:
    - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9093

      # workaround if we change to a custom name the schema_registry fails to start
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT

      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema_registry:
    image: confluentinc/cp-schema-registry
    hostname: schema_registry
    container_name: schema_registry
    depends_on:
    - zookeeper
    - broker
    ports:
    - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema_registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'

  connect:
    image: confluentinc/cp-kafka-connect
    hostname: connect
    container_name: connect
    depends_on:
    - zookeeper
    - broker
    - schema_registry
    ports:
    - "8083:8083"
    volumes:
    - ./plugins:/tmp/connect-plugins
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:9093'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONNECT_PLUGIN_PATH: /usr/share/java,/tmp/connect-plugins
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=DEBUG,org.I0Itec.zkclient=DEBUG,org.reflections=ERROR

  control-center:
    image: confluentinc/cp-enterprise-control-center
    hostname: control-center
    container_name: control-center
    depends_on:
    - zookeeper
    - broker
    - schema_registry
    - connect
    ports:
    - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:9093'
      CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

Configuration parameters

You can set the following configuration values via the Confluent Connect UI or via the REST endpoint:

Field                                          Type                                Description
neo4j.server.uri                               String                              The Bolt URI
neo4j.authentication.type                      enum[NONE, BASIC, KERBEROS]         The authentication type (default BASIC)
neo4j.batch.size                               Int                                 The max number of events processed by the Cypher query (default 1000)
neo4j.batch.timeout.msec                       Long                                The execution timeout for the Cypher query (default 30000)
neo4j.authentication.basic.username            String                              The authentication username
neo4j.authentication.basic.password            String                              The authentication password
neo4j.authentication.basic.realm               String                              The authentication realm
neo4j.authentication.kerberos.ticket           String                              The Kerberos ticket
neo4j.encryption.enabled                       Boolean                             Whether encryption is enabled (default false)
neo4j.encryption.trust.strategy                enum[TRUST_ALL_CERTIFICATES, TRUST_CUSTOM_CA_SIGNED_CERTIFICATES, TRUST_SYSTEM_CA_SIGNED_CERTIFICATES]   The Neo4j trust strategy (default TRUST_ALL_CERTIFICATES)
neo4j.encryption.ca.certificate.path           String                              The path of the certificate
neo4j.connection.max.lifetime.msecs            Long                                The max Neo4j connection lifetime (default 1 hour)
neo4j.connection.acquisition.timeout.msecs     Long                                The max Neo4j connection acquisition timeout (default 1 hour)
neo4j.connection.liveness.check.timeout.msecs  Long                                The max Neo4j liveness check timeout (default 1 hour)
neo4j.connection.max.pool.size                 Int                                 The max pool size (default 100)
neo4j.load.balance.strategy                    enum[ROUND_ROBIN, LEAST_CONNECTED]  The Neo4j load balancing strategy (default LEAST_CONNECTED)

Build it locally

Build the project by running the following command:

mvn clean install

Inside the directory <neo4j-streams>/kafka-connect-neo4j/target/component/packages you’ll find a file named neo4j-kafka-connect-neo4j-<VERSION>.zip

Configuring the stack

Create a directory named plugins at the same level as the compose file, unzip the file neo4j-kafka-connect-neo4j-<VERSION>.zip inside it, and then start the compose file:

docker-compose up -d

You can access your Neo4j instance at http://localhost:7474; log in with neo4j as username and connect as password (see the docker-compose file to change it).

Insertion is sped up if you create this index and this constraint:

CREATE INDEX ON :Person(surname);
CREATE CONSTRAINT ON (f:Family) ASSERT f.name IS UNIQUE;

Create the Sink instance:

We’ll define the Sink configuration as follows:

{
  "name": "Neo4jSinkConnector",
  "config": {
    "topics": "my-topic",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "errors.retry.timeout": "-1",
    "errors.retry.delay.max.ms": "1000",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "neo4j.server.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "connect",
    "neo4j.topic.cypher.my-topic": "MERGE (p:Person{name: event.name, surname: event.surname, from: 'AVRO'}) MERGE (f:Family{name: event.surname}) MERGE (p)-[:BELONGS_TO]->(f)"
  }
}

In particular this line:

"neo4j.topic.cypher.my-topic": "MERGE (p:Person{name: event.name, surname: event.surname}) MERGE (f:Family{name: event.surname}) MERGE (p)-[:BELONGS_TO]->(f)"

defines that all the data that comes from the topic my-topic will be unpacked by the Sink into Neo4j with the following Cypher query:

MERGE (p:Person{name: event.name, surname: event.surname})
MERGE (f:Family{name: event.surname})
MERGE (p)-[:BELONGS_TO]->(f)

Under the hood, the Sink injects the event object in this way:

UNWIND {batch} AS event
MERGE (p:Person{name: event.name, surname: event.surname})
MERGE (f:Family{name: event.surname})
MERGE (p)-[:BELONGS_TO]->(f)

Where {batch} is a list of event objects.

You can change the query, or remove the property and add your own, but you must follow this convention:

"neo4j.topic.cypher.<YOUR_TOPIC>": "<YOUR_CYPHER_QUERY>"

Let’s load the configuration into the Confluent Platform with this REST call:

curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type:application/json' \
  -H 'Accept:application/json' \
  -d @contrib.sink.avro.neo4j.json

The file contrib.sink.string-json.neo4j.json contains a configuration that manages a simple JSON producer example.
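Once loaded, you can also check the connector state through Kafka Connect's standard REST API (the connector name matches the one defined in the configuration above):

curl http://localhost:8083/connectors/Neo4jSinkConnector/status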

Please check that everything is fine by going to http://localhost:9021/management/connect

(or 0.0.0.0 instead of localhost, depending on your Docker environment)

and clicking the Send data out (or Sink, depending on your Confluent Platform version) tab. You should find a table just like this:

Status   Active Tasks  Name                Topics
Running  1             Neo4jSinkConnector  my-topic

Use the data generator

You can download and use the neo4j-streams-sink-tester-1.0.jar in order to generate a sample dataset.

This package sends records to the Neo4j Kafka Sink using the following two data formats:

JSON example:

{"name": "Name", "surname": "Surname"}

AVRO, with the schema:

{
 "type":"record",
 "name":"User",
 "fields":[{"name":"name","type":"string"}, {"name":"surname","type":"string"}]
}

Note: before starting the data generator, please create the following indexes in Neo4j (in order to speed up the import process):

CREATE INDEX ON :Person(name)
CREATE INDEX ON :Family(surname)

Please type:

java -jar neo4j-streams-sink-tester-1.0.jar -h

to print the option list with default values.

In order to choose the data format, use the -f flag: -f AVRO or -f JSON (the default). For example:

java -jar neo4j-streams-sink-tester-1.0.jar -f AVRO

will send data in AVRO format.

For a complete overview of the Neo4j Streams Sink Tester, please refer to its repository.

Monitor via Confluent Platform UI

The Kafka Monitoring UI can be found at http://localhost:9021/management/connect

Confluent Importing Metrics

The generated records show up in the topic and are then added to Neo4j via the Sink.

Below you can see the data that has been ingested into Neo4j; during testing this reached more than 2M events.

Confluent Platform Management