1. Calcite元数据创建
1. 简介Calcite 是一款来自 Apache 的开源动态数据管理框架,核心功能是提供 SQL 查询解析、优化及执行等基础能力,以灵活支持多种数据源,广泛应用于各类数据处理系统。以下从其功能特性、应用场景、优势三方面简单概述:
[*]功能特性
[*]SQL 解析:支持多种 SQL 方言,如标准 SQL 以及不同数据库特定的扩展语法,能将输入的 SQL 语句解析为抽象语法树(AST),便于后续处理。
[*]语义分析:对解析后的 SQL 进行语义检查,比如验证表名、列名是否存在,数据类型是否匹配等,确保 SQL 的语义正确。
[*]查询优化:运用基于规则(RBO)和基于代价(CBO)的优化策略。RBO 通过预设规则,如谓词下推等,重写查询;CBO 则基于统计信息,估算不同执行计划的代价,选择最优方案。
[*]执行计划生成:根据优化后的结果,生成可执行的物理执行计划,定义操作的具体执行顺序和方式。
[*]数据源适配:可连接多种数据源,如关系型数据库(MySQL、Oracle 等)、文件系统(CSV、JSON 文件)、NoSQL 数据库等,而且还支持自定义数据源适配器, 并为不同数据源生成相应的数据访问策略。
[*]跨数据源查询: 能够连接不同类型的数据源,通过适配器将不同数据源的操作进行统一抽象。在进行跨数据源连表查询时,它会将查询分解为各个数据源可以处理的子查询,然后将各个数据源的结果进行合并和进一步处理
2. 元数据准备
准备两个数据库 mysql 和 postgres
库信息如下: mysql中有张表: user, postgres有张表role
表信息如下:
CREATE TABLE `user` (`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',`username` varchar(255) DEFAULT NULL COMMENT '用户名称',`age` int(11) DEFAULT NULL COMMENT '性别',`sex` varchar(255) DEFAULT NULL COMMENT '性别',`role_key` int(11) DEFAULT NULL COMMENT '角色',PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=32 DEFAULT CHARSET=utf8mb4 COMMENT='用户信息表';CREATE TABLE "public"."role" ("name" varchar(255) COLLATE "pg_catalog"."default","role_key" int4);3. maven依赖
maven依赖如下:
<dependency><groupId>org.apache.calcite</groupId><artifactId>calcite-core</artifactId><version>1.37.0</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.29</version></dependency><dependency><groupId>org.postgresql</groupId><artifactId>postgresql</artifactId><version>42.2.23</version></dependency>4. 元数据定义
calcite支持两种多种定义元数据方式 常用的是通过json方式,另一种是通过SchemaFactory的方式。
4.1 Json Model
组织结构:
|- model # 数据模型| |- schema # 数据模式| | |- tables # 表/视图| | |- functions # 函数| | |- type # 模式类型custom: 自定义, map: 映射, jdbc: jdbc, inline: 嵌入式 (默认)| | |- factory # 指定SchemaFactory的工厂类| | |- operand # 指定额外参数示例内容:
创建两个数据源 mysql 和 postgres, 使用两种不同的声明方式
{"version": "1.0","defaultSchema": "my_mysql","schemas": [ { "type": "jdbc", "name": "my_mysql", "jdbcUser": "root", "jdbcPassword": "123456", "jdbcUrl": "jdbc:mysql://localhost:3306/test", "jdbcCatalog": "test", "jdbcSchema": null }, { "name": "my_postgres", "type": "custom", "factory": "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory", "operand": { "jdbcDriver": "org.postgresql.Driver", "jdbcUrl": "jdbc:postgresql://localhost:5432/test", "jdbcUser": "root", "jdbcPassword": "123456" } }]}calcite model 实现类org.apache.calcite.jdbc.Driver --> org.apache.calcite.model.ModelHandler
calcite model doc:https://calcite.apache.org/docs/model.html
加载资源:
将json文件放到resources下, 然后创建connection的时候指定该文件即可
Properties info = new Properties();// 不区分sql大小写info.setProperty("caseSensitive", "false");// 设置引用标识符为反引号info.setProperty(CalciteConnectionProperty.QUOTING.camelName(), Quoting.BACK_TICK.name());// 指定model信息info.setProperty("model", resourcePath("model/model.json"));// 创建Calcite连接Connection connection = DriverManager.getConnection("jdbc:calcite:", info);CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);// 构建RootSchema,在Calcite中,RootSchema是所有数据源schema的parent,多个不同数据源schema可以挂在同一个RootSchema下SchemaPlus rootSchema = calciteConnection.getRootSchema();// 创建SQL语句执行查询Statement statement = calciteConnection.createStatement();4.2 SchemaFactory
schema UML图如下:
先创建对应数据源的datasource对象
private static DataSource getMysqlDataSource() {MysqlDataSource dataSource = new MysqlDataSource();dataSource.setUrl("jdbc:mysql://localhost:3306/test");dataSource.setUser("root");dataSource.setPassword("123456");return dataSource;}private static DataSource getPostgresDataSource() {final PGSimpleDataSource pgSimpleDataSource = new PGSimpleDataSource();pgSimpleDataSource.setUrl("jdbc:postgresql://localhost:5432/test");pgSimpleDataSource.setUser("root");pgSimpleDataSource.setPassword("123456");return pgSimpleDataSource;}然后将datasource对象包装成JdbcSchema对象最后注册到rootSchema中
Properties info = new Properties();// 不区分sql大小写info.setProperty("caseSensitive", "false");// 设置引用标识符为双引号info.setProperty(CalciteConnectionProperty.QUOTING.camelName(), Quoting.BACK_TICK.name());// 创建Calcite连接Connection connection = DriverManager.getConnection("jdbc:calcite:", info);CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);// 构建RootSchema,在Calcite中,RootSchema是所有数据源schema的parent,多个不同数据源schema可以挂在同一个RootSchema下SchemaPlus rootSchema = calciteConnection.getRootSchema();// 设置默认的schema, 如果不设置需要加上对应数据源的名称calciteConnection.setSchema("my_mysql");final DataSource mysqlDataSource = getMysqlDataSource();final JdbcSchema schemaWithMysql = JdbcSchema.create(rootSchema, "my_mysql", mysqlDataSource, "test", null);final DataSource postgresDataSource = getPostgresDataSource();final JdbcSchema schemaWithPostgres = JdbcSchema.create(rootSchema, "my_postgres", postgresDataSource, "test", "public");rootSchema.add("my_mysql", schemaWithMysql);rootSchema.add("my_postgres", schemaWithPostgres);// 创建SQL语句执行查询Statement statement = calciteConnection.createStatement();rootSchema也可以使用创建
CalciteSchema calciteSchema = CalciteSchema.createRootSchema(true, true);SchemaPlus rootSchema = calciteSchema.plus();5. 测试查询
5.1 测试单个数据源的查询功能
@Test@SneakyThrowspublic void test_connection() {// 上述配置中都设置了默认的schema为my_mysql, 所以查询的时候可以不添加数据源key前缀final ResultSet resultSet = statement.executeQuery("SELECT * FROM `user`");final ResultSet resultSet = statement.executeQuery("SELECT * FROM my_mysql.`user`");printResultSet(resultSet);}输出结果如下:
Number of columns: 5{sex=1, role_key=1, id=1, age=23, username=张三}{sex=2, role_key=2, id=2, age=18, username=李四}{sex=2, role_key=1, id=3, age=26, username=张铁牛}{sex=2, role_key=3, id=4, age=30, username=王麻子}5.2 测试不同数据源连表查询
calcite支持将不同数据源的sql下推, 然后在内存中做对应的关联过滤等操作
@Test@SneakyThrowspublic void test_cross_db_query() { final ResultSet resultSet = statement.executeQuery("SELECT u.*,r.name FROM `user` u left join my_postgres.`role` r on u.role_key = r.role_key"); printResultSet(resultSet);}输出结果如下:
Number of columns: 6{sex=1, role_key=1, name=管理员, id=1, age=23, username=张三}{sex=2, role_key=1, name=管理员, id=3, age=26, username=张铁牛}{sex=2, role_key=2, name=老师, id=2, age=18, username=李四}{sex=2, role_key=3, name=学生, id=4, age=30, username=王麻子}6. 完整测试代码
6.1 Json Model
package com.ldx.calcite;import lombok.SneakyThrows;import lombok.extern.slf4j.Slf4j;import org.apache.calcite.avatica.util.Quoting;import org.apache.calcite.config.CalciteConnectionProperty;import org.apache.calcite.jdbc.CalciteConnection;import org.apache.calcite.schema.SchemaPlus;import org.apache.calcite.util.Sources;import org.junit.jupiter.api.BeforeAll;import org.junit.jupiter.api.Test;import org.testng.collections.Maps;import java.net.URL;import java.sql.Connection;import java.sql.DriverManager;import java.sql.ResultSet;import java.sql.ResultSetMetaData;import java.sql.SQLException;import java.sql.Statement;import java.util.Map;import java.util.Properties;@Slf4jpublic class CalciteModelTest { private static Statement statement; @BeforeAll @SneakyThrows public static void beforeAll() { Properties info = new Properties(); // 不区分sql大小写 info.setProperty("caseSensitive", "false"); // 设置引用标识符为双引号 info.setProperty(CalciteConnectionProperty.QUOTING.camelName(), Quoting.BACK_TICK.name()); // 指定model信息 info.setProperty("model", resourcePath("model/model.json")); // 创建Calcite连接 Connection connection = DriverManager.getConnection("jdbc:calcite:", info); CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class); // 构建RootSchema,在Calcite中,RootSchema是所有数据源schema的parent,多个不同数据源schema可以挂在同一个RootSchema下 SchemaPlus rootSchema = calciteConnection.getRootSchema(); // 创建SQL语句执行查询 statement = calciteConnection.createStatement(); } @Test @SneakyThrows public void test_connection() { final ResultSet resultSet = statement.executeQuery("SELECT * FROM `user`"); printResultSet(resultSet); } @Test @SneakyThrows public void test_cross_db_query() { final ResultSet resultSet = statement.executeQuery("SELECT u.*,r.name FROM `user` u left join my_postgres.`role` r on u.role_key = r.role_key"); printResultSet(resultSet); } public static void printResultSet(ResultSet resultSet) throws SQLException { // 获取 ResultSet 元数据 ResultSetMetaData metaData = resultSet.getMetaData(); // 获取列数 int columnCount = metaData.getColumnCount(); log.info("Number of columns: {}",columnCount); // 遍历 ResultSet 并打印结果 while (resultSet.next()) { final Map<String, String> item = Maps.newHashMap(); // 遍历每一列并打印 for (int i = 1; i <= columnCount; i++) { String columnName = metaData.getColumnName(i); String columnValue = resultSet.getString(i); item.put(columnName, columnValue); } log.info(item.toString()); } } private static String resourcePath(String path) { final URL url = CalciteCsvTest.class.getResource("/" + path); return Sources .of(url).file().getAbsolutePath(); }}6.2 SchemaFactory
package com.ldx.calcite;import com.mysql.cj.jdbc.MysqlDataSource;import lombok.SneakyThrows;import lombok.extern.slf4j.Slf4j;import org.apache.calcite.adapter.jdbc.JdbcSchema;import org.apache.calcite.avatica.util.Quoting;import org.apache.calcite.config.CalciteConnectionProperty;import org.apache.calcite.jdbc.CalciteConnection;import org.apache.calcite.jdbc.CalciteSchema;import org.apache.calcite.schema.SchemaFactory;import org.apache.calcite.schema.SchemaPlus;import org.apache.calcite.util.Sources;import org.junit.jupiter.api.BeforeAll;import org.junit.jupiter.api.Test;import org.postgresql.ds.PGSimpleDataSource;import org.postgresql.osgi.PGDataSourceFactory;import org.testng.collections.Maps;import javax.sql.DataSource;import java.net.URL;import java.sql.Connection;import java.sql.DriverManager;import java.sql.ResultSet;import java.sql.ResultSetMetaData;import java.sql.SQLException;import java.sql.Statement;import java.util.Map;import java.util.Properties;@Slf4jpublic class CalciteCreateMataDataTest { private static Statement statement; @BeforeAll @SneakyThrows public static void beforeAll() { Properties info = new Properties(); // 不区分sql大小写 info.setProperty("caseSensitive", "false"); // 设置引用标识符为双引号 info.setProperty(CalciteConnectionProperty.QUOTING.camelName(), Quoting.BACK_TICK.name()); // 创建Calcite连接 Connection connection = DriverManager.getConnection("jdbc:calcite:", info); CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class); // 构建RootSchema,在Calcite中,RootSchema是所有数据源schema的parent,多个不同数据源schema可以挂在同一个RootSchema下 SchemaPlus rootSchema = calciteConnection.getRootSchema(); // 设置默认的schema, 如果不设置需要加上对应数据源的名称 calciteConnection.setSchema("my_mysql"); final DataSource mysqlDataSource = getMysqlDataSource(); final JdbcSchema schemaWithMysql = JdbcSchema.create(rootSchema, "my_mysql", mysqlDataSource, "test", null); final DataSource postgresDataSource = getPostgresDataSource(); final JdbcSchema schemaWithPostgres = JdbcSchema.create(rootSchema, "my_postgres", postgresDataSource, "test", "public"); rootSchema.add("my_mysql", schemaWithMysql); rootSchema.add("my_postgres", schemaWithPostgres); // 创建SQL语句执行查询 statement = calciteConnection.createStatement(); } private static DataSource getMysqlDataSource() { MysqlDataSource dataSource = new MysqlDataSource(); dataSource.setUrl("jdbc:mysql://localhost:3306/test"); dataSource.setUser("root"); dataSource.setPassword("123456"); return dataSource; } private static DataSource getPostgresDataSource() { final PGSimpleDataSource pgSimpleDataSource = new PGSimpleDataSource(); pgSimpleDataSource.setUrl("jdbc:postgresql://localhost:5432/test"); pgSimpleDataSource.setUser("root"); pgSimpleDataSource.setPassword("123456"); return pgSimpleDataSource; } @Test @SneakyThrows public void test_connection() { final ResultSet resultSet = statement.executeQuery("SELECT * FROM `user`"); printResultSet(resultSet); } @Test @SneakyThrows public void test_cross_db_query() { final ResultSet resultSet = statement.executeQuery("SELECT u.*,r.name FROM `user` u left join my_postgres.`role` r on u.role_key = r.role_key"); printResultSet(resultSet); } public static void printResultSet(ResultSet resultSet) throws SQLException { // 获取 ResultSet 元数据 ResultSetMetaData metaData = resultSet.getMetaData(); // 获取列数 int columnCount = metaData.getColumnCount(); log.info("Number of columns: {}",columnCount); // 遍历 ResultSet 并打印结果 while (resultSet.next()) { final Map<String, String> item = Maps.newHashMap(); // 遍历每一列并打印 for (int i = 1; i <= columnCount; i++) { String columnName = metaData.getColumnName(i); String columnValue = resultSet.getString(i); item.put(columnName, columnValue); } log.info(item.toString()); } } private static String resourcePath(String path) { final URL url = CalciteCsvTest.class.getResource("/" + path); return Sources .of(url).file().getAbsolutePath(); }}
页:
[1]