Whamcloud - gitweb
49267c0855b7d2b5aad389723ed5693e62b953a3
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd_proto.c
1 /*
2  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
3  *
4  * Copyright (c) 2012, 2017, Intel Corporation.
5  *
6  *   Author: Zach Brown <zab@zabbo.net>
7  *   Author: Peter J. Braam <braam@clusterfs.com>
8  *   Author: Phil Schwan <phil@clusterfs.com>
9  *   Author: Eric Barton <eric@bartonsoftware.com>
10  *
11  *   This file is part of Lustre, https://wiki.whamcloud.com/
12  *
13  *   Portals is free software; you can redistribute it and/or
14  *   modify it under the terms of version 2 of the GNU General Public
15  *   License as published by the Free Software Foundation.
16  *
17  *   Portals is distributed in the hope that it will be useful,
18  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
19  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20  *   GNU General Public License for more details.
21  *
22  *   You should have received a copy of the GNU General Public License
23  *   along with Portals; if not, write to the Free Software
24  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25  */
26
27 #include "socklnd.h"
28
29 /*
30  * Protocol entries :
31  *   pro_send_hello       : send hello message
32  *   pro_recv_hello       : receive hello message
33  *   pro_pack             : pack message header
34  *   pro_unpack           : unpack message header
35  *   pro_queue_tx_zcack() : Called holding BH lock: kss_lock
36  *                          return 1 if ACK is piggybacked, otherwise return 0
37  *   pro_queue_tx_msg()   : Called holding BH lock: kss_lock
38  *                          return the ACK that piggybacked by my message, or NULL
39  *   pro_handle_zcreq()   : handler of incoming ZC-REQ
40  *   pro_handle_zcack()   : handler of incoming ZC-ACK
41  *   pro_match_tx()       : Called holding glock
42  */
43
44 static struct ksock_tx *
45 ksocknal_queue_tx_msg_v1(struct ksock_conn *conn, struct ksock_tx *tx_msg)
46 {
47         /* V1.x, just enqueue it */
48         list_add_tail(&tx_msg->tx_list, &conn->ksnc_tx_queue);
49         return NULL;
50 }
51
52 void
53 ksocknal_next_tx_carrier(struct ksock_conn *conn)
54 {
55         struct ksock_tx *tx = conn->ksnc_tx_carrier;
56
57         /* Called holding BH lock: conn->ksnc_scheduler->kss_lock */
58         LASSERT(!list_empty(&conn->ksnc_tx_queue));
59         LASSERT(tx != NULL);
60
61         /* Next TX that can carry ZC-ACK or LNet message */
62         if (tx->tx_list.next == &conn->ksnc_tx_queue) {
63                 /* no more packets queued */
64                 conn->ksnc_tx_carrier = NULL;
65         } else {
66                 conn->ksnc_tx_carrier = list_entry(tx->tx_list.next,
67                                                    struct ksock_tx, tx_list);
68                 LASSERT(conn->ksnc_tx_carrier->tx_msg.ksm_type ==
69                         tx->tx_msg.ksm_type);
70         }
71 }
72
73 static int
74 ksocknal_queue_tx_zcack_v2(struct ksock_conn *conn,
75                            struct ksock_tx *tx_ack, __u64 cookie)
76 {
77         struct ksock_tx *tx = conn->ksnc_tx_carrier;
78
79         LASSERT (tx_ack == NULL ||
80                  tx_ack->tx_msg.ksm_type == KSOCK_MSG_NOOP);
81
82         /*
83          * Enqueue or piggyback tx_ack / cookie
84          * . no tx can piggyback cookie of tx_ack (or cookie), just
85          *   enqueue the tx_ack (if tx_ack != NUL) and return NULL.
86          * . There is tx can piggyback cookie of tx_ack (or cookie),
87          *   piggyback the cookie and return the tx.
88          */
89         if (tx == NULL) {
90                 if (tx_ack != NULL) {
91                         list_add_tail(&tx_ack->tx_list,
92                                           &conn->ksnc_tx_queue);
93                         conn->ksnc_tx_carrier = tx_ack;
94                 }
95                 return 0;
96         }
97
98         if (tx->tx_msg.ksm_type == KSOCK_MSG_NOOP) {
99                 /* tx is noop zc-ack, can't piggyback zc-ack cookie */
100                 if (tx_ack != NULL)
101                         list_add_tail(&tx_ack->tx_list,
102                                           &conn->ksnc_tx_queue);
103                 return 0;
104         }
105
106         LASSERT(tx->tx_msg.ksm_type == KSOCK_MSG_LNET);
107         LASSERT(tx->tx_msg.ksm_zc_cookies[1] == 0);
108
109         if (tx_ack != NULL)
110                 cookie = tx_ack->tx_msg.ksm_zc_cookies[1];
111
112         /* piggyback the zc-ack cookie */
113         tx->tx_msg.ksm_zc_cookies[1] = cookie;
114         /* move on to the next TX which can carry cookie */
115         ksocknal_next_tx_carrier(conn);
116
117         return 1;
118 }
119
120 static struct ksock_tx *
121 ksocknal_queue_tx_msg_v2(struct ksock_conn *conn, struct ksock_tx *tx_msg)
122 {
123         struct ksock_tx  *tx  = conn->ksnc_tx_carrier;
124
125         /*
126          * Enqueue tx_msg:
127          * . If there is no NOOP on the connection, just enqueue
128          *   tx_msg and return NULL
129          * . If there is NOOP on the connection, piggyback the cookie
130          *   and replace the NOOP tx, and return the NOOP tx.
131          */
132         if (tx == NULL) { /* nothing on queue */
133                 list_add_tail(&tx_msg->tx_list, &conn->ksnc_tx_queue);
134                 conn->ksnc_tx_carrier = tx_msg;
135                 return NULL;
136         }
137
138         if (tx->tx_msg.ksm_type == KSOCK_MSG_LNET) { /* nothing to carry */
139                 list_add_tail(&tx_msg->tx_list, &conn->ksnc_tx_queue);
140                 return NULL;
141         }
142
143         LASSERT (tx->tx_msg.ksm_type == KSOCK_MSG_NOOP);
144
145         /* There is a noop zc-ack can be piggybacked */
146         tx_msg->tx_msg.ksm_zc_cookies[1] = tx->tx_msg.ksm_zc_cookies[1];
147         ksocknal_next_tx_carrier(conn);
148
149         /* use new_tx to replace the noop zc-ack packet */
150         list_splice(&tx->tx_list, &tx_msg->tx_list);
151
152         return tx;
153 }
154
155 static int
156 ksocknal_queue_tx_zcack_v3(struct ksock_conn *conn,
157                            struct ksock_tx *tx_ack, __u64 cookie)
158 {
159         struct ksock_tx *tx;
160
161         if (conn->ksnc_type != SOCKLND_CONN_ACK)
162                 return ksocknal_queue_tx_zcack_v2(conn, tx_ack, cookie);
163
164         /* non-blocking ZC-ACK (to router) */
165         LASSERT (tx_ack == NULL ||
166                  tx_ack->tx_msg.ksm_type == KSOCK_MSG_NOOP);
167
168         if ((tx = conn->ksnc_tx_carrier) == NULL) {
169                 if (tx_ack != NULL) {
170                         list_add_tail(&tx_ack->tx_list,
171                                           &conn->ksnc_tx_queue);
172                         conn->ksnc_tx_carrier = tx_ack;
173                 }
174                 return 0;
175         }
176
177         /* conn->ksnc_tx_carrier != NULL */
178
179         if (tx_ack != NULL)
180                 cookie = tx_ack->tx_msg.ksm_zc_cookies[1];
181
182         if (cookie == SOCKNAL_KEEPALIVE_PING) /* ignore keepalive PING */
183                 return 1;
184
185         if (tx->tx_msg.ksm_zc_cookies[1] == SOCKNAL_KEEPALIVE_PING) {
186                 /* replace the keepalive PING with a real ACK */
187                 LASSERT (tx->tx_msg.ksm_zc_cookies[0] == 0);
188                 tx->tx_msg.ksm_zc_cookies[1] = cookie;
189                 return 1;
190         }
191
192         if (cookie == tx->tx_msg.ksm_zc_cookies[0] ||
193             cookie == tx->tx_msg.ksm_zc_cookies[1]) {
194                 CWARN("%s: duplicated ZC cookie: %llu\n",
195                       libcfs_id2str(conn->ksnc_peer->ksnp_id), cookie);
196                 return 1; /* XXX return error in the future */
197         }
198
199         if (tx->tx_msg.ksm_zc_cookies[0] == 0) {
200                 /* NOOP tx has only one ZC-ACK cookie, can carry at least one more */
201                 if (tx->tx_msg.ksm_zc_cookies[1] > cookie) {
202                         tx->tx_msg.ksm_zc_cookies[0] = tx->tx_msg.ksm_zc_cookies[1];
203                         tx->tx_msg.ksm_zc_cookies[1] = cookie;
204                 } else {
205                         tx->tx_msg.ksm_zc_cookies[0] = cookie;
206                 }
207
208                 if (tx->tx_msg.ksm_zc_cookies[0] - tx->tx_msg.ksm_zc_cookies[1] > 2) {
209                         /* not likely to carry more ACKs, skip it to simplify logic */
210                         ksocknal_next_tx_carrier(conn);
211                 }
212
213                 return 1;
214         }
215
216         /* takes two or more cookies already */
217
218         if (tx->tx_msg.ksm_zc_cookies[0] > tx->tx_msg.ksm_zc_cookies[1]) {
219                 __u64   tmp = 0;
220
221                 /* two separated cookies: (a+2, a) or (a+1, a) */
222                 LASSERT (tx->tx_msg.ksm_zc_cookies[0] -
223                          tx->tx_msg.ksm_zc_cookies[1] <= 2);
224
225                 if (tx->tx_msg.ksm_zc_cookies[0] -
226                     tx->tx_msg.ksm_zc_cookies[1] == 2) {
227                         if (cookie == tx->tx_msg.ksm_zc_cookies[1] + 1)
228                                 tmp = cookie;
229                 } else if (cookie == tx->tx_msg.ksm_zc_cookies[1] - 1) {
230                         tmp = tx->tx_msg.ksm_zc_cookies[1];
231                 } else if (cookie == tx->tx_msg.ksm_zc_cookies[0] + 1) {
232                         tmp = tx->tx_msg.ksm_zc_cookies[0];
233                 }
234
235                 if (tmp != 0) {
236                         /* range of cookies */
237                         tx->tx_msg.ksm_zc_cookies[0] = tmp - 1;
238                         tx->tx_msg.ksm_zc_cookies[1] = tmp + 1;
239                         return 1;
240                 }
241
242         } else {
243                 /* ksm_zc_cookies[0] < ksm_zc_cookies[1], it is range of cookies */
244                 if (cookie >= tx->tx_msg.ksm_zc_cookies[0] &&
245                     cookie <= tx->tx_msg.ksm_zc_cookies[1]) {
246                         CWARN("%s: duplicated ZC cookie: %llu\n",
247                               libcfs_id2str(conn->ksnc_peer->ksnp_id), cookie);
248                         return 1; /* XXX: return error in the future */
249                 }
250
251                 if (cookie == tx->tx_msg.ksm_zc_cookies[1] + 1) {
252                         tx->tx_msg.ksm_zc_cookies[1] = cookie;
253                         return 1;
254                 }
255
256                 if (cookie == tx->tx_msg.ksm_zc_cookies[0] - 1) {
257                         tx->tx_msg.ksm_zc_cookies[0] = cookie;
258                         return 1;
259                 }
260         }
261
262         /* failed to piggyback ZC-ACK */
263         if (tx_ack != NULL) {
264                 list_add_tail(&tx_ack->tx_list, &conn->ksnc_tx_queue);
265                 /* the next tx can piggyback at least 1 ACK */
266                 ksocknal_next_tx_carrier(conn);
267         }
268
269         return 0;
270 }
271
272 static int
273 ksocknal_match_tx(struct ksock_conn *conn, struct ksock_tx *tx, int nonblk)
274 {
275         int nob;
276
277 #if SOCKNAL_VERSION_DEBUG
278         if (!*ksocknal_tunables.ksnd_typed_conns)
279                 return SOCKNAL_MATCH_YES;
280 #endif
281
282         if (tx == NULL || tx->tx_lnetmsg == NULL) {
283                 /* noop packet */
284                 nob = offsetof(struct ksock_msg, ksm_u);
285         } else {
286                 nob = tx->tx_lnetmsg->msg_len +
287                       ((conn->ksnc_proto == &ksocknal_protocol_v1x) ?
288                        sizeof(struct lnet_hdr) : sizeof(struct ksock_msg));
289         }
290
291         /* default checking for typed connection */
292         switch (conn->ksnc_type) {
293         default:
294                 CERROR("ksnc_type bad: %u\n", conn->ksnc_type);
295                 LBUG();
296         case SOCKLND_CONN_ANY:
297                 return SOCKNAL_MATCH_YES;
298
299         case SOCKLND_CONN_BULK_IN:
300                 return SOCKNAL_MATCH_MAY;
301
302         case SOCKLND_CONN_BULK_OUT:
303                 if (nob < *ksocknal_tunables.ksnd_min_bulk)
304                         return SOCKNAL_MATCH_MAY;
305                 else
306                         return SOCKNAL_MATCH_YES;
307
308         case SOCKLND_CONN_CONTROL:
309                 if (nob >= *ksocknal_tunables.ksnd_min_bulk)
310                         return SOCKNAL_MATCH_MAY;
311                 else
312                         return SOCKNAL_MATCH_YES;
313         }
314 }
315
316 static int
317 ksocknal_match_tx_v3(struct ksock_conn *conn, struct ksock_tx *tx, int nonblk)
318 {
319         int nob;
320
321         if (tx == NULL || tx->tx_lnetmsg == NULL)
322                 nob = offsetof(struct ksock_msg, ksm_u);
323         else
324                 nob = tx->tx_lnetmsg->msg_len + sizeof(struct ksock_msg);
325
326         switch (conn->ksnc_type) {
327         default:
328                 CERROR("ksnc_type bad: %u\n", conn->ksnc_type);
329                 LBUG();
330         case SOCKLND_CONN_ANY:
331                 return SOCKNAL_MATCH_NO;
332
333         case SOCKLND_CONN_ACK:
334                 if (nonblk)
335                         return SOCKNAL_MATCH_YES;
336                 else if (tx == NULL || tx->tx_lnetmsg == NULL)
337                         return SOCKNAL_MATCH_MAY;
338                 else
339                         return SOCKNAL_MATCH_NO;
340
341         case SOCKLND_CONN_BULK_OUT:
342                 if (nonblk)
343                         return SOCKNAL_MATCH_NO;
344                 else if (nob < *ksocknal_tunables.ksnd_min_bulk)
345                         return SOCKNAL_MATCH_MAY;
346                 else
347                         return SOCKNAL_MATCH_YES;
348
349         case SOCKLND_CONN_CONTROL:
350                 if (nonblk)
351                         return SOCKNAL_MATCH_NO;
352                 else if (nob >= *ksocknal_tunables.ksnd_min_bulk)
353                         return SOCKNAL_MATCH_MAY;
354                 else
355                         return SOCKNAL_MATCH_YES;
356         }
357 }
358
359 /* (Sink) handle incoming ZC request from sender */
360 static int
361 ksocknal_handle_zcreq(struct ksock_conn *c, __u64 cookie, int remote)
362 {
363         struct ksock_peer_ni *peer_ni = c->ksnc_peer;
364         struct ksock_conn *conn;
365         struct ksock_tx *tx;
366         int rc;
367
368         read_lock(&ksocknal_data.ksnd_global_lock);
369
370         conn = ksocknal_find_conn_locked(peer_ni, NULL, !!remote);
371         if (conn != NULL) {
372                 struct ksock_sched *sched = conn->ksnc_scheduler;
373
374                 LASSERT(conn->ksnc_proto->pro_queue_tx_zcack != NULL);
375
376                 spin_lock_bh(&sched->kss_lock);
377
378                 rc = conn->ksnc_proto->pro_queue_tx_zcack(conn, NULL, cookie);
379
380                 spin_unlock_bh(&sched->kss_lock);
381
382                 if (rc) { /* piggybacked */
383                         read_unlock(&ksocknal_data.ksnd_global_lock);
384                         return 0;
385                 }
386         }
387
388         read_unlock(&ksocknal_data.ksnd_global_lock);
389
390         /* ACK connection is not ready, or can't piggyback the ACK */
391         tx = ksocknal_alloc_tx_noop(cookie, !!remote);
392         if (tx == NULL)
393                 return -ENOMEM;
394
395         if ((rc = ksocknal_launch_packet(peer_ni->ksnp_ni, tx, peer_ni->ksnp_id)) == 0)
396                 return 0;
397
398         ksocknal_free_tx(tx);
399         return rc;
400 }
401
402 /* (Sender) handle ZC_ACK from sink */
403 static int
404 ksocknal_handle_zcack(struct ksock_conn *conn, __u64 cookie1, __u64 cookie2)
405 {
406         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
407         struct ksock_tx *tx;
408         struct ksock_tx *tmp;
409         LIST_HEAD(zlist);
410         int count;
411
412         if (cookie1 == 0)
413                 cookie1 = cookie2;
414
415         count = (cookie1 > cookie2) ? 2 : (cookie2 - cookie1 + 1);
416
417         if (cookie2 == SOCKNAL_KEEPALIVE_PING &&
418             conn->ksnc_proto == &ksocknal_protocol_v3x) {
419                 /* keepalive PING for V3.x, just ignore it */
420                 return count == 1 ? 0 : -EPROTO;
421         }
422
423         spin_lock(&peer_ni->ksnp_lock);
424
425         list_for_each_entry_safe(tx, tmp,
426                                      &peer_ni->ksnp_zc_req_list, tx_zc_list) {
427                 __u64 c = tx->tx_msg.ksm_zc_cookies[0];
428
429                 if (c == cookie1 || c == cookie2 || (cookie1 < c && c < cookie2)) {
430                         tx->tx_msg.ksm_zc_cookies[0] = 0;
431                         list_move(&tx->tx_zc_list, &zlist);
432
433                         if (--count == 0)
434                                 break;
435                 }
436         }
437
438         spin_unlock(&peer_ni->ksnp_lock);
439
440         while ((tx = list_first_entry_or_null(&zlist, struct ksock_tx,
441                                               tx_zc_list)) != NULL) {
442                 list_del(&tx->tx_zc_list);
443                 ksocknal_tx_decref(tx);
444         }
445
446         return count == 0 ? 0 : -EPROTO;
447 }
448
449 static int
450 ksocknal_send_hello_v1(struct ksock_conn *conn, struct ksock_hello_msg *hello)
451 {
452         struct socket *sock = conn->ksnc_sock;
453         struct lnet_hdr *hdr;
454         struct lnet_magicversion *hmv;
455         int rc;
456         int i;
457
458         BUILD_BUG_ON(sizeof(struct lnet_magicversion) !=
459                      offsetof(struct lnet_hdr, src_nid));
460
461         LIBCFS_ALLOC(hdr, sizeof(*hdr));
462         if (hdr == NULL) {
463                 CERROR("Can't allocate struct lnet_hdr\n");
464                 return -ENOMEM;
465         }
466
467         hmv = (struct lnet_magicversion *)&hdr->dest_nid;
468
469         /* Re-organize V2.x message header to V1.x (struct lnet_hdr)
470          * header and send out */
471         hmv->magic         = cpu_to_le32 (LNET_PROTO_TCP_MAGIC);
472         hmv->version_major = cpu_to_le16 (KSOCK_PROTO_V1_MAJOR);
473         hmv->version_minor = cpu_to_le16 (KSOCK_PROTO_V1_MINOR);
474
475         if (the_lnet.ln_testprotocompat) {
476                 /* single-shot proto check */
477                 if (test_and_clear_bit(0, &the_lnet.ln_testprotocompat))
478                         hmv->version_major++;   /* just different! */
479
480                 if (test_and_clear_bit(1, &the_lnet.ln_testprotocompat))
481                         hmv->magic = LNET_PROTO_MAGIC;
482         }
483
484         hdr->src_nid        = cpu_to_le64 (hello->kshm_src_nid);
485         hdr->src_pid        = cpu_to_le32 (hello->kshm_src_pid);
486         hdr->type           = cpu_to_le32 (LNET_MSG_HELLO);
487         hdr->payload_length = cpu_to_le32 (hello->kshm_nips * sizeof(__u32));
488         hdr->msg.hello.type = cpu_to_le32 (hello->kshm_ctype);
489         hdr->msg.hello.incarnation = cpu_to_le64 (hello->kshm_src_incarnation);
490
491         rc = lnet_sock_write(sock, hdr, sizeof(*hdr), lnet_acceptor_timeout());
492         if (rc != 0) {
493                 CNETERR("Error %d sending HELLO hdr to %pISp\n",
494                         rc, &conn->ksnc_peeraddr);
495                 goto out;
496         }
497
498         if (hello->kshm_nips == 0)
499                 goto out;
500
501         for (i = 0; i < (int) hello->kshm_nips; i++) {
502                 hello->kshm_ips[i] = __cpu_to_le32 (hello->kshm_ips[i]);
503         }
504
505         rc = lnet_sock_write(sock, hello->kshm_ips,
506                              hello->kshm_nips * sizeof(__u32),
507                              lnet_acceptor_timeout());
508         if (rc != 0) {
509                 CNETERR("Error %d sending HELLO payload (%d) to %pISp\n",
510                         rc, hello->kshm_nips,
511                         &conn->ksnc_peeraddr);
512         }
513 out:
514         LIBCFS_FREE(hdr, sizeof(*hdr));
515
516         return rc;
517 }
518
519 static int
520 ksocknal_send_hello_v2(struct ksock_conn *conn, struct ksock_hello_msg *hello)
521 {
522         struct socket *sock = conn->ksnc_sock;
523         int rc;
524
525         hello->kshm_magic   = LNET_PROTO_MAGIC;
526         hello->kshm_version = conn->ksnc_proto->pro_version;
527
528         if (the_lnet.ln_testprotocompat) {
529                 /* single-shot proto check */
530                 if (test_and_clear_bit(0, &the_lnet.ln_testprotocompat))
531                         hello->kshm_version++;   /* just different! */
532         }
533
534         rc = lnet_sock_write(sock, hello, offsetof(struct ksock_hello_msg, kshm_ips),
535                                lnet_acceptor_timeout());
536
537         if (rc != 0) {
538                 CNETERR("Error %d sending HELLO hdr to %pISp\n",
539                         rc, &conn->ksnc_peeraddr);
540                 return rc;
541         }
542
543         if (hello->kshm_nips == 0)
544                 return 0;
545
546         rc = lnet_sock_write(sock, hello->kshm_ips,
547                              hello->kshm_nips * sizeof(__u32),
548                              lnet_acceptor_timeout());
549         if (rc != 0) {
550                 CNETERR("Error %d sending HELLO payload (%d) to %pISp\n", rc,
551                         hello->kshm_nips,
552                         &conn->ksnc_peeraddr);
553         }
554
555         return rc;
556 }
557
558 static int
559 ksocknal_recv_hello_v1(struct ksock_conn *conn, struct ksock_hello_msg *hello,
560                        int timeout)
561 {
562         struct socket *sock = conn->ksnc_sock;
563         struct lnet_hdr *hdr;
564         int rc;
565         int i;
566
567         LIBCFS_ALLOC(hdr, sizeof(*hdr));
568         if (hdr == NULL) {
569                 CERROR("Can't allocate struct lnet_hdr\n");
570                 return -ENOMEM;
571         }
572
573         rc = lnet_sock_read(sock, &hdr->src_nid,
574                               sizeof(*hdr) - offsetof(struct lnet_hdr, src_nid),
575                               timeout);
576         if (rc != 0) {
577                 CERROR("Error %d reading rest of HELLO hdr from %pIS\n",
578                        rc, &conn->ksnc_peeraddr);
579                 LASSERT(rc < 0 && rc != -EALREADY);
580                 goto out;
581         }
582
583         /* ...and check we got what we expected */
584         if (hdr->type != cpu_to_le32 (LNET_MSG_HELLO)) {
585                 CERROR("Expecting a HELLO hdr, but got type %d from %pIS\n",
586                        le32_to_cpu(hdr->type),
587                        &conn->ksnc_peeraddr);
588                 rc = -EPROTO;
589                 goto out;
590         }
591
592         hello->kshm_src_nid         = le64_to_cpu (hdr->src_nid);
593         hello->kshm_src_pid         = le32_to_cpu (hdr->src_pid);
594         hello->kshm_src_incarnation = le64_to_cpu (hdr->msg.hello.incarnation);
595         hello->kshm_ctype           = le32_to_cpu (hdr->msg.hello.type);
596         hello->kshm_nips            = le32_to_cpu (hdr->payload_length) /
597                                          sizeof (__u32);
598
599         if (hello->kshm_nips > LNET_INTERFACES_NUM) {
600                 CERROR("Bad nips %d from ip %pIS\n",
601                        hello->kshm_nips, &conn->ksnc_peeraddr);
602                 rc = -EPROTO;
603                 goto out;
604         }
605
606         if (hello->kshm_nips == 0)
607                 goto out;
608
609         rc = lnet_sock_read(sock, hello->kshm_ips,
610                               hello->kshm_nips * sizeof(__u32), timeout);
611         if (rc != 0) {
612                 CERROR("Error %d reading IPs from ip %pIS\n",
613                        rc, &conn->ksnc_peeraddr);
614                 LASSERT(rc < 0 && rc != -EALREADY);
615                 goto out;
616         }
617
618         for (i = 0; i < (int) hello->kshm_nips; i++) {
619                 hello->kshm_ips[i] = __le32_to_cpu(hello->kshm_ips[i]);
620
621                 if (hello->kshm_ips[i] == 0) {
622                         CERROR("Zero IP[%d] from ip %pIS\n",
623                                i, &conn->ksnc_peeraddr);
624                         rc = -EPROTO;
625                         break;
626                 }
627         }
628 out:
629         LIBCFS_FREE(hdr, sizeof(*hdr));
630
631         return rc;
632 }
633
634 static int
635 ksocknal_recv_hello_v2(struct ksock_conn *conn, struct ksock_hello_msg *hello,
636                        int timeout)
637 {
638         struct socket     *sock = conn->ksnc_sock;
639         int                rc;
640         int                i;
641
642         if (hello->kshm_magic == LNET_PROTO_MAGIC)
643                 conn->ksnc_flip = 0;
644         else
645                 conn->ksnc_flip = 1;
646
647         rc = lnet_sock_read(sock, &hello->kshm_src_nid,
648                               offsetof(struct ksock_hello_msg, kshm_ips) -
649                                        offsetof(struct ksock_hello_msg, kshm_src_nid),
650                               timeout);
651         if (rc != 0) {
652                 CERROR("Error %d reading HELLO from %pIS\n",
653                        rc, &conn->ksnc_peeraddr);
654                 LASSERT(rc < 0 && rc != -EALREADY);
655                 return rc;
656         }
657
658         if (conn->ksnc_flip) {
659                 __swab32s(&hello->kshm_src_pid);
660                 __swab64s(&hello->kshm_src_nid);
661                 __swab32s(&hello->kshm_dst_pid);
662                 __swab64s(&hello->kshm_dst_nid);
663                 __swab64s(&hello->kshm_src_incarnation);
664                 __swab64s(&hello->kshm_dst_incarnation);
665                 __swab32s(&hello->kshm_ctype);
666                 __swab32s(&hello->kshm_nips);
667         }
668
669         if (hello->kshm_nips > LNET_INTERFACES_NUM) {
670                 CERROR("Bad nips %d from ip %pIS\n",
671                        hello->kshm_nips, &conn->ksnc_peeraddr);
672                 return -EPROTO;
673         }
674
675         if (hello->kshm_nips == 0)
676                 return 0;
677
678         rc = lnet_sock_read(sock, hello->kshm_ips,
679                             hello->kshm_nips * sizeof(__u32), timeout);
680         if (rc != 0) {
681                 CERROR("Error %d reading IPs from ip %pIS\n",
682                        rc, &conn->ksnc_peeraddr);
683                 LASSERT(rc < 0 && rc != -EALREADY);
684                 return rc;
685         }
686
687         for (i = 0; i < (int) hello->kshm_nips; i++) {
688                 if (conn->ksnc_flip)
689                         __swab32s(&hello->kshm_ips[i]);
690
691                 if (hello->kshm_ips[i] == 0) {
692                         CERROR("Zero IP[%d] from ip %pIS\n",
693                                i, &conn->ksnc_peeraddr);
694                         return -EPROTO;
695                 }
696         }
697
698         return 0;
699 }
700
701 static void
702 ksocknal_pack_msg_v1(struct ksock_tx *tx)
703 {
704         /* V1.x has no KSOCK_MSG_NOOP */
705         LASSERT(tx->tx_msg.ksm_type != KSOCK_MSG_NOOP);
706         LASSERT(tx->tx_lnetmsg != NULL);
707
708         tx->tx_hdr.iov_base = (void *)&tx->tx_lnetmsg->msg_hdr;
709         tx->tx_hdr.iov_len  = sizeof(struct lnet_hdr);
710
711         tx->tx_nob = tx->tx_lnetmsg->msg_len + sizeof(struct lnet_hdr);
712         tx->tx_resid = tx->tx_nob;
713 }
714
715 static void
716 ksocknal_pack_msg_v2(struct ksock_tx *tx)
717 {
718         tx->tx_hdr.iov_base = (void *)&tx->tx_msg;
719
720         if (tx->tx_lnetmsg != NULL) {
721                 LASSERT(tx->tx_msg.ksm_type != KSOCK_MSG_NOOP);
722
723                 tx->tx_msg.ksm_u.lnetmsg.ksnm_hdr = tx->tx_lnetmsg->msg_hdr;
724                 tx->tx_hdr.iov_len = sizeof(struct ksock_msg);
725                 tx->tx_resid = tx->tx_nob = sizeof(struct ksock_msg) + tx->tx_lnetmsg->msg_len;
726         } else {
727                 LASSERT(tx->tx_msg.ksm_type == KSOCK_MSG_NOOP);
728
729                 tx->tx_hdr.iov_len = offsetof(struct ksock_msg,
730                                               ksm_u.lnetmsg.ksnm_hdr);
731                 tx->tx_resid = tx->tx_nob = offsetof(struct ksock_msg,  ksm_u.lnetmsg.ksnm_hdr);
732         }
733         /* Don't checksum before start sending, because packet can be piggybacked with ACK */
734 }
735
736 static void
737 ksocknal_unpack_msg_v1(struct ksock_msg *msg)
738 {
739         msg->ksm_csum           = 0;
740         msg->ksm_type           = KSOCK_MSG_LNET;
741         msg->ksm_zc_cookies[0]  = msg->ksm_zc_cookies[1]  = 0;
742 }
743
744 static void
745 ksocknal_unpack_msg_v2(struct ksock_msg *msg)
746 {
747         return;  /* Do nothing */
748 }
749
750 const struct ksock_proto ksocknal_protocol_v1x =
751 {
752         .pro_version            = KSOCK_PROTO_V1,
753         .pro_send_hello         = ksocknal_send_hello_v1,
754         .pro_recv_hello         = ksocknal_recv_hello_v1,
755         .pro_pack               = ksocknal_pack_msg_v1,
756         .pro_unpack             = ksocknal_unpack_msg_v1,
757         .pro_queue_tx_msg       = ksocknal_queue_tx_msg_v1,
758         .pro_handle_zcreq       = NULL,
759         .pro_handle_zcack       = NULL,
760         .pro_queue_tx_zcack     = NULL,
761         .pro_match_tx           = ksocknal_match_tx
762 };
763
764 const struct ksock_proto ksocknal_protocol_v2x =
765 {
766         .pro_version            = KSOCK_PROTO_V2,
767         .pro_send_hello         = ksocknal_send_hello_v2,
768         .pro_recv_hello         = ksocknal_recv_hello_v2,
769         .pro_pack               = ksocknal_pack_msg_v2,
770         .pro_unpack             = ksocknal_unpack_msg_v2,
771         .pro_queue_tx_msg       = ksocknal_queue_tx_msg_v2,
772         .pro_queue_tx_zcack     = ksocknal_queue_tx_zcack_v2,
773         .pro_handle_zcreq       = ksocknal_handle_zcreq,
774         .pro_handle_zcack       = ksocknal_handle_zcack,
775         .pro_match_tx           = ksocknal_match_tx
776 };
777
778 const struct ksock_proto ksocknal_protocol_v3x =
779 {
780         .pro_version            = KSOCK_PROTO_V3,
781         .pro_send_hello         = ksocknal_send_hello_v2,
782         .pro_recv_hello         = ksocknal_recv_hello_v2,
783         .pro_pack               = ksocknal_pack_msg_v2,
784         .pro_unpack             = ksocknal_unpack_msg_v2,
785         .pro_queue_tx_msg       = ksocknal_queue_tx_msg_v2,
786         .pro_queue_tx_zcack     = ksocknal_queue_tx_zcack_v3,
787         .pro_handle_zcreq       = ksocknal_handle_zcreq,
788         .pro_handle_zcack       = ksocknal_handle_zcack,
789         .pro_match_tx           = ksocknal_match_tx_v3
790 };
791