# -*- coding: utf-8 -*-
"""
Session wrapper for requests to the one-card (一卡通) web service.
"""
import urllib | |
from datetime import datetime | |
from tornado.httpclient import HTTPClient, HTTPRequest, HTTPError | |
import json | |
import traceback | |
class SWWebserviceError(Exception):
    """Error reported while talking to the web service.

    Attributes:
        code: numeric error code (this module uses 500 and 501).
        msg: human-readable message, either text or a byte string.

    The class derives from ``Exception`` (not ``BaseException``) so that
    ordinary ``except Exception`` handlers see it, as recommended for
    user-defined exceptions.
    """

    def __init__(self, code, msg):
        # Pass the values to the base class so ``args`` (and therefore
        # repr() and pickling) carry them as well.
        super(SWWebserviceError, self).__init__(code, msg)
        self.code = code
        self.msg = msg

    def _msg_text(self):
        """Return ``msg`` as text, decoding byte strings as UTF-8, else GBK."""
        msg = self.msg
        if isinstance(msg, bytes):
            try:
                return msg.decode('utf-8')
            except UnicodeDecodeError:
                # Not valid UTF-8: fall back to GBK and drop what cannot be
                # decoded rather than fail while formatting an error.
                return msg.decode('gbk', 'ignore')
        return msg

    def __str__(self):
        text = u"%d : %s" % (self.code, self._msg_text())
        if str is bytes:
            # Python 2: str() has to return a byte string.
            return text.encode('utf-8', 'ignore')
        return text

    def __unicode__(self):
        return u"%d: %s" % (self.code, self._msg_text())
class SWWebserviceSession(object):
    """Sign-on session for the web service.

    Signing on takes two HTTP GET requests: the first fetches an access
    token for ``appid``/``termid``; the second exchanges that token, signed
    with ``appsecret`` (HMAC-SHA1), for a session key. The session key is
    then used by ``gen_request_token`` to sign individual requests.

    HTTP failures are reported by a ``False`` return value with details in
    ``status_code`` / ``error_msg``; unusable response bodies raise
    ``SWWebserviceError``.

    Example::

        session = SWWebserviceSession('http://localhost:8080/yktapi/services',
                                      '123213', '3334df334223', 1)
        try:
            if not session.auth():
                print(u"sign-on failed")
        except SWWebserviceError as ex:
            print("Error : %d , %s" % (ex.code, ex.msg))
    """

    def __init__(self, service_url, appid, appsecret, termid):
        """Store the connection settings; no network traffic happens here.

        Args:
            service_url: base URL of the service, without a trailing slash.
            appid: application id, used in URLs and request tokens.
            appsecret: key used to sign data.
            termid: terminal id; int/float values are converted to text
                because URLs and tokens are assembled with ``str.join``.
        """
        self.service_url = service_url
        self.appid = appid
        self.appsecret = appsecret
        if isinstance(termid, (int, float)):
            termid = str(termid)
        self.termid = termid
        self.access_token = None
        self.session_key = None
        self.error_msg = None
        self.status_code = None

    def auth(self, timeout=10):
        """Sign on: fetch the access token, then the session key.

        Args:
            timeout: connect and request timeout (seconds) for each call.

        Returns:
            True when both steps succeed, False when either HTTP call fails.

        Raises:
            SWWebserviceError: 500 if a response body cannot be parsed,
                501 if it lacks the expected field.
        """
        if not self._get_access_token(timeout):
            return False
        if not self._get_session_key(timeout):
            return False
        return True

    def logoff(self):
        """Sign off by discarding the access token and session key."""
        self.access_token = None
        self.session_key = None

    def is_logon(self):
        """Return True when a session key is held (sign-on succeeded)."""
        return self.session_key is not None

    def _fetch(self, url, timeout):
        """GET ``url`` and return the response, or None on an HTTP error.

        On failure the details are recorded in ``status_code`` and
        ``error_msg``. The client is always closed afterwards.
        """
        http_client = HTTPClient()
        try:
            return http_client.fetch(url, connect_timeout=timeout,
                                     request_timeout=timeout)
        except HTTPError as ex:
            if ex.response:
                self.status_code = ex.response.code
                self.error_msg = ex.response.body
            else:
                # No response at all (e.g. connection failure); keep the
                # error's own code so a stale status is not left behind.
                self.status_code = ex.code
                self.error_msg = u"检查与服务器的连接"
            return None
        finally:
            http_client.close()

    @staticmethod
    def _parse_field(body, field):
        """Decode ``body`` as a JSON object and return ``field`` (or None).

        Raises:
            SWWebserviceError: 500 when the body is not a JSON object.
        """
        try:
            return json.loads(body).get(field)
        except (AttributeError, TypeError, ValueError):
            raise SWWebserviceError(500, u"无法解析返回值")

    def _get_access_token(self, timeout):
        """Fetch the access token and store it in ``access_token``.

        Returns True on success and False on an HTTP failure; raises
        SWWebserviceError (500/501) for an unusable response body.
        """
        url = ''.join((self.service_url, '/authservice/getauth/',
                       self.appid, '/getaccesstoken?term_id=', self.termid))
        response = self._fetch(url, timeout)
        if response is None:
            return False
        self.access_token = self._parse_field(response.body, 'access_token')
        if not self.access_token:
            # Raised outside the parsing try-block so it is not converted
            # into a 500; the message now says the value is NOT valid.
            raise SWWebserviceError(501, u"系统返回值不正确")
        return True

    def _sign_data(self, key, data):
        """Return ``(hex_digest, 'HMAC')`` for HMAC-SHA1 of ``data``.

        Text (unicode) ``key`` / ``data`` values are encoded as UTF-8
        first, because ``hmac`` operates on bytes.
        """
        import hashlib
        import hmac
        if not isinstance(key, bytes):
            key = key.encode('utf-8')
        if not isinstance(data, bytes):
            data = data.encode('utf-8')
        digest = hmac.new(key, data, hashlib.sha1).hexdigest()
        return (digest, 'HMAC')

    def _session_key_url(self, timestamp):
        """Build the signed session-key URL for the current access token."""
        sign, sign_meth = self._sign_data(self.appsecret,
                                          self.access_token + timestamp)
        return ''.join((
            self.service_url, '/authservice/getauth/', self.appid,
            '?access_token=', self.access_token,
            '&term_id=', self.termid,
            '&sign_method=', sign_meth,
            '&sign=', sign,
            '&timestamp=', timestamp,
            '&v=1'))

    def _get_session_key(self, timeout):
        """Exchange the access token for a session key.

        Returns True on success and False on an HTTP failure; raises
        SWWebserviceError (500/501) for an unusable response body.
        """
        response = self._fetch(self._session_key_url(self.nonce()), timeout)
        if response is None:
            return False
        self.session_key = self._parse_field(response.body, 'session_key')
        if not self.session_key:
            raise SWWebserviceError(501, u"系统返回值不正确")
        return True

    def nonce(self):
        """Return the current local time formatted as ``YYYYmmddHHMMSS``."""
        return datetime.now().strftime('%Y%m%d%H%M%S')

    def gen_request_token(self):
        """Generate the signed token that accompanies a request.

        The signature covers ``appid + termid + session_key + timestamp``
        and is keyed with ``appsecret``; a session key must already be held.

        Returns:
            dict with ``timestamp``, ``sign``, ``app_id``, ``term_id`` and
            ``sign_method``.
        """
        timestamp = self.nonce()
        token = ''.join((self.appid, self.termid,
                         self.session_key, timestamp))
        sign, sign_meth = self._sign_data(self.appsecret, token)
        return dict(timestamp=timestamp, sign=sign,
                    app_id=self.appid, term_id=self.termid,
                    sign_method=sign_meth)

    def request_token_urlencode(self, token):
        """URL-encode the dict returned by ``gen_request_token``."""
        try:
            from urllib import urlencode  # Python 2
        except ImportError:
            from urllib.parse import urlencode  # Python 3
        return urlencode(token)
class SWRequest(object):
    """Sends one signed JSON request to the ``/ecardservice/ecardapi`` endpoint.

    The ``session`` must provide ``service_url``, ``is_logon()`` and
    ``gen_request_token()``. After a call, ``status_code`` holds the HTTP
    status of the last response and ``error_msg`` the text of the last
    non-200 response.
    """

    def __init__(self, session):
        self.session = session
        self.service_url = self.session.service_url + '/ecardservice/ecardapi'
        self.error_msg = None
        # Initialised here so it can be read before the first request.
        self.status_code = None

    @staticmethod
    def _to_text(value):
        """Decode byte strings as UTF-8 (ignoring errors); pass others through."""
        if isinstance(value, bytes):
            return value.decode('utf-8', 'ignore')
        return value

    def request(self, request, response, timeout=10.0):
        """POST ``request`` and load the reply into ``response``.

        Args:
            request: object whose ``serialize()`` returns the payload.
            response: object whose ``unserialize(body)`` consumes the reply.
            timeout: connect timeout in seconds.

        Returns:
            True when the server answered 200 and the body was
            unserialized; False otherwise.

        Raises:
            SWWebserviceError: when the session is not signed on.
        """
        if not self.session.is_logon():
            # The exception requires a code as well as a message; 401 is
            # used here for "not signed on".
            raise SWWebserviceError(401, u"session无效")
        client = HTTPClient()
        try:
            # NOTE(review): only connect_timeout is set; the session class
            # also passes request_timeout - confirm whether that is wanted.
            http_request = HTTPRequest(
                url=self.service_url, method='POST',
                headers={'Content-Type': 'application/json'},
                connect_timeout=timeout,
                body=self._pack_body(request))
            http_resp = client.fetch(http_request)
        except HTTPError as ex:
            # Error statuses still carry a response (None if none arrived).
            http_resp = ex.response
        except Exception:
            traceback.print_exc()
            return False
        finally:
            client.close()
        if not http_resp:
            return False
        self.status_code = http_resp.code
        if http_resp.code != 200:
            self.error_msg = u'%d : %s' % (self.status_code,
                                           self._to_text(http_resp.body))
            return False
        try:
            response.unserialize(http_resp.body)
        except Exception:
            # Still reported as a failed request, but no longer silently.
            traceback.print_exc()
            return False
        return True

    def _pack_body(self, request):
        """Return the JSON body: the session's request token plus ``funcdata``."""
        request_token = self.session.gen_request_token()
        request_token['funcdata'] = self._to_text(request.serialize())
        return json.dumps(request_token, ensure_ascii=False)