DPDK 23.11.0
rte_rcu_qsbr.h
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (c) 2018-2020 Arm Limited
 */

#ifndef _RTE_RCU_QSBR_H_
#define _RTE_RCU_QSBR_H_

#ifdef __cplusplus
extern "C" {
#endif

#include <inttypes.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdint.h>

#include <rte_common.h>
#include <rte_debug.h>
#include <rte_atomic.h>
#include <rte_ring.h>

extern int rte_rcu_log_type;

#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG
#define __RTE_RCU_DP_LOG(level, fmt, args...) \
	rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
		"%s(): " fmt "\n", __func__, ## args)
#else
#define __RTE_RCU_DP_LOG(level, fmt, args...)
#endif

#if defined(RTE_LIBRTE_RCU_DEBUG)
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...) do {\
	if (v->qsbr_cnt[thread_id].lock_cnt) \
		rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
			"%s(): " fmt "\n", __func__, ## args); \
} while (0)
#else
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...)
#endif

/* Registered thread IDs are stored as a bitmap in an array of 64b elements.
 * A given thread ID needs to be converted to an index into the array and
 * a bit position within that array element.
 */
#define __RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(RTE_ATOMIC(uint64_t)) * 8)
#define __RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \
	RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \
		__RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3, RTE_CACHE_LINE_SIZE)
#define __RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t __rte_atomic *) \
	((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i)
#define __RTE_QSBR_THRID_INDEX_SHIFT 6
#define __RTE_QSBR_THRID_MASK 0x3f
#define RTE_QSBR_THRID_INVALID 0xffffffff
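
/* Illustrative example: with the shift and mask above, a thread ID maps to
 * an array element and a bit within it. A hypothetical thread_id of 70 gives
 * 70 >> __RTE_QSBR_THRID_INDEX_SHIFT = 1 (second 64b bitmap element) and
 * 70 & __RTE_QSBR_THRID_MASK = 6 (bit 6 within that element).
 */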

/* Worker thread counter */
struct rte_rcu_qsbr_cnt {
	RTE_ATOMIC(uint64_t) cnt;
	/**< Quiescent state counter. Value 0 indicates the thread is offline. */
	RTE_ATOMIC(uint32_t) lock_cnt;
	/**< Lock counter. Used when RTE_LIBRTE_RCU_DEBUG is enabled. */
} __rte_cache_aligned;
#define __RTE_QSBR_CNT_THR_OFFLINE 0
#define __RTE_QSBR_CNT_INIT 1
#define __RTE_QSBR_CNT_MAX ((uint64_t)~0)
#define __RTE_QSBR_TOKEN_SIZE sizeof(uint64_t)

/* RTE Quiescent State variable structure.
 * This structure has two elements that vary in size based on the
 * 'max_threads' parameter.
 * 1) Quiescent state counter array
 * 2) Registered thread ID array
 */
struct rte_rcu_qsbr {
	RTE_ATOMIC(uint64_t) token __rte_cache_aligned;
	/**< Counter incremented by rte_rcu_qsbr_start() to identify a grace period */
	RTE_ATOMIC(uint64_t) acked_token;
	/**< Least token acknowledged by all the readers */

	uint32_t num_elems __rte_cache_aligned;
	/**< Number of elements in the thread ID bitmap array */
	RTE_ATOMIC(uint32_t) num_threads;
	/**< Number of threads currently using this QS variable */
	uint32_t max_threads;
	/**< Maximum number of threads using this QS variable */

	struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;
	/**< Quiescent state counter array of 'max_threads' elements;
	 *   the registered thread ID bitmap array follows it in memory.
	 */
} __rte_cache_aligned;
typedef void (*rte_rcu_qsbr_free_resource_t)(void *p, void *e, unsigned int n);

#define RTE_RCU_QSBR_DQ_NAMESIZE RTE_RING_NAMESIZE

#define RTE_RCU_QSBR_DQ_MT_UNSAFE 1

/* Parameters used when creating the defer queue. */
struct rte_rcu_qsbr_dq_parameters {
	const char *name;
	/**< Name of the defer queue */
	uint32_t flags;
	/**< Flags to control API behaviors */
	uint32_t size;
	/**< Number of entries in the defer queue */
	uint32_t esize;
	/**< Size (in bytes) of each element in the defer queue */
	uint32_t trigger_reclaim_limit;
	/**< Number of pending resources that triggers auto reclamation
	 *   from rte_rcu_qsbr_dq_enqueue().
	 */
	uint32_t max_reclaim_size;
	/**< Maximum number of resources reclaimed per auto reclamation */
	rte_rcu_qsbr_free_resource_t free_fn;
	/**< Function to call to free the resource */
	void *p;
	/**< Pointer passed to the free function */
	struct rte_rcu_qsbr *v;
	/**< RCU QSBR variable to use for this defer queue */
};

/* RTE defer queue structure.
 * This structure holds the defer queue. The defer queue is used to
 * hold the deleted entries from the data structure that are not
 * yet freed.
 */
struct rte_rcu_qsbr_dq;

size_t
rte_rcu_qsbr_get_memsize(uint32_t max_threads);

int
rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);
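
/* Usage sketch (illustrative): the writer typically sizes, allocates and
 * initializes the QS variable as below. The allocator and the value of
 * 'max_threads' are assumptions made for this example.
 *
 *	uint32_t max_threads = 128;
 *	size_t sz = rte_rcu_qsbr_get_memsize(max_threads);
 *	struct rte_rcu_qsbr *v = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);
 *	if (v == NULL || rte_rcu_qsbr_init(v, max_threads) != 0)
 *		rte_panic("cannot set up RCU QSBR variable\n");
 */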

int
rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id);

int
rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id);

static __rte_always_inline void
rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	uint64_t t;

	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* Copy the current value of token.
	 * The fence at the end of the function will ensure that
	 * the following will not move down after the load of any shared
	 * data structure.
	 */
	t = rte_atomic_load_explicit(&v->token, rte_memory_order_relaxed);

	/* rte_atomic_store_explicit(cnt, rte_memory_order_relaxed) is used to ensure
	 * 'cnt' (64b) is accessed atomically.
	 */
	rte_atomic_store_explicit(&v->qsbr_cnt[thread_id].cnt,
		t, rte_memory_order_relaxed);

	/* The subsequent load of the data structure should not
	 * move above the store. Hence a store-load barrier
	 * is required.
	 * If the load of the data structure moves above the store,
	 * writer might not see that the reader is online, even though
	 * the reader is referencing the shared data structure.
	 */
	rte_atomic_thread_fence(rte_memory_order_seq_cst);
}

static __rte_always_inline void
rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* The reader can go offline only after the load of the
	 * data structure is completed. i.e. any load of the
	 * data structure can not move after this store.
	 */

	rte_atomic_store_explicit(&v->qsbr_cnt[thread_id].cnt,
		__RTE_QSBR_CNT_THR_OFFLINE, rte_memory_order_release);
}
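
/* Usage sketch (illustrative): a reader that cannot report quiescent state
 * regularly brackets its accesses with the online/offline calls. 'thread_id'
 * and lookup() are placeholders for this example.
 *
 *	rte_rcu_qsbr_thread_register(v, thread_id);
 *	rte_rcu_qsbr_thread_online(v, thread_id);
 *	lookup(shared_structure, key);		// read-side critical section
 *	rte_rcu_qsbr_thread_offline(v, thread_id);
 *	...
 *	rte_rcu_qsbr_thread_unregister(v, thread_id);
 */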

static __rte_always_inline void
rte_rcu_qsbr_lock(__rte_unused struct rte_rcu_qsbr *v,
			__rte_unused unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

#if defined(RTE_LIBRTE_RCU_DEBUG)
	/* Increment the lock counter */
	rte_atomic_fetch_add_explicit(&v->qsbr_cnt[thread_id].lock_cnt,
				1, rte_memory_order_acquire);
#endif
}

static __rte_always_inline void
rte_rcu_qsbr_unlock(__rte_unused struct rte_rcu_qsbr *v,
			__rte_unused unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

#if defined(RTE_LIBRTE_RCU_DEBUG)
	/* Decrement the lock counter */
	rte_atomic_fetch_sub_explicit(&v->qsbr_cnt[thread_id].lock_cnt,
				1, rte_memory_order_release);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, WARNING,
				"Lock counter %u. Nested locks?\n",
				v->qsbr_cnt[thread_id].lock_cnt);
#endif
}
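
/* Usage sketch (illustrative): when built with RTE_LIBRTE_RCU_DEBUG, the
 * lock/unlock pair lets the library warn about readers that report quiescent
 * state from inside a critical section; otherwise the calls are effectively
 * no-ops. walk() is a placeholder for this example.
 *
 *	rte_rcu_qsbr_lock(v, thread_id);
 *	walk(shared_structure);			// nested critical section
 *	rte_rcu_qsbr_unlock(v, thread_id);
 */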

static __rte_always_inline uint64_t
rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
{
	uint64_t t;

	RTE_ASSERT(v != NULL);

	/* Release the changes to the shared data structure.
	 * This store release will ensure that changes to any data
	 * structure are visible to the workers before the token
	 * update is visible.
	 */
	t = rte_atomic_fetch_add_explicit(&v->token, 1, rte_memory_order_release) + 1;

	return t;
}
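
/* Usage sketch (illustrative): the writer removes an element and then grabs
 * a token; the token is later passed to rte_rcu_qsbr_check() before the
 * element is freed. remove() and 'elem' are placeholders for this example.
 *
 *	remove(shared_structure, elem);
 *	uint64_t token = rte_rcu_qsbr_start(v);
 *	// ... other work; free 'elem' only after the token is acknowledged
 */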

static __rte_always_inline void
rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	uint64_t t;

	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* Acquire the changes to the shared data structure released
	 * by rte_rcu_qsbr_start.
	 * Later loads of the shared data structure should not move
	 * above this load. Hence, use load-acquire.
	 */
	t = rte_atomic_load_explicit(&v->token, rte_memory_order_acquire);

	/* Check if there are updates available from the writer.
	 * Inform the writer that updates are visible to this reader.
	 * Prior loads of the shared data structure should not move
	 * beyond this store. Hence use store-release.
	 */
	if (t != rte_atomic_load_explicit(&v->qsbr_cnt[thread_id].cnt, rte_memory_order_relaxed))
		rte_atomic_store_explicit(&v->qsbr_cnt[thread_id].cnt,
			t, rte_memory_order_release);

	__RTE_RCU_DP_LOG(DEBUG, "%s: update: token = %" PRIu64 ", Thread ID = %d",
		__func__, t, thread_id);
}
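
/* Usage sketch (illustrative): a reader that stays online reports quiescent
 * state once per iteration of its processing loop, outside any read-side
 * critical section. 'quit' and process_burst() are placeholders.
 *
 *	while (!quit) {
 *		process_burst(shared_structure);
 *		rte_rcu_qsbr_quiescent(v, thread_id);
 *	}
 */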

/* Check the quiescent state counter for registered threads only, assuming
 * that not all threads have registered.
 */
static __rte_always_inline int
__rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	uint32_t i, j, id;
	uint64_t bmap;
	uint64_t c;
	RTE_ATOMIC(uint64_t) *reg_thread_id;
	uint64_t acked_token = __RTE_QSBR_CNT_MAX;

	for (i = 0, reg_thread_id = __RTE_QSBR_THRID_ARRAY_ELM(v, 0);
		i < v->num_elems;
		i++, reg_thread_id++) {
		/* Load the current registered thread bit map before
		 * loading the reader thread quiescent state counters.
		 */
		bmap = rte_atomic_load_explicit(reg_thread_id, rte_memory_order_acquire);
		id = i << __RTE_QSBR_THRID_INDEX_SHIFT;

		while (bmap) {
			j = rte_ctz64(bmap);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: check: token = %" PRIu64 ", wait = %d, Bit Map = 0x%" PRIx64 ", Thread ID = %d",
				__func__, t, wait, bmap, id + j);
			c = rte_atomic_load_explicit(
				&v->qsbr_cnt[id + j].cnt,
				rte_memory_order_acquire);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: status: token = %" PRIu64 ", wait = %d, Thread QS cnt = %" PRIu64 ", Thread ID = %d",
				__func__, t, wait, c, id + j);

			/* Counter is not checked for wrap-around condition
			 * as it is a 64b counter.
			 */
			if (unlikely(c !=
				__RTE_QSBR_CNT_THR_OFFLINE && c < t)) {
				/* This thread is not in quiescent state */
				if (!wait)
					return 0;

				rte_pause();
				/* This thread might have unregistered.
				 * Re-read the bitmap.
				 */
				bmap = rte_atomic_load_explicit(reg_thread_id,
						rte_memory_order_acquire);

				continue;
			}

			/* This thread is in quiescent state. Use the counter
			 * to find the least acknowledged token among all the
			 * readers.
			 */
			if (c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c)
				acked_token = c;

			bmap &= ~(1UL << j);
		}
	}

	/* All readers are checked, update least acknowledged token.
	 * There might be multiple writers trying to update this. There is
	 * no need to update this very accurately using compare-and-swap.
	 */
	if (acked_token != __RTE_QSBR_CNT_MAX)
		rte_atomic_store_explicit(&v->acked_token, acked_token,
			rte_memory_order_relaxed);

	return 1;
}

/* Check the quiescent state counter for all threads, assuming that
 * all the threads have registered.
 */
static __rte_always_inline int
__rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	uint32_t i;
	struct rte_rcu_qsbr_cnt *cnt;
	uint64_t c;
	uint64_t acked_token = __RTE_QSBR_CNT_MAX;

	for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {
		__RTE_RCU_DP_LOG(DEBUG,
			"%s: check: token = %" PRIu64 ", wait = %d, Thread ID = %d",
			__func__, t, wait, i);
		while (1) {
			c = rte_atomic_load_explicit(&cnt->cnt, rte_memory_order_acquire);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: status: token = %" PRIu64 ", wait = %d, Thread QS cnt = %" PRIu64 ", Thread ID = %d",
				__func__, t, wait, c, i);

			/* Counter is not checked for wrap-around condition
			 * as it is a 64b counter.
			 */
			if (likely(c == __RTE_QSBR_CNT_THR_OFFLINE || c >= t))
				break;

			/* This thread is not in quiescent state */
			if (!wait)
				return 0;

			rte_pause();
		}

		/* This thread is in quiescent state. Use the counter to find
		 * the least acknowledged token among all the readers.
		 */
		if (likely(c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c))
			acked_token = c;
	}

	/* All readers are checked, update least acknowledged token.
	 * There might be multiple writers trying to update this. There is
	 * no need to update this very accurately using compare-and-swap.
	 */
	if (acked_token != __RTE_QSBR_CNT_MAX)
		rte_atomic_store_explicit(&v->acked_token, acked_token,
			rte_memory_order_relaxed);

	return 1;
}

static __rte_always_inline int
rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	RTE_ASSERT(v != NULL);

	/* Check if all the readers have already acknowledged this token */
	if (likely(t <= v->acked_token)) {
		__RTE_RCU_DP_LOG(DEBUG,
			"%s: check: token = %" PRIu64 ", wait = %d",
			__func__, t, wait);
		__RTE_RCU_DP_LOG(DEBUG,
			"%s: status: least acked token = %" PRIu64,
			__func__, v->acked_token);
		return 1;
	}

	if (likely(v->num_threads == v->max_threads))
		return __rte_rcu_qsbr_check_all(v, t, wait);
	else
		return __rte_rcu_qsbr_check_selective(v, t, wait);
}
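
/* Usage sketch (illustrative): the writer may poll with wait set to false and
 * retry or defer the free, or block with wait set to true and free right
 * away. free_elem() is a placeholder for this example.
 *
 *	uint64_t token = rte_rcu_qsbr_start(v);
 *	if (rte_rcu_qsbr_check(v, token, false) == 1)
 *		free_elem(elem);	// no reader still references it
 *	// or block: rte_rcu_qsbr_check(v, token, true); free_elem(elem);
 */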

void
rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id);
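
/* Usage sketch (illustrative): rte_rcu_qsbr_synchronize() blocks until the
 * registered readers have acknowledged a quiescent state, so the writer can
 * free immediately afterwards. A writer that is not itself a registered
 * reader passes RTE_QSBR_THRID_INVALID as the thread ID. remove() and
 * free_elem() are placeholders for this example.
 *
 *	remove(shared_structure, elem);
 *	rte_rcu_qsbr_synchronize(v, RTE_QSBR_THRID_INVALID);
 *	free_elem(elem);
 */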

int
rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);

struct rte_rcu_qsbr_dq *
rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params);

int
rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e);

int
rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
	unsigned int *freed, unsigned int *pending, unsigned int *available);

int
rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq);
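
/* Usage sketch (illustrative): the defer queue APIs above can track the token
 * and perform the eventual free on the writer's behalf. The field values and
 * free_elem() are assumptions made for this example.
 *
 *	struct rte_rcu_qsbr_dq_parameters params = {
 *		.name = "example_dq", .size = 1024, .esize = sizeof(void *),
 *		.free_fn = free_elem, .p = shared_structure, .v = v,
 *	};
 *	struct rte_rcu_qsbr_dq *dq = rte_rcu_qsbr_dq_create(&params);
 *
 *	remove(shared_structure, elem);
 *	rte_rcu_qsbr_dq_enqueue(dq, &elem);	// records the current token
 *	unsigned int freed, pending, avail;
 *	rte_rcu_qsbr_dq_reclaim(dq, 32, &freed, &pending, &avail);
 *	...
 *	rte_rcu_qsbr_dq_delete(dq);
 */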

#ifdef __cplusplus
}
#endif

#endif /* _RTE_RCU_QSBR_H_ */