Те из вас, кто часто использует базы данных, могут задаться вопросом:
1. Как проверить, какой SQL в данный момент выполняется базой данных и в каком состоянии?
2. Как завершить ненормальный SQL? Например, если оператор SELECT
, используемый для запроса таблицы с массивными данными, не содержит условий запроса, это приведет к снижению производительности всей базы данных. Это может подтолкнуть к желанию завершить этот ненормальный SQL.
В ответ на вышеуказанные проблемы в Apache ShardingSphere были введены такие функции, как Show processlist
и Kill <processID>
.
1. Введение
Show processlist
: эта команда может отображать список SQL-запросов, которые в данный момент выполняются ShardingSphere, и ход выполнения каждого SQL-запроса. Если ShardingSphere развернута в режиме кластера, функция Show processlist
агрегирует SQL, работающий для всех экземпляров Proxy в кластере, а затем отображает результат, поэтому вы всегда можете увидеть все SQL, выполняемые в данный момент.
mysql> show processlist \G;
*************************** 1. row ***************************
Id: 82a67f254959e0a0807a00f3cd695d87
User: root
Host: 10.200.79.156
db: root
Command: Execute
Time: 19
State: Executing 0/1
Info: update t_order set version = 456
1 row in set (0.24 sec)
Kill <processID>
: Эта команда реализована на основе Show processlist
и может завершать работающий SQL, указанный в файле Show processlist
.
mysql> kill 82a67f254959e0a0807a00f3cd695d87;
Query OK, 0 rows affected (0.17 sec)
2. Как они работают?
Теперь, когда вы понимаете функции Show processlist
и Kill <processID>
, давайте посмотрим, как работают эти две команды. Поскольку принцип работы Kill <processID>
аналогичен принципу Show processlist
, мы сосредоточимся на интерпретации Show processlist
.
2.1 Как сохраняется и уничтожается SQL?
Каждый SQL, выполненный в ShardingSphere, будет генерировать объект ExecutionGroupContext
. Объект содержит всю информацию об этом SQL, среди которых есть поле executionID
для обеспечения его уникальности.
Когда ShardingSphere получает команду SQL, вызывается GovernanceExecuteProcessReporter# report
для сохранения ExecutionGroupContext
информации в кэше ConcurrentHashMap
(в настоящее время поддерживаются только операторы DML и DDL MySQL; другие типы баз данных будут поддерживаться в более поздних версиях. Операторы запроса также классифицируются как DML). ).
public final class GovernanceExecuteProcessReporter implements ExecuteProcessReporter { @Override public void report(final QueryContext queryContext, final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext, final ExecuteProcessConstants constants, final EventBusContext eventBusContext) { ExecuteProcessContext executeProcessContext = new ExecuteProcessContext(queryContext.getSql(), executionGroupContext, constants); ShowProcessListManager.getInstance().putProcessContext(executeProcessContext.getExecutionID(), executeProcessContext); ShowProcessListManager.getInstance().putProcessStatement(executeProcessContext.getExecutionID(), executeProcessContext.getProcessStatements()); } }
@NoArgsConstructor(access = AccessLevel.PRIVATE) public final class ShowProcessListManager { private static final ShowProcessListManager INSTANCE = new ShowProcessListManager(); @Getter private final Map<String, ExecuteProcessContext> processContexts = new ConcurrentHashMap<>(); @Getter private final Map<String, Collection<Statement>> processStatements = new ConcurrentHashMap<>(); public static ShowProcessListManager getInstance() { return INSTANCE; } public void putProcessContext(final String executionId, final ExecuteProcessContext processContext) { processContexts.put(executionId, processContext); } public void putProcessStatement(final String executionId, final Collection<Statement> statements) { if (statements.isEmpty()) { return; } processStatements.put(executionId, statements); } }
Как показано выше, класс ShowProcessListManager
имеет две карты кэша, а именно processContexts
и processStatements
. Первый хранит сопоставление между executionID
и ExecuteProcessContext
.
Последний содержит сопоставление между executionID
и Statement objects
, которое может генерировать несколько операторов после перезаписи SQL.
Каждый раз, когда ShardingSphere получает оператор SQL, информация SQL будет кэшироваться в двух Картах. После выполнения SQL кэш карты будет удален.
@RequiredArgsConstructor
public final class ProxyJDBCExecutor {
private final String type;
private final ConnectionSession connectionSession;
private final JDBCDatabaseCommunicationEngine databaseCommunicationEngine;
private final JDBCExecutor jdbcExecutor;
public List<ExecuteResult> execute(final QueryContext queryContext, final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext,
final boolean isReturnGeneratedKeys, final boolean isExceptionThrown) throws SQLException {
try {
MetaDataContexts metaDataContexts = ProxyContext.getInstance().getContextManager().getMetaDataContexts();
EventBusContext eventBusContext = ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext();
ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(connectionSession.getDatabaseName());
DatabaseType protocolType = database.getProtocolType();
DatabaseType databaseType = database.getResource().getDatabaseType();
ExecuteProcessEngine.initialize(queryContext, executionGroupContext, eventBusContext);
SQLStatementContext<?> context = queryContext.getSqlStatementContext();
List<ExecuteResult> result = jdbcExecutor.execute(executionGroupContext,
ProxyJDBCExecutorCallbackFactory.newInstance(type, protocolType, databaseType, context.getSqlStatement(), databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown,
true),
ProxyJDBCExecutorCallbackFactory.newInstance(type, protocolType, databaseType, context.getSqlStatement(), databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown,
false));
ExecuteProcessEngine.finish(executionGroupContext.getExecutionID(), eventBusContext);
return result;
} finally {
ExecuteProcessEngine.clean();
}
}
Как показано выше, ExecuteProcessEngine.initialize(queryContext, executionGroupContext, eventBusContext);
будет хранить информацию SQL в двух кэш-картах. Наконец, ExecuteProcessEngine.clean();
в блоке кода очистит карту в кеше.
SQL, показанный в Show processlist
, был получен от processContexts
. Но эта карта — всего лишь локальный кеш. Если ShardingSphere развернута в режиме кластера, как Show processlist
получить SQL, работающий на других машинах в кластере? Посмотрим, как с этим справится ShardingSphere.
2.2 Как работает Show processlist
?
Когда ShardingSphere получает команду Show process
, она отправляется исполнителю ShowProcessListExecutor#execute
для обработки. Реализация getQueryResult()
находится в центре внимания.
public final class ShowProcessListExecutor implements DatabaseAdminQueryExecutor {
private Collection<String> batchProcessContexts;
@Getter
private QueryResultMetaData queryResultMetaData;
@Getter
private MergedResult mergedResult;
public ShowProcessListExecutor() {
ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext().register(this);
}
@Subscribe
public void receiveProcessListData(final ShowProcessListResponseEvent event) {
batchProcessContexts = event.getBatchProcessContexts();
}
@Override
public void execute(final ConnectionSession connectionSession) {
queryResultMetaData = createQueryResultMetaData();
mergedResult = new TransparentMergedResult(getQueryResult());
}
private QueryResult getQueryResult() {
ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext().post(new ShowProcessListRequestEvent());
if (null == batchProcessContexts || batchProcessContexts.isEmpty()) {
return new RawMemoryQueryResult(queryResultMetaData, Collections.emptyList());
}
Collection<YamlExecuteProcessContext> processContexts = new LinkedList<>();
for (String each : batchProcessContexts) {
processContexts.addAll(YamlEngine.unmarshal(each, BatchYamlExecuteProcessContext.class).getContexts());
}
List<MemoryQueryResultDataRow> rows = processContexts.stream().map(processContext -> {
List<Object> rowValues = new ArrayList<>(8);
rowValues.add(processContext.getExecutionID());
rowValues.add(processContext.getUsername());
rowValues.add(processContext.getHostname());
rowValues.add(processContext.getDatabaseName());
rowValues.add("Execute");
rowValues.add(TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - processContext.getStartTimeMillis()));
int processDoneCount = processContext.getUnitStatuses().stream().map(each -> ExecuteProcessConstants.EXECUTE_STATUS_DONE == each.getStatus() ? 1 : 0).reduce(0, Integer::sum);
String statePrefix = "Executing ";
rowValues.add(statePrefix + processDoneCount + "/" + processContext.getUnitStatuses().size());
String sql = processContext.getSql();
if (null != sql && sql.length() > 100) {
sql = sql.substring(0, 100);
}
rowValues.add(null != sql ? sql : "");
return new MemoryQueryResultDataRow(rowValues);
}).collect(Collectors.toList());
return new RawMemoryQueryResult(queryResultMetaData, rows);
}
private QueryResultMetaData createQueryResultMetaData() {
List<RawQueryResultColumnMetaData> columns = new ArrayList<>();
columns.add(new RawQueryResultColumnMetaData("", "Id", "Id", Types.VARCHAR, "VARCHAR", 20, 0));
columns.add(new RawQueryResultColumnMetaData("", "User", "User", Types.VARCHAR, "VARCHAR", 20, 0));
columns.add(new RawQueryResultColumnMetaData("", "Host", "Host", Types.VARCHAR, "VARCHAR", 64, 0));
columns.add(new RawQueryResultColumnMetaData("", "db", "db", Types.VARCHAR, "VARCHAR", 64, 0));
columns.add(new RawQueryResultColumnMetaData("", "Command", "Command", Types.VARCHAR, "VARCHAR", 64, 0));
columns.add(new RawQueryResultColumnMetaData("", "Time", "Time", Types.VARCHAR, "VARCHAR", 10, 0));
columns.add(new RawQueryResultColumnMetaData("", "State", "State", Types.VARCHAR, "VARCHAR", 64, 0));
columns.add(new RawQueryResultColumnMetaData("", "Info", "Info", Types.VARCHAR, "VARCHAR", 120, 0));
return new RawQueryResultMetaData(columns);
}
}
Вы будете использовать функцию EventBus
пакета guava
, которая представляет собой информационную базу данных публикации/подписки, являющуюся элегантной реализацией паттерна Observer. EventBus
отделяет классы друг от друга, и вы узнаете больше об этом ниже.
Метод getQueryResult()
опубликует ShowProcessListRequestEvent
. ProcessRegistrySubscriber#loadShowProcessListData
использует аннотации @Subscribe
для подписки на событие.
Этот метод является основой реализации Show processlist
. Далее мы представим конкретные процедуры этого метода.
public final class ProcessRegistrySubscriber {
@Subscribe
public void loadShowProcessListData(final ShowProcessListRequestEvent event) {
String processListId = new UUID(ThreadLocalRandom.current().nextLong(), ThreadLocalRandom.current().nextLong()).toString().replace("-", "");
boolean triggerIsComplete = false;
// 1. Obtain the Process List path of all existing proxy nodes in cluster mode
Collection<String> triggerPaths = getTriggerPaths(processListId);
try {
// 2. Iterate through the path and write an empty string to the node, to trigger the node monitoring.
triggerPaths.forEach(each -> repository.persist(each, ""));
// 3. Lock and wait 5 seconds for each node to write the information of currently running SQL to the persistence layer.
triggerIsComplete = waitAllNodeDataReady(processListId, triggerPaths);
// 4. Fetch and aggregate the data written by each proxy node from the persistence layer. Then EventBus will post a ShowProcessListResponseEvent command, which means the operation is completed.
sendShowProcessList(processListId);
} finally {
// 5. Delete resources
repository.delete(ProcessNode.getProcessListIdPath(processListId));
if (!triggerIsComplete) {
triggerPaths.forEach(repository::delete);
}
}
}
}
Он состоит из пяти шагов, и в центре внимания находятся шаги 2 и 3.
2.2.1 Шаг 2: кластер получает реализацию данных
На этом этапе в узел /nodes/compute_nodes/process_trigger/<instanceId>:<processlistId>
будет записана пустая строка, которая активирует логику мониторинга ShardingSphere.
При запуске ShardingSphere уровень сохраняемости будет watch
отслеживать серию изменений пути, таких как добавление, удаление и изменение пути /nodes/compute_nodes
.
Однако мониторинг — это асинхронный процесс, и основной поток не блокируется, поэтому на шаге 3 необходимо заблокировать и дождаться, пока каждый узел ShardingSphere запишет свою текущую информацию SQL на уровень сохраняемости.
Давайте посмотрим, как ShardingSphere обрабатывает логику мониторинга.
public final class ComputeNodeStateChangedWatcher implements GovernanceWatcher<GovernanceEvent> {
@Override
public Collection<String> getWatchingKeys(final String databaseName) {
return Collections.singleton(ComputeNode.getComputeNodePath());
}
@Override
public Collection<Type> getWatchingTypes() {
return Arrays.asList(Type.ADDED, Type.UPDATED, Type.DELETED);
}
@SuppressWarnings("unchecked")
@Override
public Optional<GovernanceEvent> createGovernanceEvent(final DataChangedEvent event) {
String instanceId = ComputeNode.getInstanceIdByComputeNode(event.getKey());
if (!Strings.isNullOrEmpty(instanceId)) {
...
} else if (event.getKey().startsWith(ComputeNode.getOnlineInstanceNodePath())) {
return createInstanceEvent(event);
// show processlist
} else if (event.getKey().startsWith(ComputeNode.getProcessTriggerNodePatch())) {
return createShowProcessListTriggerEvent(event);
// kill processlistId
} else if (event.getKey().startsWith(ComputeNode.getProcessKillNodePatch())) {
return createKillProcessListIdEvent(event);
}
return Optional.empty();
}
private Optional<GovernanceEvent> createShowProcessListTriggerEvent(final DataChangedEvent event) {
Matcher matcher = getShowProcessTriggerMatcher(event);
if (!matcher.find()) {
return Optional.empty();
}
if (Type.ADDED == event.getType()) {
return Optional.of(new ShowProcessListTriggerEvent(matcher.group(1), matcher.group(2)));
}
if (Type.DELETED == event.getType()) {
return Optional.of(new ShowProcessListUnitCompleteEvent(matcher.group(2)));
}
return Optional.empty();
}
}
После того, как ComputeNodeStateChangedWatcher#createGovernanceEvent
просмотрит информацию, он будет различать, какое событие создать в соответствии с путем.
Как показано в приведенном выше коде, это новый узел, поэтому будет опубликовано ShowProcessListTriggerEvent
. Поскольку каждый экземпляр ShardingSphere будет отслеживать /nodes/compute_nodes
, каждый экземпляр будет обрабатывать ShowProcessListTriggerEvent
.
В этом случае одномашинная обработка трансформируется в кластерную. Давайте посмотрим, как с этим справляется ShardingSphere.
public final class ClusterContextManagerCoordinator {
@Subscribe public synchronized void triggerShowProcessList(final ShowProcessListTriggerEvent event) { if (!event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId())) { return; } Collection<ExecuteProcessContext> processContexts = ShowProcessListManager.getInstance().getAllProcessContext(); if (!processContexts.isEmpty()) { registryCenter.getRepository().persist(ProcessNode.getProcessListInstancePath(event.getProcessListId(), event.getInstanceId()), YamlEngine.marshal(new BatchYamlExecuteProcessContext(processContexts))); } registryCenter.getRepository().delete(ComputeNode.getProcessTriggerInstanceIdNodePath(event.getInstanceId(), event.getProcessListId())); } }
ClusterContextManagerCoordinator#triggerShowProcessList
подпишется на ShowProcessListTriggerEvent
, в котором processContext
данные обрабатываются сами собой. ShowProcessListManager.getInstance().getAllProcessContext()
извлекает processContext
, который работает в данный момент (здесь данные относятся к SQL-информации, которую ShardingSphere сохраняет в Map перед каждым выполнением SQL, что описано в начале статьи) и передает ее на уровень сохраняемости. Если узел /nodes/compute_nodes/process_trigger/<instanceId>:<processlistId>
удален, обработка завершается.
Когда вы удалите узел, мониторинг также будет запущен, и будет опубликовано ShowProcessListUnitCompleteEvent
. Это событие, наконец, разбудит ожидающую блокировку.
public final class ClusterContextManagerCoordinator {
@Subscribe
public synchronized void completeUnitShowProcessList(final ShowProcessListUnitCompleteEvent event) {
ShowProcessListSimpleLock simpleLock = ShowProcessListManager.getInstance().getLocks().get(event.getProcessListId());
if (null != simpleLock) {
simpleLock.doNotify();
}
}
}
2.2.2 Шаг 3: блокировка и ожидание реализации данных
ShardingSphere использует метод isReady(Paths)
, чтобы определить, все ли экземпляры обработаны. Он возвращает true
только после обработки всех экземпляров.
Максимальное время ожидания обработки данных составляет 5 секунд. Если обработка не завершена в течение 5 секунд, возвращается false
.
public final class ClusterContextManagerCoordinator {
@Subscribe
public synchronized void completeUnitShowProcessList(final ShowProcessListUnitCompleteEvent event) {
ShowProcessListSimpleLock simpleLock = ShowProcessListManager.getInstance().getLocks().get(event.getProcessListId());
if (null != simpleLock) {
simpleLock.doNotify();
}
}
}
2.2.3 Агрегируйте данные processList
и верните их
После того как каждый экземпляр обработал данные, экземпляр, получивший команду Show processlist
, должен агрегировать данные и затем отобразить результат.
public final class ProcessRegistrySubscriber {
private void sendShowProcessList(final String processListId) {
List<String> childrenKeys = repository.getChildrenKeys(ProcessNode.getProcessListIdPath(processListId));
Collection<String> batchProcessContexts = new LinkedList<>();
for (String each : childrenKeys) {
batchProcessContexts.add(repository.get(ProcessNode.getProcessListInstancePath(processListId, each)));
}
eventBusContext.post(new ShowProcessListResponseEvent(batchProcessContexts));
}
}
ProcessRegistrySubscriber#sendShowProcessList
объединит текущие данные SQL в batchProcessContexts
, а затем опубликует ShowProcessListResponseEvent
.
Это событие будет использовано ShowProcessListExecutor#receiveProcessListData
, а метод getQueryResult()
продолжит отображать queryResult
.
На данный момент мы завершили процесс выполнения команды Show processlist
.
2.3 Как работает Kill <processId>
?
Kill <processId>
имеет ту же логику, что и Show processlist
, то есть объединяет EventBus
с механизмом watch
.
Поскольку мы не знаем, какому SQL принадлежит processId
, также необходимо добавить пустые узлы для каждого экземпляра.
С помощью механизма watch
каждый экземпляр ShardingSphere наблюдает за новым узлом и проверяет, находится ли ключ processId
в карте кеша. Если да, извлеките значение, соответствующее ключу.
Значение представляет собой коллекцию Collection<Statement>
. Затем вам нужно только перебрать коллекцию Statement
и вызвать statement.cancel()
по очереди. Нижележащий уровень — это метод java.sql.Statement#cancel()
, вызываемый для отмены выполнения SQL.
3. Заключение
В настоящее время Apache ShardingSphere может реализовать функции Show processlist
и Kill <processId>
только для диалектов MySQL.
Как только вы узнаете, как они работают, и если вы заинтересованы, вы можете принять участие в разработке связанных функций. Наше сообщество очень открытое, и мы приветствуем всех, кто заинтересован в том, чтобы внести свой вклад в открытый исходный код.
Соответствующие ссылки:
Официальный сайт Apache ShardingSphere
Slack-канал Apache ShardingSphere
Автор
Сюй Ян, инженер по разработке промежуточного программного обеспечения в Servyou Group. Отвечает за сегментирование таблиц и баз данных с массивными данными. Энтузиаст открытого исходного кода и участник ShardingSphere. В настоящее время он заинтересован в разработке модуля ядра проекта ShardingSphere.