Bulkwritesupport:begin->prepare->(inloopexec)->commit
并没有详细先容用法和事理,笔者在开拓业务时利用的库是sqlx[2],sql也支持clickhouse-go驱动。参考了官方样例代码[3]:
...tx,err:=connect.Begin()checkErr(err)stmt,err:=tx.Prepare("INSERTINTOexample(country_code,os_id,browser_id,categories,action_day,action_time)VALUES(?,?,?,?,?,?)")checkErr(err)fori:=0;i<100;i++{if_,err:=stmt.Exec("RU",10+i,100+i,[]int16{1,2,3},time.Now(),time.Now(),);err!=nil{log.Fatal(err)}}...
我写的bulk write类似上面的代码,但是提交给同事review时,他提出了疑问:stmt.Exec是每次实行都发送写要求到数据库吗?这个问题实在我不敢肯定,官方文档也说得不明确。考虑到严谨性,让自己的PR更有说服力,自己去翻看了干系源代码。
这里须要指出,如果利用编辑器里的代码跳转功能会跳到database/sql库中的Exec函数实现,实际上我们要看的代码是clickhouse-go中的实现,至于编辑器跳转到database/sql中的缘故原由,书写此文时笔者也没弄清楚,先挖个坑吧。

核心实现
stmt.Exec的核心代码如下[4]:
func(stmtstmt)execContext(ctxcontext.Context,args[]driver.Value)(driver.Result,error){ifstmt.isInsert{stmt.counter++iferr:=stmt.ch.block.AppendRow(args);err!=nil{returnnil,err}if(stmt.counter%stmt.ch.blockSize)==0{stmt.ch.logf("[exec]flushblock")iferr:=stmt.ch.writeBlock(stmt.ch.block);err!=nil{returnnil,err}iferr:=stmt.ch.encoder.Flush();err!=nil{returnnil,err}}returnemptyResult,nil}iferr:=stmt.ch.sendQuery(stmt.bind(convertOldArgs(args)));err!=nil{returnnil,err}iferr:=stmt.ch.process();err!=nil{returnnil,err}returnemptyResult,nil}
上面的代码不多,非常清晰,当实行Exec时,stmt.ch.block.AppendRow(args)会先把sql参数附加到本地缓存block中,然后(stmt.counter % stmt.ch.blockSize)判断本地缓存大小是否到达阈值,到达则实行Flush(),将数据写入远端。综上,clickhouse-go中的核心实现逻辑是:
底层掩护一个缓存block,同时设置block_size掌握缓存大小实行stmt.Exec时,不会直接写入远程ClickHouse中,而是将插入参数Append到block中每次Append后,判断block的size和block_size的关系,如果恰好整除,则刷新block(即写入clickhouse)因此block_size这个参数很主要,它表示本地缓存的上限,如果很大的话,程序会占用一些内存。笔者起初设置为100000,在调试日志中看不到stmt.ch.logf("[exec] flush block")打印的log,设置小后就看到下面的输出:
...[clickhouse][connect=1][begin]tx=false,data=false[clickhouse][connect=1][prepare][clickhouse][connect=1][readmeta]<-data:packet=1,columns=6,rows=0[clickhouse][connect=1][exec]flushblock[clickhouse][connect=1][exec]flushblock....
总结
很多数据库驱动都支持bulk write特性,clickhouse-go这个驱动也不例外,但是它的文档写得不是很详细,只是在文档中指明要放在begin/commit中做。再加上clickhouse不支持事务,begin/commit这种写法会让人困惑。
本文通过剖析clickhouse-go的源代码,理解bulk write的实行过程,帮助大家梳理其详细实现。
参考资料[1]
clickhouse-go: https://github.com/ClickHouse/clickhouse-go
[2]
sqlx: https://github.com/jmoiron/sqlx
[3]
官方样例代码: https://github.com/ClickHouse/clickhouse-go/blob/master/examples/sqlx.go#L35-L51
[4]
核心代码如下: https://github.com/clickhouse/clickhouse-go/blob/master/stmt.go#L44-L68
[5]
INSERT INTO Statement: https://clickhouse.tech/docs/en/sql-reference/statements/insert-into/
[6]
go-clickhouse-batchinsert: https://github.com/MaruHyl/go-clickhouse-batchinsert/blob/master/batch.go#L349-L354