Example: 07-Interface-Library/01-Pixy-Emulation/pixy_uart_emulation.py

# 本作品采用MIT许可证授权。
# 版权所有 (c) 2013-2023 OpenMV LLC。保留所有权利。
# https://github.com/openmv/openmv/blob/master/LICENSE
#
# Pixy UART 模拟脚本
#
# 此脚本允许您的 OpenMV Cam 在 UART 模式下模拟 Pixy (CMUcam5)。
# 请注意,您需要根据应用设置下方的LAB颜色阈值。
#
# P4 = 发送数据线
# P5 = RXD
#
# P7 = Servo 1
# P8 = Servo 2
#
# Pixy参数 ############################################################

import math
import pyb
import sensor
import struct
import time

color_code_mode = 1  # 0 == 禁用, 1 == 启用, 2 == 仅颜色代码, 3 == 混合模式

max_blocks = 1000
max_blocks_per_signature = 1000
min_block_area = 20

uart_baudrate = 19200

# 水平舵机
s0_lower_limit = 1000  # 舵机脉冲宽度下限(微秒)。
s0_upper_limit = 2000  # 舵机脉冲宽度上限(微秒)。

# 倾斜舵机
s1_lower_limit = 1000  # 舵机脉冲宽度下限(微秒)。
s1_upper_limit = 2000  # 舵机脉冲宽度上限(微秒)。

analog_out_enable = False  # P6 -> 模拟输出 (0v - 3.3v)。
analog_out_mode = 0  # 0 == 最大色块的x位置 - 1 == 最大色块的y位置

# 参数0 - L最小值
# 参数1 - L最大值
# 参数2 - A最小值
# 参数3 - A最大值
# 参数4 - B最小值
# 参数5 - B最大值
# 参数6 - 是否为颜色代码阈值?(真/假)
# 参数 7 - 启用阈值?(真/假)。
lab_color_thresholds = [
    (0, 100, 40, 127, -128, 127, True, True),  # 通用红色阈值
    (0, 100, -128, -10, -128, 127, True, True),  # 通用绿色阈值
    (0, 0, 0, 0, 0, 0, False, False),
    (0, 0, 0, 0, 0, 0, False, False),
    (0, 0, 0, 0, 0, 0, False, False),
    (0, 0, 0, 0, 0, 0, False, False),
    (0, 0, 0, 0, 0, 0, False, False),
]

fb_pixels_threshold = 500  # 斑点中必须包含的最小像素数
fb_merge_margin = 5  # 像素级斑点合并前的接近程度

##############################################################################

e_lab_color_thresholds = []  # 启用的阈值
e_lab_color_code = []  # 启用的颜色代码
e_lab_color_signatures = []  # 原始启用的阈值索引
for i in range(len(lab_color_thresholds)):
    if lab_color_thresholds[i][7]:
        e_lab_color_thresholds.append(lab_color_thresholds[i][0:6])
        e_lab_color_code.append(lab_color_thresholds[i][6])
        e_lab_color_signatures.append(i + 1)

# 相机设置
sensor.reset()
sensor.set_pixformat(sensor.RGB565)
sensor.set_framesize(sensor.QVGA)
sensor.skip_frames(time=2000)
sensor.set_auto_gain(False)
sensor.set_auto_whitebal(False)

# LED设置

red_led = pyb.LED(1)
green_led = pyb.LED(2)
blue_led = pyb.LED(3)

red_led.off()
green_led.off()
blue_led.off()

# DAC设置

dac = pyb.DAC("P6") if analog_out_enable else None

if dac:
    dac.write(0)

# 舵机设置
min_s0_limit = min(s0_lower_limit, s0_upper_limit)
max_s0_limit = max(s0_lower_limit, s0_upper_limit)
min_s1_limit = min(s1_lower_limit, s1_upper_limit)
max_s1_limit = max(s1_lower_limit, s1_upper_limit)

s0_pan = pyb.Servo(1)  # P7
s1_tilt = pyb.Servo(2)  # P8

s0_pan.pulse_width(int((max_s0_limit - min_s0_limit) // 2))  # 中心
s1_tilt.pulse_width(int((max_s1_limit - min_s1_limit) // 2))  # 中心

s0_pan_conversion_factor = (max_s0_limit - min_s0_limit) / 1000
s1_tilt_conversion_factor = (max_s1_limit - min_s1_limit) / 1000


def s0_pan_position(value):
    s0_pan.pulse_width(
        round(s0_lower_limit + (max(min(value, 1000), 0) * s0_pan_conversion_factor))
    )


def s1_tilt_position(value):
    s1_tilt.pulse_width(
        round(s1_lower_limit + (max(min(value, 1000), 0) * s1_tilt_conversion_factor))
    )


# 链接设置

uart = pyb.UART(3, uart_baudrate, timeout_char=1000)


def write(data):
    uart.write(data)


def available():
    return uart.any()


def read_byte():
    return uart.readchar()


def checksum(data):
    checksum = 0
    for i in range(0, len(data), 2):
        checksum += ((data[i + 1] & 0xFF) << 8) | ((data[i + 0] & 0xFF) << 0)
    return checksum & 0xFFFF


def get_normal_signature(code):
    for i in range(len(e_lab_color_signatures)):
        if code & (1 << i):
            return e_lab_color_signatures[i]
    return 0


def to_normal_object_block_format(blob):
    temp = struct.pack(
        "<hhhhh",
        get_normal_signature(blob.code()),
        blob.cx(),
        blob.cy(),
        blob.w(),
        blob.h(),
    )
    return struct.pack("<hh10s", 0xAA55, checksum(temp), temp)


def get_color_code_signature(code):
    color_code_list = []
    for i in range(len(e_lab_color_signatures)):
        if code & (1 << i):
            color_code_list.append(e_lab_color_signatures[i])
    octal = 0
    color_code_list_len = len(color_code_list) - 1
    for i in range(color_code_list_len + 1):
        octal += color_code_list[i] << (3 * (color_code_list_len - i))
    return octal


def to_color_code_object_block_format(blob):
    angle = int((blob.rotation() * 180) // math.pi)
    temp = struct.pack(
        "<hhhhhh",
        get_color_code_signature(blob.code()),
        blob.cx(),
        blob.cy(),
        blob.w(),
        blob.h(),
        angle,
    )
    return struct.pack("<hh12s", 0xAA56, checksum(temp), temp)


def get_signature(blob, bits):
    return (
        get_normal_signature(blob.code())
        if (bits == 1)
        else get_color_code_signature(blob.code())
    )


def to_object_block_format(blob, bits):
    return (
        to_normal_object_block_format(blob)
        if (bits == 1)
        else to_color_code_object_block_format(blob)
    )


# FSM代码
fsm_state = 0
last_byte = 0

FSM_STATE_NONE = 0
FSM_STATE_ZERO = 1
FSM_STATE_SERVO_CONTROL_0 = 2
FSM_STATE_SERVO_CONTROL_1 = 3
FSM_STATE_SERVO_CONTROL_2 = 4
FSM_STATE_SERVO_CONTROL_3 = 5
FSM_STATE_CAMERA_CONTROL = 6
FSM_STATE_LED_CONTROL_0 = 7
FSM_STATE_LED_CONTROL_1 = 8
FSM_STATE_LED_CONTROL_2 = 9


def parse_byte(byte):
    global fsm_state
    global last_byte

    if fsm_state == FSM_STATE_NONE:
        if byte == 0x00:
            fsm_state = FSM_STATE_ZERO
        else:
            fsm_state = FSM_STATE_NONE

    elif fsm_state == FSM_STATE_ZERO:
        if byte == 0xFF:
            fsm_state = FSM_STATE_SERVO_CONTROL_0
        elif byte == 0xFE:
            fsm_state = FSM_STATE_CAMERA_CONTROL
        elif byte == 0xFD:
            fsm_state = FSM_STATE_LED_CONTROL_0
        else:
            fsm_state = FSM_STATE_NONE

    elif fsm_state == FSM_STATE_SERVO_CONTROL_0:
        fsm_state = FSM_STATE_SERVO_CONTROL_1

    elif fsm_state == FSM_STATE_SERVO_CONTROL_1:
        fsm_state = FSM_STATE_SERVO_CONTROL_2
        s0_pan_position(((byte & 0xFF) << 8) | ((last_byte & 0xFF) << 0))

    elif fsm_state == FSM_STATE_SERVO_CONTROL_2:
        fsm_state = FSM_STATE_SERVO_CONTROL_3

    elif fsm_state == FSM_STATE_SERVO_CONTROL_3:
        fsm_state = FSM_STATE_NONE
        s1_tilt_position(((byte & 0xFF) << 8) | ((last_byte & 0xFF) << 0))

    elif fsm_state == FSM_STATE_CAMERA_CONTROL:
        fsm_state = FSM_STATE_NONE
        # 忽略...

    elif fsm_state == FSM_STATE_LED_CONTROL_0:
        fsm_state = FSM_STATE_LED_CONTROL_1
        if byte & 0x80:
            red_led.on()
        else:
            red_led.off()

    elif fsm_state == FSM_STATE_LED_CONTROL_1:
        fsm_state = FSM_STATE_LED_CONTROL_2
        if byte & 0x80:
            green_led.on()
        else:
            green_led.off()

    elif fsm_state == FSM_STATE_LED_CONTROL_2:
        fsm_state = FSM_STATE_NONE
        if byte & 0x80:
            blue_led.on()
        else:
            blue_led.off()

    last_byte = byte


# 主循环
pri_color_code_mode = color_code_mode % 4


def bits_set(code):
    count = 0
    for i in range(7):
        count += 1 if (code & (1 << i)) else 0
    return count


def color_code(code):
    for i in range(len(e_lab_color_code)):
        if code & (1 << i):
            return e_lab_color_code[i]
    return False


def fb_merge_cb(blob0, blob1):
    if not pri_color_code_mode:
        return blob0.code() == blob1.code()
    else:
        return (
            True
            if (blob0.code() == blob1.code())
            else (color_code(blob0.code()) and color_code(blob1.code()))
        )


def blob_filter(blob):
    if pri_color_code_mode == 0:
        return True
    elif pri_color_code_mode == 1:  # 具有两种或更多颜色的颜色代码或常规
        return (bits_set(blob.code()) > 1) or (not color_code(blob.code()))
    elif pri_color_code_mode == 2:  # 仅包含两种或更多颜色的颜色代码
        return bits_set(blob.code()) > 1
    elif pri_color_code_mode == 3:
        return True


clock = time.clock()
while True:
    clock.tick()
    img = sensor.snapshot()
    blobs = list(
        filter(
            blob_filter,
            img.find_blobs(
                e_lab_color_thresholds,
                area_threshold=min_block_area,
                pixels_threshold=fb_pixels_threshold,
                merge=True,
                margin=fb_merge_margin,
                merge_cb=fb_merge_cb,
            ),
        )
    )

    # 传输数据块
    if blobs and (max_blocks > 0) and (max_blocks_per_signature > 0):  # 新帧
        dat_buf = struct.pack("<h", 0xAA55)
        sig_map = {}
        first_b = False

        for blob in sorted(blobs, key=lambda x: x.area(), reverse=True)[0:max_blocks]:
            bits = bits_set(blob.code())
            sign = get_signature(blob, bits)

            if not sign in sig_map:
                sig_map[sign] = 1
            else:
                sig_map[sign] += 1

            if sig_map[sign] <= max_blocks_per_signature:
                dat_buf += to_object_block_format(blob, bits)
                img.draw_rectangle(blob.rect())
                img.draw_cross(blob.cx(), blob.cy())

            if dac and not first_b:
                x_scale = 255 / (img.width() - 1)
                y_scale = 255 / (img.height() - 1)
                dac.write(
                    round(
                        (blob.y() * y_scale)
                        if analog_out_mode
                        else (blob.x() * x_scale)
                    )
                )
                first_b = True

        dat_buf += struct.pack("<h", 0x0000)
        write(dat_buf)  # 将所有数据写入一个数据包...

    else:  # 未找到任何内容
        write(struct.pack("<h", 0x0000))

        if dac:
            dac.write(0)

    # 解析命令
    for i in range(available()):
        parse_byte(read_byte())

    num_blobs = min(len(blobs), max_blocks)
    print("%d blob(s) found - FPS %f" % (num_blobs, clock.fps()))

results matching ""

    No results matching ""