使⽤numpy实现topk函数操作(并排序)
np.argpartition 难以解决topK
topK是常⽤的⼀个功能,在python中,numpy等计算库使⽤了丰富的底层优化,对于矩阵计算的效率远⾼于python的for-loop实现。因此,我们希望尽量⽤⼀些numpy函数的组合实现topK。
pytorch 库提供了topk函数,可以将⾼维数组沿某⼀维度(该维度共N项),选出最⼤(最⼩)的K项并排序。返回排序结果和index信息。奇怪的是,更轻量级的numpy库并没有直接提供 topK 函数。numpy只提供了argpartition 和 partition,可以将最⼤(最⼩)的K项排到前K位。以argpartition为例,最⼩的3项排到了前3位:
>>> x = np.array([3, 5, 6, 4, 2, 7, 1])
>>> x[np.argpartition(x, 3)]
array([2, 1, 3, 4, 5, 7, 6])
注意,argpartition实现的是 partial sorting,如上例,前3项和其余项被分开,但是两部分各⾃都是不排序的!⽽我们可能更想要topK的⼏项排好序(其余项则不作要求)。因此,下⾯提供⼀种基于argpartition的topK⽅法。
⼀个naive⽅法
最简单的⽅法⾃然是全排序,然后取前K项。缺点在于,要把topK之外的数据也进⾏排序,当K << N时较为浪费时间,复杂度为O ( n log n ) O(n \log n)O(nlogn):
def naive_arg_topK(matrix, K, axis=0):
"""
perform topK based on np.argsort
:param matrix: to be sorted
:param K: select and sort the top K items
:param axis: dimension to be sorted.
:return:
"""
full_sort = np.argsort(matrix, axis=axis)
return full_sort.take(np.arange(K), axis=axis)
# Example
>>> dists = np.random.permutation(np.arange(30)).reshape(6, 5)
array([[17, 28,  1, 24, 23,  8],
[ 9, 21,  3, 22,  4,  5],
[19, 12, 26, 11, 13, 27],
[10, 15, 18, 14,  7, 16],
[ 0, 25, 29,  2,  6, 20]])
>>> naive_arg_topK(dists, 2, axis=0)
array([[4, 2, 0, 4, 1, 1],
[1, 3, 1, 2, 4, 0]])
>>> naive_arg_topK(dists, 2, axis=1)
array([[2, 5],
[2, 4],
[3, 1],
[4, 0],
[0, 3]])
基于partition的⽅法
对于 np.argpartition 函数,复杂度可能下降到 O ( n log K ) O(n \log K)O(nlogK),很多情况下,K << N,此时naive⽅法有优化的空间。
以下⽅法⾸先选出 topK 项,然后仅对前topK项进⾏排序(matrix仅限2d-array)。
def partition_arg_topK(matrix, K, axis=0):
"""
perform topK based on np.argpartition
:param matrix: to be sorted
:param K: select and sort the top K items
:param axis: 0 or 1. dimension to be sorted.
:return:
"""
a_part = np.argpartition(matrix, K, axis=axis)
if axis == 0:
numpy库不具有的功能有row_index = np.arange(matrix.shape[1 - axis])
a_sec_argsort_K = np.argsort(matrix[a_part[0:K, :], row_index], axis=axis)
return a_part[0:K, :][a_sec_argsort_K, row_index]
else:
column_index = np.arange(matrix.shape[1 - axis])[:, None]
a_sec_argsort_K = np.argsort(matrix[column_index, a_part[:, 0:K]], axis=axis)
return a_part[:, 0:K][column_index, a_sec_argsort_K]
# Example
>>> dists = np.random.permutation(np.arange(30)).reshape(6, 5)
array([[17, 28,  1, 24, 23,  8],
[ 9, 21,  3, 22,  4,  5],
[19, 12, 26, 11, 13, 27],
[10, 15, 18, 14,  7, 16],
[ 0, 25, 29,  2,  6, 20]])
>>> partition_arg_topK(dists, 2, axis=0)
array([[4, 2, 0, 4, 1, 1],
[1, 3, 1, 2, 4, 0]])
>>> partition_arg_topK(dists, 2, axis=1)
array([[2, 5],
[2, 4],
[3, 1],
[4, 0],
[0, 3]])
⼤数据量测试
对shape(5000, 100000)的矩阵进⾏topK排序,测试时间为:
K partition(s)naive(s)
108.88422.604
1009.01222.458
10008.90422.506
500011.30522.844
补充:python堆排序实现TOPK问题
# 构建⼩顶堆跳转def sift(li, low, higt):
tmp = li[low]
i = low
j = 2 * i + 1
while j <= higt:  # 情况2:i已经是最后⼀层
if j + 1 <= higt and li[j + 1] < li[j]:  # 右孩⼦存在并且⼩于左孩⼦
j += 1
if tmp > li[j]:
li[i] = li[j]
i = j
j = 2 * i + 1
else:
break  # 情况1:j位置⽐tmp⼩
li[i] = tmp
def top_k(li, k):
heap = li[0:k]
# 建堆
for i in range(k // 2 - 1, -1, -1):
sift(heap, i, k - 1)
for i in range(k, len(li)):
if li[i] > heap[0]:
heap[0] = li[i]
sift(heap, 0, k - 1)
# 挨个输出
for i in range(k - 1, -1, -1):
heap[0], heap[i] = heap[i], heap[0]
sift(heap, 0, i - 1)
return heap
li = [0, 8, 6, 2, 4, 9, 1, 4, 6]
print(top_k(li, 3))
以上为个⼈经验,希望能给⼤家⼀个参考,也希望⼤家多多⽀持。