# -*- coding: utf-8 -*- | |
import sys,codecs,struct | |
IFT_NULL=0 | |
IFT_ULONG=1 | |
IFT_BYTE=2 | |
IFT_USHORT=3 | |
IFT_DATETIME=4 | |
IFT_BUFFER=5 | |
IFT_STRING=6 | |
IFT_LONG=7 | |
# (0,0,1,IFT_BYTE,0,"msgtype","消息码"), | |
FLD_NO_IDX=0 | |
FLD_LENGTH_IDX=2 | |
FLD_TYPE_IDX=3 | |
FLD_FLAG_IDX=4 | |
FLD_NAME_IDX=5 | |
FLD_DESC_IDX=6 | |
HDFieldsDef = [ | |
(1,0,0,IFT_NULL,0,"extend",u"扩展域否"), | |
(2,0,4,IFT_ULONG,0,"terminalsn",u" 终端序列号"), | |
(3,0,2,IFT_USHORT,0,"address",u"CAN地址"), | |
(4,0,4,IFT_ULONG,0,"cardnumber",u"卡号"), | |
(5,0,1,IFT_BYTE,0,"indexofew",u" 钱包索引"), | |
(6,0,2,IFT_USHORT,0,"traceofew",u" 钱包流水号"), | |
(7,0,4,IFT_LONG,0,"amount",u" 交易金额"), | |
(8,0,4,IFT_LONG,0,"additionalamount1",u"附加金额"), | |
(9,0,4,IFT_ULONG,0,"traceofpos",u"POS流水号"), | |
(10,0,4,IFT_ULONG,0,"orgtranstrace",u"原交易流水号"), | |
(11,0,7,IFT_DATETIME,0,"datetime",u"交易日期和时间"), | |
(12,0,1,IFT_BYTE,0,"responsecode",u"响应码"), | |
(13,0,2,IFT_USHORT,0,"terminalid",u"终端号"), | |
(14,0,2,IFT_USHORT,0,"terminaltype",u"终端类型"), | |
(15,0,2,IFT_USHORT,0,"merchantid",u"商户(网点)号"), | |
(16,0,4,IFT_ULONG,0,"operatorid",u"操作员号"), | |
(17,0,8,IFT_BUFFER,0,"pin",u"个人识别码PIN"), | |
(18,0,4,IFT_ULONG,0,"veroflist",u"黑名单版本"), | |
(19,0,4,IFT_ULONG,0,"managefee",u"交易批次号(暂不用)"), | |
(20,0,360,IFT_BUFFER,2,"additionaldata1",u"附加信息1"), | |
(21,0,528,IFT_BUFFER,2,"additionaldata2",u"附加信息2"), | |
(22,0,528,IFT_STRING,2,"additionaldata3",u"附加信息3"), | |
(23,0,528,IFT_STRING,2,"additionaldata4",u"附加信息4"), | |
(24,0,2,IFT_USHORT,0,"mac",u"消息认证码") | |
] | |
class HDPack: | |
''' 汇多 8583 协议包 ''' | |
def __init__(self): | |
self.msg_type = 0 | |
self.value={} | |
self.field_idx={} | |
self.field_name={} | |
self.msgtype = 0 | |
self.load_config() | |
def __eq__(self,other): | |
if not isinstance(other,HDPack): | |
return False | |
return self.compare(other) | |
def __ne__(self,other): | |
if not isinstance(other,HDPack): | |
return True | |
return not self.compare(other) | |
def compare(self,other): | |
if self.msgtype != other.msgtype: | |
return False | |
for k,v in self.value.items(): | |
if not other.value.has_key(k): | |
print "field [%s] not equal" % k | |
return False | |
if v <> other.get(k): | |
print "field [%s] value not equal [%s,%s]" % (k,v,other.get(k)) | |
return False | |
return True | |
def load_config(self): | |
for field in HDFieldsDef: | |
index,t,len,type,flag,name,desc = field | |
self.field_idx[index] = field | |
self.field_name[name] = field | |
#print self.field_name | |
def get_field_def(self,index): | |
field_def = None | |
if isinstance(index,int): | |
if not self.field_idx.has_key(index): | |
raise ValueError("field index %d not exists" % index) | |
field_def = self.field_idx[index] | |
elif isinstance(index,str): | |
if not self.field_name.has_key(index): | |
raise ValueError("field index %s not exists" % index) | |
field_def = self.field_name[index] | |
else: | |
raise ValueError("field index type not support") | |
return field_def | |
def set(self,index,value): | |
field_def = self.get_field_def(index) | |
t = field_def[FLD_TYPE_IDX] | |
n = field_def[FLD_NAME_IDX] | |
l = field_def[FLD_LENGTH_IDX] | |
f = field_def[FLD_FLAG_IDX] | |
if t == IFT_USHORT or t == IFT_ULONG or t == IFT_NULL or t == IFT_BYTE or t == IFT_LONG: | |
if isinstance(value,int): | |
b = value | |
elif isinstance(value,str): | |
if value.startswith('0x'): | |
b = int(value,16) | |
else: | |
b = int(value) | |
else: | |
raise ValueError("field %s value type not integer" % n) | |
elif t == IFT_DATETIME: | |
if len(value) != 12 and len(value) != 14: | |
raise ValueError("field %s length must be 12 or 14" % n) | |
if len(value) == 14: | |
value = value[2:] | |
b = value + "00" | |
else: | |
if not isinstance(value,str): | |
raise ValueError("field %s value type not string" % n) | |
if t == IFT_DATETIME: | |
l *= 2 | |
elif value.startswith('0x'): | |
l *= 2 | |
l += 2 | |
if f <> 2 and len(value) <> l: | |
raise ValueError("field %s value data length not equal %d<>%d " % (n,len(value),l)) | |
elif len(value) > l: | |
raise ValueError("field %s value data length exceed %d " % (n,l)) | |
b = value | |
self.value[n] = b | |
def get(self,index): | |
field_def = self.get_field_def(index) | |
t = field_def[3] | |
n = field_def[5] | |
if self.value.has_key(n): | |
return self.value[n] | |
raise ValueError("Field %s not specified" % n) | |
def encode_bitmap(self,bitmap): | |
buffer = '' | |
offset = 0 | |
#print "bit len[%d]" % len(bitmap) | |
while offset < len(bitmap): | |
bit = 0 | |
for i in range(8): | |
t = ord(bitmap[offset+i]) - 0x30 | |
if t == 0: continue | |
bit |= (1 << (7 - i % 8)) | |
buffer += chr(bit&0xFF) | |
offset += 8 | |
return buffer | |
def pack(self,workkey=None): | |
buffer = '' | |
bitmap = ['0' for i in range(24)] | |
for field in HDFieldsDef: | |
index,x,len,t,flag,name,desc = field | |
if not self.value.has_key(name): | |
continue | |
if t == IFT_USHORT or t == IFT_ULONG or t == IFT_NULL or t == IFT_BYTE or t == IFT_LONG: | |
buffer += self.int_2_buffer(self.value[name],t,len) | |
else: | |
buffer += self.string_2_buffer(self.value[name],t,len,flag) | |
bitmap[index-1] = '1' | |
#print bitmap | |
bitmap_buffer = self.encode_bitmap(bitmap) | |
full_data = chr(self.msgtype&0xFF) + bitmap_buffer + buffer | |
header = self.data_header(full_data) | |
#print "h[%s]d[%s]" % (codecs.encode(header,'hex'),codecs.encode(full_data,'hex')) | |
return header + full_data | |
def data_header(self,buffer): | |
if len(buffer) > 0xFFFF: | |
raise ValueError("Buffer max length exceed %d", len(buffer)) | |
header = struct.pack('<H',len(buffer)) | |
return header | |
def int_2_buffer(self,value,type,length): | |
buffer = '' | |
if type == IFT_BYTE: | |
buffer = chr(value%0x100) | |
elif type == IFT_NULL: | |
return buffer | |
elif type == IFT_USHORT: | |
v = value % 0x10000 | |
buffer = struct.pack('<H',v) | |
elif type == IFT_ULONG: | |
buffer = struct.pack('<I',value) | |
elif type == IFT_LONG: | |
buffer = struct.pack('<i',value) | |
else: | |
raise ValueError('Input Error') | |
return buffer | |
def encode_bcd(self,value): | |
if len(value) % 2 != 0: | |
raise ValueError("value length must div 2") | |
i = 0 | |
#print "value [%s]" % value | |
buffer = '' | |
while i < len(value): | |
t1 = (ord(value[i]) - 0x30)& 0xFF | |
t2 = (ord(value[i+1]) - 0x30) & 0xFF | |
#print "t1[%d]t2[%d]" % (t1,t2) | |
t = ((t1<<4) | t2) & 0xFF | |
#print "t1[%d]t2[%d]t[%d]" % (t1,t2,t) | |
buffer += chr(t) | |
i += 2 | |
#print "bcd value[%s]" % codecs.encode(buffer,'hex') | |
return buffer | |
def decode_bcd(self,value): | |
buffer = '' | |
#print "bcd [%s]" % codecs.encode(value,'hex') | |
for c in value: | |
t = ord(c) & 0xFF | |
t1 = (t >> 4) & 0x0F | |
t2 = t & 0x0F | |
#print "t[%02x]t1[%d]t2[%d]" % (t,t1,t2) | |
buffer += chr(t1 + 0x30) + chr(t2 + 0x30) | |
#print "bcd value[%s]" % buffer | |
return buffer | |
def string_2_buffer(self,value,type,length,flag): | |
buffer = '' | |
if type == IFT_STRING or type == IFT_BUFFER: | |
if value.startswith('0x'): | |
buffer = codecs.decode(value[2:],'hex') | |
else: | |
buffer = value | |
elif type == IFT_DATETIME: | |
buffer = '' | |
i = 0 | |
while i < len(value): | |
t = int(value[i:i+2]) | |
buffer += chr(t & 0xFF) | |
i += 2 | |
else: | |
raise ValueError('Input Error') | |
if flag == 2: | |
header = struct.pack('<H',len(buffer) % 0x10000) | |
return header + buffer | |
else: | |
#if type == IFT_BUFFER: | |
# if len(buffer) % 2 <> 0 or len(buffer)/2 <> length: | |
# raise ValueError("Value length not matched [%s]" % value) | |
# buffer = codecs.decode(buffer,'hex') | |
if len(buffer) != length: | |
padlen = length - len(buffer) | |
if padlen < 0: | |
raise ValueError("Value length not matched [%s]" % value) | |
pad = chr(0) * padlen | |
buffer = pad + buffer | |
return buffer | |
def decode_bitmap(self,bitmap): | |
if len(bitmap) != 3: | |
raise ValueError("bitmap length error") | |
#print "bitmap[%s]" % codecs.encode(bitmap,'hex') | |
buffer=[] | |
for t in bitmap: | |
v = ord(t) & 0xFF | |
for i in range(8): | |
x = (v & (1 << (7-i))) | |
if x > 0: | |
buffer.append('1') | |
else: | |
buffer.append('0') | |
return buffer | |
def unpack(self,data): | |
self.value = {} | |
#print "data[%s]" % codecs.encode(data,'hex') | |
data_len = struct.unpack('<H',data[:2])[0] | |
if len(data) != data_len + 2: | |
print "input[%d] extually[%d]" % (data_len,len(data)) | |
return False | |
offset = 2 | |
self.msgtype = ord(data[offset]) & 0xFF | |
offset += 1 | |
bitmap = self.decode_bitmap(data[offset:offset+3]) | |
offset += 3 | |
#print bitmap | |
for i in range(len(bitmap)): | |
if bitmap[i] == '0': continue | |
findex = i + 1 | |
field = self.get_field_def(findex) | |
ftype = field[FLD_TYPE_IDX] | |
flen = field[FLD_LENGTH_IDX] | |
fflag = field[FLD_FLAG_IDX] | |
fname = field[FLD_NAME_IDX] | |
#print "parse field[%s] [%d]off[%d][%s]" % (fname,fflag,offset,codecs.encode(data[offset:offset+2],'hex')) | |
if fflag == 2: # 变长数据 | |
#print "len [%s]" % codecs.encode(data[offset:offset+2],'hex') | |
endpos = self.buffer_2_int(data[offset:offset+2],IFT_USHORT) | |
offset += 2 | |
endpos += offset | |
else: | |
endpos = offset + flen | |
if ftype == IFT_USHORT or ftype == IFT_ULONG or ftype == IFT_NULL or ftype == IFT_BYTE or ftype == IFT_LONG: | |
value = self.buffer_2_int(data[offset:endpos],ftype) | |
else: | |
#temp = self.buffer_2_string(data[offset:endpos],ftype) | |
#if fflag != 2 and ftype == IFT_STRING: | |
# for i in temp: | |
# print "char [%02x]" % ord(i) | |
# if ord(i) > 0: | |
# value += i | |
#else: | |
# value = temp | |
value = self.buffer_2_string(data[offset:endpos],ftype) | |
if fflag <> 2 and ftype == IFT_BUFFER: | |
value = '0x' + codecs.encode(value,'hex') | |
self.set(findex,value) | |
#print "parse field[%s] OK =====" % fname | |
offset = endpos | |
#print self.value | |
return True | |
def buffer_2_int(self,data,type): | |
if type == IFT_BYTE: | |
return ord(data) | |
elif type == IFT_USHORT: | |
return struct.unpack('<H',data)[0] | |
elif type == IFT_ULONG: | |
return struct.unpack('<I',data)[0] | |
elif type == IFT_LONG: | |
return struct.unpack('<i',data)[0] | |
elif type == IFT_NULL: | |
return None | |
else: | |
raise ValueError("input data type error") | |
def buffer_2_string(self,data,type): | |
if type == IFT_STRING: | |
return data | |
elif type == IFT_BUFFER: | |
return data | |
elif type == IFT_DATETIME: | |
return self.decode_bcd(data) | |
else: | |
raise ValueError("input data type error") | |
def get_2byte_int(self,data): | |
return struct.unpack(">H",data)[0] | |
def get_3byte_int(self,data): | |
temp = data + chr(0) | |
return struct.unpack("<I",temp)[0] | |
def get_4byte_int(self,data): | |
return struct.unpack("<I",data)[0] | |
def set_2byte_int(self,data): | |
return struct.pack('<H',data) | |
def set_3byte_int(self,data): | |
temp = struct.pack('<H',data) | |
return temp[:-1] | |
def set_4byte_int(self,data): | |
return struct.pack('<I',data) |