blob: 29cd25f13deac8c01d633170da65eac3887af88d [file] [log] [blame]
# -*- coding: utf-8
"""
一卡通webservice 请求 session 封装
"""
import urllib
from datetime import datetime
from tornado.httpclient import HTTPClient, HTTPRequest, HTTPError
import json
import traceback
class SWWebserviceError(BaseException):
def __init__(self, code, msg):
self.code = code
self.msg = msg
def __str__(self):
try:
msg = self.msg.encode('utf-8', 'ignore')
except:
msg = self.msg.encode('gbk', 'ignore')
return "%d : %s" % (self.code, msg)
def __unicode__(self):
return u"%d: %s" % (self.code, self.msg)
class SWWebserviceSession(object):
"""
webservice session object
example
session = SWWebserviceSession('http://localhost:8080/yktapi/services',
'123213','3334df334223',1)
try:
if not session.auth():
print u"签到失败"
except SWWebserviceError, ex:
print "Error : %d , %s" % (ex.code, ex.msg)
"""
def __init__(self, service_url, appid, appsecret, termid):
self.service_url = service_url
self.appid = appid
self.appsecret = appsecret
if isinstance(termid, int):
self.termid = str(termid)
elif isinstance(termid, float):
self.termid = str(termid)
else:
self.termid = termid
self.access_token = None
self.session_key = None
self.error_msg = None
self.status_code = None
def auth(self, timeout=10):
"""
签到
"""
if not self._get_access_token(timeout):
return False
if not self._get_session_key(timeout):
return False
return True
def logoff(self):
"""
签退
"""
self.access_token = None
self.session_key = None
def is_logon(self):
"""
是否签到成功
"""
return self.session_key != None
def _get_access_token(self, timeout):
url = ''.join((self.service_url, '/authservice/getauth/',
self.appid, '/getaccesstoken?term_id=', self.termid))
http_client = HTTPClient()
try:
response = http_client.fetch(url, connect_timeout=timeout,
request_timeout=timeout)
try:
result = json.loads(response.body)
self.access_token = result.get('access_token', None)
if not self.access_token:
raise SWWebserviceError(501, u"系统返回值正确")
return True
except:
raise SWWebserviceError(500, u"无法解析返回值")
except HTTPError, ex:
if not ex.response:
self.error_msg = u"检查与服务器的连接"
return False
self.status_code = ex.response.code
self.error_msg = ex.response.body
return False
def _sign_data(self, key, data):
import hmac
import hashlib
h = hmac.new(key, digestmod=hashlib.sha1)
h.update(data)
return (h.hexdigest(), 'HMAC')
def _get_session_key(self, timeout):
timestamp = self.nonce()
sign, sign_meth = self._sign_data(self.appsecret,
self.access_token + timestamp)
r = (self.service_url, '/authservice/getauth/', self.appid,
'?access_token=', self.access_token, '&term_id=', self.termid,
"&sign_method=", sign_meth, "&sign=", sign, "&timestamp=",
timestamp, "&v=1")
url = ''.join(r)
http_client = HTTPClient()
try:
response = http_client.fetch(url, connect_timeout=timeout,
request_timeout=timeout)
try:
result = json.loads(response.body)
self.session_key = result.get('session_key', None)
if not self.session_key:
raise SWWebserviceError(501, u"系统返回值正确")
return True
except:
raise SWWebserviceError(500, u"无法解析返回值")
except HTTPError, ex:
self.status_code = ex.response.code
self.error_msg = ex.response.body
return False
def nonce(self):
return datetime.now().strftime('%Y%m%d%H%M%S')
def gen_request_token(self):
"""
生成请求 token
"""
timestamp = self.nonce()
token = ''.join((self.appid, self.termid,
self.session_key, timestamp))
sign, sign_meth = self._sign_data(self.appsecret, token)
# params = urllib.urlencode(dict(app_id=appinfo.get('appid'),
# term_id=appinfo.get('termid'),
# sign_method="HMAC",
# timestamp=timestamp,
# sign=sign))
return dict(timestamp=timestamp, sign=sign,
app_id=self.appid, term_id=self.termid,
sign_method=sign_meth)
def request_token_urlencode(self, token):
"""
将 gen_request_token 返回的数据编码成请求url
"""
return urllib.urlencode(token)
class SWRequest(object):
def __init__(self, session):
self.session = session
self.service_url = self.session.service_url + '/ecardservice/ecardapi'
self.error_msg = None
def request(self, request, response, timeout=10.0):
if not self.session.is_logon():
raise SWWebserviceError(u"session无效")
client = HTTPClient()
try:
http_request = HTTPRequest(url=self.service_url, method='POST',
headers={'Content-Type': 'application/json'},
connect_timeout=timeout,
request_timeout=timeout,
body=self._pack_body(request, timeout))
http_resp = client.fetch(http_request)
except HTTPError, ex:
http_resp = ex.response
except:
traceback.print_exc()
return False
if not http_resp:
return False
self.status_code = http_resp.code
if http_resp.code != 200:
self.error_msg = '%d : %s' % (self.status_code, http_resp.body)
self.error_msg = self.error_msg.decode('utf-8', 'ignore')
else:
try:
response.unserialize(http_resp.body)
return True
except:
return False
def _pack_body(self, request, timeout):
funcdata = request.serialize()
request_token = self.session.gen_request_token()
if isinstance(funcdata, unicode):
request_token['funcdata'] = funcdata
elif isinstance(funcdata, str):
request_token['funcdata'] = funcdata.decode('gbk', 'ignore')
request_token['timeout'] = int(round(timeout, 0))
return json.dumps(request_token, ensure_ascii=False)