茄子的个人空间

neo4j导入节点和边的python代码

字数统计: 1.2k阅读时长: 5 min
2025/03/17
loading

在做知识图谱相关的项目时,经常需要将数据导入到neo4j中,这里分享一下导入节点和边的python代码,以便于以后查阅。

需要注意的是,下面的代码是在neo4j 5.x 版本上测试通过的,如果是其他版本,可能需要做一些调整。

  1. neo4j 导入节点和边的类实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87

from neo4j import GraphDatabase


def format_properties(properties):
    """Render a property dict as a Cypher map literal with unquoted keys.

    The returned text is meant to be interpolated into a Cypher pattern,
    e.g. ``MATCH (n:Label {name: 'Alice', age: 30})``.

    Args:
        properties: Mapping of property name to value. Keys are emitted
            verbatim, so they must already be valid Cypher identifiers.

    Returns:
        A string such as ``"{name: 'Alice', age: 30}"``; ``"{}"`` for an
        empty mapping.
    """
    import math

    def _quote(text):
        # Escape backslashes first, then single quotes, so the value cannot
        # terminate the surrounding Cypher string literal early.
        escaped = text.replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"

    def _literal(value):
        if isinstance(value, str):
            return _quote(value)
        # bool must be tested before int because bool is a subclass of int.
        if isinstance(value, bool):
            return 'true' if value else 'false'
        if value is None:
            return 'null'
        if isinstance(value, int):
            return str(value)
        if isinstance(value, float) and math.isfinite(value):
            return repr(value)
        # Anything else (nan/inf, lists, custom objects) keeps the previous
        # behavior of being stored as its quoted string form.
        return _quote(str(value))

    formatted_properties = [
        f"{key}: {_literal(value)}" for key, value in properties.items()
    ]
    return '{' + ', '.join(formatted_properties) + '}'


class Neo4jClass:
    """Small helper around a Neo4j driver for creating nodes and relationships.

    Labels and relationship types are interpolated directly into the Cypher
    text, so they must come from trusted input.
    """

    def __init__(self, uri, auth):
        """Open a driver for ``uri`` using the ``auth`` credentials."""
        self.driver = GraphDatabase.driver(uri, auth=auth)

    def close(self):
        """Close the underlying driver."""
        self.driver.close()

    @staticmethod
    def _write(session, work, *args):
        """Run ``work`` in a managed write transaction.

        Prefers ``execute_write`` and falls back to ``write_transaction``
        when the session object does not offer the newer method.
        """
        runner = getattr(session, "execute_write", None)
        if runner is None:
            runner = session.write_transaction
        return runner(work, *args)

    def create_node(self, label, properties):
        """Create one node with the given label and property dict."""
        with self.driver.session() as session:
            self._write(session, self._create_node, label, properties)

    def del_all(self):
        """Delete every node together with its relationships."""
        with self.driver.session() as session:
            session.run('MATCH (n) DETACH DELETE n')

    @staticmethod
    def _create_node(tx, label, properties):
        # Properties travel as a query parameter; only the label is inlined.
        query = f"CREATE (n:{label} $properties) RETURN n"
        tx.run(query, properties=properties)

    def create_relationship(self, label1, properties1, label2, properties2, relationship):
        """Create a ``relationship`` edge from the nodes matching
        (label1, properties1) to the nodes matching (label2, properties2)."""
        with self.driver.session() as session:
            self._write(
                session, self._create_relationship,
                label1, properties1, label2, properties2, relationship,
            )

    @staticmethod
    def _create_relationship(tx, label1, properties1, label2, properties2, relationship):
        # The match properties are rendered into the query text as map literals.
        properties1 = format_properties(properties1)
        properties2 = format_properties(properties2)
        query = (
            f"MATCH (a:{label1} {properties1}), (b:{label2} {properties2}) "
            f"CREATE (a)-[r:{relationship}]->(b) RETURN r"
        )
        tx.run(query)

    def create_relationship_with_properties(self, label1, properties1, label2, properties2, relationship, properties):
        """Same as ``create_relationship`` but also sets ``properties`` on the
        new relationship."""
        with self.driver.session() as session:
            self._write(
                session, self._create_relationship_with_properties,
                label1, properties1, label2, properties2, relationship, properties,
            )

    @staticmethod
    def _create_relationship_with_properties(tx, label1, properties1, label2, properties2, relationship, properties):
        properties1 = format_properties(properties1)
        properties2 = format_properties(properties2)
        properties = format_properties(properties)
        query = (
            f"MATCH (a:{label1} {properties1}), (b:{label2} {properties2}) "
            f"CREATE (a)-[r:{relationship} {properties}]->(b) RETURN r"
        )
        tx.run(query)

    def run_query(self, query):
        """Run an arbitrary Cypher ``query`` and return what ``session.run`` returns.

        NOTE(review): the session context exits before the caller receives
        the result; confirm whether its records can still be read afterwards.
        """
        with self.driver.session() as session:
            return session.run(query)





if __name__ == '__main__':
    # Demo: wipe the database, then create three Person nodes and two
    # relationships between them.
    url = r'bolt://localhost:7687'
    auth = ("neo4j", "neo4j-key")

    neo4j = Neo4jClass(uri=url, auth=auth)
    try:
        neo4j.del_all()
        neo4j.create_node("Person", {"name": "Alice", "age": 30})
        neo4j.create_node("Person", {"name": "Bob", "age": 25})
        neo4j.create_relationship("Person", {"name": "Alice"}, "Person", {"name": "Bob"}, "KNOWS")

        neo4j.create_node("Person", {"name": "Tom", "age": 30})
        neo4j.create_relationship_with_properties(
            "Person", {"name": "Alice"}, "Person", {"name": "Tom"},
            "KW", {"since": "2021-01-01", "weight": 0.8},
        )
    finally:
        # Close the driver even when one of the calls above raises.
        neo4j.close()


  2. 实际使用示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142

"""
@author: eggplant
@project: TCM_KG
@time: 2024年07月30日13:16:04
@function: 将预处理好的节点和三元组导入 Neo4j 数据库

JSON 文件格式说明:
JSON 文件应包含以下键:
- `entity_nodes` (list): 实体节点列表,每个元素是一个字典,包含:
- `name` (str): 实体名称
- `type` (str): 实体类型
- 其他属性 (可选)
- `relation_nodes` (list): 关系节点列表,每个元素是一个字典,包含:
- `name` (str): 关系名称
- `property` (dict, 可选): 关系属性
- `triples` (list): 三元组列表,每个元素是一个三元组 (e1_idx, rel_idx, e2_idx),索引指向 `entity_nodes` 和 `relation_nodes`

示例 JSON 文件:
{
"entity_nodes": [
{"name": "感冒", "type": "疾病", "症状": "发热"},
{"name": "发热", "type": "症状"}
],
"relation_nodes": [
{"name": "具有症状", "property": {"强度": "高"}}
],
"triples": [
[0, 0, 1] # 代表 "感冒" -> "具有症状" -> "发热"
]
}
"""

import json
import tqdm
from .neo4j_tools import Neo4jClass

def dict_to_string(entity: dict) -> dict:
    """Return a copy of ``entity`` whose dict values are JSON-encoded strings.

    Only values that are themselves dicts are converted (with
    ``ensure_ascii=False`` so non-ASCII text stays readable); every other
    value is copied over unchanged. The input dict is not modified.

    Example:
        {"name": "flu", "attrs": {"cause": "virus"}}
        -> {"name": "flu", "attrs": '{"cause": "virus"}'}
    """
    flattened = {}
    for field, raw in entity.items():
        if isinstance(raw, dict):
            flattened[field] = json.dumps(raw, ensure_ascii=False)
        else:
            flattened[field] = raw
    return flattened

def import_entity_nodes(neo4j, entity_nodes: list, debug_n: int):
    """Create one Neo4j node per entity.

    Each entity's ``type`` value is used as the node label, and the whole
    entity dict (nested dicts flattened to JSON strings by
    ``dict_to_string``) becomes the node's properties.

    Args:
        neo4j: Client object exposing ``create_node(label, properties)``.
        entity_nodes: List of entity dicts; each must contain a ``type`` key.
        debug_n: Maximum number of entities to import. A negative value
            (conventionally -1) means no limit.

    Raises:
        KeyError: If an entity has no ``type`` key.
    """
    print("导入实体节点...")
    # Slice up front so that exactly ``debug_n`` entities are imported; the
    # previous post-insert index check let one extra entity through.
    selected = entity_nodes if debug_n < 0 else entity_nodes[:debug_n]
    # Wrapping the list itself (not an enumerate object) lets tqdm see the length.
    for entity in tqdm.tqdm(selected):
        neo4j.create_node(entity['type'], dict_to_string(entity))

def import_triples(neo4j, data: dict, debug_n: int):
    """Create one relationship per triple in ``data['triples']``.

    Every triple is ``(e1_idx, rel_idx, e2_idx)``; the indices point into
    ``data['entity_nodes']`` and ``data['relation_nodes']``. Endpoints are
    matched by label (the entity ``type``) and ``name``. A triple that fails
    for any reason is reported and skipped so one bad record does not stop
    the import.

    Args:
        neo4j: Client object exposing ``create_relationship_with_properties``.
        data: Parsed JSON with ``entity_nodes``, ``relation_nodes`` and
            ``triples`` keys.
        debug_n: Maximum number of triples to process. A negative value
            (conventionally -1) means no limit.
    """
    print("导入三元组...")
    triples = data['triples']
    # Slice up front so that exactly ``debug_n`` triples are processed; the
    # previous post-insert index check let one extra triple through.
    selected = triples if debug_n < 0 else triples[:debug_n]
    for idx, triple in enumerate(tqdm.tqdm(selected)):
        try:
            # Unpack inside the try so a malformed triple is skipped as well.
            e1_idx, rel_idx, e2_idx = triple
            source = data['entity_nodes'][e1_idx]
            target = data['entity_nodes'][e2_idx]
            relation = data['relation_nodes'][rel_idx]

            entity1 = dict_to_string({'name': source['name'], 'type': source['type']})
            entity2 = dict_to_string({'name': target['name'], 'type': target['type']})

            # Single quotes are replaced because the match values end up
            # inside quoted Cypher text.
            # NOTE(review): import_entity_nodes stores names unmodified, so a
            # name containing an apostrophe will presumably not be matched here.
            entity1_clean = {k: v.replace("'", "_") for k, v in entity1.items() if k != 'type'}
            entity2_clean = {k: v.replace("'", "_") for k, v in entity2.items() if k != 'type'}

            neo4j.create_relationship_with_properties(
                entity1['type'], entity1_clean,
                entity2['type'], entity2_clean,
                relation['name'], relation.get('property', {}),
            )
        except Exception as e:
            # Deliberate best effort: report the bad triple and carry on.
            print(f"跳过错误三元组 {idx}: {e}")

def import_data_to_neo4j(neo4j, file_paths: list, debug_n: int = -1):
    """Load each JSON file and import its entities, then its triples.

    The client is closed when the function finishes, including when a file
    cannot be read or an import step raises.

    Args:
        neo4j: Client object passed to ``import_entity_nodes`` and
            ``import_triples``; its ``close()`` method is called at the end.
        file_paths: Paths of the JSON files to import, processed in order.
        debug_n: Per-file limit forwarded to both import steps
            (-1 means no limit).
    """
    try:
        for file_path in file_paths:
            print(f"处理文件: {file_path}")
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            import_entity_nodes(neo4j, data.get('entity_nodes', []), debug_n)
            import_triples(neo4j, data, debug_n)
    finally:
        # Release the connection even if a file fails part-way through.
        neo4j.close()
    print("数据导入完成!")

if __name__ == "__main__":
    # Script entry point: wipe the database, then import every listed file.
    # NOTE(review): Neo4jClass is imported with a relative import, so this
    # presumably has to be launched as a module of its package - confirm.

    # Debug limit on how many items are imported; -1 means no limit.
    DEBUG_N = -1

    FILE_PATHS = [r'data.json']

    NEO4J_CONFIG = dict(
        uri="bolt://localhost:7687",
        auth=("neo4j", "neo4j-key"),
    )

    neo4j = Neo4jClass(uri=NEO4J_CONFIG["uri"], auth=NEO4J_CONFIG["auth"])
    # Start from an empty graph.
    neo4j.del_all()

    import_data_to_neo4j(neo4j, FILE_PATHS, debug_n=DEBUG_N)

enjoy it! 🎉

CATALOG