Skip to content

Commit

Permalink
[Feature][scaleph-engine-sql-gateway] add flink sql gateway session i…
Browse files Browse the repository at this point in the history
…mplementions (#628)

* feature: add idea formatter onoff comment

* feature: add flink sql gateway session

* feature: add flink sql gateway session

* feature: add flink sql gateway session
  • Loading branch information
kalencaya authored Oct 14, 2023
1 parent f5cb4f1 commit c5f6bfc
Show file tree
Hide file tree
Showing 18 changed files with 716 additions and 274 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,48 +84,48 @@ public SecurityFilterChain filterChain(HttpSecurity http) throws Exception {
}
}

// @formatter:off
http
//禁用cors
.csrf().disable()

//.addFilterBefore(corsFilter, UsernamePasswordAuthenticationFilter.class)

.sessionManagement()
.sessionCreationPolicy(SessionCreationPolicy.STATELESS)
.sessionCreationPolicy(SessionCreationPolicy.STATELESS)
.and()

//禁用iframe
.and()
.headers()
.frameOptions()
.disable()
.frameOptions().disable()
.and()

//请求权限配置
.and()
.authorizeRequests()
//放行endpoint
.requestMatchers(EndpointRequest.toAnyEndpoint()).permitAll()
//自定义匿名访问url
.antMatchers(anonymousUrls.toArray(new String[0])).permitAll()
//静态资源
.antMatchers(HttpMethod.GET, "/**/*.css", "/**/*.js", "/**/*.png",
//放行endpoint
.requestMatchers(EndpointRequest.toAnyEndpoint()).permitAll()
//自定义匿名访问url
.antMatchers(anonymousUrls.toArray(new String[0])).permitAll()
//静态资源
.antMatchers(HttpMethod.GET, "/**/*.css", "/**/*.js", "/**/*.png",
"/**/*.woff", "/**/*.woff2", "/**/*.svg", "/**/*.json", "/**/*.ttf", "/**/*.ico",
"/index.html"
).permitAll()
.antMatchers("/swagger**/**", "/doc.html", "/v3/**", "/webjars/**").permitAll()
.antMatchers("/ui/**")
.permitAll()
//放行options请求
.antMatchers(HttpMethod.OPTIONS, "/**").permitAll()
.anyRequest().authenticated()

"/index.html").permitAll()
.antMatchers("/swagger**/**", "/doc.html", "/v3/**", "/webjars/**").permitAll()
.antMatchers("/ui/**").permitAll()
//放行options请求
.antMatchers(HttpMethod.OPTIONS, "/**").permitAll()
.anyRequest().authenticated()
.and()

.exceptionHandling()
.authenticationEntryPoint(customAuthenticationEntryPoint)
.accessDeniedHandler(customAccessDeniedHandler)
.authenticationEntryPoint(customAuthenticationEntryPoint)
.accessDeniedHandler(customAccessDeniedHandler)
.and()

//禁用session
.and()
.apply(tokenConfigurer)
;
// @formatter:on
return http.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
package cn.sliew.scaleph.api.controller.ws;

import cn.sliew.scaleph.engine.sql.gateway.services.WsFlinkSqlGatewayService;
import cn.sliew.scaleph.engine.sql.gateway.services.dto.WsFlinkSqlGatewayCreateCatalogParamsDTO;
import cn.sliew.scaleph.engine.sql.gateway.services.dto.WsFlinkSqlGatewayQueryParamsDTO;
import cn.sliew.scaleph.engine.sql.gateway.services.param.WsFlinkSqlGatewayCreateCatalogParam;
import cn.sliew.scaleph.engine.sql.gateway.services.param.WsFlinkSqlGatewayQueryParam;
import cn.sliew.scaleph.engine.sql.gateway.services.dto.WsFlinkSqlGatewayQueryResultDTO;
import cn.sliew.scaleph.engine.sql.gateway.services.dto.catalog.CatalogInfo;
import io.swagger.v3.oas.annotations.Operation;
Expand Down Expand Up @@ -73,7 +73,7 @@ public ResponseEntity<Set<CatalogInfo>> getCatalogInfo(@PathVariable("clusterId"
@Parameter(name = "clusterId", description = "flink kubernetes session-cluster 的 sessionClusterId"),
})
public ResponseEntity<String> executeSql(@PathVariable("clusterId") String clusterId,
@RequestBody WsFlinkSqlGatewayQueryParamsDTO params) {
@RequestBody WsFlinkSqlGatewayQueryParam params) {
return ResponseEntity.ok(wsFlinkSqlGatewayService.executeSql(clusterId, params));
}

Expand Down Expand Up @@ -144,7 +144,7 @@ public ResponseEntity<Boolean> addDependencies(@PathVariable("clusterId") String
@Parameter(name = "clusterId", description = "flink kubernetes session-cluster 的 sessionClusterId"),
})
public ResponseEntity<Boolean> addCatalog(@PathVariable("clusterId") String clusterId,
@RequestBody WsFlinkSqlGatewayCreateCatalogParamsDTO params) {
@RequestBody WsFlinkSqlGatewayCreateCatalogParam params) {
return ResponseEntity.ok(wsFlinkSqlGatewayService.addCatalog(clusterId, params.getCatalogName(), params.getOptions()));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.dao.entity.master.ws;

import cn.sliew.scaleph.dao.entity.BaseDO;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

/**
* <p>
* flink sql gateway session
* </p>
*/
@Data
@TableName("ws_flink_sql_gateway_session")
public class WsFlinkSqlGatewaySession extends BaseDO {

private static final long serialVersionUID = 1L;

@TableField("session_handler")
private String sessionHandler;

@TableField("session_name")
private String sessionName;

@TableField("session_config")
private String sessionConfig;

@TableField("default_catalog")
private String defaultCatalog;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.dao.mapper.master.ws;

import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkSqlGatewaySession;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.springframework.stereotype.Repository;

/**
* <p>
* flink sql gateway session Mapper 接口
* </p>
*/
@Repository
public interface WsFlinkSqlGatewaySessionMapper extends BaseMapper<WsFlinkSqlGatewaySession> {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.sliew.scaleph.dao.mapper.master.ws.WsFlinkSqlGatewaySessionMapper">
<!-- 通用查询映射结果 -->
<resultMap id="BaseResultMap" type="cn.sliew.scaleph.dao.entity.master.ws.WsFlinkSqlGatewaySession">
<result column="id" property="id"/>
<result column="creator" property="creator"/>
<result column="create_time" property="createTime"/>
<result column="editor" property="editor"/>
<result column="update_time" property="updateTime"/>
<result column="session_handler" property="sessionHandler"/>
<result column="session_name" property="sessionName"/>
<result column="session_config" property="sessionConfig"/>
<result column="default_catalog" property="defaultCatalog"/>
</resultMap>

<!-- 通用查询结果列 -->
<sql id="Base_Column_List">
id,
creator,
create_time,
editor,
update_time,
session_handler, session_name, session_config, default_catalog
</sql>
</mapper>
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.engine.sql.gateway.environment;

import cn.sliew.scaleph.engine.sql.gateway.services.dto.FlinkSqlGatewaySession;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;

public interface TableEnvironmentProvider {

TableEnvironmentInternal getTableEnvironment(FlinkSqlGatewaySession session);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.engine.sql.gateway.environment;

import cn.sliew.scaleph.engine.sql.gateway.services.dto.FlinkSqlGatewaySession;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.gateway.service.operation.OperationExecutor;

/**
* @see OperationExecutor#getTableEnvironment()
* @see org.apache.flink.table.api.internal.TableEnvironmentInternal#create(EnvironmentSettings)
* @see org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl#create(EnvironmentSettings)
*/
public class TableEnvironmentProviderImpl implements TableEnvironmentProvider {

@Override
public TableEnvironmentInternal getTableEnvironment(FlinkSqlGatewaySession session) {
// todo to do
return null;
}
}
Loading

0 comments on commit c5f6bfc

Please sign in to comment.