SQLAlchemy Get or Create

在RMDB中,由于不存在select-test-insert的原子操作,因此要想达到不存在则插入的效果,就必须在SELECT语句中加锁。但往往对于复杂的查询条件来说,由于涉及锁的类型(Record/Gap/Next-key),加锁操作会带来很大的性能隐患,因此采用容错检查的方式可能会是一个比较好的实现方案(另一种做法是用INSERT...WHERE NOT EXISTS)。以MySQL与SQLAlchemy为例:

from sqlalchemy.exc import IntegrityError
from sqlalchemy.sql.expression import ClauseElement

def get_or_create(session, model, defaults={}, **kwargs):
    """
    插入新数据, 若数据已存在则返回已有记录. 若数据库存在唯一约束, 保证不会重复插入

    :param session: SQLAlchemy `Session`
    :param model: 数据表映射
    :param defaults: 新数据的参数默认值
    :param kwargs: 查询条件
    """
    try:
        query = session.query(model).filter_by(**kwargs)

        instance = query.first()

        if instance:
            return instance, True
        else:
            session.begin(nested=True)
            try:
                params = dict((k, v) for k, v in kwargs.iteritems() if not isinstance(v, ClauseElement))
                params.update(defaults)

                instance = model(**params)

                session.add(instance)
                session.commit()

                return instance, False
            except IntegrityError as e:
                session.rollback()
                instance = query.one()

                return instance, True
    except Exception as e:
        session.rollback()
        raise e

这个实现有一个不好的地方是表结构中必须存在对查询条件的唯一约束,因为是通过数据库的IntegrityError异常来判断别的并发调用是否在两次操作中创建了同样记录的,要是没有约束就不会报错也就没法判断出来了,从这点上来看INSERT...WHERE NOT EXISTS的实现会更好一些。get_or_create的使用方法为:

record, existed = get_or_create(session, Model,
        col1='Condition A',
        col2='Condition B',
        defaults={
            'col3': 'Default Value',
        },
    )

if existed:
    record.col1 = 'Modified Value'
else:
    pass

session.commit()

注意因为方法中使用的是嵌套事务session.begin(nested=True),所以相关逻辑完成后记得外头要commit哦!