Merge branch 'test/add_case_to_detect_ds2ds_issue_v3.0' into 'release/v3.0'
test: add case to detect ds2ds issue (backport v3.0) See merge request idf/esp-idf!3867
This commit is contained in:
285
.gitlab-ci.yml
285
.gitlab-ci.yml
@@ -29,12 +29,26 @@ variables:
|
||||
APPLY_BOT_FILTER_SCRIPT: "$CI_PROJECT_DIR/tools/ci/apply_bot_filter.py"
|
||||
CHECKOUT_REF_SCRIPT: "$CI_PROJECT_DIR/tools/ci/checkout_project_ref.py"
|
||||
|
||||
# When 'fetch' strategy is used, Gitlab removes untracked files before checking out
|
||||
# new revision. However if the new revision doesn't include some of the submodules
|
||||
# which were present in the old revision, such submodule directories would not be
|
||||
# removed by the checkout. This extra step ensures that these stale submodules
|
||||
# are removed.
|
||||
.git_clean_stale_submodules: &git_clean_stale_submodules >
|
||||
find . -name '.git' -not -path './.git' -printf '%P\n'
|
||||
| sed 's|/.git||'
|
||||
| xargs -I {} sh -c '
|
||||
grep -q {} .gitmodules
|
||||
|| (echo "Removing {}, has .git directory but not in .gitmodules file"
|
||||
&& rm -rf {});'
|
||||
|
||||
# before each job, we need to check if this job is filtered by bot stage/job filter
|
||||
.apply_bot_filter: &apply_bot_filter
|
||||
python $APPLY_BOT_FILTER_SCRIPT || exit 0
|
||||
|
||||
before_script:
|
||||
- source tools/ci/setup_python.sh
|
||||
- *git_clean_stale_submodules
|
||||
# apply bot filter in before script
|
||||
- *apply_bot_filter
|
||||
# add gitlab ssh key
|
||||
@@ -56,6 +70,7 @@ before_script:
|
||||
.do_nothing_before:
|
||||
before_script: &do_nothing_before
|
||||
- source tools/ci/setup_python.sh
|
||||
- *git_clean_stale_submodules
|
||||
# apply bot filter in before script
|
||||
- *apply_bot_filter
|
||||
- echo "Not setting up GitLab key, not fetching submodules"
|
||||
@@ -64,6 +79,7 @@ before_script:
|
||||
.add_gitlab_key_before:
|
||||
before_script: &add_gitlab_key_before
|
||||
- source tools/ci/setup_python.sh
|
||||
- *git_clean_stale_submodules
|
||||
# apply bot filter in before script
|
||||
- *apply_bot_filter
|
||||
- echo "Not fetching submodules"
|
||||
@@ -381,37 +397,36 @@ check_submodule_sync:
|
||||
- git submodule update --init --recursive
|
||||
|
||||
assign_test:
|
||||
<<: *build_template
|
||||
tags:
|
||||
- assign_test
|
||||
image: $CI_DOCKER_REGISTRY/ubuntu-test-env$BOT_DOCKER_IMAGE_TAG
|
||||
stage: assign_test
|
||||
# gitlab ci do not support match job with RegEx or wildcard now in dependencies.
|
||||
# we have a lot build example jobs. now we don't use dependencies, just download all artificats of build stage.
|
||||
dependencies:
|
||||
- build_ssc_00
|
||||
- build_ssc_01
|
||||
- build_ssc_02
|
||||
- build_esp_idf_tests
|
||||
variables:
|
||||
UT_BIN_PATH: "tools/unit-test-app/output"
|
||||
OUTPUT_BIN_PATH: "test_bins/ESP32_IDF"
|
||||
TEST_FW_PATH: "$CI_PROJECT_DIR/tools/tiny-test-fw"
|
||||
EXAMPLE_CONFIG_OUTPUT_PATH: "$CI_PROJECT_DIR/examples/test_configs"
|
||||
artifacts:
|
||||
paths:
|
||||
- $OUTPUT_BIN_PATH
|
||||
- components/idf_test/*/CIConfigs
|
||||
- components/idf_test/*/TC.sqlite
|
||||
- $EXAMPLE_CONFIG_OUTPUT_PATH
|
||||
expire_in: 1 mos
|
||||
before_script: *add_gitlab_key_before
|
||||
script:
|
||||
# first move test bins together: test_bins/CHIP_SDK/TestApp/bin_files
|
||||
- mkdir -p $OUTPUT_BIN_PATH
|
||||
# copy and rename folder name to "UT_config"
|
||||
- for CONFIG in $(ls $UT_BIN_PATH); do cp -r "$UT_BIN_PATH/$CONFIG" "$OUTPUT_BIN_PATH/UT_$CONFIG"; done
|
||||
- cp -r SSC/ssc_bin/* $OUTPUT_BIN_PATH
|
||||
# assign example tests
|
||||
- python $TEST_FW_PATH/CIAssignExampleTest.py $IDF_PATH/examples $IDF_PATH/.gitlab-ci.yml $EXAMPLE_CONFIG_OUTPUT_PATH
|
||||
# assign unit test cases
|
||||
- python $TEST_FW_PATH/CIAssignUnitTest.py $IDF_PATH/components/idf_test/unit_test/TestCaseAll.yml $IDF_PATH/.gitlab-ci.yml $IDF_PATH/components/idf_test/unit_test/CIConfigs
|
||||
# clone test script to assign tests
|
||||
- git clone $TEST_SCRIPT_REPOSITORY
|
||||
- cd auto_test_script
|
||||
- python $CHECKOUT_REF_SCRIPT auto_test_script
|
||||
# assign unit test cases
|
||||
- python CIAssignTestCases.py -t $IDF_PATH/components/idf_test/unit_test -c $IDF_PATH/.gitlab-ci.yml -b $IDF_PATH/test_bins
|
||||
# assgin integration test cases
|
||||
- python CIAssignTestCases.py -t $IDF_PATH/components/idf_test/integration_test -c $IDF_PATH/.gitlab-ci.yml -b $IDF_PATH/SSC/ssc_bin
|
||||
|
||||
@@ -491,15 +506,23 @@ assign_test:
|
||||
|
||||
# template for unit test jobs
|
||||
.unit_test_template: &unit_test_template
|
||||
<<: *test_template
|
||||
allow_failure: false
|
||||
<<: *example_test_template
|
||||
stage: unit_test
|
||||
dependencies:
|
||||
- assign_test
|
||||
- build_esp_idf_tests
|
||||
only:
|
||||
refs:
|
||||
- master
|
||||
- /^release\/v/
|
||||
- /^v\d+\.\d+(\.\d+)?($|-)/
|
||||
- triggers
|
||||
variables:
|
||||
LOCAL_ENV_CONFIG_PATH: "$CI_PROJECT_DIR/ci-test-runner-configs/$CI_RUNNER_DESCRIPTION/ESP32_IDF"
|
||||
LOG_PATH: "$CI_PROJECT_DIR/$CI_COMMIT_SHA"
|
||||
TEST_CASE_FILE_PATH: "$CI_PROJECT_DIR/components/idf_test/unit_test"
|
||||
MODULE_UPDATE_FILE: "$CI_PROJECT_DIR/components/idf_test/ModuleDefinition.yml"
|
||||
TEST_FW_PATH: "$CI_PROJECT_DIR/tools/tiny-test-fw"
|
||||
TEST_CASE_PATH: "$CI_PROJECT_DIR/tools/unit-test-app"
|
||||
CONFIG_FILE: "$CI_PROJECT_DIR/components/idf_test/unit_test/CIConfigs/$CI_JOB_NAME.yml"
|
||||
LOG_PATH: "$CI_PROJECT_DIR/TEST_LOGS"
|
||||
ENV_FILE: "$CI_PROJECT_DIR/ci-test-runner-configs/$CI_RUNNER_DESCRIPTION/EnvConfig.yml"
|
||||
|
||||
nvs_compatible_test:
|
||||
<<: *test_template
|
||||
@@ -537,252 +560,200 @@ UT_001_01:
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
- UT_default
|
||||
|
||||
UT_001_02:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
- UT_default
|
||||
|
||||
UT_001_03:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
- UT_default
|
||||
|
||||
UT_001_04:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
- UT_default
|
||||
|
||||
UT_001_05:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_SDMODE
|
||||
- UT_default
|
||||
- UT_T1_1
|
||||
|
||||
UT_001_06:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_SPIMODE
|
||||
- UT_default
|
||||
- UT_T1_1
|
||||
|
||||
UT_001_07:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
- UT_default
|
||||
|
||||
UT_001_08:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
- UT_default
|
||||
|
||||
UT_001_09:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
- UT_default
|
||||
|
||||
UT_001_10:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
|
||||
UT_001_11:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
|
||||
UT_001_12:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
|
||||
UT_001_13:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
|
||||
UT_001_14:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
|
||||
UT_001_15:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
|
||||
UT_001_16:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
|
||||
UT_001_17:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
|
||||
UT_001_18:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
|
||||
UT_001_19:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
|
||||
UT_001_20:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
|
||||
UT_001_21:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
|
||||
UT_002_01:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
- UT_release
|
||||
- psram
|
||||
|
||||
UT_002_02:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
- UT_release
|
||||
- psram
|
||||
|
||||
UT_002_03:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
- UT_release
|
||||
- psram
|
||||
|
||||
UT_002_04:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
- UT_release
|
||||
- psram
|
||||
|
||||
UT_002_05:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_SDMODE
|
||||
- UT_release
|
||||
- UT_T1_1
|
||||
- psram
|
||||
|
||||
UT_002_06:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_SPIMODE
|
||||
- UT_release
|
||||
- UT_T1_1
|
||||
- psram
|
||||
|
||||
UT_002_07:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
- UT_release
|
||||
|
||||
UT_002_08:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
- UT_release
|
||||
|
||||
UT_002_09:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
- UT_release
|
||||
- psram
|
||||
|
||||
UT_003_01:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
- UT_single_core
|
||||
- UT_T2_1
|
||||
|
||||
UT_003_02:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
- UT_single_core
|
||||
- UT_T2_1
|
||||
|
||||
UT_003_03:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
- UT_single_core
|
||||
|
||||
UT_003_04:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
- UT_single_core
|
||||
|
||||
UT_003_05:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_SDMODE
|
||||
- UT_single_core
|
||||
|
||||
UT_003_06:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_SPIMODE
|
||||
- UT_single_core
|
||||
|
||||
UT_003_07:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
- UT_single_core
|
||||
|
||||
UT_003_08:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
- UT_single_core
|
||||
|
||||
UT_003_09:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
- UT_single_core
|
||||
- UT_T2_1
|
||||
|
||||
UT_004_01:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
- UT_psram
|
||||
|
||||
UT_004_02:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
- UT_psram
|
||||
|
||||
UT_004_03:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
- UT_psram
|
||||
|
||||
UT_004_04:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
- UT_psram
|
||||
|
||||
UT_004_05:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_SDMODE
|
||||
- UT_psram
|
||||
|
||||
UT_004_06:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_SPIMODE
|
||||
- UT_psram
|
||||
|
||||
UT_004_07:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
- UT_psram
|
||||
|
||||
UT_004_08:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
- UT_psram
|
||||
|
||||
UT_004_09:
|
||||
<<: *unit_test_template
|
||||
tags:
|
||||
- ESP32_IDF
|
||||
- UT_T1_1
|
||||
- UT_psram
|
||||
- UT_T2_1
|
||||
- psram
|
||||
|
||||
IT_001_01:
|
||||
<<: *test_template
|
||||
|
||||
@@ -105,30 +105,35 @@ template<> int SlowInit<2>::mInitBy = -1;
|
||||
template<> int SlowInit<2>::mInitCount = 0;
|
||||
|
||||
template<int obj>
|
||||
static void start_slow_init_task(int id, int affinity)
|
||||
static int start_slow_init_task(int id, int affinity)
|
||||
{
|
||||
xTaskCreatePinnedToCore(&SlowInit<obj>::task, "slow_init", 2048,
|
||||
reinterpret_cast<void*>(id), 3, NULL, affinity);
|
||||
return xTaskCreatePinnedToCore(&SlowInit<obj>::task, "slow_init", 2048,
|
||||
reinterpret_cast<void*>(id), 3, NULL, affinity) ? 1 : 0;
|
||||
}
|
||||
|
||||
TEST_CASE("static initialization guards work as expected", "[cxx]")
|
||||
{
|
||||
s_slow_init_sem = xSemaphoreCreateCounting(10, 0);
|
||||
TEST_ASSERT_NOT_NULL(s_slow_init_sem);
|
||||
int task_count = 0;
|
||||
// four tasks competing for static initialization of one object
|
||||
start_slow_init_task<1>(0, PRO_CPU_NUM);
|
||||
start_slow_init_task<1>(1, APP_CPU_NUM);
|
||||
start_slow_init_task<1>(2, PRO_CPU_NUM);
|
||||
start_slow_init_task<1>(3, tskNO_AFFINITY);
|
||||
task_count += start_slow_init_task<1>(0, PRO_CPU_NUM);
|
||||
#if portNUM_PROCESSORS == 2
|
||||
task_count += start_slow_init_task<1>(1, APP_CPU_NUM);
|
||||
#endif
|
||||
task_count += start_slow_init_task<1>(2, PRO_CPU_NUM);
|
||||
task_count += start_slow_init_task<1>(3, tskNO_AFFINITY);
|
||||
|
||||
// four tasks competing for static initialization of another object
|
||||
start_slow_init_task<2>(0, PRO_CPU_NUM);
|
||||
start_slow_init_task<2>(1, APP_CPU_NUM);
|
||||
start_slow_init_task<2>(2, PRO_CPU_NUM);
|
||||
start_slow_init_task<2>(3, tskNO_AFFINITY);
|
||||
task_count += start_slow_init_task<2>(0, PRO_CPU_NUM);
|
||||
#if portNUM_PROCESSORS == 2
|
||||
task_count += start_slow_init_task<2>(1, APP_CPU_NUM);
|
||||
#endif
|
||||
task_count += start_slow_init_task<2>(2, PRO_CPU_NUM);
|
||||
task_count += start_slow_init_task<2>(3, tskNO_AFFINITY);
|
||||
|
||||
// All tasks should
|
||||
for (int i = 0; i < 8; ++i) {
|
||||
for (int i = 0; i < task_count; ++i) {
|
||||
TEST_ASSERT_TRUE(xSemaphoreTake(s_slow_init_sem, 500/portTICK_PERIOD_MS));
|
||||
}
|
||||
vSemaphoreDelete(s_slow_init_sem);
|
||||
|
||||
@@ -10,6 +10,7 @@
|
||||
#include "esp_wifi.h"
|
||||
#include "esp_log.h"
|
||||
#include "nvs_flash.h"
|
||||
#include "test_utils.h"
|
||||
|
||||
static const char* TAG = "test_adc2";
|
||||
|
||||
@@ -44,7 +45,9 @@ TEST_CASE("adc2 work with wifi","[adc]")
|
||||
{
|
||||
int read_raw;
|
||||
int target_value;
|
||||
|
||||
|
||||
test_case_uses_tcpip();
|
||||
|
||||
//adc and dac init
|
||||
TEST_ESP_OK( dac_output_enable( DAC_CHANNEL_1 ));
|
||||
TEST_ESP_OK( dac_output_enable( DAC_CHANNEL_2 ));
|
||||
|
||||
86
components/esp32/test/test_wifi.c
Normal file
86
components/esp32/test/test_wifi.c
Normal file
@@ -0,0 +1,86 @@
|
||||
/*
|
||||
Tests for the Wi-Fi
|
||||
*/
|
||||
#include "esp_system.h"
|
||||
#include "unity.h"
|
||||
#include "esp_system.h"
|
||||
#include "esp_event_loop.h"
|
||||
#include "esp_wifi_types.h"
|
||||
#include "esp_wifi.h"
|
||||
#include "esp_log.h"
|
||||
#include "nvs_flash.h"
|
||||
#include "test_utils.h"
|
||||
#include "freertos/task.h"
|
||||
|
||||
static const char* TAG = "test_wifi";
|
||||
|
||||
#define DEFAULT_SSID "TEST_SSID"
|
||||
#define DEFAULT_PWD "TEST_PASS"
|
||||
|
||||
static void start_wifi_as_softap(void)
|
||||
{
|
||||
wifi_init_config_t cfg = WIFI_INIT_CONFIG_DEFAULT();
|
||||
cfg.nvs_enable = false;
|
||||
|
||||
wifi_config_t w_config = {
|
||||
.ap.ssid = "default_ssid",
|
||||
.ap.password = "default_password",
|
||||
.ap.ssid_len = 0,
|
||||
.ap.channel = 1,
|
||||
.ap.authmode = WIFI_AUTH_WPA2_PSK,
|
||||
.ap.ssid_hidden = false,
|
||||
.ap.max_connection = 4,
|
||||
.ap.beacon_interval = 100,
|
||||
};
|
||||
|
||||
TEST_ESP_OK(esp_wifi_init(&cfg));
|
||||
TEST_ESP_OK(esp_wifi_set_mode(WIFI_MODE_AP));
|
||||
TEST_ESP_OK(esp_wifi_set_config(WIFI_IF_AP, &w_config));
|
||||
TEST_ESP_OK(esp_wifi_start());
|
||||
|
||||
}
|
||||
|
||||
static void stop_wifi(void)
|
||||
{
|
||||
TEST_ESP_OK(esp_wifi_stop());
|
||||
TEST_ESP_OK(esp_wifi_deinit());
|
||||
}
|
||||
|
||||
static void receive_ds2ds_packet(void)
|
||||
{
|
||||
start_wifi_as_softap();
|
||||
unity_wait_for_signal("sender ready");
|
||||
unity_send_signal("receiver ready");
|
||||
|
||||
// wait for sender to send packets
|
||||
vTaskDelay(1000/portTICK_PERIOD_MS);
|
||||
stop_wifi();
|
||||
|
||||
}
|
||||
|
||||
static const char ds2ds_pdu[] = {
|
||||
0x48, 0x03, 0x00, 0x00, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
|
||||
0xE8, 0x65, 0xD4, 0xCB, 0x74, 0x19, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
|
||||
0x60, 0x94, 0xE8, 0x65, 0xD4, 0xCB, 0x74, 0x1C, 0x26, 0xB9,
|
||||
0x0D, 0x02, 0x7D, 0x13, 0x00, 0x00, 0x01, 0xE8, 0x65, 0xD4, 0xCB, 0x74,
|
||||
0x1C, 0x00, 0x00, 0x26, 0xB9, 0x00, 0x00, 0x00, 0x00
|
||||
};
|
||||
|
||||
|
||||
extern esp_err_t esp_wifi_80211_tx(wifi_interface_t ifx, const void *buffer, int len, bool en_sys_seq);
|
||||
|
||||
static void send_ds2ds_packet(void)
|
||||
{
|
||||
start_wifi_as_softap();
|
||||
unity_send_signal("sender ready");
|
||||
unity_wait_for_signal("receiver ready");
|
||||
|
||||
// send packet 20 times to make sure receiver will get this packet
|
||||
for (uint16_t i = 0; i < 20; i++) {
|
||||
esp_wifi_80211_tx(ESP_IF_WIFI_AP, ds2ds_pdu, sizeof(ds2ds_pdu), true);
|
||||
vTaskDelay(50 / portTICK_PERIOD_MS);
|
||||
}
|
||||
stop_wifi();
|
||||
}
|
||||
|
||||
TEST_CASE_MULTIPLE_DEVICES("receive ds2ds packet without exception", "[wifi][test_env=UT_T2_1]", receive_ds2ds_packet, send_ds2ds_packet);
|
||||
@@ -78,9 +78,12 @@ class BaseApp(object):
|
||||
if not test_suite_name:
|
||||
test_suite_name = os.path.splitext(os.path.basename(sys.modules['__main__'].__file__))[0]
|
||||
sdk_path = cls.get_sdk_path()
|
||||
return os.path.join(sdk_path, "TEST_LOGS",
|
||||
test_suite_name +
|
||||
time.strftime("_%m%d_%H_%M_%S", time.localtime(LOG_FOLDER_TIMESTAMP)))
|
||||
log_folder = os.path.join(sdk_path, "TEST_LOGS",
|
||||
test_suite_name +
|
||||
time.strftime("_%m%d_%H_%M_%S", time.localtime(LOG_FOLDER_TIMESTAMP)))
|
||||
if not os.path.exists(log_folder):
|
||||
os.makedirs(log_folder)
|
||||
return log_folder
|
||||
|
||||
def process_app_info(self):
|
||||
"""
|
||||
|
||||
@@ -22,147 +22,20 @@ import sys
|
||||
import re
|
||||
import argparse
|
||||
|
||||
import yaml
|
||||
|
||||
test_fw_path = os.getenv("TEST_FW_PATH")
|
||||
if test_fw_path:
|
||||
sys.path.insert(0, test_fw_path)
|
||||
|
||||
from Utility import CaseConfig, SearchCases, GitlabCIJob
|
||||
from Utility.CIAssignTest import AssignTest, Group
|
||||
|
||||
|
||||
class Group(object):
|
||||
|
||||
MAX_EXECUTION_TIME = 30
|
||||
MAX_CASE = 15
|
||||
SORT_KEYS = ["env_tag"]
|
||||
|
||||
def __init__(self, case):
|
||||
self.execution_time = 0
|
||||
self.case_list = [case]
|
||||
self.filters = dict(zip(self.SORT_KEYS, [case.case_info[x] for x in self.SORT_KEYS]))
|
||||
|
||||
def accept_new_case(self):
|
||||
"""
|
||||
check if allowed to add any case to this group
|
||||
|
||||
:return: True or False
|
||||
"""
|
||||
max_time = (sum([x.case_info["execution_time"] for x in self.case_list]) < self.MAX_EXECUTION_TIME)
|
||||
max_case = (len(self.case_list) < self.MAX_CASE)
|
||||
return max_time and max_case
|
||||
|
||||
def add_case(self, case):
|
||||
"""
|
||||
add case to current group
|
||||
|
||||
:param case: test case
|
||||
:return: True if add succeed, else False
|
||||
"""
|
||||
added = False
|
||||
if self.accept_new_case():
|
||||
for key in self.filters:
|
||||
if case.case_info[key] != self.filters[key]:
|
||||
break
|
||||
else:
|
||||
self.case_list.append(case)
|
||||
added = True
|
||||
return added
|
||||
|
||||
def output(self):
|
||||
"""
|
||||
output data for job configs
|
||||
|
||||
:return: {"Filter": case filter, "CaseConfig": list of case configs for cases in this group}
|
||||
"""
|
||||
output_data = {
|
||||
"Filter": self.filters,
|
||||
"CaseConfig": [{"name": x.case_info["name"]} for x in self.case_list],
|
||||
}
|
||||
return output_data
|
||||
class ExampleGroup(Group):
|
||||
SORT_KEYS = CI_JOB_MATCH_KEYS = ["env_tag", "chip"]
|
||||
|
||||
|
||||
class AssignTest(object):
|
||||
"""
|
||||
Auto assign tests to CI jobs.
|
||||
|
||||
:param test_case: path of test case file(s)
|
||||
:param ci_config_file: path of ``.gitlab-ci.yml``
|
||||
"""
|
||||
|
||||
class CIExampleAssignTest(AssignTest):
|
||||
CI_TEST_JOB_PATTERN = re.compile(r"^example_test_.+")
|
||||
|
||||
def __init__(self, test_case, ci_config_file):
|
||||
self.test_cases = self._search_cases(test_case)
|
||||
self.jobs = self._parse_gitlab_ci_config(ci_config_file)
|
||||
|
||||
def _parse_gitlab_ci_config(self, ci_config_file):
|
||||
|
||||
with open(ci_config_file, "r") as f:
|
||||
ci_config = yaml.load(f)
|
||||
|
||||
job_list = list()
|
||||
for job_name in ci_config:
|
||||
if self.CI_TEST_JOB_PATTERN.search(job_name) is not None:
|
||||
job_list.append(GitlabCIJob.Job(ci_config[job_name], job_name))
|
||||
return job_list
|
||||
|
||||
@staticmethod
|
||||
def _search_cases(test_case, case_filter=None):
|
||||
"""
|
||||
:param test_case: path contains test case folder
|
||||
:param case_filter: filter for test cases
|
||||
:return: filtered test case list
|
||||
"""
|
||||
test_methods = SearchCases.Search.search_test_cases(test_case)
|
||||
return CaseConfig.filter_test_cases(test_methods, case_filter if case_filter else dict())
|
||||
|
||||
def _group_cases(self):
|
||||
"""
|
||||
separate all cases into groups according group rules. each group will be executed by one CI job.
|
||||
|
||||
:return: test case groups.
|
||||
"""
|
||||
groups = []
|
||||
for case in self.test_cases:
|
||||
for group in groups:
|
||||
# add to current group
|
||||
if group.add_case(case):
|
||||
break
|
||||
else:
|
||||
# create new group
|
||||
groups.append(Group(case))
|
||||
return groups
|
||||
|
||||
def assign_cases(self):
|
||||
"""
|
||||
separate test cases to groups and assign test cases to CI jobs.
|
||||
|
||||
:raise AssertError: if failed to assign any case to CI job.
|
||||
:return: None
|
||||
"""
|
||||
failed_to_assign = []
|
||||
test_groups = self._group_cases()
|
||||
for group in test_groups:
|
||||
for job in self.jobs:
|
||||
if job.match_group(group):
|
||||
job.assign_group(group)
|
||||
break
|
||||
else:
|
||||
failed_to_assign.append(group)
|
||||
assert not failed_to_assign
|
||||
|
||||
def output_configs(self, output_path):
|
||||
"""
|
||||
|
||||
:param output_path: path to output config files for each CI job
|
||||
:return: None
|
||||
"""
|
||||
if not os.path.exists(output_path):
|
||||
os.makedirs(output_path)
|
||||
for job in self.jobs:
|
||||
job.output_config(output_path)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = argparse.ArgumentParser()
|
||||
@@ -174,6 +47,6 @@ if __name__ == '__main__':
|
||||
help="output path of config files")
|
||||
args = parser.parse_args()
|
||||
|
||||
assign_test = AssignTest(args.test_case, args.ci_config_file)
|
||||
assign_test = CIExampleAssignTest(args.test_case, args.ci_config_file, case_group=ExampleGroup)
|
||||
assign_test.assign_cases()
|
||||
assign_test.output_configs(args.output_path)
|
||||
|
||||
153
tools/tiny-test-fw/CIAssignUnitTest.py
Normal file
153
tools/tiny-test-fw/CIAssignUnitTest.py
Normal file
@@ -0,0 +1,153 @@
|
||||
"""
|
||||
Command line tool to assign unit tests to CI test jobs.
|
||||
"""
|
||||
|
||||
import re
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
|
||||
import yaml
|
||||
|
||||
test_fw_path = os.getenv("TEST_FW_PATH")
|
||||
if test_fw_path:
|
||||
sys.path.insert(0, test_fw_path)
|
||||
|
||||
from Utility import CIAssignTest
|
||||
|
||||
|
||||
class Group(CIAssignTest.Group):
|
||||
SORT_KEYS = ["config", "SDK", "test environment", "multi_device", "multi_stage", "tags"]
|
||||
MAX_CASE = 30
|
||||
ATTR_CONVERT_TABLE = {
|
||||
"execution_time": "execution time"
|
||||
}
|
||||
# when IDF support multiple chips, SDK will be moved into tags, we can remove it
|
||||
CI_JOB_MATCH_KEYS = ["test environment", "SDK"]
|
||||
|
||||
def __init__(self, case):
|
||||
super(Group, self).__init__(case)
|
||||
for tag in self._get_case_attr(case, "tags"):
|
||||
self.ci_job_match_keys.add(tag)
|
||||
|
||||
@staticmethod
|
||||
def _get_case_attr(case, attr):
|
||||
if attr in Group.ATTR_CONVERT_TABLE:
|
||||
attr = Group.ATTR_CONVERT_TABLE[attr]
|
||||
return case[attr]
|
||||
|
||||
def _create_extra_data(self, test_function):
|
||||
"""
|
||||
For unit test case, we need to copy some attributes of test cases into config file.
|
||||
So unit test function knows how to run the case.
|
||||
"""
|
||||
case_data = []
|
||||
for case in self.case_list:
|
||||
one_case_data = {
|
||||
"config": self._get_case_attr(case, "config"),
|
||||
"name": self._get_case_attr(case, "summary"),
|
||||
"reset": self._get_case_attr(case, "reset"),
|
||||
"timeout": self._get_case_attr(case, "timeout"),
|
||||
}
|
||||
|
||||
if test_function in ["run_multiple_devices_cases", "run_multiple_stage_cases"]:
|
||||
try:
|
||||
one_case_data["child case num"] = self._get_case_attr(case, "child case num")
|
||||
except KeyError as e:
|
||||
print("multiple devices/stages cases must contains at least two test functions")
|
||||
print("case name: {}".format(one_case_data["name"]))
|
||||
raise e
|
||||
|
||||
case_data.append(one_case_data)
|
||||
return case_data
|
||||
|
||||
def _map_test_function(self):
|
||||
"""
|
||||
determine which test function to use according to current test case
|
||||
|
||||
:return: test function name to use
|
||||
"""
|
||||
if self.filters["multi_device"] == "Yes":
|
||||
test_function = "run_multiple_devices_cases"
|
||||
elif self.filters["multi_stage"] == "Yes":
|
||||
test_function = "run_multiple_stage_cases"
|
||||
else:
|
||||
test_function = "run_unit_test_cases"
|
||||
return test_function
|
||||
|
||||
def output(self):
|
||||
"""
|
||||
output data for job configs
|
||||
|
||||
:return: {"Filter": case filter, "CaseConfig": list of case configs for cases in this group}
|
||||
"""
|
||||
test_function = self._map_test_function()
|
||||
output_data = {
|
||||
# we don't need filter for test function, as UT uses a few test functions for all cases
|
||||
"CaseConfig": [
|
||||
{
|
||||
"name": test_function,
|
||||
"extra_data": self._create_extra_data(test_function),
|
||||
}
|
||||
]
|
||||
}
|
||||
return output_data
|
||||
|
||||
|
||||
class UnitTestAssignTest(CIAssignTest.AssignTest):
|
||||
CI_TEST_JOB_PATTERN = re.compile(r"^UT_.+")
|
||||
|
||||
def __init__(self, test_case_path, ci_config_file):
|
||||
CIAssignTest.AssignTest.__init__(self, test_case_path, ci_config_file, case_group=Group)
|
||||
|
||||
def _search_cases(self, test_case_path, case_filter=None):
|
||||
"""
|
||||
For unit test case, we don't search for test functions.
|
||||
The unit test cases is stored in a yaml file which is created in job build-idf-test.
|
||||
"""
|
||||
|
||||
try:
|
||||
with open(test_case_path, "r") as f:
|
||||
raw_data = yaml.load(f)
|
||||
test_cases = raw_data["test cases"]
|
||||
except IOError:
|
||||
print("Test case path is invalid. Should only happen when use @bot to skip unit test.")
|
||||
test_cases = []
|
||||
# filter keys are lower case. Do map lower case keys with original keys.
|
||||
try:
|
||||
key_mapping = {x.lower(): x for x in test_cases[0].keys()}
|
||||
except IndexError:
|
||||
key_mapping = dict()
|
||||
if case_filter:
|
||||
for key in case_filter:
|
||||
filtered_cases = []
|
||||
for case in test_cases:
|
||||
try:
|
||||
mapped_key = key_mapping[key]
|
||||
# bot converts string to lower case
|
||||
if isinstance(case[mapped_key], str):
|
||||
_value = case[mapped_key].lower()
|
||||
else:
|
||||
_value = case[mapped_key]
|
||||
if _value in case_filter[key]:
|
||||
filtered_cases.append(case)
|
||||
except KeyError:
|
||||
# case don't have this key, regard as filter success
|
||||
filtered_cases.append(case)
|
||||
test_cases = filtered_cases
|
||||
return test_cases
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("test_case",
|
||||
help="test case folder or file")
|
||||
parser.add_argument("ci_config_file",
|
||||
help="gitlab ci config file")
|
||||
parser.add_argument("output_path",
|
||||
help="output path of config files")
|
||||
args = parser.parse_args()
|
||||
|
||||
assign_test = UnitTestAssignTest(args.test_case, args.ci_config_file)
|
||||
assign_test.assign_cases()
|
||||
assign_test.output_configs(args.output_path)
|
||||
@@ -85,6 +85,14 @@ def _decode_data(data):
|
||||
return data
|
||||
|
||||
|
||||
def _pattern_to_string(pattern):
|
||||
try:
|
||||
ret = "RegEx: " + pattern.pattern
|
||||
except AttributeError:
|
||||
ret = pattern
|
||||
return ret
|
||||
|
||||
|
||||
class _DataCache(_queue.Queue):
|
||||
"""
|
||||
Data cache based on Queue. Allow users to process data cache based on bytes instead of Queue."
|
||||
@@ -94,7 +102,22 @@ class _DataCache(_queue.Queue):
|
||||
_queue.Queue.__init__(self, maxsize=maxsize)
|
||||
self.data_cache = str()
|
||||
|
||||
def get_data(self, timeout=0):
|
||||
def _move_from_queue_to_cache(self):
|
||||
"""
|
||||
move all of the available data in the queue to cache
|
||||
|
||||
:return: True if moved any item from queue to data cache, else False
|
||||
"""
|
||||
ret = False
|
||||
while True:
|
||||
try:
|
||||
self.data_cache += _decode_data(self.get(0))
|
||||
ret = True
|
||||
except _queue.Empty:
|
||||
break
|
||||
return ret
|
||||
|
||||
def get_data(self, timeout=0.0):
|
||||
"""
|
||||
get a copy of data from cache.
|
||||
|
||||
@@ -105,12 +128,16 @@ class _DataCache(_queue.Queue):
|
||||
if timeout < 0:
|
||||
timeout = 0
|
||||
|
||||
try:
|
||||
data = self.get(timeout=timeout)
|
||||
self.data_cache += _decode_data(data)
|
||||
except _queue.Empty:
|
||||
# don't do anything when on update for cache
|
||||
pass
|
||||
ret = self._move_from_queue_to_cache()
|
||||
|
||||
if not ret:
|
||||
# we only wait for new data if we can't provide a new data_cache
|
||||
try:
|
||||
data = self.get(timeout=timeout)
|
||||
self.data_cache += _decode_data(data)
|
||||
except _queue.Empty:
|
||||
# don't do anything when on update for cache
|
||||
pass
|
||||
return copy.deepcopy(self.data_cache)
|
||||
|
||||
def flush(self, index=0xFFFFFFFF):
|
||||
@@ -127,16 +154,64 @@ class _DataCache(_queue.Queue):
|
||||
self.data_cache = self.data_cache[index:]
|
||||
|
||||
|
||||
class _LogThread(threading.Thread, _queue.Queue):
|
||||
"""
|
||||
We found some SD card on Raspberry Pi could have very bad performance.
|
||||
It could take seconds to save small amount of data.
|
||||
If the DUT receives data and save it as log, then it stops receiving data until log is saved.
|
||||
This could lead to expect timeout.
|
||||
As an workaround to this issue, ``BaseDUT`` class will create a thread to save logs.
|
||||
Then data will be passed to ``expect`` as soon as received.
|
||||
"""
|
||||
def __init__(self):
|
||||
threading.Thread.__init__(self, name="LogThread")
|
||||
_queue.Queue.__init__(self, maxsize=0)
|
||||
self.setDaemon(True)
|
||||
self.flush_lock = threading.Lock()
|
||||
|
||||
def save_log(self, filename, data):
|
||||
"""
|
||||
:param filename: log file name
|
||||
:param data: log data. Must be ``bytes``.
|
||||
"""
|
||||
self.put({"filename": filename, "data": data})
|
||||
|
||||
def flush_data(self):
|
||||
with self.flush_lock:
|
||||
data_cache = dict()
|
||||
while True:
|
||||
# move all data from queue to data cache
|
||||
try:
|
||||
log = self.get_nowait()
|
||||
try:
|
||||
data_cache[log["filename"]] += log["data"]
|
||||
except KeyError:
|
||||
data_cache[log["filename"]] = log["data"]
|
||||
except _queue.Empty:
|
||||
break
|
||||
# flush data
|
||||
for filename in data_cache:
|
||||
with open(filename, "ab+") as f:
|
||||
f.write(data_cache[filename])
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
time.sleep(1)
|
||||
self.flush_data()
|
||||
|
||||
|
||||
class _RecvThread(threading.Thread):
|
||||
|
||||
PERFORMANCE_PATTERN = re.compile(r"\[Performance]\[(\w+)]: ([^\r\n]+)\r?\n")
|
||||
|
||||
def __init__(self, read, data_cache):
|
||||
def __init__(self, read, data_cache, recorded_data, record_data_lock):
|
||||
super(_RecvThread, self).__init__()
|
||||
self.exit_event = threading.Event()
|
||||
self.setDaemon(True)
|
||||
self.read = read
|
||||
self.data_cache = data_cache
|
||||
self.recorded_data = recorded_data
|
||||
self.record_data_lock = record_data_lock
|
||||
# cache the last line of recv data for collecting performance
|
||||
self._line_cache = str()
|
||||
|
||||
@@ -169,7 +244,10 @@ class _RecvThread(threading.Thread):
|
||||
while not self.exit_event.isSet():
|
||||
data = self.read(1000)
|
||||
if data:
|
||||
self.data_cache.put(data)
|
||||
with self.record_data_lock:
|
||||
self.data_cache.put(data)
|
||||
for capture_id in self.recorded_data:
|
||||
self.recorded_data[capture_id].put(data)
|
||||
self.collect_performance(data)
|
||||
|
||||
def exit(self):
|
||||
@@ -187,6 +265,10 @@ class BaseDUT(object):
|
||||
"""
|
||||
|
||||
DEFAULT_EXPECT_TIMEOUT = 5
|
||||
MAX_EXPECT_FAILURES_TO_SAVED = 10
|
||||
|
||||
LOG_THREAD = _LogThread()
|
||||
LOG_THREAD.start()
|
||||
|
||||
def __init__(self, name, port, log_file, app, **kwargs):
|
||||
|
||||
@@ -196,13 +278,39 @@ class BaseDUT(object):
|
||||
self.log_file = log_file
|
||||
self.app = app
|
||||
self.data_cache = _DataCache()
|
||||
# the main process of recorded data are done in receive thread
|
||||
# but receive thread could be closed in DUT lifetime (tool methods)
|
||||
# so we keep it in BaseDUT, as their life cycle are same
|
||||
self.recorded_data = dict()
|
||||
self.record_data_lock = threading.RLock()
|
||||
self.receive_thread = None
|
||||
self.expect_failures = []
|
||||
# open and start during init
|
||||
self.open()
|
||||
|
||||
def __str__(self):
|
||||
return "DUT({}: {})".format(self.name, str(self.port))
|
||||
|
||||
def _save_expect_failure(self, pattern, data, start_time):
|
||||
"""
|
||||
Save expect failure. If the test fails, then it will print the expect failures.
|
||||
In some cases, user will handle expect exceptions.
|
||||
The expect failures could be false alarm, and test case might generate a lot of such failures.
|
||||
Therefore, we don't print the failure immediately and limit the max size of failure list.
|
||||
"""
|
||||
self.expect_failures.insert(0, {"pattern": pattern, "data": data,
|
||||
"start": start_time, "end": time.time()})
|
||||
self.expect_failures = self.expect_failures[:self.MAX_EXPECT_FAILURES_TO_SAVED]
|
||||
|
||||
def _save_dut_log(self, data):
|
||||
"""
|
||||
Save DUT log into file using another thread.
|
||||
This is a workaround for some devices takes long time for file system operations.
|
||||
|
||||
See descriptions in ``_LogThread`` for details.
|
||||
"""
|
||||
self.LOG_THREAD.save_log(self.log_file, data)
|
||||
|
||||
# define for methods need to be overwritten by Port
|
||||
@classmethod
|
||||
def list_available_ports(cls):
|
||||
@@ -290,7 +398,8 @@ class BaseDUT(object):
|
||||
:return: None
|
||||
"""
|
||||
self._port_open()
|
||||
self.receive_thread = _RecvThread(self._port_read, self.data_cache)
|
||||
self.receive_thread = _RecvThread(self._port_read, self.data_cache,
|
||||
self.recorded_data, self.record_data_lock)
|
||||
self.receive_thread.start()
|
||||
|
||||
def close(self):
|
||||
@@ -302,6 +411,7 @@ class BaseDUT(object):
|
||||
if self.receive_thread:
|
||||
self.receive_thread.exit()
|
||||
self._port_close()
|
||||
self.LOG_THREAD.flush_data()
|
||||
|
||||
def write(self, data, eol="\r\n", flush=True):
|
||||
"""
|
||||
@@ -316,7 +426,7 @@ class BaseDUT(object):
|
||||
if flush:
|
||||
self.data_cache.flush()
|
||||
# do write if cache
|
||||
if data:
|
||||
if data is not None:
|
||||
self._port_write(data + eol if eol else data)
|
||||
|
||||
@_expect_lock
|
||||
@@ -333,6 +443,42 @@ class BaseDUT(object):
|
||||
self.data_cache.flush(size)
|
||||
return data
|
||||
|
||||
def start_capture_raw_data(self, capture_id="default"):
|
||||
"""
|
||||
Sometime application want to get DUT raw data and use ``expect`` method at the same time.
|
||||
Capture methods provides a way to get raw data without affecting ``expect`` or ``read`` method.
|
||||
|
||||
If you call ``start_capture_raw_data`` with same capture id again, it will restart capture on this ID.
|
||||
|
||||
:param capture_id: ID of capture. You can use different IDs to do different captures at the same time.
|
||||
"""
|
||||
with self.record_data_lock:
|
||||
try:
|
||||
# if start capture on existed ID, we do flush data and restart capture
|
||||
self.recorded_data[capture_id].flush()
|
||||
except KeyError:
|
||||
# otherwise, create new data cache
|
||||
self.recorded_data[capture_id] = _DataCache()
|
||||
|
||||
def stop_capture_raw_data(self, capture_id="default"):
|
||||
"""
|
||||
Stop capture and get raw data.
|
||||
This method should be used after ``start_capture_raw_data`` on the same capture ID.
|
||||
|
||||
:param capture_id: ID of capture.
|
||||
:return: captured raw data between start capture and stop capture.
|
||||
"""
|
||||
with self.record_data_lock:
|
||||
try:
|
||||
ret = self.recorded_data[capture_id].get_data()
|
||||
self.recorded_data.pop(capture_id)
|
||||
except KeyError as e:
|
||||
e.message = "capture_id does not exist. " \
|
||||
"You should call start_capture_raw_data with same ID " \
|
||||
"before calling stop_capture_raw_data"
|
||||
raise e
|
||||
return ret
|
||||
|
||||
# expect related methods
|
||||
|
||||
@staticmethod
|
||||
@@ -410,14 +556,19 @@ class BaseDUT(object):
|
||||
start_time = time.time()
|
||||
while True:
|
||||
ret, index = method(data, pattern)
|
||||
if ret is not None or time.time() - start_time > timeout:
|
||||
if ret is not None:
|
||||
self.data_cache.flush(index)
|
||||
break
|
||||
time_remaining = start_time + timeout - time.time()
|
||||
if time_remaining < 0:
|
||||
break
|
||||
# wait for new data from cache
|
||||
data = self.data_cache.get_data(time.time() + timeout - start_time)
|
||||
data = self.data_cache.get_data(time_remaining)
|
||||
|
||||
if ret is None:
|
||||
raise ExpectTimeout(self.name + ": " + str(pattern))
|
||||
pattern = _pattern_to_string(pattern)
|
||||
self._save_expect_failure(pattern, data, start_time)
|
||||
raise ExpectTimeout(self.name + ": " + pattern)
|
||||
return ret
|
||||
|
||||
def _expect_multi(self, expect_all, expect_item_list, timeout):
|
||||
@@ -457,22 +608,25 @@ class BaseDUT(object):
|
||||
if expect_item["ret"] is not None:
|
||||
# match succeed for one item
|
||||
matched_expect_items.append(expect_item)
|
||||
break
|
||||
|
||||
# if expect all, then all items need to be matched,
|
||||
# else only one item need to matched
|
||||
if expect_all:
|
||||
match_succeed = (matched_expect_items == expect_items)
|
||||
match_succeed = len(matched_expect_items) == len(expect_items)
|
||||
else:
|
||||
match_succeed = True if matched_expect_items else False
|
||||
|
||||
if time.time() - start_time > timeout or match_succeed:
|
||||
time_remaining = start_time + timeout - time.time()
|
||||
if time_remaining < 0 or match_succeed:
|
||||
break
|
||||
else:
|
||||
data = self.data_cache.get_data(time.time() + timeout - start_time)
|
||||
data = self.data_cache.get_data(time_remaining)
|
||||
|
||||
if match_succeed:
|
||||
# do callback and flush matched data cache
|
||||
# sort matched items according to order of appearance in the input data,
|
||||
# so that the callbacks are invoked in correct order
|
||||
matched_expect_items = sorted(matched_expect_items, key=lambda it: it["index"])
|
||||
# invoke callbacks and flush matched data cache
|
||||
slice_index = -1
|
||||
for expect_item in matched_expect_items:
|
||||
# trigger callback
|
||||
@@ -482,7 +636,9 @@ class BaseDUT(object):
|
||||
# flush already matched data
|
||||
self.data_cache.flush(slice_index)
|
||||
else:
|
||||
raise ExpectTimeout(self.name + ": " + str(expect_items))
|
||||
pattern = str([_pattern_to_string(x["pattern"]) for x in expect_items])
|
||||
self._save_expect_failure(pattern, data, start_time)
|
||||
raise ExpectTimeout(self.name + ": " + pattern)
|
||||
|
||||
@_expect_lock
|
||||
def expect_any(self, *expect_items, **timeout):
|
||||
@@ -528,6 +684,22 @@ class BaseDUT(object):
|
||||
timeout["timeout"] = self.DEFAULT_EXPECT_TIMEOUT
|
||||
return self._expect_multi(True, expect_items, **timeout)
|
||||
|
||||
@staticmethod
|
||||
def _format_ts(ts):
|
||||
return "{}:{}".format(time.strftime("%m-%d %H:%M:%S", time.localtime(ts)), str(ts % 1)[2:5])
|
||||
|
||||
def print_debug_info(self):
|
||||
"""
|
||||
Print debug info of current DUT. Currently we will print debug info for expect failures.
|
||||
"""
|
||||
Utility.console_log("DUT debug info for DUT: {}:".format(self.name), color="orange")
|
||||
|
||||
for failure in self.expect_failures:
|
||||
Utility.console_log(u"\t[pattern]: {}\r\n\t[data]: {}\r\n\t[time]: {} - {}\r\n"
|
||||
.format(failure["pattern"], failure["data"],
|
||||
self._format_ts(failure["start"]), self._format_ts(failure["end"])),
|
||||
color="orange")
|
||||
|
||||
|
||||
class SerialDUT(BaseDUT):
|
||||
""" serial with logging received data feature """
|
||||
@@ -548,18 +720,15 @@ class SerialDUT(BaseDUT):
|
||||
self.serial_configs.update(kwargs)
|
||||
super(SerialDUT, self).__init__(name, port, log_file, app, **kwargs)
|
||||
|
||||
@staticmethod
|
||||
def _format_data(data):
|
||||
def _format_data(self, data):
|
||||
"""
|
||||
format data for logging. do decode and add timestamp.
|
||||
|
||||
:param data: raw data from read
|
||||
:return: formatted data (str)
|
||||
"""
|
||||
timestamp = time.time()
|
||||
timestamp = "{}:{}".format(time.strftime("%m-%d %H:%M:%S", time.localtime(timestamp)),
|
||||
str(timestamp % 1)[2:5])
|
||||
formatted_data = "[{}]:\r\n{}\r\n".format(timestamp, _decode_data(data))
|
||||
timestamp = "[{}]".format(self._format_ts(time.time()))
|
||||
formatted_data = timestamp.encode() + b"\r\n" + data + b"\r\n"
|
||||
return formatted_data
|
||||
|
||||
def _port_open(self):
|
||||
@@ -571,11 +740,12 @@ class SerialDUT(BaseDUT):
|
||||
def _port_read(self, size=1):
|
||||
data = self.port_inst.read(size)
|
||||
if data:
|
||||
with open(self.log_file, "a+") as _log_file:
|
||||
_log_file.write(self._format_data(data))
|
||||
self._save_dut_log(self._format_data(data))
|
||||
return data
|
||||
|
||||
def _port_write(self, data):
|
||||
if isinstance(data, str):
|
||||
data = data.encode()
|
||||
self.port_inst.write(data)
|
||||
|
||||
@classmethod
|
||||
|
||||
@@ -17,6 +17,8 @@ import os
|
||||
import threading
|
||||
import functools
|
||||
|
||||
import netifaces
|
||||
|
||||
import EnvConfig
|
||||
|
||||
|
||||
@@ -47,12 +49,12 @@ class Env(object):
|
||||
dut=None,
|
||||
env_tag=None,
|
||||
env_config_file=None,
|
||||
test_name=None,
|
||||
test_suite_name=None,
|
||||
**kwargs):
|
||||
self.app_cls = app
|
||||
self.default_dut_cls = dut
|
||||
self.config = EnvConfig.Config(env_config_file, env_tag)
|
||||
self.log_path = self.app_cls.get_log_folder(test_name)
|
||||
self.log_path = self.app_cls.get_log_folder(test_suite_name)
|
||||
if not os.path.exists(self.log_path):
|
||||
os.makedirs(self.log_path)
|
||||
|
||||
@@ -130,27 +132,47 @@ class Env(object):
|
||||
"""
|
||||
return self.config.get_variable(variable_name)
|
||||
|
||||
PROTO_MAP = {
|
||||
"ipv4": netifaces.AF_INET,
|
||||
"ipv6": netifaces.AF_INET6,
|
||||
"mac": netifaces.AF_LINK,
|
||||
}
|
||||
|
||||
@_synced
|
||||
def get_pc_nic_info(self, nic_name="pc_nic"):
|
||||
def get_pc_nic_info(self, nic_name="pc_nic", proto="ipv4"):
|
||||
"""
|
||||
get_pc_nic_info(nic_name="pc_nic")
|
||||
try to get nic info (ip address, ipv6 address, mac address)
|
||||
try to get info of a specified NIC and protocol.
|
||||
|
||||
:param nic_name: pc nic name. allows passing variable name, nic name value or omitted (to get default nic info).
|
||||
:return: a dict of address ("ipv4", "ipv6", "mac") if successfully found. otherwise None.
|
||||
:param nic_name: pc nic name. allows passing variable name, nic name value.
|
||||
:param proto: "ipv4", "ipv6" or "mac"
|
||||
:return: a dict of nic info if successfully found. otherwise None.
|
||||
nic info keys could be different for different protocols.
|
||||
key "addr" is available for both mac, ipv4 and ipv6 pic info.
|
||||
"""
|
||||
# TODO: need to implement auto get nic info method
|
||||
return self.config.get_variable("nic_info/" + nic_name)
|
||||
interfaces = netifaces.interfaces()
|
||||
if nic_name in interfaces:
|
||||
# the name is in the interface list, we regard it as NIC name
|
||||
if_addr = netifaces.ifaddresses(nic_name)
|
||||
else:
|
||||
# it's not in interface name list, we assume it's variable name
|
||||
_nic_name = self.get_variable(nic_name)
|
||||
if_addr = netifaces.ifaddresses(_nic_name)
|
||||
|
||||
return if_addr[self.PROTO_MAP[proto]][0]
|
||||
|
||||
@_synced
|
||||
def close(self):
|
||||
def close(self, dut_debug=False):
|
||||
"""
|
||||
close()
|
||||
close all DUTs of the Env.
|
||||
|
||||
:param dut_debug: if dut_debug is True, then print all dut expect failures before close it
|
||||
:return: None
|
||||
"""
|
||||
for dut_name in self.allocated_duts:
|
||||
dut = self.allocated_duts[dut_name]["dut"]
|
||||
if dut_debug:
|
||||
dut.print_debug_info()
|
||||
dut.close()
|
||||
self.allocated_duts = dict()
|
||||
|
||||
@@ -53,7 +53,7 @@ class Config(object):
|
||||
try:
|
||||
with open(config_file) as f:
|
||||
configs = yaml.load(f)[env_name]
|
||||
except (OSError, TypeError):
|
||||
except (OSError, TypeError, IOError):
|
||||
configs = dict()
|
||||
return configs
|
||||
|
||||
|
||||
@@ -144,11 +144,28 @@ class Example(IDFApp):
|
||||
|
||||
class UT(IDFApp):
|
||||
def get_binary_path(self, app_path):
|
||||
if app_path:
|
||||
# specified path, join it and the idf path
|
||||
path = os.path.join(self.idf_path, app_path)
|
||||
else:
|
||||
path = os.path.join(self.idf_path, "tools", "unit-test-app", "build")
|
||||
"""
|
||||
:param app_path: app path or app config
|
||||
:return: binary path
|
||||
"""
|
||||
if not app_path:
|
||||
app_path = "default"
|
||||
|
||||
path = os.path.join(self.idf_path, app_path)
|
||||
if not os.path.exists(path):
|
||||
while True:
|
||||
# try to get by config
|
||||
if app_path == "default":
|
||||
# it's default config, we first try to get form build folder of unit-test-app
|
||||
path = os.path.join(self.idf_path, "tools", "unit-test-app", "build")
|
||||
if os.path.exists(path):
|
||||
# found, use bin in build path
|
||||
break
|
||||
# ``make ut-build-all-configs`` or ``make ut-build-CONFIG`` will copy binary to output folder
|
||||
path = os.path.join(self.idf_path, "tools", "unit-test-app", "output", app_path)
|
||||
if os.path.exists(path):
|
||||
break
|
||||
raise OSError("Failed to get unit-test-app binary path")
|
||||
return path
|
||||
|
||||
|
||||
|
||||
@@ -20,6 +20,8 @@ import functools
|
||||
import random
|
||||
import tempfile
|
||||
|
||||
from serial.tools import list_ports
|
||||
|
||||
import DUT
|
||||
|
||||
|
||||
|
||||
@@ -45,6 +45,31 @@ def idf_example_test(app=Example, dut=IDFDUT, chip="ESP32", module="examples", e
|
||||
execution_time=execution_time, level=level, **kwargs)
|
||||
|
||||
|
||||
def idf_unit_test(app=UT, dut=IDFDUT, chip="ESP32", module="unit-test", execution_time=1,
|
||||
level="unit", erase_nvs=True, **kwargs):
|
||||
"""
|
||||
decorator for testing idf unit tests (with default values for some keyword args).
|
||||
|
||||
:param app: test application class
|
||||
:param dut: dut class
|
||||
:param chip: chip supported, string or tuple
|
||||
:param module: module, string
|
||||
:param execution_time: execution time in minutes, int
|
||||
:param level: test level, could be used to filter test cases, string
|
||||
:param erase_nvs: if need to erase_nvs in DUT.start_app()
|
||||
:param kwargs: other keyword args
|
||||
:return: test method
|
||||
"""
|
||||
try:
|
||||
# try to config the default behavior of erase nvs
|
||||
dut.ERASE_NVS = erase_nvs
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
return TinyFW.test_method(app=app, dut=dut, chip=chip, module=module,
|
||||
execution_time=execution_time, level=level, **kwargs)
|
||||
|
||||
|
||||
def log_performance(item, value):
|
||||
"""
|
||||
do print performance with pre-defined format to console
|
||||
@@ -52,7 +77,11 @@ def log_performance(item, value):
|
||||
:param item: performance item name
|
||||
:param value: performance value
|
||||
"""
|
||||
Utility.console_log("[Performance][{}]: {}".format(item, value), "orange")
|
||||
performance_msg = "[Performance][{}]: {}".format(item, value)
|
||||
Utility.console_log(performance_msg, "orange")
|
||||
# update to junit test report
|
||||
current_junit_case = TinyFW.JunitReport.get_current_test_case()
|
||||
current_junit_case.stdout += performance_msg + "\r\n"
|
||||
|
||||
|
||||
def check_performance(item, value):
|
||||
|
||||
@@ -40,18 +40,22 @@ class Runner(threading.Thread):
|
||||
def __init__(self, test_case, case_config, env_config_file=None):
|
||||
super(Runner, self).__init__()
|
||||
self.setDaemon(True)
|
||||
test_methods = SearchCases.Search.search_test_cases(test_case)
|
||||
self.test_cases = CaseConfig.Parser.apply_config(test_methods, case_config)
|
||||
self.test_result = True
|
||||
if case_config:
|
||||
test_suite_name = os.path.splitext(os.path.basename(case_config))[0]
|
||||
else:
|
||||
test_suite_name = "TestRunner"
|
||||
TinyFW.set_default_config(env_config_file=env_config_file, test_suite_name=test_suite_name)
|
||||
test_methods = SearchCases.Search.search_test_cases(test_case)
|
||||
self.test_cases = CaseConfig.Parser.apply_config(test_methods, case_config)
|
||||
self.test_result = []
|
||||
|
||||
def run(self):
|
||||
for case in self.test_cases:
|
||||
self.test_result = self.test_result and case.run()
|
||||
result = case.run()
|
||||
self.test_result.append(result)
|
||||
|
||||
def get_test_result(self):
|
||||
return self.test_result and all(self.test_result)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
@@ -76,5 +80,5 @@ if __name__ == '__main__':
|
||||
except KeyboardInterrupt:
|
||||
print("exit by Ctrl-C")
|
||||
break
|
||||
if not runner.test_result:
|
||||
if not runner.get_test_result():
|
||||
sys.exit(1)
|
||||
|
||||
@@ -13,14 +13,12 @@
|
||||
# limitations under the License.
|
||||
|
||||
""" Interface for test cases. """
|
||||
import sys
|
||||
import os
|
||||
import time
|
||||
import traceback
|
||||
import inspect
|
||||
import functools
|
||||
|
||||
import xunitgen
|
||||
import junit_xml
|
||||
|
||||
import Env
|
||||
import DUT
|
||||
@@ -28,11 +26,6 @@ import App
|
||||
import Utility
|
||||
|
||||
|
||||
XUNIT_FILE_NAME = "XUNIT_RESULT.xml"
|
||||
XUNIT_RECEIVER = xunitgen.EventReceiver()
|
||||
XUNIT_DEFAULT_TEST_SUITE = "test-suite"
|
||||
|
||||
|
||||
class DefaultEnvConfig(object):
|
||||
"""
|
||||
default test configs. There're 3 places to set configs, priority is (high -> low):
|
||||
@@ -69,46 +62,69 @@ set_default_config = DefaultEnvConfig.set_default_config
|
||||
get_default_config = DefaultEnvConfig.get_default_config
|
||||
|
||||
|
||||
class TestResult(object):
|
||||
TEST_RESULT = {
|
||||
"pass": [],
|
||||
"fail": [],
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def get_failed_cases(cls):
|
||||
"""
|
||||
:return: failed test cases
|
||||
"""
|
||||
return cls.TEST_RESULT["fail"]
|
||||
|
||||
@classmethod
|
||||
def get_passed_cases(cls):
|
||||
"""
|
||||
:return: passed test cases
|
||||
"""
|
||||
return cls.TEST_RESULT["pass"]
|
||||
|
||||
@classmethod
|
||||
def set_result(cls, result, case_name):
|
||||
"""
|
||||
:param result: True or False
|
||||
:param case_name: test case name
|
||||
:return: None
|
||||
"""
|
||||
cls.TEST_RESULT["pass" if result else "fail"].append(case_name)
|
||||
|
||||
|
||||
get_failed_cases = TestResult.get_failed_cases
|
||||
get_passed_cases = TestResult.get_passed_cases
|
||||
|
||||
|
||||
MANDATORY_INFO = {
|
||||
"execution_time": 1,
|
||||
"env_tag": "default",
|
||||
"category": "function",
|
||||
"ignore": False,
|
||||
}
|
||||
|
||||
|
||||
class JunitReport(object):
|
||||
# wrapper for junit test report
|
||||
# TODO: Don't support by multi-thread (although not likely to be used this way).
|
||||
|
||||
JUNIT_FILE_NAME = "XUNIT_RESULT.xml"
|
||||
JUNIT_DEFAULT_TEST_SUITE = "test-suite"
|
||||
JUNIT_TEST_SUITE = junit_xml.TestSuite(JUNIT_DEFAULT_TEST_SUITE)
|
||||
JUNIT_CURRENT_TEST_CASE = None
|
||||
_TEST_CASE_CREATED_TS = 0
|
||||
|
||||
@classmethod
|
||||
def output_report(cls, junit_file_path):
|
||||
""" Output current test result to file. """
|
||||
with open(os.path.join(junit_file_path, cls.JUNIT_FILE_NAME), "w") as f:
|
||||
cls.JUNIT_TEST_SUITE.to_file(f, [cls.JUNIT_TEST_SUITE], prettyprint=False)
|
||||
|
||||
@classmethod
|
||||
def get_current_test_case(cls):
|
||||
"""
|
||||
By default, the test framework will handle junit test report automatically.
|
||||
While some test case might want to update some info to test report.
|
||||
They can use this method to get current test case created by test framework.
|
||||
|
||||
:return: current junit test case instance created by ``JunitTestReport.create_test_case``
|
||||
"""
|
||||
return cls.JUNIT_CURRENT_TEST_CASE
|
||||
|
||||
@classmethod
|
||||
def test_case_finish(cls, test_case):
|
||||
"""
|
||||
Append the test case to test suite so it can be output to file.
|
||||
Execution time will be automatically updated (compared to ``create_test_case``).
|
||||
"""
|
||||
test_case.elapsed_sec = time.time() - cls._TEST_CASE_CREATED_TS
|
||||
cls.JUNIT_TEST_SUITE.test_cases.append(test_case)
|
||||
|
||||
@classmethod
|
||||
def create_test_case(cls, name):
|
||||
"""
|
||||
Extend ``junit_xml.TestCase`` with:
|
||||
|
||||
1. save create test case so it can be get by ``get_current_test_case``
|
||||
2. log create timestamp, so ``elapsed_sec`` can be auto updated in ``test_case_finish``.
|
||||
|
||||
:param name: test case name
|
||||
:return: instance of ``junit_xml.TestCase``
|
||||
"""
|
||||
# set stdout to empty string, so we can always append string to stdout.
|
||||
# It won't affect output logic. If stdout is empty, it won't be put to report.
|
||||
test_case = junit_xml.TestCase(name, stdout="")
|
||||
cls.JUNIT_CURRENT_TEST_CASE = test_case
|
||||
cls._TEST_CASE_CREATED_TS = time.time()
|
||||
return test_case
|
||||
|
||||
|
||||
def test_method(**kwargs):
|
||||
"""
|
||||
decorator for test case function.
|
||||
@@ -122,22 +138,17 @@ def test_method(**kwargs):
|
||||
:keyword env_config_file: test env config file. usually will not set this keyword when define case
|
||||
:keyword test_suite_name: test suite name, used for generating log folder name and adding xunit format test result.
|
||||
usually will not set this keyword when define case
|
||||
:keyword junit_report_by_case: By default the test fw will handle junit report generation.
|
||||
In some cases, one test function might test many test cases.
|
||||
If this flag is set, test case can update junit report by its own.
|
||||
"""
|
||||
def test(test_func):
|
||||
# get test function file name
|
||||
frame = inspect.stack()
|
||||
test_func_file_name = frame[1][1]
|
||||
|
||||
case_info = MANDATORY_INFO.copy()
|
||||
case_info["name"] = test_func.__name__
|
||||
case_info["name"] = case_info["ID"] = test_func.__name__
|
||||
case_info["junit_report_by_case"] = False
|
||||
case_info.update(kwargs)
|
||||
|
||||
# create env instance
|
||||
env_config = DefaultEnvConfig.get_default_config()
|
||||
for key in kwargs:
|
||||
if key in env_config:
|
||||
env_config[key] = kwargs[key]
|
||||
|
||||
@functools.wraps(test_func)
|
||||
def handle_test(extra_data=None, **overwrite):
|
||||
"""
|
||||
@@ -147,12 +158,20 @@ def test_method(**kwargs):
|
||||
:param overwrite: args that runner or main want to overwrite
|
||||
:return: None
|
||||
"""
|
||||
# create env instance
|
||||
env_config = DefaultEnvConfig.get_default_config()
|
||||
for key in kwargs:
|
||||
if key in env_config:
|
||||
env_config[key] = kwargs[key]
|
||||
|
||||
env_config.update(overwrite)
|
||||
env_inst = Env.Env(**env_config)
|
||||
|
||||
# prepare for xunit test results
|
||||
xunit_file = os.path.join(env_inst.app_cls.get_log_folder(env_config["test_suite_name"]),
|
||||
XUNIT_FILE_NAME)
|
||||
XUNIT_RECEIVER.begin_case(test_func.__name__, time.time(), test_func_file_name)
|
||||
junit_file_path = env_inst.app_cls.get_log_folder(env_config["test_suite_name"])
|
||||
junit_test_case = JunitReport.create_test_case(case_info["name"])
|
||||
result = False
|
||||
|
||||
try:
|
||||
Utility.console_log("starting running test: " + test_func.__name__, color="green")
|
||||
# execute test function
|
||||
@@ -162,23 +181,21 @@ def test_method(**kwargs):
|
||||
except Exception as e:
|
||||
# handle all the exceptions here
|
||||
traceback.print_exc()
|
||||
result = False
|
||||
# log failure
|
||||
XUNIT_RECEIVER.failure(str(e), test_func_file_name)
|
||||
junit_test_case.add_failure_info(str(e) + ":\r\n" + traceback.format_exc())
|
||||
finally:
|
||||
# do close all DUTs
|
||||
env_inst.close()
|
||||
if not case_info["junit_report_by_case"]:
|
||||
JunitReport.test_case_finish(junit_test_case)
|
||||
# do close all DUTs, if result is False then print DUT debug info
|
||||
env_inst.close(dut_debug=(not result))
|
||||
|
||||
# end case and output result
|
||||
XUNIT_RECEIVER.end_case(test_func.__name__, time.time())
|
||||
with open(xunit_file, "ab+") as f:
|
||||
f.write(xunitgen.toxml(XUNIT_RECEIVER.results(),
|
||||
XUNIT_DEFAULT_TEST_SUITE))
|
||||
JunitReport.output_report(junit_file_path)
|
||||
|
||||
if result:
|
||||
Utility.console_log("Test Succeed: " + test_func.__name__, color="green")
|
||||
else:
|
||||
Utility.console_log(("Test Fail: " + test_func.__name__), color="red")
|
||||
TestResult.set_result(result, test_func.__name__)
|
||||
return result
|
||||
|
||||
handle_test.case_info = case_info
|
||||
|
||||
236
tools/tiny-test-fw/Utility/CIAssignTest.py
Normal file
236
tools/tiny-test-fw/Utility/CIAssignTest.py
Normal file
@@ -0,0 +1,236 @@
|
||||
# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http:#www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
Common logic to assign test cases to CI jobs.
|
||||
|
||||
Some background knowledge about Gitlab CI and use flow in esp-idf:
|
||||
|
||||
* Gitlab CI jobs are static in ``.gitlab-ci.yml``. We can't dynamically create test jobs
|
||||
* For test job running on DUT, we use ``tags`` to select runners with different test environment
|
||||
* We have ``assign_test`` stage, will collect cases, and then assign them to correct test jobs
|
||||
* ``assign_test`` will fail if failed to assign any cases
|
||||
* with ``assign_test``, we can:
|
||||
* dynamically filter test case we want to test
|
||||
* alert user if they forget to add CI jobs and guide how to add test jobs
|
||||
* the last step of ``assign_test`` is to output config files, then test jobs will run these cases
|
||||
|
||||
The Basic logic to assign test cases is as follow:
|
||||
|
||||
1. do search all the cases
|
||||
2. do filter case (if filter is specified by @bot)
|
||||
3. put cases to different groups according to rule of ``Group``
|
||||
* try to put them in existed groups
|
||||
* if failed then create a new group and add this case
|
||||
4. parse and filter the test jobs from CI config file
|
||||
5. try to assign all groups to jobs according to tags
|
||||
6. output config files for jobs
|
||||
|
||||
"""
|
||||
|
||||
import os
|
||||
import re
|
||||
import json
|
||||
|
||||
import yaml
|
||||
|
||||
from Utility import (CaseConfig, SearchCases, GitlabCIJob, console_log)
|
||||
|
||||
|
||||
class Group(object):
|
||||
|
||||
MAX_EXECUTION_TIME = 30
|
||||
MAX_CASE = 15
|
||||
SORT_KEYS = ["env_tag"]
|
||||
# Matching CI job rules could be different from the way we want to group test cases.
|
||||
# For example, when assign unit test cases, different test cases need to use different test functions.
|
||||
# We need to put them into different groups.
|
||||
# But these groups can be assigned to jobs with same tags, as they use the same test environment.
|
||||
CI_JOB_MATCH_KEYS = SORT_KEYS
|
||||
|
||||
def __init__(self, case):
|
||||
self.execution_time = 0
|
||||
self.case_list = [case]
|
||||
self.filters = dict(zip(self.SORT_KEYS, [self._get_case_attr(case, x) for x in self.SORT_KEYS]))
|
||||
# we use ci_job_match_keys to match CI job tags. It's a set of required tags.
|
||||
self.ci_job_match_keys = set([self._get_case_attr(case, x) for x in self.CI_JOB_MATCH_KEYS])
|
||||
|
||||
@staticmethod
|
||||
def _get_case_attr(case, attr):
|
||||
# we might use different type for case (dict or test_func)
|
||||
# this method will do get attribute form cases
|
||||
return case.case_info[attr]
|
||||
|
||||
def accept_new_case(self):
|
||||
"""
|
||||
check if allowed to add any case to this group
|
||||
|
||||
:return: True or False
|
||||
"""
|
||||
max_time = (sum([self._get_case_attr(x, "execution_time") for x in self.case_list])
|
||||
< self.MAX_EXECUTION_TIME)
|
||||
max_case = (len(self.case_list) < self.MAX_CASE)
|
||||
return max_time and max_case
|
||||
|
||||
def add_case(self, case):
|
||||
"""
|
||||
add case to current group
|
||||
|
||||
:param case: test case
|
||||
:return: True if add succeed, else False
|
||||
"""
|
||||
added = False
|
||||
if self.accept_new_case():
|
||||
for key in self.filters:
|
||||
if self._get_case_attr(case, key) != self.filters[key]:
|
||||
break
|
||||
else:
|
||||
self.case_list.append(case)
|
||||
added = True
|
||||
return added
|
||||
|
||||
def output(self):
|
||||
"""
|
||||
output data for job configs
|
||||
|
||||
:return: {"Filter": case filter, "CaseConfig": list of case configs for cases in this group}
|
||||
"""
|
||||
output_data = {
|
||||
"Filter": self.filters,
|
||||
"CaseConfig": [{"name": self._get_case_attr(x, "name")} for x in self.case_list],
|
||||
}
|
||||
return output_data
|
||||
|
||||
|
||||
class AssignTest(object):
|
||||
"""
|
||||
Auto assign tests to CI jobs.
|
||||
|
||||
:param test_case_path: path of test case file(s)
|
||||
:param ci_config_file: path of ``.gitlab-ci.yml``
|
||||
"""
|
||||
# subclass need to rewrite CI test job pattern, to filter all test jobs
|
||||
CI_TEST_JOB_PATTERN = re.compile(r"^test_.+")
|
||||
# by default we only run function in CI, as other tests could take long time
|
||||
DEFAULT_FILTER = {
|
||||
"category": "function",
|
||||
"ignore": False,
|
||||
}
|
||||
|
||||
def __init__(self, test_case_path, ci_config_file, case_group=Group):
|
||||
self.test_case_path = test_case_path
|
||||
self.test_cases = []
|
||||
self.jobs = self._parse_gitlab_ci_config(ci_config_file)
|
||||
self.case_group = case_group
|
||||
|
||||
def _parse_gitlab_ci_config(self, ci_config_file):
|
||||
|
||||
with open(ci_config_file, "r") as f:
|
||||
ci_config = yaml.load(f)
|
||||
|
||||
job_list = list()
|
||||
for job_name in ci_config:
|
||||
if self.CI_TEST_JOB_PATTERN.search(job_name) is not None:
|
||||
job_list.append(GitlabCIJob.Job(ci_config[job_name], job_name))
|
||||
job_list.sort(key=lambda x: x["name"])
|
||||
return job_list
|
||||
|
||||
def _search_cases(self, test_case_path, case_filter=None):
|
||||
"""
|
||||
:param test_case_path: path contains test case folder
|
||||
:param case_filter: filter for test cases. the filter to use is default filter updated with case_filter param.
|
||||
:return: filtered test case list
|
||||
"""
|
||||
_case_filter = self.DEFAULT_FILTER.copy()
|
||||
if case_filter:
|
||||
_case_filter.update(case_filter)
|
||||
test_methods = SearchCases.Search.search_test_cases(test_case_path)
|
||||
return CaseConfig.filter_test_cases(test_methods, _case_filter)
|
||||
|
||||
def _group_cases(self):
|
||||
"""
|
||||
separate all cases into groups according group rules. each group will be executed by one CI job.
|
||||
|
||||
:return: test case groups.
|
||||
"""
|
||||
groups = []
|
||||
for case in self.test_cases:
|
||||
for group in groups:
|
||||
# add to current group
|
||||
if group.add_case(case):
|
||||
break
|
||||
else:
|
||||
# create new group
|
||||
groups.append(self.case_group(case))
|
||||
return groups
|
||||
|
||||
@staticmethod
|
||||
def _apply_bot_filter():
|
||||
"""
|
||||
we support customize CI test with bot.
|
||||
here we process from and return the filter which ``_search_cases`` accepts.
|
||||
|
||||
:return: filter for search test cases
|
||||
"""
|
||||
bot_filter = os.getenv("BOT_CASE_FILTER")
|
||||
if bot_filter:
|
||||
bot_filter = json.loads(bot_filter)
|
||||
else:
|
||||
bot_filter = dict()
|
||||
return bot_filter
|
||||
|
||||
def _apply_bot_test_count(self):
|
||||
"""
|
||||
Bot could also pass test count.
|
||||
If filtered cases need to be tested for several times, then we do duplicate them here.
|
||||
"""
|
||||
test_count = os.getenv("BOT_TEST_COUNT")
|
||||
if test_count:
|
||||
test_count = int(test_count)
|
||||
self.test_cases *= test_count
|
||||
|
||||
def assign_cases(self):
|
||||
"""
|
||||
separate test cases to groups and assign test cases to CI jobs.
|
||||
|
||||
:raise AssertError: if failed to assign any case to CI job.
|
||||
:return: None
|
||||
"""
|
||||
failed_to_assign = []
|
||||
case_filter = self._apply_bot_filter()
|
||||
self.test_cases = self._search_cases(self.test_case_path, case_filter)
|
||||
self._apply_bot_test_count()
|
||||
test_groups = self._group_cases()
|
||||
for group in test_groups:
|
||||
for job in self.jobs:
|
||||
if job.match_group(group):
|
||||
job.assign_group(group)
|
||||
break
|
||||
else:
|
||||
failed_to_assign.append(group)
|
||||
if failed_to_assign:
|
||||
console_log("Too many test cases vs jobs to run. Please add the following jobs to .gitlab-ci.yml with specific tags:", "R")
|
||||
for group in failed_to_assign:
|
||||
console_log("* Add job with: " + ",".join(group.ci_job_match_keys), "R")
|
||||
raise RuntimeError("Failed to assign test case to CI jobs")
|
||||
|
||||
def output_configs(self, output_path):
|
||||
"""
|
||||
:param output_path: path to output config files for each CI job
|
||||
:return: None
|
||||
"""
|
||||
if not os.path.exists(output_path):
|
||||
os.makedirs(output_path)
|
||||
for job in self.jobs:
|
||||
job.output_config(output_path)
|
||||
@@ -51,14 +51,33 @@ import yaml
|
||||
import TestCase
|
||||
|
||||
|
||||
def _convert_to_lower_case(item):
|
||||
"""
|
||||
bot filter is always lower case string.
|
||||
this function will convert to all string to lower case.
|
||||
"""
|
||||
if isinstance(item, (tuple, list)):
|
||||
output = [_convert_to_lower_case(v) for v in item]
|
||||
elif isinstance(item, str):
|
||||
output = item.lower()
|
||||
else:
|
||||
output = item
|
||||
return output
|
||||
|
||||
|
||||
def _filter_one_case(test_method, case_filter):
|
||||
""" Apply filter for one case (the filter logic is the same as described in ``filter_test_cases``) """
|
||||
filter_result = True
|
||||
for key in case_filter:
|
||||
# filter keys are lower case. Do map lower case keys with original keys.
|
||||
key_mapping = {x.lower(): x for x in test_method.case_info.keys()}
|
||||
|
||||
for orig_key in case_filter:
|
||||
key = key_mapping[orig_key]
|
||||
if key in test_method.case_info:
|
||||
# the filter key is both in case and filter
|
||||
# we need to check if they match
|
||||
filter_item, accepted_item = case_filter[key], test_method.case_info[key]
|
||||
filter_item = _convert_to_lower_case(case_filter[orig_key])
|
||||
accepted_item = _convert_to_lower_case(test_method.case_info[key])
|
||||
|
||||
if isinstance(filter_item, (tuple, list)) \
|
||||
and isinstance(accepted_item, (tuple, list)):
|
||||
@@ -91,6 +110,7 @@ def filter_test_cases(test_methods, case_filter):
|
||||
* if one is list/tuple, the other one is string/int, then check if string/int is in list/tuple
|
||||
* if both are list/tuple, then check if they have common item
|
||||
2. if only case attribute or filter have the key, filter succeed
|
||||
3. will do case insensitive compare for string
|
||||
|
||||
for example, the following are match succeed scenarios
|
||||
(the rule is symmetric, result is same if exchange values for user filter and case attribute):
|
||||
|
||||
@@ -27,6 +27,7 @@ class Job(dict):
|
||||
def __init__(self, job, job_name):
|
||||
super(Job, self).__init__(job)
|
||||
self["name"] = job_name
|
||||
self.tags = set(self["tags"])
|
||||
|
||||
def match_group(self, group):
|
||||
"""
|
||||
@@ -37,17 +38,8 @@ class Job(dict):
|
||||
:return: True or False
|
||||
"""
|
||||
match_result = False
|
||||
for _ in range(1):
|
||||
if "case group" in self:
|
||||
# this job is already assigned
|
||||
break
|
||||
for value in group.filters.values():
|
||||
if value not in self["tags"]:
|
||||
break
|
||||
else:
|
||||
continue
|
||||
break
|
||||
else:
|
||||
if "case group" not in self and group.ci_job_match_keys == self.tags:
|
||||
# group not assigned and all tags match
|
||||
match_result = True
|
||||
return match_result
|
||||
|
||||
@@ -70,4 +62,4 @@ class Job(dict):
|
||||
file_name = os.path.join(file_path, self["name"] + ".yml")
|
||||
if "case group" in self:
|
||||
with open(file_name, "w") as f:
|
||||
yaml.dump(self["case group"].output(), f)
|
||||
yaml.dump(self["case group"].output(), f, default_flow_style=False)
|
||||
|
||||
@@ -69,7 +69,7 @@ Let's first check a simple simple::
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
TinyFW.set_default_config(config_file="EnvConfigTemplate.yml")
|
||||
TinyFW.set_default_config(env_config_file="EnvConfigTemplate.yml")
|
||||
test_examples_protocol_https_request()
|
||||
|
||||
|
||||
@@ -127,7 +127,9 @@ The following 3rd party lib is required:
|
||||
|
||||
* pyserial
|
||||
* pyyaml
|
||||
* xunitgen
|
||||
* junit_xml
|
||||
* netifaces
|
||||
* matplotlib (if use Utility.LineChart)
|
||||
|
||||
To build document, we need to install ``Sphinx`` and ``sphinx-rtd-theme`` (you may replace this with your own theme).
|
||||
|
||||
|
||||
@@ -47,5 +47,5 @@ def test_examples_protocol_https_request(env, extra_data):
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
TinyFW.set_default_config(config_file="EnvConfigTemplate.yml", dut=IDF.IDFDUT)
|
||||
TinyFW.set_default_config(env_config_file="EnvConfigTemplate.yml", dut=IDF.IDFDUT)
|
||||
test_examples_protocol_https_request()
|
||||
|
||||
5
tools/tiny-test-fw/requirements.txt
Normal file
5
tools/tiny-test-fw/requirements.txt
Normal file
@@ -0,0 +1,5 @@
|
||||
pyserial
|
||||
pyyaml
|
||||
junit_xml
|
||||
netifaces
|
||||
matplotlib
|
||||
15
tools/unit-test-app/components/unity/Kconfig
Normal file
15
tools/unit-test-app/components/unity/Kconfig
Normal file
@@ -0,0 +1,15 @@
|
||||
menu "Unity test framework"
|
||||
|
||||
config UNITY_FREERTOS_PRIORITY
|
||||
int "Priority of Unity test task"
|
||||
default 5
|
||||
|
||||
config UNITY_FREERTOS_CPU
|
||||
int "CPU to run Unity test task on"
|
||||
default 0
|
||||
|
||||
config UNITY_FREERTOS_STACK_SIZE
|
||||
int "Stack size of Unity test task, in bytes"
|
||||
default 8192
|
||||
|
||||
endmenu
|
||||
@@ -36,8 +36,74 @@ void ref_clock_init();
|
||||
*/
|
||||
void ref_clock_deinit();
|
||||
|
||||
|
||||
/**
|
||||
* @brief Get reference clock timestamp
|
||||
* @return number of microseconds since the reference clock was initialized
|
||||
*/
|
||||
uint64_t ref_clock_get();
|
||||
|
||||
|
||||
/**
|
||||
* @brief Reset automatic leak checking which happens in unit tests.
|
||||
*
|
||||
* Updates recorded "before" free memory values to the free memory values
|
||||
* at time of calling. Resets leak checker if tracing is enabled in
|
||||
* config.
|
||||
*
|
||||
* This can be called if a test case does something which allocates
|
||||
* memory on first use, for example.
|
||||
*
|
||||
* @note Use with care as this can mask real memory leak problems.
|
||||
*/
|
||||
void unity_reset_leak_checks(void);
|
||||
|
||||
|
||||
/**
|
||||
* @brief Call this function from a test case which requires TCP/IP or
|
||||
* LWIP functionality.
|
||||
*
|
||||
* @note This should be the first function the test case calls, as it will
|
||||
* allocate memory on first use (and also reset the test case leak checker).
|
||||
*/
|
||||
void test_case_uses_tcpip(void);
|
||||
|
||||
|
||||
/**
|
||||
* @brief wait for signals.
|
||||
*
|
||||
* for multiple devices test cases, DUT might need to wait for other DUTs before continue testing.
|
||||
* As all DUTs are independent, need user (or test script) interaction to make test synchronized.
|
||||
*
|
||||
* Here we provide signal functions for this.
|
||||
* For example, we're testing GPIO, DUT1 has one pin connect to with DUT2.
|
||||
* DUT2 will output high level and then DUT1 will read input.
|
||||
* DUT1 should call `unity_wait_for_signal("output high level");` before it reads input.
|
||||
* DUT2 should call `unity_send_signal("output high level");` after it finished setting output high level.
|
||||
* According to the console logs:
|
||||
*
|
||||
* DUT1 console:
|
||||
*
|
||||
* ```
|
||||
* Waiting for signal: [output high level]!
|
||||
* Please press "Enter" key to once any board send this signal.
|
||||
* ```
|
||||
*
|
||||
* DUT2 console:
|
||||
*
|
||||
* ```
|
||||
* Send signal: [output high level]!
|
||||
* ```
|
||||
*
|
||||
* Then we press Enter key on DUT1's console, DUT1 starts to read input and then test success.
|
||||
*
|
||||
* @param signal_name signal name which DUT expected to wait before proceed testing
|
||||
*/
|
||||
void unity_wait_for_signal(const char* signal_name);
|
||||
|
||||
/**
|
||||
* @brief DUT send signal.
|
||||
*
|
||||
* @param signal_name signal name which DUT send once it finished preparing.
|
||||
*/
|
||||
void unity_send_signal(const char* signal_name);
|
||||
|
||||
@@ -8,10 +8,12 @@
|
||||
// Adapt Unity to our environment, disable FP support
|
||||
|
||||
#include <esp_err.h>
|
||||
#include <sdkconfig.h>
|
||||
|
||||
/* Some definitions applicable to Unity running in FreeRTOS */
|
||||
#define UNITY_FREERTOS_PRIORITY 5
|
||||
#define UNITY_FREERTOS_CPU 0
|
||||
#define UNITY_FREERTOS_PRIORITY CONFIG_UNITY_FREERTOS_PRIORITY
|
||||
#define UNITY_FREERTOS_CPU CONFIG_UNITY_FREERTOS_CPU
|
||||
#define UNITY_FREERTOS_STACK_SIZE CONFIG_UNITY_FREERTOS_STACK_SIZE
|
||||
|
||||
#define UNITY_EXCLUDE_FLOAT
|
||||
#define UNITY_EXCLUDE_DOUBLE
|
||||
@@ -20,21 +22,50 @@
|
||||
#define UNITY_OUTPUT_FLUSH unity_flush
|
||||
|
||||
// Define helpers to register test cases from multiple files
|
||||
|
||||
#define UNITY_EXPAND2(a, b) a ## b
|
||||
#define UNITY_EXPAND(a, b) UNITY_EXPAND2(a, b)
|
||||
#define UNITY_TEST_UID(what) UNITY_EXPAND(what, __LINE__)
|
||||
|
||||
#define UNITY_TEST_REG_HELPER reg_helper ## UNITY_TEST_UID
|
||||
#define UNITY_TEST_DESC_UID desc ## UNITY_TEST_UID
|
||||
|
||||
|
||||
// get count of __VA_ARGS__
|
||||
#define PP_NARG(...) \
|
||||
PP_NARG_(__VA_ARGS__,PP_RSEQ_N())
|
||||
#define PP_NARG_(...) \
|
||||
PP_ARG_N(__VA_ARGS__)
|
||||
#define PP_ARG_N( \
|
||||
_1, _2, _3, _4, _5, _6, _7, _8, _9, N, ...) N
|
||||
#define PP_RSEQ_N() 9,8,7,6,5,4,3,2,1,0
|
||||
|
||||
// support max 5 test func now
|
||||
#define FN_NAME_SET_1(a) {#a}
|
||||
#define FN_NAME_SET_2(a, b) {#a, #b}
|
||||
#define FN_NAME_SET_3(a, b, c) {#a, #b, #c}
|
||||
#define FN_NAME_SET_4(a, b, c, d) {#a, #b, #c, #d}
|
||||
#define FN_NAME_SET_5(a, b, c, d, e) {#a, #b, #c, #d, #e}
|
||||
|
||||
#define FN_NAME_SET2(n) FN_NAME_SET_##n
|
||||
#define FN_NAME_SET(n, ...) FN_NAME_SET2(n)(__VA_ARGS__)
|
||||
|
||||
#define UNITY_TEST_FN_SET(...) \
|
||||
static test_func UNITY_TEST_UID(test_functions)[] = {__VA_ARGS__}; \
|
||||
static const char* UNITY_TEST_UID(test_fn_name)[] = FN_NAME_SET(PP_NARG(__VA_ARGS__), __VA_ARGS__)
|
||||
|
||||
|
||||
typedef void (* test_func)(void);
|
||||
|
||||
struct test_desc_t
|
||||
{
|
||||
const char* name;
|
||||
const char* desc;
|
||||
void (*fn)(void);
|
||||
const char* file;
|
||||
int line;
|
||||
struct test_desc_t* next;
|
||||
const char* name;
|
||||
const char* desc;
|
||||
test_func* fn;
|
||||
const char* file;
|
||||
int line;
|
||||
uint8_t test_fn_count;
|
||||
const char ** test_fn_name;
|
||||
struct test_desc_t* next;
|
||||
};
|
||||
|
||||
void unity_testcase_register(struct test_desc_t* desc);
|
||||
@@ -46,7 +77,7 @@ void unity_run_tests_with_filter(const char* filter);
|
||||
void unity_run_all_tests();
|
||||
|
||||
/* Test case macro, a-la CATCH framework.
|
||||
First argument is a free-form description,
|
||||
First argument is a free-form description,
|
||||
second argument is (by convention) a list of identifiers, each one in square brackets.
|
||||
Identifiers are used to group related tests, or tests with specific properties.
|
||||
Use like:
|
||||
@@ -56,21 +87,80 @@ void unity_run_all_tests();
|
||||
// test goes here
|
||||
}
|
||||
*/
|
||||
|
||||
#define TEST_CASE(name_, desc_) \
|
||||
static void UNITY_TEST_UID(test_func_) (void); \
|
||||
static void __attribute__((constructor)) UNITY_TEST_UID(test_reg_helper_) () \
|
||||
{ \
|
||||
static struct test_desc_t UNITY_TEST_UID(test_desc_) = { \
|
||||
.name = name_, \
|
||||
.desc = desc_, \
|
||||
.fn = &UNITY_TEST_UID(test_func_), \
|
||||
.file = __FILE__, \
|
||||
.line = __LINE__, \
|
||||
.next = NULL \
|
||||
}; \
|
||||
unity_testcase_register( & UNITY_TEST_UID(test_desc_) ); \
|
||||
}\
|
||||
static void UNITY_TEST_UID(test_func_) (void)
|
||||
static void UNITY_TEST_UID(test_func_) (void); \
|
||||
static void __attribute__((constructor)) UNITY_TEST_UID(test_reg_helper_) () \
|
||||
{ \
|
||||
static test_func test_fn_[] = {&UNITY_TEST_UID(test_func_)}; \
|
||||
static struct test_desc_t UNITY_TEST_UID(test_desc_) = { \
|
||||
.name = name_, \
|
||||
.desc = desc_, \
|
||||
.fn = test_fn_, \
|
||||
.file = __FILE__, \
|
||||
.line = __LINE__, \
|
||||
.test_fn_count = 1, \
|
||||
.test_fn_name = NULL, \
|
||||
.next = NULL \
|
||||
}; \
|
||||
unity_testcase_register( & UNITY_TEST_UID(test_desc_) ); \
|
||||
}\
|
||||
static void UNITY_TEST_UID(test_func_) (void)
|
||||
|
||||
|
||||
/*
|
||||
* Multiple stages test cases will handle the case that test steps are separated by DUT reset.
|
||||
* e.g: we want to verify some function after SW reset, WDT reset or deep sleep reset.
|
||||
*
|
||||
* First argument is a free-form description,
|
||||
* second argument is (by convention) a list of identifiers, each one in square brackets.
|
||||
* subsequent arguments are names test functions separated by reset.
|
||||
* e.g:
|
||||
* TEST_CASE_MULTIPLE_STAGES("run light sleep after deep sleep","[sleep]", goto_deepsleep, light_sleep_after_deep_sleep_wakeup);
|
||||
* */
|
||||
|
||||
#define TEST_CASE_MULTIPLE_STAGES(name_, desc_, ...) \
|
||||
UNITY_TEST_FN_SET(__VA_ARGS__); \
|
||||
static void __attribute__((constructor)) UNITY_TEST_UID(test_reg_helper_) () \
|
||||
{ \
|
||||
static struct test_desc_t UNITY_TEST_UID(test_desc_) = { \
|
||||
.name = name_, \
|
||||
.desc = desc_"[multi_stage]", \
|
||||
.fn = UNITY_TEST_UID(test_functions), \
|
||||
.file = __FILE__, \
|
||||
.line = __LINE__, \
|
||||
.test_fn_count = PP_NARG(__VA_ARGS__), \
|
||||
.test_fn_name = UNITY_TEST_UID(test_fn_name), \
|
||||
.next = NULL \
|
||||
}; \
|
||||
unity_testcase_register( & UNITY_TEST_UID(test_desc_) ); \
|
||||
}
|
||||
|
||||
/*
|
||||
* First argument is a free-form description,
|
||||
* second argument is (by convention) a list of identifiers, each one in square brackets.
|
||||
* subsequent arguments are names of test functions for different DUTs
|
||||
* e.g:
|
||||
* TEST_CASE_MULTIPLE_DEVICES("master and slave spi","[spi][test_env=UT_T2_1]", master_test, slave_test);
|
||||
* */
|
||||
|
||||
#define TEST_CASE_MULTIPLE_DEVICES(name_, desc_, ...) \
|
||||
UNITY_TEST_FN_SET(__VA_ARGS__); \
|
||||
static void __attribute__((constructor)) UNITY_TEST_UID(test_reg_helper_) () \
|
||||
{ \
|
||||
static struct test_desc_t UNITY_TEST_UID(test_desc_) = { \
|
||||
.name = name_, \
|
||||
.desc = desc_"[multi_device]", \
|
||||
.fn = UNITY_TEST_UID(test_functions), \
|
||||
.file = __FILE__, \
|
||||
.line = __LINE__, \
|
||||
.test_fn_count = PP_NARG(__VA_ARGS__), \
|
||||
.test_fn_name = UNITY_TEST_UID(test_fn_name), \
|
||||
.next = NULL \
|
||||
}; \
|
||||
unity_testcase_register( & UNITY_TEST_UID(test_desc_) ); \
|
||||
}
|
||||
|
||||
/**
|
||||
* Note: initialization of test_desc_t fields above has to be done exactly
|
||||
* in the same order as the fields are declared in the structure.
|
||||
|
||||
@@ -12,8 +12,15 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <string.h>
|
||||
#include "unity.h"
|
||||
#include "test_utils.h"
|
||||
#include "rom/ets_sys.h"
|
||||
#include "rom/uart.h"
|
||||
#include "freertos/FreeRTOS.h"
|
||||
#include "freertos/task.h"
|
||||
#include "tcpip_adapter.h"
|
||||
#include "lwip/sockets.h"
|
||||
|
||||
const esp_partition_t *get_test_data_partition()
|
||||
{
|
||||
@@ -23,3 +30,57 @@ const esp_partition_t *get_test_data_partition()
|
||||
TEST_ASSERT_NOT_NULL(result); /* means partition table set wrong */
|
||||
return result;
|
||||
}
|
||||
|
||||
// wait user to send "Enter" key
|
||||
static void wait_user_control()
|
||||
{
|
||||
char sign[5] = {0};
|
||||
while(strlen(sign) == 0)
|
||||
{
|
||||
/* Flush anything already in the RX buffer */
|
||||
while(uart_rx_one_char((uint8_t *) sign) == OK) {
|
||||
}
|
||||
/* Read line */
|
||||
UartRxString((uint8_t*) sign, sizeof(sign) - 1);
|
||||
}
|
||||
}
|
||||
|
||||
void test_case_uses_tcpip()
|
||||
{
|
||||
// Can be called more than once, does nothing on subsequent calls
|
||||
tcpip_adapter_init();
|
||||
|
||||
// Allocate all sockets then free them
|
||||
// (First time each socket is allocated some one-time allocations happen.)
|
||||
int sockets[CONFIG_LWIP_MAX_SOCKETS];
|
||||
for (int i = 0; i < CONFIG_LWIP_MAX_SOCKETS; i++) {
|
||||
int type = (i % 2 == 0) ? SOCK_DGRAM : SOCK_STREAM;
|
||||
int family = (i % 3 == 0) ? PF_INET6 : PF_INET;
|
||||
sockets[i] = socket(family, type, IPPROTO_IP);
|
||||
}
|
||||
for (int i = 0; i < CONFIG_LWIP_MAX_SOCKETS; i++) {
|
||||
close(sockets[i]);
|
||||
}
|
||||
|
||||
// Allow LWIP tasks to finish initialising themselves
|
||||
vTaskDelay(25 / portTICK_RATE_MS);
|
||||
|
||||
printf("Note: tcpip_adapter_init() has been called. Until next reset, TCP/IP task will periodicially allocate memory and consume CPU time.\n");
|
||||
|
||||
// Reset the leak checker as LWIP allocates a lot of memory on first run
|
||||
unity_reset_leak_checks();
|
||||
}
|
||||
|
||||
// signal functions, used for sync between unity DUTs for multiple devices cases
|
||||
void unity_wait_for_signal(const char* signal_name)
|
||||
{
|
||||
printf("Waiting for signal: [%s]!\n"
|
||||
"Please press \"Enter\" key to once any board send this signal.\n", signal_name);
|
||||
wait_user_control();
|
||||
}
|
||||
|
||||
void unity_send_signal(const char* signal_name)
|
||||
{
|
||||
printf("Send signal: [%s]!\n", signal_name);
|
||||
}
|
||||
|
||||
|
||||
@@ -17,8 +17,6 @@
|
||||
#include "esp_heap_trace.h"
|
||||
#endif
|
||||
|
||||
#define unity_printf ets_printf
|
||||
|
||||
// Pointers to the head and tail of linked list of test description structs:
|
||||
static struct test_desc_t* s_unity_tests_first = NULL;
|
||||
static struct test_desc_t* s_unity_tests_last = NULL;
|
||||
@@ -39,6 +37,16 @@ static size_t before_free_32bit;
|
||||
const size_t WARN_LEAK_THRESHOLD = 256;
|
||||
const size_t CRITICAL_LEAK_THRESHOLD = 4096;
|
||||
|
||||
void unity_reset_leak_checks(void)
|
||||
{
|
||||
before_free_8bit = heap_caps_get_free_size(MALLOC_CAP_8BIT);
|
||||
before_free_32bit = heap_caps_get_free_size(MALLOC_CAP_32BIT);
|
||||
|
||||
#ifdef CONFIG_HEAP_TRACING
|
||||
heap_trace_start(HEAP_TRACE_LEAKS);
|
||||
#endif
|
||||
}
|
||||
|
||||
/* setUp runs before every test */
|
||||
void setUp(void)
|
||||
{
|
||||
@@ -56,12 +64,7 @@ void setUp(void)
|
||||
printf("%s", ""); /* sneakily lazy-allocate the reent structure for this test task */
|
||||
get_test_data_partition(); /* allocate persistent partition table structures */
|
||||
|
||||
before_free_8bit = heap_caps_get_free_size(MALLOC_CAP_8BIT);
|
||||
before_free_32bit = heap_caps_get_free_size(MALLOC_CAP_32BIT);
|
||||
|
||||
#ifdef CONFIG_HEAP_TRACING
|
||||
heap_trace_start(HEAP_TRACE_LEAKS);
|
||||
#endif
|
||||
unity_reset_leak_checks();
|
||||
}
|
||||
|
||||
static void check_leak(size_t before_free, size_t after_free, const char *type)
|
||||
@@ -145,12 +148,61 @@ void unity_testcase_register(struct test_desc_t* desc)
|
||||
}
|
||||
}
|
||||
|
||||
/* print the multiple function case name and its sub-menu
|
||||
* e.g:
|
||||
* (1) spi master/slave case
|
||||
* (1)master case
|
||||
* (2)slave case
|
||||
* */
|
||||
static void print_multiple_function_test_menu(const struct test_desc_t* test_ms)
|
||||
{
|
||||
printf("%s\n", test_ms->name);
|
||||
for (int i = 0; i < test_ms->test_fn_count; i++)
|
||||
{
|
||||
printf("\t(%d)\t\"%s\"\n", i+1, test_ms->test_fn_name[i]);
|
||||
}
|
||||
}
|
||||
|
||||
void multiple_function_option(const struct test_desc_t* test_ms)
|
||||
{
|
||||
int selection;
|
||||
char cmdline[256] = {0};
|
||||
|
||||
print_multiple_function_test_menu(test_ms);
|
||||
while(strlen(cmdline) == 0)
|
||||
{
|
||||
/* Flush anything already in the RX buffer */
|
||||
while(uart_rx_one_char((uint8_t *) cmdline) == OK) {
|
||||
|
||||
}
|
||||
UartRxString((uint8_t*) cmdline, sizeof(cmdline) - 1);
|
||||
if(strlen(cmdline) == 0) {
|
||||
/* if input was newline, print a new menu */
|
||||
print_multiple_function_test_menu(test_ms);
|
||||
}
|
||||
}
|
||||
selection = atoi((const char *) cmdline) - 1;
|
||||
if(selection >= 0 && selection < test_ms->test_fn_count) {
|
||||
UnityDefaultTestRun(test_ms->fn[selection], test_ms->name, test_ms->line);
|
||||
} else {
|
||||
printf("Invalid selection, your should input number 1-%d!", test_ms->test_fn_count);
|
||||
}
|
||||
}
|
||||
|
||||
static void unity_run_single_test(const struct test_desc_t* test)
|
||||
{
|
||||
printf("Running %s...\n", test->name);
|
||||
// Unit test runner expects to see test name before the test starts
|
||||
fflush(stdout);
|
||||
uart_tx_wait_idle(CONFIG_CONSOLE_UART_NUM);
|
||||
|
||||
Unity.TestFile = test->file;
|
||||
Unity.CurrentDetail1 = test->desc;
|
||||
UnityDefaultTestRun(test->fn, test->name, test->line);
|
||||
if(test->test_fn_count == 1) {
|
||||
UnityDefaultTestRun(test->fn[0], test->name, test->line);
|
||||
} else {
|
||||
multiple_function_option(test);
|
||||
}
|
||||
}
|
||||
|
||||
static void unity_run_single_test_by_index(int index)
|
||||
@@ -158,6 +210,7 @@ static void unity_run_single_test_by_index(int index)
|
||||
const struct test_desc_t* test;
|
||||
for (test = s_unity_tests_first; test != NULL && index != 0; test = test->next, --index)
|
||||
{
|
||||
|
||||
}
|
||||
if (test != NULL)
|
||||
{
|
||||
@@ -201,7 +254,7 @@ static void unity_run_single_test_by_name(const char* filter)
|
||||
{
|
||||
unity_run_single_test(test);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void unity_run_all_tests()
|
||||
@@ -247,14 +300,21 @@ static void trim_trailing_space(char* str)
|
||||
static int print_test_menu(void)
|
||||
{
|
||||
int test_counter = 0;
|
||||
unity_printf("\n\nHere's the test menu, pick your combo:\n");
|
||||
printf("\n\nHere's the test menu, pick your combo:\n");
|
||||
for (const struct test_desc_t* test = s_unity_tests_first;
|
||||
test != NULL;
|
||||
test = test->next, ++test_counter)
|
||||
{
|
||||
unity_printf("(%d)\t\"%s\" %s\n", test_counter + 1, test->name, test->desc);
|
||||
}
|
||||
return test_counter;
|
||||
printf("(%d)\t\"%s\" %s\n", test_counter + 1, test->name, test->desc);
|
||||
if(test->test_fn_count > 1)
|
||||
{
|
||||
for (int i = 0; i < test->test_fn_count; i++)
|
||||
{
|
||||
printf("\t(%d)\t\"%s\"\n", i+1, test->test_fn_name[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
return test_counter;
|
||||
}
|
||||
|
||||
static int get_test_count(void)
|
||||
@@ -271,7 +331,7 @@ static int get_test_count(void)
|
||||
|
||||
void unity_run_menu()
|
||||
{
|
||||
unity_printf("\n\nPress ENTER to see the list of tests.\n");
|
||||
printf("\n\nPress ENTER to see the list of tests.\n");
|
||||
int test_count = get_test_count();
|
||||
while (true)
|
||||
{
|
||||
@@ -289,6 +349,12 @@ void unity_run_menu()
|
||||
print_test_menu();
|
||||
}
|
||||
}
|
||||
/*use '-' to show test history. Need to do it before UNITY_BEGIN cleanup history */
|
||||
if (cmdline[0] == '-')
|
||||
{
|
||||
UNITY_END();
|
||||
continue;
|
||||
}
|
||||
|
||||
UNITY_BEGIN();
|
||||
|
||||
|
||||
@@ -14,6 +14,6 @@ void app_main()
|
||||
{
|
||||
// Note: if unpinning this task, change the way run times are calculated in
|
||||
// unity_platform
|
||||
xTaskCreatePinnedToCore(unityTask, "unityTask", 8192, NULL,
|
||||
xTaskCreatePinnedToCore(unityTask, "unityTask", UNITY_FREERTOS_STACK_SIZE, NULL,
|
||||
UNITY_FREERTOS_PRIORITY, NULL, UNITY_FREERTOS_CPU);
|
||||
}
|
||||
|
||||
1
tools/unit-test-app/tools/ConfigDependency.yml
Normal file
1
tools/unit-test-app/tools/ConfigDependency.yml
Normal file
@@ -0,0 +1 @@
|
||||
"psram": "CONFIG_SPIRAM_SUPPORT=y"
|
||||
@@ -8,4 +8,13 @@ test_env:
|
||||
omitted: "UT_T1_1"
|
||||
reset:
|
||||
default: "POWERON_RESET"
|
||||
omitted: " "
|
||||
omitted: " "
|
||||
multi_device:
|
||||
default: "Yes"
|
||||
omitted: "No"
|
||||
multi_stage:
|
||||
default: "Yes"
|
||||
omitted: "No"
|
||||
timeout:
|
||||
default: 30
|
||||
omitted: 30
|
||||
|
||||
@@ -8,7 +8,6 @@ import hashlib
|
||||
from copy import deepcopy
|
||||
import CreateSectionTable
|
||||
|
||||
|
||||
TEST_CASE_PATTERN = {
|
||||
"initial condition": "UTINIT1",
|
||||
"SDK": "ESP32_IDF",
|
||||
@@ -20,13 +19,9 @@ TEST_CASE_PATTERN = {
|
||||
"version": "v1 (2016-12-06)",
|
||||
"test environment": "UT_T1_1",
|
||||
"reset": "",
|
||||
"expected result": "1. set succeed"
|
||||
}
|
||||
|
||||
CONFIG_FILE_PATTERN = {
|
||||
"Config": {"execute count": 1, "execute order": "in order"},
|
||||
"DUT": [],
|
||||
"Filter": [{"Add": {"ID": []}}]
|
||||
"expected result": "1. set succeed",
|
||||
"cmd set": "test_unit_test_case",
|
||||
"Test App": "UT",
|
||||
}
|
||||
|
||||
|
||||
@@ -39,11 +34,12 @@ class Parser(object):
|
||||
# file path (relative to idf path)
|
||||
TAG_DEF_FILE = os.path.join("tools", "unit-test-app", "tools", "TagDefinition.yml")
|
||||
MODULE_DEF_FILE = os.path.join("tools", "unit-test-app", "tools", "ModuleDefinition.yml")
|
||||
CONFIG_DEPENDENCY_FILE = os.path.join("tools", "unit-test-app", "tools", "ConfigDependency.yml")
|
||||
MODULE_ARTIFACT_FILE = os.path.join("components", "idf_test", "ModuleDefinition.yml")
|
||||
TEST_CASE_FILE = os.path.join("components", "idf_test", "unit_test", "TestCaseAll.yml")
|
||||
UT_BIN_FOLDER = os.path.join("tools", "unit-test-app", "builds")
|
||||
UT_BIN_FOLDER = os.path.join("tools", "unit-test-app", "output")
|
||||
ELF_FILE = "unit-test-app.elf"
|
||||
APP_NAME_PREFIX = "UT_"
|
||||
SDKCONFIG_FILE = "sdkconfig"
|
||||
|
||||
def __init__(self, idf_path=os.getenv("IDF_PATH")):
|
||||
self.test_env_tags = {}
|
||||
@@ -52,21 +48,24 @@ class Parser(object):
|
||||
self.idf_path = idf_path
|
||||
self.tag_def = yaml.load(open(os.path.join(idf_path, self.TAG_DEF_FILE), "r"))
|
||||
self.module_map = yaml.load(open(os.path.join(idf_path, self.MODULE_DEF_FILE), "r"))
|
||||
self.config_dependency = yaml.load(open(os.path.join(idf_path, self.CONFIG_DEPENDENCY_FILE), "r"))
|
||||
# used to check if duplicated test case names
|
||||
self.test_case_names = set()
|
||||
self.parsing_errors = []
|
||||
|
||||
def parse_test_cases_from_elf(self, elf_file, app_name):
|
||||
def parse_test_cases_for_one_config(self, config_output_folder, config_name):
|
||||
"""
|
||||
parse test cases from elf and save test cases need to be executed to unit test folder
|
||||
:param elf_file: elf file path
|
||||
:param app_name: built unit test app name
|
||||
:param config_output_folder: build folder of this config
|
||||
:param config_name: built unit test config name
|
||||
"""
|
||||
elf_file = os.path.join(config_output_folder, self.ELF_FILE)
|
||||
subprocess.check_output('xtensa-esp32-elf-objdump -t {} | grep test_desc > case_address.tmp'.format(elf_file),
|
||||
shell=True)
|
||||
subprocess.check_output('xtensa-esp32-elf-objdump -s {} > section_table.tmp'.format(elf_file), shell=True)
|
||||
|
||||
table = CreateSectionTable.SectionTable("section_table.tmp")
|
||||
tags = self.parse_tags(os.path.join(config_output_folder, self.SDKCONFIG_FILE))
|
||||
test_cases = []
|
||||
with open("case_address.tmp", "r") as f:
|
||||
for line in f:
|
||||
@@ -78,21 +77,21 @@ class Parser(object):
|
||||
name_addr = table.get_unsigned_int(section, test_addr, 4)
|
||||
desc_addr = table.get_unsigned_int(section, test_addr + 4, 4)
|
||||
file_name_addr = table.get_unsigned_int(section, test_addr + 12, 4)
|
||||
function_count = table.get_unsigned_int(section, test_addr+20, 4)
|
||||
name = table.get_string("any", name_addr)
|
||||
desc = table.get_string("any", desc_addr)
|
||||
file_name = table.get_string("any", file_name_addr)
|
||||
|
||||
tc = self.parse_one_test_case(name, desc, file_name, app_name)
|
||||
tc = self.parse_one_test_case(name, desc, file_name, config_name, tags)
|
||||
|
||||
# check if duplicated case names
|
||||
# we need to use it to select case,
|
||||
# if duplicated IDs, Unity could select incorrect case to run
|
||||
# and we need to check all cases no matter if it's going te be executed by CI
|
||||
# also add app_name here, we allow same case for different apps
|
||||
if (tc["summary"] + app_name) in self.test_case_names:
|
||||
if (tc["summary"] + config_name) in self.test_case_names:
|
||||
self.parsing_errors.append("duplicated test case ID: " + tc["summary"])
|
||||
else:
|
||||
self.test_case_names.add(tc["summary"] + app_name)
|
||||
self.test_case_names.add(tc["summary"] + config_name)
|
||||
|
||||
if tc["CI ready"] == "Yes":
|
||||
# update test env list and the cases of same env list
|
||||
@@ -100,7 +99,11 @@ class Parser(object):
|
||||
self.test_env_tags[tc["test environment"]].append(tc["ID"])
|
||||
else:
|
||||
self.test_env_tags.update({tc["test environment"]: [tc["ID"]]})
|
||||
# only add cases need to be executed
|
||||
|
||||
if function_count > 1:
|
||||
tc.update({"child case num": function_count})
|
||||
|
||||
# only add cases need to be executed
|
||||
test_cases.append(tc)
|
||||
|
||||
os.remove("section_table.tmp")
|
||||
@@ -146,46 +149,51 @@ class Parser(object):
|
||||
pass
|
||||
return p
|
||||
|
||||
def parse_one_test_case(self, name, description, file_name, app_name):
|
||||
def parse_tags(self, sdkconfig_file):
|
||||
"""
|
||||
Some test configs could requires different DUTs.
|
||||
For example, if CONFIG_SPIRAM_SUPPORT is enabled, we need WROVER-Kit to run test.
|
||||
This method will get tags for runners according to ConfigDependency.yml(maps tags to sdkconfig).
|
||||
|
||||
:param sdkconfig_file: sdkconfig file of the unit test config
|
||||
:return: required tags for runners
|
||||
"""
|
||||
required_tags = []
|
||||
with open(sdkconfig_file, "r") as f:
|
||||
configs_raw_data = f.read()
|
||||
configs = configs_raw_data.splitlines(False)
|
||||
for tag in self.config_dependency:
|
||||
if self.config_dependency[tag] in configs:
|
||||
required_tags.append(tag)
|
||||
return required_tags
|
||||
|
||||
def parse_one_test_case(self, name, description, file_name, config_name, tags):
|
||||
"""
|
||||
parse one test case
|
||||
:param name: test case name (summary)
|
||||
:param description: test case description (tag string)
|
||||
:param file_name: the file defines this test case
|
||||
:param app_name: built unit test app name
|
||||
:param config_name: built unit test app name
|
||||
:param tags: tags to select runners
|
||||
:return: parsed test case
|
||||
"""
|
||||
prop = self.parse_case_properities(description)
|
||||
|
||||
idf_path = os.getenv("IDF_PATH")
|
||||
|
||||
# use relative file path to IDF_PATH, to make sure file path is consist
|
||||
relative_file_path = os.path.relpath(file_name, idf_path)
|
||||
|
||||
file_name_hash = int(hashlib.sha256(relative_file_path).hexdigest(), base=16) % 1000
|
||||
|
||||
if file_name_hash in self.file_name_cache:
|
||||
self.file_name_cache[file_name_hash] += 1
|
||||
else:
|
||||
self.file_name_cache[file_name_hash] = 1
|
||||
|
||||
tc_id = "UT_%s_%s_%03d%02d" % (self.module_map[prop["module"]]['module abbr'],
|
||||
self.module_map[prop["module"]]['sub module abbr'],
|
||||
file_name_hash,
|
||||
self.file_name_cache[file_name_hash])
|
||||
|
||||
test_case = deepcopy(TEST_CASE_PATTERN)
|
||||
test_case.update({"Test App": self.APP_NAME_PREFIX + app_name,
|
||||
test_case.update({"config": config_name,
|
||||
"module": self.module_map[prop["module"]]['module'],
|
||||
"CI ready": "No" if prop["ignore"] == "Yes" else "Yes",
|
||||
"cmd set": ["IDFUnitTest/UnitTest", [name]],
|
||||
"ID": tc_id,
|
||||
"ID": name,
|
||||
"test point 2": prop["module"],
|
||||
"steps": name,
|
||||
"test environment": prop["test_env"],
|
||||
"reset": prop["reset"],
|
||||
"sub module": self.module_map[prop["module"]]['sub module'],
|
||||
"summary": name})
|
||||
"summary": name,
|
||||
"multi_device": prop["multi_device"],
|
||||
"multi_stage": prop["multi_stage"],
|
||||
"timeout": int(prop["timeout"]),
|
||||
"tags": tags})
|
||||
return test_case
|
||||
|
||||
def dump_test_cases(self, test_cases):
|
||||
@@ -206,13 +214,13 @@ class Parser(object):
|
||||
""" parse test cases from multiple built unit test apps """
|
||||
test_cases = []
|
||||
|
||||
test_app_folder = os.path.join(self.idf_path, self.UT_BIN_FOLDER)
|
||||
test_apps = os.listdir(test_app_folder)
|
||||
for app in test_apps:
|
||||
elf_file = os.path.join(test_app_folder, app, self.ELF_FILE)
|
||||
if os.path.exists(elf_file):
|
||||
test_cases.extend(self.parse_test_cases_from_elf(elf_file, app))
|
||||
|
||||
output_folder = os.path.join(self.idf_path, self.UT_BIN_FOLDER)
|
||||
test_configs = os.listdir(output_folder)
|
||||
for config in test_configs:
|
||||
config_output_folder = os.path.join(output_folder, config)
|
||||
if os.path.exists(config_output_folder):
|
||||
test_cases.extend(self.parse_test_cases_for_one_config(config_output_folder, config))
|
||||
test_cases.sort(key=lambda x: x["config"] + x["summary"])
|
||||
self.dump_test_cases(test_cases)
|
||||
|
||||
|
||||
@@ -262,4 +270,3 @@ def main():
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
||||
|
||||
588
tools/unit-test-app/unit_test.py
Normal file
588
tools/unit-test-app/unit_test.py
Normal file
@@ -0,0 +1,588 @@
|
||||
"""
|
||||
Test script for unit test case.
|
||||
"""
|
||||
|
||||
import re
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import threading
|
||||
|
||||
# if we want to run test case outside `tiny-test-fw` folder,
|
||||
# we need to insert tiny-test-fw path into sys path
|
||||
test_fw_path = os.getenv("TEST_FW_PATH")
|
||||
if test_fw_path and test_fw_path not in sys.path:
|
||||
sys.path.insert(0, test_fw_path)
|
||||
|
||||
import TinyFW
|
||||
import IDF
|
||||
import Utility
|
||||
from DUT import ExpectTimeout
|
||||
from IDF.IDFApp import UT
|
||||
|
||||
|
||||
UT_APP_BOOT_UP_DONE = "Press ENTER to see the list of tests."
|
||||
RESET_PATTERN = re.compile(r"(ets [\w]{3}\s+[\d]{1,2} [\d]{4} [\d]{2}:[\d]{2}:[\d]{2}[^()]*\([\w].*?\))")
|
||||
EXCEPTION_PATTERN = re.compile(r"(Guru Meditation Error: Core\s+\d panic'ed \([\w].*?\))")
|
||||
ABORT_PATTERN = re.compile(r"(abort\(\) was called at PC 0x[a-eA-E\d]{8} on core \d)")
|
||||
FINISH_PATTERN = re.compile(r"1 Tests (\d) Failures (\d) Ignored")
|
||||
|
||||
STARTUP_TIMEOUT = 10
|
||||
DUT_STARTUP_CHECK_RETRY_COUNT = 5
|
||||
TEST_HISTORY_CHECK_TIMEOUT = 1
|
||||
|
||||
|
||||
def format_test_case_config(test_case_data):
|
||||
"""
|
||||
convert the test case data to unified format.
|
||||
We need to following info to run unit test cases:
|
||||
|
||||
1. unit test app config
|
||||
2. test case name
|
||||
3. test case reset info
|
||||
|
||||
the formatted case config is a dict, with ut app config as keys. The value is a list of test cases.
|
||||
Each test case is a dict with "name" and "reset" as keys. For example::
|
||||
|
||||
case_config = {
|
||||
"default": [{"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}, {...}],
|
||||
"psram": [{"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}],
|
||||
}
|
||||
|
||||
If config is not specified for test case, then
|
||||
|
||||
:param test_case_data: string, list, or a dictionary list
|
||||
:return: formatted data
|
||||
"""
|
||||
|
||||
case_config = dict()
|
||||
|
||||
def parse_case(one_case_data):
|
||||
""" parse and format one case """
|
||||
|
||||
def process_reset_list(reset_list):
|
||||
# strip space and remove white space only items
|
||||
_output = list()
|
||||
for _r in reset_list:
|
||||
_data = _r.strip(" ")
|
||||
if _data:
|
||||
_output.append(_data)
|
||||
return _output
|
||||
|
||||
_case = dict()
|
||||
if isinstance(one_case_data, str):
|
||||
_temp = one_case_data.split(" [reset=")
|
||||
_case["name"] = _temp[0]
|
||||
try:
|
||||
_case["reset"] = process_reset_list(_temp[1][0:-1].split(","))
|
||||
except IndexError:
|
||||
_case["reset"] = list()
|
||||
elif isinstance(one_case_data, dict):
|
||||
_case = one_case_data.copy()
|
||||
assert "name" in _case
|
||||
if "reset" not in _case:
|
||||
_case["reset"] = list()
|
||||
else:
|
||||
if isinstance(_case["reset"], str):
|
||||
_case["reset"] = process_reset_list(_case["reset"].split(","))
|
||||
else:
|
||||
raise TypeError("Not supported type during parsing unit test case")
|
||||
|
||||
if "config" not in _case:
|
||||
_case["config"] = "default"
|
||||
|
||||
return _case
|
||||
|
||||
if not isinstance(test_case_data, list):
|
||||
test_case_data = [test_case_data]
|
||||
|
||||
for case_data in test_case_data:
|
||||
parsed_case = parse_case(case_data)
|
||||
try:
|
||||
case_config[parsed_case["config"]].append(parsed_case)
|
||||
except KeyError:
|
||||
case_config[parsed_case["config"]] = [parsed_case]
|
||||
|
||||
return case_config
|
||||
|
||||
|
||||
def replace_app_bin(dut, name, new_app_bin):
|
||||
if new_app_bin is None:
|
||||
return
|
||||
search_pattern = '/{}.bin'.format(name)
|
||||
for i, config in enumerate(dut.download_config):
|
||||
if config.endswith(search_pattern):
|
||||
dut.download_config[i] = new_app_bin
|
||||
Utility.console_log("The replaced application binary is {}".format(new_app_bin), "O")
|
||||
break
|
||||
|
||||
|
||||
def reset_dut(dut):
|
||||
dut.reset()
|
||||
# esptool ``run`` cmd takes quite long time.
|
||||
# before reset finish, serial port is closed. therefore DUT could already bootup before serial port opened.
|
||||
# this could cause checking bootup print failed.
|
||||
# now use input cmd `-` and check test history to check if DUT is bootup.
|
||||
# we'll retry this step for a few times in case `dut.reset` returns during DUT bootup (when DUT can't process any command).
|
||||
for _ in range(DUT_STARTUP_CHECK_RETRY_COUNT):
|
||||
dut.write("-")
|
||||
try:
|
||||
dut.expect("0 Tests 0 Failures 0 Ignored", timeout=TEST_HISTORY_CHECK_TIMEOUT)
|
||||
break
|
||||
except ExpectTimeout:
|
||||
pass
|
||||
else:
|
||||
raise AssertionError("Reset {} ({}) failed!".format(dut.name, dut.port))
|
||||
|
||||
|
||||
def run_one_normal_case(dut, one_case, junit_test_case, failed_cases):
|
||||
|
||||
reset_dut(dut)
|
||||
|
||||
dut.start_capture_raw_data()
|
||||
# run test case
|
||||
dut.write("\"{}\"".format(one_case["name"]))
|
||||
dut.expect("Running " + one_case["name"] + "...")
|
||||
|
||||
exception_reset_list = []
|
||||
|
||||
# we want to set this flag in callbacks (inner functions)
|
||||
# use list here so we can use append to set this flag
|
||||
test_finish = list()
|
||||
|
||||
# expect callbacks
|
||||
def one_case_finish(result):
|
||||
""" one test finished, let expect loop break and log result """
|
||||
test_finish.append(True)
|
||||
output = dut.stop_capture_raw_data()
|
||||
if result:
|
||||
Utility.console_log("Success: " + one_case["name"], color="green")
|
||||
else:
|
||||
failed_cases.append(one_case["name"])
|
||||
Utility.console_log("Failed: " + one_case["name"], color="red")
|
||||
junit_test_case.add_failure_info(output)
|
||||
|
||||
def handle_exception_reset(data):
|
||||
"""
|
||||
just append data to exception list.
|
||||
exception list will be checked in ``handle_reset_finish``, once reset finished.
|
||||
"""
|
||||
exception_reset_list.append(data[0])
|
||||
|
||||
def handle_test_finish(data):
|
||||
""" test finished without reset """
|
||||
# in this scenario reset should not happen
|
||||
assert not exception_reset_list
|
||||
if int(data[1]):
|
||||
# case ignored
|
||||
Utility.console_log("Ignored: " + one_case["name"], color="orange")
|
||||
junit_test_case.add_skipped_info("ignored")
|
||||
one_case_finish(not int(data[0]))
|
||||
|
||||
def handle_reset_finish(data):
|
||||
""" reset happened and reboot finished """
|
||||
assert exception_reset_list # reboot but no exception/reset logged. should never happen
|
||||
result = False
|
||||
if len(one_case["reset"]) == len(exception_reset_list):
|
||||
for i, exception in enumerate(exception_reset_list):
|
||||
if one_case["reset"][i] not in exception:
|
||||
break
|
||||
else:
|
||||
result = True
|
||||
if not result:
|
||||
err_msg = "Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}".format(one_case["reset"],
|
||||
exception_reset_list)
|
||||
Utility.console_log(err_msg, color="orange")
|
||||
junit_test_case.add_error_info(err_msg)
|
||||
one_case_finish(result)
|
||||
|
||||
while not test_finish:
|
||||
try:
|
||||
dut.expect_any((RESET_PATTERN, handle_exception_reset),
|
||||
(EXCEPTION_PATTERN, handle_exception_reset),
|
||||
(ABORT_PATTERN, handle_exception_reset),
|
||||
(FINISH_PATTERN, handle_test_finish),
|
||||
(UT_APP_BOOT_UP_DONE, handle_reset_finish),
|
||||
timeout=one_case["timeout"])
|
||||
except ExpectTimeout:
|
||||
Utility.console_log("Timeout in expect", color="orange")
|
||||
junit_test_case.add_error_info("timeout")
|
||||
one_case_finish(False)
|
||||
break
|
||||
|
||||
|
||||
@IDF.idf_unit_test(env_tag="UT_T1_1", junit_report_by_case=True)
|
||||
def run_unit_test_cases(env, extra_data):
|
||||
"""
|
||||
extra_data can be three types of value
|
||||
1. as string:
|
||||
1. "case_name"
|
||||
2. "case_name [reset=RESET_REASON]"
|
||||
2. as dict:
|
||||
1. with key like {"name": "Intr_alloc test, shared ints"}
|
||||
2. with key like {"name": "restart from PRO CPU", "reset": "SW_CPU_RESET", "config": "psram"}
|
||||
3. as list of string or dict:
|
||||
[case1, case2, case3, {"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}, ...]
|
||||
|
||||
:param extra_data: the case name or case list or case dictionary
|
||||
:return: None
|
||||
"""
|
||||
|
||||
case_config = format_test_case_config(extra_data)
|
||||
|
||||
# we don't want stop on failed case (unless some special scenarios we can't handle)
|
||||
# this flag is used to log if any of the case failed during executing
|
||||
# Before exit test function this flag is used to log if the case fails
|
||||
failed_cases = []
|
||||
|
||||
for ut_config in case_config:
|
||||
Utility.console_log("Running unit test for config: " + ut_config, "O")
|
||||
dut = env.get_dut("unit-test-app", app_path=ut_config)
|
||||
dut.start_app()
|
||||
Utility.console_log("Download finished, start running test cases", "O")
|
||||
|
||||
for one_case in case_config[ut_config]:
|
||||
# create junit report test case
|
||||
junit_test_case = TinyFW.JunitReport.create_test_case("[{}] {}".format(ut_config, one_case["name"]))
|
||||
try:
|
||||
run_one_normal_case(dut, one_case, junit_test_case, failed_cases)
|
||||
TinyFW.JunitReport.test_case_finish(junit_test_case)
|
||||
except Exception as e:
|
||||
junit_test_case.add_error_info("Unexpected exception: " + str(e))
|
||||
TinyFW.JunitReport.test_case_finish(junit_test_case)
|
||||
|
||||
# raise exception if any case fails
|
||||
if failed_cases:
|
||||
Utility.console_log("Failed Cases:", color="red")
|
||||
for _case_name in failed_cases:
|
||||
Utility.console_log("\t" + _case_name, color="red")
|
||||
raise AssertionError("Unit Test Failed")
|
||||
|
||||
|
||||
class Handler(threading.Thread):
|
||||
|
||||
WAIT_SIGNAL_PATTERN = re.compile(r'Waiting for signal: \[(.+)\]!')
|
||||
SEND_SIGNAL_PATTERN = re.compile(r'Send signal: \[(.+)\]!')
|
||||
FINISH_PATTERN = re.compile(r"1 Tests (\d) Failures (\d) Ignored")
|
||||
|
||||
def __init__(self, dut, sent_signal_list, lock, parent_case_name, child_case_index, timeout):
|
||||
self.dut = dut
|
||||
self.sent_signal_list = sent_signal_list
|
||||
self.lock = lock
|
||||
self.parent_case_name = parent_case_name
|
||||
self.child_case_name = ""
|
||||
self.child_case_index = child_case_index + 1
|
||||
self.finish = False
|
||||
self.result = False
|
||||
self.output = ""
|
||||
self.fail_name = None
|
||||
self.timeout = timeout
|
||||
self.force_stop = threading.Event() # it show the running status
|
||||
|
||||
reset_dut(self.dut) # reset the board to make it start from begining
|
||||
|
||||
threading.Thread.__init__(self, name="{} Handler".format(dut))
|
||||
|
||||
def run(self):
|
||||
|
||||
self.dut.start_capture_raw_data()
|
||||
|
||||
def get_child_case_name(data):
|
||||
self.child_case_name = data[0]
|
||||
time.sleep(1)
|
||||
self.dut.write(str(self.child_case_index))
|
||||
|
||||
def one_device_case_finish(result):
|
||||
""" one test finished, let expect loop break and log result """
|
||||
self.finish = True
|
||||
self.result = result
|
||||
self.output = "[{}]\n\n{}\n".format(self.child_case_name,
|
||||
self.dut.stop_capture_raw_data())
|
||||
if not result:
|
||||
self.fail_name = self.child_case_name
|
||||
|
||||
def device_wait_action(data):
|
||||
start_time = time.time()
|
||||
expected_signal = data[0]
|
||||
while 1:
|
||||
if time.time() > start_time + self.timeout:
|
||||
Utility.console_log("Timeout in device for function: %s"%self.child_case_name, color="orange")
|
||||
break
|
||||
with self.lock:
|
||||
if expected_signal in self.sent_signal_list:
|
||||
self.dut.write(" ")
|
||||
self.sent_signal_list.remove(expected_signal)
|
||||
break
|
||||
time.sleep(0.01)
|
||||
|
||||
def device_send_action(data):
|
||||
with self.lock:
|
||||
self.sent_signal_list.append(data[0].encode('utf-8'))
|
||||
|
||||
def handle_device_test_finish(data):
|
||||
""" test finished without reset """
|
||||
# in this scenario reset should not happen
|
||||
if int(data[1]):
|
||||
# case ignored
|
||||
Utility.console_log("Ignored: " + self.child_case_name, color="orange")
|
||||
one_device_case_finish(not int(data[0]))
|
||||
|
||||
try:
|
||||
time.sleep(1)
|
||||
self.dut.write("\"{}\"".format(self.parent_case_name))
|
||||
self.dut.expect("Running " + self.parent_case_name + "...")
|
||||
except ExpectTimeout:
|
||||
Utility.console_log("No case detected!", color="orange")
|
||||
while not self.finish and not self.force_stop.isSet():
|
||||
try:
|
||||
self.dut.expect_any((re.compile('\(' + str(self.child_case_index) + '\)\s"(\w+)"'), get_child_case_name),
|
||||
(self.WAIT_SIGNAL_PATTERN, device_wait_action), # wait signal pattern
|
||||
(self.SEND_SIGNAL_PATTERN, device_send_action), # send signal pattern
|
||||
(self.FINISH_PATTERN, handle_device_test_finish), # test finish pattern
|
||||
timeout=self.timeout)
|
||||
except ExpectTimeout:
|
||||
Utility.console_log("Timeout in expect", color="orange")
|
||||
one_device_case_finish(False)
|
||||
break
|
||||
|
||||
def stop(self):
|
||||
self.force_stop.set()
|
||||
|
||||
|
||||
def get_case_info(one_case):
|
||||
parent_case = one_case["name"]
|
||||
child_case_num = one_case["child case num"]
|
||||
return parent_case, child_case_num
|
||||
|
||||
|
||||
def get_dut(duts, env, name, ut_config):
|
||||
if name in duts:
|
||||
dut = duts[name]
|
||||
else:
|
||||
dut = env.get_dut(name, app_path=ut_config)
|
||||
duts[name] = dut
|
||||
dut.start_app()
|
||||
return dut
|
||||
|
||||
|
||||
def run_one_multiple_devices_case(duts, ut_config, env, one_case, failed_cases, junit_test_case):
|
||||
lock = threading.RLock()
|
||||
threads = []
|
||||
send_signal_list = []
|
||||
result = True
|
||||
parent_case, case_num = get_case_info(one_case)
|
||||
|
||||
for i in range(case_num):
|
||||
dut = get_dut(duts, env, "dut%d" % i, ut_config)
|
||||
threads.append(Handler(dut, send_signal_list, lock,
|
||||
parent_case, i, one_case["timeout"]))
|
||||
for thread in threads:
|
||||
thread.setDaemon(True)
|
||||
thread.start()
|
||||
output = "Multiple Device Failed\n"
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
result = result and thread.result
|
||||
output += thread.output
|
||||
if not thread.result:
|
||||
[thd.stop() for thd in threads]
|
||||
|
||||
if result:
|
||||
Utility.console_log("Success: " + one_case["name"], color="green")
|
||||
else:
|
||||
failed_cases.append(one_case["name"])
|
||||
junit_test_case.add_failure_info(output)
|
||||
Utility.console_log("Failed: " + one_case["name"], color="red")
|
||||
|
||||
|
||||
@IDF.idf_unit_test(env_tag="UT_T2_1", junit_report_by_case=True)
|
||||
def run_multiple_devices_cases(env, extra_data):
|
||||
"""
|
||||
extra_data can be two types of value
|
||||
1. as dict:
|
||||
e.g.
|
||||
{"name": "gpio master/slave test example",
|
||||
"child case num": 2,
|
||||
"config": "release",
|
||||
"env_tag": "UT_T2_1"}
|
||||
2. as list dict:
|
||||
e.g.
|
||||
[{"name": "gpio master/slave test example1",
|
||||
"child case num": 2,
|
||||
"config": "release",
|
||||
"env_tag": "UT_T2_1"},
|
||||
{"name": "gpio master/slave test example2",
|
||||
"child case num": 2,
|
||||
"config": "release",
|
||||
"env_tag": "UT_T2_1"}]
|
||||
|
||||
"""
|
||||
failed_cases = []
|
||||
case_config = format_test_case_config(extra_data)
|
||||
duts = {}
|
||||
for ut_config in case_config:
|
||||
Utility.console_log("Running unit test for config: " + ut_config, "O")
|
||||
for one_case in case_config[ut_config]:
|
||||
junit_test_case = TinyFW.JunitReport.create_test_case("[{}] {}".format(ut_config, one_case["name"]))
|
||||
try:
|
||||
run_one_multiple_devices_case(duts, ut_config, env, one_case, failed_cases, junit_test_case)
|
||||
TinyFW.JunitReport.test_case_finish(junit_test_case)
|
||||
except Exception as e:
|
||||
junit_test_case.add_error_info("Unexpected exception: " + str(e))
|
||||
TinyFW.JunitReport.test_case_finish(junit_test_case)
|
||||
|
||||
if failed_cases:
|
||||
Utility.console_log("Failed Cases:", color="red")
|
||||
for _case_name in failed_cases:
|
||||
Utility.console_log("\t" + _case_name, color="red")
|
||||
raise AssertionError("Unit Test Failed")
|
||||
|
||||
|
||||
def run_one_multiple_stage_case(dut, one_case, failed_cases, junit_test_case):
|
||||
reset_dut(dut)
|
||||
|
||||
dut.start_capture_raw_data()
|
||||
|
||||
exception_reset_list = []
|
||||
|
||||
for test_stage in range(one_case["child case num"]):
|
||||
# select multi stage test case name
|
||||
dut.write("\"{}\"".format(one_case["name"]))
|
||||
dut.expect("Running " + one_case["name"] + "...")
|
||||
# select test function for current stage
|
||||
dut.write(str(test_stage + 1))
|
||||
|
||||
# we want to set this flag in callbacks (inner functions)
|
||||
# use list here so we can use append to set this flag
|
||||
stage_finish = list()
|
||||
|
||||
def last_stage():
|
||||
return test_stage == one_case["child case num"] - 1
|
||||
|
||||
def check_reset():
|
||||
if one_case["reset"]:
|
||||
assert exception_reset_list # reboot but no exception/reset logged. should never happen
|
||||
result = False
|
||||
if len(one_case["reset"]) == len(exception_reset_list):
|
||||
for i, exception in enumerate(exception_reset_list):
|
||||
if one_case["reset"][i] not in exception:
|
||||
break
|
||||
else:
|
||||
result = True
|
||||
if not result:
|
||||
err_msg = "Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}".format(one_case["reset"],
|
||||
exception_reset_list)
|
||||
Utility.console_log(err_msg, color="orange")
|
||||
junit_test_case.add_error_info(err_msg)
|
||||
else:
|
||||
# we allow omit reset in multi stage cases
|
||||
result = True
|
||||
return result
|
||||
|
||||
# expect callbacks
|
||||
def one_case_finish(result):
|
||||
""" one test finished, let expect loop break and log result """
|
||||
# handle test finish
|
||||
result = result and check_reset()
|
||||
output = dut.stop_capture_raw_data()
|
||||
if result:
|
||||
Utility.console_log("Success: " + one_case["name"], color="green")
|
||||
else:
|
||||
failed_cases.append(one_case["name"])
|
||||
Utility.console_log("Failed: " + one_case["name"], color="red")
|
||||
junit_test_case.add_failure_info(output)
|
||||
stage_finish.append("break")
|
||||
|
||||
def handle_exception_reset(data):
|
||||
"""
|
||||
just append data to exception list.
|
||||
exception list will be checked in ``handle_reset_finish``, once reset finished.
|
||||
"""
|
||||
exception_reset_list.append(data[0])
|
||||
|
||||
def handle_test_finish(data):
|
||||
""" test finished without reset """
|
||||
# in this scenario reset should not happen
|
||||
if int(data[1]):
|
||||
# case ignored
|
||||
Utility.console_log("Ignored: " + one_case["name"], color="orange")
|
||||
junit_test_case.add_skipped_info("ignored")
|
||||
# only passed in last stage will be regarded as real pass
|
||||
if last_stage():
|
||||
one_case_finish(not int(data[0]))
|
||||
else:
|
||||
Utility.console_log("test finished before enter last stage", color="orange")
|
||||
one_case_finish(False)
|
||||
|
||||
def handle_next_stage(data):
|
||||
""" reboot finished. we goto next stage """
|
||||
if last_stage():
|
||||
# already last stage, should never goto next stage
|
||||
Utility.console_log("didn't finish at last stage", color="orange")
|
||||
one_case_finish(False)
|
||||
else:
|
||||
stage_finish.append("continue")
|
||||
|
||||
while not stage_finish:
|
||||
try:
|
||||
dut.expect_any((RESET_PATTERN, handle_exception_reset),
|
||||
(EXCEPTION_PATTERN, handle_exception_reset),
|
||||
(ABORT_PATTERN, handle_exception_reset),
|
||||
(FINISH_PATTERN, handle_test_finish),
|
||||
(UT_APP_BOOT_UP_DONE, handle_next_stage),
|
||||
timeout=one_case["timeout"])
|
||||
except ExpectTimeout:
|
||||
Utility.console_log("Timeout in expect", color="orange")
|
||||
one_case_finish(False)
|
||||
break
|
||||
if stage_finish[0] == "break":
|
||||
# test breaks on current stage
|
||||
break
|
||||
|
||||
|
||||
@IDF.idf_unit_test(env_tag="UT_T1_1", junit_report_by_case=True)
|
||||
def run_multiple_stage_cases(env, extra_data):
|
||||
"""
|
||||
extra_data can be 2 types of value
|
||||
1. as dict: Mandantory keys: "name" and "child case num", optional keys: "reset" and others
|
||||
3. as list of string or dict:
|
||||
[case1, case2, case3, {"name": "restart from PRO CPU", "child case num": 2}, ...]
|
||||
|
||||
:param extra_data: the case name or case list or case dictionary
|
||||
:return: None
|
||||
"""
|
||||
|
||||
case_config = format_test_case_config(extra_data)
|
||||
|
||||
# we don't want stop on failed case (unless some special scenarios we can't handle)
|
||||
# this flag is used to log if any of the case failed during executing
|
||||
# Before exit test function this flag is used to log if the case fails
|
||||
failed_cases = []
|
||||
|
||||
for ut_config in case_config:
|
||||
Utility.console_log("Running unit test for config: " + ut_config, "O")
|
||||
dut = env.get_dut("unit-test-app", app_path=ut_config)
|
||||
dut.start_app()
|
||||
|
||||
for one_case in case_config[ut_config]:
|
||||
junit_test_case = TinyFW.JunitReport.create_test_case("[{}] {}".format(ut_config, one_case["name"]))
|
||||
try:
|
||||
run_one_multiple_stage_case(dut, one_case, failed_cases, junit_test_case)
|
||||
TinyFW.JunitReport.test_case_finish(junit_test_case)
|
||||
except Exception as e:
|
||||
junit_test_case.add_error_info("Unexpected exception: " + str(e))
|
||||
TinyFW.JunitReport.test_case_finish(junit_test_case)
|
||||
|
||||
# raise exception if any case fails
|
||||
if failed_cases:
|
||||
Utility.console_log("Failed Cases:", color="red")
|
||||
for _case_name in failed_cases:
|
||||
Utility.console_log("\t" + _case_name, color="red")
|
||||
raise AssertionError("Unit Test Failed")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
run_multiple_devices_cases(extra_data={"name": "gpio master/slave test example",
|
||||
"child case num": 2,
|
||||
"config": "release",
|
||||
"env_tag": "UT_T2_1"})
|
||||
Reference in New Issue
Block a user