# -*- coding: utf-8 -*-
"""Database engine wrappers.
"""
| import os |
| import traceback |
| |
| |
class DBEngine(object):
    """Thin wrapper around a database connection and one default cursor.

    The base class only stores the ``connection`` and ``cursor`` attributes
    and forwards calls to them; subclasses override :meth:`connect` to
    populate both attributes.
    """

    def __init__(self):
        # Both stay None until a subclass's connect() succeeds.
        self.connection = None
        self.cursor = None

    def connect(self, **kwargs):
        """Open the connection. The base implementation does nothing."""
        pass

    def begin_transaction(self):
        """Forward to ``connection.begin()``."""
        self.connection.begin()

    def commit(self):
        """Forward to ``connection.commit()``."""
        self.connection.commit()

    def rollback(self):
        """Forward to ``connection.rollback()``."""
        self.connection.rollback()

    def close(self):
        """Close the default cursor and the connection, if they are open.

        Safe to call when nothing was ever opened and safe to call twice:
        both attributes are reset to None before the underlying objects are
        closed.
        """
        cursor, self.cursor = self.cursor, None
        connection, self.connection = self.connection, None
        try:
            if cursor is not None:
                cursor.close()
        finally:
            # Release the connection even if closing the cursor failed.
            if connection is not None:
                connection.close()

    def exec_sql(self, sql, parameters=None):
        """Execute a statement on the default cursor.

        :param sql: statement text passed to ``cursor.execute``.
        :param parameters: optional bind parameters; when falsy the
            statement is executed without a parameter argument.
        :return: True on success; False if ``execute`` raised, in which
            case the error message is printed and the exception swallowed.
        """
        try:
            if parameters:
                self.cursor.execute(sql, parameters)
            else:
                self.cursor.execute(sql)
            return True
        except Exception as e:
            # Single-argument print() behaves the same on Python 2 and 3.
            print(str(e))
            return False

    def query(self, sql):
        """Execute ``sql`` and lazily yield rows from the default cursor.

        This is a generator: the statement is not executed until the first
        row is requested. Iteration stops at the first falsy result of
        ``cursor.fetchone()``. A ``TypeError`` raised while executing or
        fetching ends the iteration silently (kept from the original
        best-effort behaviour); any other exception propagates unchanged.
        """
        try:
            self.cursor.execute(sql)
            while True:
                row = self.cursor.fetchone()
                if not row:
                    return
                yield row
        except TypeError:
            return

    def new_cursor(self):
        """Return a fresh cursor created by ``connection.cursor()``."""
        return self.connection.cursor()
| |
| |
class OraEngine(DBEngine):
    """DBEngine implementation backed by the ``cx_Oracle`` driver."""

    def __init__(self):
        # Actually run the base initialiser so that ``connection`` and
        # ``cursor`` exist (as None) before connect() is called.
        super(OraEngine, self).__init__()

    def connect(self, **kwargs):
        """Connect to Oracle and create the default cursor.

        Keyword arguments:
            user, password: credentials (always required).
            database: connect identifier used in a ``user/password@database``
                connect string. When absent or None, ``host``, ``port`` and
                ``name`` (used as the SID) are combined into a DSN instead.

        Raises ``cx_Oracle.DatabaseError`` after printing the traceback when
        the driver rejects the connection; raises ``ImportError`` when the
        driver is not installed.
        """
        # Set before the driver is imported, as the original code did.
        os.environ["NLS_LANG"] = "SIMPLIFIED CHINESE_CHINA.AL32UTF8"
        # Imported outside the try block: if the import itself fails, the
        # except clause below must not reference an unbound ``cx_Oracle``.
        import cx_Oracle
        try:
            if kwargs.get('database') is None:
                dsn = cx_Oracle.makedsn(host=kwargs['host'], port=kwargs['port'],
                                        sid=kwargs['name'])
                conn = cx_Oracle.connect(kwargs['user'], kwargs['password'], dsn)
            else:
                url = '%s/%s@%s' % (kwargs['user'], kwargs['password'],
                                    kwargs['database'])
                conn = cx_Oracle.Connection(url)

            self.connection = conn
            self.cursor = conn.cursor()
        except cx_Oracle.DatabaseError:
            traceback.print_exc()
            # Bare raise keeps the original traceback.
            raise
| |
| |
class DB2Engine(DBEngine):
    """DBEngine implementation backed by the ``DB2`` driver module."""

    def __init__(self):
        # Actually run the base initialiser so that ``connection`` and
        # ``cursor`` exist (as None) before connect() is called.
        super(DB2Engine, self).__init__()

    def connect(self, **kwargs):
        """Connect to DB2 and create the default cursor.

        Keyword arguments:
            database: passed to the driver as ``dsn``.
            user: passed to the driver as ``uid``.
            password: passed to the driver as ``pwd``.

        Raises ``DB2.DatabaseError`` after printing the traceback when the
        driver rejects the connection; raises ``ImportError`` when the
        driver is not installed.
        """
        # Imported outside the try block: if the import itself fails, the
        # except clause below must not reference an unbound ``DB2``.
        import DB2
        try:
            conn = DB2.Connection(dsn=kwargs['database'], uid=kwargs['user'],
                                  pwd=kwargs['password'])
            self.connection = conn
            self.cursor = conn.cursor()
        except DB2.DatabaseError:
            traceback.print_exc()
            # Bare raise keeps the original traceback.
            raise
| |
| |
def test_ora():
    """Manual smoke test: connect to Oracle, run a delete, print a table.

    Requires a reachable Oracle instance; not an automated unit test.
    """
    # NOTE(review): credentials are hard-coded here; consider moving them to
    # configuration or environment variables.
    db = OraEngine()
    db.connect(database="ECARDDB.V4.SUPWISDOM", user="ecardv4", password="kingstar")
    try:
        db.exec_sql("delete from t_test")
        for r in db.query("select * from t_feetype"):
            # Single-argument print() behaves the same on Python 2 and 3.
            print(r)
    finally:
        # Always call close() once the connection was established.
        db.close()
| |
| |
def test_db2():
    """Manual smoke test: connect to DB2, run a delete, print a table.

    Requires a reachable DB2 instance; not an automated unit test.
    """
    # NOTE(review): credentials are hard-coded here; consider moving them to
    # configuration or environment variables.
    db = DB2Engine()
    db.connect(database="usstbase", user="db2inst1", password="kingstar")
    try:
        db.exec_sql("delete from ykt_cur.t_test")
        for r in db.query("select * from ykt_cur.t_feetype"):
            # Single-argument print() behaves the same on Python 2 and 3.
            print(r)
    finally:
        # Always call close() once the connection was established.
        db.close()
| |
if __name__ == "__main__":
    # Run the manual smoke tests in order: Oracle first, then DB2.
    for smoke_test in (test_ora, test_db2):
        smoke_test()