前调 —— Trap or tease?

”你在做一款游戏,游戏需要打怪,击杀怪物会掉落宝物,宝物可能有很多种,也可能不掉宝,几率不一样但是加起来为1,请写一个function,按照几率返回掉落的宝物。”

这是前两天老板面试一个候选Senior Engineer出的算法题,我作为面试旁观者心想:老板你出的这个题,也太简单了吧!!!!这叫Senior?赶紧给我升职加薪!!我分分钟搞定。可是后来被生生的打脸了,他说:一道好的面试题一定是深入浅出的,让面试者不断发掘更优秀的解题办法,这个探索过程才是Engineer的核心能力,而不是上来就是一道不负责任的LeetCode动态规划。那么这个问题到底是有意思在哪里呢?它乍一看非常简单,就是一个抽奖问题,比如我们抽中金奖概率为2%,抽中银奖概率为3%,抽中铜奖为5%,然后抽不中概率为90%,那我们马上就能写一个很简单的版本出来:

//Prize and Probablity (真黑心啊这几率。。。)
var prizeMap = map[string]float64{
  "gold": 0.02,
  "silver": 0.03,
  "bronze": 0.05,
  "nothing" : 0.9,
}

func luckyDraw(prizeMap map[string]float64) string{
  rand.Seed(time.Now().UnixNano())
  random := rand.Float64()
  for k, v := range prizeMap {
    if random < v {
      return k
    }
    random -= v
  }
  return "nothing"
}

我: 老板你看!算法很简单,我先把所有的candidate和对应的几率放进一个map,然后通过随机数(伪随机)生成一个小数,然后按照顺序减掉map里面的几率值,最后减不掉的就是对应的选项。
老板: 这个方法的时间和空间复杂度都是多少呢?
我: 空间O(n), 时间O(n), 很快了吧~
老板: 嗯嗯那我要是有10万种奖品种类呢?你这个是不是有点慢啊。
我:啊这。。。
老板:再比如说我先在要加一个钻石奖的选项,概率为1%,其他的不变,你该怎么改数值呢?
我:啊这。。。
委屈
老板: 你再想想
我:好的

中调 —— Engineering the way through

I always believe that a good software engineer should be a good product owner with user perspective – Brabalauwka

面试中的算法题我一直非常鄙视,觉得日常生产中根本用不到。现在才发老板出的题目都别出心裁,考算法的过程中其实有很多环节都是很重要的:面试者能否清晰的定义面试官出的题目需求(需求定义与理解),提出一些input和output并且询问理解是否正确(和PM的需求沟通), 快速设计算法并且沟通思路(技术评审),写出算法并且思考edge case(快速开发测试),思考更优解(维护与性能优化)。所以这道题完全满足了这个思路,你不仅仅要考虑每次抽奖的算法,还要考虑日常生产中可能出现的情况。

经过一些思考我得到了以下办法:

//DataSet with weighted collection
var weightedPrizes = map[int][]string{
  1: []string{"Dyson", "iPhone"},
  2: []string{"XBox", "Switch"},
  3: []string{"Speaker", "AirPods"},
  50: []string{"nothing"},
}
//User Closure to create the function
func getLuckyDrawFunc(prizeMap map[int][]string) func()string {
  //Prepare a linear arrangement of all probabilities
  current := 0
  //Linear prizes markers
  prizeRange := make([]int, 0)
  //Actual Prizes
  actualPrizes := make([]string, 0)
  //Make prize markers
  for weight, prizes  := range prizeMap {
    for _, prize := range prizes {
      current += weight
      prizeRange = append(prizeRange, current)
      actualPrizes = append(actualPrizes, prize)
    }
  }
  rand.Seed(time.Now().UnixNano())
  //return the func that used to do lucky draw
  return func() string {
    random := rand.Intn(current)
    //Binary Search the prizeRange
    low := 0
    high := len(prizeRange) - 1
    for high >= low {
      mid := (low + high) / 2
      if random < prizeRange[mid] {
        high = mid - 1
      } else {
        low = mid + 1
      }
    }
    return actualPrizes[low]
  }
}

我:为了满足方便的加减奖品池,更改几率:我们使用用权重计数的map来储存奖品对应的权重,每次更改map新增或者删减奖品我们就重新生成一次里面的奖品几率。这是工程中的优化实现。同时,使用闭包来保护隐私~ 为了减少每次抽奖的时间复杂度,我们建立一个递增的奖品对应的marker,然后通过Binary Search的方法来搜索。
老板:空间和时间复杂度呢? 我:空间复杂度O(3n), 准备时间为O(nlog(n)), 抽奖时间复杂度O(log(n))
老板:不错!一般面试者写成这样我就给过了,但是还有更优解,我面试了这么多人还没有一个人写出来过。
我:。。。。。。我再研究研究。。。
老板: 这个东西叫做Weigthed Random Sampling, 我们后端用到的Load Balancer算法之一。而且能应用到生活中的各个领域

后调 —— Algorithms are sexy

之所以Binary Search不算最优秀的一种接法是因为在二分查找,移动指针的时候很容易出现+1,-1等指针错误或者index out of range error。所以我思考了很久更优秀的解法,也问了朋友能不能提供一些想法,可是我们都没有更好的灵感。最后,朋友发给的我一篇文章令我非常动容,令我感触最深。更优秀的算法出现了:这篇文章介绍了两种极致的加权随机采样算法,它们太优美了;让人不得不惊叹创造者的思想。所以下面的部分我主要用通俗的方法来介绍这两种算法的魅力。

The Hopscotch Selection

我们回顾下linear search算法,核心原理是产生一个从0到1随机数比如0.687,然后挨个减去我们的候选几率直到这个随机数比候选几率小,就找到了对应的选项。首先我们把用float64代表的几率换成int权重,然后相加起来拿到权重和也能进行这样的算法运算;前提是所有的权重加起来能不超过int64的限制。那么HopScotch的精华在于哪里呢,当然在于Hop了~

//interface{} has the format of prize name and weight
type PrizeCollection [][]interface{}

var a = PrizeCollection{
  {"Dyson",1},
  {"iPhone",1},
  {"XBox", 2},
}

func getHopScotchFunc(collection PrizeCollection) func() string {
  //Reverse Sort the Weights
  sort.Slice(collection, func(i, j int) bool {
    return collection[j][1].(int) < collection[i][1].(int)
  })
  sum := 0
  collectionWeightSum := make([]int, len(collection))
  for i := 0; i < len(collection); i++ {
    sum += collection[i][1].(int)
    collectionWeightSum[i] = sum
  }

  rand.Seed(time.Now().UnixNano())
  //return the func that used to do lucky draw
  return func() string {
    target := rand.Intn(collectionWeightSum[len(collectionWeightSum)-1])
    guessIndex := 0
    for true {
      if collectionWeightSum[guessIndex] > target {
        break
      }
      //Retrieve the actual weight of that index
      currentWeight := collection[guessIndex][1].(int)
      //Calculate the difference between the current sum and the target
      hopDistance := target - collectionWeightSum[guessIndex]
      //Calculate the safe distance (indice) to jump
      hopIndex := 1 + hopDistance/currentWeight
      //Jump
      guessIndex += hopIndex
    }
    return collection[guessIndex][0].(string)
  }
}

算法的第一部分首先要求input是一个map或者所有的奖品对应的名字和权重,之所以要一一对应是因为我们要做sorting,把所有的权重逆向排序,然后一次向家所有的权重,依然是不能超过int64的限制。排序的好处是什么呢?如果是linear search,权重大的在前面的话会减少搜索的次数,因为权重大的更有可能被选中。linearsearch的核心在于依次减去权重,是一个O(n)的时间复杂度。所以HopScotch算法在于减去权重的过程中进行跳跃,也就是运用了排序后,后面的数字一定比前面小的原理。比如我随机生成了一个数576,第一个数的权重为100。那么,在我不知道576应该落在哪个区间的情况下我可以使用跳跃:(576-100)/100 = 4 我知道后面所有的权重都比当前权重小,也就是说我在拿到576的情况下,我可以至少向后跳跃4个选项而不用去计算这些权重是什么,从而达到了一种远小于O(n)的时间复杂的。

当然这个算法的best case在于所有的权重的一样的情况,那么我们100%只需要O(1)的时间就能选到。worst case在于权重为2的次方依次递减:32,16,8,4,2,1,这就意味着我们每次只能跳跃1个。这个时候时间复杂度接近于linear search。但是!如果是这样的权重分布的话,本身被选中后面的几率就是很小的。作者也对这个算法进行了Benchmark,分别从不同的weight distribution 进行测试,可以看到这个算法表现是非出色:

similar weight similar weight similar weight similar weight

The Alias Method – Ultimate Form

下面我们就迎来了Alias Method,它是一个极其优美的O(1)复杂度的算法,既然是复杂度为O(1)的算法,他必须在常数时间内找到这个随机数归属的index和这个index所属的candidate。所以我们需要找到一种mapping的算法,这也就是Alias Method的精华所在:

func getAliasFunc(collection PrizeCollection) func()string {
  totalPrizeNum := len(collection)
  sum := 0
  for i := 0; i < len(collection); i++ {
    sum += collection[i][1].(int)
  }
  //Get Average
  average	:= float64(sum) / float64(totalPrizeNum)
  //Aliases is a array of [float, int] so we use interface{}
  aliases := make([][]interface{}, totalPrizeNum)
  //Initialisaition
  for i := 0; i < totalPrizeNum; i++ {
    aliases[i] = []interface{}{1.0, 0.0}
  }
  //U can check the explanation first before coming back~
  bigWeights := make([][]interface{},0)
  smallWeights := make([][]interface{},0)
  for index, prizeItem := range collection {
    if float64(prizeItem[1].(int)) < average {
      smallWeights = append(smallWeights, []interface{}{index, float64(prizeItem[1].(int)) / average })
    } else {
      bigWeights = append(bigWeights, []interface{}{index, float64(prizeItem[1].(int)) / average })
    }
  }
  bigWeightsPosition := 0
  for i := 0; i < len(smallWeights); i++ {
    aliases[smallWeights[i][0].(int)] = []interface{}{smallWeights[i][1].(float64), bigWeights[bigWeightsPosition][0].(int)}
    bigWeights[bigWeightsPosition][1] = bigWeights[bigWeightsPosition][1].(float64) - (1 - smallWeights[i][1].(float64))
    if bigWeights[bigWeightsPosition][1].(float64) <= 1 {
      smallWeights = append(smallWeights, bigWeights[bigWeightsPosition])
      bigWeightsPosition++
      if bigWeightsPosition >= len(bigWeights) {
        break
      }
    }
  }
  //---------------------------return the func that used to do lucky draw
  rand.Seed(time.Now().UnixNano())
  return func() string {
    target := rand.Float64()*float64(len(collection))
    targetAlias := int(target)
    targetWeight := target - float64(targetAlias)
    if targetWeight < aliases[targetAlias][0].(float64) {
      return collection[targetAlias][0].(string)
    } else {
      return collection[aliases[targetAlias][1].(int)][0].(string)
    }
  }
}

由于golang缺乏复杂的数据结构和好用的mothod,代码比较不可读,奉上python版本:

def prepare_aliased_randomizer(weights):
    N = len(weights)
    avg = sum(weights)/N
    aliases = [(1, None)]*N
    smalls = ((i, w/avg) for i,w in enumerate(weights) if w < avg)
    bigs = ((i, w/avg) for i,w in enumerate(weights) if w >= avg)
    small, big = next(smalls, None), next(bigs, None)
    while big and small:
        aliases[small[0]] = (small[1], big[0])
        big = (big[0], big[1] - (1-small[1]))
        if big[1] < 1:
            small = big
            big = next(bigs, None)
        else:
            small = next(smalls, None)

    def weighted_random():
        r = random()*N
        i = int(r)
        odds, alias = aliases[i]
        return alias if (r-i) > odds else i

    return weighted_random

这个算法的核心原理是把候选人按照候选的Average的weight进行分治。假设我们有以下几种候选和对应的weight:

var a = PrizeCollection{
  {"Dyson",1},   //20%几率
  {"iPhone",1},  //20%几率
  {"Nothing",3}, //60%几率
}
//创造n个候选Bucket,n = len(PrizeCollection)
aliases: [{候选Bucket0}, {候选Bucket1}, {候选Bucket2}]

//所以这三种奖品的平均值为
avg := 1+1+3/3 = 5/3

small = [
  {"Dyson","候选Bucket0","weight/avg => 3/5"}, 
  {"iPhone","候选Bucket1","weight/avg => 3/5"}
]
big = [
  {"Nothing","候选Bucket2","weight/avg => 9/5"}
]

所有比平均值小的会被分到一个组,所有比平均值大的会被分到另一个组,精华来了,我们可以把每个小于Avg的候选人放进他们所在的那个index的Bucket,但是每个Bucket容量为多少呢?容量为Avg,我们用1代表,小于Avg的候选人所占部分在数组中用一个小数代表!那说明我们还有一些富裕地方(Dyson只占一个Bucket的3/5)。那么我们可以为每个小于平均值的的Bucket配平一部分权重大的候选人(把大的拆分进每个小候选人)从而填平的状态

//我们率先用候选bucket2填平候选bucket0所在的Dyson
aliases: [{3/5, "填平部分来自候选Bucket2"}, {候选Bucket1}, {候选Bucket2}]
//then 现在的候选Bucket2对应的 {"Nothing","候选Bucket2","9/5 - 2/5 => 7/5"},他任然大于1
//then 继续配平
aliases: [{3/5, "填平部分来自候选Bucket2"}, {3/5, "填平部分来自候选Bucket2"}, {5/5没有填平}]

所以我们生成一个从0到bucket数量的float假设为1.85,我们就知道1.85会分配到Bucket1,但是其中的0.85是bucket中的哪部分呢?3/5的部分来自于本Bucket(也就是iphone),2/5的部分来自于Bucket2的填平(也就是Nothing),0.85>3/5所以是来自Bucket2的部分!按照这样,我们就能用随机数的整数部分表示选到了哪个桶,Bucket下标来表示当前Bucket桶所在的Prize是什么,然后用随机数的小数部分来获取我们到底使用这个桶的哪个部分,如果是填平部分我们就选择填平部分的那个Bucket!!再回去看一看这个算法是不是很清楚了呢!

我惊叹于这个算法的核心魅力在于mapping,巧妙的利用了数组下标作为当前Bucket的index,数组只存两个值一个是当前Bucket的阈值,另一个是用于填平当前Bucket的另外一个Bucket的下标。从而实现了O(1)的时间复杂度

后记

如果你看到了这里说明你对知识的好奇超过了我惨淡的文笔。记录算法的学习过程能够帮助一个程序员加深印象。同时也能够锻炼我将思维转化成有结构的文字的能力。希望自己能够保持这份好奇继续了解更多吧~~

A Faster Weighted Random Choice Bruce Hill February 2, 2017
Darts, Dice, and Coins: Sampling from a Discrete Distribution Keith Schwarz December 29, 2011

25 March 2022