Neo4j - слишком низкая скорость записи для импорта лог-файлов в реальном времени?

Я пытаюсь выяснить, можно ли анализировать файлы журнала сетевого трафика с помощью Neo4j. Поэтому я «отслеживаю» 3 разных файла журнала из Bro IDS в режиме реального времени, используя «sh»-библиотеку, и импортирую записи журнала в neo4j, что кажется очень медленным с использованием py2neo. Импорт CSV здесь не подойдет, так как он выполняется в режиме реального времени.

Пример: я анализирую файл захвата пакетов за один час, используя tcpreplay, который имеет почти 4 000 000 подключений. Я даже играл ее с половиной темпа. Итак, через 2 часа у меня было около 4 000 000 записей в журнале. Прямо сейчас, через 3,5 часа после начала анализа, я только что импортировал 289691 график, состоящий из 5 узлов и 4 отношений. В целом, около 15% данных почти в два раза быстрее.

Я использую py2neo, и код выглядит следующим образом (это один из графиков):

def create_conn_graph(connlog):
[...]

##  Start Session
graph = Graph(bolt=True, password="neo4j")
tx = graph.begin()

############
##  Nodes ##
############

##  Connection Node
conn = Node("Connection", uid=connlog['uid'],
            ts=connlog['ts'],
            date=evt_date,
            time=evt_time,
            [...])

conn_properties = dict(conn)
for key in conn_properties.keys():
    if conn[key] == "-" or conn[key] == "(empty)":
        conn[key] = "0"
conn.update()
tx.merge(conn, "Connection", "uid")

##  IP Nodes
orig = Node("IP", ip=connlog['orig_h'])
tx.merge(orig)

resp = Node("IP", ip=connlog['resp_h'])
tx.merge(resp)

##  History Node
if connlog['history']:
    hist_flow = history_flow(connlog['history'])
    history_node = Node("History", history=connlog['history'], flow=hist_flow)
    tx.merge(history_node, "History", "history")

    ##  (Connection)-[HAS_HISTORY]->(History)
    conn_hist = Relationship(conn, "HAS_HISTORY", history_node)
    tx.merge(conn_hist)

##  Conn_State
conn_state = Node("Conn_State", state=connlog['conn_state'], meaning=CONN_STATE[connlog['conn_state']])
tx.merge(conn_state, "Conn_State", "conn_state")

tx.commit()
tx = graph.begin()

#####################
##  Relationships  ##
#####################

##  (IP)-[STARTS_CONNECTION]->(Connection)
orig_conn = Relationship(orig, "STARTS_CONNECTION", conn, port=connlog['orig_p'])
tx.merge(orig_conn)

##  (Connection)-[CONNECTS_TO]->(IP)
conn_resp = Relationship(conn, "CONNECTS_TO", resp, port=connlog['resp_p'])
tx.merge(conn_resp)

##  (Connection)-[HAS_CONN_STATE]->(Conn_State)
conn_connstate = Relationship(conn, "HAS_CONN_STATE", conn_state)
tx.merge(conn_connstate)

tx.commit()
## (Connection)-[PRODUCED]-> (DNS|HTTP)
if connlog['service'] == "dns":
    graph.run("MATCH (c:Connection {uid:{uid}}), (d:DNS {uid:{uid}}) \
        MERGE (c)-[:PRODUCED]->(d)",
              {"uid": connlog['uid']})

if connlog['service'] == "http":
    graph.run("MATCH (c:Connection {uid:{uid}}), (d:HTTP {uid:{uid}}) \
        MERGE (c)-[:PRODUCED]->(d)",
              {"uid": connlog['uid']})

return True


##  End of create_conn_graph    ########################################


if __name__ == "__main__":
    logentry = {}
    logfield = CONNLOG
    logline = []

    for line in tail("-F", LOG_DIR, _iter=True, _bg=True):
        entry = line.strip().split("\t")
        if line.startswith('#'):
            continue
        for i in range(len(logfield)):
            logentry[logfield[i]] = entry[i]
        create_conn_graph(logentry)

У меня есть следующие ограничения и индексы:

graph.run("CREATE CONSTRAINT ON (c:Connection) ASSERT c.uid IS UNIQUE")
graph.run("CREATE CONSTRAINT ON (i:IP) ASSERT i.ip IS UNIQUE")
graph.run("CREATE CONSTRAINT ON (c:Conn_State) ASSERT c.conn_state IS UNIQUE")
graph.run("CREATE CONSTRAINT ON (h:History) ASSERT h.history IS UNIQUE")
graph.run("CREATE CONSTRAINT ON (host:Host) ASSERT host.host is UNIQUE")
graph.run("CREATE CONSTRAINT ON (q:QueryType) ASSERT q.type is UNIQUE")
graph.run("CREATE CONSTRAINT ON (qc:QueryClass) ASSERT qc.class is UNIQUE")
graph.run("CREATE CONSTRAINT ON (rc:ResponseCode) ASSERT rc.code is UNIQUE")
graph.run("CREATE CONSTRAINT ON (ic:InfoCode) ASSERT ic.code is UNIQUE")
graph.run("CREATE CONSTRAINT ON (ua:UserAgent) ASSERT ua.useragent is UNIQUE")
graph.run("CREATE CONSTRAINT ON (m:Method) ASSERT m.method is UNIQUE")
graph.run("CREATE CONSTRAINT ON (r:Referrer) ASSERT r.referrer is UNIQUE")
graph.run("CREATE INDEX ON :DNS(uid)")
graph.run("CREATE INDEX ON :Uri(uri)")
graph.run("CREATE INDEX ON :HTTP(uid)")

Может быть, кто-то может подсказать, что я делаю неправильно или где я допустил ошибки в коде? Количество коммитов возникает из-за временных ошибок при попытке записи в neo4j. При большем количестве транзакций у меня больше не было ошибок.

Заранее благодарю за любую помощь


person L Stetson    schedule 15.06.2017    source источник
comment
Создали ли вы некоторые ограничения схемы/индексы?   -  person logisima    schedule 15.06.2017
comment
Я добавил ограничения и индексы к вопросу   -  person L Stetson    schedule 15.06.2017


Ответы (1)


Я не уверен, что py2neo делает под капотом, по моему опыту, драйверы python не самые быстрые.

Я бы, вероятно, использовал простые операторы Cypher, где у вас есть полный контроль над тем, что происходит.

У вас также есть некоторые неправильные/отсутствующие индексы, убедитесь, что все ваши запросы/операции используют индекс. В противном случае это приведет к полному сканированию.

  • (d:DNS {uid:{uid}})
  • (д:HTTP {uid:{uid}})

Я также предлагаю вам отправлять немного больше данных за транзакцию (например, 10 000 записей).

Также может иметь смысл выполнить некоторую предварительную обработку для каждого пакета журналов, например. создавайте отдельные IP-узлы заранее для каждого сегмента журнала, а не для каждой строки журнала.

это тоже может вам помочь: http://jexp.de/blog/2017/03/5-tips-tricks-for-fast-batched-updates-of-graph-structures-with-neo4j-and-cypher/

person Michael Hunger    schedule 15.06.2017
comment
Спасибо за советы. Что касается отсутствующих индексов. Я немного не сопоставил код. Индексы HTTP_Record и DNS_Record принадлежат узлам HTTP и DNS из кода и теперь имеют правильное имя. - person L Stetson; 15.06.2017