蜜桃av色欲a片精品一区,麻豆aⅴ精品无码一区二区,亚洲人成网站在线播放影院在线,亚洲 素人 字幕 在线 最新

微立頂科技

新聞資訊

創(chuàng)新 服務(wù) 價(jià)值

  10分鐘教你寫個(gè)數(shù)據(jù)庫(kù)

發(fā)布日期:2022/10/24 10:39:32      瀏覽量:

今天教大家借助一款框架快速實(shí)現(xiàn)一個(gè)數(shù)據(jù)庫(kù),這個(gè)框架就是Calcite,下面會(huì)帶大家通過(guò)兩個(gè)例子快速教會(huì)大家怎么實(shí)現(xiàn),一個(gè)是可以通過(guò) SQL 語(yǔ)句的方式可以直接查詢文件內(nèi)容,第二個(gè)是模擬 Mysql 查詢功能,以及最后告訴大家怎么實(shí)現(xiàn) SQL 查詢 Kafka 數(shù)據(jù)。

Calcite

Calcite 是一個(gè)用于優(yōu)化異構(gòu)數(shù)據(jù)源的查詢處理的可插拔基礎(chǔ)框架(他是一個(gè)框架),可以將任意數(shù)據(jù)(Any data, Anywhere)DML 轉(zhuǎn)換成基于 SQL 的 DML 引擎,并且我們可以選擇性的使用它的部分功能。

Calcite能干什么

  1. 使用 SQL 訪問(wèn)內(nèi)存中某個(gè)數(shù)據(jù)

  2. 使用 SQL 訪問(wèn)某個(gè)文件的數(shù)據(jù)

  3. 跨數(shù)據(jù)源的數(shù)據(jù)訪問(wèn)、聚合、排序等(例如 Mysql 和 Redis 數(shù)據(jù)源中的數(shù)據(jù)進(jìn)行join)

當(dāng)我們需要自建一個(gè)數(shù)據(jù)庫(kù)的時(shí)候,數(shù)據(jù)可以為任何格式的,比如text、word、xml、mysql、es、csv、第三方接口數(shù)據(jù)等等,我們只有數(shù)據(jù),我們想讓這些數(shù)據(jù)支持 SQL 形式動(dòng)態(tài)增刪改查。

另外,像Hive、Drill、Flink、Phoenix 和 Storm 等項(xiàng)目中,數(shù)據(jù)處理系統(tǒng)都是使用 Calcite 來(lái)做 SQL 解析和查詢優(yōu)化,當(dāng)然,還有部分用來(lái)構(gòu)建自己的 JDBC driver。

名詞解釋

Token

就是將標(biāo)準(zhǔn) SQL(可以理解為Mysql)關(guān)鍵詞以及關(guān)鍵詞之間的字符串截取出來(lái),每一個(gè)token,會(huì)被封裝為一個(gè)SqlNode,SqlNode會(huì)衍生很多子類,比如Select會(huì)被封裝為SqlSelect,當(dāng)前 SqlNode 也能反解析為 SQL 文本。

RelDataTypeField

某個(gè)字段的名稱和類型信息

RelDataType

多個(gè) RelDataTypeField 組成了 RelDataType,可以理解為數(shù)據(jù)行

Table

一個(gè)完整的表的信息

Schema

所有元數(shù)據(jù)的組合,可以理解為一組 Table 或者庫(kù)的概念

開(kāi)始使用

1. 引入包

<dependency>
    <groupId>org.apache.calcite</groupId>
    <artifactId>calcite-core</artifactId>
    <!-- 目前最新版本 2022-09-10日更新-->
    <version>1.32.0</version>
</dependency>

2. 創(chuàng)建model.json文件和表結(jié)構(gòu)csv

model.json 里面主要描述或者說(shuō)告訴 Calcite 如何創(chuàng)建 Schema,也就是告訴框架怎么創(chuàng)建出庫(kù)。

{ "version": "1.0",//忽略 "defaultSchema": "CSV",//設(shè)置默認(rèn)的schema "schemas": [//可定義多個(gè)schema { "name": "CSV",//相當(dāng)于namespace和上面的defaultSchema的值對(duì)應(yīng) "type": "custom",//寫死 "factory": "csv.CsvSchemaFactory",//factory的類名必須是你自己實(shí)現(xiàn)的factory的包的全路徑 "operand": { //這里可以傳遞自定義參數(shù),最終會(huì)以map的形式傳遞給factory的operand參數(shù) "directory": "csv"//directory代表calcite會(huì)在resources下面的csv目錄下面讀取所有的csv文件,factory創(chuàng)建的Schema會(huì)吧這些文件全部構(gòu)建成Table,可以理解為讀取數(shù)據(jù)文件的根目錄,當(dāng)然key的名稱也不一定非得用directory,你可以隨意指定 } } ] } 

接下來(lái)還需要定義一個(gè) csv 文件,用來(lái)定義表結(jié)構(gòu)。

NAME:string,MONEY:string aixiaoxian,10000萬(wàn)
xiaobai,10000萬(wàn)
adong,10000萬(wàn)
maomao,10000萬(wàn)
xixi,10000萬(wàn)
zizi,10000萬(wàn)
wuwu,10000萬(wàn)
kuku,10000萬(wàn)

整個(gè)項(xiàng)目的結(jié)構(gòu)大概就是這樣:

3. 實(shí)現(xiàn)Schema的工廠類

在上述文件中指定的包路徑下去編寫 CsvSchemaFactory 類,實(shí)現(xiàn) SchemaFactory 接口,并且實(shí)現(xiàn)里面唯一的方法 create 方法,創(chuàng)建Schema(庫(kù))。

public class CsvSchemaFactory implements SchemaFactory { /**
     * parentSchema 父節(jié)點(diǎn),一般為root
     * name 為model.json中定義的名字
     * operand 為model.json中定于的數(shù)據(jù),這里可以傳遞自定義參數(shù)
     *
     * @param parentSchema Parent schema
     * @param name         Name of this schema
     * @param operand      The "operand" JSON property
     * @return */ @Override public Schema create(SchemaPlus parentSchema, String name,
                         Map<String, Object> operand) { final String directory = (String) operand.get("directory"); File directoryFile = new File(directory); return new CsvSchema(directoryFile, "scannable");
    }
}

4. 自定義Schma類

有了 SchemaFactory,接下來(lái)需要自定義 Schema 類。

自定義的 Schema 需要實(shí)現(xiàn) Schema 接口,但是直接實(shí)現(xiàn)要實(shí)現(xiàn)的方法太多,我們?nèi)?shí)現(xiàn)官方的 AbstractSchema 類,這樣就只需要實(shí)現(xiàn)一個(gè)方法就行(如果有其他定制化需求可以實(shí)現(xiàn)原生接口)。

核心的邏輯就是createTableMap方法,用于創(chuàng)建出 Table 表。

他會(huì)掃描指定的Resource下面的所有 csv 文件,將每個(gè)文件映射成Table對(duì)象,最終以map形式返回,Schema接口的其他幾個(gè)方法會(huì)用到這個(gè)對(duì)象。

//實(shí)現(xiàn)這一個(gè)方法就行了 @Override protected Map<String, Table> getTableMap() { if (tableMap == null) {
            tableMap = createTableMap();
        } return tableMap;
    } private Map<String, Table> createTableMap() { // Look for files in the directory ending in ".csv" final Source baseSource = Sources.of(directoryFile); //會(huì)自動(dòng)過(guò)濾掉非指定文件后綴的文件,我這里寫的csv File[] files = directoryFile.listFiles((dir, name) -> { final String nameSansGz = trim(name, ".gz"); return nameSansGz.endsWith(".csv");
        }); if (files == null) {
            System.out.println("directory " + directoryFile + " not found");
            files = new File[0];
        } // Build a map from table name to table; each file becomes a table. final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder(); for (File file : files) { Source source = Sources.of(file); final Source sourceSansCsv = source.trimOrNull(".csv"); if (sourceSansCsv != null) { final Table table = createTable(source);
                builder.put(sourceSansCsv.relative(baseSource).path(), table);
            }
        } return builder.build();
    }

5. 自定義 Table

Schema 有了,并且數(shù)據(jù)文件 csv 也映射成 Table 了,一個(gè) csv 文件對(duì)應(yīng)一個(gè) Table。

接下來(lái)我們?nèi)プ远x Table,自定義 Table 的核心是我們要定義字段的類型和名稱,以及如何讀取 csv文件。

  1. 先獲取數(shù)據(jù)類型和名稱,即單表結(jié)構(gòu),從csv文件頭中獲取(當(dāng)前文件頭需要我們自己定義,包括規(guī)則我們也可以定制化)。
/**
 * Base class for table that reads CSV files.
 */ public abstract class CsvTable extends AbstractTable { protected final Source source; protected final @Nullable RelProtoDataType protoRowType; private @Nullable RelDataType rowType; private @Nullable List<RelDataType> fieldTypes; /**
     * Creates a CsvTable.
     */ CsvTable(Source source, @Nullable RelProtoDataType protoRowType) { this.source = source; this.protoRowType = protoRowType;
    } /**
		* 創(chuàng)建一個(gè)CsvTable,繼承AbstractTable,需要實(shí)現(xiàn)里面的getRowType方法,此方法就是獲取當(dāng)前的表結(jié)構(gòu)。
			Table的類型有很多種,比如還有視圖類型,AbstractTable類中幫我們默認(rèn)實(shí)現(xiàn)了Table接口的一些方法,比如getJdbcTableType			方法,默認(rèn)為Table類型,如果有其他定制化需求可直接實(shí)現(xiàn)Table接口。
			和AbstractSchema很像
		*/ @Override public RelDataType getRowType(RelDataTypeFactory typeFactory) { if (protoRowType != null) { return protoRowType.apply(typeFactory);
        } if (rowType == null) {
            rowType = CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, source, null);
        } return rowType;
    } /**
     * Returns the field types of this CSV table.
     */ public List<RelDataType> getFieldTypes(RelDataTypeFactory typeFactory) { if (fieldTypes == null) {
            fieldTypes = new ArrayList<>();
            CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, source,
                    fieldTypes);
        } return fieldTypes;
    } public static RelDataType deduceRowType(JavaTypeFactory typeFactory,
                                            Source source, @Nullable List<RelDataType> fieldTypes) { final List<RelDataType> types = new ArrayList<>(); final List<String> names = new ArrayList<>(); try (CSVReader reader = openCsv(source)) {
            String[] strings = reader.readNext(); if (strings == null) {
                strings = new String[]{"EmptyFileHasNoColumns:boolean"};
            } for (String string : strings) { final String name; final RelDataType fieldType; //就是簡(jiǎn)單的讀取字符串冒號(hào)前面是名稱,冒號(hào)后面是類型 final int colon = string.indexOf(’:’); if (colon >= 0) {
                    name = string.substring(0, colon); String typeString = string.substring(colon + 1); Matcher decimalMatcher = DECIMAL_TYPE_PATTERN.matcher(typeString); if (decimalMatcher.matches()) { int precision = Integer.parseInt(decimalMatcher.group(1)); int scale = Integer.parseInt(decimalMatcher.group(2));
                        fieldType = parseDecimalSqlType(typeFactory, precision, scale);
                    } else { switch (typeString) { case "string":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR); break; case "boolean":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.BOOLEAN); break; case "byte":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TINYINT); break; case "char":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.CHAR); break; case "short":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.SMALLINT); break; case "int":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.INTEGER); break; case "long":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.BIGINT); break; case "float":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.REAL); break; case "double":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.DOUBLE); break; case "date":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.DATE); break; case "timestamp":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TIMESTAMP); break; case "time":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TIME); break; default:
                                LOGGER.warn( "Found unknown type: {} in file: {} for column: {}. Will assume the type of " + "column is string.",
                                        typeString, source.path(), name);
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR); break;
                        }
                    }
                } else { //  如果沒(méi)定義,默認(rèn)都是String類型,字段名稱也是string name = string;
                    fieldType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
                }
                names.add(name);
                types.add(fieldType); if (fieldTypes != null) {
                    fieldTypes.add(fieldType);
                }
            }
        } catch (IOException e) { // ignore } if (names.isEmpty()) {
            names.add("line");
            types.add(typeFactory.createSqlType(SqlTypeName.VARCHAR));
        } return typeFactory.createStructType(Pair.zip(names, types));
    }
}
  1. 獲取文件中的數(shù)據(jù),上面把Table的表結(jié)構(gòu)字段名稱和類型都獲取到了以后,就剩最后一步了,獲取文件中的數(shù)據(jù)。我們需要自定義一個(gè)類,實(shí)現(xiàn) ScannableTable 接口,并且實(shí)現(xiàn)里面唯一的方法 scan 方法,其實(shí)本質(zhì)上就是讀文件,然后把文件的每一行的數(shù)據(jù)和上述獲取的 fileType 進(jìn)行匹配。
@Override public Enumerable<Object[]> scan(DataContext root) { JavaTypeFactory typeFactory = root.getTypeFactory(); final List<RelDataType> fieldTypes = getFieldTypes(typeFactory); final List<Integer> fields = ImmutableIntList.identity(fieldTypes.size()); final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root); return new AbstractEnumerable<@Nullable Object[]>() { @Override public Enumerator<@Nullable Object[]> enumerator() { //返回我們自定義的讀取數(shù)據(jù)的類 return new CsvEnumerator<>(source, cancelFlag, false, null,
                        CsvEnumerator.arrayConverter(fieldTypes, fields, false));
            }
        };
    } public CsvEnumerator(Source source, AtomicBoolean cancelFlag, boolean stream, @Nullable String @Nullable [] filterValues, RowConverter<E> rowConverter) { this.cancelFlag = cancelFlag; this.rowConverter = rowConverter; this.filterValues = filterValues == null ? null : ImmutableNullableList.copyOf(filterValues); try { this.reader = openCsv(source); //跳過(guò)第一行,因?yàn)榈谝恍惺嵌x類型和名稱的 this.reader.readNext(); // skip header row } catch (IOException e) { throw new RuntimeException(e);
        }
    } //CsvEnumerator必須實(shí)現(xiàn)calcit自己的迭代器,里面有current、moveNext方法,current是返回當(dāng)前游標(biāo)所在的數(shù)據(jù)記錄,moveNext是將游標(biāo)指向下一個(gè)記錄,官網(wǎng)中自己定義了一個(gè)類型轉(zhuǎn)換器,是將csv文件中的數(shù)據(jù)轉(zhuǎn)換成文件頭指定的類型,這個(gè)需要我們自己來(lái)實(shí)現(xiàn) @Override public E current() { return castNonNull(current);
    } @Override public boolean moveNext() { try {
            outer: for (; ; ) { if (cancelFlag.get()) { return false;
                } final String[] strings = reader.readNext(); if (strings == null) {
                    current = null;
                    reader.close(); return false;
                } if (filterValues != null) { for (int i = 0; i < strings.length; i++) { String filterValue = filterValues.get(i); if (filterValue != null) { if (!filterValue.equals(strings[i])) { continue outer;
                            }
                        }
                    }
                }
                current = rowConverter.convertRow(strings); return true;
            }
        } catch (IOException e) { throw new RuntimeException(e);
        }
    } protected @Nullable Object convert(@Nullable RelDataType fieldType, @Nullable String string) { if (fieldType == null || string == null) { return string;
            } switch (fieldType.getSqlTypeName()) { case BOOLEAN: if (string.length() == 0) { return null;
                    } return Boolean.parseBoolean(string); case TINYINT: if (string.length() == 0) { return null;
                    } return Byte.parseByte(string); case SMALLINT: if (string.length() == 0) { return null;
                    } return Short.parseShort(string); case INTEGER: if (string.length() == 0) { return null;
                    } return Integer.parseInt(string); case BIGINT: if (string.length() == 0) { return null;
                    } return Long.parseLong(string); case FLOAT: if (string.length() == 0) { return null;
                    } return Float.parseFloat(string); case DOUBLE: if (string.length() == 0) { return null;
                    } return Double.parseDouble(string); case DECIMAL: if (string.length() == 0) { return null;
                    } return parseDecimal(fieldType.getPrecision(), fieldType.getScale(), string); case DATE: if (string.length() == 0) { return null;
                    } try { Date date = TIME_FORMAT_DATE.parse(string); return (int) (date.getTime() / DateTimeUtils.MILLIS_PER_DAY);
                    } catch (ParseException e) { return null;
                    } case TIME: if (string.length() == 0) { return null;
                    } try { Date date = TIME_FORMAT_TIME.parse(string); return (int) date.getTime();
                    } catch (ParseException e) { return null;
                    } case TIMESTAMP: if (string.length() == 0) { return null;
                    } try { Date date = TIME_FORMAT_TIMESTAMP.parse(string); return date.getTime();
                    } catch (ParseException e) { return null;
                    } case VARCHAR: default: return string;
            }
        }

6. 最后

至此我們需要準(zhǔn)備的東西:庫(kù)、表名稱、字段名稱、字段類型都有了,接下來(lái)我們?nèi)懳覀兊?SQL 語(yǔ)句查詢我們的數(shù)據(jù)文件。

創(chuàng)建好幾個(gè)測(cè)試的數(shù)據(jù)文件,例如上面項(xiàng)目結(jié)構(gòu)中我創(chuàng)建 2 個(gè) csv 文件USERINFO.csv、ASSET.csv,然后創(chuàng)建測(cè)試類。

這樣跑起來(lái),就可以通過(guò) SQL 語(yǔ)句的方式直接查詢數(shù)據(jù)了。

public class Test { public static void main(String[] args) throws SQLException { Connection connection = null; Statement statement = null; try { Properties info = new Properties();
            info.put("model", Sources.of(Test.class.getResource("/model.json")).file().getAbsolutePath());
            connection = DriverManager.getConnection("jdbc:calcite:", info);
            statement = connection.createStatement();
            print(statement.executeQuery("select * from asset "));
 
            print(statement.executeQuery(" select * from userinfo "));
 
            print(statement.executeQuery(" select age from userinfo where name =’aixiaoxian’ "));
 
            print(statement.executeQuery(" select * from userinfo where age >60 "));
 
            print(statement.executeQuery(" select * from userinfo where name like ’a%’ "));
        } finally {
            connection.close();
        }
    } private static void print(ResultSet resultSet) throws SQLException { final ResultSetMetaData metaData = resultSet.getMetaData(); final int columnCount = metaData.getColumnCount(); while (resultSet.next()) { for (int i = 1; ; i++) {
                System.out.print(resultSet.getString(i)); if (i < columnCount) {
                    System.out.print(", ");
                } else {
                    System.out.println(); break;
                }
            }
        }
    }
}

查詢結(jié)果:

這里在測(cè)試的時(shí)候踩到2個(gè)坑,大家如果自己實(shí)驗(yàn)的時(shí)候可以避免下。

  1. Calcite 默認(rèn)會(huì)把你的 SQL 語(yǔ)句中的表名和類名全部轉(zhuǎn)換為大寫,因?yàn)槟J(rèn)的 csv(其他文件也一樣)文件的名稱就是表名,除非你自定義規(guī)則,所以你的文件名要寫成大寫。

  2. Calcite 有一些默認(rèn)的關(guān)鍵字不能用作表名,不然會(huì)查詢失敗,比如我剛開(kāi)始定的user.csv就一直查不出來(lái),改成USERINFO就可以了,這點(diǎn)和Mysql 的內(nèi)置關(guān)鍵字差不多,也可以通過(guò)個(gè)性化配置去改。

演示Mysql

  1. 首先,還是先準(zhǔn)備 Calcite 需要的東西:庫(kù)、表名稱、字段名稱、字段類型。

如果數(shù)據(jù)源使用Mysql的話,這些都不用我們?nèi)?JAVA 服務(wù)中去定義,直接在 Mysql 客戶端創(chuàng)建好,這里直接創(chuàng)建兩張表用于測(cè)試,就和我們的csv文件一樣。

CREATE TABLE `USERINFO1` (
  `NAME` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL,
  `AGE` int DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3; CREATE TABLE `ASSET` (
  `NAME` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL,
  `MONEY` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
  1. 上述 csv 案例中的 SchemaFactory 以及 Schema 這些都不需要?jiǎng)?chuàng)建,因?yàn)?nbsp;Calcite 默認(rèn)提供了 Mysql 的 Adapter適配器。

  2. 其實(shí),上述兩步都不需要做,我們真正要做的是,告訴 Calcite 你的 JDBC 的連接信息就行了,也是在 model.json 文件中定義。

{ "version": "1.0", "defaultSchema": "Demo", "schemas": [ { "name": "Demo", "type": "custom", //  這里是calcite默認(rèn)的SchemaFactory,里面的流程和我們上述自己定義的相同,下面會(huì)簡(jiǎn)單看看源碼。 "factory": "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory", "operand": { //  我用的是mysql8以上版本,所以這里注意包的名稱 "jdbcDriver": "com.mysql.cj.jdbc.Driver", "jdbcUrl": "jdbc:mysql://localhost:3306/irving", "jdbcUser": "root", "jdbcPassword": "123456" } } ] } 
  1. 在項(xiàng)目中引入 Mysql 的驅(qū)動(dòng)包
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>8.0.30</version>
</dependency>
  1. 寫好測(cè)試類,這樣直接就相當(dāng)于完成了所有的功能了。
public class TestMysql { public static void main(String[] args) throws SQLException { Connection connection = null; Statement statement = null; try { Properties info = new Properties();
            info.put("model", Sources.of(TestMysql.class.getResource("/mysqlmodel.json")).file().getAbsolutePath());
            connection = DriverManager.getConnection("jdbc:calcite:", info);
            statement = connection.createStatement();
            statement.executeUpdate(" insert into  userinfo1 values (’xxx’,12) ");
            print(statement.executeQuery("select * from asset "));
 
            print(statement.executeQuery(" select * from userinfo1 "));
 
            print(statement.executeQuery(" select age from userinfo1 where name =’aixiaoxian’ "));
 
            print(statement.executeQuery(" select * from userinfo1 where age >60 "));
 
            print(statement.executeQuery(" select * from userinfo1 where name like ’a%’ "));
        } finally {
            connection.close();
        }
 
    } private static void print(ResultSet resultSet) throws SQLException { final ResultSetMetaData metaData = resultSet.getMetaData(); final int columnCount = metaData.getColumnCount(); while (resultSet.next()) { for (int i = 1; ; i++) {
                System.out.print(resultSet.getString(i)); if (i < columnCount) {
                    System.out.print(", ");
                } else {
                    System.out.println(); break;
                }
            }
        }
    }
}

查詢結(jié)果:

Mysql實(shí)現(xiàn)原理

上述我們?cè)?nbsp;model.json 文件中指定了org.apache.calcite.adapter.jdbc.JdbcSchema$Factory類,可以看下這個(gè)類的代碼。

這個(gè)類是把 Factory 和 Schema 寫在了一起,其實(shí)就是調(diào)用schemafactory類的create方法創(chuàng)建一個(gè) schema 出來(lái),和我們上面自定義的流程是一樣的。

其中JdbcSchema類也是 Schema 的子類,所以也會(huì)實(shí)現(xiàn)getTable方法(這個(gè)我們上述也實(shí)現(xiàn)了,我們當(dāng)時(shí)是獲取表結(jié)構(gòu)和表的字段類型以及名稱,是從csv文件頭中讀文件的),JdbcSchema的實(shí)現(xiàn)是通過(guò)連接 Mysql 服務(wù)端查詢?cè)獢?shù)據(jù)信息,再將這些信息封裝成 Calcite需要的對(duì)象格式。

這里同樣要注意 csv方式的2個(gè)注意點(diǎn),大小寫和關(guān)鍵字問(wèn)題。

public static JdbcSchema create(
      SchemaPlus parentSchema,
      String name,
      Map<String, Object> operand) {
    DataSource dataSource; try { final String dataSourceName = (String) operand.get("dataSource"); if (dataSourceName != null) {
        dataSource =
            AvaticaUtils.instantiatePlugin(DataSource.class, dataSourceName);
      } else { //會(huì)走在這里來(lái),這里就是我們?cè)趍odel.json中指定的jdbc的連接信息,最終會(huì)創(chuàng)建一個(gè)datasource final String jdbcUrl = (String) requireNonNull(operand.get("jdbcUrl"), "jdbcUrl"); final String jdbcDriver = (String) operand.get("jdbcDriver"); final String jdbcUser = (String) operand.get("jdbcUser"); final String jdbcPassword = (String) operand.get("jdbcPassword");
        dataSource = dataSource(jdbcUrl, jdbcDriver, jdbcUser, jdbcPassword);
      }
    } catch (Exception e) { throw new RuntimeException("Error while reading dataSource", e);
    } String jdbcCatalog = (String) operand.get("jdbcCatalog"); String jdbcSchema = (String) operand.get("jdbcSchema"); String sqlDialectFactory = (String) operand.get("sqlDialectFactory"); if (sqlDialectFactory == null || sqlDialectFactory.isEmpty()) { return JdbcSchema.create(
          parentSchema, name, dataSource, jdbcCatalog, jdbcSchema);
    } else { SqlDialectFactory factory = AvaticaUtils.instantiatePlugin(
          SqlDialectFactory.class, sqlDialectFactory); return JdbcSchema.create(
          parentSchema, name, dataSource, factory, jdbcCatalog, jdbcSchema);
    }
  } @Override public @Nullable Table getTable(String name) { return getTableMap(false).get(name);
  } private synchronized ImmutableMap<String, JdbcTable> getTableMap( boolean force) { if (force || tableMap == null) {
      tableMap = computeTables();
    } return tableMap;
  } private ImmutableMap<String, JdbcTable> computeTables() { Connection connection = null; ResultSet resultSet = null; try {
      connection = dataSource.getConnection(); final Pair<@Nullable String, @Nullable String> catalogSchema = getCatalogSchema(connection); final String catalog = catalogSchema.left; final String schema = catalogSchema.right; final Iterable<MetaImpl.MetaTable> tableDefs; Foo threadMetadata = THREAD_METADATA.get(); if (threadMetadata != null) {
        tableDefs = threadMetadata.apply(catalog, schema);
      } else { final List<MetaImpl.MetaTable> tableDefList = new ArrayList<>(); //  獲取元數(shù)據(jù) final DatabaseMetaData metaData = connection.getMetaData();
        resultSet = metaData.getTables(catalog, schema, null, null); while (resultSet.next()) { //獲取庫(kù)名,表明等信息 final String catalogName = resultSet.getString(1); final String schemaName = resultSet.getString(2); final String tableName = resultSet.getString(3); final String tableTypeName = resultSet.getString(4);
          tableDefList.add( new MetaImpl.MetaTable(catalogName, schemaName, tableName,
                  tableTypeName));
        }
        tableDefs = tableDefList;
      } final ImmutableMap.Builder<String, JdbcTable> builder =
          ImmutableMap.builder(); for (MetaImpl.MetaTable tableDef : tableDefs) { final String tableTypeName2 = tableDef.tableType == null ? null : tableDef.tableType.toUpperCase(Locale.ROOT).replace(’ ’, ’_’); final TableType tableType = Util.enumVal(TableType.OTHER, tableTypeName2); if (tableType == TableType.OTHER  && tableTypeName2 != null) {
          System.out.println("Unknown table type: " + tableTypeName2);
        } //  最終封裝成JdbcTable對(duì)象 final JdbcTable table = new JdbcTable(this, tableDef.tableCat, tableDef.tableSchem,
                tableDef.tableName, tableType);
        builder.put(tableDef.tableName, table);
      } return builder.build();
    } catch (SQLException e) { throw new RuntimeException( "Exception while reading tables", e);
    } finally {
      close(connection, null, resultSet);
    }
  }

SQL執(zhí)行流程

OK,到這里基本上兩個(gè)簡(jiǎn)單的案例已經(jīng)演示好了,最后補(bǔ)充一下整個(gè)Calcite架構(gòu)和整個(gè) SQL 的執(zhí)行流程。

整個(gè)流程如下:SQL解析(Parser)=> SQL校驗(yàn)(Validator)=> SQL查詢優(yōu)化(optimizer)=> SQL生成 => SQL執(zhí)行

SQL Parser

所有的 SQL 語(yǔ)句在執(zhí)行前都需要經(jīng)歷 SQL 解析器解析,解析器的工作內(nèi)容就是將 SQL 中的 Token 解析成抽象語(yǔ)法樹(shù),每個(gè)樹(shù)的節(jié)點(diǎn)都是一個(gè) SqlNode,這個(gè)過(guò)程其實(shí)就是 Sql Text => SqlNode 的過(guò)程。

我們前面的 Demo 沒(méi)有自定義 Parser,是因?yàn)?Calcite 采用了自己默認(rèn)的 Parser(SqlParserImpl)。

SqlNode

SqlNode是整個(gè)解析中的核心,比如圖中你可以發(fā)現(xiàn),對(duì)于每個(gè)比如select、from、where關(guān)鍵字之后的內(nèi)容其實(shí)都是一個(gè)SqlNode。

parserConfig方法主要是設(shè)置 SqlParserFactory 的參數(shù),比如我們上面所說(shuō)的我本地測(cè)試的時(shí)候踩的大小寫的坑,就可以在這里設(shè)置。

直接調(diào)用setCaseSensitive=false即不會(huì)將 SQL 語(yǔ)句中的表名列名轉(zhuǎn)為大寫,下面是默認(rèn)的,其他的參數(shù)可以按需配置。

SQL Validator

SQL 語(yǔ)句先經(jīng)過(guò) Parser,然后經(jīng)過(guò)語(yǔ)法驗(yàn)證器,注意 Parser 并不會(huì)驗(yàn)證語(yǔ)法的正確性。

其實(shí) Parser 只會(huì)驗(yàn)證 SQL 關(guān)鍵詞的位置是否正確,我們上述2個(gè) Parser 的例子中都沒(méi)有創(chuàng)建 schema 和 table 這些,但是如果這樣寫,那就會(huì)報(bào)錯(cuò),這個(gè)錯(cuò)誤就是 parser 檢測(cè)后拋出來(lái)的(ParseLocationErrorTest)。

真正的校驗(yàn)在 validator 中,會(huì)去驗(yàn)證查詢的表名是否存在,查詢的字段是否存在,類型是否匹配,這個(gè)過(guò)程比較復(fù)雜,默認(rèn)的 validator 是SqlValidatorImpl。

查詢優(yōu)化

比如關(guān)系代數(shù),比如什么投影、笛卡爾積這些,Calcite提供了很多內(nèi)部的優(yōu)化器,也可以實(shí)現(xiàn)自己的優(yōu)化器。

適配器

Calcite 是不包含存儲(chǔ)層的,所以提供一種適配器的機(jī)制來(lái)訪問(wèn)外部的數(shù)據(jù)存儲(chǔ)或者存儲(chǔ)引擎。

最后,進(jìn)階

官網(wǎng)里面寫了未來(lái)會(huì)支持Kafka適配器到公共Api中,到時(shí)候使用起來(lái)就和上述集成Mysql一樣方便,但是現(xiàn)在還沒(méi)有支持,我這里給大家提供個(gè)自己實(shí)現(xiàn)的方式,這樣就可以通過(guò) SQL 的方式直接查詢 Kafka 中的 Topic 數(shù)據(jù)等信息。

這里我們內(nèi)部集成實(shí)現(xiàn)了KSQL的能力,查詢結(jié)果是OK的。

還是像上述步驟一樣,我們需要準(zhǔn)備庫(kù)、表名稱、字段名稱、字段類型、數(shù)據(jù)源(多出來(lái)的地方)。

  1. 自定義Sql解析,之前我們都沒(méi)有自定義解析,這里需要自定義解析,是因?yàn)槲倚枰獎(jiǎng)討B(tài)解析sql的where條件里面的partation。
  • 配置解析器,就是之前案例中提到的配置大小寫之類的
  • 創(chuàng)建解析器,使用的默認(rèn)SqlParseImpl
  • 開(kāi)始解析,生成AST,我們可以基于生成的SqlNode做一些業(yè)務(wù)相關(guān)的校驗(yàn)和參數(shù)解析
  1. 適配器獲取數(shù)據(jù)源
public class KafkaConsumerAdapter { public static List<KafkaResult> executor(KafkaSqlInfo kafkaSql) { Properties props = new Properties();
           props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaSql.getSeeds());
           props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
           props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
           props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
           KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
           List<TopicPartition> topics = new ArrayList<>(); for (Integer partition : kafkaSql.getPartition()) { TopicPartition tp = new TopicPartition(kafkaSql.getTableName(), partition);
               topics.add(tp);
           }
           consumer.assign(topics); for (TopicPartition tp : topics) {
               Map<TopicPartition, Long> offsets = consumer.endOffsets(Collections.singleton(tp)); long position = 500; if (offsets.get(tp).longValue() > position) {
                   consumer.seek(tp, offsets.get(tp).longValue() - 500);
               } else {
                   consumer.seek(tp, 0);
               }
           }
           List<KafkaResult> results = new ArrayList<>(); boolean flag = true; while (flag) {
               ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { //轉(zhuǎn)成我定義的對(duì)象集合 KafkaResult result = new KafkaResult();
                   result.setPartition(record.partition());
                   result.setOffset(record.offset());
                   result.setMsg(record.value());
                   result.setKey(record.key());
                   results.add(result);
               } if (!records.isEmpty()) {
                   flag = false;
               }
           }
           consumer.close(); return results;
       }
   
   }
  1. 執(zhí)行查詢,就可以得到我們想要的效果了。
public class TestKafka { public static void main(String[] args) throws Exception { KafkaService kafkaService = new KafkaService(); //把解析到的參數(shù)放在我自己定義的kafkaSqlInfo對(duì)象中 KafkaSqlInfo sqlInfo = kafkaService.parseSql("select * from `cmdb-calltopo` where `partition` in (0,1,2) limit 1000 "); //適配器獲取數(shù)據(jù)源,主要是從上述的sqlInfo對(duì)象中去poll數(shù)據(jù) List<KafkaResult> results = KafkaConsumerAdapter.executor(sqlInfo); //執(zhí)行查詢 query(sqlInfo.getTableName(), results, sqlInfo.getSql());
   
           sqlInfo = kafkaService.parseSql("select * from `cmdb-calltopo` where `partition` in (0,1,2) AND msg like ’%account%’  limit 1000 ");
           results = KafkaConsumerAdapter.executor(sqlInfo);
           query(sqlInfo.getTableName(), results, sqlInfo.getSql());
   
   
           sqlInfo = kafkaService.parseSql("select count(*) AS addad  from `cmdb-calltopo` where `partition` in (0,1,2) limit 1000 ");
           results = KafkaConsumerAdapter.executor(sqlInfo);
           query(sqlInfo.getTableName(), results, sqlInfo.getSql());
       } private static void query(String tableName, List<KafkaResult> results,
                                 String sql) throws Exception { //創(chuàng)建model.json,設(shè)置我的SchemaFactory,設(shè)置庫(kù)名 String model = createTempJson(); //設(shè)置我的表結(jié)構(gòu),表名稱和表字段名以及類型 KafkaTableSchema.generateSchema(tableName, results); Properties info = new Properties();
           info.setProperty("lex", Lex.JAVA.toString()); Connection connection = DriverManager.getConnection(Driver.CONNECT_STRING_PREFIX + "model=inline:" + model, info); Statement st = connection.createStatement(); //執(zhí)行 ResultSet result = st.executeQuery(sql); ResultSetMetaData rsmd = result.getMetaData();
           List<Map<String, Object>> ret = new ArrayList<>(); while (result.next()) {
               Map<String, Object> map = new LinkedHashMap<>(); for (int i = 1; i <= rsmd.getColumnCount(); i++) {
                   map.put(rsmd.getColumnName(i), result.getString(rsmd.getColumnName(i)));
               }
               ret.add(map);
           }
           result.close();
           st.close();
           connection.close();
       } private static void print(ResultSet resultSet) throws SQLException { final ResultSetMetaData metaData = resultSet.getMetaData(); final int columnCount = metaData.getColumnCount(); while (resultSet.next()) { for (int i = 1; ; i++) {
                   System.out.print(resultSet.getString(i)); if (i < columnCount) {
                       System.out.print(", ");
                   } else {
                       System.out.println(); break;
                   }
               }
           }
       } private static String createTempJson() throws IOException { JSONObject object = new JSONObject();
           object.put("version", "1.0");
           object.put("defaultSchema", "QAKAFKA"); JSONArray array = new JSONArray(); JSONObject tmp = new JSONObject();
           tmp.put("name", "QAKAFKA");
           tmp.put("type", "custom");
           tmp.put("factory", "kafka.KafkaSchemaFactory");
           array.add(tmp);
           object.put("schemas", array); return object.toJSONString();
       }
   }
  • 生成臨時(shí)的model.json,之前是基于文件,現(xiàn)在基于text字符串,mode=inline模式
  • 設(shè)置我的表結(jié)構(gòu)、表名稱、字段名、字段類型等,并放置在內(nèi)存中,同時(shí)將適配器查詢出來(lái)的數(shù)據(jù)也放進(jìn)去table里面
  • 獲取連接,執(zhí)行查詢,完美!



  業(yè)務(wù)實(shí)施流程

需求調(diào)研 →

團(tuán)隊(duì)組建和動(dòng)員 →

數(shù)據(jù)初始化 →

調(diào)試完善 →

解決方案和選型 →

硬件網(wǎng)絡(luò)部署 →

系統(tǒng)部署試運(yùn)行 →

系統(tǒng)正式上線 →

合作協(xié)議

系統(tǒng)開(kāi)發(fā)/整合

制作文檔和員工培訓(xùn)

售后服務(wù)

馬上咨詢: 如果您有業(yè)務(wù)方面的問(wèn)題或者需求,歡迎您咨詢!我們帶來(lái)的不僅僅是技術(shù),還有行業(yè)經(jīng)驗(yàn)積累。
QQ: 39764417/308460098     Phone: 13 9800 1 9844 / 135 6887 9550     聯(lián)系人:石先生/雷先生