Commit f68e5e7f authored by Manuel Navarrete's avatar Manuel Navarrete
Browse files

update test resistencia

parent b6704cf6
import struct
import time
import sys
import os
import shelve
import polling
VALUES = {'slave': False}
def save(m):
try:
d = shelve.open('prueba.db','c')
except Exception as e:
print 'Error de guardado'
else:
if m == False:
m = True
else:
m = False
print 'Dato a guardar',m
d['slave'] = m
d.close()
def read():
t = False
try:
d = shelve.open('prueba.db','c')
except Exception as e:
print 'Error de guardado'
return t
else:
m = d['slave']
d.close()
print 'Dato guardado',m
return m
def doing():
m = read()
save(m)
read()
if __name__ == '__main__':
try:
polling.poll(lambda:doing(), step=0.5, timeout=2)
except Exception as e:
print 'error', e
......@@ -191,7 +191,7 @@ def test_resistence(tx_tr,tx_r,tx_test,slave):
: pararm tx_test: Frame to do the test of resistence
: type tx_test: Buffer of bytes
"""
aux = False
tr = get_tr(tx_tr)
if tr == True:
......@@ -206,11 +206,13 @@ def test_resistence(tx_tr,tx_r,tx_test,slave):
data = get_data(slave)
real_time = time.time()
time_saved = real_time-data[TIME_KEY]
if time_saved>timexp:
if time_saved>timexp and aux==False:
print 'tiempo de expiracion superado'
#AQUI ESTA EL FALLO
m.raw(tx_test)
aux = True
elif time_saved>timexp and aux==True:
save_data(r,slave)
aux = False
else:
print 'Tiempo no superado'
......
"""
Example to check the value of the resistence and if the time has expired to do the resistance test
Device:
Intelligent battery monitor
Model: HUASU H3G-TA-12
http://www.huasucn.com/En/download/H3G-TVProductionDescriptionA0.pdf
Device:
Converts UART bus to RS485 bus
Model: HUASU CTB-485
TX_TR
Addres of slave 1: 02h
Function code: 03h
Addres of the first register: 000Ch
Quanty of registers: 0001h
RX_TR
Addres of slave: 1 Byte
Function code: 1 Byte
Byte count: 1 Byte
Value 2 Bytes
CRC: 2 Bytes
TX_R
Addres of slave 1: 02h
Function code: 03h
Addres of the first register: 0003h
Quanty of registers: 0001h
RX_R
Addres of slave: 1 Byte
Function code: 1 Byte
Byte count: 1 Byte
Value 2 Bytes
CRC: 2 Bytes
TX_TEST
Addres of slave 1: 01h
Function code: 10h
Addres of the first register: 000Ch
Quanty of registers: 0001h
Data bytes count: 02h
Data written to register: F0F0h
"""
__author__ = 'Manuel Navarrete'
__copyright__ = 'Copyright (C) 2017'
__license__ = 'MIT (expat) License'
__version__ = '0.1'
__email__ = 'manuel.navarrete@whitewallenergy.com'
import struct
import time
import sys
import os
import shelve
import polling
module_path = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.insert(0, module_path)
import modbus
DATA = 'data2.db'
VALUE_KEY = 'value'
TIME_KEY = 'time'
SENT_KEY = 'sent'
VALUES = {VALUE_KEY: 0, TIME_KEY: 0,SENT_KEY: False}
def get_tr(tx):
rx_tr = m.raw(tx)
print list (rx_tr)
if len(rx_tr)==7:
value = struct.unpack('>H',rx_tr[3:5])[0]
if value == 0:
print'Test de resistencia a cero'
rx = True
else:
print'Trama del test de resistencia erronea'
rx = False
elif len(rx_tr)>0:
print'Trama del test de resistencia erronea'
print list (rx_tr)
rx = False
else:
print'Test de resistencia ejecutandose o sin respuesta'
rx = False
return rx
def get_r(tx):
"""
Extract the value of the resistence
: pararm tx: Frame to send
: type rx: Buffer of bytes
: return result: the value of the resistence
: rtype: float
"""
r = -1
rx_r = m.raw(tx)
print list (rx_r)
#rx_r='\x03\x00\x00\x00\x15\x00\x00'
if len(rx_r) == 7:
r = struct.unpack('>H',rx_r[3:5])[0]
if r>0:
r /=1000.0
print "Cell Resistance:"+str(r)+" miliOhms"
else:
print 'Cell Resistance: 0 miliOhms'
elif len(rx)>0:
print'Trama del test de resistencia erronea'
else:
print'Test de resistencia ejecutandose'
return r
def save_data(value,slave):
d = None
time_value = time.time()
save_values = VALUES
save_values[VALUE_KEY] = value
save_values[TIME_KEY] = time_value
try:
d = shelve.open(DATA,'c')
except Exception as e:
print 'Error de guardado'
else:
d[slave] = save_values
d.close()
print 'Dato guardado'
def get_data(slave):
d = None
result = VALUES
try:
d = shelve.open(DATA,'c')
except Exception as e:
print 'Error de lectura'
return result
else:
klist = d.keys()
for i in klist:
if slave == i:
result = d[slave]
d.close()
return result
def save_sent(sent,slave):
d = None
if sent == False:
sent = True
else:
sent = False
try:
d = shelve.open(DATA,'c')
except Exception as e:
print 'Error de guardado'
else:
result = d[slave]
result[SENT_KEY] = sent
d[slave] = result
d.close()
print 'Variable sent guardada'
def get_sent(slave):
result = None
try:
d = shelve.open(DATA,'c')
except Exception as e:
print 'Error de lectura'
return result
else:
klist = d.keys()
for i in klist:
if slave == i:
result = d[slave]
result = result[SENT_KEY]
d.close()
return result
def test_resistence(tx_tr,tx_r,tx_test,slave,timexp):
data = get_data(slave)
real_time = time.time()
time_saved = real_time-data[TIME_KEY]
if time_saved>timexp:
print 'Tiempo expirado'
tr = get_tr(tx_tr)
if tr == True:
print 'test de resistencia a 0'
test_sent = get_sent(slave)
print 'test leido:',test_sent
if test_sent == False:
m.raw(tx_test)
print 'test enviado'
save_sent(test_sent,slave)
else:
r = get_r(tx_r)
save_data(r,slave)
save_sent(test_sent,slave)
else:
print 'Tiempo no expirado'
print 'esperamos 10 segundos'
if __name__ == '__main__':
m = modbus.Modbus()
timexp = 40
slave_address = 3
slave = str(slave_address)
slave_code = struct.pack('B',slave_address)
tx_tr = slave_code + '\x03\x00\x0C\x00\x01'
tx_tr += m.get_crc(tx_tr)
tx_r = slave_code + '\x03\x00\x03\x00\x01'
tx_r +=m.get_crc(tx_r)
tx_test = slave_code + '\x10\x00\x0C\x00\x01\x02\xF0\xF0'
tx_test += m.get_crc(tx_test)
polling.poll(lambda:test_resistence(tx_tr,tx_r,tx_test,slave,timexp), step=10, timeout=120)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment