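Integrating ClickHouse with the platform
I had planned to enter the developer contest with this topic, but gave up because I could not find a clear direction.
I am still learning ClickHouse myself, so this article serves as study notes and an invitation to exchange ideas.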
ClickHouse
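Official site: https://clickhouse.com/docs/zh
Deployment reference: https://clickhouse.com/docs/zh/getting-started/install
In this article
Integrating ClickHouse and executing SQL queries
Reports based on ClickHouse queries
Whether the v6.0 Super Query feature can support it
Syncing data to ClickHouse with Integration Service data flows or AlgoX
Big-data analysis with Integration Service data flows and ClickHouse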
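How to integrate?
Because of restrictions in the Cosmic (苍穹) platform, I chose to integrate through the Integration Service Cloud.
Integrate via JDBC: https://clickhouse.com/docs/zh/interfaces/jdbc . Download the driver jar and copy it into the lib/trd directory.
Integration Service: connection type
Configure the form: create a new layout whose source page is isc_database_link. Note that you need to change the visibility of the [Connection Configuration] field in the layout panel.
Write the connector factory class
For reference, see kd.isc.iscb.platform.core.connector.jdbc.PgSqlConnectionFactory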
import com.clickhouse.jdbc.ClickHouseDataSource;
import kd.bos.dataentity.entity.DynamicObject;
import kd.isc.iscb.platform.core.connector.ConnectionWrapper;
import kd.isc.iscb.platform.core.connector.JdbcConnectionWrapper;
import kd.isc.iscb.platform.core.connector.jdbc.AbstractConnectionFactory;
import kd.isc.iscb.util.connector.Response;
import kd.isc.iscb.util.connector.server.MetaType;
import kd.isc.iscb.util.db.DbType;
import kd.isc.iscb.util.db.Table;
import kd.isc.iscb.util.io.ObjectReader;
import kd.isc.iscb.util.misc.Pair;

import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;

public class ClickHouseConnectionFactory extends AbstractConnectionFactory {

    @Override
    protected DataSource createDataSource(String url, String user, String password) throws SQLException {
        // Only the URL is handed to the driver here; user and password are not used by this method.
        return new ClickHouseDataSource(url);
    }

    @Override
    public char getQuot() {
        // Identifiers are quoted with double quotes.
        return '"';
    }

    @Override
    public Map<String, MetaType> getMetaList(ConnectionWrapper cn) {
        // Report every object returned by the parent class as an ENTITY.
        Map<String, MetaType> metaList = super.getMetaList(cn);
        for (Map.Entry<String, MetaType> entry : metaList.entrySet()) {
            entry.setValue(MetaType.ENTITY);
        }
        return metaList;
    }

    @Override
    public ObjectReader<Map<String, Object>> query(ConnectionWrapper cn, String entity,
            Map<String, Object> requires, List<Map<String, Object>> filter,
            List<Map<String, String>> orderBy) {
        return super.query(cn, entity, requires, filter, orderBy);
    }

    @Override
    public Response doBizAction(ConnectionWrapper cn, String entity, Map<String, Object> data,
            Map<String, List<String>> judgeFields, List<String> actions, String proxy_user) {
        return super.doBizAction(cn, entity, data, judgeFields, actions, proxy_user);
    }

    @Override
    public Response doDataAction(ConnectionWrapper cn, Map<String, Object> data, Table mainTable,
            Map<String, Pair<Table, String>> entryTables, Map<String, List<String>> judgeFields,
            List<String> actions) {
        return super.doDataAction(cn, data, mainTable, entryTables, judgeFields, actions);
    }

    @Override
    public DbType getDatabaseType() {
        return new ClickHouseDbType();
    }

    @Override
    protected String getURL(DynamicObject cfg) {
        // Build the JDBC URL from the host and port stored in the connection configuration.
        return String.format("jdbc:ch:http://%s:%s", cfg.get("server_ip"), cfg.get("server_port"));
    }

    protected String getTestSQL() {
        return "select 1";
    }

    public String topSQL(JdbcConnectionWrapper cn, String sql, int topN) {
        // Only SELECT statements can be wrapped in a limiting subquery.
        if (!sql.startsWith("select") && !sql.startsWith("SELECT")) {
            throw new IllegalArgumentException(sql);
        }
        return "SELECT * FROM (" + sql + ")X LIMIT " + topN + " OFFSET 0";
    }
}
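The topSQL method above accepts only statements that begin with exactly `select` or `SELECT`, so input such as `Select ...`, a statement with leading whitespace, or one with a trailing semicolon would be rejected or would produce an invalid subquery. A minimal standalone sketch of a more tolerant variant follows. The class name TopSqlSketch and the method topSql are hypothetical and are not part of the platform or the ClickHouse driver.

```java
import java.util.Locale;

// Hypothetical standalone helper; not part of the platform or the ClickHouse driver.
final class TopSqlSketch {

    // Wraps a SELECT statement in a subquery and limits it to the first topN rows.
    static String topSql(String sql, int topN) {
        if (sql == null) {
            throw new IllegalArgumentException("sql must not be null");
        }
        if (topN < 0) {
            throw new IllegalArgumentException("topN must not be negative: " + topN);
        }
        String trimmed = sql.trim();
        // A trailing semicolon would break the enclosing subquery, so strip it.
        while (trimmed.endsWith(";")) {
            trimmed = trimmed.substring(0, trimmed.length() - 1).trim();
        }
        // Case-insensitive prefix check, so "Select ..." is accepted as well.
        if (!trimmed.toLowerCase(Locale.ROOT).startsWith("select")) {
            throw new IllegalArgumentException(sql);
        }
        return "SELECT * FROM (" + trimmed + ") X LIMIT " + topN + " OFFSET 0";
    }

    public static void main(String[] args) {
        System.out.println(topSql("  Select town FROM uk_price_paid; ", 10));
    }
}
```

Like the original, this sketch still rejects queries that start with WITH; whether that matters depends on the SQL the platform passes in.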
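Integration Service: connection configuration
Using ClickHouse SQL queries in reports
How to execute an SQL statement
Reference source: the implementation of kd.bos.isc.util.connector.s.QueryList
The parameter dbLinkId is the primary key of the isc_database_link entity.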
public static List<DataRow> query(Long dbLinkId, String sql, List<Object> values, List<Integer> types) {
    // Make sure the connection pool exists before asking for a connection.
    if (!DataSourceResource.isReady()) {
        DataSourceResource.createConnectionPool();
    }
    ConnectionWrapper connection = DataSourceResource.getConnectionByDbLink(dbLinkId);
    ObjectReader<DataRow> reader = DbUtil.executeQuery((Connection) connection, sql, values, types);
    try {
        List<DataRow> list = new ArrayList<>();
        int totalBytes = 0;
        // Read rows until the reader is exhausted, tracking the accumulated size.
        for (DataRow item = reader.read(); item != null; item = reader.read()) {
            totalBytes = checkTotalBytes(totalBytes, item);
            list.add(item);
        }
        return list;
    } finally {
        reader.close();
    }
}
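The query method calls checkTotalBytes, whose body is not shown in this article. The sketch below illustrates one way such a guard could work: it keeps a running size estimate and aborts once a limit is exceeded. Everything here is an assumption made for illustration. The class ResultSizeGuard, the size estimate, and the limit parameter are hypothetical, and rows are modeled as plain Map objects rather than the platform's DataRow.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical guard; the platform's real checkTotalBytes is not shown in the article.
final class ResultSizeGuard {

    // Rough estimate: UTF-8 length of each value's string form (null counts as 0).
    static int estimateBytes(Map<String, Object> row) {
        int bytes = 0;
        for (Object value : row.values()) {
            if (value != null) {
                bytes += String.valueOf(value).getBytes(StandardCharsets.UTF_8).length;
            }
        }
        return bytes;
    }

    // Returns the new running total, or throws once the total exceeds the limit.
    static int checkTotalBytes(int totalBytes, Map<String, Object> row, int limit) {
        long next = (long) totalBytes + estimateBytes(row);
        if (next > limit) {
            throw new IllegalStateException("Result set exceeds " + limit + " bytes");
        }
        return (int) next;
    }

    // Mirrors the read loop above: collect rows while enforcing the size limit.
    static List<Map<String, Object>> collect(Iterator<Map<String, Object>> rows, int limit) {
        List<Map<String, Object>> list = new ArrayList<>();
        int totalBytes = 0;
        while (rows.hasNext()) {
            Map<String, Object> row = rows.next();
            totalBytes = checkTotalBytes(totalBytes, row, limit);
            list.add(row);
        }
        return list;
    }
}
```

A guard of this kind keeps a broad SELECT against a large ClickHouse table from loading an unbounded result into memory; adding a LIMIT to the SQL itself is a complementary safeguard.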
Converting query results into a DataSet
Implemented with Algo's CustomizedInput
@Override
public Iterator<Object[]> createIterator() {
    List<Object[]> rows = new ArrayList<>();
    String[] fieldNames = this.rowMeta.getFieldNames();
    // Copy each DataRow into an Object[] whose order follows the RowMeta field order.
    for (DataRow row : result) {
        Object[] values = new Object[fieldNames.length];
        for (int i = 0; i < fieldNames.length; i++) {
            values[i] = row.getValue(fieldNames[i]);
        }
        rows.add(values);
    }
    return rows.iterator();
}
Write the report query plugin
@Override
public DataSet query(ReportQueryParam reportQueryParam, Object o) throws Throwable {
    // Read the database link and the SQL text from the report filter.
    FilterInfo filterInfo = reportQueryParam.getFilter();
    DynamicObject dbLink = filterInfo.getDynamicObject("soi_dblink");
    String sql = filterInfo.getString("soi_sql");
    // Column names and types are hard-coded to match the SELECT list of the sample query.
    RowMeta rowMeta = new RowMeta(
            new String[]{"town", "district", "c", "price", "price_bar"},
            new DataType[]{DataType.StringType, DataType.StringType, DataType.IntegerType,
                    DataType.BigDecimalType, DataType.StringType});
    List<DataRow> result = ClickHouseByISC.query((Long) dbLink.getPkValue(), sql,
            Collections.emptyList(), Collections.emptyList());
    DataSet dataSet = Algo.create("test").createDataSet(new ClickHouseInput(rowMeta, result));
    return dataSet;
}
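Create the report page
Query the data
Sample data reference: https://clickhouse.com/docs/zh/getting-started/example-datasets/uk-price-paid
The dataset contains property price data for England and Wales since 1995.
The following query finds the most expensive neighborhoods: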
SELECT
    town,
    district,
    count() AS c,
    round(avg(price)) AS price,
    bar(price, 0, 5000000, 100) AS price_bar
FROM uk_price_paid
WHERE date >= '2020-01-01'
GROUP BY town, district
HAVING c >= 100
ORDER BY price DESC
LIMIT 100
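The price_bar column comes from ClickHouse's bar(x, min, max, width) function, which draws a band whose width is proportional to (x - min), reaches width characters when x = max, and is accurate to one eighth of a character. That is why the report plugin maps price_bar to a string type. The sketch below approximates that rendering in Java to show what the column contains. BarSketch is a hypothetical name, and the exact rounding inside ClickHouse may differ from this approximation.

```java
// Hypothetical approximation of ClickHouse's bar(); rounding details may differ.
final class BarSketch {

    private static final char FULL_BLOCK = '\u2588';
    // Partial blocks covering one eighth up to seven eighths of a character cell.
    private static final char[] EIGHTHS = {
        '\u258F', '\u258E', '\u258D', '\u258C', '\u258B', '\u258A', '\u2589'
    };

    static String bar(double x, double min, double max, int width) {
        if (!(max > min) || width <= 0) {
            throw new IllegalArgumentException("require max > min and width > 0");
        }
        // Clamp to [0, width] cells, proportional to (x - min).
        double cells;
        if (x <= min) {
            cells = 0;
        } else if (x >= max) {
            cells = width;
        } else {
            cells = (x - min) / (max - min) * width;
        }
        int eighths = (int) Math.floor(cells * 8);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < eighths / 8; i++) {
            sb.append(FULL_BLOCK);
        }
        int rest = eighths % 8;
        if (rest > 0) {
            sb.append(EIGHTHS[rest - 1]);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // An average price of 2,500,000 fills half of the 100-character band.
        System.out.println(bar(2_500_000, 0, 5_000_000, 100));
    }
}
```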
Result screenshot