# -*- coding: utf-8 -*-
# blob: 4c09bd2e088ef5a2c1f0b164d371906c1d1d35a2 [file] [log] [blame]
# Cheng Tang eac1085 2013-06-25 16:43:32 +0800
2import sys
3from ConfigParser import RawConfigParser
4from datetime import datetime, timedelta
5import getopt
6import dbengine
7import json
8import locale
9import time
10from path import path
11import re
12
13
# Tool version reported in the start-up banner and in the usage text.
VERSION = "1.1"

# Encoding reported by the default locale; fall back to UTF-8 when the
# locale does not report one.
SYSENCODING = locale.getdefaultlocale()[1] or "utf-8"
19
20
def json_minify(json, strip_space=True):
    """Remove ``//`` and ``/* ... */`` comments from JSON-like text.

    :param json: the text to minify (the name shadows the ``json`` module
        inside this function only).
    :param strip_space: when true, spaces, tabs, CR and LF that occur
        outside string literals are removed as well.
    :returns: the text with comments (and optionally whitespace) removed.

    String literals are copied through untouched, so comment markers inside
    a string are preserved.  A comment that is still open when the input
    ends is dropped rather than copied into the result.
    """
    tokenizer = re.compile(r'"|(/\*)|(\*/)|(//)|\n|\r')
    whitespace = re.compile(r'[ \t\n\r]+')
    in_string = False
    in_multiline_comment = False
    in_singleline_comment = False

    new_str = []
    from_index = 0  # start of the text not yet copied to new_str

    for match in tokenizer.finditer(json):
        token = match.group()
        # Snapshot of the comment state *before* this token is processed.
        in_comment = in_multiline_comment or in_singleline_comment

        if not in_comment:
            chunk = json[from_index:match.start()]
            if not in_string and strip_space:
                chunk = whitespace.sub('', chunk)
            new_str.append(chunk)

        from_index = match.end()

        if token == '"' and not in_comment:
            if in_string:
                # A quote preceded by an odd number of backslashes is
                # escaped and does not end the string.  Count backwards
                # from the quote instead of re-scanning the whole prefix.
                pos = match.start() - 1
                backslashes = 0
                while pos >= 0 and json[pos] == '\\':
                    backslashes += 1
                    pos -= 1
                if backslashes % 2 == 0:
                    in_string = False
            else:
                in_string = True
            from_index -= 1  # include the quote itself in the next chunk
        elif token == '/*' and not in_string and not in_comment:
            in_multiline_comment = True
        elif (token == '*/' and not in_string and in_multiline_comment
              and not in_singleline_comment):
            in_multiline_comment = False
        elif token == '//' and not in_string and not in_comment:
            in_singleline_comment = True
        elif (token in ('\n', '\r') and not in_string
              and not in_multiline_comment and in_singleline_comment):
            in_singleline_comment = False
        elif not in_comment and (token not in ('\n', '\r', ' ', '\t')
                                 or not strip_space):
            # Comment markers inside a string, or line breaks that must be
            # kept, are copied through unchanged.
            new_str.append(token)

    # Text after the last token gets the same treatment as every other
    # chunk: dropped inside a comment, whitespace-stripped outside strings.
    if not (in_multiline_comment or in_singleline_comment):
        tail = json[from_index:]
        if not in_string and strip_space:
            tail = whitespace.sub('', tail)
        new_str.append(tail)
    return ''.join(new_str)
61
62
class V2DBConvert(object):
    """Command-line driver that copies rows between two databases.

    The work is described by a JSON rule file (``-r``); connection settings
    come from the ``[database]`` section of an INI file (``-c``).  Each rule
    is a mapping; the keys read by this class are ``name``, ``desc``,
    ``remark``, ``action``, ``pre_exec``, ``src_sql``, ``dest_column`` and
    ``post_exec``.
    """

    def __init__(self):
        super(V2DBConvert, self).__init__()
        self._src_db = None
        self._dst_db = None
        self._base_path = path(sys.argv[0]).realpath().dirname()
        self._base_name = path(sys.argv[0]).basename()
        self._cvt_process_file = self._base_path / '.cvtstep.txt'
        self._cvt_config_file = self._base_path / "cvtcfg.ini"
        self._cvt_rule_file = None
        self._cvt_rules = None
        self._cvt_sql_params = {}      # NAME -> value for SQL bind variables
        self._cvt_exec_rule_name = []  # upper-cased rule names given by -t
        self._list_rules = False
        self._manual_convert = False

    def run(self):
        """Parse the command line, load rules and config, then convert.

        Exits the process with status 1 when the command line is invalid or
        the rule file cannot be loaded.  Connections that were opened are
        closed again on every path.
        """
        if not self._parse_command_line():
            self.usage()
            sys.exit(1)

        current = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(u"启动转换程序 Version: {0}, {1}".format(VERSION, current))

        if not self._load_convert_file():
            self.die(u"不能加载转换规则文件")
        print(u"加载转换规则文件成功<{0}>".format(self._cvt_rule_file.basename()))

        if self._list_rules:
            for rule in self._cvt_rules:
                print(u"转换规则[{0}]: {1}<{2}>".format(
                    rule['name'], rule['desc'], rule.get('remark', '')))
            return

        self._load_config()
        try:
            try:
                print(u"等待连接原始数据...")
                param = dict(database=self._src_database, host=self._src_host,
                             port=self._src_port, user=self._src_user,
                             password=self._src_pswd, name=self._src_name)
                self._src_db = self._connect_db(self._src_type, **param)
                print(u"连接源库成功")

                print(u"等待连接目标数据...")
                param = dict(database=self._dst_database, host=self._dst_host,
                             port=self._dst_port, user=self._dst_user,
                             password=self._dst_pswd, name=self._dst_name)
                self._dst_db = self._connect_db(self._dst_type, **param)
                print(u"连接目标库成功")
            except Exception as ex:
                print(u"连接数据失败, {0}".format(ex))
                return

            self._convert()
        finally:
            # Close whatever was opened, even if the second connect or the
            # conversion itself failed.
            for db in (self._src_db, self._dst_db):
                if db is not None:
                    db.close()

    def die(self, msg, exit_code=1):
        """Print *msg* and terminate the process with *exit_code*."""
        print(msg)
        sys.exit(exit_code)

    def _convert(self):
        """Run every selected rule in file order; stop at the first failure.

        When rule names were given with ``-t`` only those rules run; names
        are compared case-insensitively.  In manual mode (``-m``) each rule
        is confirmed first; an empty answer counts as "yes".
        """
        for rule in self._cvt_rules:
            if (self._cvt_exec_rule_name and
                    rule['name'].upper() not in self._cvt_exec_rule_name):
                continue
            if self._manual_convert:
                msg = u"是否转换规则<{0}>[{1}](Y/n)".format(rule['name'], rule['desc'])
                # raw_input writes the prompt as raw bytes, so encode it for
                # the console rather than assuming UTF-8.
                ans = raw_input(msg.encode(SYSENCODING, 'replace'))
                # NOTE(review): any other answer aborts the remaining rules
                # (original behaviour) instead of skipping just this one.
                if ans.strip() not in ('', 'Y', 'y'):
                    return
            else:
                print(u"等待转换<{0}>[{1}]".format(rule['name'], rule['desc']))
            if not self._do_convert(rule):
                return

    def _do_execute_sql(self, cursor, statment):
        """Prepare *statment* on *cursor*, bind parameters and execute it.

        Bind names reported by ``cursor.bindnames()`` are looked up in the
        ``NAME=VALUE`` arguments from the command line.

        :returns: True on success; False when a bind parameter is missing or
            the cursor raises.
        """
        try:
            cursor.prepare(statment)
            sql_params = {}
            for param in cursor.bindnames():
                if param not in self._cvt_sql_params:
                    print(u"参数<{0}>未指定".format(param))
                    return False
                sql_params[param] = self._cvt_sql_params[param]
            # The return value of execute() is not a success indicator here;
            # failures are reported through exceptions.
            cursor.execute(statment, **sql_params)
            return True
        except Exception as e:
            print(u"执行sql 失败, {0}".format(e))
            return False

    @staticmethod
    def _sql_literal(data):
        """Render one fetched column value as SQL literal text."""
        if data is None:
            return 'NULL'
        if isinstance(data, int):
            return '%d' % data
        if isinstance(data, float):
            # repr() keeps full precision; '%f' would cut to 6 decimals.
            return repr(data)
        if isinstance(data, str) or isinstance(data, unicode):
            # Double embedded quotes so the value cannot end the literal.
            return "'{0}'".format(data.replace("'", "''"))
        # long, Decimal, ...: plain text form.
        # NOTE(review): date/time values probably need a database-specific
        # literal; confirm against the source column types.
        return str(data)

    def _run_hook(self, db, rule, hook):
        """Execute the statements in ``rule[hook]`` on *db*, then commit.

        Rolls back and returns False as soon as one statement fails.
        """
        for statment in rule[hook]:
            if not self._do_execute_sql(db.cursor, statment):
                print(u"执行 {0} 语句失败".format(hook))
                db.rollback()
                return False
        db.commit()
        return True

    def _do_convert(self, rule):
        """Apply one rule: copy the rows of ``src_sql`` into table ``name``.

        :returns: True when the rule completed, False on any failure.
        """
        query_cursor = self._src_db.new_cursor()
        try:
            return self._convert_rule(query_cursor, rule)
        finally:
            # Release the cursor on the failure paths as well.
            query_cursor.close()

    def _convert_rule(self, query_cursor, rule):
        """Body of :meth:`_do_convert`; the caller owns *query_cursor*."""
        if rule.get('action') == 'truncate':
            print(u"Truncate 目标表数据...")
            self._dst_db.exec_sql('truncate table {0}'.format(rule['name']))

        if 'pre_exec' in rule:
            if not self._run_hook(self._src_db, rule, 'pre_exec'):
                return False

        if not self._do_execute_sql(query_cursor, rule['src_sql']):
            print(u"查询原始数据失败")
            return False

        count = 0
        self._dst_db.begin_transaction()
        for row in query_cursor.fetchall():
            values = ",".join(self._sql_literal(data) for data in row)
            insert_sql = 'insert into {0} ({1}) values({2})'.format(
                rule['name'], rule['dest_column'], values)
            if not self._dst_db.exec_sql(insert_sql):
                self._dst_db.rollback()
                return False
            count += 1
            # A commit count of zero or less means one commit at the end.
            if self._commit_count > 0 and count % self._commit_count == 0:
                print(u"导入数据 {0} 条".format(count))
                self._dst_db.commit()
                self._dst_db.begin_transaction()

        self._dst_db.commit()
        print(u"导入数据 {0} 条".format(count))

        if 'post_exec' in rule:
            print(u"执行 post_exec...")
            if not self._run_hook(self._dst_db, rule, 'post_exec'):
                return False
            print(u"执行 post_exec 完成")
        return True

    def _load_convert_file(self):
        """Read the rule file, strip its comments and parse it as JSON.

        :returns: True on success, False when the file cannot be read or is
            not valid JSON (the reason is printed).
        """
        try:
            rule_doc = json_minify(self._cvt_rule_file.text(encoding="utf-8"))
            self._cvt_rules = json.loads(rule_doc, "utf-8")
        except (IOError, OSError, ValueError) as ex:
            print(ex)
            return False
        return True

    def _parse_command_line(self):
        """Parse ``sys.argv`` into the converter's settings.

        Options: ``-c`` config file, ``-r`` rule file, ``-t`` comma-separated
        rule names, ``-l`` list rules only, ``-m`` manual confirmation,
        ``-h`` help.  Remaining arguments must be ``NAME=VALUE`` pairs and
        become SQL bind parameters (names are upper-cased).

        :returns: True when the command line is usable, False otherwise.
        """
        execute_rules = ""
        try:
            optlist, args = getopt.getopt(sys.argv[1:], ":c:r:t:hlm")
        except getopt.GetoptError as ex:
            # Unknown option or missing argument: report it and let the
            # caller print the usage text.
            print(ex)
            return False

        for k, v in optlist:
            if k == "-c":
                self._cvt_config_file = path(v)
            elif k == "-r":
                self._cvt_rule_file = path(v)
            elif k == "-l":
                self._list_rules = True
            elif k == "-t":
                execute_rules = v
            elif k == "-m":
                self._manual_convert = True
            elif k == "-h":
                self.usage()
                sys.exit(0)

        if not self._cvt_rule_file or not self._cvt_rule_file.exists():
            print(u"请指定转换规则文件")
            return False

        if not self._cvt_config_file.exists():
            print(u"请指定配置文件")
            return False

        if self._list_rules:
            return True

        if execute_rules:
            # Tolerate blanks around the commas ("t_card, t_customer").
            names = [name.strip().upper() for name in execute_rules.split(',')]
            self._cvt_exec_rule_name = [name for name in names if name]

        for v in args:
            # Split on the first '=' only so the value may contain '='.
            key, sep, value = v.partition('=')
            if not sep or not key.strip(' '):
                print(u"参数[{0}]错误!".format(v))
                return False
            self._cvt_sql_params[key.strip(' ').upper()] = value.strip(' ')

        return True

    def usage(self):
        """Print the command-line help."""
        print(u"转换程序 {0} Version: {1}".format(self._base_name, VERSION))
        print(u"\t-c 转换配置文件默认为 cvtcfg.ini")
        print(u"\t-r 转换配置规则文件")
        print(u"\t-t 转换使用表名,多个表名用逗号分隔;例如 t_card, t_customer")
        print(u"\t-l 只列出转换规则文件中所有的 target 不做转换")
        print(u"\t-m 手动模式,每条转换规则执行前需要确认")
        print(u"\t-h 输出帮助")

    @staticmethod
    def _read_endpoint(cfg, prefix):
        """Extract one connection description from the config mapping.

        *prefix* is ``'src'`` or ``'dst'``.  When ``<prefix>dsn`` is present
        it is used as the database and host/port/name are left unset;
        otherwise host, port and name are read individually.

        :returns: ``(database, host, port, name, user, pswd, dbtype)``.
        """
        dsn_key = prefix + 'dsn'
        if dsn_key in cfg:
            database, host, port, name = cfg[dsn_key], None, 0, None
        else:
            database = None
            host = cfg.get(prefix + 'host', '')
            port = int(cfg.get(prefix + 'port', 0))
            name = cfg.get(prefix + 'name', '')
        user = cfg.get(prefix + 'user', '')
        pswd = cfg.get(prefix + 'pswd', '')
        dbtype = cfg.get(prefix + 'type', 'oracle')
        return database, host, port, name, user, pswd, dbtype

    def _load_config(self):
        """Load connection settings from the ``[database]`` INI section."""
        parser = RawConfigParser()
        parser.read(self._cvt_config_file)
        cfg = dict(parser.items('database'))

        (self._src_database, self._src_host, self._src_port, self._src_name,
         self._src_user, self._src_pswd,
         self._src_type) = self._read_endpoint(cfg, 'src')

        (self._dst_database, self._dst_host, self._dst_port, self._dst_name,
         self._dst_user, self._dst_pswd,
         self._dst_type) = self._read_endpoint(cfg, 'dst')

        # Number of inserted rows between intermediate commits.
        self._commit_count = int(cfg.get('commitcount', 1000))

    def _connect_db(self, dbtype, **kwargs):
        """Create and connect an engine for *dbtype* (``oracle`` or ``db2``).

        Requires the keyword arguments ``database``, ``user``, ``password``,
        ``host``, ``port`` and ``name``.

        :raises ValueError: when *dbtype* is not supported.
        """
        kind = dbtype.lower()  # accept "Oracle", "DB2", ...
        if kind == "oracle":
            db = dbengine.OraEngine()
        elif kind == "db2":
            db = dbengine.DB2Engine()
        else:
            raise ValueError(u"不支持数据类型[{0}]".format(dbtype))

        db.connect(database=kwargs['database'], user=kwargs['user'],
                   password=kwargs['password'], host=kwargs['host'],
                   port=kwargs['port'], name=kwargs['name'])
        return db
323
324
if __name__ == "__main__":
    # Time the whole run and report the elapsed wall-clock seconds.
    started_at = time.time()
    V2DBConvert().run()
    elapsed = time.time() - started_at
    print(u"执行时间 %.3f 秒" % elapsed)