34345 发表于 2025-2-6 23:22:11

3. 使用sql查询csv/json文件内容,还能关联查询?

1. 简介

我们在前面的文章提到了calcite可以支持文件系统的数据源适配, 其实官方已经提供了相应的能力, 其支持csv和json的查询适配, 废话不多说, 直接展示.
2. Maven

<!-- calcite文件系统支持 --><dependency>    <groupId>org.apache.calcite</groupId>    <artifactId>calcite-file</artifactId>    <version>1.37.0</version></dependency><dependency>    <groupId>org.apache.calcite</groupId>    <artifactId>calcite-core</artifactId>    <version>1.37.0</version></dependency>3. 数据文件准备

3.1 csv

user_info.csv
首行将来被解析成表的字段, 冒号后面是字段类型, 如果未指定类型 默认使用varchar
ID:long,姓名:string,GENDER:string,BIRTHDAY:date100,"张三",,"2001-01-01"110,"李四","M","2001-01-01"120,"王五","M","2002-05-03"130,"赵六","F","2005-09-07"140,"张铁牛","M","2007-01-01"3.2 json

role_info.json
[{    "id": 123,    "name": "管理员",    "key": "manager"},{    "id": 234,    "name": "老师",    "key": "teacher"},{    "id": 345,    "name": "学生",    "key": "student"}]然后将文件放到resources/file目录下
4. 核心代码

package com.ldx.calcite;import com.google.common.collect.ImmutableMap;import lombok.SneakyThrows;import lombok.extern.slf4j.Slf4j;import org.apache.calcite.adapter.file.FileSchemaFactory;import org.apache.calcite.jdbc.CalciteConnection;import org.apache.calcite.schema.Schema;import org.apache.calcite.schema.SchemaPlus;import org.apache.calcite.util.Sources;import org.junit.jupiter.api.AfterAll;import org.junit.jupiter.api.BeforeAll;import org.junit.jupiter.api.Test;import org.testng.collections.Maps;import java.net.URL;import java.sql.Connection;import java.sql.DriverManager;import java.sql.ResultSet;import java.sql.ResultSetMetaData;import java.sql.SQLException;import java.sql.Statement;import java.util.Map;import java.util.Properties;@Slf4jpublic class CalciteFileTest {    private static Connection connection;    private static SchemaPlus rootSchema;    private static Statement statement;    @BeforeAll    @SneakyThrows    public static void beforeAll() {      Properties info = new Properties();      // 不区分sql大小写      info.setProperty("caseSensitive", "false");      // 创建Calcite连接      connection = DriverManager.getConnection("jdbc:calcite:", info);      CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);      // 构建RootSchema,在Calcite中,RootSchema是所有数据源schema的parent,多个不同数据源schema可以挂在同一个RootSchema下      rootSchema = calciteConnection.getRootSchema();      final Schema schema = FileSchemaFactory.INSTANCE.create(rootSchema, "x",                ImmutableMap.of("directory", resourcePath("file"), "flavor", "scannable"));      rootSchema.add("test", schema);      // 创建SQL语句执行查询      statement = calciteConnection.createStatement();    }    @Test    @SneakyThrows    public void execute_simple_query() {      ResultSet resultSet = statement.executeQuery("SELECT * FROM test.user_info");      printResultSet(resultSet);    }      @Test    @SneakyThrows    public void test_execute_join_query() {      ResultSet resultSet = statement.executeQuery("SELECT * FROM test.user_info ui inner join test.role_info ri on ui.role_id = ri.id");      printResultSet(resultSet);    }    @AfterAll    @SneakyThrows    public static void closeResource() {      statement.close();      connection.close();    }    private static String resourcePath(String path) {      final URL url = CalciteFileTest.class.getResource("/" + path);      return Sources.of(url).file().getAbsolutePath();    }    public static void printResultSet(ResultSet resultSet) throws SQLException {      // 获取 ResultSet 元数据      ResultSetMetaData metaData = resultSet.getMetaData();      // 获取列数      int columnCount = metaData.getColumnCount();      log.info("Number of columns: {}",columnCount);      // 遍历 ResultSet 并打印结果      while (resultSet.next()) {            final Map<String, String> item = Maps.newHashMap();            // 遍历每一列并打印            for (int i = 1; i <= columnCount; i++) {                String columnName = metaData.getColumnName(i);                String columnValue = resultSet.getString(i);                item.put(columnName, columnValue);            }            log.info(item.toString());      }    }}其实核心代码就几行, 如下:
通过FileSchemaFactory指定文件目录和文件内容的读取方式, 默认将指定目录下的csv和json文件读取成Table, 表名就是file的名称
flavor:

[*]SCANNABLE: 数据扫描。会更侧重于快速地读取和遍历数据。这种方式适用于需要对大量数据进行全表扫描或者范围扫描的情况,例如统计汇总操作
[*]FILTERABLE: 数据过滤。会更侧重于数据的条件筛选,比如在 SQL 查询中的WHERE子句。
[*]TRANSLATABLE: 数据转换。会更侧重于数据转换,以满足特定的查询需求或者数据处理要求。这种转换可能包括数据类型的转换(如将字符串类型的数字转换为实际的数值类型)、格式转换(如日期格式的调整)等。
// 这里的第二个参数“x”没什么意义, 源码中没用到, 可以随便填final Schema schema = FileSchemaFactory.INSTANCE.create(rootSchema, "x",               ImmutableMap.of("directory", resourcePath("file"), "flavor", "scannable"));// 使用目录名称为schema名称, 这里的test就是schema名称rootSchema.add("test", schema);calcite也可以做对应表的关联查询, 测试中csv关联json文件信息
"SELECT * FROM test.user_info ui inner join test.role_info ri on ui.role_id = ri.id"
5. 测试查询

execute_simple_query方法执行如下

test_execute_join_query方法执行如下:
页: [1]
查看完整版本: 3. 使用sql查询csv/json文件内容,还能关联查询?