blob: 178aef99b8ad612126dc04eedc92bcb9b5362e4a [file] [log] [blame]
Cheng Tangeac10852013-06-25 16:43:32 +08001#-*- coding: utf-8
2import sys
3from ConfigParser import RawConfigParser
4from datetime import datetime, timedelta
5import getopt
6import dbengine
7import json
8import locale
9import time
Cheng Tang29963ee2013-06-26 15:25:09 +080010import cx_Oracle
Cheng Tangeac10852013-06-25 16:43:32 +080011from path import path
12import re
13
14
# Tool version shown in the startup banner and in the usage text.
VERSION = "1.3"

# Encoding reported by the user's default locale; UTF-8 when none is reported.
SYSENCODING = locale.getdefaultlocale()[1] or "utf-8"
20
21
def json_minify(json, strip_space=True):
    """Strip ``//`` and ``/* */`` comments (and optionally whitespace) from JSON text.

    String literals are copied through untouched, so comment markers and
    spaces inside a JSON string are preserved.

    :param json: the JSON-with-comments text to minify.  The name shadows the
        ``json`` module inside this function only; it is kept because it is
        part of the public signature.
    :param strip_space: when true, whitespace outside string literals is
        removed as well.
    :returns: the minified text.
    """
    # Raw strings: the pattern contains ``\*`` which is not a valid Python
    # string escape.
    tokenizer = re.compile(r'"|(/\*)|(\*/)|(//)|\n|\r')
    # Only the whitespace characters defined by the JSON standard.
    whitespace = re.compile(r'[ \t\n\r]+')

    in_string = False
    in_multiline_comment = False
    in_singleline_comment = False

    new_str = []
    from_index = 0  # start of the text not yet copied to the output

    for match in tokenizer.finditer(json):
        token = match.group()
        # The comment state cannot change between here and the dispatch
        # below, so it is evaluated once per token.
        in_comment = in_multiline_comment or in_singleline_comment

        if not in_comment:
            chunk = json[from_index:match.start()]
            if not in_string and strip_space:
                chunk = whitespace.sub('', chunk)
            new_str.append(chunk)

        from_index = match.end()

        if token == '"' and not in_comment:
            # Count the backslashes immediately before the quote by walking
            # backwards; an odd count means the quote itself is escaped.
            # (Avoids copying the whole prefix for every quote.)
            quote_at = match.start()
            backslashes = 0
            while (quote_at > backslashes and
                   json[quote_at - backslashes - 1] == '\\'):
                backslashes += 1
            if not in_string or backslashes % 2 == 0:
                # Opening quote, or an unescaped quote closing the string.
                in_string = not in_string
            from_index -= 1  # include the quote character in the next chunk
        elif token == '/*' and not in_string and not in_comment:
            in_multiline_comment = True
        elif (token == '*/' and not in_string and in_multiline_comment
              and not in_singleline_comment):
            in_multiline_comment = False
        elif token == '//' and not in_string and not in_comment:
            in_singleline_comment = True
        elif (token in ('\n', '\r') and not in_string
              and not in_multiline_comment and in_singleline_comment):
            in_singleline_comment = False
        elif not in_comment and (
                token not in ('\n', '\r', ' ', '\t') or not strip_space):
            new_str.append(token)

    # Copy whatever follows the last token.  Text that is still inside a
    # comment (e.g. a trailing ``// note`` without a newline) must not leak
    # into the output, and trailing whitespace is stripped like any other.
    if not (in_multiline_comment or in_singleline_comment):
        tail = json[from_index:]
        if not in_string and strip_space:
            tail = whitespace.sub('', tail)
        new_str.append(tail)
    return ''.join(new_str)
62
63
class V2DBConvert(object):
    """Command-line driver that copies rows from a source DB to a target DB.

    The work is described by a rule file (JSON with comments).  Each rule is
    a dict; the keys read here are ``name`` (target table), ``desc``,
    ``remark`` (optional), ``action`` (optional, ``"truncate"``),
    ``pre_exec`` / ``post_exec`` (optional statement lists), ``src_sql`` and
    ``dest_column``.  Connection settings come from the ``[database]``
    section of an ini file.
    """

    # (str, unicode) on Python 2 -- spelled this way to avoid naming
    # ``unicode`` directly.
    _TEXT_TYPES = (str, type(u""))
    # Rows per commit when the config file does not say otherwise.
    _DEFAULT_COMMIT_COUNT = 1000

    def __init__(self):
        super(V2DBConvert, self).__init__()
        self._src_db = None
        self._dst_db = None
        self._base_path = path(sys.argv[0]).realpath().dirname()
        self._base_name = path(sys.argv[0]).basename()
        self._cvt_process_file = self._base_path / '.cvtstep.txt'
        self._cvt_config_file = self._base_path / "cvtcfg.ini"
        self._cvt_rule_file = None
        self._cvt_rules = None
        self._cvt_sql_params = {}
        self._cvt_exec_rule_name = []
        self._list_rules = False
        self._manual_convert = False
        self._commit_count = self._DEFAULT_COMMIT_COUNT

    def run(self):
        """Parse the command line, then list the rules or run the conversion.

        Exits the process with status 1 on a bad command line or an
        unreadable rule file.
        """
        if not self._parse_command_line():
            self.usage()
            sys.exit(1)

        current = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(u"启动转换程序 Version: {0}, {1}".format(VERSION, current))

        if not self._load_convert_file():
            self.die(u"不能加载转换规则文件")
        print(u"加载转换规则文件成功<{0}>".format(self._cvt_rule_file.basename()))

        if self._list_rules:
            for rule in self._cvt_rules:
                print(u"转换规则[{0}]: {1}<{2}>".format(
                    rule['name'], rule['desc'], rule.get('remark', '')))
            return

        self._load_config()
        try:
            if not self._connect_databases():
                return
            self._convert()
        finally:
            # Release whatever was opened, even when the second connection
            # or the conversion itself failed.
            self._close_databases()

    def _connect_databases(self):
        """Open the source and target connections; return False on failure."""
        try:
            print(u"等待连接原始数据...")
            param = dict(database=self._src_database, host=self._src_host,
                         port=self._src_port, user=self._src_user,
                         password=self._src_pswd, name=self._src_name)
            self._src_db = self._connect_db(self._src_type, **param)
            print(u"连接源库成功")

            print(u"等待连接目标数据...")
            param = dict(database=self._dst_database, host=self._dst_host,
                         port=self._dst_port, user=self._dst_user,
                         password=self._dst_pswd, name=self._dst_name)
            self._dst_db = self._connect_db(self._dst_type, **param)
            print(u"连接目标库成功")
        except Exception as ex:
            print(u"连接数据失败, {0}".format(ex))
            return False
        return True

    def _close_databases(self):
        """Close every connection that was successfully opened."""
        try:
            if self._src_db is not None:
                self._src_db.close()
                self._src_db = None
        finally:
            if self._dst_db is not None:
                self._dst_db.close()
                self._dst_db = None

    def die(self, msg, exit_code=1):
        """Print ``msg`` and terminate the process with ``exit_code``."""
        print(msg)
        sys.exit(exit_code)

    def _convert(self):
        """Run the selected rules in file order, stopping at the first failure."""
        for rule in self._cvt_rules:
            # ``-t`` names are stored upper-cased, so compare case-insensitively.
            if (self._cvt_exec_rule_name and
                    rule['name'].upper() not in self._cvt_exec_rule_name):
                continue
            if self._manual_convert:
                msg = u"是否转换规则<{0}>[{1}](Y/n)".format(rule['name'], rule['desc'])
                # Encode the prompt for the user's terminal, not hard-coded UTF-8.
                ans = raw_input(msg.encode(SYSENCODING, "replace"))
                # The prompt advertises "Y" as the default, so a bare Enter
                # means yes.
                # NOTE(review): any other answer stops the whole run (as
                # before) rather than skipping this rule -- confirm intent.
                if ans.strip() not in ('', 'Y', 'y'):
                    return
            else:
                print(u"等待转换<{0}>[{1}]".format(rule['name'], rule['desc']))
            if not self._do_convert(rule):
                return

    def _do_execute_sql(self, cursor, statment):
        """Execute ``statment`` on ``cursor`` with the command-line bind values.

        Every bind name reported by ``cursor.bindnames()`` must have been
        supplied as ``NAME=VALUE`` on the command line.

        :returns: True on success; False (after printing the reason) when a
            bind value is missing or the statement raises.
        """
        try:
            cursor.prepare(statment)
            sql_params = {}
            for param in cursor.bindnames():
                if param not in self._cvt_sql_params:
                    print(u"参数<{0}>未指定".format(param))
                    return False
                sql_params[param] = self._cvt_sql_params[param]
            # The return value of execute() carries no success information;
            # failures are reported by exceptions.
            cursor.execute(statment, **sql_params)
            return True
        except Exception as exc:
            try:
                if isinstance(exc, cx_Oracle.DatabaseError):
                    error, = exc.args
                    msg = error.message.decode("utf-8")
                else:
                    msg = "{0}".format(exc)
                print(u"执行sql 失败, {0}".format(msg))
            except UnicodeError:
                print(exc)
            print(u"执行语句[{0}]错误".format(statment))
            return False

    @staticmethod
    def _sql_literal(data):
        """Render one fetched column value as an SQL literal.

        Text is single-quoted with embedded quotes doubled; numbers are
        rendered without loss of precision; None becomes ``NULL``.
        """
        # Imported here because the file's import block does not provide them.
        import decimal
        import numbers

        if data is None:
            return 'NULL'
        if isinstance(data, numbers.Integral):  # int, long and bool
            return '%d' % data
        if isinstance(data, float):
            # repr() keeps full precision; '%f' would cut at 6 decimals.
            return repr(data)
        if isinstance(data, decimal.Decimal):
            return '{0:f}'.format(data)
        if isinstance(data, V2DBConvert._TEXT_TYPES):
            return "'{0}'".format(data.replace("'", "''"))
        # NOTE(review): other types (e.g. datetime) are sent as quoted text;
        # whether the target accepts that depends on its implicit
        # conversion / session date format -- confirm per database.
        return "'{0}'".format(str(data).replace("'", "''"))

    def _do_convert(self, rule):
        """Apply one rule: copy the rows of ``src_sql`` into table ``name``.

        :returns: True when every step succeeded, False otherwise.
        """
        query_cursor = self._src_db.new_cursor()
        try:
            return self._apply_rule(rule, query_cursor)
        finally:
            # Close the cursor on failure paths too.
            query_cursor.close()

    def _apply_rule(self, rule, query_cursor):
        """Run truncate / pre_exec / copy / post_exec for ``rule``."""
        if rule.get('action') == 'truncate':
            print(u"Truncate 目标表数据...")
            self._dst_db.exec_sql('truncate table {0}'.format(rule['name']))

        if 'pre_exec' in rule:
            for statment in rule['pre_exec']:
                if not self._do_execute_sql(self._src_db.cursor, statment):
                    print(u"执行 pre_exec 语句失败")
                    self._src_db.rollback()
                    return False
            self._src_db.commit()

        if not self._do_execute_sql(query_cursor, rule['src_sql']):
            print(u"查询原始数据失败")
            return False

        # A non-positive commit count means "commit once at the end".
        commit_every = self._commit_count
        batch_size = commit_every if commit_every > 0 else self._DEFAULT_COMMIT_COUNT

        count = 0
        self._dst_db.begin_transaction()
        while True:
            # Fetch in batches so a large table is never held in memory whole.
            rows = query_cursor.fetchmany(batch_size)
            if not rows:
                break
            for row in rows:
                values = ",".join([self._sql_literal(data) for data in row])
                insert_sql = 'insert into {0} ({1}) values({2})'.format(
                    rule['name'], rule['dest_column'], values)
                count += 1
                if not self._dst_db.exec_sql(insert_sql):
                    print(u"导入第{0}条记录错误".format(count))
                    self._dst_db.rollback()
                    return False

                if commit_every > 0 and count % commit_every == 0:
                    print(u"导入数据 {0} 条".format(count))
                    self._dst_db.commit()
                    self._dst_db.begin_transaction()

        self._dst_db.commit()
        print(u"导入数据 {0} 条".format(count))

        if 'post_exec' in rule:
            print(u"执行 post_exec...")
            for statment in rule['post_exec']:
                if not self._do_execute_sql(self._dst_db.cursor, statment):
                    print(u"执行 post_exec 语句失败")
                    self._dst_db.rollback()
                    return False
            self._dst_db.commit()
            print(u"执行 post_exec 完成")
        return True

    def _load_convert_file(self):
        """Read and parse the rule file; return False if that fails."""
        try:
            rule_doc = json_minify(self._cvt_rule_file.text(encoding="utf-8"))
            self._cvt_rules = json.loads(rule_doc, "utf-8")
        except (IOError, OSError, ValueError) as exc:
            # repr() keeps the detail printable whatever its encoding.
            print(u"解析转换规则文件失败, {0}".format(repr(exc)))
            return False
        return True

    def _parse_command_line(self):
        """Read options and ``NAME=VALUE`` bind parameters from ``sys.argv``.

        :returns: True when the arguments are usable, False otherwise (the
            caller then prints the usage text).
        """
        execute_rules = ""
        try:
            optlist, args = getopt.getopt(sys.argv[1:], ":c:r:t:hlm")
        except getopt.GetoptError as exc:
            print(u"命令行参数错误: {0}".format(exc.msg))
            return False

        for k, v in optlist:
            if k == "-c":
                self._cvt_config_file = path(v)
            elif k == "-r":
                self._cvt_rule_file = path(v)
            elif k == "-l":
                self._list_rules = True
            elif k == "-t":
                execute_rules = v
            elif k == "-m":
                self._manual_convert = True
            elif k == "-h":
                self.usage()
                sys.exit(0)

        if not self._cvt_rule_file or not self._cvt_rule_file.exists():
            print(u"请指定转换规则文件")
            return False

        # Listing rules never touches the database configuration.
        if self._list_rules:
            return True

        if not self._cvt_config_file.exists():
            print(u"请指定配置文件")
            return False

        if execute_rules:
            # Tolerate spaces around the commas ("t_card, t_customer").
            self._cvt_exec_rule_name = [
                name.strip().upper()
                for name in execute_rules.split(',') if name.strip()]

        for v in args:
            # Split on the first '=' only so values may contain '='.
            key, separator, value = v.partition('=')
            if not separator:
                print(u"参数[{0}]错误!".format(v))
                return False
            self._cvt_sql_params[key.strip(' ').upper()] = value.strip(' ')

        return True

    def usage(self):
        """Print the command-line help."""
        print(u"转换程序 {0} Version: {1}".format(self._base_name, VERSION))
        print(u"\t-c 转换配置文件默认为 cvtcfg.ini")
        print(u"\t-r 转换配置规则文件")
        print(u"\t-t 转换使用表名,多个表名用逗号分隔;例如 t_card, t_customer")
        print(u"\t-l 只列出转换规则文件中所有的 target 不做转换")
        print(u"\t-m 手动模式,每条转换规则执行前需要确认")
        print(u"\t-h 输出帮助")
        print(u"\tNAME=VALUE 指定 SQL 语句中绑定参数的值")

    def _load_config(self):
        """Load connection settings from the ``[database]`` ini section.

        For each side (``src`` / ``dst``) a ``*dsn`` entry takes precedence
        over the separate ``*host`` / ``*port`` / ``*name`` entries.
        """
        parser = RawConfigParser()
        parser.read(self._cvt_config_file)
        cfg = dict(parser.items('database'))

        if 'srcdsn' in cfg:
            self._src_database = cfg['srcdsn']
            self._src_host = None
            self._src_port = 0
            self._src_name = None
        else:
            self._src_host = cfg.get('srchost', '')
            self._src_port = int(cfg.get('srcport', 0))
            self._src_name = cfg.get('srcname', '')
            self._src_database = None
        self._src_user = cfg.get('srcuser', '')
        self._src_pswd = cfg.get('srcpswd', '')
        self._src_type = cfg.get('srctype', 'oracle')

        if 'dstdsn' in cfg:
            self._dst_database = cfg['dstdsn']
            self._dst_host = None
            self._dst_port = 0
            self._dst_name = None
        else:
            self._dst_host = cfg.get('dsthost', '')
            self._dst_port = int(cfg.get('dstport', 0))
            self._dst_name = cfg.get('dstname', '')
            self._dst_database = None
        self._dst_user = cfg.get('dstuser', '')
        self._dst_pswd = cfg.get('dstpswd', '')
        self._dst_type = cfg.get('dsttype', 'oracle')

        self._commit_count = int(
            cfg.get('commitcount', self._DEFAULT_COMMIT_COUNT))

    def _connect_db(self, dbtype, **kwargs):
        """Create the engine for ``dbtype`` and connect it.

        :param dbtype: ``"oracle"`` or ``"db2"``.
        :param kwargs: must provide ``database``, ``user``, ``password``,
            ``host``, ``port`` and ``name``.
        :raises ValueError: for an unsupported ``dbtype``.
        """
        if dbtype == "oracle":
            db = dbengine.OraEngine()
        elif dbtype == "db2":
            db = dbengine.DB2Engine()
        else:
            raise ValueError(u"不支持数据类型[{0}]".format(dbtype))

        db.connect(database=kwargs['database'], user=kwargs['user'],
                   password=kwargs['password'], host=kwargs['host'],
                   port=kwargs['port'], name=kwargs['name'])
        return db
335
336
if __name__ == "__main__":
    # Time the whole run and report the elapsed wall-clock seconds.
    started_at = time.time()
    V2DBConvert().run()
    elapsed = time.time() - started_at
    print(u"执行时间 %.3f 秒" % elapsed)