# --- Victron VE.Direct description ----------------------------------------------------------------------------
#| VE.Direct is using a "Text Protocol" and a "HEX Protocol".
#| "Text Protocol": every second each VE.Direct device will send a stream of ASCII characters
#| containing basic parameters and their values over a serial connection (over USB). Each "stream"
#| starts with "\r" and ends with "\n".
#| "HEX Protocol": one can talk to VE.Direct devices by sending a command composed of a hexadecimal
#| value (sent as ASCII). The bytes of the command have to add up to the right value (0x..55).
#| This is achieved by appending a checksum byte (two hex digits) of the right value.
#| More information can be found at Victron's webpage https://www.victronenergy.com/support-and-downloads/whitepapers

# This script:
# 1. Finding BMVs and MPPTs
#    1a. Finds all connected Victron devices with list_ports_matching_descr('Victron')
#    1b. Identifies MPPT and BMV devices and saves their port, e.g. /dev/ttyUSB0
# 2. Prepares command lists for temperature, voltage, etc. (each command is paired with the expected beginning
#    of the answer string sent back from the queried device).
# 3. Writing commands to BMV, parsing the answer and sending it to a MPPT (currently - only with the 1st found BMV and MPPT)
#    3a. Sending a hex command to BMV and reading the answer
#    3b. Adding the value (temperature, voltage, etc.) from the answer to a MPPT command
#    3c. Calculating and appending a checksum to the MPPT command
#    3d. Sending a hex command to MPPT and reading the answer

# To be done:
# A. Recognizing more MPPT / BMV models in 'Identifying Victron devices'
# B. Reading values from more than one BMV (maybe with some averaging?)
#    and sending commands to more than one MPPT

### This script sends a command to a BMV (Battery Monitor) to fetch current battery temperature/voltage
#   and then forwards this value to a MPPT (Solar Panels Controller) to improve its temperature compensation

### IMPORTANT: communication with some Victron MPPTs (including the 100/50 used in this project) is only
#   possible when the solar panels are connected and providing electricity to the MPPT

### IMPORTANT: HEX protocol values are in Little Endian, i.e. byte-reversed (e.g. a received temp. value of 0xA073
#   should actually be read as 0x73A0 = 29600 decimal)

import serial  # Using pyserial Library
from serial import SerialException
import serial.tools.list_ports
import sys  # for sys.exit
import re  # for slicing strings into 2-character "byte_list"
import time  # for loop timeout
import logging as log

LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'

# Checking command line input arguments
if len(sys.argv) == 1:
    log.basicConfig(level=log.WARNING, format=LOG_FORMAT)
elif sys.argv[1] == '-debug':
    log.basicConfig(level=log.DEBUG, format=LOG_FORMAT)
elif sys.argv[1] == '-info':
    log.basicConfig(level=log.INFO, format=LOG_FORMAT)
else:
    # Single-argument print(...) behaves the same on Python 2 and Python 3.
    print('Please either run the script without any parameters or add -info or -debug to modify console output verbosity')
    log.basicConfig(level=log.WARNING, format=LOG_FORMAT)


def list_ports_matching_descr(dev_descr):
    """Return the device names of all serial ports whose manufacturer contains dev_descr.

    Argument: dev_descr - substring looked for in the port's manufacturer string
    Returns:  list of port device names (e.g. ['/dev/ttyUSB0'])
    """
    ports_found = []
    for p in serial.tools.list_ports.comports():
        log.info('Found device: %s, %s, %s', p.device, p.description, p.manufacturer)
        # str() because the manufacturer may be None for some ports
        if dev_descr in str(p.manufacturer):
            ports_found.append(p.device)
    log.debug('Devices containing %s: %s', dev_descr, ports_found)
    return ports_found


def initialize_port(port_name):
    """Open port_name at 19200 baud with a 5 s read timeout and return the Serial object.

    Exits the script with status 1 if the port cannot be opened.
    """
    try:
        ser = serial.Serial(port_name, 19200, timeout=5)
    except SerialException:
        print(port_name)
        log.critical('Error: cannot open port, quitting...')
        # log.critical() returns None and sys.exit(None) means "success",
        # so the failure status has to be passed explicitly.
        sys.exit(1)
    log.info('Initalizing port: %s', ser.name)
    return ser


def Victron_HEX_call(ser, write_hex, expected_read):
    """Send a HEX Protocol command and return the answer line.

    The command is re-sent and a line is read until a line starting with
    expected_read arrives or roughly 3 s have passed (other lines, e.g. from
    the Text Protocol stream, are skipped).

    Arguments: ser           - open serial port
               write_hex     - complete command, e.g. ':451\\n'
               expected_read - expected beginning of the answer, e.g. ':1'
    Returns:   the last line read as text; after a timeout this is NOT
               guaranteed to start with expected_read, callers have to check.
    """
    log.info('Sending %s to %s', write_hex, ser.name)
    # pyserial on Python 3 only accepts bytes; on Python 2 str already is bytes.
    payload = write_hex if isinstance(write_hex, bytes) else write_hex.encode('ascii')
    deadline = time.time() + 3  # s of timeout
    while True:
        ser.write(payload)
        # Reading until "\n" symbol; decode once here so the rest of the script works on text
        read_hex = ser.read_until(b'\n').decode('ascii', 'replace')
        if read_hex.startswith(expected_read):  # Exiting after receiving a correct answer
            log.info('Received HEX string: %s \n', read_hex)
            break
        elif time.time() > deadline:
            log.warning('Timeout reached, received HEX string: %s \n', read_hex)
            break
    return read_hex


# Function: calc. a checksum so that the byte sum of command+checksum (e.g. '0F'+'FD'+...) = 0x...55
# Argument: a str (Victron HEX Protocol command in hex values)
# Returns:  checksum str (one byte in hex e.g. '0A')
def calc_checksum(command):
    log.debug('Command to calculate checksum: %s', command)
    if len(command) % 2 != 0:
        command = '0' + command  # if odd, add leading 0 (e.g. 'A73' -> 0A73)
        log.debug('Command after making its length even: %s', command)
    byte_list = re.findall('..', command)  # divides command into list of 2-digit strings, e.g. '0A73' -> [0A, 73]
    log.debug('Byte list for checksum calucaltion: %s', byte_list)
    byte_sum = sum(int(byte, 16) for byte in byte_list)
    # Only the low byte matters: masking handles any size of byte_sum and
    # '%02X' keeps the leading zero for checksums below 0x10.
    checksum = '%02X' % ((0x55 - byte_sum) & 0xFF)
    log.debug('Last 2 digits of checksum: %s', checksum)
    return checksum


# Function: extract the data value from a Hex Protocol command
# Argument: Victron Hex Protocol command (with or without the leading ':' and the trailing newline)
# Returns:  extracted 2-byte (4 hex digits) value, still in little endian byte order
# ToDo:     add support for shorter data fields (1-byte/2-hex digits)
def extract_Victron_value(hex_string):
    log.debug('Hex string to extract value %s, lenght = %d', hex_string, len(hex_string))
    # Drop the line ending first so the slice does not depend on it;
    # the value sits right before the 2-digit checksum.
    value_hex = hex_string.strip()[-6:-2]  # e.g. 7ECED00A07371 -> A073
    log.debug('Extracted value: %s', value_hex)
    return value_hex


def switch_endian(hex_string):
    """Reverse the byte order of a hex string, e.g. 'A073' -> '73A0'.

    An odd number of digits is padded with a leading '0' first.
    """
    log.debug('Hex string to switch endianness: %s', hex_string)
    if len(hex_string) % 2 != 0:
        hex_string = '0' + hex_string  # if odd, add leading 0 (e.g. 'A73' -> 0A73)
        log.debug('Hex string after making its length even: %s', hex_string)
    byte_list = re.findall('..', hex_string)  # divides string into list of 2-digit strings, e.g. '0A73' -> [0A, 73]
    log.debug('Byte list for switching endianness: %s', byte_list)
    reversed_string = ''.join(reversed(byte_list))
    log.debug('Reversed string: %s', reversed_string)
    return reversed_string


def encode_Victron_value(value):
    """Return an int as a 2-byte little endian hex string, e.g. 2300 -> 'FC08'.

    Negative values are encoded as 16-bit two's complement (-500 -> '0CFE').
    """
    return switch_endian('%04X' % (value & 0xFFFF))


def query_Victron_value(port_name, command, expected_read):
    """Send command to the device at port_name and return the 2-byte value of its answer.

    Returns: the value as an unsigned int, or None if no answer starting with
             expected_read was received or the value could not be parsed.
    """
    ser = initialize_port(port_name)
    try:
        read_hex = Victron_HEX_call(ser, command, expected_read)
    finally:
        ser.close()  # release the port even if the call raised
    if not read_hex.startswith(expected_read):
        log.error('No valid answer to %s from %s', command.strip(), port_name)
        return None
    read_value = switch_endian(extract_Victron_value(read_hex))
    try:
        value = int(read_value, 16)
    except ValueError:
        log.error('Cannot parse value from answer: %s', read_hex.strip())
        return None
    log.info('Recevied value: HEX %s, DEC: %d', read_value, value)
    return value


# Identifying Victron devices
port_list = list_ports_matching_descr('Victron')
MPPT_port = []
BMV_port = []
for port in port_list:
    ser = initialize_port(port)
    try:
        read_hex = Victron_HEX_call(ser, ':451\n', ':1')
    finally:
        ser.close()
    if not read_hex.startswith(':1'):
        # After a timeout read_hex is some unrelated line; do not guess a device type from it
        log.warning('No product ID answer from port %s, skipping it', port)
        continue
    if read_hex[4:6] in ['A0', 'A1', '03']:  # MPPT Product ID first byte
        MPPT_port.append(port)
        log.info('Found MPPT ID %s at port %s', read_hex[2:6], port)
    if read_hex[4:6] in ['02', 'A3']:  # BMV Product ID first byte
        BMV_port.append(port)
        log.info('Found BMV ID %s at port %s', read_hex[2:6], port)

if not MPPT_port:
    log.critical('No MPPT port found, is the device connected and powered?')
if not BMV_port:
    log.critical('No BMV port found, is the device connected and powered?')
if (not MPPT_port) or (not BMV_port):
    log.critical('Quitting...')
    sys.exit(1)

# Commands to be sent and expected beginnings of answers from queried devices
temperature = {
    'command_BMV': ':7ECED0075\n',  # Get current battery temp from BMV with a thermometer attached to battery plus terminal
    'expect_BMV': ':7ECED',
    'command_MPPT': '8200300',  # Write battery temperature to MPPT
    'expect_MPPT': ':82003'
}
voltage = {
    'command_BMV': ':78DED00D4\n',  # Get current battery voltage from BMV
    'expect_BMV': ':78DED',
    'command_MPPT': '8200200',  # Write battery voltage to MPPT
    'expect_MPPT': ':82002'
}
command_list = [temperature, voltage]

# Writing commands to BMV, parsing the answer and sending it to a MPPT (for each command set)
for command_set in command_list:
    # BMV
    bmv_value = query_Victron_value(BMV_port[0], command_set['command_BMV'], command_set['expect_BMV'])
    if bmv_value is None:
        continue  # nothing trustworthy to forward

    # All conversions stay in integer 0.01 units; floats are used for logging only
    if command_set['expect_BMV'] == ':7ECED':  # temperature
        centi_value = bmv_value - 27300  # same offset as before: BMV value minus 273.00
        log.info('Value converted to temperature: %.2f deg C', centi_value / 100.0)
        log.info('Value converted to hex at 0.01 scale: %s', hex(centi_value))
        send_value = encode_Victron_value(centi_value)
        log.debug('Temperature send_value for MPPT: %s', send_value)
    elif command_set['expect_BMV'] == ':78DED':  # voltage
        centi_value = bmv_value
        log.info('Value converted to voltage %.2f V', centi_value / 100.0)
        log.info('Value converted to hex at 0.01 scale: %s', hex(centi_value))
        send_value = encode_Victron_value(centi_value)
        log.debug('Voltage send_value for MPPT: %s', send_value)
    else:
        continue  # skips sending unrecognised data to MPPT

    # Generating command to send to MPPT
    command = command_set['command_MPPT'] + send_value
    log.debug('Command without header, footer and checksum: %s', command)
    command = ':' + command + calc_checksum(command) + '\n'
    log.info('Command to send to MPPT: %s', command)

    # MPPT
    mppt_value = query_Victron_value(MPPT_port[0], command, command_set['expect_MPPT'])
    if mppt_value is None:
        continue
    if command_set['expect_BMV'] == ':7ECED':
        # NOTE(review): the echoed temperature is read as signed 16-bit, matching
        # the two's complement encoding used when sending - confirm against the MPPT docs
        signed_value = mppt_value - 0x10000 if mppt_value >= 0x8000 else mppt_value
        log.info('Value converted to temperature %.2f deg C', signed_value / 100.0)
    if command_set['expect_BMV'] == ':78DED':
        log.info('Value converted to voltage %.2f V', mppt_value / 100.0)