record: Remove rtems_record_drain()

The rtems_record_drain() function is unreliable.  Replace it with
rtems_record_fetch().
Author: Sebastian Huber
Date: 2024-10-10 07:47:10 +02:00
Committed by: Chris Johns
Parent: 049b9b5c82
Commit: be764f7dec
8 changed files with 98 additions and 321 deletions
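The replacement pattern is visible in the updated consumers below: instead of pushing drained item sets into a visitor callback, a caller now owns an item buffer, initializes a fetch control, and pulls items in a loop until the fetch status is no longer RTEMS_RECORD_FETCH_CONTINUE. A minimal sketch of that loop, assuming the declarations come from <rtems/record.h> and with process_items() as a hypothetical stand-in for whatever the caller does with the data:

```c
#include <rtems/record.h>

/* Hypothetical consumer of fetched record items (stand-in, not part of the API). */
static void process_items( const rtems_record_item *items, size_t count )
{
  /* e.g. write the items to a file descriptor or feed a record client */
  (void) items;
  (void) count;
}

static void fetch_all_records( void )
{
  rtems_record_fetch_control control;
  rtems_record_item          items[ 128 ];
  rtems_record_fetch_status  status;

  /* Bind the caller-provided item buffer to the fetch control. */
  rtems_record_fetch_initialize(
    &control,
    &items[ 0 ],
    RTEMS_ARRAY_SIZE( items )
  );

  do {
    /* Each call fills control.fetched_items and control.fetched_count. */
    status = rtems_record_fetch( &control );
    process_items( control.fetched_items, control.fetched_count );
  } while ( status == RTEMS_RECORD_FETCH_CONTINUE );
}
```

The record server change below additionally sizes the buffer with rtems_record_get_item_count_for_fetch(), so a single fetch can return a complete per-processor item set.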

View File

@@ -1891,29 +1891,6 @@ uint32_t rtems_record_interrupt_disable( void );
*/
void rtems_record_interrupt_enable( uint32_t level );
typedef void ( *rtems_record_drain_visitor )(
const rtems_record_item *items,
size_t count,
void *arg
);
void _Record_Drain(
Record_Control *control,
uint32_t cpu_index,
rtems_record_drain_visitor visitor,
void *arg
);
/**
* @brief Drains the record items on all processors.
*
* Calls the visitor function for each drained item set.
*
* @param visitor The visitor function.
* @param arg The argument for the visitor function.
*/
void rtems_record_drain( rtems_record_drain_visitor visitor, void *arg );
/**
* @brief This structure controls the record fetching performed by rtems_record_fetch().
*

View File

@@ -111,22 +111,15 @@ typedef struct {
uint32_t overflow;
/**
* @brief If true, then hold back items for overflow or initial ramp up
* processing.
* @brief If true, then hold back items.
*/
bool hold_back;
/**
* @brief Storage for hold back items.
*
* In case of a ring buffer overflow, the rtems_record_drain() will push the
* complete ring buffer content to the client. While the items are processed
* by the client, new items may overwrite some items being processed. The
* overwritten items can be detected in the following iteration once the next
* tail/head information is pushed to the client.
*
* In case of the initial ramp up, the items are stored in the hold back
* buffer to determine the uptime of the first event.
* Once the time stamp association with the uptime is known, the hold back
* items can be processed.
*/
rtems_record_item_64 *items;

View File

@@ -37,18 +37,6 @@
extern "C" {
#endif /* __cplusplus */
/**
* @brief Drains the record items on all processors and writes them to the file
* descriptor.
*
* @param fd The file descriptor.
* @param written Set to true if items were written to the file descriptor,
* otherwise set to false.
*
* @retval The bytes written to the file descriptor.
*/
ssize_t rtems_record_writev( int fd, bool *written );
/**
* @brief Runs a record TCP server loop.
*

View File

@@ -1,7 +1,7 @@
/* SPDX-License-Identifier: BSD-2-Clause */
/*
* Copyright (C) 2020 embedded brains GmbH & Co. KG
* Copyright (C) 2020, 2024 embedded brains GmbH & Co. KG
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
@@ -80,29 +80,36 @@ static bool thread_names_visitor( rtems_tcb *tcb, void *arg )
return false;
}
static void drain_visitor(
const rtems_record_item *items,
size_t count,
void *arg
)
{
dump_chunk( arg, items, count * sizeof( *items ) );
}
void rtems_record_dump(
rtems_record_dump_chunk chunk,
void *arg
)
{
Record_Stream_header header;
size_t size;
dump_context ctx;
Record_Stream_header header;
size_t size;
dump_context ctx;
rtems_record_fetch_control control;
rtems_record_item items[ 128 ];
rtems_record_fetch_status status;
ctx.chunk = chunk;
ctx.arg = arg;
size = _Record_Stream_header_initialize( &header );
dump_chunk( &ctx, &header, size );
rtems_task_iterate( thread_names_visitor, &ctx );
rtems_record_drain( drain_visitor, &ctx );
_Thread_Iterate( thread_names_visitor, &ctx );
rtems_record_fetch_initialize(
&control,
&items[ 0 ],
RTEMS_ARRAY_SIZE( items )
);
do {
status = rtems_record_fetch( &control );
dump_chunk(
&ctx,
control.fetched_items,
control.fetched_count * sizeof( *control.fetched_items )
);
} while ( status == RTEMS_RECORD_FETCH_CONTINUE );
}

View File

@@ -1,7 +1,7 @@
/*
* SPDX-License-Identifier: BSD-2-Clause
*
* Copyright (C) 2018, 2019 embedded brains GmbH & Co. KG
* Copyright (C) 2018, 2024 embedded brains GmbH & Co. KG
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
@@ -36,60 +36,12 @@
#include <sys/socket.h>
#include <sys/uio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <netinet/in.h>
#ifdef RTEMS_SMP
#define CHUNKS (3 * CPU_MAXIMUM_PROCESSORS)
#else
#define CHUNKS 4
#endif
typedef struct {
int available;
struct iovec *current;
struct iovec iov[CHUNKS];
} writev_visitor_context;
static void writev_visitor(
const rtems_record_item *items,
size_t count,
void *arg
)
{
writev_visitor_context *ctx;
ctx = arg;
if ( ctx->available > 0 ) {
ctx->current->iov_base = RTEMS_DECONST( rtems_record_item *, items );
ctx->current->iov_len = count * sizeof( *items );
--ctx->available;
++ctx->current;
}
}
ssize_t rtems_record_writev( int fd, bool *written )
{
writev_visitor_context ctx;
int n;
ctx.available = CHUNKS;
ctx.current = &ctx.iov[ 0 ];
rtems_record_drain( writev_visitor, &ctx );
n = CHUNKS - ctx.available;
if ( n > 0 ) {
*written = true;
return writev( fd, &ctx.iov[ 0 ], n );
} else {
*written = false;
return 0;
}
}
#define WAKEUP_EVENT RTEMS_EVENT_0
static void wakeup( rtems_id task )
@@ -198,6 +150,38 @@ static void send_thread_names( int fd )
}
}
static void fetch_and_write(
int fd,
rtems_record_item *items,
size_t count
)
{
rtems_record_fetch_control control;
rtems_record_fetch_initialize( &control, items, count );
while ( true ) {
rtems_record_fetch_status status;
do {
ssize_t n;
status = rtems_record_fetch( &control );
n = write(
fd,
control.fetched_items,
control.fetched_count * sizeof( *control.fetched_items )
);
if ( n <= 0 ) {
return;
}
} while ( status == RTEMS_RECORD_FETCH_CONTINUE );
wait( RTEMS_WAIT );
}
}
void rtems_record_server( uint16_t port, rtems_interval period )
{
rtems_status_code sc;
@@ -206,13 +190,21 @@ void rtems_record_server( uint16_t port, rtems_interval period )
struct sockaddr_in addr;
int sd;
int rv;
size_t count;
rtems_record_item *items;
sd = -1;
self = rtems_task_self();
count = rtems_record_get_item_count_for_fetch();
items = calloc( count, sizeof( *items ) );
if ( items == NULL ) {
return;
}
sc = rtems_timer_create( rtems_build_name( 'R', 'C', 'R', 'D' ), &timer );
if ( sc != RTEMS_SUCCESSFUL ) {
return;
goto error;
}
sd = socket( PF_INET, SOCK_STREAM, 0 );
@@ -237,8 +229,6 @@ void rtems_record_server( uint16_t port, rtems_interval period )
while ( true ) {
int cd;
bool written;
ssize_t n;
cd = accept( sd, NULL, NULL );
@@ -250,23 +240,14 @@ void rtems_record_server( uint16_t port, rtems_interval period )
(void) rtems_timer_fire_after( timer, period, wakeup_timer, &self );
send_header( cd );
send_thread_names( cd );
while ( true ) {
n = rtems_record_writev( cd, &written );
if ( written && n <= 0 ) {
break;
}
wait( RTEMS_WAIT );
}
fetch_and_write( cd, items, count );
(void) rtems_timer_cancel( timer );
(void) close( cd );
}
error:
free( items );
(void) close( sd );
(void) rtems_timer_delete( timer );
}

View File

@@ -123,64 +123,3 @@ size_t _Record_String_to_items(
return i;
}
void _Record_Drain(
Record_Control *control,
uint32_t cpu_index,
rtems_record_drain_visitor visitor,
void *arg
)
{
rtems_record_item header[ 3 ];
unsigned int tail;
unsigned int head;
tail = _Record_Tail( control );
head = _Atomic_Load_uint( &control->head, ATOMIC_ORDER_ACQUIRE );
if ( tail == head ) {
return;
}
control->tail = head;
header[ 0 ].event = RTEMS_RECORD_PROCESSOR;
header[ 0 ].data = cpu_index;
header[ 1 ].event = RTEMS_RECORD_PER_CPU_TAIL;
header[ 1 ].data = tail;
header[ 2 ].event = RTEMS_RECORD_PER_CPU_HEAD;
header[ 2 ].data = head;
( *visitor )( header, RTEMS_ARRAY_SIZE( header ), arg );
if ( _Record_Is_overflow( control, tail, head ) ) {
tail = head + 1;
}
tail = _Record_Index( control, tail );
head = _Record_Index( control, head );
if ( tail < head ) {
( *visitor )( &control->Items[ tail ], head - tail, arg );
} else {
( *visitor )( &control->Items[ tail ], control->mask + 1 - tail, arg );
if ( head > 0 ) {
( *visitor )( &control->Items[ 0 ], head, arg );
}
}
}
void rtems_record_drain( rtems_record_drain_visitor visitor, void *arg )
{
uint32_t cpu_max;
uint32_t cpu_index;
cpu_max = rtems_configuration_get_maximum_processors();
for ( cpu_index = 0; cpu_index < cpu_max; ++cpu_index ) {
Per_CPU_Control *cpu;
cpu = _Per_CPU_Get_by_index( cpu_index );
_Record_Drain( cpu->record, cpu_index, visitor, arg );
}
}

View File

@@ -112,6 +112,7 @@ static const rtems_record_item expected_items_7[ITEM_COUNT] = {
{ .event = TE(10, UE(9)), .data = 11 }
};
#ifdef RTEMS_NETWORKING
static const rtems_record_item expected_items_8[] = {
{ .event = TE(0, RTEMS_RECORD_PROCESSOR), .data = 0 },
{ .event = TE(0, RTEMS_RECORD_PER_CPU_TAIL), .data = 0 },
@@ -154,7 +155,6 @@ static const rtems_record_item expected_items_12[] = {
{ .event = TE(44, UE(43)), .data = 45 }
};
#ifdef RTEMS_NETWORKING
static const rtems_record_item expected_items_13[] = {
{ .event = TE(0, RTEMS_RECORD_THREAD_ID), .data = 0x9010001 },
{
@@ -396,124 +396,6 @@ static void test_produce_n(test_context *ctx, Record_Control *control)
rtems_test_assert(_Record_Tail(control) == 0);
}
typedef struct {
size_t todo;
const rtems_record_item *items;
} visitor_context;
static void visitor(const rtems_record_item *items, size_t count, void *arg)
{
visitor_context *vctx;
vctx = arg;
rtems_test_assert(vctx->todo >= count);
while (count > 0) {
rtems_test_assert(memcmp(items, vctx->items, sizeof(*items)) == 0);
++items;
++vctx->items;
--count;
--vctx->todo;
}
}
static void test_drain(test_context *ctx, Record_Control *control)
{
visitor_context vctx;
init_context(ctx);
vctx.todo = 0;
vctx.items = NULL;
rtems_record_drain(visitor, &vctx);
rtems_test_assert(vctx.todo == 0);
rtems_record_produce(UE(1), 3);
set_time(&control->Items[0], 2);
rtems_record_produce(UE(4), 6);
set_time(&control->Items[1], 5);
rtems_record_produce(UE(7), 9);
set_time(&control->Items[2], 8);
vctx.todo = RTEMS_ARRAY_SIZE(expected_items_8);
vctx.items = expected_items_8;
rtems_record_drain(visitor, &vctx);
rtems_test_assert(vctx.todo == 0);
vctx.todo = 0;
vctx.items = NULL;
rtems_record_drain(visitor, &vctx);
rtems_test_assert(vctx.todo == 0);
rtems_record_produce(UE(10), 12);
set_time(&control->Items[3], 11);
rtems_record_produce(UE(13), 15);
set_time(&control->Items[0], 14);
vctx.todo = RTEMS_ARRAY_SIZE(expected_items_9);
vctx.items = expected_items_9;
rtems_record_drain(visitor, &vctx);
rtems_test_assert(vctx.todo == 0);
vctx.todo = 0;
vctx.items = NULL;
rtems_record_drain(visitor, &vctx);
rtems_test_assert(vctx.todo == 0);
rtems_record_produce(UE(16), 18);
set_time(&control->Items[1], 17);
rtems_record_produce(UE(19), 21);
set_time(&control->Items[2], 20);
rtems_record_produce(UE(22), 24);
set_time(&control->Items[3], 23);
vctx.todo = RTEMS_ARRAY_SIZE(expected_items_10);
vctx.items = expected_items_10;
rtems_record_drain(visitor, &vctx);
rtems_test_assert(vctx.todo == 0);
vctx.todo = 0;
vctx.items = NULL;
rtems_record_drain(visitor, &vctx);
rtems_test_assert(vctx.todo == 0);
rtems_record_produce(UE(25), 27);
set_time(&control->Items[0], 26);
vctx.todo = RTEMS_ARRAY_SIZE(expected_items_11);
vctx.items = expected_items_11;
rtems_record_drain(visitor, &vctx);
rtems_test_assert(vctx.todo == 0);
vctx.todo = 0;
vctx.items = NULL;
rtems_record_drain(visitor, &vctx);
rtems_test_assert(vctx.todo == 0);
rtems_record_produce(UE(28), 30);
set_time(&control->Items[1], 29);
rtems_record_produce(UE(31), 33);
set_time(&control->Items[2], 32);
rtems_record_produce(UE(34), 36);
set_time(&control->Items[3], 35);
rtems_record_produce(UE(37), 39);
set_time(&control->Items[0], 38);
rtems_record_produce(UE(40), 42);
set_time(&control->Items[1], 41);
rtems_record_produce(UE(43), 45);
set_time(&control->Items[2], 44);
vctx.todo = RTEMS_ARRAY_SIZE(expected_items_12);
vctx.items = expected_items_12;
rtems_record_drain(visitor, &vctx);
rtems_test_assert(vctx.todo == 0);
vctx.todo = 0;
vctx.items = NULL;
rtems_record_drain(visitor, &vctx);
rtems_test_assert(vctx.todo == 0);
}
#ifdef RTEMS_NETWORKING
#define PORT 1234
@@ -757,7 +639,6 @@ static void Init(rtems_task_argument arg)
test_produce(ctx, &ctx->control);
test_produce_2(ctx, &ctx->control);
test_produce_n(ctx, &ctx->control);
test_drain(ctx, &ctx->control);
#ifdef RTEMS_NETWORKING
test_server(ctx, &ctx->control);
#endif

View File

@@ -1,7 +1,7 @@
/* SPDX-License-Identifier: BSD-2-Clause */
/*
* Copyright (C) 2018, 2020 embedded brains GmbH & Co. KG
* Copyright (C) 2018, 2024 embedded brains GmbH & Co. KG
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
@@ -80,20 +80,6 @@ static rtems_record_client_status client_handler(
return RTEMS_RECORD_CLIENT_SUCCESS;
}
static void drain_visitor(
const rtems_record_item *items,
size_t count,
void *arg
)
{
test_context *ctx;
rtems_record_client_status cs;
ctx = arg;
cs = rtems_record_client_run(&ctx->client, items, count * sizeof(*items));
rtems_test_assert(cs == RTEMS_RECORD_CLIENT_SUCCESS);
}
static void generate_events(void)
{
int i;
@@ -155,6 +141,31 @@ static void generate_events(void)
rtems_record_interrupt_enable(level);
}
static void fetch(test_context *ctx)
{
rtems_record_client_status cs;
rtems_record_fetch_control control;
rtems_record_item items[256];
rtems_record_fetch_status fs;
rtems_record_fetch_initialize(
&control,
&items[0],
RTEMS_ARRAY_SIZE( items )
);
do {
fs = rtems_record_fetch(&control);
cs = rtems_record_client_run(
&ctx->client,
control.fetched_items,
control.fetched_count * sizeof(*control.fetched_items)
);
rtems_test_assert(cs == RTEMS_RECORD_CLIENT_SUCCESS);
} while (fs == RTEMS_RECORD_FETCH_CONTINUE);
}
static void Init(rtems_task_argument arg)
{
test_context *ctx;
@@ -171,7 +182,7 @@ static void Init(rtems_task_argument arg)
size = _Record_Stream_header_initialize(&header);
cs = rtems_record_client_run(&ctx->client, &header, size);
rtems_test_assert(cs == RTEMS_RECORD_CLIENT_SUCCESS);
rtems_record_drain(drain_visitor, ctx);
fetch(ctx);
rtems_record_client_destroy(&ctx->client);
generate_events();