After going through some reference material, I finally have a reasonable grasp of the timestamp-based approach. Roughly: use a tool to record a window of production traffic, including the timestamp of each request, and then have a replay engine play that traffic back.
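The exact record format depends on the capture tool; purely as a hypothetical illustration (none of these fields come from the article), each entry only needs a timestamp plus enough information to rebuild the request, for example:

1656315378 GET /api/order/list?page=1
1656315378 GET /api/order/detail?id=10086
1656315379 GET /api/user/info?id=42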
Approach

Traffic replay currently focuses on HTTP traffic, so the load-generation part of the engine I wrote earlier can still be reused. Based on that, I worked out my own approach:
The most critical piece is the choice of queue. Here I went with Java's java.util.concurrent.DelayQueue; I did not find a better alternative. At first I actually wanted to reuse the queue from the log replay framework I had written earlier, and I also ran performance tests on several commonly used queues:
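For reference, a minimal single-threaded sketch of how such a comparison might look (this is not the benchmark I actually ran; the queue choices and loop counts here are assumptions for illustration only):

import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.LinkedBlockingQueue

// Rough single-threaded offer/poll timing for a few common thread-safe queues.
// Numbers from a toy loop like this are only indicative; a serious comparison
// needs warm-up, multiple producer/consumer threads and a tool such as JMH.
int N = 1000000

def benchmark = { String name, Queue queue ->
    long start = System.nanoTime()
    N.times { queue.offer(it) }
    N.times { queue.poll() }
    long costMs = (System.nanoTime() - start).intdiv(1000000)
    println "$name: $costMs ms for $N offer + $N poll"
}

benchmark("ConcurrentLinkedQueue", new ConcurrentLinkedQueue())
benchmark("LinkedBlockingQueue", new LinkedBlockingQueue())
benchmark("ArrayBlockingQueue", new ArrayBlockingQueue(N))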

My original plan was to read the log with multiple threads, check whether each entry had reached its point in time, push it into a thread-safe queue, and then have a thread pool take objects off that queue and send the requests. On reflection that was too convoluted: the traffic changes hands too many times, which makes the design harder to implement and extend.
I then re-ran performance tests against java.util.concurrent.DelayQueue (see the earlier article "DelayQueue performance test"), and with those results in hand I could proceed with confidence. For basic DelayQueue usage, see "Performance test: place an order and cancel it after a 10s delay".
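As a quick refresher (a minimal sketch of my own, not taken from that article), the core of DelayQueue usage is an element type that implements Delayed; the queue then only releases elements whose delay has expired:

import java.util.concurrent.DelayQueue
import java.util.concurrent.Delayed
import java.util.concurrent.TimeUnit

// Minimal delayed element: it becomes available to take() only after its target time
class DelayedTask implements Delayed {

    long triggerMillis   // absolute wall-clock time at which the element may be taken
    String payload

    DelayedTask(long delayMillis, String payload) {
        this.triggerMillis = System.currentTimeMillis() + delayMillis
        this.payload = payload
    }

    @Override
    long getDelay(TimeUnit unit) {
        return unit.convert(triggerMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS)
    }

    @Override
    int compareTo(Delayed o) {
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS))
    }
}

def queue = new DelayQueue<DelayedTask>()
queue.add(new DelayedTask(2000, "/api/demo?id=1"))
queue.add(new DelayedTask(1000, "/api/demo?id=2"))

// take() blocks until the head element's delay has expired, so the payloads
// come out roughly 1s and 2s after being added, in timestamp order
2.times {
    def task = queue.take()
    println "${System.currentTimeMillis()} -> ${task.payload}"
}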
Implementation

Overall the implementation is fairly straightforward; I will cover it in three parts.
Property definitions

- I first defined a log object, com.funtester.frame.execute.ReplayConcurrent.ReplayLog, to hold each request log entry.
- I then defined com.funtester.frame.execute.ReplayConcurrent#logs to hold the entries. To repeat a point from earlier articles: keeping log objects at the tens-of-millions scale in memory is fine, which is why I chose this approach. Why stage the entries in memory instead of replaying straight from the log file? Because the entries in the file are not sorted by timestamp.
- com.funtester.frame.execute.ReplayConcurrent#logDelayQueue serves as the replay request queue, i.e. the traffic relay station: producers take entries from com.funtester.frame.execute.ReplayConcurrent#logs, clone them, and drop them into the queue; consumers take objects off the queue and hand them to the thread pool.
- com.funtester.frame.execute.ReplayConcurrent#handle is the method that processes the traffic: it wraps a traffic object into an HttpRequestBase and sends it out.
- The producers run on asynchronous threads (using the approach from the earlier article "Custom async functionality in Java in practice"). Based on the com.funtester.frame.execute.ReplayConcurrent#logDelayQueue performance data, I added the com.funtester.frame.execute.ReplayConcurrent#threadNum parameter to control how many there are. The threads reading from com.funtester.frame.execute.ReplayConcurrent#logs rely on a few thread-safe classes (the AtomicInteger counters in the code below) to guarantee sequential reads, which avoids any extra sorting work inside the delay queue.
- com.funtester.frame.execute.ReplayConcurrent#getMAX_LENGTH caps the queue length. There seems to be no API for bounding a DelayQueue, so I implemented it myself: once the number of added entries exceeds the cap, record the current queue length; if that length is above the cap, sleep 1s before the next add and then refresh the locally stored length. As long as the cap is high enough, the queue will not drain within that 1s. The replay engine is designed for 500k QPS, so I set the cap to 800k for now and can adjust it later. (The idea is sketched in isolation right after this list.)
- The consumers are asynchronous as well, and they use java.util.concurrent.DelayQueue#poll(long, java.util.concurrent.TimeUnit) so that blocking cannot keep the threads from terminating.
- com.funtester.frame.execute.ReplayConcurrent#getMultiple controls the replay multiplier.
- com.funtester.frame.execute.ReplayConcurrent#getTotal records the number of replayed log entries.
- com.funtester.frame.execute.ReplayConcurrent#getHandle processes the log objects.
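Isolated from the class below, the length-cap idea looks roughly like this (a sketch only; MAX_LENGTH and the counters follow the real implementation, the rest is simplified and hypothetical):

import java.util.concurrent.DelayQueue
import java.util.concurrent.Delayed
import java.util.concurrent.atomic.AtomicInteger

// DelayQueue has no bounded-capacity constructor, so the producer enforces a soft cap:
// queue.size() acquires the queue's lock, so it is only re-read once every MAX_LENGTH
// additions; when the cached value exceeds the cap the producer sleeps 1s before adding more.
int MAX_LENGTH = 800000
AtomicInteger index = new AtomicInteger()      // total number of attempted additions
AtomicInteger cachedSize = new AtomicInteger() // last observed queue length

def addWithCap = { DelayQueue<Delayed> queue, Delayed element ->
    if (index.get() % MAX_LENGTH == 0) cachedSize.set(queue.size())
    if (cachedSize.get() > MAX_LENGTH) {
        Thread.sleep(1000)                 // back off for 1s
        cachedSize.set(queue.size())       // then refresh the cached length
    }
    queue.add(element)
    index.incrementAndGet()
}

The complete ReplayConcurrent class: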
package com.funtester.frame.execute

import com.funtester.base.bean.AbstractBean
import com.funtester.frame.SourceCode
import com.funtester.utils.LogUtil
import com.funtester.utils.RWUtil
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger

import java.util.concurrent.DelayQueue
import java.util.concurrent.Delayed
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.LongAdder

/**
 * Replay executor
 */
class ReplayConcurrent extends SourceCode {

    private static Logger logger = LogManager.getLogger(ReplayConcurrent.class)

    static ThreadPoolExecutor executor

    static boolean key = true

    /* Soft cap on the delay queue length */
    static int MAX_LENGTH = 800000

    /* Number of producer/consumer threads */
    int threadNum = 2

    String name

    String fileName

    /* Replay multiplier */
    int multiple

    /* Closure that actually sends one replayed request */
    Closure handle

    List<ReplayLog> logs

    /* Traffic relay station */
    DelayQueue<ReplayLog> logDelayQueue = new DelayQueue<ReplayLog>()

    /* Count of replayed requests */
    LongAdder total = new LongAdder()

    ReplayConcurrent(String name, String fileName, int multiple, Closure handle) {
        this.name = name
        this.fileName = fileName
        this.multiple = multiple
        this.handle = handle
    }

    void start() {
        if (executor == null) executor = ThreadPoolUtil.createCachePool(THREADPOOL_MAX, "R")
        // Read the log file; the delay queue orders the entries by timestamp for us
        time({
            RWUtil.readFile(fileName, {
                def delay = new ReplayLog(it)
                if (delay.getTimestamp() != 0) logDelayQueue.add(delay)
            })
        }, 1, "Reading log $fileName")
        logs = logDelayQueue.toList()
        def timestamp = logs.get(0).getTimestamp()
        logDelayQueue.clear()
        AtomicInteger index = new AtomicInteger()
        AtomicInteger size = new AtomicInteger()
        def LogSize = logs.size()
        AtomicInteger diff = new AtomicInteger()
        // Producers: clone log objects with shifted timestamps into the delay queue
        threadNum.times {
            fun {
                while (key) {
                    if (index.get() % LogSize == 0) diff.set(getMark() - timestamp)
                    if (index.get() % MAX_LENGTH == 0) size.set(logDelayQueue.size())
                    if (size.get() > MAX_LENGTH) {
                        sleep(1.0)
                        size.set(logDelayQueue.size())
                    }
                    def replay = logs.get(index.getAndIncrement() % LogSize)
                    logDelayQueue.add(replay.clone(replay.timestamp + diff.get()))
                }
            }
        }
        // Consumers: poll with a timeout so threads can terminate, then hand off to the pool
        threadNum.times {
            fun {
                while (key) {
                    def poll = logDelayQueue.poll(1, TimeUnit.SECONDS)
                    if (poll != null) {
                        executor.execute {
                            multiple.times {
                                handle(poll.getUrl())
                                total.add(1)
                            }
                        }
                    }
                }
            }
        }
        // Stats thread: report QPS and thread activity at a fixed interval
        fun {
            while (key) {
                sleep(COUNT_INTERVAL as double)
                int real = total.sumThenReset() / COUNT_INTERVAL as int
                def active = executor.getActiveCount()
                def count = active == 0 ? 1 : active
                logger.info("{}, actual QPS: {} active threads: {} per-thread efficiency: {}", name, real, active, real / count as int)
            }
        }
    }

    /**
     * Stop the replay
     */
    def stop() {
        key = false
        executor.shutdown()
        logger.info("Replay load test stopped!")
    }

    /**
     * Log object
     */
    static class ReplayLog extends AbstractBean implements Delayed {

        int timestamp

        String url

        ReplayLog(String logLine) {
            def log = LogUtil.getLog(logLine)
            this.url = log.getUrl()
            this.timestamp = log.getTime()
        }

        ReplayLog(int timestamp, String url) {
            this.timestamp = timestamp
            this.url = url
        }

        @Override
        long getDelay(TimeUnit unit) {
            return this.timestamp - getMark()
        }

        @Override
        int compareTo(Delayed o) {
            return this.timestamp - o.timestamp
        }

        protected Object clone(int timestamp) {
            return new ReplayLog(timestamp, this.url)
        }
    }

}
Self-test
Below is my test case:
package com.okcoin.hickwall.presses

import com.funtester.frame.execute.ReplayConcurrent
import com.funtester.httpclient.FunHttp

class RplayT extends FunHttp {

    static String HOST = "http://localhost:12345"

    public static void main(String[] args) {
        def fileName = "api.log"
        new ReplayConcurrent("replay test", fileName, 1, { String url ->
            getHttpResponse(getHttpGet(HOST + url))
        }).start()
    }

}
The test results are as follows:
22:45:43.510 main (FunTester ASCII-art banner)
10:56:18 F-5 replay test, actual QPS: 23162 active threads: 0 per-thread efficiency: 23162
10:56:23 F-5 replay test, actual QPS: 36575 active threads: 6 per-thread efficiency: 6095
10:56:28 F-5 replay test, actual QPS: 38974 active threads: 21 per-thread efficiency: 1855
10:56:33 F-5 replay test, actual QPS: 32798 active threads: 8 per-thread efficiency: 4099
10:56:38 F-5 replay test, actual QPS: 35224 active threads: 4 per-thread efficiency: 8806
10:56:43 F-5 replay test, actual QPS: 28426 active threads: 0 per-thread efficiency: 28426
10:56:48 F-5 replay test, actual QPS: 33607 active threads: 6 per-thread efficiency: 5601
10:56:53 F-5 replay test, actual QPS: 34392 active threads: 0 per-thread efficiency: 34392