Explication de la routine 17-Pixy-Emulation->pixy_spi_emulation simule le protocole Pixy spi

# 仿真Pixy spi协议
#
# 该脚本允许您的OpenMV Cam在SPI模式下模拟Pixy(CMUcam5)。
# 请注意,您需要为您的应用程序设置下面的lab颜色阈值。
#
# P0 = MOSI
# P1 = MISO
# P2 = SCLK
# P3 = SS
#
# P7 = Servo 1
# P8 = Servo 2

#注意:在OpenMV Cam开始运行之前,请确保您的Arduino等正在运行其SPI代码。 
#另外,有时OpenMV Cam无法与Arduino SPI数据流同步。 发生这种情况时,
#重新启动您的OpenMV。 OpenMV Cam应该在1-9次尝试中同步。 
#否则使用此脚本的uart或I2C版本。

# Pixy参数############################################################

# Color code handling: 0 == Disabled, 1 == Enabled, 2 == Color Codes Only, 3 == Mixed
color_code_mode = 1

# Limits on what gets reported per frame.
max_blocks = 1000
max_blocks_per_signature = 1000
min_block_area = 20

# Pan servo pulse width limits (in microseconds).
s0_lower_limit = 1000
s0_upper_limit = 2000

# Tilt servo pulse width limits (in microseconds).
s1_lower_limit = 1000
s1_upper_limit = 2000

# Analog output: P6 -> Analog Out (0v - 3.3v).
analog_out_enable = False
analog_out_mode = 0 # 0 == x position of largest blob - 1 == y position of largest blob

# Every entry is an 8-tuple:
#   (L Min, L Max, A Min, A Max, B Min, B Max,
#    Is Color Code Threshold? (True/False), Enable Threshold? (True/False))
lab_color_thresholds = [
    (0, 100, 40, 127, -128, 127, True, True), # generic red threshold
    (0, 100, -128, -10, -128, 127, True, True), # generic green threshold
] + [(0, 0, 0, 0, 0, 0, False, False)] * 5 # five unused (disabled) slots

fb_pixels_threshold = 500 # minimum number of pixels that must be in a blob
fb_merge_margin = 5 # how close, in pixels, blobs have to be before they are merged

##############################################################################

# Three parallel lists describing only the thresholds whose "enable" flag
# (element 7) is set.
e_lab_color_thresholds = [] # the six LAB bounds of each enabled threshold
e_lab_color_code = [] # "is color code" flag of each enabled threshold
e_lab_color_signatures = [] # 1-based position of each enabled threshold in lab_color_thresholds
for i, threshold in enumerate(lab_color_thresholds):
    if not threshold[7]:
        continue # disabled slot
    e_lab_color_thresholds.append(threshold[0:6])
    e_lab_color_code.append(threshold[6])
    e_lab_color_signatures.append(i + 1)

import image, math, pyb, sensor, struct, time

# Camera setup

sensor.reset()
sensor.set_pixformat(sensor.RGB565) # color frames
sensor.set_framesize(sensor.QVGA)
sensor.skip_frames(time = 2000) # presumably lets the sensor settle after the settings change
# Auto gain and auto white balance are switched off - presumably so the LAB
# color thresholds keep matching from frame to frame.
sensor.set_auto_gain(False)
sensor.set_auto_whitebal(False)

# LED setup

red_led, green_led, blue_led = [pyb.LED(index) for index in (1, 2, 3)]

# Start with every LED switched off.
for led in (red_led, green_led, blue_led):
    led.off()

# DAC setup

# The DAC on P6 is only created when analog output is enabled; otherwise
# "dac" stays None and the rest of the script skips analog output.
if analog_out_enable:
    dac = pyb.DAC("P6")
    dac.write(0)
else:
    dac = None

# Servo setup

# Normalise the configured limits so that min <= max even if the lower and
# upper values were entered the wrong way round.
min_s0_limit = min(s0_lower_limit, s0_upper_limit)
max_s0_limit = max(s0_lower_limit, s0_upper_limit)
min_s1_limit = min(s1_lower_limit, s1_upper_limit)
max_s1_limit = max(s1_lower_limit, s1_upper_limit)

s0_pan = pyb.Servo(1) # P7
s1_tilt = pyb.Servo(2) # P8

# Center both servos. The center is the midpoint of the allowed range
# (min + max) / 2, not half of its span (max - min) / 2, which would lie
# below the lower limit (500 instead of 1500 for a 1000..2000 range).
s0_pan.pulse_width(int((min_s0_limit + max_s0_limit) // 2)) # center
s1_tilt.pulse_width(int((min_s1_limit + max_s1_limit) // 2)) # center

# Pulse-width change per unit of the 0..1000 position value that
# s0_pan_position()/s1_tilt_position() accept.
s0_pan_conversion_factor = (max_s0_limit - min_s0_limit) / 1000
s1_tilt_conversion_factor = (max_s1_limit - min_s1_limit) / 1000

def s0_pan_position(value):
    """Move the pan servo to a position given on a 0..1000 scale.

    value is clamped to 0..1000 and mapped linearly onto the pan servo's
    pulse-width range. The mapping starts at min_s0_limit (not
    s0_lower_limit) because s0_pan_conversion_factor is derived from the
    normalised min/max limits; this keeps the result inside the range even
    when the lower/upper limits were configured in swapped order.
    """
    clamped = max(min(value, 1000), 0)
    s0_pan.pulse_width(round(min_s0_limit + (clamped * s0_pan_conversion_factor)))

def s1_tilt_position(value):
    """Move the tilt servo to a position given on a 0..1000 scale.

    value is clamped to 0..1000 and mapped linearly onto the tilt servo's
    pulse-width range. The mapping starts at min_s1_limit (not
    s1_lower_limit) because s1_tilt_conversion_factor is derived from the
    normalised min/max limits; this keeps the result inside the range even
    when the lower/upper limits were configured in swapped order.
    """
    clamped = max(min(value, 1000), 0)
    s1_tilt.pulse_width(round(min_s1_limit + (clamped * s1_tilt_conversion_factor)))

# Link setup

# SPI bus 2 in slave mode. Block here until two received bytes equal
# 0x00, 0x5A; after every failed attempt the bus is torn down and
# re-initialised before trying again.
bus = pyb.SPI(2, pyb.SPI.SLAVE, polarity = 0, phase = 0, bits = 16)
while(True):
    try:
        sync_bytes = bus.recv(2, timeout = 10)
        if((sync_bytes[0] == 0x00) and (sync_bytes[1] == 0x5A)):
            break
    except OSError as error:
        pass # receive failed (presumably a timeout) - fall through and reset the bus

    # Not in sync yet: restart the SPI peripheral with the same settings.
    bus.deinit()
    bus.init(pyb.SPI.SLAVE, polarity = 0, phase = 0, bits = 16)

def write(data):
    """Send data over the SPI bus, retrying when the send raises OSError.

    Each attempt uses a timeout of 10. After the first failure up to ten
    further attempts are made; if they all fail the data is silently
    dropped. Returns None in every case.
    """
    retries_left = 10
    while True:
        try:
            bus.send(data, timeout = 10)
            return
        except OSError:
            if retries_left <= 0:
                return # out of retries - give up on this packet
            retries_left -= 1

def available():
    """Return the number of received command bytes waiting to be parsed.

    Not implemented: there is no way to know when data is ready to be
    received, so this always returns 0.
    """
    return 0

def read_byte():
    """Return the next received command byte.

    Not implemented: there is no way to know when data is ready to be
    received, so this always returns 0.
    """
    return 0

# 辅助工作

def checksum(data):
    """Return the 16-bit sum of data read as little-endian 16-bit words.

    data is consumed two items at a time: the first item of each pair is
    the low byte and the second the high byte (each masked to 8 bits).
    The total is truncated to 16 bits.
    """
    words = (((data[pos + 1] & 0xFF) << 8) | (data[pos] & 0xFF)
             for pos in range(0, len(data), 2))
    return sum(words) & 0xFFFF

def get_normal_signature(code):
    """Return the signature number for the lowest set bit of code.

    Bit n of code is matched against entry n of e_lab_color_signatures.
    Returns 0 when none of those bits is set.
    """
    for bit, signature in enumerate(e_lab_color_signatures):
        if code & (1 << bit):
            return signature
    return 0

def to_normal_object_block_format(blob):
    """Serialise blob as a normal (single-signature) object block.

    Layout, all little-endian 16-bit words: sync word 0xAA55, checksum of
    the payload, then the payload itself - signature, center x, center y,
    width, height.

    The words are packed as unsigned ("H"): 0xAA55 and the 16-bit checksum
    do not fit a signed "h", which only works on interpreters whose struct
    module performs no range checking.
    """
    payload = struct.pack("<HHHHH", get_normal_signature(blob.code()), blob.cx(), blob.cy(), blob.w(), blob.h())
    return struct.pack("<HH10s", 0xAA55, checksum(payload), payload)

def get_color_code_signature(code):
    """Return the color-code signature for the bits set in code.

    Every set bit n of code contributes entry n of e_lab_color_signatures.
    The contributions are concatenated three bits apiece, the lowest set
    bit ending up in the most significant position. Returns 0 when no
    matching bit is set.
    """
    octal = 0
    for bit, signature in enumerate(e_lab_color_signatures):
        if code & (1 << bit):
            # Shift what has been collected so far up by one 3-bit group.
            octal = (octal << 3) + signature
    return octal

def to_color_code_object_block_format(blob):
    """Serialise blob as a color-code object block.

    Layout, all little-endian 16-bit words: sync word 0xAA56, checksum of
    the payload, then the payload itself - color-code signature, center x,
    center y, width, height, angle.

    The sync word, checksum and the unsigned payload fields are packed as
    "H": 0xAA56 and the 16-bit checksum do not fit a signed "h", which only
    works on interpreters whose struct module performs no range checking.
    The angle remains a signed word.
    """
    # blob.rotation() is multiplied by 180/pi, i.e. treated as radians and
    # converted to whole degrees.
    angle = int((blob.rotation() * 180) // math.pi)
    payload = struct.pack("<HHHHHh", get_color_code_signature(blob.code()), blob.cx(), blob.cy(), blob.w(), blob.h(), angle)
    return struct.pack("<HH12s", 0xAA56, checksum(payload), payload)

def get_signature(blob, bits):
    """Return the signature of blob.

    bits is the number of thresholds the blob matched: exactly 1 selects
    the normal signature, any other value the color-code signature.
    """
    if bits == 1:
        return get_normal_signature(blob.code())
    return get_color_code_signature(blob.code())

def to_object_block_format(blob, bits):
    """Serialise blob into its object block.

    bits is the number of thresholds the blob matched: exactly 1 selects
    the normal block format, any other value the color-code block format.
    """
    if bits == 1:
        return to_normal_object_block_format(blob)
    return to_color_code_object_block_format(blob)

# FSM 代码

# States of the command parser, numbered 0..9 in this order.
(FSM_STATE_NONE,
 FSM_STATE_ZERO,
 FSM_STATE_SERVO_CONTROL_0,
 FSM_STATE_SERVO_CONTROL_1,
 FSM_STATE_SERVO_CONTROL_2,
 FSM_STATE_SERVO_CONTROL_3,
 FSM_STATE_CAMERA_CONTROL,
 FSM_STATE_LED_CONTROL_0,
 FSM_STATE_LED_CONTROL_1,
 FSM_STATE_LED_CONTROL_2) = range(10)

fsm_state = FSM_STATE_NONE # current parser state
last_byte = 0 # previously parsed byte

def parse_byte(byte):
    """Feed one received byte into the command parser state machine.

    A command starts with 0x00 followed by a type byte: 0xFF (servo
    control), 0xFE (camera control) or 0xFD (LED control). The bytes that
    follow are consumed according to the current state. The global
    fsm_state is advanced and last_byte is updated on every call.
    """
    global fsm_state
    global last_byte

    if fsm_state == FSM_STATE_NONE:
        # Idle: wait for the 0x00 that starts a command.
        if byte == 0x00: fsm_state = FSM_STATE_ZERO
        else: fsm_state = FSM_STATE_NONE

    elif fsm_state == FSM_STATE_ZERO:
        # Command type byte; anything unrecognised drops back to idle.
        if byte == 0xFF: fsm_state = FSM_STATE_SERVO_CONTROL_0
        elif byte == 0xFE: fsm_state = FSM_STATE_CAMERA_CONTROL
        elif byte == 0xFD: fsm_state = FSM_STATE_LED_CONTROL_0
        else: fsm_state = FSM_STATE_NONE

    elif fsm_state == FSM_STATE_SERVO_CONTROL_0:
        # Low byte of the pan value; it is picked up from last_byte on the next call.
        fsm_state = FSM_STATE_SERVO_CONTROL_1

    elif fsm_state == FSM_STATE_SERVO_CONTROL_1:
        fsm_state = FSM_STATE_SERVO_CONTROL_2
        # Pan value = this byte as the high byte, the previous byte as the low byte.
        s0_pan_position(((byte & 0xFF) << 8) | ((last_byte & 0xFF) << 0))

    elif fsm_state == FSM_STATE_SERVO_CONTROL_2:
        # Low byte of the tilt value; it is picked up from last_byte on the next call.
        fsm_state = FSM_STATE_SERVO_CONTROL_3

    elif fsm_state == FSM_STATE_SERVO_CONTROL_3:
        fsm_state = FSM_STATE_NONE
        # Tilt value = this byte as the high byte, the previous byte as the low byte.
        s1_tilt_position(((byte & 0xFF) << 8) | ((last_byte & 0xFF) << 0))

    elif fsm_state == FSM_STATE_CAMERA_CONTROL:
        fsm_state = FSM_STATE_NONE
        # Ignored: the argument byte is consumed and discarded.

    elif fsm_state == FSM_STATE_LED_CONTROL_0:
        fsm_state = FSM_STATE_LED_CONTROL_1
        # First LED byte: its top bit (0x80) switches the red LED on or off.
        if byte & 0x80: red_led.on()
        else: red_led.off()

    elif fsm_state == FSM_STATE_LED_CONTROL_1:
        fsm_state = FSM_STATE_LED_CONTROL_2
        # Second LED byte: its top bit (0x80) switches the green LED on or off.
        if byte & 0x80: green_led.on()
        else: green_led.off()

    elif fsm_state == FSM_STATE_LED_CONTROL_2:
        fsm_state = FSM_STATE_NONE
        # Third LED byte: its top bit (0x80) switches the blue LED on or off.
        if byte & 0x80: blue_led.on()
        else: blue_led.off()

    # Remember this byte so the next call can use it as the low byte of a 16-bit value.
    last_byte = byte

# Main loop

# Configured color code mode wrapped into the range 0..3 by the modulo.
pri_color_code_mode = color_code_mode % 4

def bits_set(code):
    """Return how many of the seven lowest bits of code are set."""
    return sum(1 for bit in range(7) if code & (1 << bit))

def color_code(code):
    """Return the "is color code" flag for the lowest set bit of code.

    Bit n of code is matched against entry n of e_lab_color_code.
    Returns False when none of those bits is set.
    """
    for bit, is_color_code in enumerate(e_lab_color_code):
        if code & (1 << bit):
            return is_color_code
    return False

def fb_merge_cb(blob0, blob1):
    """Decide whether two blobs may be merged.

    Blobs with the same code may always merge. When color codes are in
    use (pri_color_code_mode != 0), blobs with different codes may also
    merge provided both belong to color-code thresholds.
    """
    first_code = blob0.code()
    second_code = blob1.code()
    if first_code == second_code:
        return True
    if not pri_color_code_mode:
        return False
    return color_code(first_code) and color_code(second_code)

def blob_filter(blob):
    """Return whether blob should be kept for the active color code mode.

    Mode 0 and mode 3 keep every blob. Mode 1 keeps blobs made of two or
    more colors as well as single-color blobs that are not color-code
    colors. Mode 2 keeps only blobs made of two or more colors.
    """
    if pri_color_code_mode in (0, 3):
        return True
    if pri_color_code_mode == 1: # color codes with two or more colors, or plain single colors
        return (bits_set(blob.code()) > 1) or (not color_code(blob.code()))
    if pri_color_code_mode == 2: # only color codes with two or more colors
        return bits_set(blob.code()) > 1
    return None

clock = time.clock()
while(True):
    clock.tick()
    img = sensor.snapshot()
    # Find the blobs matching the enabled thresholds and keep only those
    # allowed by the active color code mode.
    blobs = list(filter(blob_filter, img.find_blobs(e_lab_color_thresholds, area_threshold = min_block_area, pixels_threshold = fb_pixels_threshold, merge = True, margin = fb_merge_margin, merge_cb = fb_merge_cb)))

    # Transmit blocks #

    if blobs and (max_blocks > 0) and (max_blocks_per_signature > 0): # new frame
        # Frame sync word. Packed unsigned ("H"): 0xAA55 does not fit a
        # signed "h", which only works on interpreters whose struct module
        # performs no range checking.
        dat_buf = struct.pack("<H", 0xAA55)
        sig_map = {} # signature -> number of blobs seen with it in this frame
        first_b = False # set once the analog output has been written for this frame

        if dac:
            # Scale pixel coordinates onto the 0..255 value written to the DAC.
            x_scale = 255 / (img.width()-1)
            y_scale = 255 / (img.height()-1)

        # Largest blobs first, at most max_blocks of them.
        for blob in sorted(blobs, key = lambda x: x.area(), reverse = True)[0:max_blocks]:
            bits = bits_set(blob.code())
            sign = get_signature(blob, bits)

            sig_map[sign] = sig_map.get(sign, 0) + 1

            # Only report a blob while its signature is under the per-signature limit.
            if sig_map[sign] <= max_blocks_per_signature:
                dat_buf += to_object_block_format(blob, bits)
                img.draw_rectangle(blob.rect())
                img.draw_cross(blob.cx(), blob.cy())

            # The analog output follows the first (largest) blob only.
            if dac and not first_b:
                dac.write(round((blob.y() * y_scale) if analog_out_mode else (blob.x() * x_scale)))
                first_b = True

        dat_buf += struct.pack("<H", 0x0000) # terminating zero word
        write(dat_buf) # write all data in one packet...

    else: # nothing found
        write(struct.pack("<H", 0x0000))

        if dac:
            dac.write(0)

    # Parse commands #

    for i in range(available()):
        parse_byte(read_byte())

    num_blobs = min(len(blobs), max_blocks)
    print("%d blob(s) found - FPS %f" % (num_blobs, clock.fps()))

Explicación de la función del documento oficial chino de Singtown Technology OpenMV:

results matching ""

    No results matching ""