Skip to content

Commit 034b238

Browse files
authored
Merge pull request #6891 from dmitrygx/topic/ucp/ka_iter
UCP/CORE: Ensure that KA isn't done several times for the same EP during the round
2 parents 430e031 + 0e22a53 commit 034b238

File tree

4 files changed

+81
-78
lines changed

4 files changed

+81
-78
lines changed

src/ucp/core/ucp_ep.c

Lines changed: 31 additions & 39 deletions
Original file line number | Diff line number | Diff line change
@@ -164,23 +164,23 @@ ucs_status_t ucp_ep_create_base(ucp_worker_h worker, const char *peer_name,
164164
goto err_free_ep;
165165
}
166166

167-
ep->refcount = 1;
168-
ep->cfg_index = UCP_WORKER_CFG_INDEX_NULL;
169-
ep->worker = worker;
170-
ep->am_lane = UCP_NULL_LANE;
171-
ep->flags = 0;
172-
ep->conn_sn = UCP_EP_MATCH_CONN_SN_MAX;
167+
ep->refcount = 1;
168+
ep->cfg_index = UCP_WORKER_CFG_INDEX_NULL;
169+
ep->worker = worker;
170+
ep->am_lane = UCP_NULL_LANE;
171+
ep->flags = 0;
172+
ep->conn_sn = UCP_EP_MATCH_CONN_SN_MAX;
173173
#if UCS_ENABLE_ASSERT
174-
ep->flush_iter_refcount = 0;
175-
ep->discard_refcount = 0;
174+
ep->flush_iter_refcount = 0;
175+
ep->discard_refcount = 0;
176176
#endif
177-
ucp_ep_ext_gen(ep)->user_data = NULL;
178-
ucp_ep_ext_control(ep)->cm_idx = UCP_NULL_RESOURCE;
179-
ucp_ep_ext_control(ep)->err_cb = NULL;
180-
ucp_ep_ext_control(ep)->local_ep_id = UCS_PTR_MAP_KEY_INVALID;
181-
ucp_ep_ext_control(ep)->remote_ep_id = UCS_PTR_MAP_KEY_INVALID;
177+
ucp_ep_ext_gen(ep)->user_data = NULL;
178+
ucp_ep_ext_control(ep)->cm_idx = UCP_NULL_RESOURCE;
179+
ucp_ep_ext_control(ep)->err_cb = NULL;
180+
ucp_ep_ext_control(ep)->local_ep_id = UCS_PTR_MAP_KEY_INVALID;
181+
ucp_ep_ext_control(ep)->remote_ep_id = UCS_PTR_MAP_KEY_INVALID;
182182
#if UCS_ENABLE_ASSERT
183-
ucp_ep_ext_control(ep)->ka_count = 0;
183+
ucp_ep_ext_control(ep)->ka_last_round = 0;
184184
#endif
185185

186186
UCS_STATIC_ASSERT(sizeof(ucp_ep_ext_gen(ep)->ep_match) >=
@@ -2754,7 +2754,7 @@ ucs_status_t ucp_ep_do_uct_ep_keepalive(ucp_ep_h ucp_ep, uct_ep_h uct_ep,
27542754
return (packed_len > 0) ? UCS_OK : (ucs_status_t)packed_len;
27552755
}
27562756

2757-
int ucp_ep_do_keepalive(ucp_ep_h ep)
2757+
int ucp_ep_do_keepalive(ucp_ep_h ep, ucs_time_t now)
27582758
{
27592759
ucp_worker_h worker = ep->worker;
27602760
ucp_lane_index_t lane;
@@ -2763,16 +2763,8 @@ int ucp_ep_do_keepalive(ucp_ep_h ep)
27632763

27642764
UCP_WORKER_THREAD_CS_CHECK_IS_BLOCKED(ep->worker);
27652765

2766-
if (ep->flags & UCP_EP_FLAG_FAILED) {
2767-
worker->keepalive.lane_map = 0;
2768-
return 1;
2769-
}
2770-
2771-
/* Take updated ep_check_map, in case ep configuration has changed */
2772-
worker->keepalive.lane_map &= ucp_ep_config(ep)->key.ep_check_map;
2773-
if (worker->keepalive.lane_map == 0) {
2774-
return 1;
2775-
}
2766+
ucs_assert(!(ep->flags & UCP_EP_FLAG_FAILED));
2767+
ucs_assert(worker->keepalive.lane_map != 0);
27762768

27772769
ucs_for_each_bit(lane, worker->keepalive.lane_map) {
27782770
ucs_assert(lane < UCP_MAX_LANES);
@@ -2793,23 +2785,23 @@ int ucp_ep_do_keepalive(ucp_ep_h ep)
27932785
worker->keepalive.lane_map &= ~UCS_BIT(lane);
27942786
}
27952787

2796-
if (worker->keepalive.lane_map == 0) {
2788+
/* Keepalive round is still not finished for this ep - skip ka_last_round
2789+
* update */
2790+
if (worker->keepalive.lane_map != 0) {
2791+
return 0;
2792+
}
2793+
27972794
#if UCS_ENABLE_ASSERT
2798-
ucp_ep_ext_control(ep)->ka_count++;
2799-
/* Difference between the number of EP keepalive rounds and total number
2800-
* of KA rounds done on Worker must be <= 2, because keepalive could be
2801-
* done twice for the same EP in case of number of EPs was decreased */
2802-
ucs_assertv((ucp_ep_ext_control(ep)->ka_count /
2803-
(worker->keepalive.round_count + 1)) <= 2,
2804-
"ep %p: keepalive.round_count=%" PRIu64
2805-
" ep.ka_count=%" PRIu64,
2806-
ep, worker->keepalive.round_count,
2807-
ucp_ep_ext_control(ep)->ka_count);
2795+
ucs_assertv((now - ucp_ep_ext_control(ep)->ka_last_round) >=
2796+
worker->context->config.ext.keepalive_interval,
2797+
"ep %p: now=%" PRIu64 " ka_last_round=%" PRIu64
2798+
" ka_interval=%" PRIu64,
2799+
ep, now, ucp_ep_ext_control(ep)->ka_last_round,
2800+
worker->context->config.ext.keepalive_interval);
2801+
ucp_ep_ext_control(ep)->ka_last_round = now;
28082802
#endif
2809-
return 1;
2810-
}
28112803

2812-
return 0;
2804+
return 1;
28132805
}
28142806

28152807
static void ucp_ep_req_purge(ucp_ep_h ucp_ep, ucp_request_t *req,

src/ucp/core/ucp_ep.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,7 @@ typedef struct {
433433
ucp_err_handler_cb_t err_cb; /* Error handler */
434434
ucp_ep_close_proto_req_t close_req; /* Close protocol request */
435435
#if UCS_ENABLE_ASSERT
436-
size_t ka_count; /* Number of KA rounds done */
436+
ucs_time_t ka_last_round; /* Time of last KA round done */
437437
#endif
438438
} ucp_ep_ext_control_t;
439439

@@ -671,10 +671,11 @@ ucs_status_t ucp_ep_do_uct_ep_keepalive(ucp_ep_h ucp_ep, uct_ep_h uct_ep,
671671
* @brief Do keepalive operation.
672672
*
673673
* @param [in] ep UCP Endpoint object to operate keepalive.
674+
* @param [in] now Current time when keepalive started.
674675
*
675676
* @return Indication whether keepalive was fully done for UCP Endpoint or not.
676677
*/
677-
int ucp_ep_do_keepalive(ucp_ep_h ep);
678+
int ucp_ep_do_keepalive(ucp_ep_h ep, ucs_time_t now);
678679

679680
/**
680681
* @brief Purge flush and protocol requests scheduled on a given UCP endpoint.

src/ucp/core/ucp_worker.c

Lines changed: 45 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2055,6 +2055,7 @@ static UCS_F_ALWAYS_INLINE void ucp_worker_keepalive_reset(ucp_worker_h worker)
20552055
worker->keepalive.ep_count = 0;
20562056
worker->keepalive.iter_count = 0;
20572057
worker->keepalive.iter = &worker->all_eps;
2058+
worker->keepalive.iter_begin = worker->keepalive.iter;
20582059
worker->keepalive.round_count = 0;
20592060
}
20602061

@@ -2908,42 +2909,50 @@ void ucp_worker_print_info(ucp_worker_h worker, FILE *stream)
29082909
}
29092910

29102911
static UCS_F_ALWAYS_INLINE ucp_ep_h
2911-
ucp_worker_keepalive_current_ep(ucp_worker_h worker)
2912-
{
2913-
ucp_ep_ext_gen_t *ep_ext;
2914-
2915-
ucs_assert(worker->keepalive.iter != &worker->all_eps);
2916-
ep_ext = ucs_container_of(worker->keepalive.iter, ucp_ep_ext_gen_t,
2917-
ep_list);
2918-
return ucp_ep_from_ext_gen(ep_ext);
2919-
}
2920-
2921-
static UCS_F_ALWAYS_INLINE void
29222912
ucp_worker_keepalive_next_ep(ucp_worker_h worker)
29232913
{
29242914
ucp_ep_h ep;
29252915

2926-
worker->keepalive.iter = worker->keepalive.iter->next;
2927-
if (worker->keepalive.iter == &worker->all_eps) {
2928-
/* if next list item points to all_eps then step one more time */
2916+
if (worker->keepalive.lane_map == 0) {
29292917
worker->keepalive.iter = worker->keepalive.iter->next;
2918+
if (worker->keepalive.iter == &worker->all_eps) {
2919+
return NULL;
2920+
}
2921+
2922+
worker->keepalive.lane_map = UCS_MASK(UCP_MAX_LANES);
29302923
}
29312924

29322925
ucs_assert(worker->keepalive.iter != &worker->all_eps);
2933-
ep = ucp_worker_keepalive_current_ep(worker);
2934-
worker->keepalive.lane_map = ((ep->cfg_index != UCP_WORKER_CFG_INDEX_NULL) &&
2935-
!(ep->flags & UCP_EP_FLAG_FAILED)) ?
2936-
ucp_ep_config(ep)->key.ep_check_map : 0;
2926+
ep = ucp_ep_from_ext_gen(ucs_container_of(worker->keepalive.iter,
2927+
ucp_ep_ext_gen_t, ep_list));
2928+
2929+
if ((ep->cfg_index == UCP_WORKER_CFG_INDEX_NULL) ||
2930+
(ep->flags & UCP_EP_FLAG_FAILED)) {
2931+
worker->keepalive.lane_map = 0;
2932+
return NULL;
2933+
}
2934+
2935+
/* Take updated ep_check_map, in case endpoint configuration has changed
2936+
* before continuing this round */
2937+
worker->keepalive.lane_map &= ucp_ep_config(ep)->key.ep_check_map;
2938+
if (worker->keepalive.lane_map == 0) {
2939+
return NULL;
2940+
}
2941+
2942+
return ep;
29372943
}
29382944

29392945
static UCS_F_NOINLINE unsigned
29402946
ucp_worker_do_keepalive_progress(ucp_worker_h worker)
29412947
{
29422948
unsigned progress_count = 0;
2943-
unsigned max_ep_count;
2949+
unsigned max_ep_count = worker->context->config.ext.keepalive_num_eps;
29442950
ucs_time_t now;
29452951
ucp_ep_h ep;
29462952

2953+
ucs_assertv(worker->keepalive.ep_count < max_ep_count,
2954+
"worker %p: ep_count=%u max_ep_count=%u",
2955+
worker, worker->keepalive.ep_count, max_ep_count);
29472956
ucs_assert(worker->context->config.ext.keepalive_num_eps != 0);
29482957

29492958
now = ucs_get_time();
@@ -2967,22 +2976,19 @@ ucp_worker_do_keepalive_progress(ucp_worker_h worker)
29672976
goto out_unblock;
29682977
}
29692978

2970-
if (ucs_unlikely(worker->keepalive.iter == &worker->all_eps)) {
2971-
ucp_worker_keepalive_next_ep(worker);
2972-
}
2973-
2974-
max_ep_count = ucs_min(worker->context->config.ext.keepalive_num_eps,
2975-
worker->num_all_eps);
2976-
29772979
/* Use own loop for elements because standard for_each skips
29782980
* head element */
29792981
/* TODO: use more optimal algo to enumerate EPs to keepalive
29802982
* (linked list) */
2981-
while (worker->keepalive.ep_count < max_ep_count) {
2982-
ep = ucp_worker_keepalive_current_ep(worker);
2983+
do {
2984+
ep = ucp_worker_keepalive_next_ep(worker);
2985+
if (ep == NULL) {
2986+
continue;
2987+
}
2988+
29832989
ucs_trace_func("worker %p: do keepalive on ep %p lane_map 0x%x", worker,
29842990
ep, worker->keepalive.lane_map);
2985-
if (!ucp_ep_do_keepalive(ep)) {
2991+
if (!ucp_ep_do_keepalive(ep, now)) {
29862992
/* In case if EP has no resources to send keepalive message
29872993
* then just return without update of last_round timestamp,
29882994
* on next progress iteration we will continue from this point */
@@ -2991,11 +2997,12 @@ ucp_worker_do_keepalive_progress(ucp_worker_h worker)
29912997

29922998
progress_count++;
29932999
worker->keepalive.ep_count++;
2994-
ucp_worker_keepalive_next_ep(worker);
2995-
}
3000+
} while ((worker->keepalive.ep_count < max_ep_count) &&
3001+
(worker->keepalive.iter != worker->keepalive.iter_begin));
29963002

29973003
ucs_trace("worker %p: sent keepalive on %u endpoints",
29983004
worker, worker->keepalive.ep_count);
3005+
worker->keepalive.iter_begin = worker->keepalive.iter;
29993006
worker->keepalive.last_round = now;
30003007
worker->keepalive.ep_count = 0;
30013008
worker->keepalive.round_count++;
@@ -3053,13 +3060,14 @@ void ucp_worker_keepalive_remove_ep(ucp_ep_h ep)
30533060
return;
30543061
}
30553062

3056-
if (ucs_list_is_only(&worker->all_eps, &ucp_ep_ext_gen(ep)->ep_list)) {
3057-
/* this is the last EP in worker */
3058-
worker->keepalive.iter = &worker->all_eps;
3059-
} else if (worker->keepalive.iter == &ucp_ep_ext_gen(ep)->ep_list) {
3060-
/* if iterator points into EP to be removed - then
3061-
* step to next EP */
3063+
if (worker->keepalive.iter == &ucp_ep_ext_gen(ep)->ep_list) {
3064+
/* Set lane_map=0 to make sure the endpoint won't be selected again */
3065+
worker->keepalive.lane_map = 0;
3066+
30623067
ucp_worker_keepalive_next_ep(worker);
3068+
ucs_assert(worker->keepalive.iter != &ucp_ep_ext_gen(ep)->ep_list);
3069+
3070+
worker->keepalive.iter_begin = worker->keepalive.iter;
30633071
}
30643072
}
30653073

src/ucp/core/ucp_worker.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,8 @@ typedef struct ucp_worker {
302302
uct_worker_cb_id_t cb_id; /* Keepalive callback id */
303303
ucs_time_t last_round; /* Last round timestamp */
304304
ucs_list_link_t *iter; /* Last EP processed keepalive */
305+
ucs_list_link_t *iter_begin; /* First EP processed keepalive in the
306+
* current round */
305307
ucp_lane_map_t lane_map; /* Lane map used to retry after no-resources */
306308
unsigned ep_count; /* Number of EPs processed in current time slot */
307309
unsigned iter_count; /* Number of progress iterations to skip,

0 commit comments

Comments
 (0)