Commit 6468e261 authored by Manuel Navarrete's avatar Manuel Navarrete
Browse files

update test resistence

parent f68e5e7f
import struct
import time
import sys
import os
import shelve
import polling
module_path = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.insert(0, module_path)
import modbus
if __name__ == '__main__':
m=modbus.Modbus()
slave_address = 3
slave_code = struct.pack('B',slave_address)
tx_test = slave_code + '\x10\x00\x0C\x00\x01\x02\xF0\xF0'
tx_test += m.get_crc(tx_test)
tx_tr = slave_code + '\x03\x00\x0C\x00\x01'
tx_tr += m.get_crc(tx_tr)
tx_r = slave_code + '\x03\x00\x03\x00\x01'
tx_r +=m.get_crc(tx_r)
print list(m.raw(tx_test))
while True:
rx = m.raw(tx_r)
value = struct.unpack('>H',rx[3:5])[0]
print list(rx)
print value
print list(m.raw(tx_tr))
import struct
import time
import sys
import os
import shelve
import polling
VALUES = {'slave': False}
def save(m):
try:
d = shelve.open('prueba.db','c')
except Exception as e:
print 'Error de guardado'
else:
if m == False:
m = True
else:
m = False
print 'Dato a guardar',m
d['slave'] = m
d.close()
def read():
t = False
try:
d = shelve.open('prueba.db','c')
except Exception as e:
print 'Error de guardado'
return t
else:
m = d['slave']
d.close()
print 'Dato guardado',m
return m
def doing():
m = read()
save(m)
read()
if __name__ == '__main__':
try:
polling.poll(lambda:doing(), step=0.5, timeout=2)
except Exception as e:
print 'error', e
......@@ -63,36 +63,36 @@ module_path = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__
sys.path.insert(0, module_path)
import modbus
DATA = 'data2.db'
DATA = 'data.db'
VALUE_KEY = 'value'
TIME_KEY = 'time'
SENT_KEY = 'sent'
VALUES = {VALUE_KEY: 0, TIME_KEY: 0,SENT_KEY: False}
def get_tr(tx):
"""
Extract the value of the resistence test
: pararm tx: Frame to send
: type rx: Buffer of bytes
: return result: the value of the test
: rtype: Boolean
"""
rx = False
rx_tr = m.raw(tx)
print list (rx_tr)
if len(rx_tr)==7:
value = struct.unpack('>H',rx_tr[3:5])[0]
if value == 0:
print'Test de resistencia a cero'
rx = True
else:
print'Trama del test de resistencia erronea'
rx = False
elif len(rx_tr)>0:
print'Trama del test de resistencia erronea'
print list (rx_tr)
rx = False
else:
print'Test de resistencia ejecutandose o sin respuesta'
rx = False
return rx
def get_r(tx):
"""
Extract the value of the resistence
Extract the value of the resistence.
If the value of the resistence is -1, this means there has been an error
: pararm tx: Frame to send
: type rx: Buffer of bytes
......@@ -103,23 +103,23 @@ def get_r(tx):
"""
r = -1
rx_r = m.raw(tx)
print list (rx_r)
#rx_r='\x03\x00\x00\x00\x15\x00\x00'
if len(rx_r) == 7:
r = struct.unpack('>H',rx_r[3:5])[0]
if r>0:
if r>=0:
r /=1000.0
print "Cell Resistance:"+str(r)+" miliOhms"
else:
print 'Cell Resistance: 0 miliOhms'
elif len(rx)>0:
print'Trama del test de resistencia erronea'
else:
print'Test de resistencia ejecutandose'
return r
def save_data(value,slave):
"""
Save the value of the Resistance and the saved time in the slave address
: pararm value: Value of the Resistance
: type value: float
: pararm slave: Slave address
: type value: string
"""
d = None
time_value = time.time()
save_values = VALUES
......@@ -133,27 +133,41 @@ def save_data(value,slave):
else:
d[slave] = save_values
d.close()
print 'Dato guardado'
def get_data(slave):
"""
Extract the value of the resistence and the saved time
: pararm slave: Address slave
: type rx: string
: return result: the value of the resistence and the saved time
: rtype: json
"""
d = None
result = VALUES
try:
d = shelve.open(DATA,'c')
except Exception as e:
print 'Error de lectura'
return result
else:
klist = d.keys()
for i in klist:
if slave == i:
result = d[slave]
d.close()
if slave in d.keys():
result = d[slave]
d.close()
return result
def save_sent(sent,slave):
"""
Save the value if the test is sent or not.
The value is FALSE, can to send the test frame.
The value is TRUE, can not to send the test frame.
: pararm slave: Address slave
: type rx: string
"""
d = None
if sent == False:
sent = True
......@@ -168,56 +182,75 @@ def save_sent(sent,slave):
result[SENT_KEY] = sent
d[slave] = result
d.close()
print 'Variable sent guardada'
def get_sent(slave):
result = None
"""
Extract the value if the test is sent or not.
The value is FALSE, can to send the test frame.
The value is TRUE, can not to send the test frame.
: pararm slave: Address slave
: type rx: string
: return result: the value if the test is sent or not
: rtype: json
"""
result = False
try:
d = shelve.open(DATA,'c')
except Exception as e:
print 'Error de lectura'
return result
else:
klist = d.keys()
for i in klist:
if slave == i:
result = d[slave]
result = result[SENT_KEY]
d.close()
if slave in d.keys():
result = d[slave]
result = result[SENT_KEY]
d.close()
return result
def test_resistence(tx_tr,tx_r,tx_test,slave,timexp):
def checking(tx_tr,tx_r,tx_test,slave,timexp):
"""
Check the time of the value. If the time is greater than the maximun time, do the resistence test and save the value and time.
: pararm tx_tr: Frame to check the value of the test of resistence
: type tx_tr: Buffer of bytes
: pararm tx_r: Frame to check the value of the resistence
: type tx_r: Buffer of bytes
: pararm tx_test: Frame to do the test of resistence
: type tx_test: Buffer of bytes
: pararm slave: Address slave
: type rx: string
: timexp: the maximum time of a value
: tipe timexp: int
"""
data = get_data(slave)
real_time = time.time()
time_saved = real_time-data[TIME_KEY]
if time_saved>timexp:
print 'Tiempo expirado'
tr = get_tr(tx_tr)
if tr == True:
print 'test de resistencia a 0'
tr = get_tr(tx_tr)
if tr:
test_sent = get_sent(slave)
print 'test leido:',test_sent
if test_sent == False:
m.raw(tx_test)
print 'test enviado'
save_sent(test_sent,slave)
else:
if test_sent:
r = get_r(tx_r)
save_data(r,slave)
if r >= 0:
print'Valor guardado:',r
save_data(r,slave)
save_sent(test_sent,slave)
else:
m.raw(tx_test)
save_sent(test_sent,slave)
else:
print 'Tiempo no expirado'
print 'esperamos 10 segundos'
if __name__ == '__main__':
m = modbus.Modbus()
timexp = 40
timexp = 40 # seg
slave_address = 3
slave = str(slave_address)
slave_code = struct.pack('B',slave_address)
tx_tr = slave_code + '\x03\x00\x0C\x00\x01'
......@@ -227,4 +260,4 @@ if __name__ == '__main__':
tx_test = slave_code + '\x10\x00\x0C\x00\x01\x02\xF0\xF0'
tx_test += m.get_crc(tx_test)
polling.poll(lambda:test_resistence(tx_tr,tx_r,tx_test,slave,timexp), step=10, timeout=120)
polling.poll(lambda:checking(tx_tr,tx_r,tx_test,str(slave_address),timexp), step=10, timeout=120)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment