黑名单的设计与实现

简单的来说,黑名单就是把一堆的数字存起来,然后检查其中是否存在某一个数字,而这个查询可以做到nlogn级别的,一般来说现实中话虽如此,也许黑名单查询会面临着很多歌用户,都需要查询他们是否在黑名单中的情况,这个时候,依旧采用每次nlogn级别的算法,就会遇到性能问题。

如果采用类似归并的算法,将全部的黑名单id取出来排序,然后全部的用户id取出来,两个数组各自排序,然后从前到后m+n的扫,判断用户id是否存在在黑名单里面。这样就可以将时间从m *nlog(n) 降到max(m+n , nlog(n) , mlog(m))。

function blackList()
{
//做一些初始化的工作
if(!$this->redis->get('flag')){
$this->redis->set("flag" , 1);
for($i = 0;$i < self::$MAX;$i++){
$tmp = rand(1,10000000);
$this->redis->zAdd(self::$REDISKEY , $tmp , $tmp);
}
}
$arr = array();
for($i = 0;$i < self::$CASE ; $i++){
$arr[] = rand(1,10000000);
}
$begin = microtime(true) * 10000;
$this->origin($arr);
$end = microtime(true) * 10000;
$end = $end - $begin;
echo "一共用时{$end}<br/>";
$begin = microtime(true) * 10000;
$this->secd($arr);
$end = microtime(true) * 10000 - $begin;
echo "一共用时{$end}<br/>";

这里是调用的地方的函数

protected function origin(&$arr)
{
//计数,是否存在zset中
$cnt = 0;
for($i = 0, $len = count($arr);$i < $len;$i++){
$tmp = $this->redis->zScore(self::$REDISKEY , $arr[$i]);
if(!empty($tmp)){
$cnt++;
}
}
echo "一共有{$cnt}个数字在黑名单中<br/>";
}

这里是通过第一种方法,即每次通过向redis查询是否存在对应的id的方法

protected function secd(&$arr)
{
$blacklist = $this->redis->zRange(self::$REDISKEY , 0 , -1);
$alen = count($arr);
$pointer = 0;
$cnt = 0;
sort($arr , SORT_NUMERIC);
for($i = 0 , $len = count($blacklist);$i < $len;$i++){
while($pointer < $alen && $arr[$pointer] <= $blacklist[$i]){
if($arr[$pointer] == $blacklist[$i]){
$cnt ++;
}
$pointer ++;
}
}
echo "一共有{$cnt}个数字在黑名单中
";
}

这里是我提出的,将所有的id获取,然后通过m+n的方式获取。
下面是最关键的环节,具体性能影响如何?
当用户有10w个,黑名单长为1w的时候,第一种方法84587.099609375ms,第二种方法2603.099609375ms,性能优化大约是40倍
因为可以进行这种大规模查询的人都是编辑,而非用户,所以2s的等待时间应该是可以接受的

通过类似的思路,可以对hash和zset以及mc进行一个对比

$blacklist = array();
for($i = 0;$i < self::$MAX;$i++){
$blacklist[] = $this->memcache->get(self::$mc_prefix . $i);
}
$alen = count($arr);
$pointer = 0;
$cnt = 0;
sort($arr , SORT_NUMERIC);
sort($blacklist , SORT_NUMERIC);
for($i = 0 , $len = count($blacklist);$i < $len;$i++){
while($pointer < $alen && $arr[$pointer] <= $blacklist[$i]){
if($arr[$pointer] == $blacklist[$i]){
$cnt ++;
}
$pointer ++;
}
}
echo “一共有{$cnt}个数字在黑名单中<br/>”;

这里是通过mc存放uid的方式进行的,效率么19783.2421875要比第一种方法大约快4倍单式依旧比第二种方法慢10倍的样子。

redis还有另外一种数据结构,就是hash,理论上来说是可以做到O(1)的插入和查找,对于这种kv结构来说最是合适,因此添加对hash的判断,依旧是第一种方法,第二种方法的测试.

public function redisHash($arr)
{
$len = count($arr);
$cnt = 0;
for($i = 0;$i < $len;$i++){
//$this->redis = new BaseModelRedis(SINASRV_REDIS_HOST);
$tmp = $this->redis->hGet(self::$redisHash , $arr[$i]);
//if(!empty($tmp)){
if($tmp){
$cnt++;
}
}
echo “一共有{$cnt}个数字在黑名单中\n”;
}

这个时候测得的用时是86803ms

public function redisHashSort(&$arr)
{
$begin = microtime(true) * 10000;
$blacklist = $this->redis->hKeys(self::$redisHash);
$middle = microtime(true) * 10000 – $begin;
$len = count($blacklist);
$alen = count($arr);
$pointer = 0;
$cnt = 0;
sort($arr , SORT_NUMERIC);
sort($blacklist , SORT_NUMERIC);

$middle = microtime(true) * 10000 – $begin;
echo “排序之后的是时间是 : ” . $middle . “ms的时间<br/>”;
for($i = 0 ;$i < $len;$i++){
while($pointer < $alen && $arr[$pointer] <= $blacklist[$i]){
if($arr[$pointer] == $blacklist[$i]){
$cnt ++;
}
$pointer ++;
}
}
echo “一共有{$cnt}个数字在黑名单中<br/>”;
$end = microtime(true) * 10000 – $begin;
echo “redis hashGetall 后sort一共用时{$end}<br/><br/>”;
}

首先读取所有的数据,然后再排序,得到的时间是3000ms,这个效率差异原因,主要是集中在了二次排序上面,zset去得的数据是不需要排序的,而hash是需要排序的

如果正式的模拟单次用户查询,每次都需要连接的时候,20W的用户,10000的黑名单,需要耗费时间是250000ms,就算是减小一个数量级,时间大概是25000,线性的增加

Leave a comment

Your email address will not be published.

*