#随手记# 平台集成 ClickHouse (持续更新中...)原创
金蝶云社区-周立思
周立思
3人赞赏了该文章 221次浏览 未经作者许可,禁止转载编辑于2024年01月31日 18:42:38

平台集成 ClickHouse 之前计划参加开发者大赛,没什么头绪放弃了。 

ClickHouse 自己也在学习中,写篇文章作为笔记,相互交流学习。


ClickHouse

官网:https://clickhouse.com/docs/zh 

部署参考:https://clickhouse.com/docs/zh/getting-started/install 


本篇文章内容

  1. 集成 ClickHouse,执行查询 SQL 语句

  2. 报表基于 ClickHouse 查询

  3. v6.0 超级查询 是否能支持

  4. 利用集成服务-数据流 或者 AlgoX 同步数据到 ClickHouse

  5. 集成服务-数据流 & ClickHouse 实现大数据分析


如何集成?

因为苍穹平台限制,选择使用集成服务云进行集成。

使用 JDBC 集成:https://clickhouse.com/docs/zh/interfaces/jdbc ,下载 jar 拷贝到目录 lib/trd


集成服务-连接类型

image.png

配置表单,新建布局,源页面 isc_database_link 。需要注意 【连接配置】字段布局面板修改可见性。

image.png


编写连接器工厂类

可参考 kd.isc.iscb.platform.core.connector.jdbc.PgSqlConnectionFactory 

import com.clickhouse.jdbc.ClickHouseDataSource;
import kd.bos.dataentity.entity.DynamicObject;
import kd.isc.iscb.platform.core.connector.ConnectionWrapper;
import kd.isc.iscb.platform.core.connector.JdbcConnectionWrapper;
import kd.isc.iscb.platform.core.connector.jdbc.AbstractConnectionFactory;
import kd.isc.iscb.util.connector.Response;
import kd.isc.iscb.util.connector.server.MetaType;
import kd.isc.iscb.util.db.DbType;
import kd.isc.iscb.util.db.Table;
import kd.isc.iscb.util.io.ObjectReader;
import kd.isc.iscb.util.misc.Pair;

import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class ClickHouseConnectionFactory extends AbstractConnectionFactory {

    @Override
    protected DataSource createDataSource(String url, String user, String password) throws SQLException {
        ClickHouseDataSource dataSource=new ClickHouseDataSource(url);
        return dataSource;
    }

    @Override
    public char getQuot() {
        return '"';
    }

    @Override
    public Map<String, MetaType> getMetaList(ConnectionWrapper cn) {
        Map<String, MetaType> metaList = super.getMetaList(cn);
        Iterator var3 = metaList.entrySet().iterator();

        while(var3.hasNext()) {
            Map.Entry<String, MetaType> entry = (Map.Entry)var3.next();
            entry.setValue(MetaType.ENTITY);
        }

        return metaList;
    }

    @Override
    public ObjectReader<Map<String, Object>> query(ConnectionWrapper cn, String entity, Map<String, Object> requires, List<Map<String, Object>> filter, List<Map<String, String>> orderBy) {
        return super.query(cn, entity, requires, filter, orderBy);
    }

    @Override
    public Response doBizAction(ConnectionWrapper cn, String entity, Map<String, Object> data, Map<String, List<String>> judgeFields, List<String> actions, String proxy_user) {
        return super.doBizAction(cn, entity, data, judgeFields, actions, proxy_user);
    }

    @Override
    public Response doDataAction(ConnectionWrapper cn, Map<String, Object> data, Table mainTable, Map<String, Pair<Table, String>> entryTables, Map<String, List<String>> judgeFields, List<String> actions) {
        return super.doDataAction(cn, data, mainTable, entryTables, judgeFields, actions);
    }

    @Override
    public DbType getDatabaseType() {
        return new ClickHouseDbType();
    }

    @Override
    protected String getURL(DynamicObject cfg) {
        String url=String.format("jdbc:ch:http://%s:%s",
                cfg.get("server_ip"),
                cfg.get("server_port"));
        return url;
    }

    protected String getTestSQL() {
        return "select 1";
    }

    public String topSQL(JdbcConnectionWrapper cn, String sql, int topN) {
        if (!sql.startsWith("select") && !sql.startsWith("SELECT")) {
            throw new IllegalArgumentException(sql);
        } else {
            return "SELECT * FROM (" + sql + ")X LIMIT " + topN + " OFFSET 0";
        }
    }

}


集成服务-连接配置

image.png

报表使用 ClickHouse 查询 SQL

如何执行 SQL 语句

参考源码:kd.bos.isc.util.connector.s.QueryList 实现

参数 dbLinkId 是实体 isc_database_link 主键

public static List<DataRow> query(Long dbLinkId, String sql, List<Object> values, List<Integer> types) {
    if (!DataSourceResource.isReady()) {
        DataSourceResource.createConnectionPool();
    }
    ConnectionWrapper connection = DataSourceResource.getConnectionByDbLink(dbLinkId);
    ObjectReader<DataRow> reader = DbUtil.executeQuery((Connection) connection, sql, values, types);
    try {
        List<DataRow> list = new ArrayList();
        int totalBytes = 0;

        for (DataRow item = (DataRow) reader.read(); item != null; item = (DataRow) reader.read()) {
            totalBytes = checkTotalBytes(totalBytes, item);
            list.add(item);
        }
        return list;
    } finally {
        reader.close();
    }
}


查询结果转 DataSet

使用 Algo CustomizedInput 实现

@Override
public Iterator<Object[]> createIterator() {
    List<Object[]> rows=new ArrayList<>();
    String[] fieldNames=this.rowMeta.getFieldNames();
    for (DataRow row:result){
        Object[] values=new Object[fieldNames.length];
        for (int i = 0; i < fieldNames.length; i++) {
            values[i]=row.getValue(fieldNames[i]);
        }
        rows.add(values);
    }
    return rows.iterator();
}


编写报表查询插件

@Override
public DataSet query(ReportQueryParam reportQueryParam, Object o) throws Throwable {
    FilterInfo filterInfo=reportQueryParam.getFilter();
    DynamicObject dbLink=filterInfo.getDynamicObject("soi_dblink");
    String sql=filterInfo.getString("soi_sql");
    RowMeta rowMeta=new RowMeta(new String[]{"town","district","c","price","price_bar"},
            new DataType[]{DataType.StringType,DataType.StringType,DataType.IntegerType,DataType.BigDecimalType,DataType.StringType});
    
    List<DataRow> result=ClickHouseByISC.query((Long)dbLink.getPkValue(),sql, Collections.emptyList(),Collections.emptyList());
    DataSet dataSet=Algo.create("test").createDataSet(new ClickHouseInput(rowMeta,result));
    return dataSet;
}


新建报表页面

image.png


查询数据

示例数据参考:https://clickhouse.com/docs/zh/getting-started/example-datasets/uk-price-paid 

数据集包含自 1995 年以来有关英格兰和威尔士房地产价格的数据

image.png


查询语句,查询最昂贵的社区

SELECT
    town,
    district,
    count() AS c,
    round(avg(price)) AS price,
    bar(price, 0, 5000000, 100) AS price_bar 
FROM uk_price_paid
WHERE date >= '2020-01-01'
GROUP BY
    town,
    district
HAVING c >= 100
ORDER BY price DESC
LIMIT 100


效果图

image.png



赞 3