首先看看Khipu官方github的介绍(来自https://github.com/khipu-io )
就是说Khipu是一个基于Scala/Akka实现的Ethereum协议,目前正在开发中,已经上线的alpha版本的主要feature包括:
1. 尽可能并行的执行block内部的交易。目前80%的block内部的交易都可以并行执行。
2. 为blockchain专门设计实现了一个存储引擎(Kesque),基于Kafka的log引擎开发,对于99%以上的随机读只需要1次disk io。
再来看看项目进展
目前Khipu实现了3个大的模块:
* 节点发现(访问)
* 快速同步(同步到近期的状态数据快照和所有的区块)
* 常规同步(同步区块并执行区块内包含的交易)
后续待开发的feature就不细说了。简单来说就是目前这个0.1.0-alpha版本实现了一个不完全节点(不能说轻节点因为不是光同步区块头),或者说数据备份节点的功能。它可以同步区块,执行并验证区块交易的合法性。但不能作为一个完整的节点存在因为还不能接受,或者产生交易,也不能出块因为挖矿共识这块还没有实现。虽然协议实现还不完全,但项目设计和实现上都有不少值得借鉴的地方,也能看出开发者做了一番深入的思考。
Khipu设计思路
和github上的介绍一致,Khipu的设计思路是比较清晰的,如图所示:
整个service由两大块组成:Akka部分主要负责处理区块(内部的交易),Kafka部分完成一个database的功能,用于存储区块和各种状态数据。首先我们看看Khipu如何处理区块。
区块处理
由于Khipu实现基于Akka,先简单介绍一下akka和actor模型。
什么是akka
Akka是一个用scala编写的库,用于简化编写可容错的,可扩展的,高并发应用。akka使用actor模型来提升抽象能力,提供更好的平台来构建可扩展的,弹性的应用。对于比较难处理的错误,akka采用“let it crash”模型来处理,这种模式可以使得一个任务的处理失败不会导致整个应用的crash,使你的系统拥有强大的自愈能力,也不需要重启来恢复系统。同时akka的分布式部署更加简单透明。
actor模型
actor模型并非什么新鲜事务,它早在20世纪70年代就被提出了,主要目的是为了解决分布式编程中的一系列问题。actor模型具有以下优点:1.更简单的、高度抽象的并发处理。2.异步的,非阻塞的,高性能的事件驱动编程模型。3.非常轻量级的事件驱动处理。
基于上述两个模块,Khipu的区块处理逻辑不复杂,其主要的工作都是在并行执行区块交易上,这段并行处理的逻辑就是一套map reduce,(极端)简化的流程如下图:(要感谢Scala的map reduce特性,换Java来估计代码量翻好多倍)
这里Map task容易理解,分开区块内的交易到多个并行的世界里面执行各自的交易并改变各自的状态,主要的问题在于Reduce阶段,如何合并这一系列的世界状态。在合并这一块,Khipu采用了冲突检测的办法,定义了两种不同的冲突模式(并行的状态冲突和世界状态冲突)。
object ProgramState { trait ParallelRace case object OnAccount extends ParallelRace case object OnError extends ParallelRace } object BlockWorldState { sealed trait RaceCondition case object OnAddress extends RaceCondition case object OnAccount extends RaceCondition case object OnStorage extends RaceCondition case object OnCode extends RaceCondition } |
lazy val kesque = new Kesque(kafkaProps) log.info(s"Kesque started using config file: $kafkaConfigFile") private val futureTables = Future.sequence(List( Future(kesque.getTable(Array(KesqueDataSource.account))), Future(kesque.getTable(Array(KesqueDataSource.storage))), Future(kesque.getTable(Array(KesqueDataSource.evmcode))), Future(kesque.getTimedTable(Array( KesqueDataSource.header, KesqueDataSource.body, KesqueDataSource.receipts ), 1024000)) )) private val List(accountTable, storageTable, evmcodeTable, blockTable) = Await.result(futureTables, Duration.Inf) //private val headerTable = kesque.getTimedTable(Array(KesqueDataSource.header), 1024000) //private val bodyTable = kesque.getTable(Array(KesqueDataSource.body), 1024000) //private val receiptTable = kesque.getTable(Array(KesqueDataSource.receipts), 1024000) lazy val accountNodeDataSource = new KesqueDataSource(accountTable, KesqueDataSource.account) lazy val storageNodeDataSource = new KesqueDataSource(storageTable, KesqueDataSource.storage) lazy val evmCodeDataSource = new KesqueDataSource(evmcodeTable, KesqueDataSource.evmcode) lazy val blockHeadersDataSource = new KesqueDataSource(blockTable, KesqueDataSource.header) lazy val blockBodiesDataSource = new KesqueDataSource(blockTable, KesqueDataSource.body) lazy val receiptsDataSource = new KesqueDataSource(blockTable, KesqueDataSource.receipts) |
def read(key: Array[Byte], topic: String): Option[TVal] = { try { readLock.lock val valueIndex = topicIndex(topic) caches(valueIndex).get(Hash(key)) match { case None => val hash = Hash(key) hashOffsets.get(hash.hashCode, valueIndex) match { case IntIntsMap.NO_VALUE => None case offsets => var foundValue: Option[TVal] = None var foundOffset = Int.MinValue var i = offsets.length - 1 // loop backward to find newest one while (i >= 0 && foundValue.isEmpty) { val offset = offsets(i) val (topicPartition, result) = db.read(topic, offset, fetchMaxBytes).head val recs = result.info.records.records.iterator while (recs.hasNext) { // NOTE: the records are offset resversed !! val rec = recs.next if (rec.offset == offset && java.util.Arrays.equals(db.getBytes(rec.key), key)) { foundOffset = offset foundValue = if (rec.hasValue) Some(TVal(db.getBytes(rec.value), rec.timestamp)) else None } } i -= 1 } foundValue foreach { x => caches(valueIndex).put(hash, (x, foundOffset)) } foundValue } case Some((value, offset)) => Some(value) } } finally { readLock.unlock() } } |