初始版本
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..d7af5fb
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+/build
+/dist
+*.pyc
diff --git a/convert-example.json b/convert-example.json
new file mode 100755
index 0000000..10fbaa6
--- /dev/null
+++ b/convert-example.json
@@ -0,0 +1,43 @@
+/*
+ // 配置格式说明:
+ // name: 转换目标表名
+ // action: 转换前的操作,truncate 表示先执行 truncate 操作,none 表示无任何动作
+ // desc: 转换规则的描述
+ // pre_exec: 执行转换前需要执行一次的 sql 语句,格式如 [ "insert into ....", "delete from ...."],
+ // 可以有多条 sql 语句, 这个操作是在原始数据库执行
+ // post_exec: 转换完成后需要执行一次的 sql 语句, 格式同 pre_exec, 这个操作是在目标数据库执行
+ // src_sql: 转换原始表查询 sql
+ // dest_column: 转换目标表的 column 列表, src_sql 查询出来的字段必须与 dest_column 的字段按顺序一一对应
+*/
+[
+ {
+ "name": "T_CARD",
+ "action": "truncate",
+ "desc": "卡信息表",
+ "src_sql": "select c.card_id,(case when a.account_id is null then '3000000001'
+ else '1'||substr(a.account_id,8) end),
+ type_id,phytype,(case when c1.fee_type>0 then c1.fee_type
+ else c1.cut_type end) feetype,
+ c.cosumer_id,c.showid,c.physical_no,c.password,
+ (case when c.end_time ='' then :end_time
+ else c.end_time end) expiredate,
+ substr(c.state_id,1,1) status,
+ (case when substr(c.state_id,2,1)='1' then '1'
+ else '0' end) lossflag,
+ '' lossdate,'' lossefttime,
+ (case when substr(c.state_id,3,1)='1' then '1'
+ else '0' end) frozeflag,
+ '' frozedate,'0' badflag,'' badtype,'' baddate,'0' lockflag,
+ '' lockdate,c.begin_time opendate,'' closedate,'' cardverno,'' lastsaved
+ from YKT_CUR.T_PIF_CARD c left join ykt_cur.t_aif_account a on
+ c.card_id=a.card_id left join ykt_cur.t_cif_customer c1 on
+ c.cosumer_id=c1.cut_id order by c.card_id",
+ "dest_column": "CARDNO, ACCNO, CARDTYPE,CARDPHYTYPE, FEETYPE, CUSTID, SHOWCARDNO,
+ CARDPHYID, CARDPWD,EXPIREDATE, STATUS, LOSSFLAG, LOSSDATE, LOSSEFTTIME,
+ FROZEFLAG, FROZEDATE, BADFLAG, BADTYPE, BADDATE, LOCKFLAG, LOCKDATE,
+ OPENDATE, CLOSEDATE, CARDVERNO, LASTSAVED",
+ "remark": "导入卡表前更新 begin_time 不能为空",
+ "pre_exec" : ["update ykt_cur.t_pif_card set begin_time='20061001' where card_id=43430"],
+ "post_exec": ["update t_card set cardphytype=20"]
+ }
+]
diff --git a/cvtcfg.ini b/cvtcfg.ini
new file mode 100644
index 0000000..2743515
--- /dev/null
+++ b/cvtcfg.ini
@@ -0,0 +1,17 @@
+[database]
+srctype=oracle
+srcdsn=YKT.SHMTU.WISDOM
+srchost=192.168.1.200
+srcport=1521
+srcname=shmtu
+srcuser=ykt_cur
+srcpswd=kingstar
+
+dsttype=oracle
+dstname=yktv4
+dsthost=192.168.1.200
+dstport=1521
+dstuser=ecardshisu
+dstpswd=kingstar
+
+commitcount=5000
diff --git a/dbengine.py b/dbengine.py
new file mode 100644
index 0000000..0ec5c3d
--- /dev/null
+++ b/dbengine.py
@@ -0,0 +1,118 @@
+# -*- coding: utf-8
+"""数据库引擎的封装
+"""
+import os
+import traceback
+
+
+class DBEngine(object):
+    """Base wrapper around one DB-API connection and one shared cursor."""
+
+    def __init__(self):
+        self.connection = None
+        self.cursor = None
+
+    def connect(self, **kwargs):
+        """Open the connection; does nothing here, subclasses override it."""
+
+    def begin_transaction(self):
+        self.connection.begin()
+
+    def commit(self):
+        self.connection.commit()
+
+    def rollback(self):
+        self.connection.rollback()
+
+    def close(self):
+        """Close the cursor, then the connection, skipping whichever is unset."""
+        if self.cursor is not None:
+            self.cursor.close()
+        if self.connection is not None:
+            self.connection.close()
+
+    def exec_sql(self, sql, parameters=None):
+        """Run sql on the shared cursor; True on success, False after printing the error."""
+        try:
+            args = (sql, parameters) if parameters else (sql,)
+            self.cursor.execute(*args)
+            return True
+        except Exception as e:
+            print(str(e))
+            return False
+
+    def query(self, sql):
+        """Execute sql and yield rows one at a time until fetchone() returns a falsy value."""
+        try:
+            self.cursor.execute(sql)
+            while True:
+                r = self.cursor.fetchone()
+                if not r:
+                    return
+                yield r
+        except TypeError:
+            # NOTE(review): TypeError silently ends the iteration; reason not visible - confirm.
+            return
+
+    def new_cursor(self):
+        """Return a new cursor on the open connection."""
+        return self.connection.cursor()
+
+
+class OraEngine(DBEngine):
+    """DBEngine implementation that connects through cx_Oracle."""
+
+    def connect(self, **kwargs):
+        """Connect with user/password@database, or host/port/name (SID) if database is None."""
+        os.environ["NLS_LANG"] = "SIMPLIFIED CHINESE_CHINA.AL32UTF8"
+        # Import outside the try: the except clause below needs the name bound.
+        import cx_Oracle
+        try:
+            if kwargs.get('database') is None:
+                dsn = cx_Oracle.makedsn(host=kwargs['host'], port=kwargs['port'],
+                                        sid=kwargs['name'])
+                conn = cx_Oracle.connect(kwargs['user'], kwargs['password'], dsn)
+            else:
+                url = '%s/%s@%s' % (kwargs['user'], kwargs['password'], kwargs['database'])
+                conn = cx_Oracle.Connection(url)
+            self.connection = conn
+            self.cursor = conn.cursor()
+        except cx_Oracle.DatabaseError:
+            traceback.print_exc()
+            raise
+
+
+class DB2Engine(DBEngine):
+    """DBEngine implementation that connects through the DB2 module."""
+
+    def connect(self, **kwargs):
+        """Connect using kwargs 'database' (passed as dsn), 'user' and 'password'."""
+        # Import outside the try: the except clause below needs the name bound.
+        import DB2
+        try:
+            self.connection = DB2.Connection(dsn=kwargs['database'], uid=kwargs['user'],
+                                             pwd=kwargs['password'])
+            self.cursor = self.connection.cursor()
+        except DB2.DatabaseError:
+            traceback.print_exc()
+            raise
+
+
+def test_ora():  # manual smoke test: needs a live Oracle server and empties t_test
+    engine = OraEngine()
+    engine.connect(database="ECARDDB.V4.SUPWISDOM", user="ecardv4", password="kingstar")
+    engine.exec_sql("delete from t_test")
+    for row in engine.query("select * from t_feetype"):
+        print(row)
+
+
+def test_db2():  # manual smoke test: needs a live DB2 server and empties ykt_cur.t_test
+    engine = DB2Engine()
+    engine.connect(database="usstbase", user="db2inst1", password="kingstar")
+    engine.exec_sql("delete from ykt_cur.t_test")
+    for row in engine.query("select * from ykt_cur.t_feetype"):
+        print(row)
+
+if __name__ == "__main__":  # executed as a script: run both live-database smoke tests
+    test_ora()
+    test_db2()
diff --git a/setup.py b/setup.py
new file mode 100755
index 0000000..ed16c2a
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,30 @@
+# -*- coding: utf-8 -*-
+'''
+Created on 2012-4-20
+
+@author: cheng.tang
+'''
+
+from cx_Freeze import setup, Executable
+from tcutils import disttools
+from v2convert import VERSION
+
+
+include_files = ['cvtcfg.ini', u'convert-example.json']  # extra files handed to cx_Freeze
+# Options for the cx_Freeze "build_exe" command (wired in through options= below).
+buildOptions = {"includes": [],
+                "include_files": include_files,
+                "create_shared_zip": False
+                }
+# Freeze v2convert.py into an executable named cvt2to4.exe.
+setup(name="Supwisdom Ykt V2 to V4",
+      version=VERSION,
+      description=u"一卡通历史数据迁移",
+      options={"build_exe": buildOptions},
+      executables=[Executable('v2convert.py',
+                              targetName="cvt2to4.exe",
+                              appendScriptToExe=True,
+                              appendScriptToLibrary=False)])
+
+# NOTE(review): make_dist lives in tcutils (not visible here); presumably packages the build - confirm.
+disttools.make_dist('cvt2to4', VERSION)
diff --git a/v2convert.py b/v2convert.py
new file mode 100755
index 0000000..4c09bd2
--- /dev/null
+++ b/v2convert.py
@@ -0,0 +1,329 @@
+#-*- coding: utf-8
+import sys
+from ConfigParser import RawConfigParser
+from datetime import datetime, timedelta
+import getopt
+import dbengine
+import json
+import locale
+import time
+from path import path
+import re
+
+
+VERSION = "1.1"
+SYSENCODING = locale.getdefaultlocale()[1]
+
+if not SYSENCODING:
+ SYSENCODING = "utf-8"
+
+
+def json_minify(json, strip_space=True):
+    """Return json with /* */ and // comments removed.
+
+    With strip_space, also drop whitespace outside strings and raw line breaks inside them.
+    """
+    tokenizer = re.compile('"|(/\*)|(\*/)|(//)|\n|\r')
+    in_string = False
+    in_multiline_comment = False
+    in_singleline_comment = False
+    new_str = []
+    from_index = 0  # from is a keyword in Python
+
+    for match in re.finditer(tokenizer, json):
+        if not in_multiline_comment and not in_singleline_comment:
+            tmp2 = json[from_index:match.start()]
+            if not in_string and strip_space:
+                tmp2 = re.sub('[ \t\n\r]*', '', tmp2)  # replace only white space defined in standard
+            new_str.append(tmp2)
+        from_index = match.end()
+
+        if match.group() == '"' and not in_multiline_comment and not in_singleline_comment:
+            escaped = re.search('(\\\\)*$', json[:match.start()])
+            if not in_string or escaped is None or len(escaped.group()) % 2 == 0:
+                # start of string with ", or unescaped " character found to end string
+                in_string = not in_string
+            from_index -= 1  # include " character in next catch
+        elif match.group() == '/*' and not in_string and not in_multiline_comment and not in_singleline_comment:
+            in_multiline_comment = True
+        elif match.group() == '*/' and not in_string and in_multiline_comment and not in_singleline_comment:
+            in_multiline_comment = False
+        elif match.group() == '//' and not in_string and not in_multiline_comment and not in_singleline_comment:
+            in_singleline_comment = True
+        elif (match.group() == '\n' or match.group() == '\r') and not in_string and not in_multiline_comment and in_singleline_comment:
+            in_singleline_comment = False
+        elif not in_multiline_comment and not in_singleline_comment and (
+                match.group() not in ['\n', '\r', ' ', '\t'] or not strip_space):
+            new_str.append(match.group())
+
+    new_str.append(json[from_index:])
+    return ''.join(new_str)
+
+
+class V2DBConvert(object):
+    """Command-line driver: copies tables between two databases rule by rule."""
+
+    def __init__(self):
+        super(V2DBConvert, self).__init__()
+        self._src_db = None
+        self._dst_db = None
+        self._base_path = path(sys.argv[0]).realpath().dirname()
+        self._base_name = path(sys.argv[0]).basename()
+        self._cvt_process_file = self._base_path / '.cvtstep.txt'
+        self._cvt_config_file = self._base_path / "cvtcfg.ini"
+        self._cvt_rule_file = None
+        self._cvt_rules = None
+        self._cvt_sql_params = {}
+        self._cvt_exec_rule_name = []
+        self._list_rules = False
+        self._manual_convert = False
+
+    def run(self):
+        """Parse arguments, load rules and config, connect, convert, clean up."""
+        if not self._parse_command_line():
+            self.usage()
+            sys.exit(1)
+
+        current = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+        print(u"启动转换程序 Version: {0}, {1}".format(VERSION, current))
+
+        if not self._load_convert_file():
+            self.die(u"不能加载转换规则文件")
+        print(u"加载转换规则文件成功<{0}>".format(self._cvt_rule_file.basename()))
+
+        if self._list_rules:
+            for rule in self._cvt_rules:
+                print(u"转换规则[{0}]: {1}<{2}>".format(rule['name'], rule['desc'],
+                                                    rule.get('remark', '')))
+            return
+
+        self._load_config()
+        try:
+            try:
+                print(u"等待连接原始数据...")
+                self._src_db = self._connect_db(self._src_type, **self._src_conn)
+                print(u"连接源库成功")
+                print(u"等待连接目标数据...")
+                self._dst_db = self._connect_db(self._dst_type, **self._dst_conn)
+                print(u"连接目标库成功")
+            except Exception as ex:
+                print(u"连接数据失败, {0}".format(ex))
+                return
+            self._convert()
+        finally:
+            # Close whatever was opened, even when only the source connected.
+            for db in (self._src_db, self._dst_db):
+                if db is not None:
+                    db.close()
+
+    def die(self, msg, exit_code=1):
+        """Print msg and exit the process with exit_code."""
+        print(msg)
+        sys.exit(exit_code)
+
+    def _convert(self):
+        """Run the selected rules in file order; stop at the first failure."""
+        for rule in self._cvt_rules:
+            if (self._cvt_exec_rule_name and
+                    rule['name'].upper() not in self._cvt_exec_rule_name):
+                continue
+            if self._manual_convert:
+                msg = u"是否转换规则<{0}>[{1}](Y/n)".format(rule['name'], rule['desc'])
+                ans = raw_input(msg.encode("utf-8")).strip()
+                # Empty input accepts the default "Y"; any other answer stops.
+                if ans not in ('', 'Y', 'y'):
+                    return
+            else:
+                print(u"等待转换<{0}>[{1}]".format(rule['name'], rule['desc']))
+            if not self._do_convert(rule):
+                return
+
+    def _do_execute_sql(self, cursor, statment):
+        """Execute statment, binding its placeholders from the name=value arguments; True on success."""
+        try:
+            cursor.prepare(statment)
+            sql_params = {}
+            for param in cursor.bindnames():
+                if param not in self._cvt_sql_params:
+                    print(u"参数<{0}>未指定".format(param))
+                    return False
+                sql_params[param] = self._cvt_sql_params[param]
+            cursor.execute(statment, **sql_params)
+            return True
+        except Exception as e:
+            print(u"执行sql 失败, {0}".format(e))
+            return False
+
+    @staticmethod
+    def _sql_literal(data):
+        """Render one fetched column value as a literal for the insert statement."""
+        if data is None:
+            return 'NULL'
+        if isinstance(data, (int, long)):
+            return '%d' % data
+        if isinstance(data, float):
+            return '%f' % data
+        if not isinstance(data, basestring):
+            data = str(data)
+        # Double embedded quotes so a value cannot terminate its own literal.
+        return "'{0}'".format(data.replace("'", "''"))
+
+    def _do_convert(self, rule):
+        """Apply one rule (truncate, pre_exec, copy, post_exec); True on success."""
+        if rule['action'] == 'truncate':
+            print(u"Truncate 目标表数据...")
+            # Copying into a table that failed to empty would duplicate rows.
+            if not self._dst_db.exec_sql('truncate table {0}'.format(rule['name'])):
+                return False
+
+        if 'pre_exec' in rule:
+            for statment in rule['pre_exec']:
+                if not self._do_execute_sql(self._src_db.cursor, statment):
+                    print(u"执行 pre_exec 语句失败")
+                    self._src_db.rollback()
+                    return False
+            self._src_db.commit()
+
+        query_cursor = self._src_db.new_cursor()
+        try:
+            if not self._do_execute_sql(query_cursor, rule['src_sql']):
+                print(u"查询原始数据失败")
+                return False
+            count = 0
+            self._dst_db.begin_transaction()
+            # Stream with fetchone() so a large table is never held in memory.
+            for row in iter(query_cursor.fetchone, None):
+                values = ",".join(self._sql_literal(data) for data in row)
+                insert_sql = 'insert into {0} ({1}) values({2})'.format(
+                    rule['name'], rule['dest_column'], values)
+                if not self._dst_db.exec_sql(insert_sql):
+                    self._dst_db.rollback()
+                    return False
+                count += 1
+                if count % self._commit_count == 0:
+                    print(u"导入数据 {0} 条".format(count))
+                    self._dst_db.commit()
+                    self._dst_db.begin_transaction()
+            self._dst_db.commit()
+            print(u"导入数据 {0} 条".format(count))
+        finally:
+            query_cursor.close()
+
+        if 'post_exec' in rule:
+            print(u"执行 post_exec...")
+            for statment in rule['post_exec']:
+                if not self._do_execute_sql(self._dst_db.cursor, statment):
+                    print(u"执行 post_exec 语句失败")
+                    self._dst_db.rollback()
+                    return False
+            self._dst_db.commit()
+            print(u"执行 post_exec 完成")
+        return True
+
+    def _load_convert_file(self):
+        """Parse the rule file (JSON with comments); False if it cannot be used."""
+        try:
+            rule_doc = json_minify(self._cvt_rule_file.text(encoding="utf-8"))
+            self._cvt_rules = json.loads(rule_doc, "utf-8")
+        except (IOError, ValueError) as ex:
+            print(str(ex))
+            return False
+        return True
+
+    def _parse_command_line(self):
+        """Read options and name=value SQL parameters from sys.argv; False if unusable."""
+        try:
+            optlist, args = getopt.getopt(sys.argv[1:], ":c:r:t:hlm")
+        except getopt.GetoptError as ex:
+            print(str(ex))
+            return False
+        execute_rules = ""
+        for k, v in optlist:
+            if k == "-c":
+                self._cvt_config_file = path(v)
+            elif k == "-r":
+                self._cvt_rule_file = path(v)
+            elif k == "-l":
+                self._list_rules = True
+            elif k == "-t":
+                execute_rules = v
+            elif k == "-m":
+                self._manual_convert = True
+            elif k == "-h":
+                self.usage()
+                sys.exit(0)
+
+        if not self._cvt_rule_file or not self._cvt_rule_file.exists():
+            print(u"请指定转换规则文件")
+            return False
+        if not self._cvt_config_file.exists():
+            print(u"请指定配置文件")
+            return False
+        if self._list_rules:
+            return True
+
+        if execute_rules:
+            # Strip so that "t_card, t_customer" (as shown in usage) matches too.
+            self._cvt_exec_rule_name = [name.strip().upper()
+                                        for name in execute_rules.split(',')]
+
+        for v in args:
+            # Split on the first "=" only, so a value may itself contain "=".
+            kn, sep, kv = v.partition('=')
+            if not sep:
+                print(u"参数[{0}]错误!".format(v))
+                return False
+            self._cvt_sql_params[kn.strip(' ').upper()] = kv.strip(' ')
+        return True
+
+    def usage(self):
+        """Print the command-line help."""
+        print(u"转换程序 {0} Version: {1}".format(self._base_name, VERSION))
+        print(u"\t-c 转换配置文件默认为 cvtcfg.ini")
+        print(u"\t-r 转换配置规则文件")
+        print(u"\t-t 转换使用表名,多个表名用逗号分隔;例如 t_card, t_customer")
+        print(u"\t-l 只列出转换规则文件中所有的 target 不做转换")
+        print(u"\t-m 每条转换规则执行前提示确认")
+        print(u"\t-h 输出帮助")
+
+    @staticmethod
+    def _conn_settings(cfg, side):
+        """Return (dbtype, connect kwargs) for side 'src' or 'dst' read from cfg."""
+        if side + 'dsn' in cfg:
+            # A DSN takes precedence over host/port/name.
+            params = dict(database=cfg[side + 'dsn'], host=None, port=0, name=None)
+        else:
+            params = dict(database=None, host=cfg.get(side + 'host', ''),
+                          port=int(cfg.get(side + 'port', 0)),
+                          name=cfg.get(side + 'name', ''))
+        params.update(user=cfg.get(side + 'user', ''),
+                      password=cfg.get(side + 'pswd', ''))
+        return cfg.get(side + 'type', 'oracle'), params
+
+    def _load_config(self):
+        """Read the [database] section of the ini file into connection settings."""
+        parser = RawConfigParser()
+        parser.read(self._cvt_config_file)
+        cfg = dict(parser.items('database'))
+        self._src_type, self._src_conn = self._conn_settings(cfg, 'src')
+        self._dst_type, self._dst_conn = self._conn_settings(cfg, 'dst')
+        # Keep the interval positive: it is the divisor of "count % interval".
+        self._commit_count = max(1, int(cfg.get('commitcount', 1000)))
+
+    def _connect_db(self, dbtype, **kwargs):
+        """Create and connect the engine for dbtype; ValueError if unsupported."""
+        if dbtype == "oracle":
+            db = dbengine.OraEngine()
+        elif dbtype == "db2":
+            db = dbengine.DB2Engine()
+        else:
+            raise ValueError(u"不支持数据类型[{0}]".format(dbtype))
+        db.connect(**kwargs)
+        return db
+
+
+if __name__ == "__main__":  # script entry: run one conversion and report the elapsed time
+    started = time.time()
+    V2DBConvert().run()
+    elapsed = time.time() - started
+    print(u"执行时间 %.3f 秒" % elapsed)