Codeb2cc's Blog

Cogito ergo sum

PyPy on Hadoop

在Hadoop Streaming中使用PyPy

PyPy作为CPython的高性能替代方案,目前已经十分成熟,得益于JIT等技术的应用,多数场景下PyPy都具有更好的性能。以通过正则对Nginx日志进行解析为例,对300G数据进行处理,PyPy对比原生Python能得到2.5倍以上的性能提升:

|            | Slot time(mins) | Read(%) | Write(%) | User(%) | MB/s | Record/s |
| ---------- | --------------- | ------- | -------- | ------- | ---- | -------- |
| Python 2.6 | 2412.76         | 3.17%   | 32.21%   | 64.61%  | 2.07 | 1741.74  |
| PyPy 2.3   | 869.37          | 9.01%   | 87.13%   | 3.85%   | 5.69 | 4809.06  |

在MapReduce任务中,使用官方支持的Portable PyPy能够比较完美地解决对运算性能以及运行环境的灵活配置的要求,该版本在CentOS 5上编译完成,对运行环境系统/链接库的要求很低,一般现在的RHEL/Centos/Ubuntu/Debian系统都能直接运行:

% ldd pypy
linux-vdso.so.1 =>  (0x00007fff959ff000)
libdl.so.2 => /lib64/libdl.so.2 (0x000000392e200000)
libm.so.6 => /lib64/libm.so.6 (0x000000302fe00000)
libz.so.1 => /lib64/libz.so.1 (0x00000039fe600000)
librt.so.1 => /lib64/librt.so.1 (0x000000381a600000)
libbz2.so.1 => /lib64/libbz2.so.1 (0x0000003031e00000)
libcrypt.so.1 => /lib64/libcrypt.so.1 (0x000000392e600000)
libutil.so.1 => /lib64/libutil.so.1 (0x0000003032a00000)
libncurses.so.5 => /lib64/libncurses.so.5 (0x0000003270600000)
libgcc_s.so.1 => /lib64/libgcc_s.so.1 (0x0000003031200000)
libpthread.so.0 => /lib64/libpthread.so.0 (0x000000302fa00000)
libc.so.6 => /lib64/libc.so.6 (0x000000302f600000)
/lib64/ld-linux-x86-64.so.2 (0x000000302ee00000)
libfreebl3.so => /lib64/libfreebl3.so (0x000000392ee00000)
libtinfo.so.5 => /lib64/libtinfo.so.5 (0x0000003030e00000)

将下载回来的PyPy包缓存到HDFS后,通过在MapReduce Streaming中指定archives/cacheArchive,即可在任务节点直接使用PyPy执行代码:

hadoop jar /home/etl/jar/hadoop-0.20.1.12-fb-streaming.jar
    -archives lib.zip 
    -D mapred.job.name=nobody-pypy-test 
    -D mapred.cache.archives=hdfs:///home/etl/archives/pypy.tar.gz#local
    -input /home/etl/DEMO/in
    -output /home/etl/DEMO/out
    -file bootstrap.py
    -mapper "./local/pypy/bin/pypy bootstrap.py"

上面例子中,mapred.cache.archives参数的#local指定了文件在节点工作目录下的访问路径,需要注意pypy.tar.gz的目录结构,若文件解压后得到的是pypy文件夹,则需要在访问时加上这层目录,如./local/pypy/bin/pypy

使用PyPy基本不需要修改原来的Python代码,具体可查看官方兼容性列表。另外一个好处是我们可以在本地安装上各种第三方库后再将文件打包上传,这样节点上就可以直接使用这些第三方库而不受系统限制(含C扩展的需要注意编译系统与节点系统一致),也间接实现了Virtualenv的功能(不同包中安装不同的库、使用不同的版本)。以使用增强的正则库regex为例,将PyPy解压到本地后:

wget https://bootstrap.pypa.io/get-pip.py .
/PATH/TO/PYPY/bin/pypy get-pip.py
/PATH/TO/PYPY/bin/pip install regex

然后再重新将PyPy打包并上传到HDFS:

cd /PATH/TO/PYPY
tar czf pypy.tar.gz pypy
hadoop fs -put pypy.tar.gz /home/etl/archives/

MapReduce时指定该包即可在节点上调用regex库。

PS: 概括地讲,代码逻辑越复杂(计算密集)使用PyPy得到的性能提升越大,但对简单的ETL任务,譬如sys.stdout.write('A B 123 C'.split()[2]),PyPy的表现可能会低于原生Python。上线前应该使用两种方式跑下测试数据,根据结果选择适合自己业务场景的方案。

HBase Shell Commands

HBase shell commands are mainly categorized into 6 parts:

General HBase Shell Commands

status

Show cluster status. Can be ‘summary’, ‘simple’, or ‘detailed’. The default is ‘summary’.

hbase> status
hbase> status 'simple'
hbase> status 'summary'
hbase> status 'detailed'

version

Output this HBase versionUsage:

hbase> version

whoami

Show the current hbase user.Usage:

hbase> whoami

Tables Management Commands

alter

Alter column family schema; pass table name and a dictionary specifying new column family schema. Dictionaries are described on the main help command output. Dictionary must include name of column family to alter.For example, to change or add the ‘f1’ column family in table ‘t1’ from current value to keep a maximum of 5 cell VERSIONS, do:

hbase> alter 't1', NAME => 'f1', VERSIONS => 5

You can operate on several column families:

hbase> alter 't1', 'f1', {NAME => 'f2', IN_MEMORY => true}, {NAME => 'f3', VERSIONS => 5}

To delete the ‘f1’ column family in table ‘t1’, use one of:

hbase> alter 't1', NAME => 'f1', METHOD => 'delete'
hbase> alter 't1', 'delete' => 'f1'

You can also change table-scope attributes like MAX_FILESIZE, READONLY, MEMSTORE_FLUSHSIZE, DEFERRED_LOG_FLUSH, etc. These can be put at the end; for example, to change the max size of a region to 128MB, do:

hbase> alter 't1', MAX_FILESIZE => '134217728'

You can add a table coprocessor by setting a table coprocessor attribute:

hbase> alter 't1', 'coprocessor'=>'hdfs:///foo.jar|com.foo.FooRegionObserver|1001|arg1=1,arg2=2'

Since you can have multiple coprocessors configured for a table, a sequence number will be automatically appended to the attribute name to uniquely identify it.

The coprocessor attribute must match the pattern below in order for the framework to understand how to load the coprocessor classes:

[coprocessor jar file location] | class name | [priority] | [arguments]

You can also set configuration settings specific to this table or column family:

hbase> alter 't1', CONFIGURATION => {'hbase.hregion.scan.loadColumnFamiliesOnDemand' => 'true'}
hbase> alter 't1', {NAME => 'f2', CONFIGURATION => {'hbase.hstore.blockingStoreFiles' => '10'}}

You can also remove a table-scope attribute:

hbase> alter 't1', METHOD => 'table_att_unset', NAME => 'MAX_FILESIZE'
hbase> alter 't1', METHOD => 'table_att_unset', NAME => 'coprocessor$1'

There could be more than one alteration in one command:

hbase> alter 't1', { NAME => 'f1', VERSIONS => 3 }, { MAX_FILESIZE => '134217728' }, { METHOD => 'delete', NAME => 'f2' }, OWNER => 'johndoe', METADATA => { 'mykey' => 'myvalue' }

Reflection in Golang

Static Typed Go

Go作为一门静态类型的语言,所有变量都定义了其所属于的类型,不同类型的变量间不能随意赋值,例如:

1
2
3
4
5
6
7
var a int
var b string

a = 1
b = "codeb2cc"

a = b

a和b不是同一类型的变量,若尝试直接赋值将会报错cannot use b (type string) as type int in assignment,不同类型变量间的赋值需要进行类型转换(Conversion),这点与C/C++里是一致的。在Go里,对于变量x与目标类型T,类型转换需要满足以下其中一种情况:

  • x可以赋值为类型T的变量
  • x与T有着一致的实现类型(underlying types)
  • x与T都是匿名指针类型并且具有相同的实现类型
  • x与T都是整数/浮点数类型
  • x与T都是复数类型
  • x是整数/bytes片段/runes,T是string
  • x是string,T是bytes片段或runes

对于可以进行类型转换的变量,通过var c = float(100)即可得到目标类型的变量。但在实际开发过程中,我们需要的不仅仅是基本类型的转换,譬如对于给定的接口类型:

1
2
3
4
5
Type I interface {
    Read(b Buffer) bool
    Write(b Buffer) bool
    Close()
}

只要实现了这三种方法,就可以认为该类型是合法的、可操作的,但由于无法确定最终传入变量的类型,我们需要在使用前将其转化为我们已知的类型。这种情况下由于类型转化(Conversion)只关心数据,我们需要类型断言(Type Assertion)来帮助我们:

1
2
3
4
5
var x I
var y File

x = y.(I)
z, ok := y.(I)

Python GIF Processing

天气站点需要往微博推送气象信息,原始的基本反射率数据都是单帧的,为了好一点的效果计划自己合成GIF,但找了一下发现Python下没有现成好的GIF处理库,PIL能支持读存操作但不支持合并,没办法只好找些资料自己写。下面列些相关资料的链接和一个GIF合并处理的例子,基本上就是按照标准对数据封装,其它格式对GIF的转换可以同理处理。

Standards and References

GIF87

GIF89

Wikipedia

Application Extension Spec: NETSCAPE2.0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
byte   1       : 33 (hex 0x21) GIF Extension code
byte   2       : 255 (hex 0xFF) Application Extension Label
byte   3       : 11 (hex 0x0B) Length of Application Block
                 (eleven bytes of data to follow)
bytes  4 to 11 : "NETSCAPE"
bytes 12 to 14 : "2.0"
byte  15       : 3 (hex 0x03) Length of Data Sub-Block
                 (three bytes of data to follow)
byte  16       : 1 (hex 0x01)
bytes 17 to 18 : 0 to 65535, an unsigned integer in
                 lo-hi byte format. This indicate the
                 number of iterations the loop should
                 be executed.
byte  19       : 0 (hex 0x00) a Data Sub-Block Terminator.

Memcached Optimization

项目新功能上线后频繁发现缓存丢失,怀疑是数据还没过期就被LRU置换了,但Memcached设置了8G内存对目前的流量来说不应该出现内存不足的情况。以前觉得Memcached比较简单配好参数用就可以了,但借这次机会研究了一下它的细节后发现有坑,有大坑……下面说两个。

Basic Concept: Slab/Chunk/Page

在讲坑前需要先厘清几个重要的概念,在Memcached中数据是以Chunk为单位存储的,每一条数据就是一个Chunk,但由于写入的数据大小是不固定的,因此若是Chunk的大小简单等于数据大小的话会造成系统严重的碎片化从而降低性能,因此在Memcached的设计中Chunk的大小是一系列的固定值,数据会以其最合适的Chunk写入,Slab就是某个大小Chunk的集合。又因为Memcached中内存是以Page为单位管理的,因此Slab实际上关联着许多个Page,Chunk写入到对应Slab下的Page中。有一个经常提及的问题是Memcached中每条数据的最大长度,这个值实际上就是Page的大小(准确说必须减掉Key和Flag的长度),一个Chunk只能存储在一个Page中,但是一个Page可以存储多个Chunk。新一点的版本中可以通过-I参数设置Page的大小,范围是1K到128M。Slab的类型(Chunk的大小)由参数增长系数-f和初始大小-n控制,Memcached启动时会根据这两个值计算出所有的Slab,简单说就是循环计算slab_size * factor直到达到Page大小,其中还需要考虑8-bytes的对齐,参考源码:

slabs_init
1
2
3
4
5
6
7
8
9
10
11
12
13
while (++i < POWER_LARGEST && size <= settings.item_size_max / factor) {
    /* Make sure items are always n-byte aligned */
    if (size % CHUNK_ALIGN_BYTES)
        size += CHUNK_ALIGN_BYTES - (size % CHUNK_ALIGN_BYTES);

    slabclass[i].size = size;
    slabclass[i].perslab = settings.item_size_max / slabclass[i].size;
    size *= factor;
    if (settings.verbose > 1) {
        fprintf(stderr, "slab class %3d: chunk size %9u perslab %7u\n",
                i, slabclass[i].size, slabclass[i].perslab);
    }
}

增长系数的默认值是1.25,初始大小为48 bytes,Page大小为1M也就是1048576 bytes,默认情况下我们可以得到96, 120, 152, ... , 771184, 1048576一共42个Slab。这里值得注意的是第一个Slab大小不是48而是96,这是因为Memcached除了存储数据外还需要封装对应的描述结构item,而item的大小为48。