Почему моя вставка данных в мою базу данных cassandra такая медленная?

Это мой запрос, если текущий идентификатор данных присутствует или отсутствует в базе данных Cassandra.

row = session.execute("SELECT * FROM articles where id = %s", [id]) 

Разрешил сообщения в Kafka, затем определить, существует ли это сообщение в базе данных cassandra, если его нет, то оно должно выполнить операцию вставки, если оно существует, его не следует вставлять в данные.

messages = consumer.get_messages(count=25)

    if len(messages) == 0:
        print 'IDLE'
        sleep(1)
        continue

    for message in messages:
        try:
            message = json.loads(message.message.value)
            data = message['data']
            if data:
                for article in data:
                    source = article['source']
                    id = article['id']
                    title = article['title']
                    thumbnail = article['thumbnail']
                    #url = article['url']
                    text = article['text']
                    print article['created_at'],type(article['created_at'])
                    created_at = parse(article['created_at'])
                    last_crawled = article['last_crawled']
                    channel = article['channel']#userid
                    category = article['category']
                    #scheduled_for = created_at.replace(minute=created_at.minute + 5, second=0, microsecond=0)
                    scheduled_for=(datetime.utcnow() + timedelta(minutes=5)).replace(second=0, microsecond=0)
                    row = session.execute("SELECT * FROM articles where id = %s", [id])
                    if len(list(row))==0:
                    #id parse base62
                        ids = [id[0:2],id[2:9],id[9:16]]
                        idstr=''
                        for argv in ids:
                            num = int(argv)
                            idstr=idstr+encode(num)
                        url='http://weibo.com/%s/%s?type=comment' % (channel,idstr)
                        session.execute("INSERT INTO articles(source, id, title,thumbnail, url, text, created_at, last_crawled,channel,category) VALUES (%s,%s, %s, %s, %s, %s, %s, %s, %s, %s)", (source, id, title,thumbnail, url, text, created_at, scheduled_for,channel,category))
                        session.execute("INSERT INTO schedules(source,type,scheduled_for,id) VALUES (%s, %s, %s,%s) USING TTL 86400", (source,'article', scheduled_for, id))
                        log.info('%s %s %s %s %s %s %s %s %s %s' % (source, id, title,thumbnail, url, text, created_at, scheduled_for,channel,category))


        except Exception, e:
            log.exception(e)
            #log.info('error %s %s' % (message['url'],body))
            print e
            continue

Изменить:

У меня есть один идентификатор, который имеет только одну уникальную строку таблицы, и я хочу, чтобы она была такой. Как только я добавляю другое запланированное время для уникального идентификатора, моя система дает сбой. Добавьте это, если len(list(row))==0: правильная мысль, но после этого моя система очень медленно работает.

Это описание моей таблицы:

DROP TABLE IF EXISTS schedules;

CREATE TABLE schedules (
 source text,
 type text,
 scheduled_for timestamp,
 id text,
 PRIMARY KEY (source, type, scheduled_for, id)
);

Это запланированное_для можно изменить. Вот еще конкретный пример

Hao article 2016-01-12 02:09:00+0800 3930462206848285
Hao article 2016-01-12 03:09:00+0801 3930462206848285
Hao article 2016-01-12 04:09:00+0802 3930462206848285
Hao article 2016-01-12 05:09:00+0803 3930462206848285

Спасибо за ваши ответы!


person peter    schedule 11.01.2016    source источник
comment
Учитывая, что операции записи дешевы, а операции чтения могут быть недешевыми, я думаю, что оптимизация, которую вы пытаетесь выполнить, не имеет смысла.   -  person Ralf    schedule 11.01.2016
comment
@Ralf Хорошо, так что бы предложить вместо этого? Спасибо за ваш ответ!   -  person peter    schedule 11.01.2016
comment
Просто вставить запись снова? Или хотя бы не выбирать * из таблицы, а только ID. Таким образом, вы сэкономите некоторую пропускную способность сети. (Я думаю, что Cassandra по-прежнему загружает всю строку; может быть, кто-то может это прокомментировать.) В зависимости от вашего приложения выбор каждой строки перед вставкой имеет addtl. недостатком разбавления кешей Cassandra, что снижает производительность чтения для ваших пользователей.   -  person Ralf    schedule 11.01.2016


Ответы (1)


Почему бы вам не использовать insert if not exists ?

https://docs.datastax.com/en/cql/3.1/cql/cql_reference/insert_r.html

person Cedric H.    schedule 11.01.2016
comment
Обратите внимание, что IF NOT EXISTS также снижает производительность. - person Ralf; 11.01.2016
comment
Я полностью согласен, это типичный случай чтения перед записью, но, по крайней мере, это проще с точки зрения приложения и, возможно, более оптимизировано. - person Cedric H.; 11.01.2016
comment
Я думаю, что это хуже, чем просто прочитать, прежде чем писать. Чтобы функция IF NOT EXIST работала, Cassandra должна обеспечить согласованность во всех кластер. Если вы настроите кластер на окончательную согласованность, вы потеряете все преимущества производительности этой настройки. Но я предполагаю, что IF NOT EXISTS не связывается с содержимым кеша. - person Ralf; 11.01.2016
comment
@CedricH.@ralf Я добавил приведенный выше пример в свой пост: у меня есть один идентификатор, который имеет только одну уникальную строку таблицы, и я хочу, чтобы она была такой. Как только я добавляю другое запланированное время для уникального идентификатора, наша система дает сбой. Добавьте это, если len(list(row))==0: правильная мысль, но после этого моя система очень медленно работает. Не знаете, что делать? Спасибо за помощь! - person peter; 12.01.2016