Whamcloud - gitweb
14bdb45825d3dd69786ac84316c1180df033a1f3
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd_proto.c
1 /*
2  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
3  *
4  * Copyright (c) 2012, 2017, Intel Corporation.
5  *
6  *   Author: Zach Brown <zab@zabbo.net>
7  *   Author: Peter J. Braam <braam@clusterfs.com>
8  *   Author: Phil Schwan <phil@clusterfs.com>
9  *   Author: Eric Barton <eric@bartonsoftware.com>
10  *
11  *   This file is part of Lustre, https://wiki.whamcloud.com/
12  *
13  *   Portals is free software; you can redistribute it and/or
14  *   modify it under the terms of version 2 of the GNU General Public
15  *   License as published by the Free Software Foundation.
16  *
17  *   Portals is distributed in the hope that it will be useful,
18  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
19  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20  *   GNU General Public License for more details.
21  *
22  *   You should have received a copy of the GNU General Public License
23  *   along with Portals; if not, write to the Free Software
24  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25  */
26
27 #include "socklnd.h"
28
29 /*
30  * Protocol entries :
31  *   pro_send_hello       : send hello message
32  *   pro_recv_hello       : receive hello message
33  *   pro_pack             : pack message header
34  *   pro_unpack           : unpack message header
35  *   pro_queue_tx_zcack() : Called holding BH lock: kss_lock
36  *                          return 1 if ACK is piggybacked, otherwise return 0
37  *   pro_queue_tx_msg()   : Called holding BH lock: kss_lock
38  *                          return the ACK that piggybacked by my message, or NULL
39  *   pro_handle_zcreq()   : handler of incoming ZC-REQ
40  *   pro_handle_zcack()   : handler of incoming ZC-ACK
41  *   pro_match_tx()       : Called holding glock
42  */
43
44 static struct ksock_tx *
45 ksocknal_queue_tx_msg_v1(struct ksock_conn *conn, struct ksock_tx *tx_msg)
46 {
47         /* V1.x, just enqueue it */
48         list_add_tail(&tx_msg->tx_list, &conn->ksnc_tx_queue);
49         return NULL;
50 }
51
52 void
53 ksocknal_next_tx_carrier(struct ksock_conn *conn)
54 {
55         struct ksock_tx *tx = conn->ksnc_tx_carrier;
56
57         /* Called holding BH lock: conn->ksnc_scheduler->kss_lock */
58         LASSERT(!list_empty(&conn->ksnc_tx_queue));
59         LASSERT(tx != NULL);
60
61         /* Next TX that can carry ZC-ACK or LNet message */
62         if (tx->tx_list.next == &conn->ksnc_tx_queue) {
63                 /* no more packets queued */
64                 conn->ksnc_tx_carrier = NULL;
65         } else {
66                 conn->ksnc_tx_carrier = list_entry(tx->tx_list.next,
67                                                    struct ksock_tx, tx_list);
68                 LASSERT(conn->ksnc_tx_carrier->tx_msg.ksm_type ==
69                         tx->tx_msg.ksm_type);
70         }
71 }
72
73 static int
74 ksocknal_queue_tx_zcack_v2(struct ksock_conn *conn,
75                            struct ksock_tx *tx_ack, __u64 cookie)
76 {
77         struct ksock_tx *tx = conn->ksnc_tx_carrier;
78
79         LASSERT (tx_ack == NULL ||
80                  tx_ack->tx_msg.ksm_type == KSOCK_MSG_NOOP);
81
82         /*
83          * Enqueue or piggyback tx_ack / cookie
84          * . no tx can piggyback cookie of tx_ack (or cookie), just
85          *   enqueue the tx_ack (if tx_ack != NUL) and return NULL.
86          * . There is tx can piggyback cookie of tx_ack (or cookie),
87          *   piggyback the cookie and return the tx.
88          */
89         if (tx == NULL) {
90                 if (tx_ack != NULL) {
91                         list_add_tail(&tx_ack->tx_list,
92                                           &conn->ksnc_tx_queue);
93                         conn->ksnc_tx_carrier = tx_ack;
94                 }
95                 return 0;
96         }
97
98         if (tx->tx_msg.ksm_type == KSOCK_MSG_NOOP) {
99                 /* tx is noop zc-ack, can't piggyback zc-ack cookie */
100                 if (tx_ack != NULL)
101                         list_add_tail(&tx_ack->tx_list,
102                                           &conn->ksnc_tx_queue);
103                 return 0;
104         }
105
106         LASSERT(tx->tx_msg.ksm_type == KSOCK_MSG_LNET);
107         LASSERT(tx->tx_msg.ksm_zc_cookies[1] == 0);
108
109         if (tx_ack != NULL)
110                 cookie = tx_ack->tx_msg.ksm_zc_cookies[1];
111
112         /* piggyback the zc-ack cookie */
113         tx->tx_msg.ksm_zc_cookies[1] = cookie;
114         /* move on to the next TX which can carry cookie */
115         ksocknal_next_tx_carrier(conn);
116
117         return 1;
118 }
119
120 static struct ksock_tx *
121 ksocknal_queue_tx_msg_v2(struct ksock_conn *conn, struct ksock_tx *tx_msg)
122 {
123         struct ksock_tx  *tx  = conn->ksnc_tx_carrier;
124
125         /*
126          * Enqueue tx_msg:
127          * . If there is no NOOP on the connection, just enqueue
128          *   tx_msg and return NULL
129          * . If there is NOOP on the connection, piggyback the cookie
130          *   and replace the NOOP tx, and return the NOOP tx.
131          */
132         if (tx == NULL) { /* nothing on queue */
133                 list_add_tail(&tx_msg->tx_list, &conn->ksnc_tx_queue);
134                 conn->ksnc_tx_carrier = tx_msg;
135                 return NULL;
136         }
137
138         if (tx->tx_msg.ksm_type == KSOCK_MSG_LNET) { /* nothing to carry */
139                 list_add_tail(&tx_msg->tx_list, &conn->ksnc_tx_queue);
140                 return NULL;
141         }
142
143         LASSERT (tx->tx_msg.ksm_type == KSOCK_MSG_NOOP);
144
145         /* There is a noop zc-ack can be piggybacked */
146         tx_msg->tx_msg.ksm_zc_cookies[1] = tx->tx_msg.ksm_zc_cookies[1];
147         ksocknal_next_tx_carrier(conn);
148
149         /* use new_tx to replace the noop zc-ack packet */
150         list_splice(&tx->tx_list, &tx_msg->tx_list);
151
152         return tx;
153 }
154
155 static int
156 ksocknal_queue_tx_zcack_v3(struct ksock_conn *conn,
157                            struct ksock_tx *tx_ack, __u64 cookie)
158 {
159         struct ksock_tx *tx;
160
161         if (conn->ksnc_type != SOCKLND_CONN_ACK)
162                 return ksocknal_queue_tx_zcack_v2(conn, tx_ack, cookie);
163
164         /* non-blocking ZC-ACK (to router) */
165         LASSERT (tx_ack == NULL ||
166                  tx_ack->tx_msg.ksm_type == KSOCK_MSG_NOOP);
167
168         if ((tx = conn->ksnc_tx_carrier) == NULL) {
169                 if (tx_ack != NULL) {
170                         list_add_tail(&tx_ack->tx_list,
171                                           &conn->ksnc_tx_queue);
172                         conn->ksnc_tx_carrier = tx_ack;
173                 }
174                 return 0;
175         }
176
177         /* conn->ksnc_tx_carrier != NULL */
178
179         if (tx_ack != NULL)
180                 cookie = tx_ack->tx_msg.ksm_zc_cookies[1];
181
182         if (cookie == SOCKNAL_KEEPALIVE_PING) /* ignore keepalive PING */
183                 return 1;
184
185         if (tx->tx_msg.ksm_zc_cookies[1] == SOCKNAL_KEEPALIVE_PING) {
186                 /* replace the keepalive PING with a real ACK */
187                 LASSERT (tx->tx_msg.ksm_zc_cookies[0] == 0);
188                 tx->tx_msg.ksm_zc_cookies[1] = cookie;
189                 return 1;
190         }
191
192         if (cookie == tx->tx_msg.ksm_zc_cookies[0] ||
193             cookie == tx->tx_msg.ksm_zc_cookies[1]) {
194                 CWARN("%s: duplicated ZC cookie: %llu\n",
195                       libcfs_id2str(conn->ksnc_peer->ksnp_id), cookie);
196                 return 1; /* XXX return error in the future */
197         }
198
199         if (tx->tx_msg.ksm_zc_cookies[0] == 0) {
200                 /* NOOP tx has only one ZC-ACK cookie, can carry at least one more */
201                 if (tx->tx_msg.ksm_zc_cookies[1] > cookie) {
202                         tx->tx_msg.ksm_zc_cookies[0] = tx->tx_msg.ksm_zc_cookies[1];
203                         tx->tx_msg.ksm_zc_cookies[1] = cookie;
204                 } else {
205                         tx->tx_msg.ksm_zc_cookies[0] = cookie;
206                 }
207
208                 if (tx->tx_msg.ksm_zc_cookies[0] - tx->tx_msg.ksm_zc_cookies[1] > 2) {
209                         /* not likely to carry more ACKs, skip it to simplify logic */
210                         ksocknal_next_tx_carrier(conn);
211                 }
212
213                 return 1;
214         }
215
216         /* takes two or more cookies already */
217
218         if (tx->tx_msg.ksm_zc_cookies[0] > tx->tx_msg.ksm_zc_cookies[1]) {
219                 __u64   tmp = 0;
220
221                 /* two separated cookies: (a+2, a) or (a+1, a) */
222                 LASSERT (tx->tx_msg.ksm_zc_cookies[0] -
223                          tx->tx_msg.ksm_zc_cookies[1] <= 2);
224
225                 if (tx->tx_msg.ksm_zc_cookies[0] -
226                     tx->tx_msg.ksm_zc_cookies[1] == 2) {
227                         if (cookie == tx->tx_msg.ksm_zc_cookies[1] + 1)
228                                 tmp = cookie;
229                 } else if (cookie == tx->tx_msg.ksm_zc_cookies[1] - 1) {
230                         tmp = tx->tx_msg.ksm_zc_cookies[1];
231                 } else if (cookie == tx->tx_msg.ksm_zc_cookies[0] + 1) {
232                         tmp = tx->tx_msg.ksm_zc_cookies[0];
233                 }
234
235                 if (tmp != 0) {
236                         /* range of cookies */
237                         tx->tx_msg.ksm_zc_cookies[0] = tmp - 1;
238                         tx->tx_msg.ksm_zc_cookies[1] = tmp + 1;
239                         return 1;
240                 }
241
242         } else {
243                 /* ksm_zc_cookies[0] < ksm_zc_cookies[1], it is range of cookies */
244                 if (cookie >= tx->tx_msg.ksm_zc_cookies[0] &&
245                     cookie <= tx->tx_msg.ksm_zc_cookies[1]) {
246                         CWARN("%s: duplicated ZC cookie: %llu\n",
247                               libcfs_id2str(conn->ksnc_peer->ksnp_id), cookie);
248                         return 1; /* XXX: return error in the future */
249                 }
250
251                 if (cookie == tx->tx_msg.ksm_zc_cookies[1] + 1) {
252                         tx->tx_msg.ksm_zc_cookies[1] = cookie;
253                         return 1;
254                 }
255
256                 if (cookie == tx->tx_msg.ksm_zc_cookies[0] - 1) {
257                         tx->tx_msg.ksm_zc_cookies[0] = cookie;
258                         return 1;
259                 }
260         }
261
262         /* failed to piggyback ZC-ACK */
263         if (tx_ack != NULL) {
264                 list_add_tail(&tx_ack->tx_list, &conn->ksnc_tx_queue);
265                 /* the next tx can piggyback at least 1 ACK */
266                 ksocknal_next_tx_carrier(conn);
267         }
268
269         return 0;
270 }
271
272 static int
273 ksocknal_match_tx(struct ksock_conn *conn, struct ksock_tx *tx, int nonblk)
274 {
275         int nob;
276
277 #if SOCKNAL_VERSION_DEBUG
278         if (!*ksocknal_tunables.ksnd_typed_conns)
279                 return SOCKNAL_MATCH_YES;
280 #endif
281
282         if (tx == NULL || tx->tx_lnetmsg == NULL) {
283                 /* noop packet */
284                 nob = offsetof(struct ksock_msg, ksm_u);
285         } else {
286                 nob = tx->tx_lnetmsg->msg_len +
287                       ((conn->ksnc_proto == &ksocknal_protocol_v1x) ?
288                        sizeof(struct lnet_hdr) : sizeof(struct ksock_msg));
289         }
290
291         /* default checking for typed connection */
292         switch (conn->ksnc_type) {
293         default:
294                 CERROR("ksnc_type bad: %u\n", conn->ksnc_type);
295                 LBUG();
296         case SOCKLND_CONN_ANY:
297                 return SOCKNAL_MATCH_YES;
298
299         case SOCKLND_CONN_BULK_IN:
300                 return SOCKNAL_MATCH_MAY;
301
302         case SOCKLND_CONN_BULK_OUT:
303                 if (nob < *ksocknal_tunables.ksnd_min_bulk)
304                         return SOCKNAL_MATCH_MAY;
305                 else
306                         return SOCKNAL_MATCH_YES;
307
308         case SOCKLND_CONN_CONTROL:
309                 if (nob >= *ksocknal_tunables.ksnd_min_bulk)
310                         return SOCKNAL_MATCH_MAY;
311                 else
312                         return SOCKNAL_MATCH_YES;
313         }
314 }
315
316 static int
317 ksocknal_match_tx_v3(struct ksock_conn *conn, struct ksock_tx *tx, int nonblk)
318 {
319         int nob;
320
321         if (tx == NULL || tx->tx_lnetmsg == NULL)
322                 nob = offsetof(struct ksock_msg, ksm_u);
323         else
324                 nob = tx->tx_lnetmsg->msg_len + sizeof(struct ksock_msg);
325
326         switch (conn->ksnc_type) {
327         default:
328                 CERROR("ksnc_type bad: %u\n", conn->ksnc_type);
329                 LBUG();
330         case SOCKLND_CONN_ANY:
331                 return SOCKNAL_MATCH_NO;
332
333         case SOCKLND_CONN_ACK:
334                 if (nonblk)
335                         return SOCKNAL_MATCH_YES;
336                 else if (tx == NULL || tx->tx_lnetmsg == NULL)
337                         return SOCKNAL_MATCH_MAY;
338                 else
339                         return SOCKNAL_MATCH_NO;
340
341         case SOCKLND_CONN_BULK_OUT:
342                 if (nonblk)
343                         return SOCKNAL_MATCH_NO;
344                 else if (nob < *ksocknal_tunables.ksnd_min_bulk)
345                         return SOCKNAL_MATCH_MAY;
346                 else
347                         return SOCKNAL_MATCH_YES;
348
349         case SOCKLND_CONN_CONTROL:
350                 if (nonblk)
351                         return SOCKNAL_MATCH_NO;
352                 else if (nob >= *ksocknal_tunables.ksnd_min_bulk)
353                         return SOCKNAL_MATCH_MAY;
354                 else
355                         return SOCKNAL_MATCH_YES;
356         }
357 }
358
359 /* (Sink) handle incoming ZC request from sender */
360 static int
361 ksocknal_handle_zcreq(struct ksock_conn *c, __u64 cookie, int remote)
362 {
363         struct ksock_peer_ni *peer_ni = c->ksnc_peer;
364         struct ksock_conn *conn;
365         struct ksock_tx *tx;
366         int rc;
367
368         read_lock(&ksocknal_data.ksnd_global_lock);
369
370         conn = ksocknal_find_conn_locked(peer_ni, NULL, !!remote);
371         if (conn != NULL) {
372                 struct ksock_sched *sched = conn->ksnc_scheduler;
373
374                 LASSERT(conn->ksnc_proto->pro_queue_tx_zcack != NULL);
375
376                 spin_lock_bh(&sched->kss_lock);
377
378                 rc = conn->ksnc_proto->pro_queue_tx_zcack(conn, NULL, cookie);
379
380                 spin_unlock_bh(&sched->kss_lock);
381
382                 if (rc) { /* piggybacked */
383                         read_unlock(&ksocknal_data.ksnd_global_lock);
384                         return 0;
385                 }
386         }
387
388         read_unlock(&ksocknal_data.ksnd_global_lock);
389
390         /* ACK connection is not ready, or can't piggyback the ACK */
391         tx = ksocknal_alloc_tx_noop(cookie, !!remote);
392         if (tx == NULL)
393                 return -ENOMEM;
394
395         if ((rc = ksocknal_launch_packet(peer_ni->ksnp_ni, tx, peer_ni->ksnp_id)) == 0)
396                 return 0;
397
398         ksocknal_free_tx(tx);
399         return rc;
400 }
401
402 /* (Sender) handle ZC_ACK from sink */
403 static int
404 ksocknal_handle_zcack(struct ksock_conn *conn, __u64 cookie1, __u64 cookie2)
405 {
406         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
407         struct ksock_tx *tx;
408         struct ksock_tx *tmp;
409         LIST_HEAD(zlist);
410         int count;
411
412         if (cookie1 == 0)
413                 cookie1 = cookie2;
414
415         count = (cookie1 > cookie2) ? 2 : (cookie2 - cookie1 + 1);
416
417         if (cookie2 == SOCKNAL_KEEPALIVE_PING &&
418             conn->ksnc_proto == &ksocknal_protocol_v3x) {
419                 /* keepalive PING for V3.x, just ignore it */
420                 return count == 1 ? 0 : -EPROTO;
421         }
422
423         spin_lock(&peer_ni->ksnp_lock);
424
425         list_for_each_entry_safe(tx, tmp,
426                                      &peer_ni->ksnp_zc_req_list, tx_zc_list) {
427                 __u64 c = tx->tx_msg.ksm_zc_cookies[0];
428
429                 if (c == cookie1 || c == cookie2 || (cookie1 < c && c < cookie2)) {
430                         tx->tx_msg.ksm_zc_cookies[0] = 0;
431                         list_move(&tx->tx_zc_list, &zlist);
432
433                         if (--count == 0)
434                                 break;
435                 }
436         }
437
438         spin_unlock(&peer_ni->ksnp_lock);
439
440         while (!list_empty(&zlist)) {
441                 tx = list_entry(zlist.next, struct ksock_tx, tx_zc_list);
442                 list_del(&tx->tx_zc_list);
443                 ksocknal_tx_decref(tx);
444         }
445
446         return count == 0 ? 0 : -EPROTO;
447 }
448
449 static int
450 ksocknal_send_hello_v1(struct ksock_conn *conn, struct ksock_hello_msg *hello)
451 {
452         struct socket *sock = conn->ksnc_sock;
453         struct lnet_hdr *hdr;
454         struct lnet_magicversion *hmv;
455         int rc;
456         int i;
457
458         BUILD_BUG_ON(sizeof(struct lnet_magicversion) !=
459                      offsetof(struct lnet_hdr, src_nid));
460
461         LIBCFS_ALLOC(hdr, sizeof(*hdr));
462         if (hdr == NULL) {
463                 CERROR("Can't allocate struct lnet_hdr\n");
464                 return -ENOMEM;
465         }
466
467         hmv = (struct lnet_magicversion *)&hdr->dest_nid;
468
469         /* Re-organize V2.x message header to V1.x (struct lnet_hdr)
470          * header and send out */
471         hmv->magic         = cpu_to_le32 (LNET_PROTO_TCP_MAGIC);
472         hmv->version_major = cpu_to_le16 (KSOCK_PROTO_V1_MAJOR);
473         hmv->version_minor = cpu_to_le16 (KSOCK_PROTO_V1_MINOR);
474
475         if (the_lnet.ln_testprotocompat) {
476                 /* single-shot proto check */
477                 if (test_and_clear_bit(0, &the_lnet.ln_testprotocompat))
478                         hmv->version_major++;   /* just different! */
479
480                 if (test_and_clear_bit(1, &the_lnet.ln_testprotocompat))
481                         hmv->magic = LNET_PROTO_MAGIC;
482         }
483
484         hdr->src_nid        = cpu_to_le64 (hello->kshm_src_nid);
485         hdr->src_pid        = cpu_to_le32 (hello->kshm_src_pid);
486         hdr->type           = cpu_to_le32 (LNET_MSG_HELLO);
487         hdr->payload_length = cpu_to_le32 (hello->kshm_nips * sizeof(__u32));
488         hdr->msg.hello.type = cpu_to_le32 (hello->kshm_ctype);
489         hdr->msg.hello.incarnation = cpu_to_le64 (hello->kshm_src_incarnation);
490
491         rc = lnet_sock_write(sock, hdr, sizeof(*hdr), lnet_acceptor_timeout());
492         if (rc != 0) {
493                 CNETERR("Error %d sending HELLO hdr to %pI4h/%d\n",
494                         rc, &conn->ksnc_ipaddr, conn->ksnc_port);
495                 goto out;
496         }
497
498         if (hello->kshm_nips == 0)
499                 goto out;
500
501         for (i = 0; i < (int) hello->kshm_nips; i++) {
502                 hello->kshm_ips[i] = __cpu_to_le32 (hello->kshm_ips[i]);
503         }
504
505         rc = lnet_sock_write(sock, hello->kshm_ips,
506                                hello->kshm_nips * sizeof(__u32),
507                                lnet_acceptor_timeout());
508         if (rc != 0) {
509                 CNETERR("Error %d sending HELLO payload (%d)"
510                         " to %pI4h/%d\n", rc, hello->kshm_nips,
511                         &conn->ksnc_ipaddr, conn->ksnc_port);
512         }
513 out:
514         LIBCFS_FREE(hdr, sizeof(*hdr));
515
516         return rc;
517 }
518
519 static int
520 ksocknal_send_hello_v2(struct ksock_conn *conn, struct ksock_hello_msg *hello)
521 {
522         struct socket *sock = conn->ksnc_sock;
523         int rc;
524
525         hello->kshm_magic   = LNET_PROTO_MAGIC;
526         hello->kshm_version = conn->ksnc_proto->pro_version;
527
528         if (the_lnet.ln_testprotocompat) {
529                 /* single-shot proto check */
530                 if (test_and_clear_bit(0, &the_lnet.ln_testprotocompat))
531                         hello->kshm_version++;   /* just different! */
532         }
533
534         rc = lnet_sock_write(sock, hello, offsetof(struct ksock_hello_msg, kshm_ips),
535                                lnet_acceptor_timeout());
536
537         if (rc != 0) {
538                 CNETERR("Error %d sending HELLO hdr to %pI4h/%d\n",
539                         rc, &conn->ksnc_ipaddr, conn->ksnc_port);
540                 return rc;
541         }
542
543         if (hello->kshm_nips == 0)
544                 return 0;
545
546         rc = lnet_sock_write(sock, hello->kshm_ips,
547                                hello->kshm_nips * sizeof(__u32),
548                                lnet_acceptor_timeout());
549         if (rc != 0) {
550                 CNETERR("Error %d sending HELLO payload (%d)"
551                         " to %pI4h/%d\n", rc, hello->kshm_nips,
552                         &conn->ksnc_ipaddr, conn->ksnc_port);
553         }
554
555         return rc;
556 }
557
558 static int
559 ksocknal_recv_hello_v1(struct ksock_conn *conn, struct ksock_hello_msg *hello,
560                        int timeout)
561 {
562         struct socket *sock = conn->ksnc_sock;
563         struct lnet_hdr *hdr;
564         int rc;
565         int i;
566
567         LIBCFS_ALLOC(hdr, sizeof(*hdr));
568         if (hdr == NULL) {
569                 CERROR("Can't allocate struct lnet_hdr\n");
570                 return -ENOMEM;
571         }
572
573         rc = lnet_sock_read(sock, &hdr->src_nid,
574                               sizeof(*hdr) - offsetof(struct lnet_hdr, src_nid),
575                               timeout);
576         if (rc != 0) {
577                 CERROR("Error %d reading rest of HELLO hdr from %pI4h\n",
578                        rc, &conn->ksnc_ipaddr);
579                 LASSERT(rc < 0 && rc != -EALREADY);
580                 goto out;
581         }
582
583         /* ...and check we got what we expected */
584         if (hdr->type != cpu_to_le32 (LNET_MSG_HELLO)) {
585                 CERROR ("Expecting a HELLO hdr,"
586                         " but got type %d from %pI4h\n",
587                         le32_to_cpu (hdr->type),
588                         &conn->ksnc_ipaddr);
589                 rc = -EPROTO;
590                 goto out;
591         }
592
593         hello->kshm_src_nid         = le64_to_cpu (hdr->src_nid);
594         hello->kshm_src_pid         = le32_to_cpu (hdr->src_pid);
595         hello->kshm_src_incarnation = le64_to_cpu (hdr->msg.hello.incarnation);
596         hello->kshm_ctype           = le32_to_cpu (hdr->msg.hello.type);
597         hello->kshm_nips            = le32_to_cpu (hdr->payload_length) /
598                                          sizeof (__u32);
599
600         if (hello->kshm_nips > LNET_INTERFACES_NUM) {
601                 CERROR("Bad nips %d from ip %pI4h\n",
602                        hello->kshm_nips, &conn->ksnc_ipaddr);
603                 rc = -EPROTO;
604                 goto out;
605         }
606
607         if (hello->kshm_nips == 0)
608                 goto out;
609
610         rc = lnet_sock_read(sock, hello->kshm_ips,
611                               hello->kshm_nips * sizeof(__u32), timeout);
612         if (rc != 0) {
613                 CERROR("Error %d reading IPs from ip %pI4h\n",
614                        rc, &conn->ksnc_ipaddr);
615                 LASSERT(rc < 0 && rc != -EALREADY);
616                 goto out;
617         }
618
619         for (i = 0; i < (int) hello->kshm_nips; i++) {
620                 hello->kshm_ips[i] = __le32_to_cpu(hello->kshm_ips[i]);
621
622                 if (hello->kshm_ips[i] == 0) {
623                         CERROR("Zero IP[%d] from ip %pI4h\n",
624                                i, &conn->ksnc_ipaddr);
625                         rc = -EPROTO;
626                         break;
627                 }
628         }
629 out:
630         LIBCFS_FREE(hdr, sizeof(*hdr));
631
632         return rc;
633 }
634
635 static int
636 ksocknal_recv_hello_v2(struct ksock_conn *conn, struct ksock_hello_msg *hello,
637                        int timeout)
638 {
639         struct socket     *sock = conn->ksnc_sock;
640         int                rc;
641         int                i;
642
643         if (hello->kshm_magic == LNET_PROTO_MAGIC)
644                 conn->ksnc_flip = 0;
645         else
646                 conn->ksnc_flip = 1;
647
648         rc = lnet_sock_read(sock, &hello->kshm_src_nid,
649                               offsetof(struct ksock_hello_msg, kshm_ips) -
650                                        offsetof(struct ksock_hello_msg, kshm_src_nid),
651                               timeout);
652         if (rc != 0) {
653                 CERROR("Error %d reading HELLO from %pI4h\n",
654                        rc, &conn->ksnc_ipaddr);
655                 LASSERT(rc < 0 && rc != -EALREADY);
656                 return rc;
657         }
658
659         if (conn->ksnc_flip) {
660                 __swab32s(&hello->kshm_src_pid);
661                 __swab64s(&hello->kshm_src_nid);
662                 __swab32s(&hello->kshm_dst_pid);
663                 __swab64s(&hello->kshm_dst_nid);
664                 __swab64s(&hello->kshm_src_incarnation);
665                 __swab64s(&hello->kshm_dst_incarnation);
666                 __swab32s(&hello->kshm_ctype);
667                 __swab32s(&hello->kshm_nips);
668         }
669
670         if (hello->kshm_nips > LNET_INTERFACES_NUM) {
671                 CERROR("Bad nips %d from ip %pI4h\n",
672                        hello->kshm_nips, &conn->ksnc_ipaddr);
673                 return -EPROTO;
674         }
675
676         if (hello->kshm_nips == 0)
677                 return 0;
678
679         rc = lnet_sock_read(sock, hello->kshm_ips,
680                             hello->kshm_nips * sizeof(__u32), timeout);
681         if (rc != 0) {
682                 CERROR("Error %d reading IPs from ip %pI4h\n",
683                        rc, &conn->ksnc_ipaddr);
684                 LASSERT(rc < 0 && rc != -EALREADY);
685                 return rc;
686         }
687
688         for (i = 0; i < (int) hello->kshm_nips; i++) {
689                 if (conn->ksnc_flip)
690                         __swab32s(&hello->kshm_ips[i]);
691
692                 if (hello->kshm_ips[i] == 0) {
693                         CERROR("Zero IP[%d] from ip %pI4h\n",
694                                i, &conn->ksnc_ipaddr);
695                         return -EPROTO;
696                 }
697         }
698
699         return 0;
700 }
701
702 static void
703 ksocknal_pack_msg_v1(struct ksock_tx *tx)
704 {
705         /* V1.x has no KSOCK_MSG_NOOP */
706         LASSERT(tx->tx_msg.ksm_type != KSOCK_MSG_NOOP);
707         LASSERT(tx->tx_lnetmsg != NULL);
708
709         tx->tx_iov[0].iov_base = (void *)&tx->tx_lnetmsg->msg_hdr;
710         tx->tx_iov[0].iov_len  = sizeof(struct lnet_hdr);
711
712         tx->tx_nob = tx->tx_lnetmsg->msg_len + sizeof(struct lnet_hdr);
713         tx->tx_resid = tx->tx_nob;
714 }
715
716 static void
717 ksocknal_pack_msg_v2(struct ksock_tx *tx)
718 {
719         tx->tx_iov[0].iov_base = (void *)&tx->tx_msg;
720
721         if (tx->tx_lnetmsg != NULL) {
722                 LASSERT(tx->tx_msg.ksm_type != KSOCK_MSG_NOOP);
723
724                 tx->tx_msg.ksm_u.lnetmsg.ksnm_hdr = tx->tx_lnetmsg->msg_hdr;
725                 tx->tx_iov[0].iov_len = sizeof(struct ksock_msg);
726                 tx->tx_resid = tx->tx_nob = sizeof(struct ksock_msg) + tx->tx_lnetmsg->msg_len;
727         } else {
728                 LASSERT(tx->tx_msg.ksm_type == KSOCK_MSG_NOOP);
729
730                 tx->tx_iov[0].iov_len = offsetof(struct ksock_msg, ksm_u.lnetmsg.ksnm_hdr);
731                 tx->tx_resid = tx->tx_nob = offsetof(struct ksock_msg,  ksm_u.lnetmsg.ksnm_hdr);
732         }
733         /* Don't checksum before start sending, because packet can be piggybacked with ACK */
734 }
735
736 static void
737 ksocknal_unpack_msg_v1(struct ksock_msg *msg)
738 {
739         msg->ksm_csum           = 0;
740         msg->ksm_type           = KSOCK_MSG_LNET;
741         msg->ksm_zc_cookies[0]  = msg->ksm_zc_cookies[1]  = 0;
742 }
743
744 static void
745 ksocknal_unpack_msg_v2(struct ksock_msg *msg)
746 {
747         return;  /* Do nothing */
748 }
749
750 const struct ksock_proto ksocknal_protocol_v1x =
751 {
752         .pro_version            = KSOCK_PROTO_V1,
753         .pro_send_hello         = ksocknal_send_hello_v1,
754         .pro_recv_hello         = ksocknal_recv_hello_v1,
755         .pro_pack               = ksocknal_pack_msg_v1,
756         .pro_unpack             = ksocknal_unpack_msg_v1,
757         .pro_queue_tx_msg       = ksocknal_queue_tx_msg_v1,
758         .pro_handle_zcreq       = NULL,
759         .pro_handle_zcack       = NULL,
760         .pro_queue_tx_zcack     = NULL,
761         .pro_match_tx           = ksocknal_match_tx
762 };
763
764 const struct ksock_proto ksocknal_protocol_v2x =
765 {
766         .pro_version            = KSOCK_PROTO_V2,
767         .pro_send_hello         = ksocknal_send_hello_v2,
768         .pro_recv_hello         = ksocknal_recv_hello_v2,
769         .pro_pack               = ksocknal_pack_msg_v2,
770         .pro_unpack             = ksocknal_unpack_msg_v2,
771         .pro_queue_tx_msg       = ksocknal_queue_tx_msg_v2,
772         .pro_queue_tx_zcack     = ksocknal_queue_tx_zcack_v2,
773         .pro_handle_zcreq       = ksocknal_handle_zcreq,
774         .pro_handle_zcack       = ksocknal_handle_zcack,
775         .pro_match_tx           = ksocknal_match_tx
776 };
777
778 const struct ksock_proto ksocknal_protocol_v3x =
779 {
780         .pro_version            = KSOCK_PROTO_V3,
781         .pro_send_hello         = ksocknal_send_hello_v2,
782         .pro_recv_hello         = ksocknal_recv_hello_v2,
783         .pro_pack               = ksocknal_pack_msg_v2,
784         .pro_unpack             = ksocknal_unpack_msg_v2,
785         .pro_queue_tx_msg       = ksocknal_queue_tx_msg_v2,
786         .pro_queue_tx_zcack     = ksocknal_queue_tx_zcack_v3,
787         .pro_handle_zcreq       = ksocknal_handle_zcreq,
788         .pro_handle_zcack       = ksocknal_handle_zcack,
789         .pro_match_tx           = ksocknal_match_tx_v3
790 };
791