HouSisong@GMail.com 2007.07.27
tag:速度优化,Base64,CPU缓存优化,代码优化,查找表,汇编,SSE、SSE2优化,并行
摘要: Base64编码是很常用的一种把二进制数据转换为字符串的算法;
本文章对Base64的编码函数进行了各种优化尝试,目标是极限编码速度!
并对优化过程中使用的方法进行了详细说明(主要使用了查表优化);
(2007.07.27 觉得只使用64字节表的汇编优化也是有其意义的,所以添加了base64_encode1_asm实现)
正文:
代码使用C++;涉及到汇编优化的时候假定为x86平台;
测试平台:(CPU:AMD64x2 4200+(2.37G);内存:DDR2 667(334.9MHz);C/C++编译器:VC2005)
操作系统:WindowsXP
( 警告:代码移植到其他体系的CPU时需要重新考虑字节顺序的大端小端问题! )
A:本文章的来源和起因:
cpper编程论坛( www.cpper.com/c )以前进行过一次Base64编码器速度竞赛
( http://www.cpper.com/c/t665.html 和 http://www.cpper.com/c/t694.html );
当时我没有参加,后来应管理者要求看看能不能在结果上继续优化,那时就简要写
了点代码尝试,但没能得到更好的结果;
最近因为工作中需要用到Base64,就拿了其竞赛结果(Base64EncodeSXMNew版本)
来改写;由于拿人手段,而且以前有答应看看是否还有改进的余地;所以空余时间
就在这个基础之上进行了一些速度优化改进尝试,本文章就是本次尝试的结果;
(声明:本文章引用这次竞赛和其代码得到了cpper管理者的授权)
尝试了一些代码后,我在CSDN程序员网站( www.csdn.net )开贴征集最快的Base64
编码函数:
http://community.csdn.net/Expert/topic/5665/5665480.xml?temp=.4219171 ,
以求获得更多的思路和使已有的优化策略获得更多的验证机会;发帖的朋友对本文章
的形成也起到了重要的作用,某些版本的函数速度得到了提高,某些在我的AMDx2
CPU上运行很快的版本、在网友的奔腾4 CPU上运行减慢(放弃了一簇版本)、由讨论
而产生的新的函数版本等等;本文章也是对这次讨论的一些简要总结(某些网友的实现
版本或策略没有整合到本文章中,请到csdn论坛直接查看);
B:Base64编码原理简要说明
有时为了更好的传递或保存二进制数据,需要先把二进制数据转换成纯文本的编码方式。
最容易想到的方案就是直接转换成16进制的文本方式;即把一个字节(8bit)先分成两
个4bit(值域[0..15]),然后映射到'0'--'9''A'--'F'这16个字符编码中; 那么一个字节
的数据转换为了两个字符,也就是说数据转换后变为原数据大小的两倍!
Base64的也能完成这个任务,但更节省空间,转换后的文本数据只是原数据大小的4/3倍;
这是怎么做到的呢? 将原数据3个字节一组(3x8bit),按一定方式分组成4个6bit(值
域[0..63]); 然后将[0..25]的值映射到'A'--'Z',将[26..51]的值映射到'a'--'z',
将[52..61]的值映射到'0'--'9',将62的值映射为'+',将63的值映射为'/'; 由于原数据不
一定正好是3的倍数,所以分组后的4个值中有可能没有被分配bit位的情况,没有分配bit位
的值输出数据填充'='。
bit位分组方式示意:
// 3x8bit
// |------------------|------------------|------------------|
// | a[0..7] | b[0..7] | c[0..7] |
// |------------------|------------------|------------------|
//
// to 4x6bit
// |------------------|------------------|------------------|------------------|
// | a[2..7] |b[4..7]+a[0..1]<<4|c[6..7]+b[0..3]<<2| c[0..5] |
// |------------------|------------------|------------------|------------------|
C:一个基本实现和速度测试框架
使用了较大的数据量多次测试取平均值;
// base64_encode0 109.0 MB/s
#include <time.h>
#include <iostream>
#include <vector>
#include <string>
#define asm __asm
const unsigned char BASE64_PADDING='='; //输入数据不足3的倍数时 输出字符后面填充'='号
//将6bit数据按规则映射成字符(6bit数据)
inline unsigned char to_base64char(const unsigned char code6bit)
{
if (code6bit<26) //[ 0..25] => ['A'..'Z']
return code6bit+'A';
else if (code6bit<52) //[26..51] => ['a'..'z']
return code6bit+('a'-26);
else if (code6bit<62) //[52..61] => ['0'..'9']
return code6bit+('0'-52);
else if (code6bit==62) //62 => '+'
return '+';
else //if (code6bit==63) //63 => '/'
return '/';
}
//编码函数(原数据地址,原数据字节大小,编码输出地址)
void base64_encode0(const void* pdata,const unsigned long data_size,void* out_pcode)
{
const unsigned char* input=(const unsigned char*)pdata;
const unsigned char* input_end=&input[data_size];
unsigned char* output=(unsigned char*)out_pcode;
for(;input+2<input_end;input+=3,output+=4)
{
output[0]=to_base64char( input[0] >> 2 );
output[1]=to_base64char( ((input[0] << 4) | (input[1] >> 4)) & 0x3F );
output[2]=to_base64char( ((input[1] << 2) | (input[2] >> 6)) & 0x3F );
output[3]=to_base64char( input[2] & 0x3F);
}
unsigned long bord_width=input_end-input;
if (bord_width==1)
{
output[0]=to_base64char( input[0] >> 2 );
output[1]=to_base64char( (input[0] << 4) & 0x3F );
output[2]=BASE64_PADDING;
output[3]=BASE64_PADDING;
}
else if (bord_width==2)
{
output[0]=to_base64char( input[0] >> 2 );
output[1]=to_base64char( ((input[0] << 4) | (input[1] >> 4)) & 0x3F );
output[2]=to_base64char( (input[1] << 2) & 0x3F );
output[3]=BASE64_PADDING;
}
}
typedef void (*Tbase64_encode_proc)(const void* pdata,const unsigned long data_size,void* out_pcode);
//获得编码后的输出字符大小(原数据大小)
inline unsigned long base64_code_size(const unsigned long data_size)
{
return (data_size+2)/3*4;
}
//测试编码速度(编码器名称,编码函数,测试原数据大小)
void testSpeed(const char* proc_name_str,Tbase64_encode_proc base64_encode,const long DATA_SIZE)
{
std::cout<<">> 编码函数: "<<proc_name_str<<std::endl;
const long DATA_SIZE_MAX=DATA_SIZE+12;
std::vector<unsigned char> data_buf(DATA_SIZE_MAX); //data_buf保存需要编码的数据
for (long r=0;r<DATA_SIZE_MAX;++r)
data_buf[r]=rand(); //data_buf填充随机数据 用以测试
const long code_size_MAX=base64_code_size(DATA_SIZE_MAX);
std::string code_str;//code_str用以储存编码后的字符串数据
code_str.resize(code_size_MAX,' ');
long RunCount=0;
double SumSpeed=0;
for (long data_size=DATA_SIZE;data_size<DATA_SIZE_MAX;++data_size)
{
const long code_size=base64_code_size(data_size);
double start_time=(double)clock();
base64_encode(&data_buf[0],data_size,&code_str[0]);//编码测试
double run_time=((double)clock()-start_time)*(1.0/CLOCKS_PER_SEC);
double encode_speed=code_size*(1.0/1024/1024)/run_time;//编码速度(MB/秒)
++RunCount;
SumSpeed+=encode_speed;
std::cout<<" 编码前数据大小(MB): "<<data_size*(1.0/1024/1024)<<" 编码速度(MB/秒): "<<encode_speed<<std::endl;
//if (data_size<=1000) std::cout<<code_str<<std::endl; //
}
std::cout<<std::endl<<" 平均编码速度(MB/秒): "<<SumSpeed/RunCount<<std::endl;
std::cout<<std::endl;
}
int main()
{
std::cout<<" 请输入任意字符开始测试(可以把进程优先级设置为“实时”)> ";
getchar();
std::cout<<std::endl;
const long DATA_SIZE=80*1024*1024;
testSpeed("base64_encode0" ,base64_encode0 ,DATA_SIZE);
return 0;
}
D.优化to_base64char函数
to_base64char有很多的条件分支,在当前的CPU上会严重的降低性能;
分析该函数有这样的特征: 函数的输入数据允许的取值个数很小(只能取64个值中的一个);
单个输入值对应的返回值固定;
那么我们可以建立一个数组的表格,该数组有64个元素,每个元素的值等于该元素的序
号(假定数组序号从0开始)作为to_base64char参数时的返回值;
即: unsigned char BASE64_CODE[64];
其中BASE64_CODE[i]=to_base64char(i); // i属于[0..63]
那么对于这样的代码: output[0]=to_base64char(input[0]>>2);
可以化简为: output[0]=BASE64_CODE[input[0]>>2];
这就是利用查表来替代计算的优化方法!
(提示:在不同的需求下,表需要灵活的构造)
base64_encode0使用查询表改进后的新代码:
// base64_encode0_table 294.6 MB/s
const unsigned char BASE64_CODE[]=
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
//使用64字节的表
void base64_encode0_table(const void* pdata,const unsigned long data_size,void* out_pcode)
{
const unsigned char* input=(const unsigned char*)pdata;
const unsigned char* input_end=&input[data_size];
unsigned char* output=(unsigned char*)out_pcode;
for(;input+2<input_end;input+=3,output+=4)
{
output[0]=BASE64_CODE[ input[0] >> 2 ];
output[1]=BASE64_CODE[ ((input[0] << 4) | (input[1] >> 4)) & 0x3F ];
output[2]=BASE64_CODE[ ((input[1] << 2) | (input[2] >> 6)) & 0x3F ];
output[3]=BASE64_CODE[ input[2] & 0x3F];
}
unsigned long bord_width=input_end-input;
if (bord_width==1)
{
output[0]=BASE64_CODE[ input[0] >> 2 ];
output[1]=BASE64_CODE[ (input[0] << 4) & 0x3F ];
output[2]=BASE64_PADDING;
output[3]=BASE64_PADDING;
}
else if (bord_width==2)
{
output[0]=BASE64_CODE[ input[0] >> 2 ];
output[1]=BASE64_CODE[ ((input[0] << 4) | (input[1] >> 4)) & 0x3F ];
output[2]=BASE64_CODE[ (input[1] << 2) & 0x3F ];
output[3]=BASE64_PADDING;
}
}
E:在当前的32比特CPU上一次写入4字节将获得更好的性能;而输入数据的地方,可以先转化
成32比特整形数据再做各种复杂的位运算有利于编译器的优化(各个PC平台的差异可能比较大);
// base64_encode1 533.3 MB/s
inline void base64_addpaing(const unsigned char* input,const unsigned int bord_width,unsigned char* output)
{
if (bord_width==1)
{
unsigned int input0=input[0];
unsigned int output0=BASE64_CODE[ input0 >> 2 ];
unsigned int output1=BASE64_CODE[ (input0 << 4) & 0x3F ];
*(unsigned long*)(&output[0])=output0 | (output1<<8) | ((BASE64_PADDING<<16) | (BASE64_PADDING<<24));
}
else if (bord_width==2)
{
unsigned int input0=input[0];
unsigned int input1=input[1];
unsigned int output0=BASE64_CODE[ input0 >> 2 ];
unsigned int output1=BASE64_CODE[ ((input0 << 4) | (input1 >> 4)) & 0x3F ];
unsigned int output2=BASE64_CODE[ (input1 << 2) & 0x3F ];
*(unsigned long*)(&output[0])=output0 | (output1<<8) | (output2<<16) | (BASE64_PADDING<<24);
}
}
//使用64字节的表
void base64_encode1(const void* pdata,const unsigned long data_size,void* out_pcode)
{
const unsigned char* input=(const unsigned char*)pdata;
const unsigned char* input_end=&input[data_size];
unsigned char* output=(unsigned char*)out_pcode;
for(;input+2<input_end;input+=3,output+=4)
{
//unsigned int output0=BASE64_CODE[ input[0] >> 2 ];
//unsigned int output1=BASE64_CODE[ ((input[0] << 4) | (input[1] >> 4)) & 0x3F ];
//unsigned int output2=BASE64_CODE[ ((input[1] << 2) | (input[2] >> 6)) & 0x3F ];
//unsigned int output3=BASE64_CODE[ input[2] & 0x3F ];
//*(unsigned long*)(&output[0])=output0 | (output1<<8) | (output2<<16) | (output3<<24);
// 可以测试一下这种内存读取方式的速度
unsigned int input0=input[0];
unsigned int input1=input[1];
unsigned int input2=input[2];
unsigned int output0=BASE64_CODE[ input0 >> 2 ];
unsigned int output1=BASE64_CODE[ ((input0 << 4) | (input1 >> 4)) & 0x3F ];
unsigned int output2=BASE64_CODE[ ((input1 << 2) | (input2 >> 6)) & 0x3F ];
unsigned int output3=BASE64_CODE[ input2 & 0x3F ];
*(unsigned long*)(&output[0])=output0 | (output1<<8) | (output2<<16) | (output3<<24);
}
base64_addpaing(input,input_end-input,output);
}
F:减少位操作复杂度的一个方案:预先交换输入数据的字节顺序
// base64_encode1_bswap 579.2 MB/s
//使用64字节的表
void base64_encode1_bswap(const void* pdata,const unsigned long data_size,void* out_pcode)
{
const unsigned char* input=(const unsigned char*)pdata;
const unsigned char* input_end=&input[data_size];
unsigned char* output=(unsigned char*)out_pcode;
for(;input+2<input_end;input+=3,output+=4)
{
unsigned long input_all=(input[0]<<16) | (input[1]<<8) | (input[2]);
//汇编指令中有一条BSWAP指令交换字节顺序,可能能加快该代码的速度
unsigned int output0=BASE64_CODE[ input_all >> 18 ];
unsigned int output1=BASE64_CODE[ (input_all >> 12) & 0x3F ];
unsigned int output2=BASE64_CODE[ (input_all >> 6) & 0x3F ];
unsigned int output3=BASE64_CODE[ input_all & 0x3F ];
*(unsigned long*)(&output[0])=output0 | (output1<<8) | (output2<<16) | (output3<<24);
}
base64_addpaing(input,input_end-input,output);
}
G:使用64字节表的汇编优化版本base64_encode1_asm
// base64_encode1_asm 674.6 MB/s
void __declspec(naked) __stdcall _base64_encode1_line_asm(const unsigned char* input,unsigned char* output,const long line_count)
{
asm
{
push esi
push edi
push ebx
push ebp
mov esi,[esp+4+16]//input
mov edi,[esp+8+16]//output
mov ebp,[esp+12+16]//line_count
lea edi,[edi+ebp*4]
neg ebp
align 4
loop_begin:
mov ebx,dword ptr [esi]
bswap ebx
mov ecx,ebx
mov edx,ebx
mov eax,ebx
shr ecx,14
shr edx,8
shr eax,26
and ecx,0x3F
shr ebx,20
and edx,0x3F
and eax,0x3F
movzx ecx, BYTE PTR [BASE64_CODE+ecx]
and ebx,0x3F
mov ch , BYTE PTR [BASE64_CODE+edx]
movzx eax, BYTE PTR [BASE64_CODE+eax]
shl ecx,16
mov ah , BYTE PTR [BASE64_CODE+ebx]
add esi,3
or ecx,eax
mov [edi+ebp*4],ecx
add ebp,1
jnz loop_begin
pop ebp
pop ebx
pop edi
pop esi
ret 12
}
}
//使用64字节的表
void base64_encode1_asm(const void* pdata,const unsigned long data_size,void* out_pcode)
{
const unsigned char* input=(const unsigned char*)pdata;
unsigned char* output=(unsigned char*)out_pcode;
unsigned long fast_cell=data_size/3;
if (fast_cell>0)
{
unsigned long fast_cell_safe=fast_cell-1;
if (fast_cell_safe>0)
{
_base64_encode1_line_asm(input,output,fast_cell_safe);
input+=(fast_cell_safe*3);
output+=(fast_cell_safe*4);
}
unsigned long input_all=(input[0]<<16) | (input[1]<<8) | (input[2]);
unsigned int output0=BASE64_CODE[ input_all >> 18 ];
unsigned int output1=BASE64_CODE[ (input_all >> 12) & 0x3F ];
unsigned int output2=BASE64_CODE[ (input_all >> 6) & 0x3F ];
unsigned int output3=BASE64_CODE[ input_all & 0x3F ];
*(unsigned long*)(&output[0])=output0 | (output1<<8) | (output2<<16) | (output3<<24);
input+=3;
output+=4;
}
base64_addpaing(input,data_size-(fast_cell*3),output);
}
H:为了减少计算量,我们可以尝试适量增大表的大小(空间换时间)
对于这样的代码: output0=BASE64_CODE[input0/4];
我想改写成这样: output0=BASE64_CODE_SHIFT2[input0];
那么BASE64_CODE_SHIFT2应该怎样定义呢?由于output0=BASE64_CODE_SHIFT2[input0]=BASE64_CODE[input0/4];
有BASE64_CODE_SHIFT2[i]=BASE64_CODE[i/4]; //i属于[0.255]
对于这样的代码: output3=BASE64_CODE[input2 & 0x3F];
我想改写成这样: output3=BASE64_CODE_EX[input2];
那么BASE64_CODE_EX[i]=BASE64_CODE[i & 0x3F]; //i属于[0.255]
为了能够简化output1和output2的计算,扩大BASE64_CODE_EX到4k,参见源代码:
// base64_encode2 701.6 MB/s
// |------------------|------------------|------------------|
// | a[0..7] | b[0..7] | c[0..7] |
// |------------------|------------------|------------------|
//
// |------------------|------------------|------------------|------------------|
// | a[0..7] |b[4..7]+a[0..7]<<4|c[6..7]+b[0..7]<<2| c[0..7] |
// |------------------|------------------|------------------|------------------|
//使用64+256+4096字节的表(4K+)
void base64_encode2(const void* pdata,const unsigned long data_size,void* out_pcode)
{
static unsigned char BASE64_CODE_SHIFT2[256];
static unsigned char BASE64_CODE_EX[256];
static bool initialized=false;
if(!initialized)
{
unsigned int i;
for(i=0;i<256;++i)
BASE64_CODE_SHIFT2[i]=BASE64_CODE[i/4];
for(i=0;i<(1<<12);++i)
BASE64_CODE_EX[i]=BASE64_CODE[i%64];
initialized=true;
}
if (data_size<=0) return;
const unsigned char* input=(const unsigned char*)pdata;
const unsigned char* input_end=&input[data_size];
unsigned char* output=(unsigned char*)out_pcode;
for(;input+2<input_end;input+=3,output+=4)
{
//output[0]=BASE64_CODE_SHIFT2[input[0]];
//output[1]=BASE64_CODE_EX[(input[0]<<4) + (input[1]>>4)];
//output[2]=BASE64_CODE_EX[(input[1]<<2) + (input[2]>>6)];
//output[3]=BASE64_CODE_EX[input[2]];
// 可以测试一下这种内存读取和写入方式的速度
unsigned int input0=input[0];
unsigned int input1=input[1];
unsigned int input2=input[2];
unsigned int output0=BASE64_CODE_SHIFT2[input0];
unsigned int output1=BASE64_CODE_EX[(input0<<4) + (input1>>4)];
unsigned int output2=BASE64_CODE_EX[(input1<<2) + (input2>>6)];
unsigned int output3=BASE64_CODE_EX[input2];
*(unsigned long*)(&output[0])=output0 | (output1<<8) | (output2<<16) | (output3<<24);
}
base64_addpaing(input,input_end-input,output);
}
I:使用更大的表来换取更快的速度!
前面的代码中6bit的数据查表可以得到8bit的输出数据,那么我可以构造一个更大的表,
一次使用12bit的数据查表得到16bit的输出数据!
原代码: unsigned int output0=BASE64_CODE[input0 >> 2];
unsigned int output1=BASE64_CODE[((input0 << 4) | (input1 >> 4)) & 0x3F];
想要的新代码: output_0_1=BASE64_WCODE[(input0<<4) | (input1>>4)]
有 BASE64_WCODE[(input0<<4)|(input1>>4)]= BASE64_CODE[input0 >> 2]
| (BASE64_CODE[((input0 << 4) | (input1 >> 4)) & 0x3F] << 8);
令i==(input0<<4) | (input1>>4); 则有: input0==i>>4; (input>>4)==i&0F;
即:BASE64_WCODE[i]=BASE64_CODE[(i>>4)>>2]
| (BASE64_CODE[ (((i>>4)<< 4) | i&0F) & 0x3F ] << 8);
BASE64_WCODE[i]=BASE64_CODE[i>>6] | (BASE64_CODE[i & 0x3F]<<8);//i属于[0..4095]
(当你熟悉用建数据表来表达计算的时候,这些推导反而显得累赘;
对于这里的表的建立,只需要注意表的序号就是需要替换的函数(也可以是假想的函数)的参数,
数据表中的值就是该输入参数时函数的返回值,这样就可以直接写出表的建立表达式。)
为了不增加新的表,我们在的BASE64_WCODE表的基础上来得到计算output_2_3的表达式;
output_2_3=BASE64_WCODE[((input1<<8) | input2) & 0x0FFF];
// base64_encode3 791.3 MB/s
// |------------------|------------------|------------------|
// | a[0..7] | b[0..7] | c[0..7] |
// |------------------|------------------|------------------|
//
// |-------------------------------------|-----------------------------------|
// | a[0..7]<<4 + b[4..7] | b[0..3]<<8 + c[0..7] |
// |-------------------------------------|-----------------------------------|
//使用4096x2字节的表(8K)
void base64_encode3(const void* pdata,const unsigned long data_size,void* out_pcode)
{
static unsigned short BASE64_WCODE[1<<12];
static bool initialized=false;
if(!initialized)
{
for(unsigned int i=0;i<(1<<12);++i)
{
BASE64_WCODE[i]=BASE64_CODE[i>>6] | (BASE64_CODE[i & 0x3F] << 8);
}
initialized=true;
}
const unsigned char* input=(const unsigned char*)pdata;
unsigned char* output=









