# Example: 07-Interface-Library/01-Pixy-Emulation/apriltags_pixy_spi_emulation.py

# 本作品采用MIT许可证授权。
# 版权所有 (c) 2013-2023 OpenMV LLC。保留所有权利。
# https://github.com/openmv/openmv/blob/master/LICENSE
#
# AprilTags Pixy SPI 模拟脚本
#
# This script allows your OpenMV Cam to transmit AprilTag detection data like
# a Pixy (CMUcam5) tracking colors in SPI mode. This script allows you to
# easily replace a Pixy (CMUcam5) color tracking sensor with an OpenMV Cam
# AprilTag tracking sensor. Note that this only runs on the OpenMV Cam M7.
#
# P0 = MOSI (master out, slave in)
# P1 = MISO (master in, slave out)
# P2 = SCLK (clock signal)
# P3 = SS (slave select)
#
# P7 = Servo 1
# P8 = Servo 2
#
# NOTE: Please make sure your Arduino, etc. is running its SPI code before the
#       OpenMV Cam starts running. Additionally, sometimes the OpenMV Cam is
#       unable to sync up with the Arduino SPI data stream. Just restart your
#       OpenMV Cam when this happens. The OpenMV Cam should sync up within
#       1-9 tries. Use the serial or I2C version of this script otherwise.
#
# Note: The tag family is TAG36H11. Additionally, in order for the
#       signature value of a tag detection to be compatible with Pixy
#       interface libraries, all tag ids have 8 added to them in order
#       to move them into the color code signature range. Finally, tags
#       are all reported as color code blocks...

import math
import pyb
import sensor
import struct
import time

# Pixy parameters ############################################################
# Upper bound on how many tags are reported per frame; the main loop slices
# the area-sorted tag list to this length.
max_blocks = 1000
# Upper bound on how many tags sharing the same id are reported per frame.
max_blocks_per_id = 1000

# Pan servo
s0_lower_limit = 1000  # Servo pulse width lower limit in microseconds.
s0_upper_limit = 2000  # Servo pulse width upper limit in microseconds.

# Tilt servo
s1_lower_limit = 1000  # Servo pulse width lower limit in microseconds.
s1_upper_limit = 2000  # Servo pulse width upper limit in microseconds.

analog_out_enable = False  # P6 -> analog output (0v - 3.3v).
analog_out_mode = 0  # 0 == x position of the largest tag - 1 == y position of the largest tag


# Camera setup
sensor.reset()
sensor.set_pixformat(sensor.GRAYSCALE)
sensor.set_framesize(sensor.QQVGA)
sensor.skip_frames(time=2000)

# LED setup
red_led = pyb.LED(1)
green_led = pyb.LED(2)
blue_led = pyb.LED(3)

# Start with every LED off; parse_byte() turns them on or off on request.
red_led.off()
green_led.off()
blue_led.off()

# DAC setup
# The DAC object is only created when analog output is enabled; otherwise
# `dac` is None and every later `if dac:` check skips the analog path.
dac = pyb.DAC("P6") if analog_out_enable else None

if dac:
    dac.write(0)

# Servo setup
# Normalise the configured limits so the code below still works when a
# "lower" limit was entered larger than its "upper" limit.
min_s0_limit = min(s0_lower_limit, s0_upper_limit)
max_s0_limit = max(s0_lower_limit, s0_upper_limit)
min_s1_limit = min(s1_lower_limit, s1_upper_limit)
max_s1_limit = max(s1_lower_limit, s1_upper_limit)

s0_pan = pyb.Servo(1)  # P7
s1_tilt = pyb.Servo(2)  # P8

# Centre each servo at the midpoint of its pulse-width range. The midpoint is
# (min + max) / 2; half of the span, (max - min) / 2, would be 500 us for the
# default 1000-2000 us limits and therefore outside the configured range.
s0_pan.pulse_width(int((min_s0_limit + max_s0_limit) // 2))  # Center
s1_tilt.pulse_width(int((min_s1_limit + max_s1_limit) // 2))  # Center

# Microseconds of pulse width per unit of the 0-1000 position scale used by
# s0_pan_position() and s1_tilt_position().
s0_pan_conversion_factor = (max_s0_limit - min_s0_limit) / 1000
s1_tilt_conversion_factor = (max_s1_limit - min_s1_limit) / 1000


def s0_pan_position(value):
    """Move the pan servo to a position on the 0-1000 scale.

    `value` is clamped to the range 0..1000 and mapped linearly onto the
    pan servo's pulse-width range, then rounded to a whole pulse width.

    The offset is taken from `min_s0_limit` (not the raw `s0_lower_limit`)
    because `s0_pan_conversion_factor` is derived from the normalised
    min/max limits; mixing the two would push the pulse width out of range
    whenever the configured limits are swapped.
    """
    clamped = max(min(value, 1000), 0)
    s0_pan.pulse_width(round(min_s0_limit + (clamped * s0_pan_conversion_factor)))


def s1_tilt_position(value):
    """Move the tilt servo to a position on the 0-1000 scale.

    `value` is clamped to the range 0..1000 and mapped linearly onto the
    tilt servo's pulse-width range, then rounded to a whole pulse width.

    The offset is taken from `min_s1_limit` (not the raw `s1_lower_limit`)
    because `s1_tilt_conversion_factor` is derived from the normalised
    min/max limits; mixing the two would push the pulse width out of range
    whenever the configured limits are swapped.
    """
    clamped = max(min(value, 1000), 0)
    s1_tilt.pulse_width(round(min_s1_limit + (clamped * s1_tilt_conversion_factor)))


# Link setup
bus = pyb.SPI(2, pyb.SPI.SLAVE, polarity=0, phase=0, bits=16)
# Stay in this loop until a 2-byte read returns the sync pattern 0x00 0x5A.
while True:
    try:
        sync_bytes = bus.recv(2, timeout=10)
        if (sync_bytes[0] == 0x00) and (sync_bytes[1] == 0x5A):
            break
    except OSError as error:
        # recv() failed (presumably a timeout - confirm against the pyb
        # docs); fall through to the bus reset below and try again.
        pass

    # No sync pattern this time: re-initialise the SPI peripheral with the
    # same settings before the next attempt.
    bus.deinit()
    bus.init(pyb.SPI.SLAVE, polarity=0, phase=0, bits=16)


def write(data):
    """Send `data` over the SPI bus, retrying when the send fails.

    Each attempt uses `bus.send(data, timeout=10)`. An OSError triggers a
    retry; after the initial attempt plus ten retries have all failed the
    data is silently dropped. Returns None in every case.
    """
    retries_left = 10
    while True:
        try:
            bus.send(data, timeout=10)
        except OSError:
            if retries_left <= 0:
                return  # Give up on this packet.
            retries_left -= 1
        else:
            return  # Sent successfully.


def available():
    """Return the number of command bytes waiting to be read.

    Not implemented, as there is no way to be ready to receive the data;
    always returns 0.
    """
    return 0


def read_byte():
    """Return the next received command byte.

    Not implemented, as there is no way to be ready to receive the data;
    always returns 0.
    """
    return 0


def checksum(data):
    """Return the 16-bit sum of `data` read as little-endian 16-bit words.

    `data` is consumed two bytes at a time (low byte first) and the running
    total is truncated to 16 bits. `data` is expected to have an even
    length; an odd length raises IndexError on the missing high byte.
    """
    total = 0
    for i in range(0, len(data), 2):
        total += ((data[i + 1] & 0xFF) << 8) | (data[i] & 0xFF)
    return total & 0xFFFF


def to_object_block_format(tag):
    """Pack one tag detection into a 16-byte color-code object block.

    Layout (all little-endian 16-bit words):
        0xAA56 sync word, checksum of the payload, then the payload itself:
        tag.id + 8, tag.cx, tag.cy, tag.w, tag.h, rotation in whole degrees.

    `tag.rotation` is read as radians and converted to degrees with floor
    division.
    """
    angle = int((tag.rotation * 180) // math.pi)
    payload = struct.pack(
        "<hhhhhh", tag.id + 8, tag.cx, tag.cy, tag.w, tag.h, angle
    )
    # The sync word (0xAA56) and the checksum are unsigned 16-bit values and
    # can exceed 32767, so they must be packed with "H"; the signed "h" code
    # cannot represent them.
    return struct.pack("<HH12s", 0xAA56, checksum(payload), payload)


# FSM code
fsm_state = 0  # Current parse_byte() state; 0 is FSM_STATE_NONE.
last_byte = 0  # Previous byte handed to parse_byte(); used as the low byte of servo values.

FSM_STATE_NONE = 0  # Idle, waiting for a 0x00 byte.
FSM_STATE_ZERO = 1  # Saw 0x00; the next byte selects the command (0xFF/0xFE/0xFD).
FSM_STATE_SERVO_CONTROL_0 = 2  # After 0x00 0xFF: this byte is kept as the pan low byte.
FSM_STATE_SERVO_CONTROL_1 = 3  # This byte is the pan high byte; the pan servo is moved.
FSM_STATE_SERVO_CONTROL_2 = 4  # This byte is kept as the tilt low byte.
FSM_STATE_SERVO_CONTROL_3 = 5  # This byte is the tilt high byte; the tilt servo is moved.
FSM_STATE_CAMERA_CONTROL = 6  # After 0x00 0xFE: one byte is consumed and ignored.
FSM_STATE_LED_CONTROL_0 = 7  # After 0x00 0xFD: bit 7 of this byte drives the red LED.
FSM_STATE_LED_CONTROL_1 = 8  # Bit 7 of this byte drives the green LED.
FSM_STATE_LED_CONTROL_2 = 9  # Bit 7 of this byte drives the blue LED.


def parse_byte(byte):
    """Feed one received command byte into the command state machine.

    Commands start with 0x00 followed by a selector byte:
        0xFF - servo control: four bytes, pan low/high then tilt low/high.
        0xFE - camera control: one byte, ignored.
        0xFD - LED control: three bytes, bit 7 of each switches the red,
               green and blue LED on (set) or off (clear).

    Updates the module-level `fsm_state`, and always stores `byte` in the
    module-level `last_byte` before returning.
    """
    global fsm_state
    global last_byte

    current = fsm_state

    if current == FSM_STATE_NONE:
        # Idle: only a 0x00 byte starts a command.
        fsm_state = FSM_STATE_ZERO if byte == 0x00 else FSM_STATE_NONE

    elif current == FSM_STATE_ZERO:
        # Selector byte: unknown selectors fall back to idle.
        fsm_state = FSM_STATE_NONE
        for selector, following in (
            (0xFF, FSM_STATE_SERVO_CONTROL_0),
            (0xFE, FSM_STATE_CAMERA_CONTROL),
            (0xFD, FSM_STATE_LED_CONTROL_0),
        ):
            if byte == selector:
                fsm_state = following
                break

    elif current == FSM_STATE_SERVO_CONTROL_0:
        # Pan low byte: nothing to do yet, it is remembered via last_byte.
        fsm_state = FSM_STATE_SERVO_CONTROL_1

    elif current == FSM_STATE_SERVO_CONTROL_1:
        # Pan high byte: combine with the remembered low byte and move.
        fsm_state = FSM_STATE_SERVO_CONTROL_2
        high = (byte & 0xFF) << 8
        low = last_byte & 0xFF
        s0_pan_position(high | low)

    elif current == FSM_STATE_SERVO_CONTROL_2:
        # Tilt low byte: remembered via last_byte.
        fsm_state = FSM_STATE_SERVO_CONTROL_3

    elif current == FSM_STATE_SERVO_CONTROL_3:
        # Tilt high byte: combine with the remembered low byte and move.
        fsm_state = FSM_STATE_NONE
        high = (byte & 0xFF) << 8
        low = last_byte & 0xFF
        s1_tilt_position(high | low)

    elif current == FSM_STATE_CAMERA_CONTROL:
        # Ignore...
        fsm_state = FSM_STATE_NONE

    elif current == FSM_STATE_LED_CONTROL_0:
        fsm_state = FSM_STATE_LED_CONTROL_1
        (red_led.on if byte & 0x80 else red_led.off)()

    elif current == FSM_STATE_LED_CONTROL_1:
        fsm_state = FSM_STATE_LED_CONTROL_2
        (green_led.on if byte & 0x80 else green_led.off)()

    elif current == FSM_STATE_LED_CONTROL_2:
        fsm_state = FSM_STATE_NONE
        (blue_led.on if byte & 0x80 else blue_led.off)()

    last_byte = byte


# Main loop
# Each iteration grabs a frame, finds AprilTags, sends one packet describing
# them (or an empty packet when none were found), then handles any pending
# command bytes and prints a status line.
clock = time.clock()
while True:
    clock.tick()
    img = sensor.snapshot()
    tags = img.find_apriltags()  # Defaults to the TAG36H11 family.

    # Transmit tags #

    if tags and (max_blocks > 0) and (max_blocks_per_id > 0):  # New frame
        # Largest tags first, capped at max_blocks entries.
        largest_first = sorted(tags, key=lambda x: x.area, reverse=True)[0:max_blocks]

        if dac:
            # Analog output follows the largest tag: its x position scaled by
            # the image width, or its y position scaled by the image height,
            # mapped onto 0-255.
            biggest = largest_first[0]
            if analog_out_mode:
                level = biggest.y * (255 / (img.height() - 1))
            else:
                level = biggest.x * (255 / (img.width() - 1))
            dac.write(round(level))

        # The frame sync word 0xAA55 is an unsigned 16-bit value, so it is
        # packed with "H"; the signed "h" code cannot represent it.
        packet = [struct.pack("<H", 0xAA55)]
        id_map = {}  # tag id -> how many tags with that id were seen so far

        for tag in largest_first:
            id_map[tag.id] = id_map.get(tag.id, 0) + 1

            # Report (and draw) a tag only while its id is under the cap.
            if id_map[tag.id] <= max_blocks_per_id:
                packet.append(to_object_block_format(tag))
                img.draw_rectangle(tag.rect)
                img.draw_cross(tag.cx, tag.cy)

        packet.append(struct.pack("<H", 0x0000))
        write(b"".join(packet))  # Write all of the data in one packet...

    else:  # Nothing found
        write(struct.pack("<H", 0x0000))

        if dac:
            dac.write(0)

    # Parse commands #

    for _ in range(available()):
        parse_byte(read_byte())

    num_tags = min(len(tags), max_blocks)
    print("%d tags(s) found - FPS %f" % (num_tags, clock.fps()))

# End of script.