补充数据链接:
candidates: https://pan.baidu.com/s/1nvGWbrV
bg_db: https://pan.baidu.com/s/1sllFLAd
每个字符串长度都是23,只要前面20个字符就行,由于数据太大,我只传了五分之一,大神们可以挑战一下,有速度快的可以贴一下代码,让小弟拜读一下,谢谢!
下面是正式的问题:
我现在有两个字符串数组,姑且称为candidates和bg_db,全部都是长度为20的短字符串,并且每个字符串的每个字符只有ATCG四种可能(没错!就是基因组序列啦!):
candidates = [
'GGGAGCAGGCAAGGACTCTG',
'GCTCGGGCTTGTCCACAGGA',
'...',
# 被你看出来啦,这些其实人类基因的片段
]
bg_db = [
'CTGCTGACGGGTGACACCCA',
'AGGAACTGGTGCTTGATGGC',
'...',
# 这个更多,有十亿左右
]
我的任务是对candidates的每一个candidate,找到bg_db中所有与其小于等于4个差异的记录,
举个例子来说:
# 上面一条为candidate,即candidates的一个记录
# 中间|代表相同,*代表不相同
# 下面一条代表bg_db的一条记录
A T C G A T C G A T C G A T C G A T C G
| | | | | | | | | | | | | | | | | | | | # 差异为0
A T C G A T C G A T C G A T C G A T C G
A T C G A T C G A T C G A T C G A T C G
* | | | | | | | | | | | | | | | | | | | # 差异为1
T T C G A T C G A T C G A T C G A T C G
A T C G A T C G A T C G A T C G A T C G
* | | | * | | | | | | | | | | | | | | | # 差异为2
T T C G T T C G A T C G A T C G A T C G
A T C G A T C G A T C G A T C G A T C G
* | | | * | | | | | | * | | | | | | | | # 差异为3
T T C G T T C G A T C C A T C G A T C G
A T C G A T C G A T C G A T C G A T C G
* | | | * | | | | | | * | | | * | | | | # 差异为4
T T C G T T C G A T C C A T C A A T C G
我的问题是如果快速地找到:每一个candidate在bg_db中与之差异小于等于4的所有记录,
如果采用暴力遍历的话,以Python为例:
def align(candidate, record_from_bg_db):
mismatches = 0
for i in range(20):
if candidate[i] != record_from_bg_db[i]:
mismatches += 1
if mismatches >= 4:
return False
return True
candidate = 'GGGAGCAGGCAAGGACTCTG'
record_from_bg_db = 'CTGCTGACGGGTGACACCCA'
align(candidate, record_from_bg_db) # 1.24微秒左右
# 总时间:
10000000 * 1000000000 * 1.24 / 1000 / 1000 / 60 / 60 / 24 / 365
# = 393
# 1千万个candidates,10亿条bg_db记录
# 耗时大约393年
# 完全无法忍受啊
我的想法是,bg_db是高度有序的字符串(长度固定,每个字符的可能只有四种),有没有什么算法,可以让candidate快速比较完所有的bg_db,各位大神,求赐教。
我用blast blastn-short
:)
可以尝试用字典树来保存所有的字符串。然后在查询的时候就可以用在字典树上遍历的方法。
在树上遍历的时候,可以维护一个当前节点的序列,这个序列里保存着当前遍历到的节点和对应节点mismatch的数量。
在遍历下一个节点的时候,要把当前序列里所有的节点都尝试向下,并形成新的节点序列。
好处是可以把很多个串的当前位放在一起进行比较,可以节约一些时间。由于每个位置选择不多,mismatch也不大,所有应该不会出现当前节点序列膨胀过大的情况。(这是猜想… 没太认真验证过…)
def match(candidate, root):
nset = [[],[]]
currentId = 0
nset[currentId].append((root, 0))
for ch in candidate:
nextId = 1 - currentId
for item in nset[currentId]:
node, mismatch = item
for nx in node.child:
if nx.key == ch:
nset[nextId].append((nx, mismatch))
else:
if mismatch:
nset[nextId].append((nx, mismatch - 1))
nset[currentId].clear()
currentId = 1 - currentId
return nset[currentId]
上面的代码就是一个大概的意思。如果用C++写的话会再快很多。
整个过程都可以用集群做分布式计算。
写一个思路
candidates = [
'GGGAGCAGGCAAGGACTCTG',
'GCTCGGGCTTGTCCACAGGA',
'...',
# 被你看出来啦,这些其实人类基因的片段
]
bg_db = [
'CTGCTGACGGGTGACACCCA',
'AGGAACTGGTGCTTGATGGC',
'...',
# 这个更多,有十亿左右
]
因为你的数据其实是很有特点的,这里可以进行精简。
因为所有的字符串都是20个字符长度,且都由ATCG
四个字符组成。那么可以把它们变换为整数来进行比较。
二进制表现形式如下
A ==> 00
T ==> 01
C ==> 10
G ==> 11
因为一个字符串长度固定,每个字符可以由2个比特位表示,所以每个字符串可以表示为一个40
位的整数。可以表示为32+8
的形式,也可以直接使用64
位整形。建议使用C
语言来做。
再来说说比较。
因为要找到每一个candidate在bg_db中与之差异小于等于4的所有记录
,所以只要两个整数一做^
按位异或操作,结果中二进制中1不超过8个,且这不超过8个1最多只能分为4个组
的才有可能是符合要求的(00^11=11,10^01=11)。
把结果的40
个比特位分作20个组,那么就是说最多只有4
个组为b01
b10
b11
这三个值,其余的全部为b00
。
那么比较算法就很好写了。
可以对每个字节(四个组)获取其中有几个组是为三个非零值的,来简介获取整体的比较结果。
因为每个字节只有256
种可能的值,而符合条件的值只有,所以可以先将结果存储起来,然后进行获取。3^4=81
种
这里给出一个函数,来获取结果中有几个是非零组。
/*****************下面table中值的生成******//**
int i;
for( i=0;i<256;++i){
int t =0;
t += (i&0x01 || i&0x02)?1:0;
t += (i&0x04 || i&0x08)?1:0;
t += (i&0x10 || i&0x20)?1:0;
t += (i&0x40 || i&0x80)?1:0;
printf("%d,",t);
if(i%10 ==9){putchar('\n');}
}
********************************************//
int table[] = {
0,1,1,1,1,2,2,2,1,2,
2,2,1,2,2,2,1,2,2,2,
2,3,3,3,2,3,3,3,2,3,
3,3,1,2,2,2,2,3,3,3,
2,3,3,3,2,3,3,3,1,2,
2,2,2,3,3,3,2,3,3,3,
2,3,3,3,1,2,2,2,2,3,
3,3,2,3,3,3,2,3,3,3,
2,3,3,3,3,4,4,4,3,4,
4,4,3,4,4,4,2,3,3,3,
3,4,4,4,3,4,4,4,3,4,
4,4,2,3,3,3,3,4,4,4,
3,4,4,4,3,4,4,4,1,2,
2,2,2,3,3,3,2,3,3,3,
2,3,3,3,2,3,3,3,3,4,
4,4,3,4,4,4,3,4,4,4,
2,3,3,3,3,4,4,4,3,4,
4,4,3,4,4,4,2,3,3,3,
3,4,4,4,3,4,4,4,3,4,
4,4,1,2,2,2,2,3,3,3,
2,3,3,3,2,3,3,3,2,3,
3,3,3,4,4,4,3,4,4,4,
3,4,4,4,2,3,3,3,3,4,
4,4,3,4,4,4,3,4,4,4,
2,3,3,3,3,4,4,4,3,4,
4,4,3,4,4,4
};
int getCount(uint64_t cmpresult)
{
uint8_t* pb = &cmpresult; // 这里假设是小段模式,且之前比较结果是存在低40位
return table[pb[0]]+table[pb[1]]+table[pb[2]]+table[pb[3]]+table[pb[4]];
}
算法不是很了解 但是就经验来说 复杂的算法反而耗时更久 不如这种简单粗暴来的迅速
可以考虑下多线程和集群来处理数据
对了 还有汉明距离貌似可以算这个
同样没有使用算法,暴力解法,用c写的
在我的机器上(CPU: Core 2 Duo E7500, RAM: 4G, OS: Fedora 19),测试结果
candidates bg.db cost
10000 1000000 318758165微秒
500 1000000 14950302微秒
如果换成题主的24核CPU,怎么也得有20倍的性能提升,然后再加上48台机器一起运算,5000W次运算为15s, 时间为
10000000 * 1000000000 / 500 / 1000000 * 15 / 20 / 48 / 3600 / 24 = 3.616898 天
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <sys/time.h>
#define START_CC(flag) \
struct timeval st_##flag##_beg; \
gettimeofday(&st_##flag##_beg, 0)
#define STOP_CC(flag) \
struct timeval st_##flag##_end; \
gettimeofday(&st_##flag##_end, 0)
#define PRINT_CC(flag) \
double cost_##flag = 0.0L; \
cost_##flag = (double)(st_##flag##_end.tv_sec - st_##flag##_beg.tv_sec); \
cost_##flag = cost_##flag * 1000000L + (double)(st_##flag##_end.tv_usec - st_##flag##_beg.tv_usec); \
printf(#flag" cost time %.6lf microsecond.\n", cost_##flag);
#define GENEORDER_CODE_LENGTH 20 + 1
typedef struct GeneOrder
{
char code[GENEORDER_CODE_LENGTH];
}GeneOrder, *GeneOrderPtr;
typedef struct GOArray
{
size_t capacity;
size_t length;
GeneOrderPtr data;
}GOArray;
GOArray createGOAarray(size_t capacity)
{
GOArray goa;
goa.capacity = capacity;
goa.length = 0;
goa.data = (GeneOrderPtr)malloc(capacity * sizeof(GeneOrder));
return goa;
}
void destroyGOArray(GOArray* goa)
{
if (goa->capacity > 0) {
free(goa->data);
}
}
bool readGOFile(char const *file, GOArray *goarray)
{
FILE* fp = NULL;
if ((fp = fopen(file, "r+")) == NULL) {
return false;
}
char buff[64];
while (fgets(buff, 64, fp) != NULL) {
if (goarray->length < goarray->capacity) {
memcpy(goarray->data[goarray->length].code,
buff,
GENEORDER_CODE_LENGTH * sizeof(char)
);
goarray->data[goarray->length].code[GENEORDER_CODE_LENGTH - 1] = '\0';
goarray->length ++;
} else {
fclose(fp);
return true;
}
}
fclose(fp);
return true;
}
int main(int argc, char* argv[])
{
(void)argc;
GOArray condgo = createGOAarray(10000);
GOArray bggo = createGOAarray(1000000);
printf("loading ...\n");
START_CC(loading);
if (!readGOFile(argv[1], &condgo) || !readGOFile(argv[2], &bggo)) {
destroyGOArray(&condgo);
destroyGOArray(&bggo);
return -1;
}
STOP_CC(loading);
PRINT_CC(loading);
int count = 0;
START_CC(compare);
for (size_t i = 0;i < 500;i ++) {
const GeneOrderPtr gop = condgo.data + i;
for (size_t j = 0;j < bggo.length;j ++) {
const GeneOrderPtr inner_gop = bggo.data + j;
int inner_count = 0;
for (size_t k = 0;k < 20;k ++) {
if (gop->code[k] != inner_gop->code[k]) {
if (++inner_count > 4) {
break;
}
}
}
if (inner_count <= 4) {
#ifdef DBGPRINT
printf("%d %s - %s\n", i, gop->code, inner_gop->code);
#endif
count++;
}
}
}
STOP_CC(compare);
PRINT_CC(compare);
printf("result = %d\n", count);
destroyGOArray(&condgo);
destroyGOArray(&bggo);
return 0;
}
编译参数&运行
gcc -Wall -Wextra -o ccs main.c -std=c99 -Os && ./ccs candidate.list bg.db
如果改成多线程的话速度会更快一些,在我的机器改为2
个线程简单使用500条candidates
测试,速度可以提升到9040257微秒
,线程增加到4
个性能提升就不是很大了,但是较新的CPU都具有超线程技术,速度估计会更好一些。。。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <sys/time.h>
#include <pthread.h>
#define START_CC(flag) \
struct timeval st_##flag##_beg; \
gettimeofday(&st_##flag##_beg, 0)
#define STOP_CC(flag) \
struct timeval st_##flag##_end; \
gettimeofday(&st_##flag##_end, 0)
#define PRINT_CC(flag) \
double cost_##flag = 0.0L; \
cost_##flag = (double)(st_##flag##_end.tv_sec - st_##flag##_beg.tv_sec); \
cost_##flag = cost_##flag * 1000000L + (double)(st_##flag##_end.tv_usec - st_##flag##_beg.tv_usec); \
printf(#flag " cost time %.6lf microsecond.\n", cost_##flag);
#define GENEORDER_CODE_LENGTH 20 + 1
typedef struct GeneOrder {
char code[GENEORDER_CODE_LENGTH];
} GeneOrder, *GeneOrderPtr;
typedef struct GOArray {
size_t capacity;
size_t length;
GeneOrderPtr data;
} GOArray;
GOArray createGOAarray(size_t capacity)
{
GOArray goa;
goa.capacity = capacity;
goa.length = 0;
goa.data = (GeneOrderPtr)malloc(capacity * sizeof(GeneOrder));
return goa;
}
void destroyGOArray(GOArray* goa)
{
if (goa->capacity > 0) {
free(goa->data);
}
}
bool readGOFile(char const* file, GOArray* goarray)
{
FILE* fp = NULL;
if ((fp = fopen(file, "r+")) == NULL) {
return false;
}
char buff[64];
while (fgets(buff, 64, fp) != NULL) {
if (goarray->length < goarray->capacity) {
memcpy(goarray->data[goarray->length].code, buff,
GENEORDER_CODE_LENGTH * sizeof(char));
goarray->data[goarray->length].code[GENEORDER_CODE_LENGTH - 1] = '\0';
goarray->length++;
}
else {
fclose(fp);
return true;
}
}
fclose(fp);
return true;
}
typedef struct ProcessST {
GOArray* pcond;
GOArray* pbg;
size_t beg;
size_t end; // [beg, end)
} ProcessST;
void* processThread(void* parg)
{
ProcessST* ppst = (ProcessST*)parg;
GOArray* pcond = ppst->pcond;
GOArray* pbg = ppst->pbg;
int count = 0;
for (size_t i = ppst->beg; i < ppst->end; i++) {
const GeneOrderPtr gop = pcond->data + i;
for (size_t j = 0; j < pbg->length; j++) {
const GeneOrderPtr inner_gop = pbg->data + j;
int inner_count = 0;
for (size_t k = 0; k < 20; k++) {
if (gop->code[k] != inner_gop->code[k]) {
if (++inner_count > 4) {
break;
}
}
}
if (inner_count <= 4) {
#ifdef DBGPRINT
printf("%d %s - %s\n", i, gop->code, inner_gop->code);
#endif
count++;
}
}
}
return (void*)count;
}
int main(int argc, char* argv[])
{
(void)argc;
GOArray condgo = createGOAarray(10000);
GOArray bggo = createGOAarray(1000000);
printf("loading ...\n");
START_CC(loading);
if (!readGOFile(argv[1], &condgo) || !readGOFile(argv[2], &bggo)) {
destroyGOArray(&condgo);
destroyGOArray(&bggo);
return -1;
}
STOP_CC(loading);
PRINT_CC(loading);
size_t range[] = { 0, 250, 500 };
pthread_t thr[2] = { 0 };
ProcessST pst[2];
START_CC(compare);
for (size_t i = 0; i < 2; i++) {
pst[i].pcond = &condgo;
pst[i].pbg = &bggo;
pst[i].beg = range[i];
pst[i].end = range[i + 1];
pthread_create(&thr[i], NULL, processThread, &pst[i]);
}
int count = 0;
int* ret = NULL;
for (size_t i = 0; i < 2; i++) {
pthread_join(thr[i], (void**)&ret);
count += (int)ret;
}
STOP_CC(compare);
PRINT_CC(compare);
printf("result = %d\n", count);
destroyGOArray(&condgo);
destroyGOArray(&bggo);
return 0;
}
编译测试
gcc -Wall -Wextra -o ccs main.c -std=c99 -O3 -lpthread && ./ccs candidate.list bg.db
基于@乌合之众 的思路,多用一倍空间用4个位表示四种字符,可以简化后面不一致字符个数的查找
思路如下:
第一:比如对比4个差异的数据
**TGACGGGTGACACCCA(删除字符串某4个位置的字符),将字符长度变为16,匹配完全相同的字符串
用map之类的保存TGACGGGTGACACCCA为key值,差异的四个作为values值
第二:对比3个差异的数据
在上述基础的上,进行对比上述的values值为比较长度为3完全相同的字符串
以此类推
目测题主没有多台机器供他计算,
我有一个朴素的思路,计算每个串的字母序数和(A:0,T:1,C:2,G:3),先计算两个字符串的序数和的差值,最大不能超过12,四个A变成四个G,差值小于12的再进行处理,
只是一个大概的想法,具体的权值可以另外设置,理论上可以快很多。
另外有一个算法是计算字符串编辑距离(将一个字符串修改为另一个字符串的最少编辑次数增、删、改)的,我一下子想不起来,你可以自行查一下。
首先,你的时间估算完全不对,这种大规模的数据量处理,好歹跑个几万条,持续十秒以上的时间,才能拿来做乘法算总时间,只算一条的话,这个时间几乎都是初始化进程的开销,而非关键的IO、CPU开销
以下正文
ACTG四种可能性相当于2bit,用一个字符表示一个基因位太过浪费了,一个字符8bit,可以放4个基因位
即使不用任何算法,只是把你的20个基因写成二进制形式,也能节省5倍时间
另外,循环20次,CPU的指令数是20*n条,n估计至少有3,但对于二进制来说,做比较的异或运算直接是cpu指令,指令数是1
坐等高手出现
可以了解一下CUDA等并行计算,你这种大量重复简单的运算性能提升非常明显
算法我不行,但是,像你这样的大量数据,一台电脑对比肯定是不行的,像你这样数据CPU密集型任务,同意其他人说的使用集群或者多进程的方式来计算,也就是我们用map-reduce的模型去计算
map就是映射,你先将你每个candidates一个一个映射到bg_db形成类似这样的数据结构(candidates,bg_db)
做成队列然后交给不同的服务器,每个服务器用多进程去计算,只能这样了,但是你这个数据量太大了,想办法把你的任务分配好,并行计算吧,
你单个的任务是确定的,需要的是把这些任务下发给 worker 去做,对于这样的计算都不是同步单进程进行的。
其实就相当于你有[a,b] 和 [c, d] 要对比,你的任务是
[a, c]
[a, d]
[b, c]
[b, d]
如果你是同步串行你需要的时间就是 4 * 单个时间
如果是你 4 个 cpu 或者 4 个 机器并行, 你的时间差不多是单个时间
所以对于像基因组这样的计算基本上都是用大型机器多核并行的任务来完成,基本上参考的原理都是 google MapReduce 这篇论文的原理
算法白痴一个, 不过刚好手上有可以用的计算资源, 就按照原来的思路(暴力无脑并行计算流)做一下题主这个例子:
run.py
from multiprocessing import Pool, cpu_count
def do_align(item):
with open("bg_db.txt") as fh, open("result.txt", "a") as rh:
db_line = fh.readline().strip()
while db_line:
counts = 0
for i in [(i, j) for (i, j) in zip(db_line, item)]:
if i[0] != i[1]:
counts += 1
if counts >= 4:
break
if counts < 4:
rh.write("{}\n".format(db_line))
db_line = fh.readline().strip()
def main():
pool = Pool(cpu_count())
with open("candidates.txt") as fh:
pool.map(do_align, map(str.strip, fh))
pool.close()
pool.join()
if __name__ == "__main__":
main()
简单先生成点数据
import random
import string
def id_generator(size=8, chars=string.ascii_letters + string.digits):
return ''.join(random.choice(chars) for _ in range(size))
with open("candidates.txt", "w") as fh:
for i in range(10000):
fh.write("{}\n".format(id_generator(20, "ATCG")))
with open("bg_db.txt", "w") as fh:
for i in range(1000000):
fh.write("{}\n".format(id_generator(20, "ATCG")))
嗯, 造了10000
行的candidates.txt
和1000000
行的bg_db.txt
运行看下时间:
$time python run.py
real 15m45.445s
user 1362m41.712s
sys 1m12.099s
题主实际的数据是千万行的candidates.txt
和十亿行的bg_db.txt
, 简单估算下时间16*1000/(60*24) = 11.11
也就是11天, 这是我用的一个计算节点的配置
CPU Utilization: 1.0 0.0 98.9
user sys idle
Hardware
CPUs: 96 x 2.10 GHz
Memory (RAM): 1009.68 GB
Local Disk: Using 351.623 of 941.596 GB
Most Full Disk Partition: 53.5% used.
时间确实好长, 急需神优化