Whamcloud - gitweb
LU-936 Remove LUSTRE_KERNEL_VERSION
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd_proto.c
1 /*
2  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
3  *
4  *   Author: Zach Brown <zab@zabbo.net>
5  *   Author: Peter J. Braam <braam@clusterfs.com>
6  *   Author: Phil Schwan <phil@clusterfs.com>
7  *   Author: Eric Barton <eric@bartonsoftware.com>
8  *
9  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
10  *
11  *   Portals is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Portals is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Portals; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #include "socklnd.h"
26
27 /*
28  * Protocol entries :
29  *   pro_send_hello       : send hello message
30  *   pro_recv_hello       : receive hello message
31  *   pro_pack             : pack message header
32  *   pro_unpack           : unpack message header
33  *   pro_queue_tx_zcack() : Called holding BH lock: kss_lock
34  *                          return 1 if ACK is piggybacked, otherwise return 0
35  *   pro_queue_tx_msg()   : Called holding BH lock: kss_lock
36  *                          return the ACK that piggybacked by my message, or NULL
37  *   pro_handle_zcreq()   : handler of incoming ZC-REQ
38  *   pro_handle_zcack()   : handler of incoming ZC-ACK
39  *   pro_match_tx()       : Called holding glock
40  */
41
42 static ksock_tx_t *
43 ksocknal_queue_tx_msg_v1(ksock_conn_t *conn, ksock_tx_t *tx_msg)
44 {
45         /* V1.x, just enqueue it */
46         cfs_list_add_tail(&tx_msg->tx_list, &conn->ksnc_tx_queue);
47         return NULL;
48 }
49
50 void
51 ksocknal_next_tx_carrier(ksock_conn_t *conn)
52 {
53         ksock_tx_t     *tx = conn->ksnc_tx_carrier;
54
55         /* Called holding BH lock: conn->ksnc_scheduler->kss_lock */
56         LASSERT (!cfs_list_empty(&conn->ksnc_tx_queue));
57         LASSERT (tx != NULL);
58
59         /* Next TX that can carry ZC-ACK or LNet message */
60         if (tx->tx_list.next == &conn->ksnc_tx_queue) {
61                 /* no more packets queued */
62                 conn->ksnc_tx_carrier = NULL;
63         } else {
64                 conn->ksnc_tx_carrier = cfs_list_entry(tx->tx_list.next,
65                                                        ksock_tx_t, tx_list);
66                 LASSERT (conn->ksnc_tx_carrier->tx_msg.ksm_type == tx->tx_msg.ksm_type);
67         }
68 }
69
70 static int
71 ksocknal_queue_tx_zcack_v2(ksock_conn_t *conn,
72                            ksock_tx_t *tx_ack, __u64 cookie)
73 {
74         ksock_tx_t *tx = conn->ksnc_tx_carrier;
75
76         LASSERT (tx_ack == NULL ||
77                  tx_ack->tx_msg.ksm_type == KSOCK_MSG_NOOP);
78
79         /*
80          * Enqueue or piggyback tx_ack / cookie
81          * . no tx can piggyback cookie of tx_ack (or cookie), just
82          *   enqueue the tx_ack (if tx_ack != NUL) and return NULL.
83          * . There is tx can piggyback cookie of tx_ack (or cookie),
84          *   piggyback the cookie and return the tx.
85          */
86         if (tx == NULL) {
87                 if (tx_ack != NULL) {
88                         cfs_list_add_tail(&tx_ack->tx_list,
89                                           &conn->ksnc_tx_queue);
90                         conn->ksnc_tx_carrier = tx_ack;
91                 }
92                 return 0;
93         }
94
95         if (tx->tx_msg.ksm_type == KSOCK_MSG_NOOP) {
96                 /* tx is noop zc-ack, can't piggyback zc-ack cookie */
97                 if (tx_ack != NULL)
98                         cfs_list_add_tail(&tx_ack->tx_list,
99                                           &conn->ksnc_tx_queue);
100                 return 0;
101         }
102
103         LASSERT(tx->tx_msg.ksm_type == KSOCK_MSG_LNET);
104         LASSERT(tx->tx_msg.ksm_zc_cookies[1] == 0);
105
106         if (tx_ack != NULL)
107                 cookie = tx_ack->tx_msg.ksm_zc_cookies[1];
108
109         /* piggyback the zc-ack cookie */
110         tx->tx_msg.ksm_zc_cookies[1] = cookie;
111         /* move on to the next TX which can carry cookie */
112         ksocknal_next_tx_carrier(conn);
113
114         return 1;
115 }
116
117 static ksock_tx_t *
118 ksocknal_queue_tx_msg_v2(ksock_conn_t *conn, ksock_tx_t *tx_msg)
119 {
120         ksock_tx_t  *tx  = conn->ksnc_tx_carrier;
121
122         /*
123          * Enqueue tx_msg:
124          * . If there is no NOOP on the connection, just enqueue
125          *   tx_msg and return NULL
126          * . If there is NOOP on the connection, piggyback the cookie
127          *   and replace the NOOP tx, and return the NOOP tx.
128          */
129         if (tx == NULL) { /* nothing on queue */
130                 cfs_list_add_tail(&tx_msg->tx_list, &conn->ksnc_tx_queue);
131                 conn->ksnc_tx_carrier = tx_msg;
132                 return NULL;
133         }
134
135         if (tx->tx_msg.ksm_type == KSOCK_MSG_LNET) { /* nothing to carry */
136                 cfs_list_add_tail(&tx_msg->tx_list, &conn->ksnc_tx_queue);
137                 return NULL;
138         }
139
140         LASSERT (tx->tx_msg.ksm_type == KSOCK_MSG_NOOP);
141
142         /* There is a noop zc-ack can be piggybacked */
143         tx_msg->tx_msg.ksm_zc_cookies[1] = tx->tx_msg.ksm_zc_cookies[1];
144         ksocknal_next_tx_carrier(conn);
145
146         /* use new_tx to replace the noop zc-ack packet */
147         cfs_list_add(&tx_msg->tx_list, &tx->tx_list);
148         cfs_list_del(&tx->tx_list);
149
150         return tx;
151 }
152
153 static int
154 ksocknal_queue_tx_zcack_v3(ksock_conn_t *conn,
155                            ksock_tx_t *tx_ack, __u64 cookie)
156 {
157         ksock_tx_t *tx;
158
159         if (conn->ksnc_type != SOCKLND_CONN_ACK)
160                 return ksocknal_queue_tx_zcack_v2(conn, tx_ack, cookie);
161
162         /* non-blocking ZC-ACK (to router) */
163         LASSERT (tx_ack == NULL ||
164                  tx_ack->tx_msg.ksm_type == KSOCK_MSG_NOOP);
165
166         if ((tx = conn->ksnc_tx_carrier) == NULL) {
167                 if (tx_ack != NULL) {
168                         cfs_list_add_tail(&tx_ack->tx_list,
169                                           &conn->ksnc_tx_queue);
170                         conn->ksnc_tx_carrier = tx_ack;
171                 }
172                 return 0;
173         }
174
175         /* conn->ksnc_tx_carrier != NULL */
176
177         if (tx_ack != NULL)
178                 cookie = tx_ack->tx_msg.ksm_zc_cookies[1];
179
180         if (cookie == SOCKNAL_KEEPALIVE_PING) /* ignore keepalive PING */
181                 return 1;
182
183         if (tx->tx_msg.ksm_zc_cookies[1] == SOCKNAL_KEEPALIVE_PING) {
184                 /* replace the keepalive PING with a real ACK */
185                 LASSERT (tx->tx_msg.ksm_zc_cookies[0] == 0);
186                 tx->tx_msg.ksm_zc_cookies[1] = cookie;
187                 return 1;
188         }
189
190         if (cookie == tx->tx_msg.ksm_zc_cookies[0] ||
191             cookie == tx->tx_msg.ksm_zc_cookies[1]) {
192                 CWARN("%s: duplicated ZC cookie: "LPU64"\n",
193                       libcfs_id2str(conn->ksnc_peer->ksnp_id), cookie);
194                 return 1; /* XXX return error in the future */
195         }
196
197         if (tx->tx_msg.ksm_zc_cookies[0] == 0) {
198                 /* NOOP tx has only one ZC-ACK cookie, can carry at least one more */
199                 if (tx->tx_msg.ksm_zc_cookies[1] > cookie) {
200                         tx->tx_msg.ksm_zc_cookies[0] = tx->tx_msg.ksm_zc_cookies[1];
201                         tx->tx_msg.ksm_zc_cookies[1] = cookie;
202                 } else {
203                         tx->tx_msg.ksm_zc_cookies[0] = cookie;
204                 }
205
206                 if (tx->tx_msg.ksm_zc_cookies[0] - tx->tx_msg.ksm_zc_cookies[1] > 2) {
207                         /* not likely to carry more ACKs, skip it to simplify logic */
208                         ksocknal_next_tx_carrier(conn);
209                 }
210
211                 return 1;
212         }
213
214         /* takes two or more cookies already */
215
216         if (tx->tx_msg.ksm_zc_cookies[0] > tx->tx_msg.ksm_zc_cookies[1]) {
217                 __u64   tmp = 0;
218
219                 /* two seperated cookies: (a+2, a) or (a+1, a) */
220                 LASSERT (tx->tx_msg.ksm_zc_cookies[0] -
221                          tx->tx_msg.ksm_zc_cookies[1] <= 2);
222
223                 if (tx->tx_msg.ksm_zc_cookies[0] -
224                     tx->tx_msg.ksm_zc_cookies[1] == 2) {
225                         if (cookie == tx->tx_msg.ksm_zc_cookies[1] + 1)
226                                 tmp = cookie;
227                 } else if (cookie == tx->tx_msg.ksm_zc_cookies[1] - 1) {
228                         tmp = tx->tx_msg.ksm_zc_cookies[1];
229                 } else if (cookie == tx->tx_msg.ksm_zc_cookies[0] + 1) {
230                         tmp = tx->tx_msg.ksm_zc_cookies[0];
231                 }
232
233                 if (tmp != 0) {
234                         /* range of cookies */
235                         tx->tx_msg.ksm_zc_cookies[0] = tmp - 1;
236                         tx->tx_msg.ksm_zc_cookies[1] = tmp + 1;
237                         return 1;
238                 }
239
240         } else {
241                 /* ksm_zc_cookies[0] < ksm_zc_cookies[1], it is range of cookies */
242                 if (cookie >= tx->tx_msg.ksm_zc_cookies[0] &&
243                     cookie <= tx->tx_msg.ksm_zc_cookies[1]) {
244                         CWARN("%s: duplicated ZC cookie: "LPU64"\n",
245                               libcfs_id2str(conn->ksnc_peer->ksnp_id), cookie);
246                         return 1; /* XXX: return error in the future */
247                 }
248
249                 if (cookie == tx->tx_msg.ksm_zc_cookies[1] + 1) {
250                         tx->tx_msg.ksm_zc_cookies[1] = cookie;
251                         return 1;
252                 }
253
254                 if (cookie == tx->tx_msg.ksm_zc_cookies[0] - 1) {
255                         tx->tx_msg.ksm_zc_cookies[0] = cookie;
256                         return 1;
257                 }
258         }
259
260         /* failed to piggyback ZC-ACK */
261         if (tx_ack != NULL) {
262                 cfs_list_add_tail(&tx_ack->tx_list, &conn->ksnc_tx_queue);
263                 /* the next tx can piggyback at least 1 ACK */
264                 ksocknal_next_tx_carrier(conn);
265         }
266
267         return 0;
268 }
269
270 static int
271 ksocknal_match_tx(ksock_conn_t *conn, ksock_tx_t *tx, int nonblk)
272 {
273         int nob;
274
275 #if SOCKNAL_VERSION_DEBUG
276         if (!*ksocknal_tunables.ksnd_typed_conns)
277                 return SOCKNAL_MATCH_YES;
278 #endif
279
280         if (tx == NULL || tx->tx_lnetmsg == NULL) {
281                 /* noop packet */
282                 nob = offsetof(ksock_msg_t, ksm_u);
283         } else {
284                 nob = tx->tx_lnetmsg->msg_len +
285                       ((conn->ksnc_proto == &ksocknal_protocol_v1x) ?
286                        sizeof(lnet_hdr_t) : sizeof(ksock_msg_t));
287         }
288
289         /* default checking for typed connection */
290         switch (conn->ksnc_type) {
291         default:
292                 CERROR("ksnc_type bad: %u\n", conn->ksnc_type);
293                 LBUG();
294         case SOCKLND_CONN_ANY:
295                 return SOCKNAL_MATCH_YES;
296
297         case SOCKLND_CONN_BULK_IN:
298                 return SOCKNAL_MATCH_MAY;
299
300         case SOCKLND_CONN_BULK_OUT:
301                 if (nob < *ksocknal_tunables.ksnd_min_bulk)
302                         return SOCKNAL_MATCH_MAY;
303                 else
304                         return SOCKNAL_MATCH_YES;
305
306         case SOCKLND_CONN_CONTROL:
307                 if (nob >= *ksocknal_tunables.ksnd_min_bulk)
308                         return SOCKNAL_MATCH_MAY;
309                 else
310                         return SOCKNAL_MATCH_YES;
311         }
312 }
313
314 static int
315 ksocknal_match_tx_v3(ksock_conn_t *conn, ksock_tx_t *tx, int nonblk)
316 {
317         int nob;
318
319         if (tx == NULL || tx->tx_lnetmsg == NULL)
320                 nob = offsetof(ksock_msg_t, ksm_u);
321         else
322                 nob = tx->tx_lnetmsg->msg_len + sizeof(ksock_msg_t);
323
324         switch (conn->ksnc_type) {
325         default:
326                 CERROR("ksnc_type bad: %u\n", conn->ksnc_type);
327                 LBUG();
328         case SOCKLND_CONN_ANY:
329                 return SOCKNAL_MATCH_NO;
330
331         case SOCKLND_CONN_ACK:
332                 if (nonblk)
333                         return SOCKNAL_MATCH_YES;
334                 else if (tx == NULL || tx->tx_lnetmsg == NULL)
335                         return SOCKNAL_MATCH_MAY;
336                 else
337                         return SOCKNAL_MATCH_NO;
338
339         case SOCKLND_CONN_BULK_OUT:
340                 if (nonblk)
341                         return SOCKNAL_MATCH_NO;
342                 else if (nob < *ksocknal_tunables.ksnd_min_bulk)
343                         return SOCKNAL_MATCH_MAY;
344                 else
345                         return SOCKNAL_MATCH_YES;
346
347         case SOCKLND_CONN_CONTROL:
348                 if (nonblk)
349                         return SOCKNAL_MATCH_NO;
350                 else if (nob >= *ksocknal_tunables.ksnd_min_bulk)
351                         return SOCKNAL_MATCH_MAY;
352                 else
353                         return SOCKNAL_MATCH_YES;
354         }
355 }
356
357 /* (Sink) handle incoming ZC request from sender */
358 static int
359 ksocknal_handle_zcreq(ksock_conn_t *c, __u64 cookie, int remote)
360 {
361         ksock_peer_t   *peer = c->ksnc_peer;
362         ksock_conn_t   *conn;
363         ksock_tx_t     *tx;
364         int             rc;
365
366         cfs_read_lock (&ksocknal_data.ksnd_global_lock);
367
368         conn = ksocknal_find_conn_locked(peer, NULL, !!remote);
369         if (conn != NULL) {
370                 ksock_sched_t *sched = conn->ksnc_scheduler;
371
372                 LASSERT (conn->ksnc_proto->pro_queue_tx_zcack != NULL);
373
374                 cfs_spin_lock_bh (&sched->kss_lock);
375
376                 rc = conn->ksnc_proto->pro_queue_tx_zcack(conn, NULL, cookie);
377
378                 cfs_spin_unlock_bh (&sched->kss_lock);
379
380                 if (rc) { /* piggybacked */
381                         cfs_read_unlock (&ksocknal_data.ksnd_global_lock);
382                         return 0;
383                 }
384         }
385
386         cfs_read_unlock (&ksocknal_data.ksnd_global_lock);
387
388         /* ACK connection is not ready, or can't piggyback the ACK */
389         tx = ksocknal_alloc_tx_noop(cookie, !!remote);
390         if (tx == NULL)
391                 return -ENOMEM;
392
393         if ((rc = ksocknal_launch_packet(peer->ksnp_ni, tx, peer->ksnp_id)) == 0)
394                 return 0;
395
396         ksocknal_free_tx(tx);
397         return rc;
398 }
399
400 /* (Sender) handle ZC_ACK from sink */
401 static int
402 ksocknal_handle_zcack(ksock_conn_t *conn, __u64 cookie1, __u64 cookie2)
403 {
404         ksock_peer_t      *peer = conn->ksnc_peer;
405         ksock_tx_t        *tx;
406         ksock_tx_t        *tmp;
407         CFS_LIST_HEAD     (zlist);
408         int                count;
409
410         if (cookie1 == 0)
411                 cookie1 = cookie2;
412
413         count = (cookie1 > cookie2) ? 2 : (cookie2 - cookie1 + 1);
414
415         if (cookie2 == SOCKNAL_KEEPALIVE_PING &&
416             conn->ksnc_proto == &ksocknal_protocol_v3x) {
417                 /* keepalive PING for V3.x, just ignore it */
418                 return count == 1 ? 0 : -EPROTO;
419         }
420
421         cfs_spin_lock(&peer->ksnp_lock);
422
423         cfs_list_for_each_entry_safe(tx, tmp,
424                                      &peer->ksnp_zc_req_list, tx_zc_list) {
425                 __u64 c = tx->tx_msg.ksm_zc_cookies[0];
426
427                 if (c == cookie1 || c == cookie2 || (cookie1 < c && c < cookie2)) {
428                         tx->tx_msg.ksm_zc_cookies[0] = 0;
429                         cfs_list_del(&tx->tx_zc_list);
430                         cfs_list_add(&tx->tx_zc_list, &zlist);
431
432                         if (--count == 0)
433                                 break;
434                 }
435         }
436
437         cfs_spin_unlock(&peer->ksnp_lock);
438
439         while (!cfs_list_empty(&zlist)) {
440                 tx = cfs_list_entry(zlist.next, ksock_tx_t, tx_zc_list);
441                 cfs_list_del(&tx->tx_zc_list);
442                 ksocknal_tx_decref(tx);
443         }
444
445         return count == 0 ? 0 : -EPROTO;
446 }
447
448 static int
449 ksocknal_send_hello_v1 (ksock_conn_t *conn, ksock_hello_msg_t *hello)
450 {
451         cfs_socket_t        *sock = conn->ksnc_sock;
452         lnet_hdr_t          *hdr;
453         lnet_magicversion_t *hmv;
454         int                  rc;
455         int                  i;
456
457         CLASSERT(sizeof(lnet_magicversion_t) == offsetof(lnet_hdr_t, src_nid));
458
459         LIBCFS_ALLOC(hdr, sizeof(*hdr));
460         if (hdr == NULL) {
461                 CERROR("Can't allocate lnet_hdr_t\n");
462                 return -ENOMEM;
463         }
464
465         hmv = (lnet_magicversion_t *)&hdr->dest_nid;
466
467         /* Re-organize V2.x message header to V1.x (lnet_hdr_t)
468          * header and send out */
469         hmv->magic         = cpu_to_le32 (LNET_PROTO_TCP_MAGIC);
470         hmv->version_major = cpu_to_le16 (KSOCK_PROTO_V1_MAJOR);
471         hmv->version_minor = cpu_to_le16 (KSOCK_PROTO_V1_MINOR);
472
473         if (the_lnet.ln_testprotocompat != 0) {
474                 /* single-shot proto check */
475                 LNET_LOCK();
476                 if ((the_lnet.ln_testprotocompat & 1) != 0) {
477                         hmv->version_major++;   /* just different! */
478                         the_lnet.ln_testprotocompat &= ~1;
479                 }
480                 if ((the_lnet.ln_testprotocompat & 2) != 0) {
481                         hmv->magic = LNET_PROTO_MAGIC;
482                         the_lnet.ln_testprotocompat &= ~2;
483                 }
484                 LNET_UNLOCK();
485         }
486
487         hdr->src_nid        = cpu_to_le64 (hello->kshm_src_nid);
488         hdr->src_pid        = cpu_to_le32 (hello->kshm_src_pid);
489         hdr->type           = cpu_to_le32 (LNET_MSG_HELLO);
490         hdr->payload_length = cpu_to_le32 (hello->kshm_nips * sizeof(__u32));
491         hdr->msg.hello.type = cpu_to_le32 (hello->kshm_ctype);
492         hdr->msg.hello.incarnation = cpu_to_le64 (hello->kshm_src_incarnation);
493
494         rc = libcfs_sock_write(sock, hdr, sizeof(*hdr),lnet_acceptor_timeout());
495
496         if (rc != 0) {
497                 CNETERR("Error %d sending HELLO hdr to %u.%u.%u.%u/%d\n",
498                         rc, HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
499                 goto out;
500         }
501
502         if (hello->kshm_nips == 0)
503                 goto out;
504
505         for (i = 0; i < (int) hello->kshm_nips; i++) {
506                 hello->kshm_ips[i] = __cpu_to_le32 (hello->kshm_ips[i]);
507         }
508
509         rc = libcfs_sock_write(sock, hello->kshm_ips,
510                                hello->kshm_nips * sizeof(__u32),
511                                lnet_acceptor_timeout());
512         if (rc != 0) {
513                 CNETERR("Error %d sending HELLO payload (%d)"
514                         " to %u.%u.%u.%u/%d\n", rc, hello->kshm_nips,
515                         HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
516         }
517 out:
518         LIBCFS_FREE(hdr, sizeof(*hdr));
519
520         return rc;
521 }
522
523 static int
524 ksocknal_send_hello_v2 (ksock_conn_t *conn, ksock_hello_msg_t *hello)
525 {
526         cfs_socket_t   *sock = conn->ksnc_sock;
527         int             rc;
528
529         hello->kshm_magic   = LNET_PROTO_MAGIC;
530         hello->kshm_version = conn->ksnc_proto->pro_version;
531
532         if (the_lnet.ln_testprotocompat != 0) {
533                 /* single-shot proto check */
534                 LNET_LOCK();
535                 if ((the_lnet.ln_testprotocompat & 1) != 0) {
536                         hello->kshm_version++;   /* just different! */
537                         the_lnet.ln_testprotocompat &= ~1;
538                 }
539                 LNET_UNLOCK();
540         }
541
542         rc = libcfs_sock_write(sock, hello, offsetof(ksock_hello_msg_t, kshm_ips),
543                                lnet_acceptor_timeout());
544
545         if (rc != 0) {
546                 CNETERR("Error %d sending HELLO hdr to %u.%u.%u.%u/%d\n",
547                         rc, HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
548                 return rc;
549         }
550
551         if (hello->kshm_nips == 0)
552                 return 0;
553
554         rc = libcfs_sock_write(sock, hello->kshm_ips,
555                                hello->kshm_nips * sizeof(__u32),
556                                lnet_acceptor_timeout());
557         if (rc != 0) {
558                 CNETERR("Error %d sending HELLO payload (%d)"
559                         " to %u.%u.%u.%u/%d\n", rc, hello->kshm_nips,
560                         HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
561         }
562
563         return rc;
564 }
565
566 static int
567 ksocknal_recv_hello_v1(ksock_conn_t *conn, ksock_hello_msg_t *hello,int timeout)
568 {
569         cfs_socket_t        *sock = conn->ksnc_sock;
570         lnet_hdr_t          *hdr;
571         int                  rc;
572         int                  i;
573
574         LIBCFS_ALLOC(hdr, sizeof(*hdr));
575         if (hdr == NULL) {
576                 CERROR("Can't allocate lnet_hdr_t\n");
577                 return -ENOMEM;
578         }
579
580         rc = libcfs_sock_read(sock, &hdr->src_nid,
581                               sizeof (*hdr) - offsetof (lnet_hdr_t, src_nid),
582                               timeout);
583         if (rc != 0) {
584                 CERROR ("Error %d reading rest of HELLO hdr from %u.%u.%u.%u\n",
585                         rc, HIPQUAD(conn->ksnc_ipaddr));
586                 LASSERT (rc < 0 && rc != -EALREADY);
587                 goto out;
588         }
589
590         /* ...and check we got what we expected */
591         if (hdr->type != cpu_to_le32 (LNET_MSG_HELLO)) {
592                 CERROR ("Expecting a HELLO hdr,"
593                         " but got type %d from %u.%u.%u.%u\n",
594                         le32_to_cpu (hdr->type),
595                         HIPQUAD(conn->ksnc_ipaddr));
596                 rc = -EPROTO;
597                 goto out;
598         }
599
600         hello->kshm_src_nid         = le64_to_cpu (hdr->src_nid);
601         hello->kshm_src_pid         = le32_to_cpu (hdr->src_pid);
602         hello->kshm_src_incarnation = le64_to_cpu (hdr->msg.hello.incarnation);
603         hello->kshm_ctype           = le32_to_cpu (hdr->msg.hello.type);
604         hello->kshm_nips            = le32_to_cpu (hdr->payload_length) /
605                                          sizeof (__u32);
606
607         if (hello->kshm_nips > LNET_MAX_INTERFACES) {
608                 CERROR("Bad nips %d from ip %u.%u.%u.%u\n",
609                        hello->kshm_nips, HIPQUAD(conn->ksnc_ipaddr));
610                 rc = -EPROTO;
611                 goto out;
612         }
613
614         if (hello->kshm_nips == 0)
615                 goto out;
616
617         rc = libcfs_sock_read(sock, hello->kshm_ips,
618                               hello->kshm_nips * sizeof(__u32), timeout);
619         if (rc != 0) {
620                 CERROR ("Error %d reading IPs from ip %u.%u.%u.%u\n",
621                         rc, HIPQUAD(conn->ksnc_ipaddr));
622                 LASSERT (rc < 0 && rc != -EALREADY);
623                 goto out;
624         }
625
626         for (i = 0; i < (int) hello->kshm_nips; i++) {
627                 hello->kshm_ips[i] = __le32_to_cpu(hello->kshm_ips[i]);
628
629                 if (hello->kshm_ips[i] == 0) {
630                         CERROR("Zero IP[%d] from ip %u.%u.%u.%u\n",
631                                i, HIPQUAD(conn->ksnc_ipaddr));
632                         rc = -EPROTO;
633                         break;
634                 }
635         }
636 out:
637         LIBCFS_FREE(hdr, sizeof(*hdr));
638
639         return rc;
640 }
641
642 static int
643 ksocknal_recv_hello_v2 (ksock_conn_t *conn, ksock_hello_msg_t *hello, int timeout)
644 {
645         cfs_socket_t      *sock = conn->ksnc_sock;
646         int                rc;
647         int                i;
648
649         if (hello->kshm_magic == LNET_PROTO_MAGIC)
650                 conn->ksnc_flip = 0;
651         else
652                 conn->ksnc_flip = 1;
653
654         rc = libcfs_sock_read(sock, &hello->kshm_src_nid,
655                               offsetof(ksock_hello_msg_t, kshm_ips) -
656                                        offsetof(ksock_hello_msg_t, kshm_src_nid),
657                               timeout);
658         if (rc != 0) {
659                 CERROR ("Error %d reading HELLO from %u.%u.%u.%u\n",
660                         rc, HIPQUAD(conn->ksnc_ipaddr));
661                 LASSERT (rc < 0 && rc != -EALREADY);
662                 return rc;
663         }
664
665         if (conn->ksnc_flip) {
666                 __swab32s(&hello->kshm_src_pid);
667                 __swab64s(&hello->kshm_src_nid);
668                 __swab32s(&hello->kshm_dst_pid);
669                 __swab64s(&hello->kshm_dst_nid);
670                 __swab64s(&hello->kshm_src_incarnation);
671                 __swab64s(&hello->kshm_dst_incarnation);
672                 __swab32s(&hello->kshm_ctype);
673                 __swab32s(&hello->kshm_nips);
674         }
675
676         if (hello->kshm_nips > LNET_MAX_INTERFACES) {
677                 CERROR("Bad nips %d from ip %u.%u.%u.%u\n",
678                        hello->kshm_nips, HIPQUAD(conn->ksnc_ipaddr));
679                 return -EPROTO;
680         }
681
682         if (hello->kshm_nips == 0)
683                 return 0;
684
685         rc = libcfs_sock_read(sock, hello->kshm_ips,
686                               hello->kshm_nips * sizeof(__u32), timeout);
687         if (rc != 0) {
688                 CERROR ("Error %d reading IPs from ip %u.%u.%u.%u\n",
689                         rc, HIPQUAD(conn->ksnc_ipaddr));
690                 LASSERT (rc < 0 && rc != -EALREADY);
691                 return rc;
692         }
693
694         for (i = 0; i < (int) hello->kshm_nips; i++) {
695                 if (conn->ksnc_flip)
696                         __swab32s(&hello->kshm_ips[i]);
697
698                 if (hello->kshm_ips[i] == 0) {
699                         CERROR("Zero IP[%d] from ip %u.%u.%u.%u\n",
700                                i, HIPQUAD(conn->ksnc_ipaddr));
701                         return -EPROTO;
702                 }
703         }
704
705         return 0;
706 }
707
708 static void
709 ksocknal_pack_msg_v1(ksock_tx_t *tx)
710 {
711         /* V1.x has no KSOCK_MSG_NOOP */
712         LASSERT(tx->tx_msg.ksm_type != KSOCK_MSG_NOOP);
713         LASSERT(tx->tx_lnetmsg != NULL);
714
715         tx->tx_iov[0].iov_base = (void *)&tx->tx_lnetmsg->msg_hdr;
716         tx->tx_iov[0].iov_len  = sizeof(lnet_hdr_t);
717
718         tx->tx_resid = tx->tx_nob = tx->tx_lnetmsg->msg_len + sizeof(lnet_hdr_t);
719 }
720
721 static void
722 ksocknal_pack_msg_v2(ksock_tx_t *tx)
723 {
724         tx->tx_iov[0].iov_base = (void *)&tx->tx_msg;
725
726         if (tx->tx_lnetmsg != NULL) {
727                 LASSERT(tx->tx_msg.ksm_type != KSOCK_MSG_NOOP);
728
729                 tx->tx_msg.ksm_u.lnetmsg.ksnm_hdr = tx->tx_lnetmsg->msg_hdr;
730                 tx->tx_iov[0].iov_len = sizeof(ksock_msg_t);
731                 tx->tx_resid = tx->tx_nob = sizeof(ksock_msg_t) + tx->tx_lnetmsg->msg_len;
732         } else {
733                 LASSERT(tx->tx_msg.ksm_type == KSOCK_MSG_NOOP);
734
735                 tx->tx_iov[0].iov_len = offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_hdr);
736                 tx->tx_resid = tx->tx_nob = offsetof(ksock_msg_t,  ksm_u.lnetmsg.ksnm_hdr);
737         }
738         /* Don't checksum before start sending, because packet can be piggybacked with ACK */
739 }
740
741 static void
742 ksocknal_unpack_msg_v1(ksock_msg_t *msg)
743 {
744         msg->ksm_csum           = 0;
745         msg->ksm_type           = KSOCK_MSG_LNET;
746         msg->ksm_zc_cookies[0]  = msg->ksm_zc_cookies[1]  = 0;
747 }
748
749 static void
750 ksocknal_unpack_msg_v2(ksock_msg_t *msg)
751 {
752         return;  /* Do nothing */
753 }
754
755 ksock_proto_t  ksocknal_protocol_v1x =
756 {
757         .pro_version            = KSOCK_PROTO_V1,
758         .pro_send_hello         = ksocknal_send_hello_v1,
759         .pro_recv_hello         = ksocknal_recv_hello_v1,
760         .pro_pack               = ksocknal_pack_msg_v1,
761         .pro_unpack             = ksocknal_unpack_msg_v1,
762         .pro_queue_tx_msg       = ksocknal_queue_tx_msg_v1,
763         .pro_handle_zcreq       = NULL,
764         .pro_handle_zcack       = NULL,
765         .pro_queue_tx_zcack     = NULL,
766         .pro_match_tx           = ksocknal_match_tx
767 };
768
769 ksock_proto_t  ksocknal_protocol_v2x =
770 {
771         .pro_version            = KSOCK_PROTO_V2,
772         .pro_send_hello         = ksocknal_send_hello_v2,
773         .pro_recv_hello         = ksocknal_recv_hello_v2,
774         .pro_pack               = ksocknal_pack_msg_v2,
775         .pro_unpack             = ksocknal_unpack_msg_v2,
776         .pro_queue_tx_msg       = ksocknal_queue_tx_msg_v2,
777         .pro_queue_tx_zcack     = ksocknal_queue_tx_zcack_v2,
778         .pro_handle_zcreq       = ksocknal_handle_zcreq,
779         .pro_handle_zcack       = ksocknal_handle_zcack,
780         .pro_match_tx           = ksocknal_match_tx
781 };
782
783 ksock_proto_t  ksocknal_protocol_v3x =
784 {
785         .pro_version            = KSOCK_PROTO_V3,
786         .pro_send_hello         = ksocknal_send_hello_v2,
787         .pro_recv_hello         = ksocknal_recv_hello_v2,
788         .pro_pack               = ksocknal_pack_msg_v2,
789         .pro_unpack             = ksocknal_unpack_msg_v2,
790         .pro_queue_tx_msg       = ksocknal_queue_tx_msg_v2,
791         .pro_queue_tx_zcack     = ksocknal_queue_tx_zcack_v3,
792         .pro_handle_zcreq       = ksocknal_handle_zcreq,
793         .pro_handle_zcack       = ksocknal_handle_zcack,
794         .pro_match_tx           = ksocknal_match_tx_v3
795 };
796