[flink] Debugging flink-sql-gateway (Part 1)

Table of Contents

Preface

1. Startup process of the test script

1.1 http://localhost:8083/v1/sessions

1.2 Sending a heartbeat

2. Other APIs

2.1 get_info

2.2 Deleting a session

2.3 run_non_job_statement: statements that do not launch a job

3. Debugging the CREATE TABLE statement


Preface

You need a working Flink environment and a running gateway service.

The test data files are, by default, under e2e-tests/data.

1. Startup process of the test script

The run below still has a few problems, but the startup process is already visible.

Since the service is deployed on a server, attaching a remote debugger is the easiest way to inspect the details.
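For the remote-debug part, the standard JDWP agent flags are enough. A minimal sketch (port 5005 is arbitrary, and it is an assumption that the startup script honors JAVA_OPTS; if it does not, add the flag to its java command line directly):

# assumption: the gateway startup script picks up JAVA_OPTS
export JAVA_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"
# restart the gateway, then attach the IDE's remote-debug configuration to port 5005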

$ ./run-tests.sh 
Preparing test data...
No HDFS address provided. Putting test data into /tmp directory...
Reading Flink config...
Starting rest endpoint...
Rest endpoint started.
/Users/bjhl/Documents/github/flink-sql-gateway/e2e-tests/test-commons.sh: line 102: jq: command not found
Request to http://localhost:8083/v1/info returns an error.
Request Method: GET
Request Body: 
/Users/bjhl/Documents/github/flink-sql-gateway/e2e-tests/test-commons.sh: line 108: jq: command not found
/Users/bjhl/Documents/github/flink-sql-gateway/e2e-tests/test-commons.sh: line 109: jq: command not found
./run-tests.sh: line 80: jq: command not found
Product Name: 
./run-tests.sh: line 81: jq: command not found
Version: ################################################################################
#                    Running tests in batch mode
################################################################################/Users/bjhl/Documents/github/flink-sql-gateway/e2e-tests/test-commons.sh: line 102: jq: command not found
Request to http://localhost:8083/v1/sessions returns an error.
Request Method: POST
Request Body: {"planner": "blink","execution_type": "batch"
}
/Users/bjhl/Documents/github/flink-sql-gateway/e2e-tests/test-commons.sh: line 108: jq: command not found
/Users/bjhl/Documents/github/flink-sql-gateway/e2e-tests/test-commons.sh: line 109: jq: command not found
/Users/bjhl/Documents/github/flink-sql-gateway/e2e-tests/test-commons.sh: line 102: jq: command not found
Request to http://localhost:8083/v1/sessions//heartbeat returns an error.
Request Method: POST
Request Body: 
/Users/bjhl/Documents/github/flink-sql-gateway/e2e-tests/test-commons.sh: line 108: jq: command not found
/Users/bjhl/Documents/github/flink-sql-gateway/e2e-tests/test-commons.sh: line 109: jq: command not found
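All of the "jq: command not found" noise above (and the empty Product Name / Version fields) comes from one missing dependency: the test scripts use jq to extract fields from the JSON responses. Installing it fixes the run:

# macOS (the run above is on a Mac)
brew install jq
# Debian/Ubuntu
sudo apt-get install jq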

1.1 http://localhost:8083/v1/sessions

Request:

{
    "planner": "blink",
    "execution_type": "batch"
}

Response:

{
    "session_id": "4a9b0f0c52bbcd1e656a931d3c53ac0b"
}
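The same call can be reproduced by hand with curl, for example:

curl -X POST http://localhost:8083/v1/sessions \
  -H 'Content-Type: application/json' \
  -d '{"planner": "blink", "execution_type": "batch"}'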

1.2 Sending a heartbeat

In test-commons.sh:

function send_heartbeat() {
    send_request "POST" "" "http://$HOST:$PORT/$API_VERSION/sessions/$1/heartbeat" > /dev/null
}

http://localhost:8083/v1/sessions/4a9b0f0c52bbcd1e656a931d3c53ac0b/heartbeat

The response body is empty.
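Equivalently, with the session id obtained in 1.1:

curl -X POST http://localhost:8083/v1/sessions/4a9b0f0c52bbcd1e656a931d3c53ac0b/heartbeat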

2. Other APIs

2.1 get_info

function get_info() {
    send_request "GET" "" "http://$HOST:$PORT/$API_VERSION/info"
}
http://localhost:8083/v1/info

Response:

{
    "product_name": "Apache Flink",
    "version": "1.11.2"
}
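Or directly (curl defaults to GET):

curl http://localhost:8083/v1/info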

2.2 Deleting a session

The API is RESTful, so a DELETE request does the job:

function delete_session() {
    response=`send_request "DELETE" "" "http://$HOST:$PORT/$API_VERSION/sessions/$1"`
    if [[ "$?" -ne 0 ]]
    then
        exit 1
    fi
}
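By hand, deleting the session from 1.1 looks like:

curl -X DELETE http://localhost:8083/v1/sessions/4a9b0f0c52bbcd1e656a931d3c53ac0b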

2.3 run_non_job_statement: statements that do not launch a job

function run_non_job_statement() {
    json=`cat <<EOF
{
    "statement": "$2"
}
EOF`
    response=`send_request "POST" "$json" "http://$HOST:$PORT/$API_VERSION/sessions/$1/statements"`
    if [[ "$?" -ne 0 ]]
    then
        exit 1
    fi
    type=`get_json_element "$response" ".statement_types[0]"`
    assert_equals "$3" "$type"
    make_array_from_json "`get_json_element "$response" ".results[0].data"`"
}

Request URL:

http://host:8083/v1/sessions/4a9b0f0c52bbcd1e656a931d3c53ac0b/statements

Request body:

{
    "statement": "create table nation (n_nationkey bigint not null, n_name varchar not null, n_regionkey bigint not null, n_comment varchar not null) WITH ('connector.type'='filesystem', 'connector.path'='hdfs://flinknameservice1/tmp/flink_sql_gateway_test/nation.tbl', 'format.type' = 'csv', 'format.derive-schema' = 'true', 'format.field-delimiter' = '|')"
}

Response:

{
    "results": [{
        "result_kind": "SUCCESS",
        "columns": [{"name": "result", "type": "VARCHAR(2)"}],
        "data": [["OK"]]
    }],
    "statement_types": ["CREATE_TABLE"]
}

In other words, the statement field is simply the SQL text to execute.
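The call is easy to reproduce with curl; any non-job statement works, e.g. show tables:

curl -X POST http://localhost:8083/v1/sessions/4a9b0f0c52bbcd1e656a931d3c53ac0b/statements \
  -H 'Content-Type: application/json' \
  -d '{"statement": "show tables"}'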

What does the data behind the newly created table look like? The nation.tbl file contains:

0|ALGERIA|0| haggle. carefully final deposits detect slyly agai|
1|ARGENTINA|1|al foxes promise slyly according to the regular accounts. bold requests alon|
2|BRAZIL|1|y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special |
3|CANADA|1|eas hang ironic, silent packages. slyly regular packages are furiously over the tithes. fluffily bold|
4|EGYPT|4|y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d|
5|ETHIOPIA|0|ven packages wake quickly. regu|
6|FRANCE|3|refully final requests. regular, ironi|
7|GERMANY|3|l platelets. regular accounts x-ray: unusual, regular acco|
8|INDIA|2|ss excuses cajole slyly across the packages. deposits print aroun|
9|INDONESIA|2| slyly express asymptotes. regular deposits haggle slyly. carefully ironic hockey players sleep blithely. carefull|
10|IRAN|4|efully alongside of the slyly final dependencies. |
11|IRAQ|4|nic deposits boost atop the quickly final requests? quickly regula|
12|JAPAN|2|ously. final, express gifts cajole a|
13|JORDAN|4|ic deposits are blithely about the carefully regular pa|
14|KENYA|0| pending excuses haggle furiously deposits. pending, express pinto beans wake fluffily past t|
15|MOROCCO|0|rns. blithely bold courts among the closely regular packages use furiously bold platelets?|
16|MOZAMBIQUE|0|s. ironic, unusual asymptotes wake blithely r|
17|PERU|1|platelets. blithely pending dependencies use fluffily across the even pinto beans. carefully silent accoun|
18|CHINA|2|c dependencies. furiously express notornis sleep slyly regular accounts. ideas sleep. depos|
19|ROMANIA|3|ular asymptotes are about the furious multipliers. express dependencies nag above the ironically ironic account|
20|SAUDI ARABIA|4|ts. silent requests haggle. closely express packages sleep across the blithely|
21|VIETNAM|2|hely enticingly express accounts. even, final |
22|RUSSIA|3| requests against the platelets use never according to the quickly regular pint|
23|UNITED KINGDOM|3|eans boost carefully special requests. accounts are. carefull|
24|UNITED STATES|1|y final packages. slow foxes cajole quickly. quickly silent platelets breach ironic accounts. unusual pinto be|

Next, let's step through this CREATE TABLE statement with the debugger.

3. Debugging the CREATE TABLE statement

The request lands in the StatementExecuteHandler class:

/* Licensed under the Apache License, Version 2.0. */
package com.ververica.flink.table.gateway.rest.handler;

import com.ververica.flink.table.gateway.operation.SqlCommandParser.SqlCommand;
import com.ververica.flink.table.gateway.rest.message.SessionIdPathParameter;
import com.ververica.flink.table.gateway.rest.message.SessionMessageParameters;
import com.ververica.flink.table.gateway.rest.message.StatementExecuteRequestBody;
import com.ververica.flink.table.gateway.rest.message.StatementExecuteResponseBody;
import com.ververica.flink.table.gateway.rest.result.ResultSet;
import com.ververica.flink.table.gateway.rest.session.SessionManager;
import com.ververica.flink.table.gateway.utils.SqlGatewayException;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

import javax.annotation.Nonnull;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
 * Request handler for executing a statement.
 */
public class StatementExecuteHandler
        extends AbstractRestHandler<StatementExecuteRequestBody, StatementExecuteResponseBody, SessionMessageParameters> {

    private final SessionManager sessionManager;

    public StatementExecuteHandler(
            SessionManager sessionManager,
            Time timeout,
            Map<String, String> responseHeaders,
            MessageHeaders<StatementExecuteRequestBody, StatementExecuteResponseBody, SessionMessageParameters> messageHeaders) {
        super(timeout, responseHeaders, messageHeaders);
        this.sessionManager = sessionManager;
    }

    @Override
    protected CompletableFuture<StatementExecuteResponseBody> handleRequest(
            @Nonnull HandlerRequest<StatementExecuteRequestBody, SessionMessageParameters> request)
            throws RestHandlerException {
        // extract the session id from the URL path
        String sessionId = request.getPathParameter(SessionIdPathParameter.class);
        // extract the SQL statement from the request body
        String statement = request.getRequestBody().getStatement();
        if (statement == null) {
            throw new RestHandlerException("Statement must be provided.", HttpResponseStatus.BAD_REQUEST);
        }

        // TODO supports this
        Long executionTimeoutMillis = request.getRequestBody().getExecutionTimeout();

        try {
            // the main execution: look up the session and run the statement
            Tuple2<ResultSet, SqlCommand> tuple2 = sessionManager.getSession(sessionId).runStatement(statement);
            // wrap the result set and the statement type into the response body
            ResultSet resultSet = tuple2.f0;
            String statementType = tuple2.f1.name();
            return CompletableFuture.completedFuture(new StatementExecuteResponseBody(
                Collections.singletonList(resultSet),
                Collections.singletonList(statementType)));
        } catch (SqlGatewayException e) {
            throw new RestHandlerException(e.getMessage(), HttpResponseStatus.INTERNAL_SERVER_ERROR, e);
        }
    }
}

 

If the same statement is submitted again in the same session, it fails (drop the table first, or create a new session, to retry):

Caused by: org.apache.flink.table.catalog.exceptions.TableAlreadyExistException: Table (or view) default_database.nation already exists in Catalog default_catalog.

Stepping into Session, we land in runStatement:
/* Licensed under the Apache License, Version 2.0. */
package com.ververica.flink.table.gateway.rest.session;

import com.ververica.flink.table.gateway.config.entries.ExecutionEntry;
import com.ververica.flink.table.gateway.context.SessionContext;
import com.ververica.flink.table.gateway.operation.JobOperation;
import com.ververica.flink.table.gateway.operation.Operation;
import com.ververica.flink.table.gateway.operation.OperationFactory;
import com.ververica.flink.table.gateway.operation.SqlCommandParser;
import com.ververica.flink.table.gateway.operation.SqlCommandParser.SqlCommandCall;
import com.ververica.flink.table.gateway.operation.SqlParseException;
import com.ververica.flink.table.gateway.rest.result.ResultSet;
import com.ververica.flink.table.gateway.utils.SqlGatewayException;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.planner.plan.metadata.FlinkDefaultRelMetadataProvider;

import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Similar to HTTP Session, which could maintain user identity and store user-specific data
 * during multiple request/response interactions between a client and the gateway server.
 */
public class Session {

    private static final Logger LOG = LoggerFactory.getLogger(Session.class);

    private final SessionContext context;
    private final String sessionId;
    private long lastVisitedTime;
    private final Map<JobID, JobOperation> jobOperations;

    public Session(SessionContext context) {
        this.context = context;
        this.sessionId = context.getSessionId();
        this.lastVisitedTime = System.currentTimeMillis();
        this.jobOperations = new ConcurrentHashMap<>();
    }

    public void touch() {
        lastVisitedTime = System.currentTimeMillis();
    }

    public long getLastVisitedTime() {
        return lastVisitedTime;
    }

    public SessionContext getContext() {
        return context;
    }

    public Tuple2<ResultSet, SqlCommandParser.SqlCommand> runStatement(String statement) {
        // TODO: This is a temporary fix to avoid NPE.
        //  In SQL gateway, TableEnvironment is created and used by different threads, thus causing this problem.
        RelMetadataQuery.THREAD_PROVIDERS.set(
            JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE()));

        LOG.info("Session: {}, run statement: {}", sessionId, statement);
        boolean isBlinkPlanner = context.getExecutionContext().getEnvironment().getExecution().getPlanner()
            .equalsIgnoreCase(ExecutionEntry.EXECUTION_PLANNER_VALUE_BLINK);

        SqlCommandCall call;
        try {
            // whether the blink planner is used affects parsing
            Optional<SqlCommandCall> callOpt = SqlCommandParser.parse(statement, isBlinkPlanner);
            if (!callOpt.isPresent()) {
                LOG.error("Session: {}, Unknown statement: {}", sessionId, statement);
                throw new SqlGatewayException("Unknown statement: " + statement);
            } else {
                call = callOpt.get();
            }
        } catch (SqlParseException e) {
            LOG.error("Session: {}, Failed to parse statement: {}", sessionId, statement);
            throw new SqlGatewayException(e.getMessage(), e.getCause());
        }

        Operation operation = OperationFactory.createOperation(call, context);
        ResultSet resultSet = operation.execute();

        if (operation instanceof JobOperation) {
            JobOperation jobOperation = (JobOperation) operation;
            jobOperations.put(jobOperation.getJobId(), jobOperation);
        }

        return Tuple2.of(resultSet, call.command);
    }

    public JobStatus getJobStatus(JobID jobId) {
        LOG.info("Session: {}, get status for job: {}", sessionId, jobId);
        return getJobOperation(jobId).getJobStatus();
    }

    public void cancelJob(JobID jobId) {
        LOG.info("Session: {}, cancel job: {}", sessionId, jobId);
        getJobOperation(jobId).cancelJob();
        jobOperations.remove(jobId);
    }

    public Optional<ResultSet> getJobResult(JobID jobId, long token, int maxFetchSize) {
        LOG.info("Session: {}, get result for job: {}, token: {}, maxFetchSize: {}",
            sessionId, jobId, token, maxFetchSize);
        return getJobOperation(jobId).getJobResult(token, maxFetchSize);
    }

    private JobOperation getJobOperation(JobID jobId) {
        JobOperation jobOperation = jobOperations.get(jobId);
        if (jobOperation == null) {
            String msg = String.format("Job: %s does not exist in current session: %s.", jobId, sessionId);
            LOG.error(msg);
            throw new SqlGatewayException(msg);
        } else {
            return jobOperation;
        }
    }
}

 

For background on the SQL machinery used here:

see: https://zhuanlan.zhihu.com/p/67066986

The execution planner is configured through the "planner" key, which takes one of two values, "old" or "blink":

public static final String EXECUTION_PLANNER = "planner";
public static final String EXECUTION_PLANNER_VALUE_OLD = "old";
public static final String EXECUTION_PLANNER_VALUE_BLINK = "blink";

runStatement checks whether the blink planner is in use before parsing.
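Switching planners is therefore just a matter of the session-creation body, e.g. a batch session on the old planner (same endpoint as 1.1):

curl -X POST http://localhost:8083/v1/sessions \
  -H 'Content-Type: application/json' \
  -d '{"planner": "old", "execution_type": "batch"}'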

The parse step

parse takes the given statement and returns the corresponding SqlCommandCall.

Only "set", "show modules", "show current catalog" and "show current database" are recognized through regex matching; every other command goes through the SQL parser. The SQL parser only parses the statement to obtain the corresponding SqlCommand; it does not check whether the statement is valid.

/**
 * Parse the given statement and return corresponding SqlCommandCall.
 *
 * <p>only `set`, `show modules`, `show current catalog` and `show current database`
 * are parsed through regex matching, other commands are parsed through sql parser.
 *
 * <p>throw {@link SqlParseException} if the statement contains multiple sub-statements separated by semicolon
 * or there is a parse error.
 *
 * <p>NOTE: sql parser only parses the statement to get the corresponding SqlCommand,
 * do not check whether the statement is valid here.
 */
public static Optional<SqlCommandCall> parse(String stmt, boolean isBlinkPlanner) {
    // normalize
    String stmtForRegexMatch = stmt.trim();
    // remove ';' at the end
    if (stmtForRegexMatch.endsWith(";")) {
        stmtForRegexMatch = stmtForRegexMatch.substring(0, stmtForRegexMatch.length() - 1).trim();
    }
    // only parse gateway specific statements
    for (SqlCommand cmd : SqlCommand.values()) {
        if (cmd.hasPattern()) {
            final Matcher matcher = cmd.pattern.matcher(stmtForRegexMatch);
            if (matcher.matches()) {
                final String[] groups = new String[matcher.groupCount()];
                for (int i = 0; i < groups.length; i++) {
                    groups[i] = matcher.group(i + 1);
                }
                return cmd.operandConverter.apply(groups)
                    .map((operands) -> new SqlCommandCall(cmd, operands));
            }
        }
    }
    return parseStmt(stmt, isBlinkPlanner);
}
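A quick way to watch the regex branch in the debugger: the four gateway-specific commands above never reach the Calcite parser, e.g. (session id from 1.1):

curl -X POST http://localhost:8083/v1/sessions/4a9b0f0c52bbcd1e656a931d3c53ac0b/statements \
  -H 'Content-Type: application/json' \
  -d '{"statement": "show current catalog"}'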

The parseStmt step

/**
 * Flink Parser only supports partial Operations, so we directly use Calcite Parser here.
 * Once Flink Parser supports all Operations, we should use Flink Parser instead of Calcite Parser.
 */
private static Optional<SqlCommandCall> parseStmt(String stmt, boolean isBlinkPlanner) {
    SqlParser.Config config = createSqlParserConfig(isBlinkPlanner);
    SqlParser sqlParser = SqlParser.create(stmt, config);
    SqlNodeList sqlNodes;
    try {
        sqlNodes = sqlParser.parseStmtList();
        // no need check the statement is valid here
    } catch (org.apache.calcite.sql.parser.SqlParseException e) {
        throw new SqlParseException("Failed to parse statement.", e);
    }
    if (sqlNodes.size() != 1) {
        throw new SqlParseException("Only single statement is supported now");
    }

    final String[] operands;
    final SqlCommand cmd;
    SqlNode node = sqlNodes.get(0);
    if (node.getKind().belongsTo(SqlKind.QUERY)) {
        cmd = SqlCommand.SELECT;
        operands = new String[] { stmt };
    } else if (node instanceof RichSqlInsert) {
        RichSqlInsert insertNode = (RichSqlInsert) node;
        cmd = insertNode.isOverwrite() ? SqlCommand.INSERT_OVERWRITE : SqlCommand.INSERT_INTO;
        operands = new String[] { stmt, insertNode.getTargetTable().toString() };
    } else if (node instanceof SqlShowTables) {
        cmd = SqlCommand.SHOW_TABLES;
        operands = new String[0];
    } else if (node instanceof SqlCreateTable) {
        cmd = SqlCommand.CREATE_TABLE;
        operands = new String[] { stmt };
    } else if (node instanceof SqlDropTable) {
        cmd = SqlCommand.DROP_TABLE;
        operands = new String[] { stmt };
    } else if (node instanceof SqlAlterTable) {
        cmd = SqlCommand.ALTER_TABLE;
        operands = new String[] { stmt };
    } else if (node instanceof SqlCreateView) {
        // TableEnvironment currently does not support creating view
        // so we have to perform the modification here
        SqlCreateView createViewNode = (SqlCreateView) node;
        cmd = SqlCommand.CREATE_VIEW;
        operands = new String[] {
            createViewNode.getViewName().toString(),
            createViewNode.getQuery().toString()
        };
    } else if (node instanceof SqlDropView) {
        // TableEnvironment currently does not support dropping view
        // so we have to perform the modification here
        SqlDropView dropViewNode = (SqlDropView) node;
        // TODO: we can't get this field from SqlDropView normally until FLIP-71 is implemented
        Field ifExistsField;
        try {
            ifExistsField = SqlDrop.class.getDeclaredField("ifExists");
        } catch (NoSuchFieldException e) {
            throw new SqlParseException("Failed to parse drop view statement.", e);
        }
        ifExistsField.setAccessible(true);
        boolean ifExists;
        try {
            ifExists = ifExistsField.getBoolean(dropViewNode);
        } catch (IllegalAccessException e) {
            throw new SqlParseException("Failed to parse drop view statement.", e);
        }
        cmd = SqlCommand.DROP_VIEW;
        operands = new String[] { dropViewNode.getViewName().toString(), String.valueOf(ifExists) };
    } else if (node instanceof SqlShowDatabases) {
        cmd = SqlCommand.SHOW_DATABASES;
        operands = new String[0];
    } else if (node instanceof SqlCreateDatabase) {
        cmd = SqlCommand.CREATE_DATABASE;
        operands = new String[] { stmt };
    } else if (node instanceof SqlDropDatabase) {
        cmd = SqlCommand.DROP_DATABASE;
        operands = new String[] { stmt };
    } else if (node instanceof SqlAlterDatabase) {
        cmd = SqlCommand.ALTER_DATABASE;
        operands = new String[] { stmt };
    } else if (node instanceof SqlShowCatalogs) {
        cmd = SqlCommand.SHOW_CATALOGS;
        operands = new String[0];
    } else if (node instanceof SqlShowFunctions) {
        cmd = SqlCommand.SHOW_FUNCTIONS;
        operands = new String[0];
    } else if (node instanceof SqlUseCatalog) {
        cmd = SqlCommand.USE_CATALOG;
        operands = new String[] { ((SqlUseCatalog) node).getCatalogName() };
    } else if (node instanceof SqlUseDatabase) {
        cmd = SqlCommand.USE;
        operands = new String[] { ((SqlUseDatabase) node).getDatabaseName().toString() };
    } else if (node instanceof SqlRichDescribeTable) {
        cmd = SqlCommand.DESCRIBE_TABLE;
        // TODO support describe extended
        String[] fullTableName = ((SqlRichDescribeTable) node).fullTableName();
        String escapedName =
            Stream.of(fullTableName).map(s -> "`" + s + "`").collect(Collectors.joining("."));
        operands = new String[] { escapedName };
    } else if (node instanceof SqlExplain) {
        cmd = SqlCommand.EXPLAIN;
        // TODO support explain details
        operands = new String[] { ((SqlExplain) node).getExplicandum().toString() };
    } else if (node instanceof SqlSetOption) {
        SqlSetOption setNode = (SqlSetOption) node;
        // refer to SqlSetOption#unparseAlterOperation
        if (setNode.getValue() != null) {
            cmd = SqlCommand.SET;
            operands = new String[] { setNode.getName().toString(), setNode.getValue().toString() };
        } else {
            cmd = SqlCommand.RESET;
            if (setNode.getName().toString().toUpperCase().equals("ALL")) {
                operands = new String[0];
            } else {
                operands = new String[] { setNode.getName().toString() };
            }
        }
    } else {
        cmd = null;
        operands = new String[0];
    }

    if (cmd == null) {
        return Optional.empty();
    } else {
        // use the origin given statement to make sure
        // users can find the correct line number when parsing failed
        return Optional.of(new SqlCommandCall(cmd, operands));
    }
}

So the SQL parsing here is really delegated to a third-party library: the statements are parsed by Apache Calcite's SqlParser.
