LU-12678 lnet: use init_wait() rather than init_waitqueue_entry()
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd_cb.c
/*
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 *
 *   Author: Zach Brown <zab@zabbo.net>
 *   Author: Peter J. Braam <braam@clusterfs.com>
 *   Author: Phil Schwan <phil@clusterfs.com>
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 *   This file is part of Lustre, https://wiki.whamcloud.com/
 *
 *   Portals is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Portals is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Portals; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#include <libcfs/linux/linux-mem.h>
#include "socklnd.h"
#include <linux/sunrpc/addr.h>

struct ksock_tx *
ksocknal_alloc_tx(int type, int size)
{
        struct ksock_tx *tx = NULL;

        if (type == KSOCK_MSG_NOOP) {
                LASSERT(size == KSOCK_NOOP_TX_SIZE);

                /* searching for a noop tx in free list */
                spin_lock(&ksocknal_data.ksnd_tx_lock);

                if (!list_empty(&ksocknal_data.ksnd_idle_noop_txs)) {
                        tx = list_entry(ksocknal_data.ksnd_idle_noop_txs.next,
                                        struct ksock_tx, tx_list);
                        LASSERT(tx->tx_desc_size == size);
                        list_del(&tx->tx_list);
                }

                spin_unlock(&ksocknal_data.ksnd_tx_lock);
        }

        if (tx == NULL)
                LIBCFS_ALLOC(tx, size);

        if (tx == NULL)
                return NULL;

        refcount_set(&tx->tx_refcount, 1);
        tx->tx_zc_aborted = 0;
        tx->tx_zc_capable = 0;
        tx->tx_zc_checked = 0;
        tx->tx_hstatus = LNET_MSG_STATUS_OK;
        tx->tx_desc_size  = size;

        atomic_inc(&ksocknal_data.ksnd_nactive_txs);

        return tx;
}

struct ksock_tx *
ksocknal_alloc_tx_noop(__u64 cookie, int nonblk)
{
        struct ksock_tx *tx;

        tx = ksocknal_alloc_tx(KSOCK_MSG_NOOP, KSOCK_NOOP_TX_SIZE);
        if (tx == NULL) {
                CERROR("Can't allocate noop tx desc\n");
                return NULL;
        }

        tx->tx_conn     = NULL;
        tx->tx_lnetmsg  = NULL;
        tx->tx_kiov     = NULL;
        tx->tx_nkiov    = 0;
        tx->tx_niov     = 1;
        tx->tx_nonblk   = nonblk;

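        /* NB: ksm_zc_cookies[1] carries the zero-copy cookie this NOOP
         * acknowledges; see ksocknal_handle_zcack() for the matching side. */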
        tx->tx_msg.ksm_csum = 0;
        tx->tx_msg.ksm_type = KSOCK_MSG_NOOP;
        tx->tx_msg.ksm_zc_cookies[0] = 0;
        tx->tx_msg.ksm_zc_cookies[1] = cookie;

        return tx;
}


void
ksocknal_free_tx(struct ksock_tx *tx)
{
        atomic_dec(&ksocknal_data.ksnd_nactive_txs);

        if (tx->tx_lnetmsg == NULL && tx->tx_desc_size == KSOCK_NOOP_TX_SIZE) {
                /* it's a noop tx */
                spin_lock(&ksocknal_data.ksnd_tx_lock);

                list_add(&tx->tx_list, &ksocknal_data.ksnd_idle_noop_txs);

                spin_unlock(&ksocknal_data.ksnd_tx_lock);
        } else {
                LIBCFS_FREE(tx, tx->tx_desc_size);
        }
}

static int
ksocknal_send_hdr(struct ksock_conn *conn, struct ksock_tx *tx,
                  struct kvec *scratch_iov)
{
        struct kvec *iov = &tx->tx_hdr;
        int    nob;
        int    rc;

        LASSERT(tx->tx_niov > 0);

        /* Never touch tx->tx_hdr inside ksocknal_lib_send_hdr() */
        rc = ksocknal_lib_send_hdr(conn, tx, scratch_iov);

        if (rc <= 0)                            /* sent nothing? */
                return rc;

        nob = rc;
        LASSERT(nob <= tx->tx_resid);
        tx->tx_resid -= nob;

        /* "consume" iov */
        LASSERT(tx->tx_niov == 1);

        if (nob < (int) iov->iov_len) {
                iov->iov_base += nob;
                iov->iov_len -= nob;
                return rc;
        }

        LASSERT(nob == iov->iov_len);
        tx->tx_niov--;

        return rc;
}

static int
ksocknal_send_kiov(struct ksock_conn *conn, struct ksock_tx *tx,
                   struct kvec *scratch_iov)
{
        struct bio_vec *kiov = tx->tx_kiov;
        int nob;
        int rc;

        LASSERT(tx->tx_niov == 0);
        LASSERT(tx->tx_nkiov > 0);

        /* Never touch tx->tx_kiov inside ksocknal_lib_send_kiov() */
        rc = ksocknal_lib_send_kiov(conn, tx, scratch_iov);

        if (rc <= 0)                            /* sent nothing? */
                return rc;

        nob = rc;
        LASSERT(nob <= tx->tx_resid);
        tx->tx_resid -= nob;

        /* "consume" kiov */
        do {
                LASSERT(tx->tx_nkiov > 0);

                if (nob < (int)kiov->bv_len) {
                        kiov->bv_offset += nob;
                        kiov->bv_len -= nob;
                        return rc;
                }

                nob -= (int)kiov->bv_len;
                tx->tx_kiov = ++kiov;
                tx->tx_nkiov--;
        } while (nob != 0);

        return rc;
}

static int
ksocknal_transmit(struct ksock_conn *conn, struct ksock_tx *tx,
                  struct kvec *scratch_iov)
{
        int     rc;
        int     bufnob;

        if (ksocknal_data.ksnd_stall_tx != 0)
                schedule_timeout_uninterruptible(
                        cfs_time_seconds(ksocknal_data.ksnd_stall_tx));

        LASSERT(tx->tx_resid != 0);

        rc = ksocknal_connsock_addref(conn);
        if (rc != 0) {
                LASSERT(conn->ksnc_closing);
                return -ESHUTDOWN;
        }

        do {
                if (ksocknal_data.ksnd_enomem_tx > 0) {
                        /* testing... */
                        ksocknal_data.ksnd_enomem_tx--;
                        rc = -EAGAIN;
                } else if (tx->tx_niov != 0) {
                        rc = ksocknal_send_hdr(conn, tx, scratch_iov);
                } else {
                        rc = ksocknal_send_kiov(conn, tx, scratch_iov);
                }

                bufnob = conn->ksnc_sock->sk->sk_wmem_queued;
                if (rc > 0)                     /* sent something? */
                        conn->ksnc_tx_bufnob += rc; /* account it */

                if (bufnob < conn->ksnc_tx_bufnob) {
                        /* allocated send buffer bytes < computed; infer
                         * something got ACKed */
                        conn->ksnc_tx_deadline = ktime_get_seconds() +
                                                 ksocknal_timeout();
                        conn->ksnc_peer->ksnp_last_alive = ktime_get_seconds();
                        conn->ksnc_tx_bufnob = bufnob;
                        smp_mb();
                }

                if (rc <= 0) { /* Didn't write anything? */
                        /* some stacks return 0 instead of -EAGAIN */
                        if (rc == 0)
                                rc = -EAGAIN;

                        /* Check if EAGAIN is due to memory pressure */
                        if (rc == -EAGAIN && ksocknal_lib_memory_pressure(conn))
                                rc = -ENOMEM;

                        break;
                }

                /* socket's wmem_queued now includes 'rc' bytes */
                atomic_sub (rc, &conn->ksnc_tx_nob);
                rc = 0;

        } while (tx->tx_resid != 0);

        ksocknal_connsock_decref(conn);
        return rc;
}

static int
ksocknal_recv_iov(struct ksock_conn *conn, struct kvec *scratchiov)
{
        struct kvec *iov = conn->ksnc_rx_iov;
        int     nob;
        int     rc;

        LASSERT(conn->ksnc_rx_niov > 0);

        /* Never touch conn->ksnc_rx_iov or change connection
         * status inside ksocknal_lib_recv_iov */
        rc = ksocknal_lib_recv_iov(conn, scratchiov);

        if (rc <= 0)
                return rc;

        /* received something... */
        nob = rc;

        conn->ksnc_peer->ksnp_last_alive = ktime_get_seconds();
        conn->ksnc_rx_deadline = ktime_get_seconds() +
                                 ksocknal_timeout();
        smp_mb();                       /* order with setting rx_started */
        conn->ksnc_rx_started = 1;

        conn->ksnc_rx_nob_wanted -= nob;
        conn->ksnc_rx_nob_left -= nob;

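        /* Consume completed iov fragments; -EAGAIN below means "partial
         * fragment, more data expected", which the caller treats as
         * progress, not as a socket error. */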
        do {
                LASSERT(conn->ksnc_rx_niov > 0);

                if (nob < (int)iov->iov_len) {
                        iov->iov_len -= nob;
                        iov->iov_base += nob;
                        return -EAGAIN;
                }

                nob -= iov->iov_len;
                conn->ksnc_rx_iov = ++iov;
                conn->ksnc_rx_niov--;
        } while (nob != 0);

        return rc;
}

static int
ksocknal_recv_kiov(struct ksock_conn *conn, struct page **rx_scratch_pgs,
                   struct kvec *scratch_iov)
{
        struct bio_vec *kiov = conn->ksnc_rx_kiov;
        int nob;
        int rc;

        LASSERT(conn->ksnc_rx_nkiov > 0);

        /* Never touch conn->ksnc_rx_kiov or change connection
         * status inside ksocknal_lib_recv_kiov() */
        rc = ksocknal_lib_recv_kiov(conn, rx_scratch_pgs, scratch_iov);

        if (rc <= 0)
                return rc;

        /* received something... */
        nob = rc;

        conn->ksnc_peer->ksnp_last_alive = ktime_get_seconds();
        conn->ksnc_rx_deadline = ktime_get_seconds() +
                                 ksocknal_timeout();
        smp_mb();                       /* order with setting rx_started */
        conn->ksnc_rx_started = 1;

        conn->ksnc_rx_nob_wanted -= nob;
        conn->ksnc_rx_nob_left -= nob;

        do {
                LASSERT(conn->ksnc_rx_nkiov > 0);

                if (nob < (int) kiov->bv_len) {
                        kiov->bv_offset += nob;
                        kiov->bv_len -= nob;
                        return -EAGAIN;
                }

                nob -= kiov->bv_len;
                conn->ksnc_rx_kiov = ++kiov;
                conn->ksnc_rx_nkiov--;
        } while (nob != 0);

        return 1;
}

static int
ksocknal_receive(struct ksock_conn *conn, struct page **rx_scratch_pgs,
                 struct kvec *scratch_iov)
{
        /* Return 1 on success, 0 on EOF, < 0 on error.
         * Caller checks ksnc_rx_nob_wanted to determine
         * progress/completion. */
        int     rc;
        ENTRY;

        if (ksocknal_data.ksnd_stall_rx != 0)
                schedule_timeout_uninterruptible(
                        cfs_time_seconds(ksocknal_data.ksnd_stall_rx));

        rc = ksocknal_connsock_addref(conn);
        if (rc != 0) {
                LASSERT(conn->ksnc_closing);
                return -ESHUTDOWN;
        }

        for (;;) {
                if (conn->ksnc_rx_niov != 0)
                        rc = ksocknal_recv_iov(conn, scratch_iov);
                else
                        rc = ksocknal_recv_kiov(conn, rx_scratch_pgs,
                                                 scratch_iov);

                if (rc <= 0) {
                        /* error/EOF or partial receive */
                        if (rc == -EAGAIN) {
                                rc = 1;
                        } else if (rc == 0 && conn->ksnc_rx_started) {
                                /* EOF in the middle of a message */
                                rc = -EPROTO;
                        }
                        break;
                }

                /* Completed a fragment */

                if (conn->ksnc_rx_nob_wanted == 0) {
                        rc = 1;
                        break;
                }
        }

        ksocknal_connsock_decref(conn);
        RETURN(rc);
}

void
ksocknal_tx_done(struct lnet_ni *ni, struct ksock_tx *tx, int rc)
{
        struct lnet_msg *lnetmsg = tx->tx_lnetmsg;
        enum lnet_msg_hstatus hstatus = tx->tx_hstatus;
        ENTRY;

        LASSERT(ni != NULL || tx->tx_conn != NULL);

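        /* A "successful" send that still has unsent bytes, or an aborted
         * zero-copy, is really a failure: promote rc to -EIO so LNet's
         * health tracking sees it. */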
        if (!rc && (tx->tx_resid != 0 || tx->tx_zc_aborted)) {
                rc = -EIO;
                if (hstatus == LNET_MSG_STATUS_OK)
                        hstatus = LNET_MSG_STATUS_LOCAL_ERROR;
        }

        if (tx->tx_conn != NULL)
                ksocknal_conn_decref(tx->tx_conn);

        ksocknal_free_tx(tx);
        if (lnetmsg != NULL) { /* KSOCK_MSG_NOOP go without lnetmsg */
                lnetmsg->msg_health_status = hstatus;
                lnet_finalize(lnetmsg, rc);
        }

        EXIT;
}

void
ksocknal_txlist_done(struct lnet_ni *ni, struct list_head *txlist, int error)
{
        struct ksock_tx *tx;

        while (!list_empty(txlist)) {
                tx = list_entry(txlist->next, struct ksock_tx, tx_list);

                if (error && tx->tx_lnetmsg != NULL) {
                        CNETERR("Deleting packet type %d len %d %s->%s\n",
                                le32_to_cpu(tx->tx_lnetmsg->msg_hdr.type),
                                le32_to_cpu(tx->tx_lnetmsg->msg_hdr.payload_length),
                                libcfs_nid2str(le64_to_cpu(tx->tx_lnetmsg->msg_hdr.src_nid)),
                                libcfs_nid2str(le64_to_cpu(tx->tx_lnetmsg->msg_hdr.dest_nid)));
                } else if (error) {
                        CNETERR("Deleting noop packet\n");
                }

                list_del(&tx->tx_list);

                if (tx->tx_hstatus == LNET_MSG_STATUS_OK) {
                        if (error == -ETIMEDOUT)
                                tx->tx_hstatus =
                                  LNET_MSG_STATUS_LOCAL_TIMEOUT;
                        else if (error == -ENETDOWN ||
                                 error == -EHOSTUNREACH ||
                                 error == -ENETUNREACH ||
                                 error == -ECONNREFUSED ||
                                 error == -ECONNRESET)
                                tx->tx_hstatus = LNET_MSG_STATUS_REMOTE_DROPPED;
                        /*
                         * for all other errors we don't want to
                         * retransmit
                         */
                        else if (error)
                                tx->tx_hstatus = LNET_MSG_STATUS_LOCAL_ERROR;
                }

                LASSERT(refcount_read(&tx->tx_refcount) == 1);
                ksocknal_tx_done(ni, tx, error);
        }
}

static void
ksocknal_check_zc_req(struct ksock_tx *tx)
{
        struct ksock_conn *conn = tx->tx_conn;
        struct ksock_peer_ni *peer_ni = conn->ksnc_peer;

        /* Set tx_msg.ksm_zc_cookies[0] to a unique non-zero cookie and add tx
         * to ksnp_zc_req_list if some fragment of this message should be sent
         * zero-copy.  Our peer_ni will send an ACK containing this cookie when
         * she has received this message to tell us we can signal completion.
         * tx_msg.ksm_zc_cookies[0] remains non-zero while tx is on
         * ksnp_zc_req_list. */
        LASSERT (tx->tx_msg.ksm_type != KSOCK_MSG_NOOP);
        LASSERT (tx->tx_zc_capable);

        tx->tx_zc_checked = 1;

        if (conn->ksnc_proto == &ksocknal_protocol_v1x ||
            !conn->ksnc_zc_capable)
                return;

        /* assign cookie and queue tx to pending list, it will be released when
         * a matching ack is received. See ksocknal_handle_zcack() */

        ksocknal_tx_addref(tx);

        spin_lock(&peer_ni->ksnp_lock);

        /* ZC_REQ is going to be pinned to the peer_ni */
        tx->tx_deadline = ktime_get_seconds() +
                          ksocknal_timeout();

        LASSERT (tx->tx_msg.ksm_zc_cookies[0] == 0);

        tx->tx_msg.ksm_zc_cookies[0] = peer_ni->ksnp_zc_next_cookie++;

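        /* Cookie 0 means "no ZC request" and SOCKNAL_KEEPALIVE_PING is
         * reserved, so skip past both when the counter wraps. */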
        if (peer_ni->ksnp_zc_next_cookie == 0)
                peer_ni->ksnp_zc_next_cookie = SOCKNAL_KEEPALIVE_PING + 1;

        list_add_tail(&tx->tx_zc_list, &peer_ni->ksnp_zc_req_list);

        spin_unlock(&peer_ni->ksnp_lock);
}

static void
ksocknal_uncheck_zc_req(struct ksock_tx *tx)
{
        struct ksock_peer_ni *peer_ni = tx->tx_conn->ksnc_peer;

        LASSERT(tx->tx_msg.ksm_type != KSOCK_MSG_NOOP);
        LASSERT(tx->tx_zc_capable);

        tx->tx_zc_checked = 0;

        spin_lock(&peer_ni->ksnp_lock);

        if (tx->tx_msg.ksm_zc_cookies[0] == 0) {
                /* Not waiting for an ACK */
                spin_unlock(&peer_ni->ksnp_lock);
                return;
        }

        tx->tx_msg.ksm_zc_cookies[0] = 0;
        list_del(&tx->tx_zc_list);

        spin_unlock(&peer_ni->ksnp_lock);

        ksocknal_tx_decref(tx);
}

static int
ksocknal_process_transmit(struct ksock_conn *conn, struct ksock_tx *tx,
                          struct kvec *scratch_iov)
{
        int rc;
        bool error_sim = false;

        if (lnet_send_error_simulation(tx->tx_lnetmsg, &tx->tx_hstatus)) {
                error_sim = true;
                rc = -EINVAL;
                goto simulate_error;
        }

        if (tx->tx_zc_capable && !tx->tx_zc_checked)
                ksocknal_check_zc_req(tx);

        rc = ksocknal_transmit(conn, tx, scratch_iov);

        CDEBUG(D_NET, "send(%d) %d\n", tx->tx_resid, rc);

        if (tx->tx_resid == 0) {
                /* Sent everything OK */
                LASSERT(rc == 0);

                return 0;
        }

        if (rc == -EAGAIN)
                return rc;

        if (rc == -ENOMEM) {
                static int counter;

                counter++;   /* exponential backoff warnings */
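                /* (counter & -counter) == counter only when counter is a
                 * power of two, so the warning is logged at exponentially
                 * growing intervals rather than on every ENOMEM. */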
                if ((counter & (-counter)) == counter)
                        CWARN("%u ENOMEM tx %p (%lld allocated)\n",
                              counter, conn, libcfs_kmem_read());

                /* Queue on ksnd_enomem_conns for retry after a timeout */
                spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);

                /* enomem list takes over scheduler's ref... */
                LASSERT(conn->ksnc_tx_scheduled);
                list_add_tail(&conn->ksnc_tx_list,
                                  &ksocknal_data.ksnd_enomem_conns);
                if (ktime_get_seconds() + SOCKNAL_ENOMEM_RETRY <
                    ksocknal_data.ksnd_reaper_waketime)
                        wake_up(&ksocknal_data.ksnd_reaper_waitq);

                spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);

                /*
                 * set the health status of the message which determines
                 * whether we should retry the transmit
                 */
                tx->tx_hstatus = LNET_MSG_STATUS_LOCAL_ERROR;
                return (rc);
        }

simulate_error:

        /* Actual error */
        LASSERT(rc < 0);

        if (!error_sim) {
                /*
                 * set the health status of the message which determines
                 * whether we should retry the transmit
                 */
                if (rc == -ETIMEDOUT)
                        tx->tx_hstatus = LNET_MSG_STATUS_REMOTE_TIMEOUT;
                else
                        tx->tx_hstatus = LNET_MSG_STATUS_LOCAL_ERROR;
        }

        if (!conn->ksnc_closing) {
                switch (rc) {
                case -ECONNRESET:
                        LCONSOLE_WARN("Host %pIS reset our connection while we were sending data; it may have rebooted.\n",
                                      &conn->ksnc_peeraddr);
                        break;
                default:
                        LCONSOLE_WARN("There was an unexpected network error while writing to %pIS: %d.\n",
                                      &conn->ksnc_peeraddr, rc);
                        break;
                }
                CDEBUG(D_NET, "[%p] Error %d on write to %s ip %pISp\n",
                       conn, rc, libcfs_id2str(conn->ksnc_peer->ksnp_id),
                       &conn->ksnc_peeraddr);
        }

        if (tx->tx_zc_checked)
                ksocknal_uncheck_zc_req(tx);

        /* it's not an error if conn is being closed */
        ksocknal_close_conn_and_siblings(conn,
                                          (conn->ksnc_closing) ? 0 : rc);

        return rc;
}

static void
ksocknal_launch_connection_locked(struct ksock_route *route)
{

        /* called holding write lock on ksnd_global_lock */

        LASSERT (!route->ksnr_scheduled);
        LASSERT (!route->ksnr_connecting);
        LASSERT ((ksocknal_route_mask() & ~route->ksnr_connected) != 0);

        route->ksnr_scheduled = 1;              /* scheduling conn for connd */
        ksocknal_route_addref(route);           /* extra ref for connd */

        spin_lock_bh(&ksocknal_data.ksnd_connd_lock);

        list_add_tail(&route->ksnr_connd_list,
                          &ksocknal_data.ksnd_connd_routes);
        wake_up(&ksocknal_data.ksnd_connd_waitq);

        spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
}

void
ksocknal_launch_all_connections_locked(struct ksock_peer_ni *peer_ni)
{
        struct ksock_route *route;

        /* called holding write lock on ksnd_global_lock */
        for (;;) {
                /* launch any/all connections that need it */
                route = ksocknal_find_connectable_route_locked(peer_ni);
                if (route == NULL)
                        return;

                ksocknal_launch_connection_locked(route);
        }
}

struct ksock_conn *
ksocknal_find_conn_locked(struct ksock_peer_ni *peer_ni, struct ksock_tx *tx, int nonblk)
{
        struct list_head *tmp;
        struct ksock_conn *conn;
        struct ksock_conn *typed = NULL;
        struct ksock_conn *fallback = NULL;
        int tnob = 0;
        int fnob = 0;

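        /* Pick the least-loaded conn the protocol will accept for this tx:
         * load is bytes queued on the conn (ksnc_tx_nob) plus bytes still
         * sitting in the socket send buffer; ksnd_round_robin breaks ties
         * by least-recently-used. */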
        list_for_each(tmp, &peer_ni->ksnp_conns) {
                struct ksock_conn *c = list_entry(tmp, struct ksock_conn,
                                                  ksnc_list);
                int nob = atomic_read(&c->ksnc_tx_nob) +
                          c->ksnc_sock->sk->sk_wmem_queued;
                int rc;

                LASSERT (!c->ksnc_closing);
                LASSERT (c->ksnc_proto != NULL &&
                         c->ksnc_proto->pro_match_tx != NULL);

                rc = c->ksnc_proto->pro_match_tx(c, tx, nonblk);

                switch (rc) {
                default:
                        LBUG();
                case SOCKNAL_MATCH_NO: /* protocol rejected the tx */
                        continue;

                case SOCKNAL_MATCH_YES: /* typed connection */
                        if (typed == NULL || tnob > nob ||
                            (tnob == nob && *ksocknal_tunables.ksnd_round_robin &&
                             typed->ksnc_tx_last_post > c->ksnc_tx_last_post)) {
                                typed = c;
                                tnob  = nob;
                        }
                        break;

                case SOCKNAL_MATCH_MAY: /* fallback connection */
                        if (fallback == NULL || fnob > nob ||
                            (fnob == nob && *ksocknal_tunables.ksnd_round_robin &&
                             fallback->ksnc_tx_last_post > c->ksnc_tx_last_post)) {
                                fallback = c;
                                fnob     = nob;
                        }
                        break;
                }
        }

        /* prefer the typed selection */
        conn = (typed != NULL) ? typed : fallback;

        if (conn != NULL)
                conn->ksnc_tx_last_post = ktime_get_seconds();

        return conn;
}

void
ksocknal_tx_prep(struct ksock_conn *conn, struct ksock_tx *tx)
{
        conn->ksnc_proto->pro_pack(tx);

        atomic_add (tx->tx_nob, &conn->ksnc_tx_nob);
        ksocknal_conn_addref(conn); /* +1 ref for tx */
        tx->tx_conn = conn;
}

void
ksocknal_queue_tx_locked(struct ksock_tx *tx, struct ksock_conn *conn)
{
        struct ksock_sched *sched = conn->ksnc_scheduler;
        struct ksock_msg *msg = &tx->tx_msg;
        struct ksock_tx *ztx = NULL;
        int bufnob = 0;

        /* called holding global lock (read or irq-write) and caller may
         * not have dropped this lock between finding conn and calling me,
         * so we don't need the {get,put}connsock dance to deref
         * ksnc_sock... */
        LASSERT(!conn->ksnc_closing);

        CDEBUG(D_NET, "Sending to %s ip %pISp\n",
               libcfs_id2str(conn->ksnc_peer->ksnp_id),
               &conn->ksnc_peeraddr);

        ksocknal_tx_prep(conn, tx);

        /* Ensure the frags we've been given EXACTLY match the number of
         * bytes we want to send.  Many TCP/IP stacks disregard any total
         * size parameters passed to them and just look at the frags.
         *
         * We always expect at least 1 mapped fragment containing the
         * complete ksocknal message header.
         */
        LASSERT(lnet_iov_nob(tx->tx_niov, &tx->tx_hdr) +
                lnet_kiov_nob(tx->tx_nkiov, tx->tx_kiov) ==
                (unsigned int)tx->tx_nob);
        LASSERT(tx->tx_niov >= 1);
        LASSERT(tx->tx_resid == tx->tx_nob);

        CDEBUG (D_NET, "Packet %p type %d, nob %d niov %d nkiov %d\n",
                tx, (tx->tx_lnetmsg != NULL) ? tx->tx_lnetmsg->msg_hdr.type:
                                               KSOCK_MSG_NOOP,
                tx->tx_nob, tx->tx_niov, tx->tx_nkiov);

        bufnob = conn->ksnc_sock->sk->sk_wmem_queued;
        spin_lock_bh(&sched->kss_lock);

        if (list_empty(&conn->ksnc_tx_queue) && bufnob == 0) {
                /* First packet starts the timeout */
                conn->ksnc_tx_deadline = ktime_get_seconds() +
                                         ksocknal_timeout();
                if (conn->ksnc_tx_bufnob > 0) /* something got ACKed */
                        conn->ksnc_peer->ksnp_last_alive = ktime_get_seconds();
                conn->ksnc_tx_bufnob = 0;
                smp_mb(); /* order with adding to tx_queue */
        }

        if (msg->ksm_type == KSOCK_MSG_NOOP) {
                /* The packet is noop ZC ACK, try to piggyback the ack_cookie
                 * on a normal packet so I don't need to send it */
                LASSERT (msg->ksm_zc_cookies[1] != 0);
                LASSERT (conn->ksnc_proto->pro_queue_tx_zcack != NULL);

                if (conn->ksnc_proto->pro_queue_tx_zcack(conn, tx, 0))
                        ztx = tx; /* ZC ACK piggybacked on ztx; release tx later */

        } else {
                /* It's a normal packet - can it piggyback a noop zc-ack that
                 * has been queued already? */
                LASSERT (msg->ksm_zc_cookies[1] == 0);
                LASSERT (conn->ksnc_proto->pro_queue_tx_msg != NULL);

                ztx = conn->ksnc_proto->pro_queue_tx_msg(conn, tx);
                /* ztx will be released later */
        }

        if (ztx != NULL) {
                atomic_sub (ztx->tx_nob, &conn->ksnc_tx_nob);
                list_add_tail(&ztx->tx_list, &sched->kss_zombie_noop_txs);
        }

        if (conn->ksnc_tx_ready &&      /* able to send */
            !conn->ksnc_tx_scheduled) { /* not scheduled to send */
                /* +1 ref for scheduler */
                ksocknal_conn_addref(conn);
                list_add_tail(&conn->ksnc_tx_list,
                                   &sched->kss_tx_conns);
                conn->ksnc_tx_scheduled = 1;
                wake_up(&sched->kss_waitq);
        }

        spin_unlock_bh(&sched->kss_lock);
}


struct ksock_route *
ksocknal_find_connectable_route_locked(struct ksock_peer_ni *peer_ni)
{
        time64_t now = ktime_get_seconds();
        struct list_head *tmp;
        struct ksock_route *route;

        list_for_each(tmp, &peer_ni->ksnp_routes) {
                route = list_entry(tmp, struct ksock_route, ksnr_list);

                LASSERT (!route->ksnr_connecting || route->ksnr_scheduled);

                if (route->ksnr_scheduled)      /* connections being established */
                        continue;

                /* all route types connected ? */
                if ((ksocknal_route_mask() & ~route->ksnr_connected) == 0)
                        continue;

                if (!(route->ksnr_retry_interval == 0 || /* first attempt */
                      now >= route->ksnr_timeout)) {
                        CDEBUG(D_NET,
                               "Too soon to retry route %pIS (cnted %d, interval %lld, %lld secs later)\n",
                               &route->ksnr_addr,
                               route->ksnr_connected,
                               route->ksnr_retry_interval,
                               route->ksnr_timeout - now);
                        continue;
                }

                return (route);
        }

        return (NULL);
}

struct ksock_route *
ksocknal_find_connecting_route_locked(struct ksock_peer_ni *peer_ni)
{
        struct list_head *tmp;
        struct ksock_route *route;

        list_for_each(tmp, &peer_ni->ksnp_routes) {
                route = list_entry(tmp, struct ksock_route, ksnr_list);

                LASSERT (!route->ksnr_connecting || route->ksnr_scheduled);

                if (route->ksnr_scheduled)
                        return (route);
        }

        return (NULL);
}

int
ksocknal_launch_packet(struct lnet_ni *ni, struct ksock_tx *tx,
                       struct lnet_process_id id)
{
        struct ksock_peer_ni *peer_ni;
        struct ksock_conn *conn;
        rwlock_t *g_lock;
        int retry;
        int rc;

        LASSERT (tx->tx_conn == NULL);

        g_lock = &ksocknal_data.ksnd_global_lock;

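        /* First pass looks for an existing conn under the read lock; if the
         * peer_ni doesn't exist yet we take the write lock, add it, and
         * retry exactly once. */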
        for (retry = 0;; retry = 1) {
                read_lock(g_lock);
                peer_ni = ksocknal_find_peer_locked(ni, id);
                if (peer_ni != NULL) {
                        if (ksocknal_find_connectable_route_locked(peer_ni) == NULL) {
                                conn = ksocknal_find_conn_locked(peer_ni, tx, tx->tx_nonblk);
                                if (conn != NULL) {
                                        /* I've got no routes that need to be
                                         * connecting and I do have an actual
                                         * connection... */
                                        ksocknal_queue_tx_locked (tx, conn);
                                        read_unlock(g_lock);
                                        return (0);
                                }
                        }
                }

                /* I'll need a write lock... */
                read_unlock(g_lock);

                write_lock_bh(g_lock);

                peer_ni = ksocknal_find_peer_locked(ni, id);
                if (peer_ni != NULL)
                        break;

                write_unlock_bh(g_lock);

                if ((id.pid & LNET_PID_USERFLAG) != 0) {
                        CERROR("Refusing to create a connection to "
                               "userspace process %s\n", libcfs_id2str(id));
                        return -EHOSTUNREACH;
                }

                if (retry) {
                        CERROR("Can't find peer_ni %s\n", libcfs_id2str(id));
                        return -EHOSTUNREACH;
                }

                rc = ksocknal_add_peer(ni, id,
                                       LNET_NIDADDR(id.nid),
                                       lnet_acceptor_port());
                if (rc != 0) {
                        CERROR("Can't add peer_ni %s: %d\n",
                               libcfs_id2str(id), rc);
                        return rc;
                }
        }

        ksocknal_launch_all_connections_locked(peer_ni);

        conn = ksocknal_find_conn_locked(peer_ni, tx, tx->tx_nonblk);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                ksocknal_queue_tx_locked (tx, conn);
                write_unlock_bh(g_lock);
                return (0);
        }

        if (peer_ni->ksnp_accepting > 0 ||
            ksocknal_find_connecting_route_locked (peer_ni) != NULL) {
                /* the message is going to be pinned to the peer_ni */
                tx->tx_deadline = ktime_get_seconds() +
                                  ksocknal_timeout();

                /* Queue the message until a connection is established */
                list_add_tail(&tx->tx_list, &peer_ni->ksnp_tx_queue);
                write_unlock_bh(g_lock);
                return 0;
        }

        write_unlock_bh(g_lock);

        /* NB Routes may be ignored if connections to them failed recently */
        CNETERR("No usable routes to %s\n", libcfs_id2str(id));
        tx->tx_hstatus = LNET_MSG_STATUS_REMOTE_ERROR;
        return (-EHOSTUNREACH);
}

int
ksocknal_send(struct lnet_ni *ni, void *private, struct lnet_msg *lntmsg)
{
        /* '1' for consistency with code that checks !mpflag to restore */
        unsigned int mpflag = 1;
        int type = lntmsg->msg_type;
        struct lnet_process_id target = lntmsg->msg_target;
        unsigned int payload_niov = lntmsg->msg_niov;
        struct bio_vec *payload_kiov = lntmsg->msg_kiov;
        unsigned int payload_offset = lntmsg->msg_offset;
        unsigned int payload_nob = lntmsg->msg_len;
        struct ksock_tx *tx;
        int desc_size;
        int rc;

        /* NB 'private' is different depending on what we're sending.
         * Just ignore it... */

        CDEBUG(D_NET, "sending %u bytes in %d frags to %s\n",
               payload_nob, payload_niov, libcfs_id2str(target));

        LASSERT (payload_nob == 0 || payload_niov > 0);
        LASSERT (payload_niov <= LNET_MAX_IOV);
        LASSERT (!in_interrupt ());

        desc_size = offsetof(struct ksock_tx,
                             tx_payload[payload_niov]);

        if (lntmsg->msg_vmflush)
                mpflag = memalloc_noreclaim_save();
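        /* NB: memalloc_noreclaim_save() returns the previous PF_MEMALLOC
         * state; the restore after launch only runs when that state was
         * clear (mpflag == 0), and the initial '1' means "nothing saved"
         * when msg_vmflush is unset. */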

        tx = ksocknal_alloc_tx(KSOCK_MSG_LNET, desc_size);
        if (tx == NULL) {
                CERROR("Can't allocate tx desc type %d size %d\n",
                       type, desc_size);
                if (lntmsg->msg_vmflush)
                        memalloc_noreclaim_restore(mpflag);
                return -ENOMEM;
        }

        tx->tx_conn = NULL;                     /* set when assigned a conn */
        tx->tx_lnetmsg = lntmsg;

        tx->tx_niov = 1;
        tx->tx_kiov = tx->tx_payload;
        tx->tx_nkiov = lnet_extract_kiov(payload_niov, tx->tx_kiov,
                                         payload_niov, payload_kiov,
                                         payload_offset, payload_nob);

        if (payload_nob >= *ksocknal_tunables.ksnd_zc_min_payload)
                tx->tx_zc_capable = 1;

        tx->tx_msg.ksm_csum = 0;
        tx->tx_msg.ksm_type = KSOCK_MSG_LNET;
        tx->tx_msg.ksm_zc_cookies[0] = 0;
        tx->tx_msg.ksm_zc_cookies[1] = 0;

        /* The first fragment will be set later in pro_pack */
        rc = ksocknal_launch_packet(ni, tx, target);
        /*
         * We can't test lntmsg->msg_vmflush again as lntmsg may
         * have been freed.
         */
        if (!mpflag)
                memalloc_noreclaim_restore(mpflag);

        if (rc == 0)
                return (0);

        lntmsg->msg_health_status = tx->tx_hstatus;
        ksocknal_free_tx(tx);
        return (-EIO);
}

int
ksocknal_thread_start(int (*fn)(void *arg), void *arg, char *name)
{
        struct task_struct *task = kthread_run(fn, arg, "%s", name);

        if (IS_ERR(task))
                return PTR_ERR(task);

        atomic_inc(&ksocknal_data.ksnd_nthreads);
        return 0;
}

void
ksocknal_thread_fini (void)
{
        if (atomic_dec_and_test(&ksocknal_data.ksnd_nthreads))
                wake_up_var(&ksocknal_data.ksnd_nthreads);
}

int
ksocknal_new_packet(struct ksock_conn *conn, int nob_to_skip)
{
        static char ksocknal_slop_buffer[4096];
        int nob;
        unsigned int niov;
        int skipped;

        LASSERT(conn->ksnc_proto != NULL);

        if ((*ksocknal_tunables.ksnd_eager_ack & conn->ksnc_type) != 0) {
                /* Remind the socket to ack eagerly... */
                ksocknal_lib_eager_ack(conn);
        }

        if (nob_to_skip == 0) {         /* right at next packet boundary now */
                conn->ksnc_rx_started = 0;
                smp_mb();                       /* racing with timeout thread */

                switch (conn->ksnc_proto->pro_version) {
                case  KSOCK_PROTO_V2:
                case  KSOCK_PROTO_V3:
                        conn->ksnc_rx_state = SOCKNAL_RX_KSM_HEADER;
                        conn->ksnc_rx_iov = (struct kvec *)&conn->ksnc_rx_iov_space;
                        conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_msg;

                        conn->ksnc_rx_nob_wanted = offsetof(struct ksock_msg, ksm_u);
                        conn->ksnc_rx_nob_left = offsetof(struct ksock_msg, ksm_u);
                        conn->ksnc_rx_iov[0].iov_len  = offsetof(struct ksock_msg, ksm_u);
                        break;

                case KSOCK_PROTO_V1:
                        /* Receiving bare struct lnet_hdr */
                        conn->ksnc_rx_state = SOCKNAL_RX_LNET_HEADER;
                        conn->ksnc_rx_nob_wanted = sizeof(struct lnet_hdr);
                        conn->ksnc_rx_nob_left = sizeof(struct lnet_hdr);

                        conn->ksnc_rx_iov = (struct kvec *)&conn->ksnc_rx_iov_space;
                        conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_msg.ksm_u.lnetmsg;
                        conn->ksnc_rx_iov[0].iov_len = sizeof(struct lnet_hdr);
                        break;

                default:
                        LBUG ();
                }
                conn->ksnc_rx_niov = 1;

                conn->ksnc_rx_kiov = NULL;
                conn->ksnc_rx_nkiov = 0;
                conn->ksnc_rx_csum = ~0;
                return (1);
        }

        /* Set up to skip as much as possible now.  If there's more left
         * (ran out of iov entries) we'll get called again */

        conn->ksnc_rx_state = SOCKNAL_RX_SLOP;
        conn->ksnc_rx_nob_left = nob_to_skip;
        conn->ksnc_rx_iov = (struct kvec *)&conn->ksnc_rx_iov_space;
        skipped = 0;
        niov = 0;

        do {
                nob = min_t(int, nob_to_skip, sizeof(ksocknal_slop_buffer));

                conn->ksnc_rx_iov[niov].iov_base = ksocknal_slop_buffer;
                conn->ksnc_rx_iov[niov].iov_len  = nob;
                niov++;
                skipped += nob;
                nob_to_skip -= nob;

        } while (nob_to_skip != 0 &&    /* mustn't overflow conn's rx iov */
                 niov < sizeof(conn->ksnc_rx_iov_space) / sizeof(struct kvec));

        conn->ksnc_rx_niov = niov;
        conn->ksnc_rx_kiov = NULL;
        conn->ksnc_rx_nkiov = 0;
        conn->ksnc_rx_nob_wanted = skipped;
        return (0);
}

static int
ksocknal_process_receive(struct ksock_conn *conn,
                         struct page **rx_scratch_pgs,
                         struct kvec *scratch_iov)
{
        struct lnet_hdr *lhdr;
        struct lnet_process_id *id;
        int rc;

        LASSERT(refcount_read(&conn->ksnc_conn_refcount) > 0);

        /* NB: sched lock NOT held */
        /* SOCKNAL_RX_LNET_HEADER is here for backward compatibility */
        LASSERT(conn->ksnc_rx_state == SOCKNAL_RX_KSM_HEADER ||
                conn->ksnc_rx_state == SOCKNAL_RX_LNET_PAYLOAD ||
                conn->ksnc_rx_state == SOCKNAL_RX_LNET_HEADER ||
                conn->ksnc_rx_state == SOCKNAL_RX_SLOP);
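        /* The 'again' loop below drives the rx state machine: read whatever
         * the current state still wants, then either advance to the next
         * state, return 0 when a message completes, or return -EAGAIN on a
         * short read so the conn waits for more data. */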
 again:
        if (conn->ksnc_rx_nob_wanted != 0) {
                rc = ksocknal_receive(conn, rx_scratch_pgs,
                                      scratch_iov);

                if (rc <= 0) {
                        struct lnet_process_id ksnp_id;

                        ksnp_id = conn->ksnc_peer->ksnp_id;

                        LASSERT(rc != -EAGAIN);
                        if (rc == 0)
                                CDEBUG(D_NET, "[%p] EOF from %s ip %pISp\n",
                                       conn, libcfs_id2str(ksnp_id),
                                       &conn->ksnc_peeraddr);
                        else if (!conn->ksnc_closing)
                                CERROR("[%p] Error %d on read from %s ip %pISp\n",
                                       conn, rc, libcfs_id2str(ksnp_id),
                                       &conn->ksnc_peeraddr);

                        /* it's not an error if conn is being closed */
                        ksocknal_close_conn_and_siblings (conn,
                                                          (conn->ksnc_closing) ? 0 : rc);
                        return (rc == 0 ? -ESHUTDOWN : rc);
                }

                if (conn->ksnc_rx_nob_wanted != 0) {
                        /* short read */
                        return (-EAGAIN);
                }
        }
        switch (conn->ksnc_rx_state) {
        case SOCKNAL_RX_KSM_HEADER:
                if (conn->ksnc_flip) {
                        __swab32s(&conn->ksnc_msg.ksm_type);
                        __swab32s(&conn->ksnc_msg.ksm_csum);
                        __swab64s(&conn->ksnc_msg.ksm_zc_cookies[0]);
                        __swab64s(&conn->ksnc_msg.ksm_zc_cookies[1]);
                }

                if (conn->ksnc_msg.ksm_type != KSOCK_MSG_NOOP &&
                    conn->ksnc_msg.ksm_type != KSOCK_MSG_LNET) {
                        CERROR("%s: Unknown message type: %x\n",
                               libcfs_id2str(conn->ksnc_peer->ksnp_id),
                               conn->ksnc_msg.ksm_type);
                        ksocknal_new_packet(conn, 0);
                        ksocknal_close_conn_and_siblings(conn, -EPROTO);
                        return (-EPROTO);
                }

                if (conn->ksnc_msg.ksm_type == KSOCK_MSG_NOOP &&
                    conn->ksnc_msg.ksm_csum != 0 &&     /* has checksum */
                    conn->ksnc_msg.ksm_csum != conn->ksnc_rx_csum) {
                        /* NOOP Checksum error */
                        CERROR("%s: Checksum error, wire:0x%08X data:0x%08X\n",
                               libcfs_id2str(conn->ksnc_peer->ksnp_id),
                               conn->ksnc_msg.ksm_csum, conn->ksnc_rx_csum);
                        ksocknal_new_packet(conn, 0);
                        ksocknal_close_conn_and_siblings(conn, -EPROTO);
                        return (-EIO);
                }

                if (conn->ksnc_msg.ksm_zc_cookies[1] != 0) {
                        __u64 cookie = 0;

                        LASSERT (conn->ksnc_proto != &ksocknal_protocol_v1x);

                        if (conn->ksnc_msg.ksm_type == KSOCK_MSG_NOOP)
                                cookie = conn->ksnc_msg.ksm_zc_cookies[0];

                        rc = conn->ksnc_proto->pro_handle_zcack(conn, cookie,
                                               conn->ksnc_msg.ksm_zc_cookies[1]);

                        if (rc != 0) {
                                CERROR("%s: Unknown ZC-ACK cookie: %llu, %llu\n",
                                       libcfs_id2str(conn->ksnc_peer->ksnp_id),
                                       cookie, conn->ksnc_msg.ksm_zc_cookies[1]);
                                ksocknal_new_packet(conn, 0);
                                ksocknal_close_conn_and_siblings(conn, -EPROTO);
                                return (rc);
                        }
                }

                if (conn->ksnc_msg.ksm_type == KSOCK_MSG_NOOP) {
                        ksocknal_new_packet (conn, 0);
                        return 0;       /* NOOP is done and just return */
                }

                conn->ksnc_rx_state = SOCKNAL_RX_LNET_HEADER;
                conn->ksnc_rx_nob_wanted = sizeof(struct ksock_lnet_msg);
                conn->ksnc_rx_nob_left = sizeof(struct ksock_lnet_msg);

                conn->ksnc_rx_iov = (struct kvec *)&conn->ksnc_rx_iov_space;
                conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_msg.ksm_u.lnetmsg;
                conn->ksnc_rx_iov[0].iov_len  = sizeof(struct ksock_lnet_msg);

                conn->ksnc_rx_niov = 1;
                conn->ksnc_rx_kiov = NULL;
                conn->ksnc_rx_nkiov = 0;

                goto again;     /* read lnet header now */

        case SOCKNAL_RX_LNET_HEADER:
                /* unpack message header */
                conn->ksnc_proto->pro_unpack(&conn->ksnc_msg);

                if ((conn->ksnc_peer->ksnp_id.pid & LNET_PID_USERFLAG) != 0) {
                        /* Userspace peer_ni */
                        lhdr = &conn->ksnc_msg.ksm_u.lnetmsg.ksnm_hdr;
                        id   = &conn->ksnc_peer->ksnp_id;

                        /* Substitute process ID assigned at connection time */
                        lhdr->src_pid = cpu_to_le32(id->pid);
                        lhdr->src_nid = cpu_to_le64(id->nid);
                }

                conn->ksnc_rx_state = SOCKNAL_RX_PARSE;
                ksocknal_conn_addref(conn);     /* ++ref while parsing */

                rc = lnet_parse(conn->ksnc_peer->ksnp_ni,
                                &conn->ksnc_msg.ksm_u.lnetmsg.ksnm_hdr,
                                conn->ksnc_peer->ksnp_id.nid, conn, 0);
                if (rc < 0) {
                        /* I just received garbage: give up on this conn */
                        ksocknal_new_packet(conn, 0);
                        ksocknal_close_conn_and_siblings (conn, rc);
                        ksocknal_conn_decref(conn);
                        return (-EPROTO);
                }

                /* I'm racing with ksocknal_recv() */
                LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_PARSE ||
                         conn->ksnc_rx_state == SOCKNAL_RX_LNET_PAYLOAD);

                if (conn->ksnc_rx_state != SOCKNAL_RX_LNET_PAYLOAD)
                        return 0;

                /* ksocknal_recv() got called */
                goto again;

        case SOCKNAL_RX_LNET_PAYLOAD:
                /* payload all received */
                rc = 0;

                if (conn->ksnc_rx_nob_left == 0 &&   /* not truncating */
                    conn->ksnc_msg.ksm_csum != 0 &&  /* has checksum */
                    conn->ksnc_msg.ksm_csum != conn->ksnc_rx_csum) {
                        CERROR("%s: Checksum error, wire:0x%08X data:0x%08X\n",
                               libcfs_id2str(conn->ksnc_peer->ksnp_id),
                               conn->ksnc_msg.ksm_csum, conn->ksnc_rx_csum);
                        rc = -EIO;
                }

                if (rc == 0 && conn->ksnc_msg.ksm_zc_cookies[0] != 0) {
                        LASSERT(conn->ksnc_proto != &ksocknal_protocol_v1x);

                        lhdr = &conn->ksnc_msg.ksm_u.lnetmsg.ksnm_hdr;
                        id   = &conn->ksnc_peer->ksnp_id;

                        rc = conn->ksnc_proto->pro_handle_zcreq(conn,
                                        conn->ksnc_msg.ksm_zc_cookies[0],
                                        *ksocknal_tunables.ksnd_nonblk_zcack ||
                                        le64_to_cpu(lhdr->src_nid) != id->nid);
                }

                if (rc && conn->ksnc_lnet_msg)
                        conn->ksnc_lnet_msg->msg_health_status =
                                LNET_MSG_STATUS_REMOTE_ERROR;
                lnet_finalize(conn->ksnc_lnet_msg, rc);

                if (rc != 0) {
                        ksocknal_new_packet(conn, 0);
                        ksocknal_close_conn_and_siblings (conn, rc);
                        return (-EPROTO);
                }
                /* Fall through */

        case SOCKNAL_RX_SLOP:
                /* starting new packet? */
                if (ksocknal_new_packet (conn, conn->ksnc_rx_nob_left))
                        return 0;       /* come back later */
                goto again;             /* try to finish reading slop now */

        default:
                break;
        }

        /* Not Reached */
        LBUG ();
        return (-EINVAL);                       /* keep gcc happy */
}

int
ksocknal_recv(struct lnet_ni *ni, void *private, struct lnet_msg *msg,
              int delayed, unsigned int niov,
              struct bio_vec *kiov, unsigned int offset, unsigned int mlen,
              unsigned int rlen)
{
        struct ksock_conn *conn = private;
        struct ksock_sched *sched = conn->ksnc_scheduler;

        LASSERT (mlen <= rlen);
        LASSERT (niov <= LNET_MAX_IOV);

        conn->ksnc_lnet_msg = msg;
        conn->ksnc_rx_nob_wanted = mlen;
        conn->ksnc_rx_nob_left   = rlen;

        if (mlen == 0) {
                conn->ksnc_rx_nkiov = 0;
                conn->ksnc_rx_kiov = NULL;
                conn->ksnc_rx_iov = conn->ksnc_rx_iov_space.iov;
                conn->ksnc_rx_niov = 0;
        } else {
                conn->ksnc_rx_niov = 0;
                conn->ksnc_rx_iov  = NULL;
                conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
                conn->ksnc_rx_nkiov =
                        lnet_extract_kiov(LNET_MAX_IOV, conn->ksnc_rx_kiov,
                                          niov, kiov, offset, mlen);
        }

        LASSERT (mlen ==
                 lnet_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
                 lnet_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));

        LASSERT (conn->ksnc_rx_scheduled);

        spin_lock_bh(&sched->kss_lock);

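        /* lnet_parse() may call us synchronously (state still
         * SOCKNAL_RX_PARSE; the scheduler hasn't seen it yet) or later from
         * another thread (the scheduler already parked the conn in
         * SOCKNAL_RX_PARSE_WAIT); only the latter needs requeueing. */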
1398         switch (conn->ksnc_rx_state) {
1399         case SOCKNAL_RX_PARSE_WAIT:
1400                 list_add_tail(&conn->ksnc_rx_list, &sched->kss_rx_conns);
1401                 wake_up(&sched->kss_waitq);
1402                 LASSERT(conn->ksnc_rx_ready);
1403                 break;
1404
1405         case SOCKNAL_RX_PARSE:
1406                 /* scheduler hasn't noticed I'm parsing yet */
1407                 break;
1408         }
1409
1410         conn->ksnc_rx_state = SOCKNAL_RX_LNET_PAYLOAD;
1411
1412         spin_unlock_bh(&sched->kss_lock);
1413         ksocknal_conn_decref(conn);
1414         return 0;
1415 }
1416
1417 static inline int
1418 ksocknal_sched_cansleep(struct ksock_sched *sched)
1419 {
1420         int           rc;
1421
1422         spin_lock_bh(&sched->kss_lock);
1423
1424         rc = (!ksocknal_data.ksnd_shuttingdown &&
1425               list_empty(&sched->kss_rx_conns) &&
1426               list_empty(&sched->kss_tx_conns));
1427
1428         spin_unlock_bh(&sched->kss_lock);
1429         return rc;
1430 }
1431
1432 int ksocknal_scheduler(void *arg)
1433 {
1434         struct ksock_sched *sched;
1435         struct ksock_conn *conn;
1436         struct ksock_tx *tx;
1437         int rc;
1438         long id = (long)arg;
1439         struct page **rx_scratch_pgs;
1440         struct kvec *scratch_iov;
1441
1442         sched = ksocknal_data.ksnd_schedulers[KSOCK_THREAD_CPT(id)];
1443
1444         LIBCFS_CPT_ALLOC(rx_scratch_pgs, lnet_cpt_table(), sched->kss_cpt,
1445                          sizeof(*rx_scratch_pgs) * LNET_MAX_IOV);
1446         if (!rx_scratch_pgs) {
1447                 CERROR("Unable to allocate scratch pages\n");
1448                 return -ENOMEM;
1449         }
1450
1451         LIBCFS_CPT_ALLOC(scratch_iov, lnet_cpt_table(), sched->kss_cpt,
1452                          sizeof(*scratch_iov) * LNET_MAX_IOV);
1453         if (!scratch_iov) {
1454                 CERROR("Unable to allocate scratch iov\n");
                     /* don't leak the pages array allocated above */
                     CFS_FREE_PTR_ARRAY(rx_scratch_pgs, LNET_MAX_IOV);
1455                 return -ENOMEM;
1456         }
1457
1458         rc = cfs_cpt_bind(lnet_cpt_table(), sched->kss_cpt);
1459         if (rc != 0) {
1460                 CWARN("Can't set CPU partition affinity to %d: %d\n",
1461                         sched->kss_cpt, rc);
1462         }
1463
1464         spin_lock_bh(&sched->kss_lock);
1465
1466         while (!ksocknal_data.ksnd_shuttingdown) {
1467                 int did_something = 0;
1468
1469                 /* Ensure I progress everything semi-fairly */
1470
1471                 if (!list_empty(&sched->kss_rx_conns)) {
1472                         conn = list_entry(sched->kss_rx_conns.next,
1473                                           struct ksock_conn, ksnc_rx_list);
1474                         list_del(&conn->ksnc_rx_list);
1475
1476                         LASSERT(conn->ksnc_rx_scheduled);
1477                         LASSERT(conn->ksnc_rx_ready);
1478
1479                         /* clear rx_ready in case receive isn't complete.
1480                          * Do it BEFORE we call process_recv, since
1481                          * data_ready can set it any time after we release
1482                          * kss_lock. */
1483                         conn->ksnc_rx_ready = 0;
1484                         spin_unlock_bh(&sched->kss_lock);
1485
1486                         rc = ksocknal_process_receive(conn, rx_scratch_pgs,
1487                                                       scratch_iov);
1488
1489                         spin_lock_bh(&sched->kss_lock);
1490
1491                         /* I'm the only one that can clear this flag */
1492                         LASSERT(conn->ksnc_rx_scheduled);
1493
1494                         /* Did process_receive get everything it wanted? */
1495                         if (rc == 0)
1496                                 conn->ksnc_rx_ready = 1;
1497
1498                         if (conn->ksnc_rx_state == SOCKNAL_RX_PARSE) {
1499                         if (conn->ksnc_rx_state == SOCKNAL_RX_PARSE) {
1500                                 /* Conn blocked waiting for ksocknal_recv();
1501                                  * I change its state (under lock) to signal
1502                                 conn->ksnc_rx_state = SOCKNAL_RX_PARSE_WAIT;
1503                         } else if (conn->ksnc_rx_ready) {
1504                                 /* reschedule for rx */
1505                                 list_add_tail(&conn->ksnc_rx_list,
1506                                               &sched->kss_rx_conns);
1507                         } else {
1508                                 conn->ksnc_rx_scheduled = 0;
1509                                 /* drop my ref */
1510                                 ksocknal_conn_decref(conn);
1511                         }
1512
1513                         did_something = 1;
1514                 }
1515
1516                 if (!list_empty(&sched->kss_tx_conns)) {
1517                         LIST_HEAD(zlist);
1518
1519                         list_splice_init(&sched->kss_zombie_noop_txs, &zlist);
1520
1521                         conn = list_entry(sched->kss_tx_conns.next,
1522                                           struct ksock_conn, ksnc_tx_list);
1523                         list_del(&conn->ksnc_tx_list);
1524
1525                         LASSERT(conn->ksnc_tx_scheduled);
1526                         LASSERT(conn->ksnc_tx_ready);
1527                         LASSERT(!list_empty(&conn->ksnc_tx_queue));
1528
1529                         tx = list_entry(conn->ksnc_tx_queue.next,
1530                                         struct ksock_tx, tx_list);
1531
1532                         if (conn->ksnc_tx_carrier == tx)
1533                                 ksocknal_next_tx_carrier(conn);
1534
1535                         /* dequeue now so empty list => more to send */
1536                         list_del(&tx->tx_list);
1537
1538                         /* Clear tx_ready in case send isn't complete.  Do
1539                          * it BEFORE we call process_transmit, since
1540                          * write_space can set it any time after we release
1541                          * kss_lock. */
1542                         conn->ksnc_tx_ready = 0;
1543                         spin_unlock_bh(&sched->kss_lock);
1544
1545                         if (!list_empty(&zlist)) {
1546                                 /* free zombie noop txs; it's fast because
1547                                  * noop txs are just returned to the freelist */
1548                                 ksocknal_txlist_done(NULL, &zlist, 0);
1549                         }
1550
1551                         rc = ksocknal_process_transmit(conn, tx, scratch_iov);
1552
1553                         if (rc == -ENOMEM || rc == -EAGAIN) {
1554                                 /* Incomplete send: replace tx on HEAD of tx_queue */
1555                                 spin_lock_bh(&sched->kss_lock);
1556                                 list_add(&tx->tx_list,
1557                                          &conn->ksnc_tx_queue);
1558                         } else {
1559                                 /* Complete send; drop my tx ref */
1560                                 ksocknal_tx_decref(tx);
1561
1562                                 spin_lock_bh(&sched->kss_lock);
1563                                 /* assume space for more */
1564                                 conn->ksnc_tx_ready = 1;
1565                         }
1566
1567                         if (rc == -ENOMEM) {
1568                                 /* Do nothing; after a short timeout, this
1569                                  * conn will be reposted on kss_tx_conns. */
1570                         } else if (conn->ksnc_tx_ready &&
1571                                    !list_empty(&conn->ksnc_tx_queue)) {
1572                                 /* reschedule for tx */
1573                                 list_add_tail(&conn->ksnc_tx_list,
1574                                               &sched->kss_tx_conns);
1575                         } else {
1576                                 conn->ksnc_tx_scheduled = 0;
1577                                 /* drop my ref */
1578                                 ksocknal_conn_decref(conn);
1579                         }
1580
1581                         did_something = 1;
1582                 }
1583                 if (!did_something ||   /* nothing to do */
1584                     need_resched()) {   /* hogging CPU? */
1585                         spin_unlock_bh(&sched->kss_lock);
1586
1587                         if (!did_something) {   /* wait for something to do */
1588                                 rc = wait_event_interruptible_exclusive(
1589                                         sched->kss_waitq,
1590                                         !ksocknal_sched_cansleep(sched));
1591                                 LASSERT (rc == 0);
1592                         } else {
1593                                 cond_resched();
1594                         }
1595
1596                         spin_lock_bh(&sched->kss_lock);
1597                 }
1598         }
1599
1600         spin_unlock_bh(&sched->kss_lock);
1601         CFS_FREE_PTR_ARRAY(rx_scratch_pgs, LNET_MAX_IOV);
1602         CFS_FREE_PTR_ARRAY(scratch_iov, LNET_MAX_IOV);
1603         ksocknal_thread_fini();
1604         return 0;
1605 }
1606
1607 /*
1608  * Add the connection to the scheduler's kss_rx_conns queue
1609  * and wake the scheduler up.
1610  */
1611 void ksocknal_read_callback(struct ksock_conn *conn)
1612 {
1613         struct ksock_sched *sched;
1614         ENTRY;
1615
1616         sched = conn->ksnc_scheduler;
1617
1618         spin_lock_bh(&sched->kss_lock);
1619
1620         conn->ksnc_rx_ready = 1;
1621
1622         if (!conn->ksnc_rx_scheduled) {  /* not being progressed */
1623                 list_add_tail(&conn->ksnc_rx_list,
1624                               &sched->kss_rx_conns);
1625                 conn->ksnc_rx_scheduled = 1;
1626                 /* extra ref for scheduler */
1627                 ksocknal_conn_addref(conn);
1628
1629                 wake_up (&sched->kss_waitq);
1630         }
1631         spin_unlock_bh(&sched->kss_lock);
1632
1633         EXIT;
1634 }
1635
1636 /*
1637  * Add the connection to the scheduler's kss_tx_conns queue
1638  * and wake the scheduler up.
1639  */
1640 void ksocknal_write_callback(struct ksock_conn *conn)
1641 {
1642         struct ksock_sched *sched;
1643         ENTRY;
1644
1645         sched = conn->ksnc_scheduler;
1646
1647         spin_lock_bh(&sched->kss_lock);
1648
1649         conn->ksnc_tx_ready = 1;
1650
1651         if (!conn->ksnc_tx_scheduled && /* not being progressed */
1652             !list_empty(&conn->ksnc_tx_queue)) { /* packets to send */
1653                 list_add_tail(&conn->ksnc_tx_list, &sched->kss_tx_conns);
1654                 conn->ksnc_tx_scheduled = 1;
1655                 /* extra ref for scheduler */
1656                 ksocknal_conn_addref(conn);
1657
1658                 wake_up(&sched->kss_waitq);
1659         }
1660
1661         spin_unlock_bh(&sched->kss_lock);
1662
1663         EXIT;
1664 }
1665
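     /*
      * Match the magic/version of a received HELLO against the protocols
      * we speak, accepting both native and byte-swapped encodings.
      * Return the protocol table to use, or NULL if the peer's version
      * is unsupported.
      */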
1666 static const struct ksock_proto *
1667 ksocknal_parse_proto_version(struct ksock_hello_msg *hello)
1668 {
1669         __u32   version = 0;
1670
1671         if (hello->kshm_magic == LNET_PROTO_MAGIC)
1672                 version = hello->kshm_version;
1673         else if (hello->kshm_magic == __swab32(LNET_PROTO_MAGIC))
1674                 version = __swab32(hello->kshm_version);
1675
1676         if (version != 0) {
1677 #if SOCKNAL_VERSION_DEBUG
1678                 if (*ksocknal_tunables.ksnd_protocol == 1)
1679                         return NULL;
1680
1681                 if (*ksocknal_tunables.ksnd_protocol == 2 &&
1682                     version == KSOCK_PROTO_V3)
1683                         return NULL;
1684 #endif
1685                 if (version == KSOCK_PROTO_V2)
1686                         return &ksocknal_protocol_v2x;
1687
1688                 if (version == KSOCK_PROTO_V3)
1689                         return &ksocknal_protocol_v3x;
1690
1691                 return NULL;
1692         }
1693
1694         if (hello->kshm_magic == le32_to_cpu(LNET_PROTO_TCP_MAGIC)) {
1695                 struct lnet_magicversion *hmv;
1696
1697                 BUILD_BUG_ON(sizeof(struct lnet_magicversion) !=
1698                              offsetof(struct ksock_hello_msg, kshm_src_nid));
1699
1700                 hmv = (struct lnet_magicversion *)hello;
1701
1702                 if (hmv->version_major == cpu_to_le16 (KSOCK_PROTO_V1_MAJOR) &&
1703                     hmv->version_minor == cpu_to_le16 (KSOCK_PROTO_V1_MINOR))
1704                         return &ksocknal_protocol_v1x;
1705         }
1706
1707         return NULL;
1708 }
1709
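     /* Fill in the identity fields of 'hello' and put it on the wire via
      * the connection's (already chosen) protocol. */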
1710 int
1711 ksocknal_send_hello(struct lnet_ni *ni, struct ksock_conn *conn,
1712                     lnet_nid_t peer_nid, struct ksock_hello_msg *hello)
1713 {
1714         /* CAVEAT EMPTOR: this byte-flips 'ipaddrs' */
1715         struct ksock_net *net = (struct ksock_net *)ni->ni_data;
1716
1717         LASSERT(hello->kshm_nips <= LNET_INTERFACES_NUM);
1718
1719         /* rely on caller to hold a ref on the socket so it won't disappear */
1720         LASSERT(conn->ksnc_proto != NULL);
1721
1722         hello->kshm_src_nid         = ni->ni_nid;
1723         hello->kshm_dst_nid         = peer_nid;
1724         hello->kshm_src_pid         = the_lnet.ln_pid;
1725
1726         hello->kshm_src_incarnation = net->ksnn_incarnation;
1727         hello->kshm_ctype           = conn->ksnc_type;
1728
1729         return conn->ksnc_proto->pro_send_hello(conn, hello);
1730 }
1731
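     /* Return the connection type the peer should present for a conn I
      * typed 'type': BULK_IN pairs with BULK_OUT and vice versa, ANY and
      * CONTROL pair with themselves, anything else is invalid. */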
1732 static int
1733 ksocknal_invert_type(int type)
1734 {
1735         switch (type)
1736         {
1737         case SOCKLND_CONN_ANY:
1738         case SOCKLND_CONN_CONTROL:
1739                 return (type);
1740         case SOCKLND_CONN_BULK_IN:
1741                 return SOCKLND_CONN_BULK_OUT;
1742         case SOCKLND_CONN_BULK_OUT:
1743                 return SOCKLND_CONN_BULK_IN;
1744         default:
1745                 return (SOCKLND_CONN_NONE);
1746         }
1747 }
1748
1749 int
1750 ksocknal_recv_hello(struct lnet_ni *ni, struct ksock_conn *conn,
1751                     struct ksock_hello_msg *hello,
1752                     struct lnet_process_id *peerid,
1753                     __u64 *incarnation)
1754 {
1755         /* Return < 0        fatal error
1756          *        0          success
1757          *        EALREADY   lost connection race
1758          *        EPROTO     protocol version mismatch
1759          */
1760         struct socket        *sock = conn->ksnc_sock;
1761         int                  active = (conn->ksnc_proto != NULL);
1762         int                  timeout;
1763         int                  proto_match;
1764         int                  rc;
1765         const struct ksock_proto *proto;
1766         struct lnet_process_id recv_id;
1767
1768         /* socket type set on active connections - not set on passive */
1769         LASSERT(!active == !(conn->ksnc_type != SOCKLND_CONN_NONE));
1770
1771         timeout = active ? ksocknal_timeout() :
1772                             lnet_acceptor_timeout();
1773
1774         rc = lnet_sock_read(sock, &hello->kshm_magic,
1775                             sizeof(hello->kshm_magic), timeout);
1776         if (rc != 0) {
1777                 CERROR("Error %d reading HELLO from %pIS\n",
1778                        rc, &conn->ksnc_peeraddr);
1779                 LASSERT(rc < 0);
1780                 return rc;
1781         }
1782
1783         if (hello->kshm_magic != LNET_PROTO_MAGIC &&
1784             hello->kshm_magic != __swab32(LNET_PROTO_MAGIC) &&
1785             hello->kshm_magic != le32_to_cpu(LNET_PROTO_TCP_MAGIC)) {
1786                 /* Unexpected magic! */
1787                 CERROR("Bad magic(1) %#08x (%#08x expected) from %pIS\n",
1788                        __cpu_to_le32 (hello->kshm_magic),
1789                        LNET_PROTO_TCP_MAGIC, &conn->ksnc_peeraddr);
1790                 return -EPROTO;
1791         }
1792
1793         rc = lnet_sock_read(sock, &hello->kshm_version,
1794                             sizeof(hello->kshm_version), timeout);
1795         if (rc != 0) {
1796                 CERROR("Error %d reading HELLO from %pIS\n",
1797                        rc, &conn->ksnc_peeraddr);
1798                 LASSERT(rc < 0);
1799                 return rc;
1800         }
1801
1802         proto = ksocknal_parse_proto_version(hello);
1803         if (proto == NULL) {
1804                 if (!active) {
1805                         /* unknown protocol from peer_ni, tell peer_ni my protocol */
1806                         conn->ksnc_proto = &ksocknal_protocol_v3x;
1807 #if SOCKNAL_VERSION_DEBUG
1808                         if (*ksocknal_tunables.ksnd_protocol == 2)
1809                                 conn->ksnc_proto = &ksocknal_protocol_v2x;
1810                         else if (*ksocknal_tunables.ksnd_protocol == 1)
1811                                 conn->ksnc_proto = &ksocknal_protocol_v1x;
1812 #endif
1813                         hello->kshm_nips = 0;
1814                         ksocknal_send_hello(ni, conn, ni->ni_nid, hello);
1815                 }
1816
1817                 CERROR("Unknown protocol version (%d.x expected) from %pIS\n",
1818                        conn->ksnc_proto->pro_version, &conn->ksnc_peeraddr);
1819
1820                 return -EPROTO;
1821         }
1822
1823         proto_match = (conn->ksnc_proto == proto);
1824         conn->ksnc_proto = proto;
1825
1826         /* receive the rest of the hello message anyway */
1827         rc = conn->ksnc_proto->pro_recv_hello(conn, hello, timeout);
1828         if (rc != 0) {
1829                 CERROR("Error %d reading or checking hello from from %pIS\n",
1830                        rc, &conn->ksnc_peeraddr);
1831                 LASSERT (rc < 0);
1832                 return rc;
1833         }
1834
1835         *incarnation = hello->kshm_src_incarnation;
1836
1837         if (hello->kshm_src_nid == LNET_NID_ANY) {
1838                 CERROR("Expecting a HELLO hdr with a NID, but got LNET_NID_ANY from %pIS\n",
1839                        &conn->ksnc_peeraddr);
1840                 return -EPROTO;
1841         }
1842
1843         if (!active &&
1844             rpc_get_port((struct sockaddr *)&conn->ksnc_peeraddr) >
1845             LNET_ACCEPTOR_MAX_RESERVED_PORT) {
1846                 /* Userspace NAL assigns peer_ni process ID from socket */
1847                 recv_id.pid = rpc_get_port((struct sockaddr *)
1848                                            &conn->ksnc_peeraddr) |
1849                         LNET_PID_USERFLAG;
1850                 LASSERT(conn->ksnc_peeraddr.ss_family == AF_INET);
1851                 recv_id.nid = LNET_MKNID(
1852                         LNET_NIDNET(ni->ni_nid),
1853                         ntohl(((struct sockaddr_in *)
1854                                &conn->ksnc_peeraddr)->sin_addr.s_addr));
1855         } else {
1856                 recv_id.nid = hello->kshm_src_nid;
1857                 recv_id.pid = hello->kshm_src_pid;
1858         }
1859
1860         if (!active) {
1861                 *peerid = recv_id;
1862
1863                 /* peer_ni determines type */
1864                 conn->ksnc_type = ksocknal_invert_type(hello->kshm_ctype);
1865                 if (conn->ksnc_type == SOCKLND_CONN_NONE) {
1866                         CERROR("Unexpected type %d from %s ip %pIS\n",
1867                                hello->kshm_ctype, libcfs_id2str(*peerid),
1868                                &conn->ksnc_peeraddr);
1869                         return -EPROTO;
1870                 }
1871                 return 0;
1872         }
1873
1874         if (peerid->pid != recv_id.pid ||
1875             peerid->nid != recv_id.nid) {
1876                 LCONSOLE_ERROR_MSG(0x130,
1877                                    "Connected successfully to %s on host %pIS, but they claimed they were %s; please check your Lustre configuration.\n",
1878                                    libcfs_id2str(*peerid),
1879                                    &conn->ksnc_peeraddr,
1880                                    libcfs_id2str(recv_id));
1881                 return -EPROTO;
1882         }
1883
1884         if (hello->kshm_ctype == SOCKLND_CONN_NONE) {
1885                 /* Possible protocol mismatch or I lost the connection race */
1886                 return proto_match ? EALREADY : EPROTO;
1887         }
1888
1889         if (ksocknal_invert_type(hello->kshm_ctype) != conn->ksnc_type) {
1890                 CERROR("Mismatched types: me %d, %s ip %pIS %d\n",
1891                        conn->ksnc_type, libcfs_id2str(*peerid),
1892                        &conn->ksnc_peeraddr,
1893                        hello->kshm_ctype);
1894                 return -EPROTO;
1895         }
1896         return 0;
1897 }
1898
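     /*
      * Try to establish each connection type this route still wants, one
      * socket at a time.  Returns non-zero if the route was re-queued for
      * a later attempt (e.g. after losing a connection race), 0 otherwise.
      */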
1899 static int
1900 ksocknal_connect(struct ksock_route *route)
1901 {
1902         LIST_HEAD(zombies);
1903         struct ksock_peer_ni *peer_ni = route->ksnr_peer;
1904         int               type;
1905         int               wanted;
1906         struct socket     *sock;
1907         time64_t deadline;
1908         int               retry_later = 0;
1909         int               rc = 0;
1910
1911         deadline = ktime_get_seconds() + ksocknal_timeout();
1912
1913         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1914
1915         LASSERT (route->ksnr_scheduled);
1916         LASSERT (!route->ksnr_connecting);
1917
1918         route->ksnr_connecting = 1;
1919
1920         for (;;) {
1921                 wanted = ksocknal_route_mask() & ~route->ksnr_connected;
1922
1923                 /* stop connecting if peer_ni/route got closed under me, or
1924                  * route got connected while queued */
1925                 if (peer_ni->ksnp_closing || route->ksnr_deleted ||
1926                     wanted == 0) {
1927                         retry_later = 0;
1928                         break;
1929                 }
1930
1931                 /* reschedule if peer_ni is connecting to me */
1932                 if (peer_ni->ksnp_accepting > 0) {
1933                         CDEBUG(D_NET,
1934                                "peer_ni %s(%d) already connecting to me, retry later.\n",
1935                                libcfs_nid2str(peer_ni->ksnp_id.nid), peer_ni->ksnp_accepting);
1936                         retry_later = 1;
1937                 }
1938
1939                 if (retry_later) /* needs reschedule */
1940                         break;
1941
1942                 if ((wanted & BIT(SOCKLND_CONN_ANY)) != 0) {
1943                         type = SOCKLND_CONN_ANY;
1944                 } else if ((wanted & BIT(SOCKLND_CONN_CONTROL)) != 0) {
1945                         type = SOCKLND_CONN_CONTROL;
1946                 } else if ((wanted & BIT(SOCKLND_CONN_BULK_IN)) != 0) {
1947                         type = SOCKLND_CONN_BULK_IN;
1948                 } else {
1949                         LASSERT ((wanted & BIT(SOCKLND_CONN_BULK_OUT)) != 0);
1950                         type = SOCKLND_CONN_BULK_OUT;
1951                 }
1952
1953                 write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1954
1955                 if (ktime_get_seconds() >= deadline) {
1956                         rc = -ETIMEDOUT;
1957                         lnet_connect_console_error(rc, peer_ni->ksnp_id.nid,
1958                                                    (struct sockaddr *)
1959                                                    &route->ksnr_addr);
1960                         goto failed;
1961                 }
1962
1963                 sock = lnet_connect(peer_ni->ksnp_id.nid,
1964                                     route->ksnr_myiface,
1965                                     (struct sockaddr *)&route->ksnr_addr,
1966                                     peer_ni->ksnp_ni->ni_net_ns);
1967                 if (IS_ERR(sock)) {
1968                         rc = PTR_ERR(sock);
1969                         goto failed;
1970                 }
1971
1972                 rc = ksocknal_create_conn(peer_ni->ksnp_ni, route, sock, type);
1973                 if (rc < 0) {
1974                         lnet_connect_console_error(rc, peer_ni->ksnp_id.nid,
1975                                                    (struct sockaddr *)
1976                                                    &route->ksnr_addr);
1977                         goto failed;
1978                 }
1979
1980                 /* A +ve RC means I have to retry because I lost the connection
1981                  * race or I have to renegotiate protocol version */
1982                 retry_later = (rc != 0);
1983                 if (retry_later)
1984                         CDEBUG(D_NET, "peer_ni %s: conn race, retry later.\n",
1985                                libcfs_nid2str(peer_ni->ksnp_id.nid));
1986
1987                 write_lock_bh(&ksocknal_data.ksnd_global_lock);
1988         }
1989
1990         route->ksnr_scheduled = 0;
1991         route->ksnr_connecting = 0;
1992
1993         if (retry_later) {
1994                 /* re-queue for attention; this frees me up to handle
1995                  * the peer_ni's incoming connection request */
1996
1997                 if (rc == EALREADY ||
1998                     (rc == 0 && peer_ni->ksnp_accepting > 0)) {
1999                         /* We want to introduce a delay before the next
2000                          * attempt to connect if we lost the conn race,
2001                          * but the race is usually resolved quickly, so
2002                          * min_reconnectms should be a good heuristic */
2003                         route->ksnr_retry_interval = *ksocknal_tunables.ksnd_min_reconnectms / 1000;
2004                         route->ksnr_timeout = ktime_get_seconds() +
2005                                               route->ksnr_retry_interval;
2006                 }
2007
2008                 ksocknal_launch_connection_locked(route);
2009         }
2010
2011         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
2012         return retry_later;
2013
2014  failed:
2015         write_lock_bh(&ksocknal_data.ksnd_global_lock);
2016
2017         route->ksnr_scheduled = 0;
2018         route->ksnr_connecting = 0;
2019
2020         /* This is a retry rather than a new connection */
2021         route->ksnr_retry_interval *= 2;
2022         route->ksnr_retry_interval =
2023                 max_t(time64_t, route->ksnr_retry_interval,
2024                       *ksocknal_tunables.ksnd_min_reconnectms / 1000);
2025         route->ksnr_retry_interval =
2026                 min_t(time64_t, route->ksnr_retry_interval,
2027                       *ksocknal_tunables.ksnd_max_reconnectms / 1000);
2028
2029         LASSERT(route->ksnr_retry_interval);
2030         route->ksnr_timeout = ktime_get_seconds() + route->ksnr_retry_interval;
2031
2032         if (!list_empty(&peer_ni->ksnp_tx_queue) &&
2033             peer_ni->ksnp_accepting == 0 &&
2034             ksocknal_find_connecting_route_locked(peer_ni) == NULL) {
2035                 struct ksock_conn *conn;
2036
2037                 /* ksnp_tx_queue is queued on a conn on successful
2038                  * connection for V1.x and V2.x */
2039                 if (!list_empty(&peer_ni->ksnp_conns)) {
2040                         conn = list_entry(peer_ni->ksnp_conns.next,
2041                                           struct ksock_conn, ksnc_list);
2042                         LASSERT (conn->ksnc_proto == &ksocknal_protocol_v3x);
2043                 }
2044
2045                 /* take all the blocked packets while I've got the lock and
2046                  * complete below... */
2047                 list_splice_init(&peer_ni->ksnp_tx_queue, &zombies);
2048         }
2049
2050         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
2051
2052         ksocknal_peer_failed(peer_ni);
2053         ksocknal_txlist_done(peer_ni->ksnp_ni, &zombies, rc);
2054         return 0;
2055 }
2056
2057 /*
2058  * Check whether we need to create more connds.
2059  * It will try to create a new thread if necessary; @timeout can be
2060  * updated if thread creation fails, so the caller won't keep retrying
2061  * while resources are exhausted.
2062  */
2063 static int
2064 ksocknal_connd_check_start(time64_t sec, long *timeout)
2065 {
2066         char name[16];
2067         int rc;
2068         int total = ksocknal_data.ksnd_connd_starting +
2069                     ksocknal_data.ksnd_connd_running;
2070
2071         if (unlikely(ksocknal_data.ksnd_init < SOCKNAL_INIT_ALL)) {
2072                 /* still initializing */
2073                 return 0;
2074         }
2075
2076         if (total >= *ksocknal_tunables.ksnd_nconnds_max ||
2077             total > ksocknal_data.ksnd_connd_connecting + SOCKNAL_CONND_RESV) {
2078                 /* can't create more connd, or still have enough
2079                  * threads to handle more connecting */
2080                 return 0;
2081         }
2082
2083         if (list_empty(&ksocknal_data.ksnd_connd_routes)) {
2084                 /* no pending connecting request */
2085                 return 0;
2086         }
2087
2088         if (sec - ksocknal_data.ksnd_connd_failed_stamp <= 1) {
2089                 /* may run out of resource, retry later */
2090                 *timeout = cfs_time_seconds(1);
2091                 return 0;
2092         }
2093
2094         if (ksocknal_data.ksnd_connd_starting > 0) {
2095                 /* serialize starting to avoid flood */
2096                 return 0;
2097         }
2098
2099         ksocknal_data.ksnd_connd_starting_stamp = sec;
2100         ksocknal_data.ksnd_connd_starting++;
2101         spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
2102
2103         /* NB: total is the next id */
2104         snprintf(name, sizeof(name), "socknal_cd%02d", total);
2105         rc = ksocknal_thread_start(ksocknal_connd, NULL, name);
2106
2107         spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
2108         if (rc == 0)
2109                 return 1;
2110
2111         /* we tried ... */
2112         LASSERT(ksocknal_data.ksnd_connd_starting > 0);
2113         ksocknal_data.ksnd_connd_starting--;
2114         ksocknal_data.ksnd_connd_failed_stamp = ktime_get_real_seconds();
2115
2116         return 1;
2117 }
2118
2119 /*
2120  * Check whether the current thread can exit: return 1 if there are too
2121  * many threads and none have been created in the past 120 seconds.
2122  * This function may also update @timeout to make the caller come back
2123  * to recheck these conditions.
2124  */
2125 static int
2126 ksocknal_connd_check_stop(time64_t sec, long *timeout)
2127 {
2128         int val;
2129
2130         if (unlikely(ksocknal_data.ksnd_init < SOCKNAL_INIT_ALL)) {
2131                 /* still initializing */
2132                 return 0;
2133         }
2134
2135         if (ksocknal_data.ksnd_connd_starting > 0) {
2136                 /* in progress of starting new thread */
2137                 return 0;
2138         }
2139
2140         if (ksocknal_data.ksnd_connd_running <=
2141             *ksocknal_tunables.ksnd_nconnds) { /* can't shrink */
2142                 return 0;
2143         }
2144
2145         /* created thread in past 120 seconds? */
2146         val = (int)(ksocknal_data.ksnd_connd_starting_stamp +
2147                     SOCKNAL_CONND_TIMEOUT - sec);
2148
2149         *timeout = (val > 0) ? cfs_time_seconds(val) :
2150                                cfs_time_seconds(SOCKNAL_CONND_TIMEOUT);
2151         if (val > 0)
2152                 return 0;
2153
2154         /* no creating in past 120 seconds */
2155
2156         return ksocknal_data.ksnd_connd_running >
2157                ksocknal_data.ksnd_connd_connecting + SOCKNAL_CONND_RESV;
2158 }
2159
2160 /* Go through the connd_routes queue looking for a route that we can process
2161  * right now; @timeout_p can be updated if we need to come back later */
2162 static struct ksock_route *
2163 ksocknal_connd_get_route_locked(signed long *timeout_p)
2164 {
2165         time64_t now = ktime_get_seconds();
2166         struct ksock_route *route;
2167
2168         /* connd_routes can contain both pending and ordinary routes */
2169         list_for_each_entry(route, &ksocknal_data.ksnd_connd_routes,
2170                                  ksnr_connd_list) {
2171
2172                 if (route->ksnr_retry_interval == 0 ||
2173                     now >= route->ksnr_timeout)
2174                         return route;
2175
2176                 if (*timeout_p == MAX_SCHEDULE_TIMEOUT ||
2177                     *timeout_p > cfs_time_seconds(route->ksnr_timeout - now))
2178                         *timeout_p = cfs_time_seconds(route->ksnr_timeout - now);
2179         }
2180
2181         return NULL;
2182 }
2183
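     /*
      * Connection daemon: handles connections accepted by the listener,
      * starts new connds on demand, and processes the outgoing connection
      * requests queued on ksnd_connd_routes, exiting again when the pool
      * has grown too large.
      */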
2184 int
2185 ksocknal_connd(void *arg)
2186 {
2187         spinlock_t *connd_lock = &ksocknal_data.ksnd_connd_lock;
2188         struct ksock_connreq *cr;
2189         wait_queue_entry_t wait;
2190         int cons_retry = 0;
2191
2192         init_wait(&wait);
2193
2194         spin_lock_bh(connd_lock);
2195
2196         LASSERT(ksocknal_data.ksnd_connd_starting > 0);
2197         ksocknal_data.ksnd_connd_starting--;
2198         ksocknal_data.ksnd_connd_running++;
2199
2200         while (!ksocknal_data.ksnd_shuttingdown) {
2201                 struct ksock_route *route = NULL;
2202                 time64_t sec = ktime_get_real_seconds();
2203                 long timeout = MAX_SCHEDULE_TIMEOUT;
2204                 int  dropped_lock = 0;
2205
2206                 if (ksocknal_connd_check_stop(sec, &timeout)) {
2207                         /* wake up another one to check stop */
2208                         wake_up(&ksocknal_data.ksnd_connd_waitq);
2209                         break;
2210                 }
2211
2212                 if (ksocknal_connd_check_start(sec, &timeout)) {
2213                         /* created new thread */
2214                         dropped_lock = 1;
2215                 }
2216
2217                 if (!list_empty(&ksocknal_data.ksnd_connd_connreqs)) {
2218                         /* Connection accepted by the listener */
2219                         cr = list_entry(ksocknal_data.ksnd_connd_connreqs.next,
2220                                         struct ksock_connreq, ksncr_list);
2221
2222                         list_del(&cr->ksncr_list);
2223                         spin_unlock_bh(connd_lock);
2224                         dropped_lock = 1;
2225
2226                         ksocknal_create_conn(cr->ksncr_ni, NULL,
2227                                              cr->ksncr_sock, SOCKLND_CONN_NONE);
2228                         lnet_ni_decref(cr->ksncr_ni);
2229                         LIBCFS_FREE(cr, sizeof(*cr));
2230
2231                         spin_lock_bh(connd_lock);
2232                 }
2233
2234                 /* Only handle an outgoing connection request if there
2235                  * is a thread left to handle incoming connections and
2236                  * create new connd */
2237                 if (ksocknal_data.ksnd_connd_connecting + SOCKNAL_CONND_RESV <
2238                     ksocknal_data.ksnd_connd_running) {
2239                         route = ksocknal_connd_get_route_locked(&timeout);
2240                 }
2241                 if (route != NULL) {
2242                         list_del(&route->ksnr_connd_list);
2243                         ksocknal_data.ksnd_connd_connecting++;
2244                         spin_unlock_bh(connd_lock);
2245                         dropped_lock = 1;
2246
2247                         if (ksocknal_connect(route)) {
2248                                 /* consecutive retry */
2249                                 if (cons_retry++ > SOCKNAL_INSANITY_RECONN) {
2250                                         CWARN("massive consecutive re-connecting to %pIS\n",
2251                                               &route->ksnr_addr);
2252                                         cons_retry = 0;
2253                                 }
2254                         } else {
2255                                 cons_retry = 0;
2256                         }
2257
2258                         ksocknal_route_decref(route);
2259
2260                         spin_lock_bh(connd_lock);
2261                         ksocknal_data.ksnd_connd_connecting--;
2262                 }
2263
2264                 if (dropped_lock) {
2265                         if (!need_resched())
2266                                 continue;
2267                         spin_unlock_bh(connd_lock);
2268                         cond_resched();
2269                         spin_lock_bh(connd_lock);
2270                         continue;
2271                 }
2272
2273                 /* Nothing to do: wait for up to 'timeout' */
2274                 set_current_state(TASK_INTERRUPTIBLE);
2275                 add_wait_queue_exclusive(&ksocknal_data.ksnd_connd_waitq, &wait);
2276                 spin_unlock_bh(connd_lock);
2277
2278                 schedule_timeout(timeout);
2279
2280                 remove_wait_queue(&ksocknal_data.ksnd_connd_waitq, &wait);
2281                 spin_lock_bh(connd_lock);
2282         }
2283         ksocknal_data.ksnd_connd_running--;
2284         spin_unlock_bh(connd_lock);
2285
2286         ksocknal_thread_fini();
2287         return 0;
2288 }
2289
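     /*
      * Scan the peer_ni's connections for one that has hit a socket error
      * or blown an rx/tx deadline.  Return it with a reference held, or
      * NULL if all connections look healthy.
      */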
2290 static struct ksock_conn *
2291 ksocknal_find_timed_out_conn(struct ksock_peer_ni *peer_ni)
2292 {
2293         /* We're called with a shared lock on ksnd_global_lock */
2294         struct ksock_conn *conn;
2295         struct list_head *ctmp;
2296         struct ksock_tx *tx;
2297
2298         list_for_each(ctmp, &peer_ni->ksnp_conns) {
2299                 int error;
2300
2301                 conn = list_entry(ctmp, struct ksock_conn, ksnc_list);
2302
2303                 /* Don't need the {get,put}connsock dance to deref ksnc_sock */
2304                 LASSERT (!conn->ksnc_closing);
2305
2306                 error = conn->ksnc_sock->sk->sk_err;
2307                 if (error != 0) {
2308                         ksocknal_conn_addref(conn);
2309
2310                         switch (error) {
2311                         case ECONNRESET:
2312                                 CNETERR("A connection with %s (%pISp) was reset; it may have rebooted.\n",
2313                                         libcfs_id2str(peer_ni->ksnp_id),
2314                                         &conn->ksnc_peeraddr);
2315                                 break;
2316                         case ETIMEDOUT:
2317                                 CNETERR("A connection with %s (%pISp) timed out; the network or node may be down.\n",
2318                                         libcfs_id2str(peer_ni->ksnp_id),
2319                                         &conn->ksnc_peeraddr);
2320                                 break;
2321                         default:
2322                                 CNETERR("An unexpected network error %d occurred with %s (%pISp\n",
2323                                         error,
2324                                         libcfs_id2str(peer_ni->ksnp_id),
2325                                         &conn->ksnc_peeraddr);
2326                                 break;
2327                         }
2328
2329                         return conn;
2330                 }
2331
2332                 if (conn->ksnc_rx_started &&
2333                     ktime_get_seconds() >= conn->ksnc_rx_deadline) {
2334                         /* Timed out incomplete incoming message */
2335                         ksocknal_conn_addref(conn);
2336                         CNETERR("Timeout receiving from %s (%pISp), state %d wanted %d left %d\n",
2337                                 libcfs_id2str(peer_ni->ksnp_id),
2338                                 &conn->ksnc_peeraddr,
2339                                 conn->ksnc_rx_state,
2340                                 conn->ksnc_rx_nob_wanted,
2341                                 conn->ksnc_rx_nob_left);
2342                         return conn;
2343                 }
2344
2345                 if ((!list_empty(&conn->ksnc_tx_queue) ||
2346                      conn->ksnc_sock->sk->sk_wmem_queued != 0) &&
2347                     ktime_get_seconds() >= conn->ksnc_tx_deadline) {
2348                         /* Timed out messages queued for sending or
2349                          * buffered in the socket's send buffer
2350                          */
2351                         ksocknal_conn_addref(conn);
2352                         list_for_each_entry(tx, &conn->ksnc_tx_queue,
2353                                             tx_list)
2354                                 tx->tx_hstatus =
2355                                         LNET_MSG_STATUS_LOCAL_TIMEOUT;
2356                         CNETERR("Timeout sending data to %s (%pISp) the network or that node may be down.\n",
2357                                 libcfs_id2str(peer_ni->ksnp_id),
2358                                 &conn->ksnc_peeraddr);
2359                                 return conn;
2360                 }
2361         }
2362
2363         return (NULL);
2364 }
2365
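     /* Complete with -ETIMEDOUT every tx queued on the peer_ni whose
      * deadline has passed, stopping at the first tx that has not yet
      * expired (the queue is oldest-first). */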
2366 static inline void
2367 ksocknal_flush_stale_txs(struct ksock_peer_ni *peer_ni)
2368 {
2369         struct ksock_tx *tx;
2370         LIST_HEAD(stale_txs);
2371
2372         write_lock_bh(&ksocknal_data.ksnd_global_lock);
2373
2374         while (!list_empty(&peer_ni->ksnp_tx_queue)) {
2375                 tx = list_entry(peer_ni->ksnp_tx_queue.next,
2376                                 struct ksock_tx, tx_list);
2377
2378                 if (ktime_get_seconds() < tx->tx_deadline)
2379                         break;
2380
2381                 tx->tx_hstatus = LNET_MSG_STATUS_LOCAL_TIMEOUT;
2382
2383                 list_move_tail(&tx->tx_list, &stale_txs);
2384         }
2385
2386         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
2387
2388         ksocknal_txlist_done(peer_ni->ksnp_ni, &stale_txs, -ETIMEDOUT);
2389 }
2390
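     /* Send a noop "keepalive" ping if a V3 peer_ni has been idle longer
      * than the keepalive interval.  Returns 1 if a ping was launched,
      * 0 if none was needed and -ve on error; ksnd_global_lock (held for
      * reading) may be dropped and retaken along the way. */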
2391 static int
2392 ksocknal_send_keepalive_locked(struct ksock_peer_ni *peer_ni)
2393 __must_hold(&ksocknal_data.ksnd_global_lock)
2394 {
2395         struct ksock_sched *sched;
2396         struct ksock_conn *conn;
2397         struct ksock_tx *tx;
2398
2399         /* last_alive will be updated by create_conn */
2400         if (list_empty(&peer_ni->ksnp_conns))
2401                 return 0;
2402
2403         if (peer_ni->ksnp_proto != &ksocknal_protocol_v3x)
2404                 return 0;
2405
2406         if (*ksocknal_tunables.ksnd_keepalive <= 0 ||
2407             ktime_get_seconds() < peer_ni->ksnp_last_alive +
2408                                   *ksocknal_tunables.ksnd_keepalive)
2409                 return 0;
2410
2411         if (ktime_get_seconds() < peer_ni->ksnp_send_keepalive)
2412                 return 0;
2413
2414         /* retry 10 secs later, so we don't put pressure
2415          * on this peer_ni if we fail to send a keepalive this time */
2416         peer_ni->ksnp_send_keepalive = ktime_get_seconds() + 10;
2417
2418         conn = ksocknal_find_conn_locked(peer_ni, NULL, 1);
2419         if (conn != NULL) {
2420                 sched = conn->ksnc_scheduler;
2421
2422                 spin_lock_bh(&sched->kss_lock);
2423                 if (!list_empty(&conn->ksnc_tx_queue)) {
2424                         spin_unlock_bh(&sched->kss_lock);
2425                         /* there is a queued ACK; no keepalive needed */
2426                         return 0;
2427                 }
2428
2429                 spin_unlock_bh(&sched->kss_lock);
2430         }
2431
2432         read_unlock(&ksocknal_data.ksnd_global_lock);
2433
2434         /* cookie = 1 is reserved for keepalive PING */
2435         tx = ksocknal_alloc_tx_noop(1, 1);
2436         if (tx == NULL) {
2437                 read_lock(&ksocknal_data.ksnd_global_lock);
2438                 return -ENOMEM;
2439         }
2440
2441         if (ksocknal_launch_packet(peer_ni->ksnp_ni, tx, peer_ni->ksnp_id) == 0) {
2442                 read_lock(&ksocknal_data.ksnd_global_lock);
2443                 return 1;
2444         }
2445
2446         ksocknal_free_tx(tx);
2447         read_lock(&ksocknal_data.ksnd_global_lock);
2448
2449         return -EIO;
2450 }
2451
2452
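     /*
      * Scan hash bucket 'idx' of the peer_ni table for timed-out
      * connections, stale txs and stale zero-copy requests, restarting
      * the scan whenever the shared lock must be dropped to take action.
      */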
2453 static void
2454 ksocknal_check_peer_timeouts(int idx)
2455 {
2456         struct hlist_head *peers = &ksocknal_data.ksnd_peers[idx];
2457         struct ksock_peer_ni *peer_ni;
2458         struct ksock_conn *conn;
2459         struct ksock_tx *tx;
2460
2461  again:
2462         /* NB. We expect to have a look at all the peers and not find any
2463          * connections to time out, so we just use a shared lock while we
2464          * take a look...
2465          */
2466         read_lock(&ksocknal_data.ksnd_global_lock);
2467
2468         hlist_for_each_entry(peer_ni, peers, ksnp_list) {
2469                 struct ksock_tx *tx_stale;
2470                 time64_t deadline = 0;
2471                 int resid = 0;
2472                 int n = 0;
2473
2474                 if (ksocknal_send_keepalive_locked(peer_ni) != 0) {
2475                         read_unlock(&ksocknal_data.ksnd_global_lock);
2476                         goto again;
2477                 }
2478
2479                 conn = ksocknal_find_timed_out_conn(peer_ni);
2480
2481                 if (conn != NULL) {
2482                         read_unlock(&ksocknal_data.ksnd_global_lock);
2483
2484                         ksocknal_close_conn_and_siblings(conn, -ETIMEDOUT);
2485
2486                         /* NB we won't find this one again, but we can't
2487                          * just proceed with the next peer_ni, since we dropped
2488                          * ksnd_global_lock and it might be dead already!
2489                          */
2490                         ksocknal_conn_decref(conn);
2491                         goto again;
2492                 }
2493
2494                 /* we can't process stale txs right here because we're
2495                  * holding only a shared lock
2496                  */
2497                 if (!list_empty(&peer_ni->ksnp_tx_queue)) {
2498                         struct ksock_tx *tx;
2499
2500                         tx = list_entry(peer_ni->ksnp_tx_queue.next,
2501                                         struct ksock_tx, tx_list);
2502                         if (ktime_get_seconds() >= tx->tx_deadline) {
2503                                 ksocknal_peer_addref(peer_ni);
2504                                 read_unlock(&ksocknal_data.ksnd_global_lock);
2505
2506                                 ksocknal_flush_stale_txs(peer_ni);
2507
2508                                 ksocknal_peer_decref(peer_ni);
2509                                 goto again;
2510                         }
2511                 }
2512
2513                 if (list_empty(&peer_ni->ksnp_zc_req_list))
2514                         continue;
2515
2516                 tx_stale = NULL;
2517                 spin_lock(&peer_ni->ksnp_lock);
2518                 list_for_each_entry(tx, &peer_ni->ksnp_zc_req_list, tx_zc_list) {
2519                         if (ktime_get_seconds() < tx->tx_deadline)
2520                                 break;
2521                         /* ignore the TX if connection is being closed */
2522                         if (tx->tx_conn->ksnc_closing)
2523                                 continue;
2524                         n++;
2525                         if (tx_stale == NULL)
2526                                 tx_stale = tx;
2527                 }
2528
2529                 if (tx_stale == NULL) {
2530                         spin_unlock(&peer_ni->ksnp_lock);
2531                         continue;
2532                 }
2533
2534                 deadline = tx_stale->tx_deadline;
2535                 resid    = tx_stale->tx_resid;
2536                 conn     = tx_stale->tx_conn;
2537                 ksocknal_conn_addref(conn);
2538
2539                 spin_unlock(&peer_ni->ksnp_lock);
2540                 read_unlock(&ksocknal_data.ksnd_global_lock);
2541
2542                 CERROR("Total %d stale ZC_REQs for peer_ni %s detected; the "
2543                        "oldest(%p) timed out %lld secs ago, "
2544                        "resid: %d, wmem: %d\n",
2545                        n, libcfs_nid2str(peer_ni->ksnp_id.nid), tx_stale,
2546                        ktime_get_seconds() - deadline,
2547                        resid, conn->ksnc_sock->sk->sk_wmem_queued);
2548
2549                 ksocknal_close_conn_and_siblings (conn, -ETIMEDOUT);
2550                 ksocknal_conn_decref(conn);
2551                 goto again;
2552         }
2553
2554         read_unlock(&ksocknal_data.ksnd_global_lock);
2555 }
2556
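     /*
      * Reaper thread: terminates death-row connections, destroys zombies,
      * reschedules transmits that stalled on ENOMEM and periodically
      * walks the peer_ni table looking for timeouts.
      */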
2557 int ksocknal_reaper(void *arg)
2558 {
2559         wait_queue_entry_t wait;
2560         struct ksock_conn *conn;
2561         struct ksock_sched *sched;
2562         LIST_HEAD(enomem_conns);
2563         int nenomem_conns;
2564         time64_t timeout;
2565         int i;
2566         int peer_index = 0;
2567         time64_t deadline = ktime_get_seconds();
2568
2569         init_wait(&wait);
2570
2571         spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
2572
2573         while (!ksocknal_data.ksnd_shuttingdown) {
2574                 if (!list_empty(&ksocknal_data.ksnd_deathrow_conns)) {
2575                         conn = list_entry(ksocknal_data.ksnd_deathrow_conns.next,
2576                                           struct ksock_conn, ksnc_list);
2577                         list_del(&conn->ksnc_list);
2578
2579                         spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
2580
2581                         ksocknal_terminate_conn(conn);
2582                         ksocknal_conn_decref(conn);
2583
2584                         spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
2585                         continue;
2586                 }
2587
2588                 if (!list_empty(&ksocknal_data.ksnd_zombie_conns)) {
2589                         conn = list_entry(ksocknal_data.ksnd_zombie_conns.next,
2590                                           struct ksock_conn, ksnc_list);
2591                         list_del(&conn->ksnc_list);
2592
2593                         spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
2594
2595                         ksocknal_destroy_conn(conn);
2596
2597                         spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
2598                         continue;
2599                 }
2600
2601                 list_splice_init(&ksocknal_data.ksnd_enomem_conns,
2602                                  &enomem_conns);
2603
2604                 spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
2605
2606                 /* reschedule all the connections that stalled with ENOMEM... */
2607                 nenomem_conns = 0;
2608                 while (!list_empty(&enomem_conns)) {
2609                         conn = list_entry(enomem_conns.next,
2610                                           struct ksock_conn, ksnc_tx_list);
2611                         list_del(&conn->ksnc_tx_list);
2612
2613                         sched = conn->ksnc_scheduler;
2614
2615                         spin_lock_bh(&sched->kss_lock);
2616
2617                         LASSERT(conn->ksnc_tx_scheduled);
2618                         conn->ksnc_tx_ready = 1;
2619                         list_add_tail(&conn->ksnc_tx_list,
2620                                       &sched->kss_tx_conns);
2621                         wake_up(&sched->kss_waitq);
2622
2623                         spin_unlock_bh(&sched->kss_lock);
2624                         nenomem_conns++;
2625                 }
2626
2627                 /* careful with the jiffy wrap... */
2628                 while ((timeout = deadline - ktime_get_seconds()) <= 0) {
2629                         const int n = 4;
2630                         const int p = 1;
2631                         int  chunk = HASH_SIZE(ksocknal_data.ksnd_peers);
2632                         unsigned int lnd_timeout;
2633
2634                         /* Time to check for timeouts on a few more peers: I
2635                          * do checks every 'p' seconds on a proportion of the
2636                          * peer_ni table and I need to check every connection
2637                          * 'n' times within a timeout interval, to ensure I
2638                          * detect a timeout on any connection within (n+1)/n
2639                          * times the timeout interval.
2640                          */
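                             /* Illustrative numbers only: with
                              * lnd_timeout = 50s, n = 4, p = 1 and a
                              * 128-bucket peer_ni table, chunk =
                              * 128 * 4 * 1 / 50 = 10, so each 1-second
                              * pass checks 10 buckets and the whole table
                              * is covered in ~13s, i.e. roughly n times
                              * per timeout interval. */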
2641
2642                         lnd_timeout = ksocknal_timeout();
2643                         if (lnd_timeout > n * p)
2644                                 chunk = (chunk * n * p) / lnd_timeout;
2645                         if (chunk == 0)
2646                                 chunk = 1;
2647
2648                         for (i = 0; i < chunk; i++) {
2649                                 ksocknal_check_peer_timeouts(peer_index);
2650                                 peer_index = (peer_index + 1) %
2651                                         HASH_SIZE(ksocknal_data.ksnd_peers);
2652                         }
2653
2654                         deadline += p;
2655                 }
2656
2657                 if (nenomem_conns != 0) {
2658                         /* Reduce my timeout if I rescheduled ENOMEM conns.
2659                          * This also prevents me getting woken immediately
2660                          * if any go back on my enomem list. */
2661                         timeout = SOCKNAL_ENOMEM_RETRY;
2662                 }
2663                 ksocknal_data.ksnd_reaper_waketime = ktime_get_seconds() +
2664                                                      timeout;
2665
2666                 set_current_state(TASK_INTERRUPTIBLE);
2667                 add_wait_queue(&ksocknal_data.ksnd_reaper_waitq, &wait);
2668
2669                 if (!ksocknal_data.ksnd_shuttingdown &&
2670                     list_empty(&ksocknal_data.ksnd_deathrow_conns) &&
2671                     list_empty(&ksocknal_data.ksnd_zombie_conns))
2672                         schedule_timeout(cfs_time_seconds(timeout));
2673
2674                 set_current_state(TASK_RUNNING);
2675                 remove_wait_queue(&ksocknal_data.ksnd_reaper_waitq, &wait);
2676
2677                 spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
2678         }
2679
2680         spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
2681
2682         ksocknal_thread_fini();
2683         return 0;
2684 }