Cheng Tang | eac1085 | 2013-06-25 16:43:32 +0800 | [diff] [blame] | 1 | #-*- coding: utf-8
|
| 2 | import sys
|
| 3 | from ConfigParser import RawConfigParser
|
| 4 | from datetime import datetime, timedelta
|
| 5 | import getopt
|
| 6 | import dbengine
|
| 7 | import json
|
| 8 | import locale
|
| 9 | import time
|
Cheng Tang | 29963ee | 2013-06-26 15:25:09 +0800 | [diff] [blame^] | 10 | import cx_Oracle
|
Cheng Tang | eac1085 | 2013-06-25 16:43:32 +0800 | [diff] [blame] | 11 | from path import path
|
| 12 | import re
|
| 13 |
|
| 14 |
|
VERSION = "1.3"

# Encoding reported by the host locale; it is used to decode native database
# error messages. Fall back to UTF-8 when the locale does not report one.
SYSENCODING = locale.getdefaultlocale()[1] or "utf-8"
|
| 20 |
|
| 21 |
|
def json_minify(json, strip_space=True):
    """Remove ``/* */`` and ``//`` comments from a JSON-with-comments text.

    The parameter keeps its historical name ``json`` for keyword callers;
    inside this function it shadows the ``json`` module.

    :param json: the text to minify (str or unicode).
    :param strip_space: when true, also drop whitespace outside string
        literals (only the space, tab, LF and CR characters defined by the
        JSON standard).
    :returns: the text with comments (and optionally whitespace) removed.
        An unterminated trailing comment is dropped as well.
    """
    tokenizer = re.compile(r'"|(/\*)|(\*/)|(//)|\n|\r')
    whitespace = re.compile(r'[ \t\n\r]+')

    in_string = False
    in_multiline_comment = False
    in_singleline_comment = False

    new_str = []
    from_index = 0  # `from` is a keyword in Python

    for match in tokenizer.finditer(json):
        token = match.group()
        in_comment = in_multiline_comment or in_singleline_comment

        if not in_comment:
            # Copy the text between the previous token and this one.
            chunk = json[from_index:match.start()]
            if not in_string and strip_space:
                chunk = whitespace.sub('', chunk)
            new_str.append(chunk)

        from_index = match.end()

        if token == '"' and not in_comment:
            # A quote inside a string is escaped iff it is preceded by an odd
            # number of backslashes; count them backwards (no prefix copy).
            backslashes = 0
            pos = match.start() - 1
            while pos >= 0 and json[pos] == '\\':
                backslashes += 1
                pos -= 1
            if not in_string or backslashes % 2 == 0:
                # start of a string, or an unescaped quote ending one
                in_string = not in_string
            # Always keep the quote character (including an escaped \" inside
            # a string) by starting the next copied chunk at it.
            from_index -= 1
        elif token == '/*' and not in_string and not in_comment:
            in_multiline_comment = True
        elif token == '*/' and not in_string and in_multiline_comment:
            in_multiline_comment = False
        elif token == '//' and not in_string and not in_comment:
            in_singleline_comment = True
        elif token in ('\n', '\r') and not in_string and in_singleline_comment:
            in_singleline_comment = False
        elif not in_comment and (token not in ('\n', '\r') or not strip_space):
            # e.g. "//" or "/*" inside a string literal: keep them verbatim.
            new_str.append(token)

    # Text after the last token: discard it if it is an unterminated comment,
    # otherwise treat it like any other inter-token chunk.
    if not (in_multiline_comment or in_singleline_comment):
        tail = json[from_index:]
        if not in_string and strip_space:
            tail = whitespace.sub('', tail)
        new_str.append(tail)
    return ''.join(new_str)
|
| 62 |
|
| 63 |
|
| 64 | class V2DBConvert(object):
|
| 65 | def __init__(self):
|
| 66 | super(V2DBConvert, self).__init__()
|
| 67 | self._src_db = None
|
| 68 | self._dst_db = None
|
| 69 | self._base_path = path(sys.argv[0]).realpath().dirname()
|
| 70 | self._base_name = path(sys.argv[0]).basename()
|
| 71 | self._cvt_process_file = self._base_path / '.cvtstep.txt'
|
| 72 | self._cvt_config_file = self._base_path / "cvtcfg.ini"
|
| 73 | self._cvt_rule_file = None
|
| 74 | self._cvt_rules = None
|
| 75 | self._cvt_sql_params = {}
|
| 76 | self._cvt_exec_rule_name = []
|
| 77 | self._list_rules = False
|
| 78 | self._manual_convert = False
|
| 79 |
|
| 80 | def run(self):
|
| 81 | if not self._parse_command_line():
|
| 82 | self.usage()
|
| 83 | sys.exit(1)
|
| 84 |
|
| 85 | current = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
Cheng Tang | 29963ee | 2013-06-26 15:25:09 +0800 | [diff] [blame^] | 86 | print u"启动转换程序 Version: {0}, {1}".format(VERSION, current)
|
Cheng Tang | eac1085 | 2013-06-25 16:43:32 +0800 | [diff] [blame] | 87 |
|
| 88 | if not self._load_convert_file():
|
| 89 | self.die(u"不能加载转换规则文件")
|
| 90 | print u"加载转换规则文件成功<{0}>".format(self._cvt_rule_file.basename())
|
| 91 |
|
| 92 | if self._list_rules:
|
| 93 | for rule in self._cvt_rules:
|
| 94 | print u"转换规则[{0}]: {1}<{2}>".format(rule['name'], rule['desc'],
|
| 95 | rule.get('remark', ''))
|
| 96 | return
|
| 97 |
|
| 98 | self._load_config()
|
| 99 | try:
|
| 100 | print u"等待连接原始数据..."
|
| 101 | param = dict(database=self._src_database, host=self._src_host,
|
| 102 | port=self._src_port, user=self._src_user, password=self._src_pswd,
|
| 103 | name=self._src_name)
|
| 104 | self._src_db = self._connect_db(self._src_type, **param)
|
| 105 | print u"连接源库成功"
|
| 106 |
|
| 107 | print u"等待连接目标数据..."
|
| 108 | param = dict(database=self._dst_database, host=self._dst_host,
|
| 109 | port=self._dst_port, user=self._dst_user, password=self._dst_pswd,
|
| 110 | name=self._dst_name)
|
| 111 | self._dst_db = self._connect_db(self._dst_type, **param)
|
| 112 | print u"连接目标库成功"
|
| 113 | except Exception, ex:
|
| 114 | print u"连接数据失败, {0}".format(ex)
|
| 115 | return
|
| 116 |
|
| 117 | self._convert()
|
| 118 |
|
| 119 | self._src_db.close()
|
| 120 | self._dst_db.close()
|
| 121 |
|
| 122 | def die(self, msg, exit_code=1):
|
| 123 | print msg
|
| 124 | sys.exit(exit_code)
|
| 125 |
|
| 126 | def _convert(self):
|
| 127 | for rule in self._cvt_rules:
|
| 128 | if self._cvt_exec_rule_name:
|
| 129 | if rule['name'] not in self._cvt_exec_rule_name:
|
| 130 | continue
|
| 131 | if self._manual_convert:
|
| 132 | msg = u"是否转换规则<{0}>[{1}](Y/n)".format(rule['name'], rule['desc'])
|
| 133 | ans = raw_input(msg.encode("utf-8"))
|
| 134 | if not (ans is None or ans in ('Y', 'y')):
|
| 135 | return
|
| 136 | else:
|
| 137 | print u"等待转换<{0}>[{1}]".format(rule['name'], rule['desc'])
|
| 138 | if not self._do_convert(rule):
|
| 139 | return
|
| 140 |
|
| 141 | def _do_execute_sql(self, cursor, statment):
|
| 142 | try:
|
| 143 | cursor.prepare(statment)
|
| 144 | sql_params = {}
|
| 145 | for param in cursor.bindnames():
|
| 146 | if param not in self._cvt_sql_params:
|
| 147 | print u"参数<{0}>未指定".format(param)
|
| 148 | return False
|
| 149 | sql_params[param] = self._cvt_sql_params[param]
|
| 150 | # print sql_params
|
| 151 | if not cursor.execute(statment, **sql_params):
|
| 152 | return True
|
| 153 |
|
| 154 | return True
|
Cheng Tang | 29963ee | 2013-06-26 15:25:09 +0800 | [diff] [blame^] | 155 | except Exception as exc:
|
| 156 | try:
|
| 157 | if isinstance(exc, cx_Oracle.DatabaseError):
|
| 158 | error, = exc.args
|
| 159 | msg = error.message.decode(SYSENCODING)
|
| 160 | else:
|
| 161 | msg = "{0}".format(exc)
|
| 162 | print u"执行sql 失败, {0}".format(msg)
|
| 163 | except UnicodeError:
|
| 164 | print exc
|
| 165 | print u"执行语句[{0}]错误".format(statment)
|
Cheng Tang | eac1085 | 2013-06-25 16:43:32 +0800 | [diff] [blame] | 166 | return False
|
| 167 |
|
| 168 | def _do_convert(self, rule):
|
| 169 | query_cursor = self._src_db.new_cursor()
|
| 170 |
|
| 171 | if rule['action'] == 'truncate':
|
| 172 | print u"Truncate 目标表数据..."
|
| 173 | self._dst_db.exec_sql('truncate table {0}'.format(rule['name']))
|
| 174 |
|
| 175 | if 'pre_exec' in rule:
|
| 176 | for statment in rule['pre_exec']:
|
| 177 | if not self._do_execute_sql(self._src_db.cursor, statment):
|
| 178 | print u"执行 pre_exec 语句失败"
|
| 179 | self._src_db.rollback()
|
| 180 | return False
|
| 181 | self._src_db.commit()
|
| 182 |
|
| 183 | if not self._do_execute_sql(query_cursor, rule['src_sql']):
|
| 184 | print u"查询原始数据失败"
|
| 185 | return False
|
| 186 | # print query_cursor.description
|
| 187 | count = 0
|
| 188 | self._dst_db.begin_transaction()
|
| 189 | for row in query_cursor.fetchall():
|
| 190 | v = []
|
| 191 | for data in row:
|
| 192 | if isinstance(data, int):
|
| 193 | v.append('%d' % data)
|
| 194 | elif isinstance(data, float):
|
| 195 | v.append('%f' % data)
|
| 196 | elif isinstance(data, str) or isinstance(data, unicode):
|
| 197 | v.append("'{0}'".format(data))
|
| 198 | elif data is None:
|
| 199 | v.append('NULL')
|
| 200 | else:
|
| 201 | v.append(data)
|
| 202 | values = ",".join(v)
|
| 203 | insert_sql = 'insert into {0} ({1}) values({2})'.format(rule['name'],
|
| 204 | rule['dest_column'], values)
|
| 205 | # print insert_sql
|
Cheng Tang | 29963ee | 2013-06-26 15:25:09 +0800 | [diff] [blame^] | 206 | count += 1
|
Cheng Tang | eac1085 | 2013-06-25 16:43:32 +0800 | [diff] [blame] | 207 | if not self._dst_db.exec_sql(insert_sql):
|
Cheng Tang | 29963ee | 2013-06-26 15:25:09 +0800 | [diff] [blame^] | 208 | print u"导入第{0}条记录错误".format(count)
|
Cheng Tang | eac1085 | 2013-06-25 16:43:32 +0800 | [diff] [blame] | 209 | self._dst_db.rollback()
|
| 210 | return False
|
Cheng Tang | 29963ee | 2013-06-26 15:25:09 +0800 | [diff] [blame^] | 211 |
|
Cheng Tang | eac1085 | 2013-06-25 16:43:32 +0800 | [diff] [blame] | 212 | if count % self._commit_count == 0:
|
| 213 | print u"导入数据 {0} 条".format(count)
|
| 214 | self._dst_db.commit()
|
| 215 | self._dst_db.begin_transaction()
|
| 216 |
|
| 217 | self._dst_db.commit()
|
| 218 | print u"导入数据 {0} 条".format(count)
|
| 219 | if 'post_exec' in rule:
|
| 220 | print u"执行 post_exec..."
|
| 221 | for statment in rule['post_exec']:
|
| 222 | if not self._do_execute_sql(self._dst_db.cursor, statment):
|
| 223 | print u"执行 post_exec 语句失败"
|
| 224 | self._dst_db.rollback()
|
| 225 | return False
|
| 226 | self._dst_db.commit()
|
| 227 | print u"执行 post_exec 完成"
|
| 228 | query_cursor.close()
|
| 229 | return True
|
| 230 |
|
| 231 | def _load_convert_file(self):
|
| 232 | rule_doc = json_minify(self._cvt_rule_file.text(encoding="utf-8"))
|
| 233 | self._cvt_rules = json.loads(rule_doc, "utf-8")
|
| 234 | return True
|
| 235 |
|
| 236 | def _parse_command_line(self):
|
| 237 | execute_rules = ""
|
| 238 | optlist, args = getopt.getopt(sys.argv[1:], ":c:r:t:hlm")
|
| 239 | for k, v in optlist:
|
| 240 | if k == "-c":
|
| 241 | self._cvt_config_file = path(v)
|
| 242 | elif k == "-r":
|
| 243 | self._cvt_rule_file = path(v)
|
| 244 | elif k == "-l":
|
| 245 | self._list_rules = True
|
| 246 | elif k == "-t":
|
| 247 | execute_rules = v
|
| 248 | elif k == "-m":
|
| 249 | self._manual_convert = True
|
| 250 | elif k == "-h":
|
| 251 | self.usage()
|
| 252 | sys.exit(0)
|
| 253 |
|
| 254 | if not self._cvt_rule_file or not self._cvt_rule_file.exists():
|
| 255 | print u"请指定转换规则文件"
|
| 256 | return False
|
| 257 |
|
| 258 | if not self._cvt_config_file.exists():
|
| 259 | print u"请指定配置文件"
|
| 260 | return False
|
| 261 |
|
| 262 | if self._list_rules:
|
| 263 | return True
|
| 264 |
|
| 265 | if execute_rules:
|
| 266 | target_rule = execute_rules.split(',')
|
| 267 | self._cvt_exec_rule_name = [rule.upper() for rule in target_rule]
|
| 268 | # print self._cvt_exec_rule_name
|
| 269 |
|
| 270 | for v in args:
|
| 271 | param = v.split('=')
|
| 272 | if len(param) != 2:
|
| 273 | print u"参数[{0}]错误!".format(v)
|
| 274 | return False
|
| 275 | kn = param[0].strip(' ').upper()
|
| 276 | kv = param[1].strip(' ')
|
| 277 | self._cvt_sql_params[kn] = kv
|
| 278 |
|
| 279 | return True
|
| 280 |
|
| 281 | def usage(self):
|
| 282 | print u"转换程序 {0} Version: {1}".format(self._base_name, VERSION)
|
| 283 | print u"\t-c 转换配置文件默认为 cvtcfg.ini"
|
| 284 | print u"\t-r 转换配置规则文件"
|
| 285 | print u"\t-t 转换使用表名,多个表名用逗号分隔;例如 t_card, t_customer"
|
| 286 | print u"\t-l 只列出转换规则文件中所有的 target 不做转换"
|
| 287 | print u"\t-h 输出帮助"
|
| 288 |
|
| 289 | def _load_config(self):
|
| 290 | parser = RawConfigParser()
|
| 291 | parser.read(self._cvt_config_file)
|
| 292 | cfg = dict(parser.items('database'))
|
| 293 | if 'srcdsn' in cfg:
|
| 294 | self._src_database = cfg['srcdsn']
|
| 295 | self._src_host = None
|
| 296 | self._src_port = 0
|
| 297 | self._src_name = None
|
| 298 | else:
|
| 299 | self._src_host = cfg.get('srchost', '')
|
| 300 | self._src_port = int(cfg.get('srcport', 0))
|
| 301 | self._src_name = cfg.get('srcname', '')
|
| 302 | self._src_database = None
|
| 303 | self._src_user = cfg.get('srcuser', '')
|
| 304 | self._src_pswd = cfg.get('srcpswd', '')
|
| 305 | self._src_type = cfg.get('srctype', 'oracle')
|
| 306 |
|
| 307 | if 'dstdsn' in cfg:
|
| 308 | self._dst_database = cfg['dstdsn']
|
| 309 | self._dst_host = None
|
| 310 | self._dst_port = 0
|
| 311 | self._dst_name = None
|
| 312 | else:
|
| 313 | self._dst_host = cfg.get('dsthost', '')
|
| 314 | self._dst_port = int(cfg.get('dstport', 0))
|
| 315 | self._dst_name = cfg.get('dstname', '')
|
| 316 | self._dst_database = None
|
| 317 | self._dst_user = cfg.get('dstuser', '')
|
| 318 | self._dst_pswd = cfg.get('dstpswd', '')
|
| 319 | self._dst_type = cfg.get('dsttype', 'oracle')
|
| 320 |
|
| 321 | self._commit_count = int(cfg.get('commitcount', 1000))
|
| 322 |
|
| 323 | def _connect_db(self, dbtype, **kwargs):
|
| 324 | if dbtype == "oracle":
|
| 325 | db = dbengine.OraEngine()
|
| 326 | elif dbtype == "db2":
|
| 327 | db = dbengine.DB2Engine()
|
| 328 | else:
|
| 329 | raise ValueError(u"不支持数据类型[{0}]".format(dbtype))
|
| 330 |
|
| 331 | db.connect(database=kwargs['database'], user=kwargs['user'],
|
| 332 | password=kwargs['password'], host=kwargs['host'],
|
| 333 | port=kwargs['port'], name=kwargs['name'])
|
| 334 | return db
|
| 335 |
|
| 336 |
|
| 337 | if __name__ == "__main__":
|
| 338 | begin = time.time()
|
| 339 | cvt = V2DBConvert()
|
| 340 | cvt.run()
|
| 341 | print u"执行时间 %.3f 秒" % ((time.time() - begin))
|