InvalidQueryException при попытке создать семейство столбцов в Cassandra через модуль

У меня есть кластер cassandra с 3 узлами, и с помощью моего модульного теста на Java я сначала создаю пространство ключей, а затем создаю семейство столбцов в этом пространстве ключей. Иногда модульные тесты проходят, но случайно я продолжаю получать следующую ошибку. Я использую последний java-драйвер datastax 2.1.4 и версию cassandra 2.1.0.

com.symc.edp.database.nosql.NoSQLPersistenceException: com.datastax.driver.core.exceptions.InvalidQueryException: Cannot add column family 'testmaxcolumnstable' to non existing keyspace 'testmaxcolumnskeyspace'.
    at com.symc.edp.database.nosql.cassandra.CassandraCQLTableEditor.createTable(CassandraCQLTableEditor.java:67)
    at com.symc.edp.database.nosql.cassandra.TestCassandraWideRowPerformance.testWideRowInserts(TestCassandraWideRowPerformance.java:74)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: Cannot add column family 'testmaxcolumnstable' to non existing keyspace 'testmaxcolumnskeyspace'.
    at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:35)
    at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:289)
    at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:205)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:52)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:36)
    at com.symc.edp.database.nosql.cassandra.CassandraCQLTableEditor.createTable(CassandraCQLTableEditor.java:65)
    ... 6 more
Caused by: com.datastax.driver.core.exceptions.InvalidConfigurationInQueryException: Cannot add column family 'testmaxcolumnstable' to non existing keyspace 'testmaxcolumnskeyspace'.
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:104)
    at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:140)
    at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:249)
    at com.datastax.driver.core.RequestHandler.onSet(RequestHandler.java:421)
    at com.datastax.driver.core.Connection$Dispatcher.messageReceived(Connection.java:697)
    at com.datastax.shaded.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at com.datastax.shaded.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at com.datastax.shaded.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at com.datastax.shaded.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at com.datastax.shaded.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:70)
    at com.datastax.shaded.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at com.datastax.shaded.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at com.datastax.shaded.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at com.datastax.shaded.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
    at com.datastax.shaded.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
    at com.datastax.shaded.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
    at com.datastax.shaded.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at com.datastax.shaded.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at com.datastax.shaded.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
    at com.datastax.shaded.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at com.datastax.shaded.netty.channel.Channels.fireMessageReceived(Channels.java:255)
    at com.datastax.shaded.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
    at com.datastax.shaded.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
    at com.datastax.shaded.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
    at com.datastax.shaded.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
    at com.datastax.shaded.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at com.datastax.shaded.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at com.datastax.shaded.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

И в файле system.log cassandra я вижу следующее исключение:

ERROR [SharedPool-Worker-1] 2015-01-28 15:08:24,286 ErrorMessage.java:218 - Unexpected exception during request
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[na:1.8.0_05]
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[na:1.8.0_05]
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[na:1.8.0_05]
        at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[na:1.8.0_05]
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:375) ~[na:1.8.0_05]
        at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311) ~[netty-all-4.0.20.Final.jar:4.0.20.Final]
        at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:878) ~[netty-all-4.0.20.Final.jar:4.0.20.Final]
        at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225) ~[netty-all-4.0.20.Final.jar:4.0.20.Final]
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:114) ~[netty-all-4.0.20.Final.jar:4.0.20.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:507) ~[netty-all-4.0.20.Final.jar:4.0.20.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:464) ~[netty-all-4.0.20.Final.jar:4.0.20.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:378) ~[netty-all-4.0.20.Final.jar:4.0.20.Final]
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:350) ~[netty-all-4.0.20.Final.jar:4.0.20.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) ~[netty-all-4.0.20.Final.jar:4.0.20.Final]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_05]
INFO  [SharedPool-Worker-1] 2015-01-28 15:13:01,051 MigrationManager.java:229 - Create new Keyspace: KSMetaData{name=testmaxcolumnskeyspace, strategyClass=SimpleStrategy, strategyOptions={replication_factor=1}, cfMetaData={}, durableWrites=true, userTypes=org.apache.cassandra.config.UTMetaData@790ee1bb}
INFO  [MigrationStage:1] 2015-01-28 15:13:01,058 ColumnFamilyStore.java:856 - Enqueuing flush of schema_keyspaces: 512 (0%) on-heap, 0 (0%) off-heap
INFO  [MemtableFlushWriter:7] 2015-01-28 15:13:01,059 Memtable.java:326 - Writing Memtable-schema_keyspaces@1727029917(138 serialized bytes, 3 ops, 0%/0% of on/off-heap limit)
INFO  [MemtableFlushWriter:7] 2015-01-28 15:13:01,077 Memtable.java:360 - Completed flushing /usr/share/apache-cassandra-2.1.0/bin/../data/data/system/schema_keyspaces-b0f2235744583cdb9631c43e59ce3676/system-schema_keyspaces-ka-103-Data.db (175 bytes) for commitlog position ReplayPosition(segmentId=1422485457803, position=1181)

Кроме того, я проверил через devcenter, пространство ключей не было создано.


person Sau    schedule 28.01.2015    source источник
comment
Я ответил на это в списке рассылки, прежде чем заметил вопрос SO. Если вы собираетесь делать кросс-пост, предоставьте ссылку в одном или другом месте, чтобы обсуждение могло происходить в одном месте.   -  person Adam Holmberg    schedule 30.01.2015
comment
Конечно, учту это для будущих постов.   -  person Sau    schedule 02.02.2015


Ответы (2)


Не видя вашего кода, я предполагаю, что вам нужен сон между созданием пространства ключей и попыткой создать в нем таблицы. Вероятно, вам нужно дать определению пространства ключей пару секунд, чтобы оно распространилось на все узлы в вашем кластере, прежде чем вы попытаетесь его использовать.

person Jim Meyer    schedule 29.01.2015
comment
Проблема в том, что само пространство ключей не было создано. Он выдает сброс соединения по исключению однорангового узла при попытке создать пространство ключей. - person Sau; 29.01.2015
comment
Это может быть связано с тем, что клиент дает сбой при попытке создать таблицу в пространстве ключей, которое еще не было распространено, поэтому Cassandra видит сброс соединения, когда клиент умирает. У вас есть сон между созданием пространства ключей и созданием таблиц? - person Jim Meyer; 29.01.2015

Как уже отмечалось, было бы полезно увидеть ваш класс конфигурации. Мы используем ClassPathCQLDataSet для выпуска наших операторов и одновременно создаем keyspace (ссылка на документацию ClassPathCqlDataSet, обратите внимание, что логическое значение в позициях 2 и 3 указывает ему создавать и удалять пространство ключей). db.cql - это файл, в котором мы создаем статистику таблицы. Вот наша конфигурация, которая может вам помочь:

package some.package;

import org.cassandraunit.CQLDataLoader;
import org.cassandraunit.dataset.cql.ClassPathCQLDataSet;
import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.core.io.ClassPathResource;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

@Configuration
@Profile({"test"})
public class TestCassandraConfig implements DisposableBean {

    private static final Logger LOGGER = LoggerFactory.getLogger(TestCassandraConfig.class);

    private final String CQL = "db.cql";

    @Value("${cassandra.contact_points:localhost}")
    private String contact_points;
    @Value("${cassandra.port:9142}")
    private int port;
    @Value("${cassandra.keyspace:test}")
    private String keyspace;

    private static Cluster cluster;
    private static Session session;
    private static SessionProxy sessionProxy;

    @Bean
    public Session session() throws Exception {
        if (session == null) {
            initialize();
        }

        return sessionProxy;
    }

    @Bean
    public TestApplicationContext testApplicationContext() {
        return new TestApplicationContext();
    }

    private void initialize() throws Exception {
        LOGGER.info("Starting embedded cassandra server");
        EmbeddedCassandraServerHelper.startEmbeddedCassandra("another-cassandra.yaml");

        LOGGER.info("Connect to embedded db");
        cluster = Cluster.builder().addContactPoints(contact_points).withPort(port).build();
        session = cluster.connect();

        LOGGER.info("Initialize keyspace");
        final CQLDataLoader cqlDataLoader = new CQLDataLoader(session);
        cqlDataLoader.load(new ClassPathCQLDataSet(CQL, false, true, keyspace));
    }

    @Override
    public void destroy() throws Exception {
        if (cluster != null) {
            cluster.close();
            cluster = null;
        }
    }
}
person Nenad Bozic    schedule 14.03.2015