/*
 * Source: Whamcloud gitweb — fs/lustre-release.git
 * commit 8f93d530622a303a16cf36b7869b4d0b16cdf470
 * lnet/klnds/viblnd/viblnd.c
 */
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/klnds/viblnd/viblnd.c
37  *
38  * Author: Eric Barton <eric@bartonsoftware.com>
39  * Author: Frank Zago <fzago@systemfabricworks.com>
40  */
41
42 #include "viblnd.h"
43
/* LND operations table registered with LNET for the Voltaire IB LND.
 * The handlers named here are implemented elsewhere in this module. */
lnd_t the_kiblnd = {
        .lnd_type       = VIBLND,
        .lnd_startup    = kibnal_startup,
        .lnd_shutdown   = kibnal_shutdown,
        .lnd_ctl        = kibnal_ctl,
        .lnd_send       = kibnal_send,
        .lnd_recv       = kibnal_recv,
        .lnd_eager_recv = kibnal_eager_recv,
};

/* Global state for this LND: peer table, listener handle, peer/conn
 * counters etc. (fields referenced throughout this file). */
kib_data_t              kibnal_data;
55
/* Compile-time checks that the on-the-wire message layout matches the
 * layout this protocol implementation was generated against.  Each
 * CLASSERT fails the build if a size or offset has drifted (e.g. via a
 * packing or field-ordering change), which would silently break wire
 * interoperability with peers.  Do not edit by hand: regenerate with
 * 'wirecheck' if the protocol structs change. */
void vibnal_assert_wire_constants (void)
{
        /* Wire protocol assertions generated by 'wirecheck'
         * running on Linux robert 2.6.11-1.27_FC3 #1 Tue May 17 20:27:37 EDT 2005 i686 athlon i386 G
         * with gcc version 3.4.3 20050227 (Red Hat 3.4.3-22.fc3) */


        /* Constants... */
        CLASSERT (IBNAL_MSG_MAGIC == 0x0be91b91);
        CLASSERT (IBNAL_MSG_VERSION == 0x11);
        CLASSERT (IBNAL_MSG_CONNREQ == 0xc0);
        CLASSERT (IBNAL_MSG_CONNACK == 0xc1);
        CLASSERT (IBNAL_MSG_NOOP == 0xd0);
        CLASSERT (IBNAL_MSG_IMMEDIATE == 0xd1);
        CLASSERT (IBNAL_MSG_PUT_REQ == 0xd2);
        CLASSERT (IBNAL_MSG_PUT_NAK == 0xd3);
        CLASSERT (IBNAL_MSG_PUT_ACK == 0xd4);
        CLASSERT (IBNAL_MSG_PUT_DONE == 0xd5);
        CLASSERT (IBNAL_MSG_GET_REQ == 0xd6);
        CLASSERT (IBNAL_MSG_GET_DONE == 0xd7);

        /* Checks for struct kib_connparams_t */
        CLASSERT ((int)sizeof(kib_connparams_t) == 12);
        CLASSERT ((int)offsetof(kib_connparams_t, ibcp_queue_depth) == 0);
        CLASSERT ((int)sizeof(((kib_connparams_t *)0)->ibcp_queue_depth) == 4);
        CLASSERT ((int)offsetof(kib_connparams_t, ibcp_max_msg_size) == 4);
        CLASSERT ((int)sizeof(((kib_connparams_t *)0)->ibcp_max_msg_size) == 4);
        CLASSERT ((int)offsetof(kib_connparams_t, ibcp_max_frags) == 8);
        CLASSERT ((int)sizeof(((kib_connparams_t *)0)->ibcp_max_frags) == 4);

        /* Checks for struct kib_immediate_msg_t */
        CLASSERT ((int)sizeof(kib_immediate_msg_t) == 72);
        CLASSERT ((int)offsetof(kib_immediate_msg_t, ibim_hdr) == 0);
        CLASSERT ((int)sizeof(((kib_immediate_msg_t *)0)->ibim_hdr) == 72);
        CLASSERT ((int)offsetof(kib_immediate_msg_t, ibim_payload[13]) == 85);
        CLASSERT ((int)sizeof(((kib_immediate_msg_t *)0)->ibim_payload[13]) == 1);
        CLASSERT (IBNAL_USE_FMR == 1);

        /* Checks for struct kib_rdma_desc_t */
        CLASSERT ((int)sizeof(kib_rdma_desc_t) == 16);
        CLASSERT ((int)offsetof(kib_rdma_desc_t, rd_addr) == 0);
        CLASSERT ((int)sizeof(((kib_rdma_desc_t *)0)->rd_addr) == 8);
        CLASSERT ((int)offsetof(kib_rdma_desc_t, rd_nob) == 8);
        CLASSERT ((int)sizeof(((kib_rdma_desc_t *)0)->rd_nob) == 4);
        CLASSERT ((int)offsetof(kib_rdma_desc_t, rd_key) == 12);
        CLASSERT ((int)sizeof(((kib_rdma_desc_t *)0)->rd_key) == 4);

        /* Checks for struct kib_putreq_msg_t */
        CLASSERT ((int)sizeof(kib_putreq_msg_t) == 80);
        CLASSERT ((int)offsetof(kib_putreq_msg_t, ibprm_hdr) == 0);
        CLASSERT ((int)sizeof(((kib_putreq_msg_t *)0)->ibprm_hdr) == 72);
        CLASSERT ((int)offsetof(kib_putreq_msg_t, ibprm_cookie) == 72);
        CLASSERT ((int)sizeof(((kib_putreq_msg_t *)0)->ibprm_cookie) == 8);

        /* Checks for struct kib_putack_msg_t */
        CLASSERT ((int)sizeof(kib_putack_msg_t) == 32);
        CLASSERT ((int)offsetof(kib_putack_msg_t, ibpam_src_cookie) == 0);
        CLASSERT ((int)sizeof(((kib_putack_msg_t *)0)->ibpam_src_cookie) == 8);
        CLASSERT ((int)offsetof(kib_putack_msg_t, ibpam_dst_cookie) == 8);
        CLASSERT ((int)sizeof(((kib_putack_msg_t *)0)->ibpam_dst_cookie) == 8);
        CLASSERT ((int)offsetof(kib_putack_msg_t, ibpam_rd) == 16);
        CLASSERT ((int)sizeof(((kib_putack_msg_t *)0)->ibpam_rd) == 16);

        /* Checks for struct kib_get_msg_t */
        CLASSERT ((int)sizeof(kib_get_msg_t) == 96);
        CLASSERT ((int)offsetof(kib_get_msg_t, ibgm_hdr) == 0);
        CLASSERT ((int)sizeof(((kib_get_msg_t *)0)->ibgm_hdr) == 72);
        CLASSERT ((int)offsetof(kib_get_msg_t, ibgm_cookie) == 72);
        CLASSERT ((int)sizeof(((kib_get_msg_t *)0)->ibgm_cookie) == 8);
        CLASSERT ((int)offsetof(kib_get_msg_t, ibgm_rd) == 80);
        CLASSERT ((int)sizeof(((kib_get_msg_t *)0)->ibgm_rd) == 16);

        /* Checks for struct kib_completion_msg_t */
        CLASSERT ((int)sizeof(kib_completion_msg_t) == 12);
        CLASSERT ((int)offsetof(kib_completion_msg_t, ibcm_cookie) == 0);
        CLASSERT ((int)sizeof(((kib_completion_msg_t *)0)->ibcm_cookie) == 8);
        CLASSERT ((int)offsetof(kib_completion_msg_t, ibcm_status) == 8);
        CLASSERT ((int)sizeof(((kib_completion_msg_t *)0)->ibcm_status) == 4);

        /* Checks for struct kib_msg_t */
        CLASSERT ((int)sizeof(kib_msg_t) == 152);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_magic) == 0);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_magic) == 4);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_version) == 4);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_version) == 2);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_type) == 6);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_type) == 1);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_credits) == 7);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_credits) == 1);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_nob) == 8);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_nob) == 4);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_cksum) == 12);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_cksum) == 4);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_srcnid) == 16);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_srcnid) == 8);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_srcstamp) == 24);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_srcstamp) == 8);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_dstnid) == 32);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_dstnid) == 8);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_dststamp) == 40);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_dststamp) == 8);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_seq) == 48);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_seq) == 8);
        /* all payload variants overlay at the same offset in the union */
        CLASSERT ((int)offsetof(kib_msg_t, ibm_u.connparams) == 56);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_u.connparams) == 12);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_u.immediate) == 56);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_u.immediate) == 72);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_u.putreq) == 56);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_u.putreq) == 80);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_u.putack) == 56);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_u.putack) == 32);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_u.get) == 56);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_u.get) == 96);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_u.completion) == 56);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_u.completion) == 12);
}
172
173 __u32 
174 kibnal_cksum (void *ptr, int nob)
175 {
176         char  *c  = ptr;
177         __u32  sum = 0;
178
179         while (nob-- > 0)
180                 sum = ((sum << 1) | (sum >> 31)) + *c++;
181
182         /* ensure I don't return 0 (== no checksum) */
183         return (sum == 0) ? 1 : sum;
184 }
185
186 void
187 kibnal_init_msg(kib_msg_t *msg, int type, int body_nob)
188 {
189         msg->ibm_type = type;
190         msg->ibm_nob  = offsetof(kib_msg_t, ibm_u) + body_nob;
191 }
192
193 void
194 kibnal_pack_msg(kib_msg_t *msg, __u32 version, int credits,
195                 lnet_nid_t dstnid, __u64 dststamp, __u64 seq)
196 {
197         /* CAVEAT EMPTOR! all message fields not set here should have been
198          * initialised previously. */
199         msg->ibm_magic    = IBNAL_MSG_MAGIC;
200         msg->ibm_version  = version;
201         /*   ibm_type */
202         msg->ibm_credits  = credits;
203         /*   ibm_nob */
204         msg->ibm_cksum    = 0;
205         msg->ibm_srcnid   = kibnal_data.kib_ni->ni_nid;
206         msg->ibm_srcstamp = kibnal_data.kib_incarnation;
207         msg->ibm_dstnid   = dstnid;
208         msg->ibm_dststamp = dststamp;
209         msg->ibm_seq      = seq;
210
211         if (*kibnal_tunables.kib_cksum) {
212                 /* NB ibm_cksum zero while computing cksum */
213                 msg->ibm_cksum = kibnal_cksum(msg, msg->ibm_nob);
214         }
215 }
216
/* Validate a received message in place and convert it to host byte order
 * if the sender's endianness differs.
 *
 * 'nob' is the number of bytes actually received.  'expected_version' is
 * the negotiated protocol version, or 0 during connection establishment
 * when either supported version is acceptable.
 *
 * Returns 0 if the message is well-formed, -EPROTO on any validation
 * failure.  NB ibm_magic is deliberately left unswapped as a clue to the
 * peer's endianness for later traffic. */
int
kibnal_unpack_msg(kib_msg_t *msg, __u32 expected_version, int nob)
{
        const int hdr_size = offsetof(kib_msg_t, ibm_u);
        __u32     msg_cksum;
        __u32     msg_version;
        int       flip;                 /* peer has opposite endianness? */
        int       msg_nob;
#if !IBNAL_USE_FMR
        int       i;
        int       n;
#endif
        /* 6 bytes are enough to have received magic + version */
        if (nob < 6) {
                CERROR("Short message: %d\n", nob);
                return -EPROTO;
        }

        /* Future protocol version compatibility support!
         * If the viblnd-specific protocol changes, or when LNET unifies
         * protocols over all LNDs, the initial connection will negotiate a
         * protocol version.  If I find an LNET magic here I fail quietly
         * (no console error); if the peer is doing connection establishment,
         * the reject will tell it which version I'm running. */

        if (msg->ibm_magic == IBNAL_MSG_MAGIC) {
                flip = 0;
        } else if (msg->ibm_magic == __swab32(IBNAL_MSG_MAGIC)) {
                flip = 1;
        } else {
                /* peer speaking a different LNET protocol: reject without
                 * console noise */
                if (msg->ibm_magic == LNET_PROTO_MAGIC ||
                    msg->ibm_magic == __swab32(LNET_PROTO_MAGIC))
                        return -EPROTO;

                /* Completely out to lunch */
                CERROR("Bad magic: %08x\n", msg->ibm_magic);
                return -EPROTO;
        }

        msg_version = flip ? __swab16(msg->ibm_version) : msg->ibm_version;
        if (expected_version == 0) {
                /* connection establishment: accept either version */
                if (msg_version != IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD &&
                    msg_version != IBNAL_MSG_VERSION)
                        return -EPROTO;
        } else if (msg_version != expected_version) {
                CERROR("Bad version: %x(%x expected)\n",
                       msg_version, expected_version);
                return -EPROTO;
        }

        /* from here on the complete common header must be present */
        if (nob < hdr_size) {
                CERROR("Short message: %d\n", nob);
                return -EPROTO;
        }

        msg_nob = flip ? __swab32(msg->ibm_nob) : msg->ibm_nob;
        if (msg_nob > nob) {
                CERROR("Short message: got %d, wanted %d\n", nob, msg_nob);
                return -EPROTO;
        }

        /* checksum must be computed with ibm_cksum zero and BEFORE anything
         * gets flipped */
        msg_cksum = flip ? __swab32(msg->ibm_cksum) : msg->ibm_cksum;
        msg->ibm_cksum = 0;
        /* msg_cksum == 0 means the sender didn't checksum */
        if (msg_cksum != 0 &&
            msg_cksum != kibnal_cksum(msg, msg_nob)) {
                CERROR("Bad checksum\n");
                return -EPROTO;
        }
        msg->ibm_cksum = msg_cksum;

        if (flip) {
                /* leave magic unflipped as a clue to peer endianness */
                msg->ibm_version = msg_version;
                /* single-byte fields need no swabbing */
                CLASSERT (sizeof(msg->ibm_type) == 1);
                CLASSERT (sizeof(msg->ibm_credits) == 1);
                msg->ibm_nob = msg_nob;
                __swab64s(&msg->ibm_srcnid);
                __swab64s(&msg->ibm_srcstamp);
                __swab64s(&msg->ibm_dstnid);
                __swab64s(&msg->ibm_dststamp);
                __swab64s(&msg->ibm_seq);
        }

        if (msg->ibm_srcnid == LNET_NID_ANY) {
                CERROR("Bad src nid: %s\n", libcfs_nid2str(msg->ibm_srcnid));
                return -EPROTO;
        }

        /* per-type payload validation (and swabbing when flipped) */
        switch (msg->ibm_type) {
        default:
                CERROR("Unknown message type %x\n", msg->ibm_type);
                return -EPROTO;

        case IBNAL_MSG_NOOP:
                break;

        case IBNAL_MSG_IMMEDIATE:
                if (msg_nob < offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[0])) {
                        CERROR("Short IMMEDIATE: %d(%d)\n", msg_nob,
                               (int)offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[0]));
                        return -EPROTO;
                }
                break;

        case IBNAL_MSG_PUT_REQ:
                if (msg_nob < hdr_size + sizeof(msg->ibm_u.putreq)) {
                        CERROR("Short PUT_REQ: %d(%d)\n", msg_nob,
                               (int)(hdr_size + sizeof(msg->ibm_u.putreq)));
                        return -EPROTO;
                }
                break;

        case IBNAL_MSG_PUT_ACK:
                if (msg_nob < hdr_size + sizeof(msg->ibm_u.putack)) {
                        CERROR("Short PUT_ACK: %d(%d)\n", msg_nob,
                               (int)(hdr_size + sizeof(msg->ibm_u.putack)));
                        return -EPROTO;
                }
#if IBNAL_USE_FMR
                /* FMR: a single contiguous RDMA descriptor */
                if (flip) {
                        __swab64s(&msg->ibm_u.putack.ibpam_rd.rd_addr);
                        __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_nob);
                        __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_key);
                }
#else
                /* non-FMR: variable-length fragment list; swab the counts
                 * first so 'n' below is in host order */
                if (flip) {
                        __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_key);
                        __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_nfrag);
                }

                n = msg->ibm_u.putack.ibpam_rd.rd_nfrag;
                if (n <= 0 || n > IBNAL_MAX_RDMA_FRAGS) {
                        CERROR("Bad PUT_ACK nfrags: %d, should be 0 < n <= %d\n",
                               n, IBNAL_MAX_RDMA_FRAGS);
                        return -EPROTO;
                }

                /* the message must be long enough to hold all n fragments */
                if (msg_nob < offsetof(kib_msg_t, ibm_u.putack.ibpam_rd.rd_frags[n])) {
                        CERROR("Short PUT_ACK: %d(%d)\n", msg_nob,
                               (int)offsetof(kib_msg_t, ibm_u.putack.ibpam_rd.rd_frags[n]));
                        return -EPROTO;
                }

                if (flip) {
                        for (i = 0; i < n; i++) {
                                __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_frags[i].rf_nob);
                                __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_frags[i].rf_addr_lo);
                                __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_frags[i].rf_addr_hi);
                        }
                }
#endif
                break;

        case IBNAL_MSG_GET_REQ:
                if (msg_nob < hdr_size + sizeof(msg->ibm_u.get)) {
                        CERROR("Short GET_REQ: %d(%d)\n", msg_nob,
                               (int)(hdr_size + sizeof(msg->ibm_u.get)));
                        return -EPROTO;
                }
#if IBNAL_USE_FMR
                /* FMR: a single contiguous RDMA descriptor */
                if (flip) {
                        __swab64s(&msg->ibm_u.get.ibgm_rd.rd_addr);
                        __swab32s(&msg->ibm_u.get.ibgm_rd.rd_nob);
                        __swab32s(&msg->ibm_u.get.ibgm_rd.rd_key);
                }
#else
                /* non-FMR: same fragment-list validation as PUT_ACK above */
                if (flip) {
                        __swab32s(&msg->ibm_u.get.ibgm_rd.rd_key);
                        __swab32s(&msg->ibm_u.get.ibgm_rd.rd_nfrag);
                }

                n = msg->ibm_u.get.ibgm_rd.rd_nfrag;
                if (n <= 0 || n > IBNAL_MAX_RDMA_FRAGS) {
                        CERROR("Bad GET_REQ nfrags: %d, should be 0 < n <= %d\n",
                               n, IBNAL_MAX_RDMA_FRAGS);
                        return -EPROTO;
                }

                if (msg_nob < offsetof(kib_msg_t, ibm_u.get.ibgm_rd.rd_frags[n])) {
                        CERROR("Short GET_REQ: %d(%d)\n", msg_nob,
                               (int)offsetof(kib_msg_t, ibm_u.get.ibgm_rd.rd_frags[n]));
                        return -EPROTO;
                }

                if (flip)
                        for (i = 0; i < msg->ibm_u.get.ibgm_rd.rd_nfrag; i++) {
                                __swab32s(&msg->ibm_u.get.ibgm_rd.rd_frags[i].rf_nob);
                                __swab32s(&msg->ibm_u.get.ibgm_rd.rd_frags[i].rf_addr_lo);
                                __swab32s(&msg->ibm_u.get.ibgm_rd.rd_frags[i].rf_addr_hi);
                        }
#endif
                break;

        case IBNAL_MSG_PUT_NAK:
        case IBNAL_MSG_PUT_DONE:
        case IBNAL_MSG_GET_DONE:
                if (msg_nob < hdr_size + sizeof(msg->ibm_u.completion)) {
                        CERROR("Short RDMA completion: %d(%d)\n", msg_nob,
                               (int)(hdr_size + sizeof(msg->ibm_u.completion)));
                        return -EPROTO;
                }
                if (flip)
                        __swab32s(&msg->ibm_u.completion.ibcm_status);
                break;

        case IBNAL_MSG_CONNREQ:
        case IBNAL_MSG_CONNACK:
                if (msg_nob < hdr_size + sizeof(msg->ibm_u.connparams)) {
                        CERROR("Short connreq/ack: %d(%d)\n", msg_nob,
                               (int)(hdr_size + sizeof(msg->ibm_u.connparams)));
                        return -EPROTO;
                }
                if (flip) {
                        __swab32s(&msg->ibm_u.connparams.ibcp_queue_depth);
                        __swab32s(&msg->ibm_u.connparams.ibcp_max_msg_size);
                        __swab32s(&msg->ibm_u.connparams.ibcp_max_frags);
                }
                break;
        }
        return 0;
}
440
/* Create the passive CEP and start listening for connection requests on
 * the configured service number.  Returns 0 on success.  On failure the
 * CEP (if it was created) is destroyed and kib_listen_handle is left
 * NULL, which the rest of this file also uses as a "shutting down" flag. */
int
kibnal_start_listener (lnet_ni_t *ni)
{
        /* NB(review): 'info' is static — presumably because the CM layer
         * may reference it after cm_listen() returns; confirm before
         * changing to a stack variable */
        static cm_listen_data_t info;

        cm_return_t      cmrc;

        LASSERT (kibnal_data.kib_listen_handle == NULL);

        kibnal_data.kib_listen_handle =
                cm_create_cep(cm_cep_transp_rc);
        if (kibnal_data.kib_listen_handle == NULL) {
                CERROR ("Can't create listen CEP\n");
                return -ENOMEM;
        }

        CDEBUG(D_NET, "Created CEP %p for listening\n",
               kibnal_data.kib_listen_handle);

        /* listen on the tunable service number */
        memset(&info, 0, sizeof(info));
        info.listen_addr.end_pt.sid =
                (__u64)(*kibnal_tunables.kib_service_number);

        cmrc = cm_listen(kibnal_data.kib_listen_handle, &info,
                         kibnal_listen_callback, NULL);
        if (cmrc == cm_stat_success)
                return 0;

        /* listen failed: clean up the CEP before reporting the error */
        CERROR ("cm_listen error: %d\n", cmrc);

        cmrc = cm_destroy_cep(kibnal_data.kib_listen_handle);
        LASSERT (cmrc == cm_stat_success);

        kibnal_data.kib_listen_handle = NULL;
        return -EINVAL;
}
477
478 void
479 kibnal_stop_listener(lnet_ni_t *ni)
480 {
481         cm_return_t      cmrc;
482
483         LASSERT (kibnal_data.kib_listen_handle != NULL);
484
485         cmrc = cm_cancel(kibnal_data.kib_listen_handle);
486         if (cmrc != cm_stat_success)
487                 CERROR ("Error %d stopping listener\n", cmrc);
488
489         cfs_pause(cfs_time_seconds(1)/10);   /* ensure no more callbacks */
490
491         cmrc = cm_destroy_cep(kibnal_data.kib_listen_handle);
492         if (cmrc != vv_return_ok)
493                 CERROR ("Error %d destroying CEP\n", cmrc);
494
495         kibnal_data.kib_listen_handle = NULL;
496 }
497
498 int
499 kibnal_create_peer (kib_peer_t **peerp, lnet_nid_t nid)
500 {
501         kib_peer_t     *peer;
502         unsigned long   flags;
503         int             rc;
504
505         LASSERT (nid != LNET_NID_ANY);
506
507         LIBCFS_ALLOC(peer, sizeof (*peer));
508         if (peer == NULL) {
509                 CERROR("Cannot allocate peer\n");
510                 return -ENOMEM;
511         }
512
513         memset(peer, 0, sizeof(*peer));         /* zero flags etc */
514
515         peer->ibp_nid = nid;
516         atomic_set (&peer->ibp_refcount, 1);    /* 1 ref for caller */
517
518         INIT_LIST_HEAD (&peer->ibp_list);       /* not in the peer table yet */
519         INIT_LIST_HEAD (&peer->ibp_conns);
520         INIT_LIST_HEAD (&peer->ibp_tx_queue);
521
522         peer->ibp_error = 0;
523         peer->ibp_last_alive = cfs_time_current();
524         peer->ibp_reconnect_interval = 0;       /* OK to connect at any time */
525
526         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
527
528         if (atomic_read(&kibnal_data.kib_npeers) >=
529             *kibnal_tunables.kib_concurrent_peers) {
530                 rc = -EOVERFLOW;        /* !! but at least it distinguishes */
531         } else if (kibnal_data.kib_listen_handle == NULL) {
532                 rc = -ESHUTDOWN;        /* shutdown has started */
533         } else {
534                 rc = 0;
535                 /* npeers only grows with the global lock held */
536                 atomic_inc(&kibnal_data.kib_npeers);
537         }
538
539         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
540
541         if (rc != 0) {
542                 CERROR("Can't create peer: %s\n", 
543                        (rc == -ESHUTDOWN) ? "shutting down" :
544                        "too many peers");
545                 LIBCFS_FREE(peer, sizeof(*peer));
546         } else {
547                 *peerp = peer;
548         }
549
550         return rc;
551 }
552
/* Final teardown when a peer's last reference is dropped.  The peer must
 * already be unlinked from the table with no persistence, no connections,
 * no queued transmits and no connection attempts in flight. */
void
kibnal_destroy_peer (kib_peer_t *peer)
{
        LASSERT (atomic_read (&peer->ibp_refcount) == 0);
        LASSERT (peer->ibp_persistence == 0);
        LASSERT (!kibnal_peer_active(peer));
        LASSERT (peer->ibp_connecting == 0);
        LASSERT (peer->ibp_accepting == 0);
        LASSERT (list_empty (&peer->ibp_conns));
        LASSERT (list_empty (&peer->ibp_tx_queue));

        LIBCFS_FREE (peer, sizeof (*peer));

        /* NB a peer's connections keep a reference on their peer until
         * they are destroyed, so we can be assured that _all_ state to do
         * with this peer has been cleaned up when its refcount drops to
         * zero. */
        atomic_dec(&kibnal_data.kib_npeers);
}
572
573 kib_peer_t *
574 kibnal_find_peer_locked (lnet_nid_t nid)
575 {
576         /* the caller is responsible for accounting the additional reference
577          * that this creates */
578         struct list_head *peer_list = kibnal_nid2peerlist (nid);
579         struct list_head *tmp;
580         kib_peer_t       *peer;
581
582         list_for_each (tmp, peer_list) {
583
584                 peer = list_entry (tmp, kib_peer_t, ibp_list);
585
586                 LASSERT (peer->ibp_persistence != 0 || /* persistent peer */
587                          peer->ibp_connecting != 0 || /* creating conns */
588                          peer->ibp_accepting != 0 ||
589                          !list_empty (&peer->ibp_conns));  /* active conn */
590
591                 if (peer->ibp_nid != nid)
592                         continue;
593
594                 CDEBUG(D_NET, "got peer [%p] -> %s (%d)\n",
595                        peer, libcfs_nid2str(nid),
596                        atomic_read (&peer->ibp_refcount));
597                 return (peer);
598         }
599         return (NULL);
600 }
601
/* Remove 'peer' from the peer hash table and drop the table's reference.
 * Caller holds the global write lock.  Only legal for a peer with no
 * persistence and no remaining connections. */
void
kibnal_unlink_peer_locked (kib_peer_t *peer)
{
        LASSERT (peer->ibp_persistence == 0);
        LASSERT (list_empty(&peer->ibp_conns));

        LASSERT (kibnal_peer_active(peer));
        list_del_init (&peer->ibp_list);
        /* lose peerlist's ref */
        kibnal_peer_decref(peer);
}
613
614 int
615 kibnal_get_peer_info (int index, lnet_nid_t *nidp, __u32 *ipp,
616                       int *persistencep)
617 {
618         kib_peer_t        *peer;
619         struct list_head  *ptmp;
620         int                i;
621         unsigned long      flags;
622
623         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
624
625         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
626
627                 list_for_each (ptmp, &kibnal_data.kib_peers[i]) {
628
629                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
630                         LASSERT (peer->ibp_persistence != 0 ||
631                                  peer->ibp_connecting != 0 ||
632                                  peer->ibp_accepting != 0 ||
633                                  !list_empty (&peer->ibp_conns));
634
635                         if (index-- > 0)
636                                 continue;
637
638                         *nidp = peer->ibp_nid;
639                         *ipp = peer->ibp_ip;
640                         *persistencep = peer->ibp_persistence;
641
642                         read_unlock_irqrestore(&kibnal_data.kib_global_lock,
643                                                flags);
644                         return (0);
645                 }
646         }
647
648         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
649         return (-ENOENT);
650 }
651
652 int
653 kibnal_add_persistent_peer (lnet_nid_t nid, __u32 ip)
654 {
655         kib_peer_t        *peer;
656         kib_peer_t        *peer2;
657         unsigned long      flags;
658         int                rc;
659
660         CDEBUG(D_NET, "%s at %u.%u.%u.%u\n",
661                libcfs_nid2str(nid), HIPQUAD(ip));
662
663         if (nid == LNET_NID_ANY)
664                 return (-EINVAL);
665
666         rc = kibnal_create_peer(&peer, nid);
667         if (rc != 0)
668                 return rc;
669
670         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
671
672         /* I'm always called with a reference on kibnal_data.kib_ni
673          * so shutdown can't have started */
674         LASSERT (kibnal_data.kib_listen_handle != NULL);
675
676         peer2 = kibnal_find_peer_locked (nid);
677         if (peer2 != NULL) {
678                 kibnal_peer_decref (peer);
679                 peer = peer2;
680         } else {
681                 /* peer table takes existing ref on peer */
682                 list_add_tail (&peer->ibp_list,
683                                kibnal_nid2peerlist (nid));
684         }
685
686         peer->ibp_ip = ip;
687         peer->ibp_persistence++;
688
689         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
690         return (0);
691 }
692
693 void
694 kibnal_del_peer_locked (kib_peer_t *peer)
695 {
696         struct list_head *ctmp;
697         struct list_head *cnxt;
698         kib_conn_t       *conn;
699
700         peer->ibp_persistence = 0;
701
702         if (list_empty(&peer->ibp_conns)) {
703                 kibnal_unlink_peer_locked(peer);
704         } else {
705                 list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
706                         conn = list_entry(ctmp, kib_conn_t, ibc_list);
707
708                         kibnal_close_conn_locked (conn, 0);
709                 }
710                 /* NB peer is no longer persistent; closing its last conn
711                  * unlinked it. */
712         }
713         /* NB peer now unlinked; might even be freed if the peer table had the
714          * last ref on it. */
715 }
716
717 int
718 kibnal_del_peer (lnet_nid_t nid)
719 {
720         CFS_LIST_HEAD     (zombies);
721         struct list_head  *ptmp;
722         struct list_head  *pnxt;
723         kib_peer_t        *peer;
724         int                lo;
725         int                hi;
726         int                i;
727         unsigned long      flags;
728         int                rc = -ENOENT;
729
730         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
731
732         if (nid != LNET_NID_ANY)
733                 lo = hi = kibnal_nid2peerlist(nid) - kibnal_data.kib_peers;
734         else {
735                 lo = 0;
736                 hi = kibnal_data.kib_peer_hash_size - 1;
737         }
738
739         for (i = lo; i <= hi; i++) {
740                 list_for_each_safe (ptmp, pnxt, &kibnal_data.kib_peers[i]) {
741                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
742                         LASSERT (peer->ibp_persistence != 0 ||
743                                  peer->ibp_connecting != 0 ||
744                                  peer->ibp_accepting != 0 ||
745                                  !list_empty (&peer->ibp_conns));
746
747                         if (!(nid == LNET_NID_ANY || peer->ibp_nid == nid))
748                                 continue;
749
750                         if (!list_empty(&peer->ibp_tx_queue)) {
751                                 LASSERT (list_empty(&peer->ibp_conns));
752
753                                 list_splice_init(&peer->ibp_tx_queue, &zombies);
754                         }
755
756                         kibnal_del_peer_locked (peer);
757                         rc = 0;         /* matched something */
758                 }
759         }
760
761         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
762
763         kibnal_txlist_done(&zombies, -EIO);
764
765         return (rc);
766 }
767
768 kib_conn_t *
769 kibnal_get_conn_by_idx (int index)
770 {
771         kib_peer_t        *peer;
772         struct list_head  *ptmp;
773         kib_conn_t        *conn;
774         struct list_head  *ctmp;
775         int                i;
776         unsigned long      flags;
777
778         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
779
780         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
781                 list_for_each (ptmp, &kibnal_data.kib_peers[i]) {
782
783                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
784                         LASSERT (peer->ibp_persistence > 0 ||
785                                  peer->ibp_connecting != 0 ||
786                                  peer->ibp_accepting != 0 ||
787                                  !list_empty (&peer->ibp_conns));
788
789                         list_for_each (ctmp, &peer->ibp_conns) {
790                                 if (index-- > 0)
791                                         continue;
792
793                                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
794                                 kibnal_conn_addref(conn);
795                                 read_unlock_irqrestore(&kibnal_data.kib_global_lock,
796                                                        flags);
797                                 return (conn);
798                         }
799                 }
800         }
801
802         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
803         return (NULL);
804 }
805
806 void
807 kibnal_debug_rx (kib_rx_t *rx)
808 {
809         CDEBUG(D_CONSOLE, "      %p nob %d msg_type %x "
810                "cred %d seq "LPD64"\n",
811                rx, rx->rx_nob, rx->rx_msg->ibm_type,
812                rx->rx_msg->ibm_credits, rx->rx_msg->ibm_seq);
813 }
814
815 void
816 kibnal_debug_tx (kib_tx_t *tx)
817 {
818         CDEBUG(D_CONSOLE, "      %p snd %d q %d w %d rc %d dl %lx "
819                "cookie "LPX64" msg %s%s type %x cred %d seq "LPD64"\n",
820                tx, tx->tx_sending, tx->tx_queued, tx->tx_waiting,
821                tx->tx_status, tx->tx_deadline, tx->tx_cookie,
822                tx->tx_lntmsg[0] == NULL ? "-" : "!",
823                tx->tx_lntmsg[1] == NULL ? "-" : "!",
824                tx->tx_msg->ibm_type, tx->tx_msg->ibm_credits,
825                tx->tx_msg->ibm_seq);
826 }
827
void
kibnal_debug_conn (kib_conn_t *conn)
{
        /* Dump the complete state of 'conn' (refcount, sequence
         * numbers, credits, all tx/rx queues) to the console log */
        struct list_head *tmp;
        int               i;

        /* freeze the queues while we walk them */
        spin_lock(&conn->ibc_lock);

        CDEBUG(D_CONSOLE, "conn[%d] %p -> %s: \n",
               atomic_read(&conn->ibc_refcount), conn,
               libcfs_nid2str(conn->ibc_peer->ibp_nid));
        CDEBUG(D_CONSOLE, "   txseq "LPD64" rxseq "LPD64" state %d \n",
               conn->ibc_txseq, conn->ibc_rxseq, conn->ibc_state);
        CDEBUG(D_CONSOLE, "   nposted %d cred %d o_cred %d r_cred %d\n",
               conn->ibc_nsends_posted, conn->ibc_credits,
               conn->ibc_outstanding_credits, conn->ibc_reserved_credits);
        CDEBUG(D_CONSOLE, "   disc %d comms_err %d\n",
               conn->ibc_disconnect, conn->ibc_comms_error);

        CDEBUG(D_CONSOLE, "   early_rxs:\n");
        list_for_each(tmp, &conn->ibc_early_rxs)
                kibnal_debug_rx(list_entry(tmp, kib_rx_t, rx_list));

        CDEBUG(D_CONSOLE, "   tx_queue_nocred:\n");
        list_for_each(tmp, &conn->ibc_tx_queue_nocred)
                kibnal_debug_tx(list_entry(tmp, kib_tx_t, tx_list));

        CDEBUG(D_CONSOLE, "   tx_queue_rsrvd:\n");
        list_for_each(tmp, &conn->ibc_tx_queue_rsrvd)
                kibnal_debug_tx(list_entry(tmp, kib_tx_t, tx_list));

        CDEBUG(D_CONSOLE, "   tx_queue:\n");
        list_for_each(tmp, &conn->ibc_tx_queue)
                kibnal_debug_tx(list_entry(tmp, kib_tx_t, tx_list));

        CDEBUG(D_CONSOLE, "   active_txs:\n");
        list_for_each(tmp, &conn->ibc_active_txs)
                kibnal_debug_tx(list_entry(tmp, kib_tx_t, tx_list));

        /* rx descriptors live in a fixed-size array, not a list */
        CDEBUG(D_CONSOLE, "   rxs:\n");
        for (i = 0; i < IBNAL_RX_MSGS; i++)
                kibnal_debug_rx(&conn->ibc_rxs[i]);

        spin_unlock(&conn->ibc_lock);
}
873
int
kibnal_set_qp_state (kib_conn_t *conn, vv_qp_state_t new_state)
{
        /* Drive 'conn's queue pair into 'new_state' (one step of the
         * INIT -> RTR -> RTS state machine, or ERROR/RESET), filling
         * the vendor attribute block for that transition.
         * Returns 0 on success, -EIO if the verbs call fails. */
        static vv_qp_attr_t attr;

        kib_connvars_t   *cv = conn->ibc_connvars;
        vv_return_t       vvrc;

        /* Only called by connd => static OK */
        LASSERT (!in_interrupt());
        LASSERT (current == kibnal_data.kib_connd);

        memset(&attr, 0, sizeof(attr));

        switch (new_state) {
        default:
                /* no other transitions are ever requested */
                LBUG();

        case vv_qp_state_init: {
                struct vv_qp_modify_init_st *init = &attr.modify.params.init;

                init->p_key_indx     = cv->cv_pkey_index;
                init->phy_port_num   = cv->cv_port;
                init->q_key          = IBNAL_QKEY; /* XXX but VV_QP_AT_Q_KEY not set! */
                init->access_control = vv_acc_r_mem_read |
                                       vv_acc_r_mem_write; /* XXX vv_acc_l_mem_write ? */

                attr.modify.vv_qp_attr_mask = VV_QP_AT_P_KEY_IX |
                                              VV_QP_AT_PHY_PORT_NUM |
                                              VV_QP_AT_ACCESS_CON_F;
                break;
        }
        case vv_qp_state_rtr: {
                /* Ready-To-Receive: program the remote address vector
                 * obtained during connection establishment.  NB field
                 * names ("hope_limit", "flow_lable", "destanation_qp")
                 * are spelled as the vv API headers declare them. */
                struct vv_qp_modify_rtr_st *rtr = &attr.modify.params.rtr;
                vv_add_vec_t               *av  = &rtr->remote_add_vec;

                av->dlid                      = cv->cv_path.dlid;
                av->grh_flag                  = (!IBNAL_LOCAL_SUB);
                av->max_static_rate           = IBNAL_R_2_STATIC_RATE(cv->cv_path.rate);
                av->service_level             = cv->cv_path.sl;
                av->source_path_bit           = IBNAL_SOURCE_PATH_BIT;
                av->pmtu                      = cv->cv_path.mtu;
                av->rnr_retry_count           = cv->cv_rnr_count;
                av->global_dest.traffic_class = cv->cv_path.traffic_class;
                av->global_dest.hope_limit    = cv->cv_path.hop_limut;
                av->global_dest.flow_lable    = cv->cv_path.flow_label;
                av->global_dest.s_gid_index   = cv->cv_sgid_index;
                // XXX other av fields zero?

                rtr->destanation_qp            = cv->cv_remote_qpn;
                rtr->receive_psn               = cv->cv_rxpsn;
                rtr->responder_rdma_r_atom_num = IBNAL_OUS_DST_RD;
                rtr->opt_min_rnr_nak_timer     = *kibnal_tunables.kib_rnr_nak_timer;


                // XXX sdp sets VV_QP_AT_OP_F but no actual optional options
                attr.modify.vv_qp_attr_mask = VV_QP_AT_ADD_VEC |
                                              VV_QP_AT_DEST_QP |
                                              VV_QP_AT_R_PSN |
                                              VV_QP_AT_MIN_RNR_NAK_T |
                                              VV_QP_AT_RESP_RDMA_ATOM_OUT_NUM |
                                              VV_QP_AT_OP_F;
                break;
        }
        case vv_qp_state_rts: {
                /* Ready-To-Send: set tx PSN and retry/timeout tunables */
                struct vv_qp_modify_rts_st *rts = &attr.modify.params.rts;

                rts->send_psn                 = cv->cv_txpsn;
                rts->local_ack_timeout        = *kibnal_tunables.kib_local_ack_timeout;
                rts->retry_num                = *kibnal_tunables.kib_retry_cnt;
                rts->rnr_num                  = *kibnal_tunables.kib_rnr_cnt;
                rts->dest_out_rdma_r_atom_num = IBNAL_OUS_DST_RD;

                attr.modify.vv_qp_attr_mask = VV_QP_AT_S_PSN |
                                              VV_QP_AT_L_ACK_T |
                                              VV_QP_AT_RETRY_NUM |
                                              VV_QP_AT_RNR_NUM |
                                              VV_QP_AT_DEST_RDMA_ATOM_OUT_NUM;
                break;
        }
        case vv_qp_state_error:
        case vv_qp_state_reset:
                /* no attributes needed beyond the state itself */
                attr.modify.vv_qp_attr_mask = 0;
                break;
        }

        attr.modify.qp_modify_into_state = new_state;
        attr.modify.vv_qp_attr_mask |= VV_QP_AT_STATE;

        vvrc = vv_qp_modify(kibnal_data.kib_hca, conn->ibc_qp, &attr, NULL);
        if (vvrc != vv_return_ok) {
                CERROR("Can't modify qp -> %s state to %d: %d\n",
                       libcfs_nid2str(conn->ibc_peer->ibp_nid),
                       new_state, vvrc);
                return -EIO;
        }

        return 0;
}
973
kib_conn_t *
kibnal_create_conn (cm_cep_handle_t cep)
{
        /* Allocate and initialise a connection for CM endpoint 'cep':
         * in-progress connection state, rx descriptors, pre-registered
         * rx message pages and the queue pair.  Returns the conn with
         * 1 ref for the caller, or NULL on failure (partial state is
         * torn down via kibnal_destroy_conn()). */
        kib_conn_t   *conn;
        int           i;
        int           page_offset;
        int           ipage;
        vv_return_t   vvrc;
        int           rc;

        /* scratch attribute blocks; connd-only => static is safe */
        static vv_qp_attr_t  reqattr;
        static vv_qp_attr_t  rspattr;

        /* Only the connd creates conns => single threaded */
        LASSERT(!in_interrupt());
        LASSERT(current == kibnal_data.kib_connd);

        LIBCFS_ALLOC(conn, sizeof (*conn));
        if (conn == NULL) {
                CERROR ("Can't allocate connection\n");
                return (NULL);
        }

        /* zero flags, NULL pointers etc... */
        memset (conn, 0, sizeof (*conn));

        conn->ibc_version = IBNAL_MSG_VERSION;  /* Use latest version at first */

        INIT_LIST_HEAD (&conn->ibc_early_rxs);
        INIT_LIST_HEAD (&conn->ibc_tx_queue_nocred);
        INIT_LIST_HEAD (&conn->ibc_tx_queue);
        INIT_LIST_HEAD (&conn->ibc_tx_queue_rsrvd);
        INIT_LIST_HEAD (&conn->ibc_active_txs);
        spin_lock_init (&conn->ibc_lock);

        atomic_inc (&kibnal_data.kib_nconns);
        /* well not really, but I call destroy() on failure, which decrements */

        conn->ibc_cep = cep;

        LIBCFS_ALLOC(conn->ibc_connvars, sizeof(*conn->ibc_connvars));
        if (conn->ibc_connvars == NULL) {
                CERROR("Can't allocate in-progress connection state\n");
                goto failed;
        }
        memset (conn->ibc_connvars, 0, sizeof(*conn->ibc_connvars));
        /* Random seed for QP sequence number */
        get_random_bytes(&conn->ibc_connvars->cv_rxpsn,
                         sizeof(conn->ibc_connvars->cv_rxpsn));

        LIBCFS_ALLOC(conn->ibc_rxs, IBNAL_RX_MSGS * sizeof (kib_rx_t));
        if (conn->ibc_rxs == NULL) {
                CERROR("Cannot allocate RX buffers\n");
                goto failed;
        }
        memset (conn->ibc_rxs, 0, IBNAL_RX_MSGS * sizeof(kib_rx_t));

        rc = kibnal_alloc_pages(&conn->ibc_rx_pages, IBNAL_RX_MSG_PAGES, 1);
        if (rc != 0)
                goto failed;

        /* Carve each rx message buffer out of the page pool and look
         * up its local memory key */
        for (i = ipage = page_offset = 0; i < IBNAL_RX_MSGS; i++) {
                struct page    *page = conn->ibc_rx_pages->ibp_pages[ipage];
                kib_rx_t       *rx = &conn->ibc_rxs[i];
                vv_mem_reg_h_t  mem_h;
                vv_r_key_t      r_key;

                rx->rx_conn = conn;
                rx->rx_msg = (kib_msg_t *)(((char *)page_address(page)) +
                             page_offset);

                vvrc = vv_get_gen_mr_attrib(kibnal_data.kib_hca,
                                            rx->rx_msg,
                                            IBNAL_MSG_SIZE,
                                            &mem_h,
                                            &rx->rx_lkey,
                                            &r_key);
                LASSERT (vvrc == vv_return_ok);

                CDEBUG(D_NET, "Rx[%d] %p->%p[%x]\n", i, rx,
                       rx->rx_msg, rx->rx_lkey);

                page_offset += IBNAL_MSG_SIZE;
                LASSERT (page_offset <= PAGE_SIZE);

                if (page_offset == PAGE_SIZE) {
                        page_offset = 0;
                        ipage++;
                        LASSERT (ipage <= IBNAL_RX_MSG_PAGES);
                }
        }

        memset(&reqattr, 0, sizeof(reqattr));

        reqattr.create.qp_type                    = vv_qp_type_r_conn;
        reqattr.create.cq_send_h                  = kibnal_data.kib_cq;
        reqattr.create.cq_receive_h               = kibnal_data.kib_cq;
        reqattr.create.send_max_outstand_wr       = (1 + IBNAL_MAX_RDMA_FRAGS) *
                                                    (*kibnal_tunables.kib_concurrent_sends);
        reqattr.create.receive_max_outstand_wr    = IBNAL_RX_MSGS;
        reqattr.create.max_scatgat_per_send_wr    = 1;
        reqattr.create.max_scatgat_per_receive_wr = 1;
        reqattr.create.signaling_type             = vv_selectable_signaling;
        reqattr.create.pd_h                       = kibnal_data.kib_pd;
        reqattr.create.recv_solicited_events      = vv_selectable_signaling; // vv_signal_all;

        vvrc = vv_qp_create(kibnal_data.kib_hca, &reqattr, NULL,
                            &conn->ibc_qp, &rspattr);
        if (vvrc != vv_return_ok) {
                CERROR ("Failed to create queue pair: %d\n", vvrc);
                goto failed;
        }

        /* Mark QP created */
        conn->ibc_state = IBNAL_CONN_INIT_QP;
        conn->ibc_connvars->cv_local_qpn = rspattr.create_return.qp_num;

        /* The HCA may grant fewer work requests than asked for; that
         * is fatal here */
        if (rspattr.create_return.receive_max_outstand_wr <
            IBNAL_RX_MSGS ||
            rspattr.create_return.send_max_outstand_wr <
            (1 + IBNAL_MAX_RDMA_FRAGS) * (*kibnal_tunables.kib_concurrent_sends)) {
                CERROR("Insufficient rx/tx work items: wanted %d/%d got %d/%d\n",
                       IBNAL_RX_MSGS,
                       (1 + IBNAL_MAX_RDMA_FRAGS) *
                       (*kibnal_tunables.kib_concurrent_sends),
                       rspattr.create_return.receive_max_outstand_wr,
                       rspattr.create_return.send_max_outstand_wr);
                goto failed;
        }

        /* Mark init complete */
        conn->ibc_state = IBNAL_CONN_INIT;

        /* 1 ref for caller */
        atomic_set (&conn->ibc_refcount, 1);
        return (conn);

 failed:
        kibnal_destroy_conn (conn);
        return (NULL);
}
1115
void
kibnal_destroy_conn (kib_conn_t *conn)
{
        /* Final teardown of an unreferenced connection.  The switch
         * falls through from the most complete initialisation state
         * to the least, undoing kibnal_create_conn()'s setup steps in
         * reverse order. */
        vv_return_t vvrc;

        /* Only the connd does this (i.e. single threaded) */
        LASSERT (!in_interrupt());
        LASSERT (current == kibnal_data.kib_connd);

        CDEBUG (D_NET, "connection %p\n", conn);

        /* no refs remain and every tx/rx queue must be empty */
        LASSERT (atomic_read (&conn->ibc_refcount) == 0);
        LASSERT (list_empty(&conn->ibc_early_rxs));
        LASSERT (list_empty(&conn->ibc_tx_queue));
        LASSERT (list_empty(&conn->ibc_tx_queue_rsrvd));
        LASSERT (list_empty(&conn->ibc_tx_queue_nocred));
        LASSERT (list_empty(&conn->ibc_active_txs));
        LASSERT (conn->ibc_nsends_posted == 0);

        switch (conn->ibc_state) {
        default:
                /* conn must be completely disengaged from the network */
                LBUG();

        case IBNAL_CONN_DISCONNECTED:
                /* connvars should have been freed already */
                LASSERT (conn->ibc_connvars == NULL);
                /* fall through */

        case IBNAL_CONN_INIT:
                vvrc = cm_destroy_cep(conn->ibc_cep);
                LASSERT (vvrc == vv_return_ok);
                /* fall through */

        case IBNAL_CONN_INIT_QP:
                /* quiesce the QP before destroying it */
                kibnal_set_qp_state(conn, vv_qp_state_reset);
                vvrc = vv_qp_destroy(kibnal_data.kib_hca, conn->ibc_qp);
                if (vvrc != vv_return_ok)
                        CERROR("Can't destroy QP: %d\n", vvrc);
                /* fall through */

        case IBNAL_CONN_INIT_NOTHING:
                break;
        }

        /* these members may be NULL if create_conn() failed part way */
        if (conn->ibc_rx_pages != NULL)
                kibnal_free_pages(conn->ibc_rx_pages);

        if (conn->ibc_rxs != NULL)
                LIBCFS_FREE(conn->ibc_rxs,
                            IBNAL_RX_MSGS * sizeof(kib_rx_t));

        if (conn->ibc_connvars != NULL)
                LIBCFS_FREE(conn->ibc_connvars, sizeof(*conn->ibc_connvars));

        if (conn->ibc_peer != NULL)
                kibnal_peer_decref(conn->ibc_peer);

        LIBCFS_FREE(conn, sizeof (*conn));

        atomic_dec(&kibnal_data.kib_nconns);
}
1178
1179 int
1180 kibnal_close_peer_conns_locked (kib_peer_t *peer, int why)
1181 {
1182         kib_conn_t         *conn;
1183         struct list_head   *ctmp;
1184         struct list_head   *cnxt;
1185         int                 count = 0;
1186
1187         list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
1188                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
1189
1190                 count++;
1191                 kibnal_close_conn_locked (conn, why);
1192         }
1193
1194         return (count);
1195 }
1196
1197 int
1198 kibnal_close_stale_conns_locked (kib_peer_t *peer, __u64 incarnation)
1199 {
1200         kib_conn_t         *conn;
1201         struct list_head   *ctmp;
1202         struct list_head   *cnxt;
1203         int                 count = 0;
1204
1205         list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
1206                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
1207
1208                 if (conn->ibc_incarnation == incarnation)
1209                         continue;
1210
1211                 CDEBUG(D_NET, "Closing stale conn -> %s incarnation:"LPX64"("LPX64")\n",
1212                        libcfs_nid2str(peer->ibp_nid),
1213                        conn->ibc_incarnation, incarnation);
1214
1215                 count++;
1216                 kibnal_close_conn_locked (conn, -ESTALE);
1217         }
1218
1219         return (count);
1220 }
1221
1222 int
1223 kibnal_close_matching_conns (lnet_nid_t nid)
1224 {
1225         kib_peer_t         *peer;
1226         struct list_head   *ptmp;
1227         struct list_head   *pnxt;
1228         int                 lo;
1229         int                 hi;
1230         int                 i;
1231         unsigned long       flags;
1232         int                 count = 0;
1233
1234         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1235
1236         if (nid != LNET_NID_ANY)
1237                 lo = hi = kibnal_nid2peerlist(nid) - kibnal_data.kib_peers;
1238         else {
1239                 lo = 0;
1240                 hi = kibnal_data.kib_peer_hash_size - 1;
1241         }
1242
1243         for (i = lo; i <= hi; i++) {
1244                 list_for_each_safe (ptmp, pnxt, &kibnal_data.kib_peers[i]) {
1245
1246                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
1247                         LASSERT (peer->ibp_persistence != 0 ||
1248                                  peer->ibp_connecting != 0 ||
1249                                  peer->ibp_accepting != 0 ||
1250                                  !list_empty (&peer->ibp_conns));
1251
1252                         if (!(nid == LNET_NID_ANY || nid == peer->ibp_nid))
1253                                 continue;
1254
1255                         count += kibnal_close_peer_conns_locked (peer, 0);
1256                 }
1257         }
1258
1259         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1260
1261         /* wildcards always succeed */
1262         if (nid == LNET_NID_ANY)
1263                 return (0);
1264
1265         return (count == 0 ? -ENOENT : 0);
1266 }
1267
1268 int
1269 kibnal_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1270 {
1271         struct libcfs_ioctl_data *data = arg;
1272         int                       rc = -EINVAL;
1273
1274         LASSERT (ni == kibnal_data.kib_ni);
1275
1276         switch(cmd) {
1277         case IOC_LIBCFS_GET_PEER: {
1278                 lnet_nid_t   nid = 0;
1279                 __u32        ip = 0;
1280                 int          share_count = 0;
1281
1282                 rc = kibnal_get_peer_info(data->ioc_count,
1283                                           &nid, &ip, &share_count);
1284                 data->ioc_nid    = nid;
1285                 data->ioc_count  = share_count;
1286                 data->ioc_u32[0] = ip;
1287                 data->ioc_u32[1] = *kibnal_tunables.kib_service_number; /* port */
1288                 break;
1289         }
1290         case IOC_LIBCFS_ADD_PEER: {
1291                 rc = kibnal_add_persistent_peer (data->ioc_nid,
1292                                                  data->ioc_u32[0]); /* IP */
1293                 break;
1294         }
1295         case IOC_LIBCFS_DEL_PEER: {
1296                 rc = kibnal_del_peer (data->ioc_nid);
1297                 break;
1298         }
1299         case IOC_LIBCFS_GET_CONN: {
1300                 kib_conn_t *conn = kibnal_get_conn_by_idx (data->ioc_count);
1301
1302                 if (conn == NULL)
1303                         rc = -ENOENT;
1304                 else {
1305                         // kibnal_debug_conn(conn);
1306                         rc = 0;
1307                         data->ioc_nid = conn->ibc_peer->ibp_nid;
1308                         kibnal_conn_decref(conn);
1309                 }
1310                 break;
1311         }
1312         case IOC_LIBCFS_CLOSE_CONNECTION: {
1313                 rc = kibnal_close_matching_conns (data->ioc_nid);
1314                 break;
1315         }
1316         case IOC_LIBCFS_REGISTER_MYNID: {
1317                 if (ni->ni_nid == data->ioc_nid) {
1318                         rc = 0;
1319                 } else {
1320                         CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID: %s(%s)\n",
1321                                libcfs_nid2str(data->ioc_nid),
1322                                libcfs_nid2str(ni->ni_nid));
1323                         rc = -EINVAL;
1324                 }
1325                 break;
1326         }
1327         }
1328
1329         return rc;
1330 }
1331
1332 void
1333 kibnal_free_pages (kib_pages_t *p)
1334 {
1335         int         npages = p->ibp_npages;
1336         int         i;
1337
1338         for (i = 0; i < npages; i++)
1339                 if (p->ibp_pages[i] != NULL)
1340                         __free_page(p->ibp_pages[i]);
1341
1342         LIBCFS_FREE (p, offsetof(kib_pages_t, ibp_pages[npages]));
1343 }
1344
1345 int
1346 kibnal_alloc_pages (kib_pages_t **pp, int npages, int allow_write)
1347 {
1348         kib_pages_t   *p;
1349         int            i;
1350
1351         LIBCFS_ALLOC(p, offsetof(kib_pages_t, ibp_pages[npages]));
1352         if (p == NULL) {
1353                 CERROR ("Can't allocate buffer %d\n", npages);
1354                 return (-ENOMEM);
1355         }
1356
1357         memset (p, 0, offsetof(kib_pages_t, ibp_pages[npages]));
1358         p->ibp_npages = npages;
1359
1360         for (i = 0; i < npages; i++) {
1361                 p->ibp_pages[i] = alloc_page (GFP_KERNEL);
1362                 if (p->ibp_pages[i] == NULL) {
1363                         CERROR ("Can't allocate page %d of %d\n", i, npages);
1364                         kibnal_free_pages(p);
1365                         return (-ENOMEM);
1366                 }
1367         }
1368
1369         *pp = p;
1370         return (0);
1371 }
1372
int
kibnal_alloc_tx_descs (void)
{
        /* Allocate the global tx descriptor array and each
         * descriptor's work-request/scatter-gather/rdma buffers (or
         * its FMR page array when IBNAL_USE_FMR).  Returns 0 or
         * -ENOMEM; kibnal_free_tx_descs() (below) can safely release
         * a partially initialised array since unallocated members
         * stay NULL. */
        int    i;

        LIBCFS_ALLOC (kibnal_data.kib_tx_descs,
                      IBNAL_TX_MSGS() * sizeof(kib_tx_t));
        if (kibnal_data.kib_tx_descs == NULL)
                return -ENOMEM;

        /* zero-fill so free_tx_descs() can detect unallocated members */
        memset(kibnal_data.kib_tx_descs, 0,
               IBNAL_TX_MSGS() * sizeof(kib_tx_t));

        for (i = 0; i < IBNAL_TX_MSGS(); i++) {
                kib_tx_t *tx = &kibnal_data.kib_tx_descs[i];

#if IBNAL_USE_FMR
                /* FMR path: page array for on-the-fly registration */
                LIBCFS_ALLOC(tx->tx_pages, LNET_MAX_IOV *
                             sizeof(*tx->tx_pages));
                if (tx->tx_pages == NULL)
                        return -ENOMEM;
#else
                /* 1 work request for the message + 1 per RDMA fragment */
                LIBCFS_ALLOC(tx->tx_wrq,
                             (1 + IBNAL_MAX_RDMA_FRAGS) *
                             sizeof(*tx->tx_wrq));
                if (tx->tx_wrq == NULL)
                        return -ENOMEM;

                LIBCFS_ALLOC(tx->tx_gl,
                             (1 + IBNAL_MAX_RDMA_FRAGS) *
                             sizeof(*tx->tx_gl));
                if (tx->tx_gl == NULL)
                        return -ENOMEM;

                LIBCFS_ALLOC(tx->tx_rd,
                             offsetof(kib_rdma_desc_t,
                                      rd_frags[IBNAL_MAX_RDMA_FRAGS]));
                if (tx->tx_rd == NULL)
                        return -ENOMEM;
#endif
        }

        return 0;
}
1417
1418 void
1419 kibnal_free_tx_descs (void)
1420 {
1421         int    i;
1422
1423         if (kibnal_data.kib_tx_descs == NULL)
1424                 return;
1425
1426         for (i = 0; i < IBNAL_TX_MSGS(); i++) {
1427                 kib_tx_t *tx = &kibnal_data.kib_tx_descs[i];
1428
1429 #if IBNAL_USE_FMR
1430                 if (tx->tx_pages != NULL)
1431                         LIBCFS_FREE(tx->tx_pages, LNET_MAX_IOV *
1432                                     sizeof(*tx->tx_pages));
1433 #else
1434                 if (tx->tx_wrq != NULL)
1435                         LIBCFS_FREE(tx->tx_wrq,
1436                                     (1 + IBNAL_MAX_RDMA_FRAGS) *
1437                                     sizeof(*tx->tx_wrq));
1438
1439                 if (tx->tx_gl != NULL)
1440                         LIBCFS_FREE(tx->tx_gl,
1441                                     (1 + IBNAL_MAX_RDMA_FRAGS) *
1442                                     sizeof(*tx->tx_gl));
1443
1444                 if (tx->tx_rd != NULL)
1445                         LIBCFS_FREE(tx->tx_rd,
1446                                     offsetof(kib_rdma_desc_t,
1447                                              rd_frags[IBNAL_MAX_RDMA_FRAGS]));
1448 #endif
1449         }
1450
1451         LIBCFS_FREE(kibnal_data.kib_tx_descs,
1452                     IBNAL_TX_MSGS() * sizeof(kib_tx_t));
1453 }
1454
1455 #if IBNAL_USE_FMR
1456 void
1457 kibnal_free_fmrs (int n)
1458 {
1459         int             i;
1460         vv_return_t     vvrc;
1461         kib_tx_t       *tx;
1462
1463         for (i = 0; i < n; i++) {
1464                 tx = &kibnal_data.kib_tx_descs[i];
1465
1466                 vvrc = vv_free_fmr(kibnal_data.kib_hca,
1467                                    tx->tx_md.md_fmrhandle);
1468                 if (vvrc != vv_return_ok)
1469                         CWARN("vv_free_fmr[%d]: %d\n", i, vvrc);
1470         }
1471 }
1472 #endif
1473
int
kibnal_setup_tx_descs (void)
{
        /* Allocate the tx message pages, map each pre-allocated tx
         * descriptor onto its slice of a page, look up its local
         * memory key and queue it on the idle-tx list.  Returns 0 or
         * a negative errno. */
        int             ipage = 0;
        int             page_offset = 0;
        struct page    *page;
        kib_tx_t       *tx;
        vv_mem_reg_h_t  mem_h;
        vv_r_key_t      rkey;
        vv_return_t     vvrc;
        int             i;
        int             rc;
#if IBNAL_USE_FMR
        vv_fmr_t        fmr_props;
#endif

        /* pre-mapped messages are not bigger than 1 page */
        CLASSERT (IBNAL_MSG_SIZE <= PAGE_SIZE);

        /* No fancy arithmetic when we do the buffer calculations */
        CLASSERT (PAGE_SIZE % IBNAL_MSG_SIZE == 0);

        rc = kibnal_alloc_pages(&kibnal_data.kib_tx_pages,
                                IBNAL_TX_MSG_PAGES(), 0);
        if (rc != 0)
                return (rc);

        for (i = 0; i < IBNAL_TX_MSGS(); i++) {
                page = kibnal_data.kib_tx_pages->ibp_pages[ipage];
                tx = &kibnal_data.kib_tx_descs[i];

#if IBNAL_USE_FMR
                /* one FMR handle per descriptor, for mapping RDMA
                 * payload pages on the fly */
                memset(&fmr_props, 0, sizeof(fmr_props));
                fmr_props.pd_hndl              = kibnal_data.kib_pd;
                fmr_props.acl                  = (vv_acc_r_mem_write |
                                                  vv_acc_l_mem_write);
                fmr_props.max_pages            = LNET_MAX_IOV;
                fmr_props.log2_page_sz         = PAGE_SHIFT;
                fmr_props.max_outstanding_maps = *kibnal_tunables.kib_fmr_remaps;

                vvrc = vv_alloc_fmr(kibnal_data.kib_hca,
                                    &fmr_props,
                                    &tx->tx_md.md_fmrhandle);
                if (vvrc != vv_return_ok) {
                        CERROR("Can't allocate fmr %d: %d\n", i, vvrc);
                        /* release the fmrs and pages allocated so far */
                        kibnal_free_fmrs(i);
                        kibnal_free_pages (kibnal_data.kib_tx_pages);
                        return -ENOMEM;
                }

                tx->tx_md.md_fmrcount = *kibnal_tunables.kib_fmr_remaps;
                tx->tx_md.md_active   = 0;
#endif
                tx->tx_msg = (kib_msg_t *)(((char *)page_address(page)) +
                                           page_offset);

                vvrc = vv_get_gen_mr_attrib(kibnal_data.kib_hca,
                                            tx->tx_msg,
                                            IBNAL_MSG_SIZE,
                                            &mem_h,
                                            &tx->tx_lkey,
                                            &rkey);
                LASSERT (vvrc == vv_return_ok);

                CDEBUG(D_NET, "Tx[%d] %p->%p[%x]\n", i, tx,
                       tx->tx_msg, tx->tx_lkey);

                list_add (&tx->tx_list, &kibnal_data.kib_idle_txs);

                page_offset += IBNAL_MSG_SIZE;
                LASSERT (page_offset <= PAGE_SIZE);

                if (page_offset == PAGE_SIZE) {
                        page_offset = 0;
                        ipage++;
                        LASSERT (ipage <= IBNAL_TX_MSG_PAGES());
                }
        }

        return (0);
}
1555
/* Tear down NAL state in the reverse order of initialisation.
 * kibnal_data.kib_init records how far kibnal_startup() got; the switch
 * falls through from the current stage down to IBNAL_INIT_NOTHING so a
 * partially-initialised NAL (e.g. after a failed startup) is unwound
 * correctly.  Also called directly from kibnal_startup()'s failure path. */
1556 void
1557 kibnal_shutdown (lnet_ni_t *ni)
1558 {
1559         int           i;
1560         vv_return_t   vvrc;
1561
1562         LASSERT (ni == kibnal_data.kib_ni);
1563         LASSERT (ni->ni_data == &kibnal_data);
1564
1565         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
1566                atomic_read (&libcfs_kmemory));
1567
1568         switch (kibnal_data.kib_init) {
1569
1570         case IBNAL_INIT_ALL:
1571                 /* stop accepting connections and prevent new peers */
1572                 kibnal_stop_listener(ni);
1573
1574                 /* nuke all existing peers */
1575                 kibnal_del_peer(LNET_NID_ANY);
1576
1577                 /* Wait for all peer state to clean up */
1578                 i = 2;
1579                 while (atomic_read(&kibnal_data.kib_npeers) != 0) {
1580                         i++;
                        /* log at D_WARNING only when i is a power of 2,
                         * to throttle the message as the wait drags on */
1581                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* 2**n? */
1582                                "waiting for %d peers to disconnect\n",
1583                                atomic_read(&kibnal_data.kib_npeers));
1584                         cfs_pause(cfs_time_seconds(1));
1585                 }
1586                 /* fall through */
1587
1588         case IBNAL_INIT_CQ:
1589                 vvrc = vv_cq_destroy(kibnal_data.kib_hca, kibnal_data.kib_cq);
1590                 if (vvrc != vv_return_ok)
1591                         CERROR ("Destroy CQ error: %d\n", vvrc);
1592                 /* fall through */
1593
1594         case IBNAL_INIT_TXD:
1595                 kibnal_free_pages (kibnal_data.kib_tx_pages);
1596 #if IBNAL_USE_FMR
1597                 kibnal_free_fmrs(IBNAL_TX_MSGS());
1598 #endif
1599                 /* fall through */
1600
1601         case IBNAL_INIT_PD:
1602 #if 0
1603                 /* Only deallocate a PD if we actually allocated one */
1604                 vvrc = vv_pd_deallocate(kibnal_data.kib_hca,
1605                                         kibnal_data.kib_pd);
1606                 if (vvrc != vv_return_ok)
1607                         CERROR ("Destroy PD error: %d\n", vvrc);
1608 #endif
1609                 /* fall through */
1610
1611         case IBNAL_INIT_ASYNC:
                /* de-register the async event callback installed at startup */
1612                 vvrc = vv_dell_async_event_cb (kibnal_data.kib_hca,
1613                                               kibnal_async_callback);
1614                 if (vvrc != vv_return_ok)
1615                         CERROR("vv_dell_async_event_cb error: %d\n", vvrc);
1616
1617                 /* fall through */
1618
1619         case IBNAL_INIT_HCA:
1620                 vvrc = vv_hca_close(kibnal_data.kib_hca);
1621                 if (vvrc != vv_return_ok)
1622                         CERROR ("Close HCA  error: %d\n", vvrc);
1623                 /* fall through */
1624
1625         case IBNAL_INIT_DATA:
                /* by now all peers/conns must be gone and all work queues
                 * drained; assert that before killing the service threads */
1626                 LASSERT (atomic_read(&kibnal_data.kib_npeers) == 0);
1627                 LASSERT (kibnal_data.kib_peers != NULL);
1628                 for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
1629                         LASSERT (list_empty (&kibnal_data.kib_peers[i]));
1630                 }
1631                 LASSERT (atomic_read (&kibnal_data.kib_nconns) == 0);
1632                 LASSERT (list_empty (&kibnal_data.kib_connd_zombies));
1633                 LASSERT (list_empty (&kibnal_data.kib_connd_conns));
1634                 LASSERT (list_empty (&kibnal_data.kib_connd_pcreqs));
1635                 LASSERT (list_empty (&kibnal_data.kib_connd_peers));
1636
1637                 /* flag threads to terminate; wake and wait for them to die */
1638                 kibnal_data.kib_shutdown = 1;
1639                 wake_up_all (&kibnal_data.kib_sched_waitq);
1640                 wake_up_all (&kibnal_data.kib_connd_waitq);
1641
1642                 i = 2;
1643                 while (atomic_read (&kibnal_data.kib_nthreads) != 0) {
1644                         i++;
1645                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1646                                "Waiting for %d threads to terminate\n",
1647                                atomic_read (&kibnal_data.kib_nthreads));
1648                         cfs_pause(cfs_time_seconds(1));
1649                 }
1650                 /* fall through */
1651
1652         case IBNAL_INIT_NOTHING:
1653                 break;
1654         }
1655
1656         kibnal_free_tx_descs();
1657
1658         if (kibnal_data.kib_peers != NULL)
1659                 LIBCFS_FREE (kibnal_data.kib_peers,
1660                              sizeof (struct list_head) *
1661                              kibnal_data.kib_peer_hash_size);
1662
1663         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
1664                atomic_read (&libcfs_kmemory));
1665
1666         kibnal_data.kib_init = IBNAL_INIT_NOTHING;
1667         PORTAL_MODULE_UNUSE;
1668 }
1669
1670 int
1671 kibnal_startup (lnet_ni_t *ni)
1672 {
1673         char                      scratch[32];
1674         char                      ipif_name[32];
1675         char                     *hca_name;
1676         __u32                     ip;
1677         __u32                     netmask;
1678         int                       up;
1679         int                       nob;
1680         int                       devno;
1681         struct timeval            tv;
1682         int                       rc;
1683         int                       i;
1684         vv_request_event_record_t req_er;
1685         vv_return_t               vvrc;
1686
1687         LASSERT (ni->ni_lnd == &the_kiblnd);
1688
1689         /* Only 1 instance supported */
1690         if (kibnal_data.kib_init != IBNAL_INIT_NOTHING) {
1691                 CERROR ("Only 1 instance supported\n");
1692                 return -EPERM;
1693         }
1694
1695         if (*kibnal_tunables.kib_credits > *kibnal_tunables.kib_ntx) {
1696                 CERROR ("Can't set credits(%d) > ntx(%d)\n",
1697                         *kibnal_tunables.kib_credits,
1698                         *kibnal_tunables.kib_ntx);
1699                 return -EINVAL;
1700         }
1701
1702         ni->ni_maxtxcredits = *kibnal_tunables.kib_credits;
1703         ni->ni_peertxcredits = *kibnal_tunables.kib_peercredits;
1704
1705         CLASSERT (LNET_MAX_INTERFACES > 1);
1706
1707         if (ni->ni_interfaces[0] != NULL) {
1708                 /* Use the HCA specified in 'networks=' */
1709
1710                 if (ni->ni_interfaces[1] != NULL) {
1711                         CERROR("Multiple interfaces not supported\n");
1712                         return -EPERM;
1713                 }
1714
1715                 /* Parse <hca base name><number> */
1716                 hca_name = ni->ni_interfaces[0];
1717                 nob = strlen(*kibnal_tunables.kib_hca_basename);
1718
1719                 if (strncmp(hca_name, *kibnal_tunables.kib_hca_basename, nob) ||
1720                     sscanf(hca_name + nob, "%d%n", &devno, &nob) < 1) {
1721                         CERROR("Unrecognised HCA %s\n", hca_name);
1722                         return -EINVAL;
1723                 }
1724
1725         } else {
1726                 /* Use <hca base name>0 */
1727                 devno = 0;
1728
1729                 hca_name = scratch;
1730                 snprintf(hca_name, sizeof(scratch), "%s%d",
1731                          *kibnal_tunables.kib_hca_basename, devno);
1732                 if (strlen(hca_name) == sizeof(scratch) - 1) {
1733                         CERROR("HCA name %s truncated\n", hca_name);
1734                         return -EINVAL;
1735                 }
1736         }
1737
1738         /* Find IP address from <ipif base name><hca number> */
1739         snprintf(ipif_name, sizeof(ipif_name), "%s%d",
1740                  *kibnal_tunables.kib_ipif_basename, devno);
1741         if (strlen(ipif_name) == sizeof(ipif_name) - 1) {
1742                 CERROR("IPoIB interface name %s truncated\n", ipif_name);
1743                 return -EINVAL;
1744         }
1745
1746         rc = libcfs_ipif_query(ipif_name, &up, &ip, &netmask);
1747         if (rc != 0) {
1748                 CERROR("Can't query IPoIB interface %s: %d\n", ipif_name, rc);
1749                 return -ENETDOWN;
1750         }
1751
1752         if (!up) {
1753                 CERROR("Can't query IPoIB interface %s: it's down\n", ipif_name);
1754                 return -ENETDOWN;
1755         }
1756
1757         ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ip);
1758
1759         PORTAL_MODULE_USE;
1760         memset (&kibnal_data, 0, sizeof (kibnal_data)); /* zero pointers, flags etc */
1761
1762         kibnal_data.kib_ni = ni;
1763         ni->ni_data = &kibnal_data;
1764
1765         do_gettimeofday(&tv);
1766         kibnal_data.kib_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
1767
1768         rwlock_init(&kibnal_data.kib_global_lock);
1769
1770         kibnal_data.kib_peer_hash_size = IBNAL_PEER_HASH_SIZE;
1771         LIBCFS_ALLOC (kibnal_data.kib_peers,
1772                       sizeof (struct list_head) * kibnal_data.kib_peer_hash_size);
1773         if (kibnal_data.kib_peers == NULL) {
1774                 goto failed;
1775         }
1776         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++)
1777                 INIT_LIST_HEAD(&kibnal_data.kib_peers[i]);
1778
1779         spin_lock_init (&kibnal_data.kib_connd_lock);
1780         INIT_LIST_HEAD (&kibnal_data.kib_connd_peers);
1781         INIT_LIST_HEAD (&kibnal_data.kib_connd_pcreqs);
1782         INIT_LIST_HEAD (&kibnal_data.kib_connd_conns);
1783         INIT_LIST_HEAD (&kibnal_data.kib_connd_zombies);
1784         init_waitqueue_head (&kibnal_data.kib_connd_waitq);
1785
1786         spin_lock_init (&kibnal_data.kib_sched_lock);
1787         init_waitqueue_head (&kibnal_data.kib_sched_waitq);
1788
1789         spin_lock_init (&kibnal_data.kib_tx_lock);
1790         INIT_LIST_HEAD (&kibnal_data.kib_idle_txs);
1791
1792         rc = kibnal_alloc_tx_descs();
1793         if (rc != 0) {
1794                 CERROR("Can't allocate tx descs\n");
1795                 goto failed;
1796         }
1797
1798         /* lists/ptrs/locks initialised */
1799         kibnal_data.kib_init = IBNAL_INIT_DATA;
1800         /*****************************************************/
1801
1802         for (i = 0; i < IBNAL_N_SCHED; i++) {
1803                 rc = kibnal_thread_start (kibnal_scheduler, (void *)((long)i));
1804                 if (rc != 0) {
1805                         CERROR("Can't spawn vibnal scheduler[%d]: %d\n",
1806                                i, rc);
1807                         goto failed;
1808                 }
1809         }
1810
1811         rc = kibnal_thread_start (kibnal_connd, NULL);
1812         if (rc != 0) {
1813                 CERROR ("Can't spawn vibnal connd: %d\n", rc);
1814                 goto failed;
1815         }
1816
1817         vvrc = vv_hca_open(hca_name, NULL, &kibnal_data.kib_hca);
1818         if (vvrc != vv_return_ok) {
1819                 CERROR ("Can't open HCA %s: %d\n", hca_name, vvrc);
1820                 goto failed;
1821         }
1822
1823         /* Channel Adapter opened */
1824         kibnal_data.kib_init = IBNAL_INIT_HCA;
1825
1826         /* register to get HCA's asynchronous events. */
1827         req_er.req_event_type = VV_ASYNC_EVENT_ALL_MASK;
1828         vvrc = vv_set_async_event_cb (kibnal_data.kib_hca, req_er,
1829                                      kibnal_async_callback);
1830         if (vvrc != vv_return_ok) {
1831                 CERROR ("Can't set HCA %s callback: %d\n", hca_name, vvrc);
1832                 goto failed; 
1833         }
1834
1835         kibnal_data.kib_init = IBNAL_INIT_ASYNC;
1836
1837         /*****************************************************/
1838
1839         vvrc = vv_hca_query(kibnal_data.kib_hca, &kibnal_data.kib_hca_attrs);
1840         if (vvrc != vv_return_ok) {
1841                 CERROR ("Can't size port attrs for %s: %d\n", hca_name, vvrc);
1842                 goto failed;
1843         }
1844
1845         kibnal_data.kib_port = -1;
1846
1847         for (i = 0; i<kibnal_data.kib_hca_attrs.port_num; i++) {
1848
1849                 int port_num = i+1;
1850                 u_int32_t tbl_count;
1851                 vv_port_attrib_t *pattr = &kibnal_data.kib_port_attr;
1852
1853                 vvrc = vv_port_query(kibnal_data.kib_hca, port_num, pattr);
1854                 if (vvrc != vv_return_ok) {
1855                         CERROR("vv_port_query failed for %s port %d: %d\n",
1856                                hca_name, port_num, vvrc);
1857                         continue;
1858                 }
1859
1860                 switch (pattr->port_state) {
1861                 case vv_state_linkDoun:
1862                         CDEBUG(D_NET, "port[%d] Down\n", port_num);
1863                         continue;
1864                 case vv_state_linkInit:
1865                         CDEBUG(D_NET, "port[%d] Init\n", port_num);
1866                         continue;
1867                 case vv_state_linkArm:
1868                         CDEBUG(D_NET, "port[%d] Armed\n", port_num);
1869                         continue;
1870                 case vv_state_linkActive:
1871                         CDEBUG(D_NET, "port[%d] Active\n", port_num);
1872
1873                         /* Found a suitable port. Get its GUID and PKEY. */
1874                         tbl_count = 1;
1875                         vvrc = vv_get_port_gid_tbl(kibnal_data.kib_hca,
1876                                                    port_num, &tbl_count,
1877                                                    &kibnal_data.kib_port_gid);
1878                         if (vvrc != vv_return_ok) {
1879                                 CERROR("vv_get_port_gid_tbl failed "
1880                                        "for %s port %d: %d\n",
1881                                        hca_name, port_num, vvrc);
1882                                 continue;
1883                         }
1884
1885                         tbl_count = 1;
1886                         vvrc = vv_get_port_partition_tbl(kibnal_data.kib_hca,
1887                                                          port_num, &tbl_count,
1888                                                          &kibnal_data.kib_port_pkey);
1889                         if (vvrc != vv_return_ok) {
1890                                 CERROR("vv_get_port_partition_tbl failed "
1891                                        "for %s port %d: %d\n",
1892                                        hca_name, port_num, vvrc);
1893                                 continue;
1894                         }
1895
1896                         kibnal_data.kib_port = port_num;
1897
1898                         break;
1899                 case vv_state_linkActDefer: /* TODO: correct? */
1900                 case vv_state_linkNoChange:
1901                         CERROR("Unexpected %s port[%d] state %d\n",
1902                                hca_name, i, pattr->port_state);
1903                         continue;
1904                 }
1905                 break;
1906         }
1907
1908         if (kibnal_data.kib_port == -1) {
1909                 CERROR ("Can't find an active port on %s\n", hca_name);
1910                 goto failed;
1911         }
1912
1913         CDEBUG(D_NET, "Using %s port %d - GID="LPX64":"LPX64"\n",
1914                hca_name, kibnal_data.kib_port,
1915                kibnal_data.kib_port_gid.scope.g.subnet,
1916                kibnal_data.kib_port_gid.scope.g.eui64);
1917
1918         /*****************************************************/
1919
1920 #if 1
1921         /* We use a pre-allocated PD */
1922         vvrc = vv_get_gen_pd_h(kibnal_data.kib_hca, &kibnal_data.kib_pd);
1923 #else
1924         vvrc = vv_pd_allocate(kibnal_data.kib_hca, &kibnal_data.kib_pd);
1925 #endif
1926         if (vvrc != vv_return_ok) {
1927                 CERROR ("Can't init PD: %d\n", vvrc);
1928                 goto failed;
1929         }
1930
1931         /* flag PD initialised */
1932         kibnal_data.kib_init = IBNAL_INIT_PD;
1933         /*****************************************************/
1934
1935         rc = kibnal_setup_tx_descs();
1936         if (rc != 0) {
1937                 CERROR ("Can't register tx descs: %d\n", rc);
1938                 goto failed;
1939         }
1940
1941         /* flag TX descs initialised */
1942         kibnal_data.kib_init = IBNAL_INIT_TXD;
1943         /*****************************************************/
1944
1945         {
1946                 __u32 nentries;
1947
1948                 vvrc = vv_cq_create(kibnal_data.kib_hca, IBNAL_CQ_ENTRIES(),
1949                                     kibnal_cq_callback,
1950                                     NULL, /* context */
1951                                     &kibnal_data.kib_cq, &nentries);
1952                 if (vvrc != 0) {
1953                         CERROR ("Can't create RX CQ: %d\n", vvrc);
1954                         goto failed;
1955                 }
1956
1957                 /* flag CQ initialised */
1958                 kibnal_data.kib_init = IBNAL_INIT_CQ;
1959
1960                 if (nentries < IBNAL_CQ_ENTRIES()) {
1961                         CERROR ("CQ only has %d entries, need %d\n",
1962                                 nentries, IBNAL_CQ_ENTRIES());
1963                         goto failed;
1964                 }
1965
1966                 vvrc = vv_request_completion_notification(kibnal_data.kib_hca,
1967                                                           kibnal_data.kib_cq,
1968                                                           vv_next_solicit_unsolicit_event);
1969                 if (vvrc != 0) {
1970                         CERROR ("Failed to re-arm completion queue: %d\n", rc);
1971                         goto failed;
1972                 }
1973         }
1974
1975         rc = kibnal_start_listener(ni);
1976         if (rc != 0) {
1977                 CERROR("Can't start listener: %d\n", rc);
1978                 goto failed;
1979         }
1980
1981         /* flag everything initialised */
1982         kibnal_data.kib_init = IBNAL_INIT_ALL;
1983         /*****************************************************/
1984
1985         return (0);
1986
1987  failed:
1988         CDEBUG(D_NET, "kibnal_startup failed\n");
1989         kibnal_shutdown (ni);
1990         return (-ENETDOWN);
1991 }
1992
/* Module unload hook: de-register the LND from LNet (so no new NIs can be
 * started on it), then free the tunables/sysctl state set up by
 * kibnal_tunables_init().  Order matters: unregister before teardown. */
1993 void __exit
1994 kibnal_module_fini (void)
1995 {
1996         lnet_unregister_lnd(&the_kiblnd);
1997         kibnal_tunables_fini();
1998 }
1999
/* Module load hook: verify wire-protocol invariants at compile time, set up
 * tunables, then register this LND with LNet.  Returns 0 on success or the
 * error from kibnal_tunables_init(). */
2000 int __init
2001 kibnal_module_init (void)
2002 {
2003         int    rc;
2004
2005         vibnal_assert_wire_constants();
2006
         /* compile-time checks: connection parameters must fit in the CM
          * REQ/REP private data, and a message must fit in IBNAL_MSG_SIZE */
2007         CLASSERT (offsetof(kib_msg_t, ibm_u) + sizeof(kib_connparams_t)
2008                   <= cm_REQ_priv_data_len);
2009         CLASSERT (offsetof(kib_msg_t, ibm_u) + sizeof(kib_connparams_t)
2010                   <= cm_REP_priv_data_len);
2011         CLASSERT (sizeof(kib_msg_t) <= IBNAL_MSG_SIZE);
2012 #if !IBNAL_USE_FMR
         /* without FMR, RDMA descriptors carry up to IBNAL_MAX_RDMA_FRAGS
          * inline fragments; they too must fit in a message */
2013         CLASSERT (offsetof(kib_msg_t, ibm_u.get.ibgm_rd.rd_frags[IBNAL_MAX_RDMA_FRAGS])
2014                   <= IBNAL_MSG_SIZE);
2015         CLASSERT (offsetof(kib_msg_t, ibm_u.putack.ibpam_rd.rd_frags[IBNAL_MAX_RDMA_FRAGS])
2016                   <= IBNAL_MSG_SIZE);
2017 #endif
2018         rc = kibnal_tunables_init();
2019         if (rc != 0)
2020                 return rc;
2021
2022         lnet_register_lnd(&the_kiblnd);
2023
2024         return 0;
2025 }
2026
/* Standard kernel module metadata and entry/exit point registration. */
2027 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2028 MODULE_DESCRIPTION("Kernel Voltaire IB LND v1.00");
2029 MODULE_LICENSE("GPL");
2030
2031 module_init(kibnal_module_init);
2032 module_exit(kibnal_module_fini);