1 | // SPDX-License-Identifier: GPL-2.0 |
2 | /* |
3 | * Shared Memory Communications over RDMA (SMC-R) and RoCE |
4 | * |
5 | * Connection Data Control (CDC) |
6 | * handles flow control |
7 | * |
8 | * Copyright IBM Corp. 2016 |
9 | * |
10 | * Author(s): Ursula Braun <ubraun@linux.vnet.ibm.com> |
11 | */ |
12 | |
13 | #include <linux/spinlock.h> |
14 | |
15 | #include "smc.h" |
16 | #include "smc_wr.h" |
17 | #include "smc_cdc.h" |
18 | #include "smc_tx.h" |
19 | #include "smc_rx.h" |
20 | #include "smc_close.h" |
21 | |
22 | /********************************** send *************************************/ |
23 | |
24 | /* handler for send/transmission completion of a CDC msg */ |
25 | static void smc_cdc_tx_handler(struct smc_wr_tx_pend_priv *pnd_snd, |
26 | struct smc_link *link, |
27 | enum ib_wc_status wc_status) |
28 | { |
29 | struct smc_cdc_tx_pend *cdcpend = (struct smc_cdc_tx_pend *)pnd_snd; |
30 | struct smc_connection *conn = cdcpend->conn; |
31 | struct smc_buf_desc *sndbuf_desc; |
32 | struct smc_sock *smc; |
33 | int diff; |
34 | |
35 | sndbuf_desc = conn->sndbuf_desc; |
36 | smc = container_of(conn, struct smc_sock, conn); |
37 | bh_lock_sock(&smc->sk); |
38 | if (!wc_status && sndbuf_desc) { |
39 | diff = smc_curs_diff(size: sndbuf_desc->len, |
40 | old: &cdcpend->conn->tx_curs_fin, |
41 | new: &cdcpend->cursor); |
42 | /* sndbuf_space is decreased in smc_sendmsg */ |
43 | smp_mb__before_atomic(); |
44 | atomic_add(i: diff, v: &cdcpend->conn->sndbuf_space); |
45 | /* guarantee 0 <= sndbuf_space <= sndbuf_desc->len */ |
46 | smp_mb__after_atomic(); |
47 | smc_curs_copy(tgt: &conn->tx_curs_fin, src: &cdcpend->cursor, conn); |
48 | smc_curs_copy(tgt: &conn->local_tx_ctrl_fin, src: &cdcpend->p_cursor, |
49 | conn); |
50 | conn->tx_cdc_seq_fin = cdcpend->ctrl_seq; |
51 | } |
52 | |
53 | if (atomic_dec_and_test(v: &conn->cdc_pend_tx_wr)) { |
54 | /* If user owns the sock_lock, mark the connection need sending. |
55 | * User context will later try to send when it release sock_lock |
56 | * in smc_release_cb() |
57 | */ |
58 | if (sock_owned_by_user(sk: &smc->sk)) |
59 | conn->tx_in_release_sock = true; |
60 | else |
61 | smc_tx_pending(conn); |
62 | |
63 | if (unlikely(wq_has_sleeper(&conn->cdc_pend_tx_wq))) |
64 | wake_up(&conn->cdc_pend_tx_wq); |
65 | } |
66 | WARN_ON(atomic_read(&conn->cdc_pend_tx_wr) < 0); |
67 | |
68 | smc_tx_sndbuf_nonfull(smc); |
69 | bh_unlock_sock(&smc->sk); |
70 | } |
71 | |
72 | int smc_cdc_get_free_slot(struct smc_connection *conn, |
73 | struct smc_link *link, |
74 | struct smc_wr_buf **wr_buf, |
75 | struct smc_rdma_wr **wr_rdma_buf, |
76 | struct smc_cdc_tx_pend **pend) |
77 | { |
78 | int rc; |
79 | |
80 | rc = smc_wr_tx_get_free_slot(link, handler: smc_cdc_tx_handler, wr_buf, |
81 | wrs: wr_rdma_buf, |
82 | wr_pend_priv: (struct smc_wr_tx_pend_priv **)pend); |
83 | if (conn->killed) { |
84 | /* abnormal termination */ |
85 | if (!rc) |
86 | smc_wr_tx_put_slot(link, |
87 | wr_pend_priv: (struct smc_wr_tx_pend_priv *)(*pend)); |
88 | rc = -EPIPE; |
89 | } |
90 | return rc; |
91 | } |
92 | |
93 | static inline void smc_cdc_add_pending_send(struct smc_connection *conn, |
94 | struct smc_cdc_tx_pend *pend) |
95 | { |
96 | BUILD_BUG_ON_MSG( |
97 | sizeof(struct smc_cdc_msg) > SMC_WR_BUF_SIZE, |
98 | "must increase SMC_WR_BUF_SIZE to at least sizeof(struct smc_cdc_msg)" ); |
99 | BUILD_BUG_ON_MSG( |
100 | offsetofend(struct smc_cdc_msg, reserved) > SMC_WR_TX_SIZE, |
101 | "must adapt SMC_WR_TX_SIZE to sizeof(struct smc_cdc_msg); if not all smc_wr upper layer protocols use the same message size any more, must start to set link->wr_tx_sges[i].length on each individual smc_wr_tx_send()" ); |
102 | BUILD_BUG_ON_MSG( |
103 | sizeof(struct smc_cdc_tx_pend) > SMC_WR_TX_PEND_PRIV_SIZE, |
104 | "must increase SMC_WR_TX_PEND_PRIV_SIZE to at least sizeof(struct smc_cdc_tx_pend)" ); |
105 | pend->conn = conn; |
106 | pend->cursor = conn->tx_curs_sent; |
107 | pend->p_cursor = conn->local_tx_ctrl.prod; |
108 | pend->ctrl_seq = conn->tx_cdc_seq; |
109 | } |
110 | |
111 | int smc_cdc_msg_send(struct smc_connection *conn, |
112 | struct smc_wr_buf *wr_buf, |
113 | struct smc_cdc_tx_pend *pend) |
114 | { |
115 | struct smc_link *link = conn->lnk; |
116 | union smc_host_cursor cfed; |
117 | int rc; |
118 | |
119 | smc_cdc_add_pending_send(conn, pend); |
120 | |
121 | conn->tx_cdc_seq++; |
122 | conn->local_tx_ctrl.seqno = conn->tx_cdc_seq; |
123 | smc_host_msg_to_cdc(peer: (struct smc_cdc_msg *)wr_buf, conn, save: &cfed); |
124 | |
125 | atomic_inc(v: &conn->cdc_pend_tx_wr); |
126 | smp_mb__after_atomic(); /* Make sure cdc_pend_tx_wr added before post */ |
127 | |
128 | rc = smc_wr_tx_send(link, wr_pend_priv: (struct smc_wr_tx_pend_priv *)pend); |
129 | if (!rc) { |
130 | smc_curs_copy(tgt: &conn->rx_curs_confirmed, src: &cfed, conn); |
131 | conn->local_rx_ctrl.prod_flags.cons_curs_upd_req = 0; |
132 | } else { |
133 | conn->tx_cdc_seq--; |
134 | conn->local_tx_ctrl.seqno = conn->tx_cdc_seq; |
135 | atomic_dec(v: &conn->cdc_pend_tx_wr); |
136 | } |
137 | |
138 | return rc; |
139 | } |
140 | |
141 | /* send a validation msg indicating the move of a conn to an other QP link */ |
142 | int smcr_cdc_msg_send_validation(struct smc_connection *conn, |
143 | struct smc_cdc_tx_pend *pend, |
144 | struct smc_wr_buf *wr_buf) |
145 | { |
146 | struct smc_host_cdc_msg *local = &conn->local_tx_ctrl; |
147 | struct smc_link *link = conn->lnk; |
148 | struct smc_cdc_msg *peer; |
149 | int rc; |
150 | |
151 | peer = (struct smc_cdc_msg *)wr_buf; |
152 | peer->common.type = local->common.type; |
153 | peer->len = local->len; |
154 | peer->seqno = htons(conn->tx_cdc_seq_fin); /* seqno last compl. tx */ |
155 | peer->token = htonl(local->token); |
156 | peer->prod_flags.failover_validation = 1; |
157 | |
158 | /* We need to set pend->conn here to make sure smc_cdc_tx_handler() |
159 | * can handle properly |
160 | */ |
161 | smc_cdc_add_pending_send(conn, pend); |
162 | |
163 | atomic_inc(v: &conn->cdc_pend_tx_wr); |
164 | smp_mb__after_atomic(); /* Make sure cdc_pend_tx_wr added before post */ |
165 | |
166 | rc = smc_wr_tx_send(link, wr_pend_priv: (struct smc_wr_tx_pend_priv *)pend); |
167 | if (unlikely(rc)) |
168 | atomic_dec(v: &conn->cdc_pend_tx_wr); |
169 | |
170 | return rc; |
171 | } |
172 | |
173 | static int smcr_cdc_get_slot_and_msg_send(struct smc_connection *conn) |
174 | { |
175 | struct smc_cdc_tx_pend *pend; |
176 | struct smc_wr_buf *wr_buf; |
177 | struct smc_link *link; |
178 | bool again = false; |
179 | int rc; |
180 | |
181 | again: |
182 | link = conn->lnk; |
183 | if (!smc_wr_tx_link_hold(link)) |
184 | return -ENOLINK; |
185 | rc = smc_cdc_get_free_slot(conn, link, wr_buf: &wr_buf, NULL, pend: &pend); |
186 | if (rc) |
187 | goto put_out; |
188 | |
189 | spin_lock_bh(lock: &conn->send_lock); |
190 | if (link != conn->lnk) { |
191 | /* link of connection changed, try again one time*/ |
192 | spin_unlock_bh(lock: &conn->send_lock); |
193 | smc_wr_tx_put_slot(link, |
194 | wr_pend_priv: (struct smc_wr_tx_pend_priv *)pend); |
195 | smc_wr_tx_link_put(link); |
196 | if (again) |
197 | return -ENOLINK; |
198 | again = true; |
199 | goto again; |
200 | } |
201 | rc = smc_cdc_msg_send(conn, wr_buf, pend); |
202 | spin_unlock_bh(lock: &conn->send_lock); |
203 | put_out: |
204 | smc_wr_tx_link_put(link); |
205 | return rc; |
206 | } |
207 | |
208 | int smc_cdc_get_slot_and_msg_send(struct smc_connection *conn) |
209 | { |
210 | int rc; |
211 | |
212 | if (!smc_conn_lgr_valid(conn) || |
213 | (conn->lgr->is_smcd && conn->lgr->peer_shutdown)) |
214 | return -EPIPE; |
215 | |
216 | if (conn->lgr->is_smcd) { |
217 | spin_lock_bh(lock: &conn->send_lock); |
218 | rc = smcd_cdc_msg_send(conn); |
219 | spin_unlock_bh(lock: &conn->send_lock); |
220 | } else { |
221 | rc = smcr_cdc_get_slot_and_msg_send(conn); |
222 | } |
223 | |
224 | return rc; |
225 | } |
226 | |
227 | void smc_cdc_wait_pend_tx_wr(struct smc_connection *conn) |
228 | { |
229 | wait_event(conn->cdc_pend_tx_wq, !atomic_read(&conn->cdc_pend_tx_wr)); |
230 | } |
231 | |
232 | /* Send a SMC-D CDC header. |
233 | * This increments the free space available in our send buffer. |
234 | * Also update the confirmed receive buffer with what was sent to the peer. |
235 | */ |
236 | int smcd_cdc_msg_send(struct smc_connection *conn) |
237 | { |
238 | struct smc_sock *smc = container_of(conn, struct smc_sock, conn); |
239 | union smc_host_cursor curs; |
240 | struct smcd_cdc_msg cdc; |
241 | int rc, diff; |
242 | |
243 | memset(&cdc, 0, sizeof(cdc)); |
244 | cdc.common.type = SMC_CDC_MSG_TYPE; |
245 | curs.acurs.counter = atomic64_read(v: &conn->local_tx_ctrl.prod.acurs); |
246 | cdc.prod.wrap = curs.wrap; |
247 | cdc.prod.count = curs.count; |
248 | curs.acurs.counter = atomic64_read(v: &conn->local_tx_ctrl.cons.acurs); |
249 | cdc.cons.wrap = curs.wrap; |
250 | cdc.cons.count = curs.count; |
251 | cdc.cons.prod_flags = conn->local_tx_ctrl.prod_flags; |
252 | cdc.cons.conn_state_flags = conn->local_tx_ctrl.conn_state_flags; |
253 | rc = smcd_tx_ism_write(conn, data: &cdc, len: sizeof(cdc), offset: 0, signal: 1); |
254 | if (rc) |
255 | return rc; |
256 | smc_curs_copy(tgt: &conn->rx_curs_confirmed, src: &curs, conn); |
257 | conn->local_rx_ctrl.prod_flags.cons_curs_upd_req = 0; |
258 | /* Calculate transmitted data and increment free send buffer space */ |
259 | diff = smc_curs_diff(size: conn->sndbuf_desc->len, old: &conn->tx_curs_fin, |
260 | new: &conn->tx_curs_sent); |
261 | /* increased by confirmed number of bytes */ |
262 | smp_mb__before_atomic(); |
263 | atomic_add(i: diff, v: &conn->sndbuf_space); |
264 | /* guarantee 0 <= sndbuf_space <= sndbuf_desc->len */ |
265 | smp_mb__after_atomic(); |
266 | smc_curs_copy(tgt: &conn->tx_curs_fin, src: &conn->tx_curs_sent, conn); |
267 | |
268 | smc_tx_sndbuf_nonfull(smc); |
269 | return rc; |
270 | } |
271 | |
272 | /********************************* receive ***********************************/ |
273 | |
/* Wrap-around aware comparison of 16-bit sequence numbers: true when the
 * difference seq1 - seq2, reinterpreted as a signed 16-bit value, is
 * negative.
 */
static inline bool smc_cdc_before(u16 seq1, u16 seq2)
{
	s16 delta = (s16)(seq1 - seq2);

	return delta < 0;
}
278 | |
279 | static void smc_cdc_handle_urg_data_arrival(struct smc_sock *smc, |
280 | int *diff_prod) |
281 | { |
282 | struct smc_connection *conn = &smc->conn; |
283 | char *base; |
284 | |
285 | /* new data included urgent business */ |
286 | smc_curs_copy(tgt: &conn->urg_curs, src: &conn->local_rx_ctrl.prod, conn); |
287 | conn->urg_state = SMC_URG_VALID; |
288 | if (!sock_flag(sk: &smc->sk, flag: SOCK_URGINLINE)) |
289 | /* we'll skip the urgent byte, so don't account for it */ |
290 | (*diff_prod)--; |
291 | base = (char *)conn->rmb_desc->cpu_addr + conn->rx_off; |
292 | if (conn->urg_curs.count) |
293 | conn->urg_rx_byte = *(base + conn->urg_curs.count - 1); |
294 | else |
295 | conn->urg_rx_byte = *(base + conn->rmb_desc->len - 1); |
296 | sk_send_sigurg(sk: &smc->sk); |
297 | } |
298 | |
299 | static void smc_cdc_msg_validate(struct smc_sock *smc, struct smc_cdc_msg *cdc, |
300 | struct smc_link *link) |
301 | { |
302 | struct smc_connection *conn = &smc->conn; |
303 | u16 recv_seq = ntohs(cdc->seqno); |
304 | s16 diff; |
305 | |
306 | /* check that seqnum was seen before */ |
307 | diff = conn->local_rx_ctrl.seqno - recv_seq; |
308 | if (diff < 0) { /* diff larger than 0x7fff */ |
309 | /* drop connection */ |
310 | conn->out_of_sync = 1; /* prevent any further receives */ |
311 | spin_lock_bh(lock: &conn->send_lock); |
312 | conn->local_tx_ctrl.conn_state_flags.peer_conn_abort = 1; |
313 | conn->lnk = link; |
314 | spin_unlock_bh(lock: &conn->send_lock); |
315 | sock_hold(sk: &smc->sk); /* sock_put in abort_work */ |
316 | if (!queue_work(wq: smc_close_wq, work: &conn->abort_work)) |
317 | sock_put(sk: &smc->sk); |
318 | } |
319 | } |
320 | |
321 | static void smc_cdc_msg_recv_action(struct smc_sock *smc, |
322 | struct smc_cdc_msg *cdc) |
323 | { |
324 | union smc_host_cursor cons_old, prod_old; |
325 | struct smc_connection *conn = &smc->conn; |
326 | int diff_cons, diff_prod; |
327 | |
328 | smc_curs_copy(tgt: &prod_old, src: &conn->local_rx_ctrl.prod, conn); |
329 | smc_curs_copy(tgt: &cons_old, src: &conn->local_rx_ctrl.cons, conn); |
330 | smc_cdc_msg_to_host(local: &conn->local_rx_ctrl, peer: cdc, conn); |
331 | |
332 | diff_cons = smc_curs_diff(size: conn->peer_rmbe_size, old: &cons_old, |
333 | new: &conn->local_rx_ctrl.cons); |
334 | if (diff_cons) { |
335 | /* peer_rmbe_space is decreased during data transfer with RDMA |
336 | * write |
337 | */ |
338 | smp_mb__before_atomic(); |
339 | atomic_add(i: diff_cons, v: &conn->peer_rmbe_space); |
340 | /* guarantee 0 <= peer_rmbe_space <= peer_rmbe_size */ |
341 | smp_mb__after_atomic(); |
342 | } |
343 | |
344 | diff_prod = smc_curs_diff(size: conn->rmb_desc->len, old: &prod_old, |
345 | new: &conn->local_rx_ctrl.prod); |
346 | if (diff_prod) { |
347 | if (conn->local_rx_ctrl.prod_flags.urg_data_present) |
348 | smc_cdc_handle_urg_data_arrival(smc, diff_prod: &diff_prod); |
349 | /* bytes_to_rcv is decreased in smc_recvmsg */ |
350 | smp_mb__before_atomic(); |
351 | atomic_add(i: diff_prod, v: &conn->bytes_to_rcv); |
352 | /* guarantee 0 <= bytes_to_rcv <= rmb_desc->len */ |
353 | smp_mb__after_atomic(); |
354 | smc->sk.sk_data_ready(&smc->sk); |
355 | } else { |
356 | if (conn->local_rx_ctrl.prod_flags.write_blocked) |
357 | smc->sk.sk_data_ready(&smc->sk); |
358 | if (conn->local_rx_ctrl.prod_flags.urg_data_pending) |
359 | conn->urg_state = SMC_URG_NOTYET; |
360 | } |
361 | |
362 | /* trigger sndbuf consumer: RDMA write into peer RMBE and CDC */ |
363 | if ((diff_cons && smc_tx_prepared_sends(conn)) || |
364 | conn->local_rx_ctrl.prod_flags.cons_curs_upd_req || |
365 | conn->local_rx_ctrl.prod_flags.urg_data_pending) { |
366 | if (!sock_owned_by_user(sk: &smc->sk)) |
367 | smc_tx_pending(conn); |
368 | else |
369 | conn->tx_in_release_sock = true; |
370 | } |
371 | |
372 | if (diff_cons && conn->urg_tx_pend && |
373 | atomic_read(v: &conn->peer_rmbe_space) == conn->peer_rmbe_size) { |
374 | /* urg data confirmed by peer, indicate we're ready for more */ |
375 | conn->urg_tx_pend = false; |
376 | smc->sk.sk_write_space(&smc->sk); |
377 | } |
378 | |
379 | if (conn->local_rx_ctrl.conn_state_flags.peer_conn_abort) { |
380 | smc->sk.sk_err = ECONNRESET; |
381 | conn->local_tx_ctrl.conn_state_flags.peer_conn_abort = 1; |
382 | } |
383 | if (smc_cdc_rxed_any_close_or_senddone(conn)) { |
384 | smc->sk.sk_shutdown |= RCV_SHUTDOWN; |
385 | if (smc->clcsock && smc->clcsock->sk) |
386 | smc->clcsock->sk->sk_shutdown |= RCV_SHUTDOWN; |
387 | smc_sock_set_flag(sk: &smc->sk, flag: SOCK_DONE); |
388 | sock_hold(sk: &smc->sk); /* sock_put in close_work */ |
389 | if (!queue_work(wq: smc_close_wq, work: &conn->close_work)) |
390 | sock_put(sk: &smc->sk); |
391 | } |
392 | } |
393 | |
394 | /* called under tasklet context */ |
395 | static void smc_cdc_msg_recv(struct smc_sock *smc, struct smc_cdc_msg *cdc) |
396 | { |
397 | sock_hold(sk: &smc->sk); |
398 | bh_lock_sock(&smc->sk); |
399 | smc_cdc_msg_recv_action(smc, cdc); |
400 | bh_unlock_sock(&smc->sk); |
401 | sock_put(sk: &smc->sk); /* no free sk in softirq-context */ |
402 | } |
403 | |
404 | /* Schedule a tasklet for this connection. Triggered from the ISM device IRQ |
405 | * handler to indicate update in the DMBE. |
406 | * |
407 | * Context: |
408 | * - tasklet context |
409 | */ |
410 | static void smcd_cdc_rx_tsklet(struct tasklet_struct *t) |
411 | { |
412 | struct smc_connection *conn = from_tasklet(conn, t, rx_tsklet); |
413 | struct smcd_cdc_msg *data_cdc; |
414 | struct smcd_cdc_msg cdc; |
415 | struct smc_sock *smc; |
416 | |
417 | if (!conn || conn->killed) |
418 | return; |
419 | |
420 | data_cdc = (struct smcd_cdc_msg *)conn->rmb_desc->cpu_addr; |
421 | smcd_curs_copy(tgt: &cdc.prod, src: &data_cdc->prod, conn); |
422 | smcd_curs_copy(tgt: &cdc.cons, src: &data_cdc->cons, conn); |
423 | smc = container_of(conn, struct smc_sock, conn); |
424 | smc_cdc_msg_recv(smc, cdc: (struct smc_cdc_msg *)&cdc); |
425 | } |
426 | |
427 | /* Initialize receive tasklet. Called from ISM device IRQ handler to start |
428 | * receiver side. |
429 | */ |
430 | void smcd_cdc_rx_init(struct smc_connection *conn) |
431 | { |
432 | tasklet_setup(t: &conn->rx_tsklet, callback: smcd_cdc_rx_tsklet); |
433 | } |
434 | |
435 | /***************************** init, exit, misc ******************************/ |
436 | |
437 | static void smc_cdc_rx_handler(struct ib_wc *wc, void *buf) |
438 | { |
439 | struct smc_link *link = (struct smc_link *)wc->qp->qp_context; |
440 | struct smc_cdc_msg *cdc = buf; |
441 | struct smc_connection *conn; |
442 | struct smc_link_group *lgr; |
443 | struct smc_sock *smc; |
444 | |
445 | if (wc->byte_len < offsetof(struct smc_cdc_msg, reserved)) |
446 | return; /* short message */ |
447 | if (cdc->len != SMC_WR_TX_SIZE) |
448 | return; /* invalid message */ |
449 | |
450 | /* lookup connection */ |
451 | lgr = smc_get_lgr(link); |
452 | read_lock_bh(&lgr->conns_lock); |
453 | conn = smc_lgr_find_conn(ntohl(cdc->token), lgr); |
454 | read_unlock_bh(&lgr->conns_lock); |
455 | if (!conn || conn->out_of_sync) |
456 | return; |
457 | smc = container_of(conn, struct smc_sock, conn); |
458 | |
459 | if (cdc->prod_flags.failover_validation) { |
460 | smc_cdc_msg_validate(smc, cdc, link); |
461 | return; |
462 | } |
463 | if (smc_cdc_before(ntohs(cdc->seqno), |
464 | seq2: conn->local_rx_ctrl.seqno)) |
465 | /* received seqno is old */ |
466 | return; |
467 | |
468 | smc_cdc_msg_recv(smc, cdc); |
469 | } |
470 | |
471 | static struct smc_wr_rx_handler smc_cdc_rx_handlers[] = { |
472 | { |
473 | .handler = smc_cdc_rx_handler, |
474 | .type = SMC_CDC_MSG_TYPE |
475 | }, |
476 | { |
477 | .handler = NULL, |
478 | } |
479 | }; |
480 | |
481 | int __init smc_cdc_init(void) |
482 | { |
483 | struct smc_wr_rx_handler *handler; |
484 | int rc = 0; |
485 | |
486 | for (handler = smc_cdc_rx_handlers; handler->handler; handler++) { |
487 | INIT_HLIST_NODE(h: &handler->list); |
488 | rc = smc_wr_rx_register_handler(handler); |
489 | if (rc) |
490 | break; |
491 | } |
492 | return rc; |
493 | } |
494 | |