Skip to content

Commit ad3b8a5

Browse files
gerd-rauschBrian Maly
authored andcommitted
net/rds: serialize up+down-work to relax strict ordering
Serialize calls to rds_connect_worker() and rds_shutdown_worker() by taking advantage of the workqueue-API only executing a single running callback per "struct work_struct": We simply en-queue a combined rds_up_or_down_worker(), that calls either rds_shutdown_worker() or rds_connect_worker() depending on the state of "RDS_SHUTDOWN_WORK_QUEUED" or "RDS_RECONNECT_PENDING". Ideally, we wouldn't have such a mess and complexity to deal with state transitions, but rather have a proper state-machine with trigger- and action-definitions, all dispatched by a single worker: That way we'd know that parallelism was taken care of properly, and would also be able to understand which events in a given context lead to what actions. But this commit doesn't aim at rewriting all of this, but rather kick the can down the road by simply combining up+down-work to avoid race conditions and allow "cp->cp_wq" to no longer be strictly ordered. For that we introduce module parameter "rds_wq_strictly_ordered", which is "true" by default, until we convince ourselves, that all race-conditions in the code triggered by no longer using strictly-ordered workqueues are taken care of. Orabug: 35094723 Signed-off-by: Gerd Rausch <[email protected]> Reviewed-by: Håkon Bugge <[email protected]> Signed-off-by: Brian Maly <[email protected]>
1 parent 4b36792 commit ad3b8a5

File tree

4 files changed

+68
-24
lines changed

4 files changed

+68
-24
lines changed

net/rds/connection.c

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@
4646

4747
#define RDS_CONN_FADDR_HASH_ENTRIES 256
4848

49+
static bool rds_wq_strictly_ordered = true;
50+
module_param(rds_wq_strictly_ordered, bool, 0644);
51+
4952
/* converting this to RCU is a chore for another day.. */
5053
static DEFINE_SPINLOCK(rds_conn_lock);
5154
static struct hlist_head rds_conn_hash[RDS_CONNECTION_HASH_ENTRIES];
@@ -215,9 +218,8 @@ static void __rds_conn_path_init(struct rds_connection *conn,
215218

216219
INIT_DELAYED_WORK(&cp->cp_send_w, rds_send_worker);
217220
INIT_DELAYED_WORK(&cp->cp_recv_w, rds_recv_worker);
218-
INIT_DELAYED_WORK(&cp->cp_conn_w, rds_connect_worker);
219221
INIT_DELAYED_WORK(&cp->cp_hb_w, rds_hb_worker);
220-
INIT_WORK(&cp->cp_down_w, rds_shutdown_worker);
222+
INIT_DELAYED_WORK(&cp->cp_up_or_down_w, rds_up_or_down_worker);
221223
INIT_DELAYED_WORK(&cp->cp_down_wait_w, rds_conn_shutdown_check_wait);
222224
mutex_init(&cp->cp_cm_lock);
223225
atomic_set(&cp->cp_rdma_map_pending, 0);
@@ -359,7 +361,12 @@ static struct rds_connection *__rds_conn_create(struct net *net,
359361
__rds_conn_path_init(conn, cp, is_outgoing);
360362
cp->cp_index = i;
361363

362-
cp->cp_wq = alloc_ordered_workqueue("krds_cp_wq#%d/%d", 0,
364+
if (rds_wq_strictly_ordered)
365+
cp->cp_wq = alloc_ordered_workqueue("krds_cp_wq#%d/%d", 0,
366+
atomic_read(&conn->c_trans->t_conn_count), i);
367+
else
368+
cp->cp_wq = alloc_workqueue("krds_cp_wq#%d/%d", 0,
369+
RDS_CP_WQ_MAX_ACTIVE,
363370
atomic_read(&conn->c_trans->t_conn_count), i);
364371
if (!cp->cp_wq) {
365372
while (--i >= 0) {
@@ -494,7 +501,6 @@ static void rds_conn_shutdown_final(struct rds_conn_path *cp)
494501
* Otherwise, the reconnect is always triggered by the active peer.
495502
*/
496503

497-
rds_queue_cancel_work(cp, &cp->cp_conn_w, "final shutdown");
498504
rds_clear_reconnect_pending_work_bit(cp);
499505

500506
rcu_read_lock();
@@ -678,7 +684,7 @@ static void rds_conn_path_destroy(struct rds_conn_path *cp, int shutdown)
678684

679685
cp->cp_shutdown_final = &shutdown_final;
680686
rds_conn_path_drop(cp, DR_CONN_DESTROY, 0);
681-
rds_queue_flush_work(cp, &cp->cp_down_w, "conn path destroy down work");
687+
flush_delayed_work(&cp->cp_up_or_down_w);
682688
wait_for_completion(&shutdown_final);
683689

684690
/* tear down queued messages */

net/rds/rds.h

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ enum {
107107
#define RDS_CONG_MAP_PAGES (PAGE_ALIGN(RDS_CONG_MAP_BYTES) / RDS_CONG_PAGE_SIZE)
108108
#define RDS_CONG_MAP_PAGE_BITS (RDS_CONG_PAGE_SIZE * 8)
109109

110+
#define RDS_CP_WQ_MAX_ACTIVE 4
111+
110112
struct rds_cong_map {
111113
struct rb_node m_rb_node;
112114
struct in6_addr m_addr;
@@ -266,9 +268,8 @@ struct rds_conn_path {
266268
unsigned long cp_reconnect_jiffies;
267269
struct delayed_work cp_send_w;
268270
struct delayed_work cp_recv_w;
269-
struct delayed_work cp_conn_w;
270271
struct delayed_work cp_hb_w;
271-
struct work_struct cp_down_w;
272+
struct delayed_work cp_up_or_down_w;
272273
struct delayed_work cp_down_wait_w;
273274
struct mutex cp_cm_lock; /* protect cp_state & cm */
274275
wait_queue_head_t cp_waitq;
@@ -1129,6 +1130,10 @@ void rds_queue_delayed_work_on(struct rds_conn_path *cp, int cpu,
11291130
struct workqueue_struct *wq,
11301131
struct delayed_work *dwork,
11311132
unsigned long delay, char *reason);
1133+
void rds_mod_delayed_work(struct rds_conn_path *cp,
1134+
struct workqueue_struct *wq,
1135+
struct delayed_work *dwork,
1136+
unsigned long delay, char *reason);
11321137

11331138
static inline void rds_conn_path_state_change(struct rds_conn_path *cp,
11341139
int new, int reason, int err)
@@ -1142,8 +1147,8 @@ static inline void rds_conn_path_state_change(struct rds_conn_path *cp,
11421147
static inline void rds_cond_queue_shutdown_work(struct rds_conn_path *cp)
11431148
{
11441149
if (!test_and_set_bit(RDS_SHUTDOWN_WORK_QUEUED, &cp->cp_flags))
1145-
rds_queue_work(cp, cp->cp_wq, &cp->cp_down_w,
1146-
"queue shutdown work");
1150+
rds_mod_delayed_work(cp, cp->cp_wq, &cp->cp_up_or_down_w,
1151+
0, "queue shutdown work");
11471152
}
11481153

11491154
static inline void rds_clear_shutdown_pending_work_bit(struct rds_conn_path *cp)
@@ -1158,11 +1163,12 @@ static inline void rds_clear_shutdown_pending_work_bit(struct rds_conn_path *cp)
11581163
static inline bool rds_cond_queue_reconnect_work(struct rds_conn_path *cp, unsigned long delay)
11591164
{
11601165
if (!test_and_set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags)) {
1161-
rds_queue_delayed_work(cp, cp->cp_wq, &cp->cp_conn_w, delay,
1162-
"reconnect work");
1166+
rds_queue_delayed_work(cp, cp->cp_wq, &cp->cp_up_or_down_w,
1167+
delay, "reconnect work");
11631168
return true;
1164-
} else
1169+
} else {
11651170
return false;
1171+
}
11661172
}
11671173

11681174
static inline void
@@ -1460,8 +1466,7 @@ int rds_threads_init(void);
14601466
void rds_threads_exit(void);
14611467
extern struct workqueue_struct *rds_wq;
14621468
void rds_queue_reconnect(struct rds_conn_path *cp, bool immediate);
1463-
void rds_connect_worker(struct work_struct *);
1464-
void rds_shutdown_worker(struct work_struct *);
1469+
void rds_up_or_down_worker(struct work_struct *);
14651470
void rds_send_worker(struct work_struct *);
14661471
void rds_recv_worker(struct work_struct *);
14671472
void rds_hb_worker(struct work_struct *);

net/rds/rds_single_path.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@
2121
#define c_reconnect_jiffies c_path[0].cp_reconnect_jiffies
2222
#define c_send_w c_path[0].cp_send_w
2323
#define c_recv_w c_path[0].cp_recv_w
24-
#define c_conn_w c_path[0].cp_conn_w
25-
#define c_down_w c_path[0].cp_down_w
24+
#define c_up_or_down_w c_path[0].cp_up_or_down_w
2625
#define c_down_wait_w c_path[0].cp_down_wait_w
2726
#define c_cm_lock c_path[0].cp_cm_lock
2827
#define c_waitq c_path[0].cp_waitq

net/rds/threads.c

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,25 @@ void rds_queue_delayed_work_on(struct rds_conn_path *cp,
144144
}
145145
EXPORT_SYMBOL_GPL(rds_queue_delayed_work_on);
146146

147+
void rds_mod_delayed_work(struct rds_conn_path *cp,
148+
struct workqueue_struct *wq,
149+
struct delayed_work *dwork,
150+
unsigned long delay,
151+
char *reason)
152+
{
153+
int cpu;
154+
155+
trace_rds_queue_work(cp ? cp->cp_conn : NULL, cp, wq, &dwork->work,
156+
delay, reason);
157+
158+
if (cp && cp->cp_conn->c_trans->conn_preferred_cpu) {
159+
cpu = cp->cp_conn->c_trans->conn_preferred_cpu(cp->cp_conn, false);
160+
mod_delayed_work_on(cpu, wq, dwork, delay);
161+
} else
162+
mod_delayed_work(wq, dwork, delay);
163+
}
164+
EXPORT_SYMBOL_GPL(rds_mod_delayed_work);
165+
147166
void rds_queue_cancel_work(struct rds_conn_path *cp,
148167
struct delayed_work *dwork, char *reason)
149168
{
@@ -236,11 +255,9 @@ void rds_queue_reconnect(struct rds_conn_path *cp, bool immediate)
236255
rds_sysctl_reconnect_max_jiffies);
237256
}
238257

239-
void rds_connect_worker(struct work_struct *work)
258+
static void rds_connect_worker(struct rds_conn_path *cp,
259+
struct work_struct *work)
240260
{
241-
struct rds_conn_path *cp = container_of(work,
242-
struct rds_conn_path,
243-
cp_conn_w.work);
244261
struct rds_connection *conn = cp->cp_conn;
245262
int ret;
246263
bool is_tcp = conn->c_trans->t_type == RDS_TRANS_TCP;
@@ -394,11 +411,9 @@ void rds_hb_worker(struct work_struct *work)
394411
}
395412
}
396413

397-
void rds_shutdown_worker(struct work_struct *work)
414+
static void rds_shutdown_worker(struct rds_conn_path *cp,
415+
struct work_struct *work)
398416
{
399-
struct rds_conn_path *cp = container_of(work,
400-
struct rds_conn_path,
401-
cp_down_w);
402417
time64_t now = ktime_get_real_seconds();
403418
bool is_tcp = cp->cp_conn->c_trans->t_type == RDS_TRANS_TCP;
404419
struct rds_connection *conn = cp->cp_conn;
@@ -420,6 +435,25 @@ void rds_shutdown_worker(struct work_struct *work)
420435
rds_conn_init_shutdown(cp);
421436
}
422437

438+
void rds_up_or_down_worker(struct work_struct *work)
439+
{
440+
struct rds_conn_path *cp = container_of(work,
441+
struct rds_conn_path,
442+
cp_up_or_down_w.work);
443+
444+
/* shutdown-work always takes precedence over reconnect-work */
445+
if (test_bit(RDS_SHUTDOWN_WORK_QUEUED, &cp->cp_flags)) {
446+
/* reconnect is always re-scheduled in
447+
* rds_conn_shutdown_final(), if necessary
448+
*/
449+
rds_clear_reconnect_pending_work_bit(cp);
450+
451+
rds_shutdown_worker(cp, work);
452+
} else if (test_bit(RDS_RECONNECT_PENDING, &cp->cp_flags)) {
453+
rds_connect_worker(cp, work);
454+
}
455+
}
456+
423457
void rds_threads_exit(void)
424458
{
425459
destroy_workqueue(rds_wq);

0 commit comments

Comments
 (0)