Те из вас, кто часто использует базы данных, могут задаться вопросом:

1. Как проверить, какой SQL в данный момент выполняется базой данных и в каком состоянии?

2. Как завершить ненормальный SQL? Например, если оператор SELECT, используемый для запроса таблицы с массивными данными, не содержит условий запроса, это приведет к снижению производительности всей базы данных. Это может подтолкнуть к желанию завершить этот ненормальный SQL.

В ответ на вышеуказанные проблемы в Apache ShardingSphere были введены такие функции, как Show processlist и Kill <processID>.

1. Введение

Show processlist: эта команда может отображать список SQL-запросов, которые в данный момент выполняются ShardingSphere, и ход выполнения каждого SQL-запроса. Если ShardingSphere развернута в режиме кластера, функция Show processlist агрегирует SQL, работающий для всех экземпляров Proxy в кластере, а затем отображает результат, поэтому вы всегда можете увидеть все SQL, выполняемые в данный момент.

mysql> show processlist \G;
*************************** 1. row ***************************
     Id: 82a67f254959e0a0807a00f3cd695d87
   User: root
   Host: 10.200.79.156
     db: root
Command: Execute
   Time: 19
  State: Executing 0/1
   Info: update t_order set version = 456
1 row in set (0.24 sec)

Kill <processID>: Эта команда реализована на основе Show processlist и может завершать работающий SQL, указанный в файле Show processlist.

mysql> kill 82a67f254959e0a0807a00f3cd695d87;
Query OK, 0 rows affected (0.17 sec)

2. Как они работают?

Теперь, когда вы понимаете функции Show processlist и Kill <processID>, давайте посмотрим, как работают эти две команды. Поскольку принцип работы Kill <processID> аналогичен принципу Show processlist, мы сосредоточимся на интерпретации Show processlist.

2.1 Как сохраняется и уничтожается SQL?

Каждый SQL, выполненный в ShardingSphere, будет генерировать объект ExecutionGroupContext. Объект содержит всю информацию об этом SQL, среди которых есть поле executionID для обеспечения его уникальности.

Когда ShardingSphere получает команду SQL, вызывается GovernanceExecuteProcessReporter# report для сохранения ExecutionGroupContext информации в кэше ConcurrentHashMap (в настоящее время поддерживаются только операторы DML и DDL MySQL; другие типы баз данных будут поддерживаться в более поздних версиях. Операторы запроса также классифицируются как DML). ).

public final class GovernanceExecuteProcessReporter implements ExecuteProcessReporter {
    
    @Override
    public void report(final QueryContext queryContext, final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext,
                       final ExecuteProcessConstants constants, final EventBusContext eventBusContext) {
        ExecuteProcessContext executeProcessContext = new ExecuteProcessContext(queryContext.getSql(), executionGroupContext, constants);
        ShowProcessListManager.getInstance().putProcessContext(executeProcessContext.getExecutionID(), executeProcessContext);
        ShowProcessListManager.getInstance().putProcessStatement(executeProcessContext.getExecutionID(), executeProcessContext.getProcessStatements());
    }
}
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ShowProcessListManager {
    
    private static final ShowProcessListManager INSTANCE = new ShowProcessListManager();
    
    @Getter
    private final Map<String, ExecuteProcessContext> processContexts = new ConcurrentHashMap<>();
    
    @Getter
    private final Map<String, Collection<Statement>> processStatements = new ConcurrentHashMap<>();
 
    public static ShowProcessListManager getInstance() {
        return INSTANCE;
    }
    
    public void putProcessContext(final String executionId, final ExecuteProcessContext processContext) {
        processContexts.put(executionId, processContext);
    }
    
    public void putProcessStatement(final String executionId, final Collection<Statement> statements) {
        if (statements.isEmpty()) {
            return;
        }
        processStatements.put(executionId, statements);
    }
}

Как показано выше, класс ShowProcessListManager имеет две карты кэша, а именно processContexts и processStatements. Первый хранит сопоставление между executionID и ExecuteProcessContext.

Последний содержит сопоставление между executionID и Statement objects, которое может генерировать несколько операторов после перезаписи SQL.

Каждый раз, когда ShardingSphere получает оператор SQL, информация SQL будет кэшироваться в двух Картах. После выполнения SQL кэш карты будет удален.

@RequiredArgsConstructor
public final class ProxyJDBCExecutor {
    
    private final String type;
    
    private final ConnectionSession connectionSession;
    
    private final JDBCDatabaseCommunicationEngine databaseCommunicationEngine;
    
    private final JDBCExecutor jdbcExecutor;
    
    public List<ExecuteResult> execute(final QueryContext queryContext, final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext,
                                       final boolean isReturnGeneratedKeys, final boolean isExceptionThrown) throws SQLException {
        try {
            MetaDataContexts metaDataContexts = ProxyContext.getInstance().getContextManager().getMetaDataContexts();
            EventBusContext eventBusContext = ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext();
            ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(connectionSession.getDatabaseName());
            DatabaseType protocolType = database.getProtocolType();
            DatabaseType databaseType = database.getResource().getDatabaseType();
            ExecuteProcessEngine.initialize(queryContext, executionGroupContext, eventBusContext);
            SQLStatementContext<?> context = queryContext.getSqlStatementContext();
            List<ExecuteResult> result = jdbcExecutor.execute(executionGroupContext,
                    ProxyJDBCExecutorCallbackFactory.newInstance(type, protocolType, databaseType, context.getSqlStatement(), databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown,
                            true),
                    ProxyJDBCExecutorCallbackFactory.newInstance(type, protocolType, databaseType, context.getSqlStatement(), databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown,
                            false));
            ExecuteProcessEngine.finish(executionGroupContext.getExecutionID(), eventBusContext);
            return result;
        } finally {
            ExecuteProcessEngine.clean();
        }
    }

Как показано выше, ExecuteProcessEngine.initialize(queryContext, executionGroupContext, eventBusContext); будет хранить информацию SQL в двух кэш-картах. Наконец, ExecuteProcessEngine.clean(); в блоке кода очистит карту в кеше.

SQL, показанный в Show processlist, был получен от processContexts. Но эта карта — всего лишь локальный кеш. Если ShardingSphere развернута в режиме кластера, как Show processlist получить SQL, работающий на других машинах в кластере? Посмотрим, как с этим справится ShardingSphere.

2.2 Как работает Show processlist?

Когда ShardingSphere получает команду Show process, она отправляется исполнителю ShowProcessListExecutor#execute для обработки. Реализация getQueryResult() находится в центре внимания.

public final class ShowProcessListExecutor implements DatabaseAdminQueryExecutor {
    
    private Collection<String> batchProcessContexts;
    
    @Getter
    private QueryResultMetaData queryResultMetaData;
    
    @Getter
    private MergedResult mergedResult;
    
    public ShowProcessListExecutor() {
        ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext().register(this);
    }
    
    @Subscribe
    public void receiveProcessListData(final ShowProcessListResponseEvent event) {
        batchProcessContexts = event.getBatchProcessContexts();
    }
    
    @Override
    public void execute(final ConnectionSession connectionSession) {
        queryResultMetaData = createQueryResultMetaData();
        mergedResult = new TransparentMergedResult(getQueryResult());
    }
    
    private QueryResult getQueryResult() {
        ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext().post(new ShowProcessListRequestEvent());
        if (null == batchProcessContexts || batchProcessContexts.isEmpty()) {
            return new RawMemoryQueryResult(queryResultMetaData, Collections.emptyList());
        }
        Collection<YamlExecuteProcessContext> processContexts = new LinkedList<>();
        for (String each : batchProcessContexts) {
            processContexts.addAll(YamlEngine.unmarshal(each, BatchYamlExecuteProcessContext.class).getContexts());
        }
        List<MemoryQueryResultDataRow> rows = processContexts.stream().map(processContext -> {
            List<Object> rowValues = new ArrayList<>(8);
            rowValues.add(processContext.getExecutionID());
            rowValues.add(processContext.getUsername());
            rowValues.add(processContext.getHostname());
            rowValues.add(processContext.getDatabaseName());
            rowValues.add("Execute");
            rowValues.add(TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - processContext.getStartTimeMillis()));
            int processDoneCount = processContext.getUnitStatuses().stream().map(each -> ExecuteProcessConstants.EXECUTE_STATUS_DONE == each.getStatus() ? 1 : 0).reduce(0, Integer::sum);
            String statePrefix = "Executing ";
            rowValues.add(statePrefix + processDoneCount + "/" + processContext.getUnitStatuses().size());
            String sql = processContext.getSql();
            if (null != sql && sql.length() > 100) {
                sql = sql.substring(0, 100);
            }
            rowValues.add(null != sql ? sql : "");
            return new MemoryQueryResultDataRow(rowValues);
        }).collect(Collectors.toList());
        return new RawMemoryQueryResult(queryResultMetaData, rows);
    }
    
    private QueryResultMetaData createQueryResultMetaData() {
        List<RawQueryResultColumnMetaData> columns = new ArrayList<>();
        columns.add(new RawQueryResultColumnMetaData("", "Id", "Id", Types.VARCHAR, "VARCHAR", 20, 0));
        columns.add(new RawQueryResultColumnMetaData("", "User", "User", Types.VARCHAR, "VARCHAR", 20, 0));
        columns.add(new RawQueryResultColumnMetaData("", "Host", "Host", Types.VARCHAR, "VARCHAR", 64, 0));
        columns.add(new RawQueryResultColumnMetaData("", "db", "db", Types.VARCHAR, "VARCHAR", 64, 0));
        columns.add(new RawQueryResultColumnMetaData("", "Command", "Command", Types.VARCHAR, "VARCHAR", 64, 0));
        columns.add(new RawQueryResultColumnMetaData("", "Time", "Time", Types.VARCHAR, "VARCHAR", 10, 0));
        columns.add(new RawQueryResultColumnMetaData("", "State", "State", Types.VARCHAR, "VARCHAR", 64, 0));
        columns.add(new RawQueryResultColumnMetaData("", "Info", "Info", Types.VARCHAR, "VARCHAR", 120, 0));
        return new RawQueryResultMetaData(columns);
    }
}

Вы будете использовать функцию EventBus пакета guava, которая представляет собой информационную базу данных публикации/подписки, являющуюся элегантной реализацией паттерна Observer. EventBus отделяет классы друг от друга, и вы узнаете больше об этом ниже.

Метод getQueryResult() опубликует ShowProcessListRequestEvent. ProcessRegistrySubscriber#loadShowProcessListData использует аннотации @Subscribe для подписки на событие.

Этот метод является основой реализации Show processlist. Далее мы представим конкретные процедуры этого метода.

public final class ProcessRegistrySubscriber {    
    @Subscribe
    public void loadShowProcessListData(final ShowProcessListRequestEvent event) {
        String processListId = new UUID(ThreadLocalRandom.current().nextLong(), ThreadLocalRandom.current().nextLong()).toString().replace("-", "");
        boolean triggerIsComplete = false;
        // 1. Obtain the Process List path of all existing proxy nodes in cluster mode
        Collection<String> triggerPaths = getTriggerPaths(processListId);
        try {
            // 2. Iterate through the path and write an empty string to the node, to trigger the node monitoring.
            triggerPaths.forEach(each -> repository.persist(each, ""));
            // 3. Lock and wait 5 seconds for each node to write the information of currently running SQL to the persistence layer. 
            triggerIsComplete = waitAllNodeDataReady(processListId, triggerPaths);
            // 4. Fetch and aggregate the data written by each proxy node from the persistence layer. Then EventBus will post a ShowProcessListResponseEvent command, which means the operation is completed.
            sendShowProcessList(processListId);
        } finally {
            // 5. Delete resources
            repository.delete(ProcessNode.getProcessListIdPath(processListId));
            if (!triggerIsComplete) {
                triggerPaths.forEach(repository::delete);
            }
        }
    }
}

Он состоит из пяти шагов, и в центре внимания находятся шаги 2 и 3.

2.2.1 Шаг 2: кластер получает реализацию данных

На этом этапе в узел /nodes/compute_nodes/process_trigger/<instanceId>:<processlistId> будет записана пустая строка, которая активирует логику мониторинга ShardingSphere.

При запуске ShardingSphere уровень сохраняемости будет watch отслеживать серию изменений пути, таких как добавление, удаление и изменение пути /nodes/compute_nodes.

Однако мониторинг — это асинхронный процесс, и основной поток не блокируется, поэтому на шаге 3 необходимо заблокировать и дождаться, пока каждый узел ShardingSphere запишет свою текущую информацию SQL на уровень сохраняемости.

Давайте посмотрим, как ShardingSphere обрабатывает логику мониторинга.

public final class ComputeNodeStateChangedWatcher implements GovernanceWatcher<GovernanceEvent> {
    
    @Override
    public Collection<String> getWatchingKeys(final String databaseName) {
        return Collections.singleton(ComputeNode.getComputeNodePath());
    }
    
    @Override
    public Collection<Type> getWatchingTypes() {
        return Arrays.asList(Type.ADDED, Type.UPDATED, Type.DELETED);
    }
    
    @SuppressWarnings("unchecked")
    @Override
    public Optional<GovernanceEvent> createGovernanceEvent(final DataChangedEvent event) {
        String instanceId = ComputeNode.getInstanceIdByComputeNode(event.getKey());
        if (!Strings.isNullOrEmpty(instanceId)) {
            ...
        } else if (event.getKey().startsWith(ComputeNode.getOnlineInstanceNodePath())) {
            return createInstanceEvent(event);
            // show processlist
        } else if (event.getKey().startsWith(ComputeNode.getProcessTriggerNodePatch())) {
            return createShowProcessListTriggerEvent(event);
            // kill processlistId
        } else if (event.getKey().startsWith(ComputeNode.getProcessKillNodePatch())) {
            return createKillProcessListIdEvent(event);
        }
        return Optional.empty();
    }
    
        
    private Optional<GovernanceEvent> createShowProcessListTriggerEvent(final DataChangedEvent event) {
        Matcher matcher = getShowProcessTriggerMatcher(event);
        if (!matcher.find()) {
            return Optional.empty();
        }
        if (Type.ADDED == event.getType()) {
            return Optional.of(new ShowProcessListTriggerEvent(matcher.group(1), matcher.group(2)));
        }
        if (Type.DELETED == event.getType()) {
            return Optional.of(new ShowProcessListUnitCompleteEvent(matcher.group(2)));
        }
        return Optional.empty();
    }
}

После того, как ComputeNodeStateChangedWatcher#createGovernanceEvent просмотрит информацию, он будет различать, какое событие создать в соответствии с путем.

Как показано в приведенном выше коде, это новый узел, поэтому будет опубликовано ShowProcessListTriggerEvent. Поскольку каждый экземпляр ShardingSphere будет отслеживать /nodes/compute_nodes, каждый экземпляр будет обрабатывать ShowProcessListTriggerEvent.

В этом случае одномашинная обработка трансформируется в кластерную. Давайте посмотрим, как с этим справляется ShardingSphere.

public final class ClusterContextManagerCoordinator {
    @Subscribe
    public synchronized void triggerShowProcessList(final ShowProcessListTriggerEvent event) {
        if (!event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId())) {
            return;
        }
        Collection<ExecuteProcessContext> processContexts = ShowProcessListManager.getInstance().getAllProcessContext();
        if (!processContexts.isEmpty()) {
            registryCenter.getRepository().persist(ProcessNode.getProcessListInstancePath(event.getProcessListId(), event.getInstanceId()),
                    YamlEngine.marshal(new BatchYamlExecuteProcessContext(processContexts)));
        }
        registryCenter.getRepository().delete(ComputeNode.getProcessTriggerInstanceIdNodePath(event.getInstanceId(), event.getProcessListId()));
    }
}

ClusterContextManagerCoordinator#triggerShowProcessList подпишется на ShowProcessListTriggerEvent, в котором processContext данные обрабатываются сами собой. ShowProcessListManager.getInstance().getAllProcessContext() извлекает processContext, который работает в данный момент (здесь данные относятся к SQL-информации, которую ShardingSphere сохраняет в Map перед каждым выполнением SQL, что описано в начале статьи) и передает ее на уровень сохраняемости. Если узел /nodes/compute_nodes/process_trigger/<instanceId>:<processlistId> удален, обработка завершается.

Когда вы удалите узел, мониторинг также будет запущен, и будет опубликовано ShowProcessListUnitCompleteEvent. Это событие, наконец, разбудит ожидающую блокировку.

public final class ClusterContextManagerCoordinator {
    
    @Subscribe
    public synchronized void completeUnitShowProcessList(final ShowProcessListUnitCompleteEvent event) {
        ShowProcessListSimpleLock simpleLock = ShowProcessListManager.getInstance().getLocks().get(event.getProcessListId());
        if (null != simpleLock) {
            simpleLock.doNotify();
        }
    }
}

2.2.2 Шаг 3: блокировка и ожидание реализации данных

ShardingSphere использует метод isReady(Paths), чтобы определить, все ли экземпляры обработаны. Он возвращает true только после обработки всех экземпляров.

Максимальное время ожидания обработки данных составляет 5 секунд. Если обработка не завершена в течение 5 секунд, возвращается false.

public final class ClusterContextManagerCoordinator {
    
    @Subscribe
    public synchronized void completeUnitShowProcessList(final ShowProcessListUnitCompleteEvent event) {
        ShowProcessListSimpleLock simpleLock = ShowProcessListManager.getInstance().getLocks().get(event.getProcessListId());
        if (null != simpleLock) {
            simpleLock.doNotify();
        }
    }
}

2.2.3 Агрегируйте данные processList и верните их

После того как каждый экземпляр обработал данные, экземпляр, получивший команду Show processlist, должен агрегировать данные и затем отобразить результат.

public final class ProcessRegistrySubscriber {  
    
    private void sendShowProcessList(final String processListId) {
        List<String> childrenKeys = repository.getChildrenKeys(ProcessNode.getProcessListIdPath(processListId));
        Collection<String> batchProcessContexts = new LinkedList<>();
        for (String each : childrenKeys) {
            batchProcessContexts.add(repository.get(ProcessNode.getProcessListInstancePath(processListId, each)));
        }
        eventBusContext.post(new ShowProcessListResponseEvent(batchProcessContexts));
    }
}

ProcessRegistrySubscriber#sendShowProcessList объединит текущие данные SQL в batchProcessContexts, а затем опубликует ShowProcessListResponseEvent.

Это событие будет использовано ShowProcessListExecutor#receiveProcessListData, а метод getQueryResult() продолжит отображать queryResult.

На данный момент мы завершили процесс выполнения команды Show processlist.

2.3 Как работает Kill <processId>?

Kill <processId> имеет ту же логику, что и Show processlist, то есть объединяет EventBus с механизмом watch.

Поскольку мы не знаем, какому SQL принадлежит processId, также необходимо добавить пустые узлы для каждого экземпляра.

С помощью механизма watch каждый экземпляр ShardingSphere наблюдает за новым узлом и проверяет, находится ли ключ processId в карте кеша. Если да, извлеките значение, соответствующее ключу.

Значение представляет собой коллекцию Collection<Statement>. Затем вам нужно только перебрать коллекцию Statement и вызвать statement.cancel() по очереди. Нижележащий уровень — это метод java.sql.Statement#cancel(), вызываемый для отмены выполнения SQL.

3. Заключение

В настоящее время Apache ShardingSphere может реализовать функции Show processlist и Kill <processId> только для диалектов MySQL.

Как только вы узнаете, как они работают, и если вы заинтересованы, вы можете принять участие в разработке связанных функций. Наше сообщество очень открытое, и мы приветствуем всех, кто заинтересован в том, чтобы внести свой вклад в открытый исходный код.

Соответствующие ссылки:

Официальный сайт Apache ShardingSphere

Apache ShardingSphere GitHub

Slack-канал Apache ShardingSphere

Автор

Сюй Ян, инженер по разработке промежуточного программного обеспечения в Servyou Group. Отвечает за сегментирование таблиц и баз данных с массивными данными. Энтузиаст открытого исходного кода и участник ShardingSphere. В настоящее время он заинтересован в разработке модуля ядра проекта ShardingSphere.