Whamcloud - gitweb
63ec5b6216577e322259f0cb07c0abc1d4789f4a
[fs/lustre-release.git] / lnet / klnds / viblnd / viblnd.c
1 /*
2  * -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
3  * vim:expandtab:shiftwidth=8:tabstop=8:
4  *
5  * GPL HEADER START
6  *
7  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8  *
9  * This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License version 2 only,
11  * as published by the Free Software Foundation.
12  *
13  * This program is distributed in the hope that it will be useful, but
14  * WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * General Public License version 2 for more details (a copy is included
17  * in the LICENSE file that accompanied this code).
18  *
19  * You should have received a copy of the GNU General Public License
20  * version 2 along with this program; If not, see
21  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf.
22  *
23  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
24  * CA 95054 USA or visit www.sun.com if you need additional information or
25  * have any questions.
26  *
27  * GPL HEADER END
28  */
29 /*
30  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
31  * Use is subject to license terms.
32  */
33 /*
34  * This file is part of Lustre, http://www.lustre.org/
35  * Lustre is a trademark of Sun Microsystems, Inc.
36  *
37  * lnet/klnds/viblnd/viblnd.c
38  *
39  * Author: Eric Barton <eric@bartonsoftware.com>
40  * Author: Frank Zago <fzago@systemfabricworks.com>
41  */
42
43 #include "viblnd.h"
44
/* LND operations table for the Voltaire IB LND.  LNET dispatches through
 * these entry points for network startup/shutdown, ioctls, and message
 * send/receive (including eager receive of buffered messages). */
lnd_t the_kiblnd = {
        .lnd_type       = VIBLND,
        .lnd_startup    = kibnal_startup,
        .lnd_shutdown   = kibnal_shutdown,
        .lnd_ctl        = kibnal_ctl,
        .lnd_send       = kibnal_send,
        .lnd_recv       = kibnal_recv,
        .lnd_eager_recv = kibnal_eager_recv,
};
54
55 kib_data_t              kibnal_data;
56
/* Compile-time verification (via CLASSERT) that the wire-protocol message
 * type constants and the size/offset of every field in every on-the-wire
 * structure match the values this protocol version was released with.
 * Any incompatible change to the structs in viblnd.h fails the build here
 * rather than silently corrupting the wire protocol. */
void vibnal_assert_wire_constants (void)
{
        /* Wire protocol assertions generated by 'wirecheck'
         * running on Linux robert 2.6.11-1.27_FC3 #1 Tue May 17 20:27:37 EDT 2005 i686 athlon i386 G
         * with gcc version 3.4.3 20050227 (Red Hat 3.4.3-22.fc3) */


        /* Constants... */
        CLASSERT (IBNAL_MSG_MAGIC == 0x0be91b91);
        CLASSERT (IBNAL_MSG_VERSION == 0x11);
        CLASSERT (IBNAL_MSG_CONNREQ == 0xc0);
        CLASSERT (IBNAL_MSG_CONNACK == 0xc1);
        CLASSERT (IBNAL_MSG_NOOP == 0xd0);
        CLASSERT (IBNAL_MSG_IMMEDIATE == 0xd1);
        CLASSERT (IBNAL_MSG_PUT_REQ == 0xd2);
        CLASSERT (IBNAL_MSG_PUT_NAK == 0xd3);
        CLASSERT (IBNAL_MSG_PUT_ACK == 0xd4);
        CLASSERT (IBNAL_MSG_PUT_DONE == 0xd5);
        CLASSERT (IBNAL_MSG_GET_REQ == 0xd6);
        CLASSERT (IBNAL_MSG_GET_DONE == 0xd7);

        /* Checks for struct kib_connparams_t */
        CLASSERT ((int)sizeof(kib_connparams_t) == 12);
        CLASSERT ((int)offsetof(kib_connparams_t, ibcp_queue_depth) == 0);
        CLASSERT ((int)sizeof(((kib_connparams_t *)0)->ibcp_queue_depth) == 4);
        CLASSERT ((int)offsetof(kib_connparams_t, ibcp_max_msg_size) == 4);
        CLASSERT ((int)sizeof(((kib_connparams_t *)0)->ibcp_max_msg_size) == 4);
        CLASSERT ((int)offsetof(kib_connparams_t, ibcp_max_frags) == 8);
        CLASSERT ((int)sizeof(((kib_connparams_t *)0)->ibcp_max_frags) == 4);

        /* Checks for struct kib_immediate_msg_t */
        CLASSERT ((int)sizeof(kib_immediate_msg_t) == 72);
        CLASSERT ((int)offsetof(kib_immediate_msg_t, ibim_hdr) == 0);
        CLASSERT ((int)sizeof(((kib_immediate_msg_t *)0)->ibim_hdr) == 72);
        CLASSERT ((int)offsetof(kib_immediate_msg_t, ibim_payload[13]) == 85);
        CLASSERT ((int)sizeof(((kib_immediate_msg_t *)0)->ibim_payload[13]) == 1);
        CLASSERT (IBNAL_USE_FMR == 1);

        /* Checks for struct kib_rdma_desc_t */
        CLASSERT ((int)sizeof(kib_rdma_desc_t) == 16);
        CLASSERT ((int)offsetof(kib_rdma_desc_t, rd_addr) == 0);
        CLASSERT ((int)sizeof(((kib_rdma_desc_t *)0)->rd_addr) == 8);
        CLASSERT ((int)offsetof(kib_rdma_desc_t, rd_nob) == 8);
        CLASSERT ((int)sizeof(((kib_rdma_desc_t *)0)->rd_nob) == 4);
        CLASSERT ((int)offsetof(kib_rdma_desc_t, rd_key) == 12);
        CLASSERT ((int)sizeof(((kib_rdma_desc_t *)0)->rd_key) == 4);

        /* Checks for struct kib_putreq_msg_t */
        CLASSERT ((int)sizeof(kib_putreq_msg_t) == 80);
        CLASSERT ((int)offsetof(kib_putreq_msg_t, ibprm_hdr) == 0);
        CLASSERT ((int)sizeof(((kib_putreq_msg_t *)0)->ibprm_hdr) == 72);
        CLASSERT ((int)offsetof(kib_putreq_msg_t, ibprm_cookie) == 72);
        CLASSERT ((int)sizeof(((kib_putreq_msg_t *)0)->ibprm_cookie) == 8);

        /* Checks for struct kib_putack_msg_t */
        CLASSERT ((int)sizeof(kib_putack_msg_t) == 32);
        CLASSERT ((int)offsetof(kib_putack_msg_t, ibpam_src_cookie) == 0);
        CLASSERT ((int)sizeof(((kib_putack_msg_t *)0)->ibpam_src_cookie) == 8);
        CLASSERT ((int)offsetof(kib_putack_msg_t, ibpam_dst_cookie) == 8);
        CLASSERT ((int)sizeof(((kib_putack_msg_t *)0)->ibpam_dst_cookie) == 8);
        CLASSERT ((int)offsetof(kib_putack_msg_t, ibpam_rd) == 16);
        CLASSERT ((int)sizeof(((kib_putack_msg_t *)0)->ibpam_rd) == 16);

        /* Checks for struct kib_get_msg_t */
        CLASSERT ((int)sizeof(kib_get_msg_t) == 96);
        CLASSERT ((int)offsetof(kib_get_msg_t, ibgm_hdr) == 0);
        CLASSERT ((int)sizeof(((kib_get_msg_t *)0)->ibgm_hdr) == 72);
        CLASSERT ((int)offsetof(kib_get_msg_t, ibgm_cookie) == 72);
        CLASSERT ((int)sizeof(((kib_get_msg_t *)0)->ibgm_cookie) == 8);
        CLASSERT ((int)offsetof(kib_get_msg_t, ibgm_rd) == 80);
        CLASSERT ((int)sizeof(((kib_get_msg_t *)0)->ibgm_rd) == 16);

        /* Checks for struct kib_completion_msg_t */
        CLASSERT ((int)sizeof(kib_completion_msg_t) == 12);
        CLASSERT ((int)offsetof(kib_completion_msg_t, ibcm_cookie) == 0);
        CLASSERT ((int)sizeof(((kib_completion_msg_t *)0)->ibcm_cookie) == 8);
        CLASSERT ((int)offsetof(kib_completion_msg_t, ibcm_status) == 8);
        CLASSERT ((int)sizeof(((kib_completion_msg_t *)0)->ibcm_status) == 4);

        /* Checks for struct kib_msg_t (header fields + per-type union) */
        CLASSERT ((int)sizeof(kib_msg_t) == 152);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_magic) == 0);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_magic) == 4);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_version) == 4);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_version) == 2);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_type) == 6);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_type) == 1);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_credits) == 7);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_credits) == 1);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_nob) == 8);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_nob) == 4);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_cksum) == 12);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_cksum) == 4);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_srcnid) == 16);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_srcnid) == 8);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_srcstamp) == 24);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_srcstamp) == 8);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_dstnid) == 32);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_dstnid) == 8);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_dststamp) == 40);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_dststamp) == 8);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_seq) == 48);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_seq) == 8);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_u.connparams) == 56);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_u.connparams) == 12);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_u.immediate) == 56);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_u.immediate) == 72);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_u.putreq) == 56);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_u.putreq) == 80);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_u.putack) == 56);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_u.putack) == 32);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_u.get) == 56);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_u.get) == 96);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_u.completion) == 56);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_u.completion) == 12);
}
173
174 __u32 
175 kibnal_cksum (void *ptr, int nob)
176 {
177         char  *c  = ptr;
178         __u32  sum = 0;
179
180         while (nob-- > 0)
181                 sum = ((sum << 1) | (sum >> 31)) + *c++;
182
183         /* ensure I don't return 0 (== no checksum) */
184         return (sum == 0) ? 1 : sum;
185 }
186
187 void
188 kibnal_init_msg(kib_msg_t *msg, int type, int body_nob)
189 {
190         msg->ibm_type = type;
191         msg->ibm_nob  = offsetof(kib_msg_t, ibm_u) + body_nob;
192 }
193
194 void
195 kibnal_pack_msg(kib_msg_t *msg, __u32 version, int credits,
196                 lnet_nid_t dstnid, __u64 dststamp, __u64 seq)
197 {
198         /* CAVEAT EMPTOR! all message fields not set here should have been
199          * initialised previously. */
200         msg->ibm_magic    = IBNAL_MSG_MAGIC;
201         msg->ibm_version  = version;
202         /*   ibm_type */
203         msg->ibm_credits  = credits;
204         /*   ibm_nob */
205         msg->ibm_cksum    = 0;
206         msg->ibm_srcnid   = lnet_ptlcompat_srcnid(kibnal_data.kib_ni->ni_nid,
207                                                   dstnid);
208         msg->ibm_srcstamp = kibnal_data.kib_incarnation;
209         msg->ibm_dstnid   = dstnid;
210         msg->ibm_dststamp = dststamp;
211         msg->ibm_seq      = seq;
212
213         if (*kibnal_tunables.kib_cksum) {
214                 /* NB ibm_cksum zero while computing cksum */
215                 msg->ibm_cksum = kibnal_cksum(msg, msg->ibm_nob);
216         }
217 }
218
/* Validate and byte-swap a received message in place.
 *
 * 'nob' is the number of bytes actually received; 'expected_version' is
 * the version negotiated on this connection, or 0 during connection
 * establishment when either supported version is acceptable.
 *
 * Returns 0 on success or -EPROTO on any protocol violation.  On
 * success all header fields (and the per-type union for RDMA
 * descriptors) are in host byte order, except ibm_magic which is left
 * unflipped as a clue to the peer's endianness. */
int
kibnal_unpack_msg(kib_msg_t *msg, __u32 expected_version, int nob)
{
        const int hdr_size = offsetof(kib_msg_t, ibm_u);
        __u32     msg_cksum;
        __u32     msg_version;
        int       flip;
        int       msg_nob;
#if !IBNAL_USE_FMR
        int       i;
        int       n;
#endif
        /* 6 bytes are enough to have received magic + version */
        if (nob < 6) {
                CERROR("Short message: %d\n", nob);
                return -EPROTO;
        }

        /* Future protocol version compatibility support!
         * If the viblnd-specific protocol changes, or when LNET unifies
         * protocols over all LNDs, the initial connection will negotiate
         * a protocol version.  When I see a recognised magic with an
         * unexpected version I fail without console noise (silent -EPROTO
         * below); if I'm the one rejecting during connection
         * establishment, the reject tells the peer which version I'm
         * running. */

        if (msg->ibm_magic == IBNAL_MSG_MAGIC) {
                flip = 0;               /* same byte order as me */
        } else if (msg->ibm_magic == __swab32(IBNAL_MSG_MAGIC)) {
                flip = 1;               /* opposite byte order */
        } else {
                /* recognised LNET magic but not mine: fail quietly */
                if (msg->ibm_magic == LNET_PROTO_MAGIC ||
                    msg->ibm_magic == __swab32(LNET_PROTO_MAGIC))
                        return -EPROTO;

                /* Completely out to lunch */
                CERROR("Bad magic: %08x\n", msg->ibm_magic);
                return -EPROTO;
        }

        msg_version = flip ? __swab16(msg->ibm_version) : msg->ibm_version;
        if (expected_version == 0) {
                /* connection establishment: either supported version is OK;
                 * deliberately no CERROR here (see comment above) */
                if (msg_version != IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD &&
                    msg_version != IBNAL_MSG_VERSION)
                        return -EPROTO;
        } else if (msg_version != expected_version) {
                CERROR("Bad version: %x(%x expected)\n",
                       msg_version, expected_version);
                return -EPROTO;
        }

        if (nob < hdr_size) {
                CERROR("Short message: %d\n", nob);
                return -EPROTO;
        }

        msg_nob = flip ? __swab32(msg->ibm_nob) : msg->ibm_nob;
        if (msg_nob > nob) {
                CERROR("Short message: got %d, wanted %d\n", nob, msg_nob);
                return -EPROTO;
        }

        /* checksum must be computed with ibm_cksum zero and BEFORE anything
         * gets flipped */
        msg_cksum = flip ? __swab32(msg->ibm_cksum) : msg->ibm_cksum;
        msg->ibm_cksum = 0;
        if (msg_cksum != 0 &&
            msg_cksum != kibnal_cksum(msg, msg_nob)) {
                CERROR("Bad checksum\n");
                return -EPROTO;
        }
        msg->ibm_cksum = msg_cksum;

        if (flip) {
                /* leave magic unflipped as a clue to peer endianness */
                msg->ibm_version = msg_version;
                /* single-byte fields need no swabbing */
                CLASSERT (sizeof(msg->ibm_type) == 1);
                CLASSERT (sizeof(msg->ibm_credits) == 1);
                msg->ibm_nob = msg_nob;
                __swab64s(&msg->ibm_srcnid);
                __swab64s(&msg->ibm_srcstamp);
                __swab64s(&msg->ibm_dstnid);
                __swab64s(&msg->ibm_dststamp);
                __swab64s(&msg->ibm_seq);
        }

        if (msg->ibm_srcnid == LNET_NID_ANY) {
                CERROR("Bad src nid: %s\n", libcfs_nid2str(msg->ibm_srcnid));
                return -EPROTO;
        }

        /* per-type payload validation (and byte-swap of RDMA descriptors) */
        switch (msg->ibm_type) {
        default:
                CERROR("Unknown message type %x\n", msg->ibm_type);
                return -EPROTO;

        case IBNAL_MSG_NOOP:
                break;

        case IBNAL_MSG_IMMEDIATE:
                if (msg_nob < offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[0])) {
                        CERROR("Short IMMEDIATE: %d(%d)\n", msg_nob,
                               (int)offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[0]));
                        return -EPROTO;
                }
                break;

        case IBNAL_MSG_PUT_REQ:
                if (msg_nob < hdr_size + sizeof(msg->ibm_u.putreq)) {
                        CERROR("Short PUT_REQ: %d(%d)\n", msg_nob,
                               (int)(hdr_size + sizeof(msg->ibm_u.putreq)));
                        return -EPROTO;
                }
                break;

        case IBNAL_MSG_PUT_ACK:
                if (msg_nob < hdr_size + sizeof(msg->ibm_u.putack)) {
                        CERROR("Short PUT_ACK: %d(%d)\n", msg_nob,
                               (int)(hdr_size + sizeof(msg->ibm_u.putack)));
                        return -EPROTO;
                }
#if IBNAL_USE_FMR
                if (flip) {
                        __swab64s(&msg->ibm_u.putack.ibpam_rd.rd_addr);
                        __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_nob);
                        __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_key);
                }
#else
                if (flip) {
                        __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_key);
                        __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_nfrag);
                }

                /* validate the fragment count before trusting rd_frags[] */
                n = msg->ibm_u.putack.ibpam_rd.rd_nfrag;
                if (n <= 0 || n > IBNAL_MAX_RDMA_FRAGS) {
                        CERROR("Bad PUT_ACK nfrags: %d, should be 0 < n <= %d\n",
                               n, IBNAL_MAX_RDMA_FRAGS);
                        return -EPROTO;
                }

                if (msg_nob < offsetof(kib_msg_t, ibm_u.putack.ibpam_rd.rd_frags[n])) {
                        CERROR("Short PUT_ACK: %d(%d)\n", msg_nob,
                               (int)offsetof(kib_msg_t, ibm_u.putack.ibpam_rd.rd_frags[n]));
                        return -EPROTO;
                }

                if (flip) {
                        for (i = 0; i < n; i++) {
                                __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_frags[i].rf_nob);
                                __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_frags[i].rf_addr_lo);
                                __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_frags[i].rf_addr_hi);
                        }
                }
#endif
                break;

        case IBNAL_MSG_GET_REQ:
                if (msg_nob < hdr_size + sizeof(msg->ibm_u.get)) {
                        CERROR("Short GET_REQ: %d(%d)\n", msg_nob,
                               (int)(hdr_size + sizeof(msg->ibm_u.get)));
                        return -EPROTO;
                }
#if IBNAL_USE_FMR
                if (flip) {
                        __swab64s(&msg->ibm_u.get.ibgm_rd.rd_addr);
                        __swab32s(&msg->ibm_u.get.ibgm_rd.rd_nob);
                        __swab32s(&msg->ibm_u.get.ibgm_rd.rd_key);
                }
#else
                if (flip) {
                        __swab32s(&msg->ibm_u.get.ibgm_rd.rd_key);
                        __swab32s(&msg->ibm_u.get.ibgm_rd.rd_nfrag);
                }

                /* validate the fragment count before trusting rd_frags[] */
                n = msg->ibm_u.get.ibgm_rd.rd_nfrag;
                if (n <= 0 || n > IBNAL_MAX_RDMA_FRAGS) {
                        CERROR("Bad GET_REQ nfrags: %d, should be 0 < n <= %d\n",
                               n, IBNAL_MAX_RDMA_FRAGS);
                        return -EPROTO;
                }

                if (msg_nob < offsetof(kib_msg_t, ibm_u.get.ibgm_rd.rd_frags[n])) {
                        CERROR("Short GET_REQ: %d(%d)\n", msg_nob,
                               (int)offsetof(kib_msg_t, ibm_u.get.ibgm_rd.rd_frags[n]));
                        return -EPROTO;
                }

                if (flip)
                        for (i = 0; i < msg->ibm_u.get.ibgm_rd.rd_nfrag; i++) {
                                __swab32s(&msg->ibm_u.get.ibgm_rd.rd_frags[i].rf_nob);
                                __swab32s(&msg->ibm_u.get.ibgm_rd.rd_frags[i].rf_addr_lo);
                                __swab32s(&msg->ibm_u.get.ibgm_rd.rd_frags[i].rf_addr_hi);
                        }
#endif
                break;

        case IBNAL_MSG_PUT_NAK:
        case IBNAL_MSG_PUT_DONE:
        case IBNAL_MSG_GET_DONE:
                if (msg_nob < hdr_size + sizeof(msg->ibm_u.completion)) {
                        CERROR("Short RDMA completion: %d(%d)\n", msg_nob,
                               (int)(hdr_size + sizeof(msg->ibm_u.completion)));
                        return -EPROTO;
                }
                if (flip)
                        __swab32s(&msg->ibm_u.completion.ibcm_status);
                break;

        case IBNAL_MSG_CONNREQ:
        case IBNAL_MSG_CONNACK:
                if (msg_nob < hdr_size + sizeof(msg->ibm_u.connparams)) {
                        CERROR("Short connreq/ack: %d(%d)\n", msg_nob,
                               (int)(hdr_size + sizeof(msg->ibm_u.connparams)));
                        return -EPROTO;
                }
                if (flip) {
                        __swab32s(&msg->ibm_u.connparams.ibcp_queue_depth);
                        __swab32s(&msg->ibm_u.connparams.ibcp_max_msg_size);
                        __swab32s(&msg->ibm_u.connparams.ibcp_max_frags);
                }
                break;
        }
        return 0;
}
442
443 int
444 kibnal_start_listener (lnet_ni_t *ni)
445 {
446         static cm_listen_data_t info;
447
448         cm_return_t      cmrc;
449
450         LASSERT (kibnal_data.kib_listen_handle == NULL);
451
452         kibnal_data.kib_listen_handle =
453                 cm_create_cep(cm_cep_transp_rc);
454         if (kibnal_data.kib_listen_handle == NULL) {
455                 CERROR ("Can't create listen CEP\n");
456                 return -ENOMEM;
457         }
458
459         CDEBUG(D_NET, "Created CEP %p for listening\n",
460                kibnal_data.kib_listen_handle);
461
462         memset(&info, 0, sizeof(info));
463         info.listen_addr.end_pt.sid =
464                 (__u64)(*kibnal_tunables.kib_service_number);
465
466         cmrc = cm_listen(kibnal_data.kib_listen_handle, &info,
467                          kibnal_listen_callback, NULL);
468         if (cmrc == cm_stat_success)
469                 return 0;
470
471         CERROR ("cm_listen error: %d\n", cmrc);
472
473         cmrc = cm_destroy_cep(kibnal_data.kib_listen_handle);
474         LASSERT (cmrc == cm_stat_success);
475
476         kibnal_data.kib_listen_handle = NULL;
477         return -EINVAL;
478 }
479
480 void
481 kibnal_stop_listener(lnet_ni_t *ni)
482 {
483         cm_return_t      cmrc;
484
485         LASSERT (kibnal_data.kib_listen_handle != NULL);
486
487         cmrc = cm_cancel(kibnal_data.kib_listen_handle);
488         if (cmrc != cm_stat_success)
489                 CERROR ("Error %d stopping listener\n", cmrc);
490
491         cfs_pause(cfs_time_seconds(1)/10);   /* ensure no more callbacks */
492
493         cmrc = cm_destroy_cep(kibnal_data.kib_listen_handle);
494         if (cmrc != vv_return_ok)
495                 CERROR ("Error %d destroying CEP\n", cmrc);
496
497         kibnal_data.kib_listen_handle = NULL;
498 }
499
500 int
501 kibnal_create_peer (kib_peer_t **peerp, lnet_nid_t nid)
502 {
503         kib_peer_t     *peer;
504         unsigned long   flags;
505         int             rc;
506
507         LASSERT (nid != LNET_NID_ANY);
508
509         LIBCFS_ALLOC(peer, sizeof (*peer));
510         if (peer == NULL) {
511                 CERROR("Cannot allocate peer\n");
512                 return -ENOMEM;
513         }
514
515         memset(peer, 0, sizeof(*peer));         /* zero flags etc */
516
517         peer->ibp_nid = nid;
518         atomic_set (&peer->ibp_refcount, 1);    /* 1 ref for caller */
519
520         INIT_LIST_HEAD (&peer->ibp_list);       /* not in the peer table yet */
521         INIT_LIST_HEAD (&peer->ibp_conns);
522         INIT_LIST_HEAD (&peer->ibp_tx_queue);
523
524         peer->ibp_error = 0;
525         peer->ibp_last_alive = cfs_time_current();
526         peer->ibp_reconnect_interval = 0;       /* OK to connect at any time */
527
528         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
529
530         if (atomic_read(&kibnal_data.kib_npeers) >=
531             *kibnal_tunables.kib_concurrent_peers) {
532                 rc = -EOVERFLOW;        /* !! but at least it distinguishes */
533         } else if (kibnal_data.kib_listen_handle == NULL) {
534                 rc = -ESHUTDOWN;        /* shutdown has started */
535         } else {
536                 rc = 0;
537                 /* npeers only grows with the global lock held */
538                 atomic_inc(&kibnal_data.kib_npeers);
539         }
540
541         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
542
543         if (rc != 0) {
544                 CERROR("Can't create peer: %s\n", 
545                        (rc == -ESHUTDOWN) ? "shutting down" :
546                        "too many peers");
547                 LIBCFS_FREE(peer, sizeof(*peer));
548         } else {
549                 *peerp = peer;
550         }
551
552         return rc;
553 }
554
555 void
556 kibnal_destroy_peer (kib_peer_t *peer)
557 {
558         LASSERT (atomic_read (&peer->ibp_refcount) == 0);
559         LASSERT (peer->ibp_persistence == 0);
560         LASSERT (!kibnal_peer_active(peer));
561         LASSERT (peer->ibp_connecting == 0);
562         LASSERT (peer->ibp_accepting == 0);
563         LASSERT (list_empty (&peer->ibp_conns));
564         LASSERT (list_empty (&peer->ibp_tx_queue));
565
566         LIBCFS_FREE (peer, sizeof (*peer));
567
568         /* NB a peer's connections keep a reference on their peer until
569          * they are destroyed, so we can be assured that _all_ state to do
570          * with this peer has been cleaned up when its refcount drops to
571          * zero. */
572         atomic_dec(&kibnal_data.kib_npeers);
573 }
574
575 kib_peer_t *
576 kibnal_find_peer_locked (lnet_nid_t nid)
577 {
578         /* the caller is responsible for accounting the additional reference
579          * that this creates */
580         struct list_head *peer_list = kibnal_nid2peerlist (nid);
581         struct list_head *tmp;
582         kib_peer_t       *peer;
583
584         list_for_each (tmp, peer_list) {
585
586                 peer = list_entry (tmp, kib_peer_t, ibp_list);
587
588                 LASSERT (peer->ibp_persistence != 0 || /* persistent peer */
589                          peer->ibp_connecting != 0 || /* creating conns */
590                          peer->ibp_accepting != 0 ||
591                          !list_empty (&peer->ibp_conns));  /* active conn */
592
593                 if (peer->ibp_nid != nid)
594                         continue;
595
596                 CDEBUG(D_NET, "got peer [%p] -> %s (%d)\n",
597                        peer, libcfs_nid2str(nid),
598                        atomic_read (&peer->ibp_refcount));
599                 return (peer);
600         }
601         return (NULL);
602 }
603
604 void
605 kibnal_unlink_peer_locked (kib_peer_t *peer)
606 {
607         LASSERT (peer->ibp_persistence == 0);
608         LASSERT (list_empty(&peer->ibp_conns));
609
610         LASSERT (kibnal_peer_active(peer));
611         list_del_init (&peer->ibp_list);
612         /* lose peerlist's ref */
613         kibnal_peer_decref(peer);
614 }
615
616 int
617 kibnal_get_peer_info (int index, lnet_nid_t *nidp, __u32 *ipp,
618                       int *persistencep)
619 {
620         kib_peer_t        *peer;
621         struct list_head  *ptmp;
622         int                i;
623         unsigned long      flags;
624
625         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
626
627         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
628
629                 list_for_each (ptmp, &kibnal_data.kib_peers[i]) {
630
631                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
632                         LASSERT (peer->ibp_persistence != 0 ||
633                                  peer->ibp_connecting != 0 ||
634                                  peer->ibp_accepting != 0 ||
635                                  !list_empty (&peer->ibp_conns));
636
637                         if (index-- > 0)
638                                 continue;
639
640                         *nidp = peer->ibp_nid;
641                         *ipp = peer->ibp_ip;
642                         *persistencep = peer->ibp_persistence;
643
644                         read_unlock_irqrestore(&kibnal_data.kib_global_lock,
645                                                flags);
646                         return (0);
647                 }
648         }
649
650         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
651         return (-ENOENT);
652 }
653
654 int
655 kibnal_add_persistent_peer (lnet_nid_t nid, __u32 ip)
656 {
657         kib_peer_t        *peer;
658         kib_peer_t        *peer2;
659         unsigned long      flags;
660         int                rc;
661
662         CDEBUG(D_NET, "%s at %u.%u.%u.%u\n",
663                libcfs_nid2str(nid), HIPQUAD(ip));
664
665         if (nid == LNET_NID_ANY)
666                 return (-EINVAL);
667
668         rc = kibnal_create_peer(&peer, nid);
669         if (rc != 0)
670                 return rc;
671
672         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
673
674         /* I'm always called with a reference on kibnal_data.kib_ni
675          * so shutdown can't have started */
676         LASSERT (kibnal_data.kib_listen_handle != NULL);
677
678         peer2 = kibnal_find_peer_locked (nid);
679         if (peer2 != NULL) {
680                 kibnal_peer_decref (peer);
681                 peer = peer2;
682         } else {
683                 /* peer table takes existing ref on peer */
684                 list_add_tail (&peer->ibp_list,
685                                kibnal_nid2peerlist (nid));
686         }
687
688         peer->ibp_ip = ip;
689         peer->ibp_persistence++;
690
691         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
692         return (0);
693 }
694
695 void
696 kibnal_del_peer_locked (kib_peer_t *peer)
697 {
698         struct list_head *ctmp;
699         struct list_head *cnxt;
700         kib_conn_t       *conn;
701
702         peer->ibp_persistence = 0;
703
704         if (list_empty(&peer->ibp_conns)) {
705                 kibnal_unlink_peer_locked(peer);
706         } else {
707                 list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
708                         conn = list_entry(ctmp, kib_conn_t, ibc_list);
709
710                         kibnal_close_conn_locked (conn, 0);
711                 }
712                 /* NB peer is no longer persistent; closing its last conn
713                  * unlinked it. */
714         }
715         /* NB peer now unlinked; might even be freed if the peer table had the
716          * last ref on it. */
717 }
718
719 int
720 kibnal_del_peer (lnet_nid_t nid)
721 {
722         CFS_LIST_HEAD     (zombies);
723         struct list_head  *ptmp;
724         struct list_head  *pnxt;
725         kib_peer_t        *peer;
726         int                lo;
727         int                hi;
728         int                i;
729         unsigned long      flags;
730         int                rc = -ENOENT;
731
732         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
733
734         if (nid != LNET_NID_ANY)
735                 lo = hi = kibnal_nid2peerlist(nid) - kibnal_data.kib_peers;
736         else {
737                 lo = 0;
738                 hi = kibnal_data.kib_peer_hash_size - 1;
739         }
740
741         for (i = lo; i <= hi; i++) {
742                 list_for_each_safe (ptmp, pnxt, &kibnal_data.kib_peers[i]) {
743                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
744                         LASSERT (peer->ibp_persistence != 0 ||
745                                  peer->ibp_connecting != 0 ||
746                                  peer->ibp_accepting != 0 ||
747                                  !list_empty (&peer->ibp_conns));
748
749                         if (!(nid == LNET_NID_ANY || peer->ibp_nid == nid))
750                                 continue;
751
752                         if (!list_empty(&peer->ibp_tx_queue)) {
753                                 LASSERT (list_empty(&peer->ibp_conns));
754
755                                 list_splice_init(&peer->ibp_tx_queue, &zombies);
756                         }
757
758                         kibnal_del_peer_locked (peer);
759                         rc = 0;         /* matched something */
760                 }
761         }
762
763         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
764
765         kibnal_txlist_done(&zombies, -EIO);
766
767         return (rc);
768 }
769
770 kib_conn_t *
771 kibnal_get_conn_by_idx (int index)
772 {
773         kib_peer_t        *peer;
774         struct list_head  *ptmp;
775         kib_conn_t        *conn;
776         struct list_head  *ctmp;
777         int                i;
778         unsigned long      flags;
779
780         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
781
782         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
783                 list_for_each (ptmp, &kibnal_data.kib_peers[i]) {
784
785                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
786                         LASSERT (peer->ibp_persistence > 0 ||
787                                  peer->ibp_connecting != 0 ||
788                                  peer->ibp_accepting != 0 ||
789                                  !list_empty (&peer->ibp_conns));
790
791                         list_for_each (ctmp, &peer->ibp_conns) {
792                                 if (index-- > 0)
793                                         continue;
794
795                                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
796                                 kibnal_conn_addref(conn);
797                                 read_unlock_irqrestore(&kibnal_data.kib_global_lock,
798                                                        flags);
799                                 return (conn);
800                         }
801                 }
802         }
803
804         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
805         return (NULL);
806 }
807
/* Dump one receive descriptor to the console log: its address, byte
 * count, and the type/credits/sequence number of the message it holds. */
void
kibnal_debug_rx (kib_rx_t *rx)
{
        CDEBUG(D_CONSOLE, "      %p nob %d msg_type %x "
               "cred %d seq "LPD64"\n",
               rx, rx->rx_nob, rx->rx_msg->ibm_type,
               rx->rx_msg->ibm_credits, rx->rx_msg->ibm_seq);
}
816
/* Dump one transmit descriptor to the console log: sending/queued/
 * waiting state, completion status, deadline, cookie, which of the two
 * possible lnet messages are attached ('!' = present, '-' = absent),
 * and the message type/credits/sequence number. */
void
kibnal_debug_tx (kib_tx_t *tx)
{
        CDEBUG(D_CONSOLE, "      %p snd %d q %d w %d rc %d dl %lx "
               "cookie "LPX64" msg %s%s type %x cred %d seq "LPD64"\n",
               tx, tx->tx_sending, tx->tx_queued, tx->tx_waiting,
               tx->tx_status, tx->tx_deadline, tx->tx_cookie,
               tx->tx_lntmsg[0] == NULL ? "-" : "!",
               tx->tx_lntmsg[1] == NULL ? "-" : "!",
               tx->tx_msg->ibm_type, tx->tx_msg->ibm_credits,
               tx->tx_msg->ibm_seq);
}
829
/* Dump the full state of a connection to the console log: refcount,
 * sequence numbers, credit/posting counters, and every rx/tx currently
 * on each of its queues.  Holds ibc_lock so the queues cannot change
 * while they are being walked. */
void
kibnal_debug_conn (kib_conn_t *conn)
{
        struct list_head *tmp;
        int               i;

        spin_lock(&conn->ibc_lock);

        CDEBUG(D_CONSOLE, "conn[%d] %p -> %s: \n",
               atomic_read(&conn->ibc_refcount), conn,
               libcfs_nid2str(conn->ibc_peer->ibp_nid));
        CDEBUG(D_CONSOLE, "   txseq "LPD64" rxseq "LPD64" state %d \n",
               conn->ibc_txseq, conn->ibc_rxseq, conn->ibc_state);
        CDEBUG(D_CONSOLE, "   nposted %d cred %d o_cred %d r_cred %d\n",
               conn->ibc_nsends_posted, conn->ibc_credits,
               conn->ibc_outstanding_credits, conn->ibc_reserved_credits);
        CDEBUG(D_CONSOLE, "   disc %d comms_err %d\n",
               conn->ibc_disconnect, conn->ibc_comms_error);

        CDEBUG(D_CONSOLE, "   early_rxs:\n");
        list_for_each(tmp, &conn->ibc_early_rxs)
                kibnal_debug_rx(list_entry(tmp, kib_rx_t, rx_list));

        CDEBUG(D_CONSOLE, "   tx_queue_nocred:\n");
        list_for_each(tmp, &conn->ibc_tx_queue_nocred)
                kibnal_debug_tx(list_entry(tmp, kib_tx_t, tx_list));

        CDEBUG(D_CONSOLE, "   tx_queue_rsrvd:\n");
        list_for_each(tmp, &conn->ibc_tx_queue_rsrvd)
                kibnal_debug_tx(list_entry(tmp, kib_tx_t, tx_list));

        CDEBUG(D_CONSOLE, "   tx_queue:\n");
        list_for_each(tmp, &conn->ibc_tx_queue)
                kibnal_debug_tx(list_entry(tmp, kib_tx_t, tx_list));

        CDEBUG(D_CONSOLE, "   active_txs:\n");
        list_for_each(tmp, &conn->ibc_active_txs)
                kibnal_debug_tx(list_entry(tmp, kib_tx_t, tx_list));

        /* rx descriptors are a fixed-size array, not a list */
        CDEBUG(D_CONSOLE, "   rxs:\n");
        for (i = 0; i < IBNAL_RX_MSGS; i++)
                kibnal_debug_rx(&conn->ibc_rxs[i]);

        spin_unlock(&conn->ibc_lock);
}
875
/* Move conn's queue pair into 'new_state', filling in only the QP
 * attributes that this particular transition needs and setting the
 * matching bits in the attribute mask.  Returns 0 on success or -EIO
 * if vv_qp_modify() rejects the request.  Only the connd thread calls
 * this, so the static scratch 'attr' is safe. */
int
kibnal_set_qp_state (kib_conn_t *conn, vv_qp_state_t new_state)
{
        static vv_qp_attr_t attr;

        kib_connvars_t   *cv = conn->ibc_connvars;
        vv_return_t       vvrc;

        /* Only called by connd => static OK */
        LASSERT (!in_interrupt());
        LASSERT (current == kibnal_data.kib_connd);

        memset(&attr, 0, sizeof(attr));

        switch (new_state) {
        default:
                LBUG();

        case vv_qp_state_init: {
                /* bind the QP to a partition/port and set its remote
                 * access rights */
                struct vv_qp_modify_init_st *init = &attr.modify.params.init;

                init->p_key_indx     = cv->cv_pkey_index;
                init->phy_port_num   = cv->cv_port;
                init->q_key          = IBNAL_QKEY; /* XXX but VV_QP_AT_Q_KEY not set! */
                init->access_control = vv_acc_r_mem_read |
                                       vv_acc_r_mem_write; /* XXX vv_acc_l_mem_write ? */

                attr.modify.vv_qp_attr_mask = VV_QP_AT_P_KEY_IX |
                                              VV_QP_AT_PHY_PORT_NUM |
                                              VV_QP_AT_ACCESS_CON_F;
                break;
        }
        case vv_qp_state_rtr: {
                /* receive side: address vector and receive parameters
                 * come from the path record and state gathered during
                 * connection establishment (note: the misspelt field
                 * names are the vendor API's, not ours) */
                struct vv_qp_modify_rtr_st *rtr = &attr.modify.params.rtr;
                vv_add_vec_t               *av  = &rtr->remote_add_vec;

                av->dlid                      = cv->cv_path.dlid;
                av->grh_flag                  = (!IBNAL_LOCAL_SUB);
                av->max_static_rate           = IBNAL_R_2_STATIC_RATE(cv->cv_path.rate);
                av->service_level             = cv->cv_path.sl;
                av->source_path_bit           = IBNAL_SOURCE_PATH_BIT;
                av->pmtu                      = cv->cv_path.mtu;
                av->rnr_retry_count           = cv->cv_rnr_count;
                av->global_dest.traffic_class = cv->cv_path.traffic_class;
                av->global_dest.hope_limit    = cv->cv_path.hop_limut;
                av->global_dest.flow_lable    = cv->cv_path.flow_label;
                av->global_dest.s_gid_index   = cv->cv_sgid_index;
                // XXX other av fields zero?

                rtr->destanation_qp            = cv->cv_remote_qpn;
                rtr->receive_psn               = cv->cv_rxpsn;
                rtr->responder_rdma_r_atom_num = IBNAL_OUS_DST_RD;
                rtr->opt_min_rnr_nak_timer     = *kibnal_tunables.kib_rnr_nak_timer;


                // XXX sdp sets VV_QP_AT_OP_F but no actual optional options
                attr.modify.vv_qp_attr_mask = VV_QP_AT_ADD_VEC |
                                              VV_QP_AT_DEST_QP |
                                              VV_QP_AT_R_PSN |
                                              VV_QP_AT_MIN_RNR_NAK_T |
                                              VV_QP_AT_RESP_RDMA_ATOM_OUT_NUM |
                                              VV_QP_AT_OP_F;
                break;
        }
        case vv_qp_state_rts: {
                /* send side: PSN from connection state, timeout/retry
                 * parameters from the module tunables */
                struct vv_qp_modify_rts_st *rts = &attr.modify.params.rts;

                rts->send_psn                 = cv->cv_txpsn;
                rts->local_ack_timeout        = *kibnal_tunables.kib_local_ack_timeout;
                rts->retry_num                = *kibnal_tunables.kib_retry_cnt;
                rts->rnr_num                  = *kibnal_tunables.kib_rnr_cnt;
                rts->dest_out_rdma_r_atom_num = IBNAL_OUS_DST_RD;

                attr.modify.vv_qp_attr_mask = VV_QP_AT_S_PSN |
                                              VV_QP_AT_L_ACK_T |
                                              VV_QP_AT_RETRY_NUM |
                                              VV_QP_AT_RNR_NUM |
                                              VV_QP_AT_DEST_RDMA_ATOM_OUT_NUM;
                break;
        }
        case vv_qp_state_error:
        case vv_qp_state_reset:
                /* teardown transitions need no extra attributes */
                attr.modify.vv_qp_attr_mask = 0;
                break;
        }

        attr.modify.qp_modify_into_state = new_state;
        attr.modify.vv_qp_attr_mask |= VV_QP_AT_STATE;

        vvrc = vv_qp_modify(kibnal_data.kib_hca, conn->ibc_qp, &attr, NULL);
        if (vvrc != vv_return_ok) {
                CERROR("Can't modify qp -> %s state to %d: %d\n",
                       libcfs_nid2str(conn->ibc_peer->ibp_nid),
                       new_state, vvrc);
                return -EIO;
        }

        return 0;
}
975
/* Allocate and initialise a new connection using CM endpoint 'cep':
 * connection state, rx descriptors, their message buffers (carved out
 * of freshly allocated pages and registered with the HCA), and the
 * queue pair.  Returns the conn with one reference held for the
 * caller, or NULL on failure -- partial state is unwound through
 * kibnal_destroy_conn(), steered by conn->ibc_state.  Only the connd
 * thread calls this. */
kib_conn_t *
kibnal_create_conn (cm_cep_handle_t cep)
{
        kib_conn_t   *conn;
        int           i;
        int           page_offset;
        int           ipage;
        vv_return_t   vvrc;
        int           rc;

        /* scratch QP attributes: connd-only => static is safe */
        static vv_qp_attr_t  reqattr;
        static vv_qp_attr_t  rspattr;

        /* Only the connd creates conns => single threaded */
        LASSERT(!in_interrupt());
        LASSERT(current == kibnal_data.kib_connd);

        LIBCFS_ALLOC(conn, sizeof (*conn));
        if (conn == NULL) {
                CERROR ("Can't allocate connection\n");
                return (NULL);
        }

        /* zero flags, NULL pointers etc... */
        memset (conn, 0, sizeof (*conn));

        conn->ibc_version = IBNAL_MSG_VERSION;  /* Use latest version at first */

        INIT_LIST_HEAD (&conn->ibc_early_rxs);
        INIT_LIST_HEAD (&conn->ibc_tx_queue_nocred);
        INIT_LIST_HEAD (&conn->ibc_tx_queue);
        INIT_LIST_HEAD (&conn->ibc_tx_queue_rsrvd);
        INIT_LIST_HEAD (&conn->ibc_active_txs);
        spin_lock_init (&conn->ibc_lock);

        atomic_inc (&kibnal_data.kib_nconns);
        /* well not really, but I call destroy() on failure, which decrements */

        conn->ibc_cep = cep;

        LIBCFS_ALLOC(conn->ibc_connvars, sizeof(*conn->ibc_connvars));
        if (conn->ibc_connvars == NULL) {
                CERROR("Can't allocate in-progress connection state\n");
                goto failed;
        }
        memset (conn->ibc_connvars, 0, sizeof(*conn->ibc_connvars));
        /* Random seed for QP sequence number */
        get_random_bytes(&conn->ibc_connvars->cv_rxpsn,
                         sizeof(conn->ibc_connvars->cv_rxpsn));

        LIBCFS_ALLOC(conn->ibc_rxs, IBNAL_RX_MSGS * sizeof (kib_rx_t));
        if (conn->ibc_rxs == NULL) {
                CERROR("Cannot allocate RX buffers\n");
                goto failed;
        }
        memset (conn->ibc_rxs, 0, IBNAL_RX_MSGS * sizeof(kib_rx_t));

        rc = kibnal_alloc_pages(&conn->ibc_rx_pages, IBNAL_RX_MSG_PAGES, 1);
        if (rc != 0)
                goto failed;

        /* Point each rx descriptor at its slice of the page array and
         * fetch the local key needed to post receives into it */
        for (i = ipage = page_offset = 0; i < IBNAL_RX_MSGS; i++) {
                struct page    *page = conn->ibc_rx_pages->ibp_pages[ipage];
                kib_rx_t       *rx = &conn->ibc_rxs[i];
                vv_mem_reg_h_t  mem_h;
                vv_r_key_t      r_key;

                rx->rx_conn = conn;
                rx->rx_msg = (kib_msg_t *)(((char *)page_address(page)) +
                             page_offset);

                vvrc = vv_get_gen_mr_attrib(kibnal_data.kib_hca,
                                            rx->rx_msg,
                                            IBNAL_MSG_SIZE,
                                            &mem_h,
                                            &rx->rx_lkey,
                                            &r_key);
                LASSERT (vvrc == vv_return_ok);

                CDEBUG(D_NET, "Rx[%d] %p->%p[%x]\n", i, rx,
                       rx->rx_msg, rx->rx_lkey);

                page_offset += IBNAL_MSG_SIZE;
                LASSERT (page_offset <= PAGE_SIZE);

                if (page_offset == PAGE_SIZE) {
                        page_offset = 0;
                        ipage++;
                        LASSERT (ipage <= IBNAL_RX_MSG_PAGES);
                }
        }

        memset(&reqattr, 0, sizeof(reqattr));

        reqattr.create.qp_type                    = vv_qp_type_r_conn;
        reqattr.create.cq_send_h                  = kibnal_data.kib_cq;
        reqattr.create.cq_receive_h               = kibnal_data.kib_cq;
        reqattr.create.send_max_outstand_wr       = (1 + IBNAL_MAX_RDMA_FRAGS) *
                                                    (*kibnal_tunables.kib_concurrent_sends);
        reqattr.create.receive_max_outstand_wr    = IBNAL_RX_MSGS;
        reqattr.create.max_scatgat_per_send_wr    = 1;
        reqattr.create.max_scatgat_per_receive_wr = 1;
        reqattr.create.signaling_type             = vv_selectable_signaling;
        reqattr.create.pd_h                       = kibnal_data.kib_pd;
        reqattr.create.recv_solicited_events      = vv_selectable_signaling; // vv_signal_all;

        vvrc = vv_qp_create(kibnal_data.kib_hca, &reqattr, NULL,
                            &conn->ibc_qp, &rspattr);
        if (vvrc != vv_return_ok) {
                CERROR ("Failed to create queue pair: %d\n", vvrc);
                goto failed;
        }

        /* Mark QP created */
        conn->ibc_state = IBNAL_CONN_INIT_QP;
        conn->ibc_connvars->cv_local_qpn = rspattr.create_return.qp_num;

        /* The HCA may grant fewer work requests than requested; fail
         * rather than risk overrunning the queues later */
        if (rspattr.create_return.receive_max_outstand_wr <
            IBNAL_RX_MSGS ||
            rspattr.create_return.send_max_outstand_wr <
            (1 + IBNAL_MAX_RDMA_FRAGS) * (*kibnal_tunables.kib_concurrent_sends)) {
                CERROR("Insufficient rx/tx work items: wanted %d/%d got %d/%d\n",
                       IBNAL_RX_MSGS,
                       (1 + IBNAL_MAX_RDMA_FRAGS) *
                       (*kibnal_tunables.kib_concurrent_sends),
                       rspattr.create_return.receive_max_outstand_wr,
                       rspattr.create_return.send_max_outstand_wr);
                goto failed;
        }

        /* Mark init complete */
        conn->ibc_state = IBNAL_CONN_INIT;

        /* 1 ref for caller */
        atomic_set (&conn->ibc_refcount, 1);
        return (conn);

 failed:
        kibnal_destroy_conn (conn);
        return (NULL);
}
1117
/* Final teardown of a connection whose refcount has reached zero.  The
 * switch falls through from the most advanced initialisation state to
 * the least, undoing each stage in turn, then the remaining allocations
 * are released.  Only the connd thread calls this. */
void
kibnal_destroy_conn (kib_conn_t *conn)
{
        vv_return_t vvrc;

        /* Only the connd does this (i.e. single threaded) */
        LASSERT (!in_interrupt());
        LASSERT (current == kibnal_data.kib_connd);

        CDEBUG (D_NET, "connection %p\n", conn);

        /* no users left, and nothing queued or in flight */
        LASSERT (atomic_read (&conn->ibc_refcount) == 0);
        LASSERT (list_empty(&conn->ibc_early_rxs));
        LASSERT (list_empty(&conn->ibc_tx_queue));
        LASSERT (list_empty(&conn->ibc_tx_queue_rsrvd));
        LASSERT (list_empty(&conn->ibc_tx_queue_nocred));
        LASSERT (list_empty(&conn->ibc_active_txs));
        LASSERT (conn->ibc_nsends_posted == 0);

        switch (conn->ibc_state) {
        default:
                /* conn must be completely disengaged from the network */
                LBUG();

        case IBNAL_CONN_DISCONNECTED:
                /* connvars should have been freed already */
                LASSERT (conn->ibc_connvars == NULL);
                /* fall through */

        case IBNAL_CONN_INIT:
                vvrc = cm_destroy_cep(conn->ibc_cep);
                LASSERT (vvrc == vv_return_ok);
                /* fall through */

        case IBNAL_CONN_INIT_QP:
                /* move the QP to reset before destroying it */
                kibnal_set_qp_state(conn, vv_qp_state_reset);
                vvrc = vv_qp_destroy(kibnal_data.kib_hca, conn->ibc_qp);
                if (vvrc != vv_return_ok)
                        CERROR("Can't destroy QP: %d\n", vvrc);
                /* fall through */

        case IBNAL_CONN_INIT_NOTHING:
                break;
        }

        if (conn->ibc_rx_pages != NULL)
                kibnal_free_pages(conn->ibc_rx_pages);

        if (conn->ibc_rxs != NULL)
                LIBCFS_FREE(conn->ibc_rxs,
                            IBNAL_RX_MSGS * sizeof(kib_rx_t));

        if (conn->ibc_connvars != NULL)
                LIBCFS_FREE(conn->ibc_connvars, sizeof(*conn->ibc_connvars));

        if (conn->ibc_peer != NULL)
                kibnal_peer_decref(conn->ibc_peer);

        LIBCFS_FREE(conn, sizeof (*conn));

        atomic_dec(&kibnal_data.kib_nconns);
}
1180
1181 int
1182 kibnal_close_peer_conns_locked (kib_peer_t *peer, int why)
1183 {
1184         kib_conn_t         *conn;
1185         struct list_head   *ctmp;
1186         struct list_head   *cnxt;
1187         int                 count = 0;
1188
1189         list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
1190                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
1191
1192                 count++;
1193                 kibnal_close_conn_locked (conn, why);
1194         }
1195
1196         return (count);
1197 }
1198
1199 int
1200 kibnal_close_stale_conns_locked (kib_peer_t *peer, __u64 incarnation)
1201 {
1202         kib_conn_t         *conn;
1203         struct list_head   *ctmp;
1204         struct list_head   *cnxt;
1205         int                 count = 0;
1206
1207         list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
1208                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
1209
1210                 if (conn->ibc_incarnation == incarnation)
1211                         continue;
1212
1213                 CDEBUG(D_NET, "Closing stale conn -> %s incarnation:"LPX64"("LPX64")\n",
1214                        libcfs_nid2str(peer->ibp_nid),
1215                        conn->ibc_incarnation, incarnation);
1216
1217                 count++;
1218                 kibnal_close_conn_locked (conn, -ESTALE);
1219         }
1220
1221         return (count);
1222 }
1223
1224 int
1225 kibnal_close_matching_conns (lnet_nid_t nid)
1226 {
1227         kib_peer_t         *peer;
1228         struct list_head   *ptmp;
1229         struct list_head   *pnxt;
1230         int                 lo;
1231         int                 hi;
1232         int                 i;
1233         unsigned long       flags;
1234         int                 count = 0;
1235
1236         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1237
1238         if (nid != LNET_NID_ANY)
1239                 lo = hi = kibnal_nid2peerlist(nid) - kibnal_data.kib_peers;
1240         else {
1241                 lo = 0;
1242                 hi = kibnal_data.kib_peer_hash_size - 1;
1243         }
1244
1245         for (i = lo; i <= hi; i++) {
1246                 list_for_each_safe (ptmp, pnxt, &kibnal_data.kib_peers[i]) {
1247
1248                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
1249                         LASSERT (peer->ibp_persistence != 0 ||
1250                                  peer->ibp_connecting != 0 ||
1251                                  peer->ibp_accepting != 0 ||
1252                                  !list_empty (&peer->ibp_conns));
1253
1254                         if (!(nid == LNET_NID_ANY || nid == peer->ibp_nid))
1255                                 continue;
1256
1257                         count += kibnal_close_peer_conns_locked (peer, 0);
1258                 }
1259         }
1260
1261         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1262
1263         /* wildcards always succeed */
1264         if (nid == LNET_NID_ANY)
1265                 return (0);
1266
1267         return (count == 0 ? -ENOENT : 0);
1268 }
1269
1270 int
1271 kibnal_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1272 {
1273         struct libcfs_ioctl_data *data = arg;
1274         int                       rc = -EINVAL;
1275
1276         LASSERT (ni == kibnal_data.kib_ni);
1277
1278         switch(cmd) {
1279         case IOC_LIBCFS_GET_PEER: {
1280                 lnet_nid_t   nid = 0;
1281                 __u32        ip = 0;
1282                 int          share_count = 0;
1283
1284                 rc = kibnal_get_peer_info(data->ioc_count,
1285                                           &nid, &ip, &share_count);
1286                 data->ioc_nid    = nid;
1287                 data->ioc_count  = share_count;
1288                 data->ioc_u32[0] = ip;
1289                 data->ioc_u32[1] = *kibnal_tunables.kib_service_number; /* port */
1290                 break;
1291         }
1292         case IOC_LIBCFS_ADD_PEER: {
1293                 rc = kibnal_add_persistent_peer (data->ioc_nid,
1294                                                  data->ioc_u32[0]); /* IP */
1295                 break;
1296         }
1297         case IOC_LIBCFS_DEL_PEER: {
1298                 rc = kibnal_del_peer (data->ioc_nid);
1299                 break;
1300         }
1301         case IOC_LIBCFS_GET_CONN: {
1302                 kib_conn_t *conn = kibnal_get_conn_by_idx (data->ioc_count);
1303
1304                 if (conn == NULL)
1305                         rc = -ENOENT;
1306                 else {
1307                         // kibnal_debug_conn(conn);
1308                         rc = 0;
1309                         data->ioc_nid = conn->ibc_peer->ibp_nid;
1310                         kibnal_conn_decref(conn);
1311                 }
1312                 break;
1313         }
1314         case IOC_LIBCFS_CLOSE_CONNECTION: {
1315                 rc = kibnal_close_matching_conns (data->ioc_nid);
1316                 break;
1317         }
1318         case IOC_LIBCFS_REGISTER_MYNID: {
1319                 if (ni->ni_nid == data->ioc_nid) {
1320                         rc = 0;
1321                 } else {
1322                         CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID: %s(%s)\n",
1323                                libcfs_nid2str(data->ioc_nid),
1324                                libcfs_nid2str(ni->ni_nid));
1325                         rc = -EINVAL;
1326                 }
1327                 break;
1328         }
1329         }
1330
1331         return rc;
1332 }
1333
1334 void
1335 kibnal_free_pages (kib_pages_t *p)
1336 {
1337         int         npages = p->ibp_npages;
1338         int         i;
1339
1340         for (i = 0; i < npages; i++)
1341                 if (p->ibp_pages[i] != NULL)
1342                         __free_page(p->ibp_pages[i]);
1343
1344         LIBCFS_FREE (p, offsetof(kib_pages_t, ibp_pages[npages]));
1345 }
1346
1347 int
1348 kibnal_alloc_pages (kib_pages_t **pp, int npages, int allow_write)
1349 {
1350         kib_pages_t   *p;
1351         int            i;
1352
1353         LIBCFS_ALLOC(p, offsetof(kib_pages_t, ibp_pages[npages]));
1354         if (p == NULL) {
1355                 CERROR ("Can't allocate buffer %d\n", npages);
1356                 return (-ENOMEM);
1357         }
1358
1359         memset (p, 0, offsetof(kib_pages_t, ibp_pages[npages]));
1360         p->ibp_npages = npages;
1361
1362         for (i = 0; i < npages; i++) {
1363                 p->ibp_pages[i] = alloc_page (GFP_KERNEL);
1364                 if (p->ibp_pages[i] == NULL) {
1365                         CERROR ("Can't allocate page %d of %d\n", i, npages);
1366                         kibnal_free_pages(p);
1367                         return (-ENOMEM);
1368                 }
1369         }
1370
1371         *pp = p;
1372         return (0);
1373 }
1374
/* Allocate the global array of tx descriptors and, for each one, its
 * work-request / scatter-gather / rdma-descriptor storage (or, when
 * IBNAL_USE_FMR, its FMR page list).  Returns 0 or -ENOMEM.
 * NOTE(review): on -ENOMEM the partially-allocated state is left in
 * place -- presumably the caller unwinds via kibnal_free_tx_descs(),
 * which NULL-checks every field; confirm against the startup path. */
int
kibnal_alloc_tx_descs (void)
{
        int    i;

        LIBCFS_ALLOC (kibnal_data.kib_tx_descs,
                      IBNAL_TX_MSGS() * sizeof(kib_tx_t));
        if (kibnal_data.kib_tx_descs == NULL)
                return -ENOMEM;

        memset(kibnal_data.kib_tx_descs, 0,
               IBNAL_TX_MSGS() * sizeof(kib_tx_t));

        for (i = 0; i < IBNAL_TX_MSGS(); i++) {
                kib_tx_t *tx = &kibnal_data.kib_tx_descs[i];

#if IBNAL_USE_FMR
                LIBCFS_ALLOC(tx->tx_pages, LNET_MAX_IOV *
                             sizeof(*tx->tx_pages));
                if (tx->tx_pages == NULL)
                        return -ENOMEM;
#else
                /* 1 + IBNAL_MAX_RDMA_FRAGS entries: one slot per RDMA
                 * fragment plus one extra */
                LIBCFS_ALLOC(tx->tx_wrq,
                             (1 + IBNAL_MAX_RDMA_FRAGS) *
                             sizeof(*tx->tx_wrq));
                if (tx->tx_wrq == NULL)
                        return -ENOMEM;

                LIBCFS_ALLOC(tx->tx_gl,
                             (1 + IBNAL_MAX_RDMA_FRAGS) *
                             sizeof(*tx->tx_gl));
                if (tx->tx_gl == NULL)
                        return -ENOMEM;

                LIBCFS_ALLOC(tx->tx_rd,
                             offsetof(kib_rdma_desc_t,
                                      rd_frags[IBNAL_MAX_RDMA_FRAGS]));
                if (tx->tx_rd == NULL)
                        return -ENOMEM;
#endif
        }

        return 0;
}
1419
/* Free everything kibnal_alloc_tx_descs() allocated.  Safe to call on
 * a partially-initialised array (every per-tx field is NULL-checked)
 * and a no-op when the array itself was never allocated. */
void
kibnal_free_tx_descs (void)
{
        int    i;

        if (kibnal_data.kib_tx_descs == NULL)
                return;

        for (i = 0; i < IBNAL_TX_MSGS(); i++) {
                kib_tx_t *tx = &kibnal_data.kib_tx_descs[i];

#if IBNAL_USE_FMR
                if (tx->tx_pages != NULL)
                        LIBCFS_FREE(tx->tx_pages, LNET_MAX_IOV *
                                    sizeof(*tx->tx_pages));
#else
                /* sizes must mirror kibnal_alloc_tx_descs() exactly */
                if (tx->tx_wrq != NULL)
                        LIBCFS_FREE(tx->tx_wrq,
                                    (1 + IBNAL_MAX_RDMA_FRAGS) *
                                    sizeof(*tx->tx_wrq));

                if (tx->tx_gl != NULL)
                        LIBCFS_FREE(tx->tx_gl,
                                    (1 + IBNAL_MAX_RDMA_FRAGS) *
                                    sizeof(*tx->tx_gl));

                if (tx->tx_rd != NULL)
                        LIBCFS_FREE(tx->tx_rd,
                                    offsetof(kib_rdma_desc_t,
                                             rd_frags[IBNAL_MAX_RDMA_FRAGS]));
#endif
        }

        LIBCFS_FREE(kibnal_data.kib_tx_descs,
                    IBNAL_TX_MSGS() * sizeof(kib_tx_t));
}
1456
#if IBNAL_USE_FMR
void
kibnal_free_fmrs (int n)
{
        /* Release the FMR handles of the first 'n' tx descriptors;
         * failures are logged but otherwise ignored. */
        int i;

        for (i = 0; i < n; i++) {
                kib_tx_t    *tx = &kibnal_data.kib_tx_descs[i];
                vv_return_t  vvrc = vv_free_fmr(kibnal_data.kib_hca,
                                                tx->tx_md.md_fmrhandle);

                if (vvrc != vv_return_ok)
                        CWARN("vv_free_fmr[%d]: %d\n", i, vvrc);
        }
}
#endif
1475
/* Map every tx descriptor's message buffer into the freshly allocated
 * tx pages, look up the local memory key needed to send from each
 * buffer, and put all descriptors on the idle list.  With IBNAL_USE_FMR
 * an FMR handle is also allocated per descriptor.  Returns 0 on
 * success or a negative errno (cleaning up after itself on failure). */
int
kibnal_setup_tx_descs (void)
{
        int             ipage = 0;
        int             page_offset = 0;
        struct page    *page;
        kib_tx_t       *tx;
        vv_mem_reg_h_t  mem_h;
        vv_r_key_t      rkey;
        vv_return_t     vvrc;
        int             i;
        int             rc;
#if IBNAL_USE_FMR
        vv_fmr_t        fmr_props;
#endif

        /* pre-mapped messages are not bigger than 1 page */
        CLASSERT (IBNAL_MSG_SIZE <= PAGE_SIZE);

        /* No fancy arithmetic when we do the buffer calculations */
        CLASSERT (PAGE_SIZE % IBNAL_MSG_SIZE == 0);

        rc = kibnal_alloc_pages(&kibnal_data.kib_tx_pages,
                                IBNAL_TX_MSG_PAGES(), 0);
        if (rc != 0)
                return (rc);

        for (i = 0; i < IBNAL_TX_MSGS(); i++) {
                page = kibnal_data.kib_tx_pages->ibp_pages[ipage];
                tx = &kibnal_data.kib_tx_descs[i];

#if IBNAL_USE_FMR
                /* one FMR handle per tx, good for kib_fmr_remaps
                 * mappings before it must be rebuilt */
                memset(&fmr_props, 0, sizeof(fmr_props));
                fmr_props.pd_hndl              = kibnal_data.kib_pd;
                fmr_props.acl                  = (vv_acc_r_mem_write |
                                                  vv_acc_l_mem_write);
                fmr_props.max_pages            = LNET_MAX_IOV;
                fmr_props.log2_page_sz         = PAGE_SHIFT;
                fmr_props.max_outstanding_maps = *kibnal_tunables.kib_fmr_remaps;

                vvrc = vv_alloc_fmr(kibnal_data.kib_hca,
                                    &fmr_props,
                                    &tx->tx_md.md_fmrhandle);
                if (vvrc != vv_return_ok) {
                        CERROR("Can't allocate fmr %d: %d\n", i, vvrc);
                        /* undo the FMRs allocated so far, then the pages */
                        kibnal_free_fmrs(i);
                        kibnal_free_pages (kibnal_data.kib_tx_pages);
                        return -ENOMEM;
                }

                tx->tx_md.md_fmrcount = *kibnal_tunables.kib_fmr_remaps;
                tx->tx_md.md_active   = 0;
#endif
                tx->tx_msg = (kib_msg_t *)(((char *)page_address(page)) +
                                           page_offset);

                vvrc = vv_get_gen_mr_attrib(kibnal_data.kib_hca,
                                            tx->tx_msg,
                                            IBNAL_MSG_SIZE,
                                            &mem_h,
                                            &tx->tx_lkey,
                                            &rkey);
                LASSERT (vvrc == vv_return_ok);

                CDEBUG(D_NET, "Tx[%d] %p->%p[%x]\n", i, tx,
                       tx->tx_msg, tx->tx_lkey);

                list_add (&tx->tx_list, &kibnal_data.kib_idle_txs);

                page_offset += IBNAL_MSG_SIZE;
                LASSERT (page_offset <= PAGE_SIZE);

                if (page_offset == PAGE_SIZE) {
                        page_offset = 0;
                        ipage++;
                        LASSERT (ipage <= IBNAL_TX_MSG_PAGES());
                }
        }

        return (0);
}
1557
/*
 * Tear down this LND instance in the reverse order of initialisation.
 *
 * kibnal_data.kib_init records how far kibnal_startup() got; the switch
 * below enters at that level and deliberately falls through every case
 * beneath it, so each stage undoes exactly one startup stage.  Safe to
 * call from a partially-failed startup as well as a normal unload.
 */
void
kibnal_shutdown (lnet_ni_t *ni)
{
        int           i;
        vv_return_t   vvrc;

        /* Only one instance is supported; 'ni' must be ours. */
        LASSERT (ni == kibnal_data.kib_ni);
        LASSERT (ni->ni_data == &kibnal_data);

        CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
               atomic_read (&libcfs_kmemory));

        switch (kibnal_data.kib_init) {

        case IBNAL_INIT_ALL:
                /* stop accepting connections and prevent new peers */
                kibnal_stop_listener(ni);

                /* nuke all existing peers */
                kibnal_del_peer(LNET_NID_ANY);

                /* Wait for all peer state to clean up */
                i = 2;
                while (atomic_read(&kibnal_data.kib_npeers) != 0) {
                        i++;
                        /* log at D_WARNING only on power-of-2 iterations to
                         * avoid spamming the console while we poll */
                        CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* 2**n? */
                               "waiting for %d peers to disconnect\n",
                               atomic_read(&kibnal_data.kib_npeers));
                        cfs_pause(cfs_time_seconds(1));
                }
                /* fall through */

        case IBNAL_INIT_CQ:
                vvrc = vv_cq_destroy(kibnal_data.kib_hca, kibnal_data.kib_cq);
                if (vvrc != vv_return_ok)
                        CERROR ("Destroy CQ error: %d\n", vvrc);
                /* fall through */

        case IBNAL_INIT_TXD:
                kibnal_free_pages (kibnal_data.kib_tx_pages);
#if IBNAL_USE_FMR
                kibnal_free_fmrs(IBNAL_TX_MSGS());
#endif
                /* fall through */

        case IBNAL_INIT_PD:
#if 0
                /* Only deallocate a PD if we actually allocated one */
                /* NOTE(review): startup currently uses the pre-allocated PD
                 * (vv_get_gen_pd_h), so there is nothing to deallocate here */
                vvrc = vv_pd_deallocate(kibnal_data.kib_hca,
                                        kibnal_data.kib_pd);
                if (vvrc != vv_return_ok)
                        CERROR ("Destroy PD error: %d\n", vvrc);
#endif
                /* fall through */

        case IBNAL_INIT_ASYNC:
                /* unregister the HCA asynchronous event callback */
                vvrc = vv_dell_async_event_cb (kibnal_data.kib_hca,
                                              kibnal_async_callback);
                if (vvrc != vv_return_ok)
                        CERROR("vv_dell_async_event_cb error: %d\n", vvrc);

                /* fall through */

        case IBNAL_INIT_HCA:
                vvrc = vv_hca_close(kibnal_data.kib_hca);
                if (vvrc != vv_return_ok)
                        CERROR ("Close HCA  error: %d\n", vvrc);
                /* fall through */

        case IBNAL_INIT_DATA:
                /* by now all peers/conns must be gone and all queues drained */
                LASSERT (atomic_read(&kibnal_data.kib_npeers) == 0);
                LASSERT (kibnal_data.kib_peers != NULL);
                for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
                        LASSERT (list_empty (&kibnal_data.kib_peers[i]));
                }
                LASSERT (atomic_read (&kibnal_data.kib_nconns) == 0);
                LASSERT (list_empty (&kibnal_data.kib_connd_zombies));
                LASSERT (list_empty (&kibnal_data.kib_connd_conns));
                LASSERT (list_empty (&kibnal_data.kib_connd_pcreqs));
                LASSERT (list_empty (&kibnal_data.kib_connd_peers));

                /* flag threads to terminate; wake and wait for them to die */
                kibnal_data.kib_shutdown = 1;
                wake_up_all (&kibnal_data.kib_sched_waitq);
                wake_up_all (&kibnal_data.kib_connd_waitq);

                i = 2;
                while (atomic_read (&kibnal_data.kib_nthreads) != 0) {
                        i++;
                        CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
                               "Waiting for %d threads to terminate\n",
                               atomic_read (&kibnal_data.kib_nthreads));
                        cfs_pause(cfs_time_seconds(1));
                }
                /* fall through */

        case IBNAL_INIT_NOTHING:
                break;
        }

        kibnal_free_tx_descs();

        if (kibnal_data.kib_peers != NULL)
                LIBCFS_FREE (kibnal_data.kib_peers,
                             sizeof (struct list_head) *
                             kibnal_data.kib_peer_hash_size);

        CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
               atomic_read (&libcfs_kmemory));

        /* allow a fresh kibnal_startup() and drop the module refcount */
        kibnal_data.kib_init = IBNAL_INIT_NOTHING;
        PORTAL_MODULE_UNUSE;
}
1671
1672 int
1673 kibnal_startup (lnet_ni_t *ni)
1674 {
1675         char                      scratch[32];
1676         char                      ipif_name[32];
1677         char                     *hca_name;
1678         __u32                     ip;
1679         __u32                     netmask;
1680         int                       up;
1681         int                       nob;
1682         int                       devno;
1683         struct timeval            tv;
1684         int                       rc;
1685         int                       i;
1686         vv_request_event_record_t req_er;
1687         vv_return_t               vvrc;
1688
1689         LASSERT (ni->ni_lnd == &the_kiblnd);
1690
1691         /* Only 1 instance supported */
1692         if (kibnal_data.kib_init != IBNAL_INIT_NOTHING) {
1693                 CERROR ("Only 1 instance supported\n");
1694                 return -EPERM;
1695         }
1696
1697         if (*kibnal_tunables.kib_credits > *kibnal_tunables.kib_ntx) {
1698                 CERROR ("Can't set credits(%d) > ntx(%d)\n",
1699                         *kibnal_tunables.kib_credits,
1700                         *kibnal_tunables.kib_ntx);
1701                 return -EINVAL;
1702         }
1703
1704         ni->ni_maxtxcredits = *kibnal_tunables.kib_credits;
1705         ni->ni_peertxcredits = *kibnal_tunables.kib_peercredits;
1706
1707         CLASSERT (LNET_MAX_INTERFACES > 1);
1708
1709         if (ni->ni_interfaces[0] != NULL) {
1710                 /* Use the HCA specified in 'networks=' */
1711
1712                 if (ni->ni_interfaces[1] != NULL) {
1713                         CERROR("Multiple interfaces not supported\n");
1714                         return -EPERM;
1715                 }
1716
1717                 /* Parse <hca base name><number> */
1718                 hca_name = ni->ni_interfaces[0];
1719                 nob = strlen(*kibnal_tunables.kib_hca_basename);
1720
1721                 if (strncmp(hca_name, *kibnal_tunables.kib_hca_basename, nob) ||
1722                     sscanf(hca_name + nob, "%d%n", &devno, &nob) < 1) {
1723                         CERROR("Unrecognised HCA %s\n", hca_name);
1724                         return -EINVAL;
1725                 }
1726
1727         } else {
1728                 /* Use <hca base name>0 */
1729                 devno = 0;
1730
1731                 hca_name = scratch;
1732                 snprintf(hca_name, sizeof(scratch), "%s%d",
1733                          *kibnal_tunables.kib_hca_basename, devno);
1734                 if (strlen(hca_name) == sizeof(scratch) - 1) {
1735                         CERROR("HCA name %s truncated\n", hca_name);
1736                         return -EINVAL;
1737                 }
1738         }
1739
1740         /* Find IP address from <ipif base name><hca number> */
1741         snprintf(ipif_name, sizeof(ipif_name), "%s%d",
1742                  *kibnal_tunables.kib_ipif_basename, devno);
1743         if (strlen(ipif_name) == sizeof(ipif_name) - 1) {
1744                 CERROR("IPoIB interface name %s truncated\n", ipif_name);
1745                 return -EINVAL;
1746         }
1747
1748         rc = libcfs_ipif_query(ipif_name, &up, &ip, &netmask);
1749         if (rc != 0) {
1750                 CERROR("Can't query IPoIB interface %s: %d\n", ipif_name, rc);
1751                 return -ENETDOWN;
1752         }
1753
1754         if (!up) {
1755                 CERROR("Can't query IPoIB interface %s: it's down\n", ipif_name);
1756                 return -ENETDOWN;
1757         }
1758
1759         ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ip);
1760
1761         PORTAL_MODULE_USE;
1762         memset (&kibnal_data, 0, sizeof (kibnal_data)); /* zero pointers, flags etc */
1763
1764         kibnal_data.kib_ni = ni;
1765         ni->ni_data = &kibnal_data;
1766
1767         do_gettimeofday(&tv);
1768         kibnal_data.kib_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
1769
1770         rwlock_init(&kibnal_data.kib_global_lock);
1771
1772         kibnal_data.kib_peer_hash_size = IBNAL_PEER_HASH_SIZE;
1773         LIBCFS_ALLOC (kibnal_data.kib_peers,
1774                       sizeof (struct list_head) * kibnal_data.kib_peer_hash_size);
1775         if (kibnal_data.kib_peers == NULL) {
1776                 goto failed;
1777         }
1778         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++)
1779                 INIT_LIST_HEAD(&kibnal_data.kib_peers[i]);
1780
1781         spin_lock_init (&kibnal_data.kib_connd_lock);
1782         INIT_LIST_HEAD (&kibnal_data.kib_connd_peers);
1783         INIT_LIST_HEAD (&kibnal_data.kib_connd_pcreqs);
1784         INIT_LIST_HEAD (&kibnal_data.kib_connd_conns);
1785         INIT_LIST_HEAD (&kibnal_data.kib_connd_zombies);
1786         init_waitqueue_head (&kibnal_data.kib_connd_waitq);
1787
1788         spin_lock_init (&kibnal_data.kib_sched_lock);
1789         init_waitqueue_head (&kibnal_data.kib_sched_waitq);
1790
1791         spin_lock_init (&kibnal_data.kib_tx_lock);
1792         INIT_LIST_HEAD (&kibnal_data.kib_idle_txs);
1793
1794         rc = kibnal_alloc_tx_descs();
1795         if (rc != 0) {
1796                 CERROR("Can't allocate tx descs\n");
1797                 goto failed;
1798         }
1799
1800         /* lists/ptrs/locks initialised */
1801         kibnal_data.kib_init = IBNAL_INIT_DATA;
1802         /*****************************************************/
1803
1804         for (i = 0; i < IBNAL_N_SCHED; i++) {
1805                 rc = kibnal_thread_start (kibnal_scheduler, (void *)((long)i));
1806                 if (rc != 0) {
1807                         CERROR("Can't spawn vibnal scheduler[%d]: %d\n",
1808                                i, rc);
1809                         goto failed;
1810                 }
1811         }
1812
1813         rc = kibnal_thread_start (kibnal_connd, NULL);
1814         if (rc != 0) {
1815                 CERROR ("Can't spawn vibnal connd: %d\n", rc);
1816                 goto failed;
1817         }
1818
1819         vvrc = vv_hca_open(hca_name, NULL, &kibnal_data.kib_hca);
1820         if (vvrc != vv_return_ok) {
1821                 CERROR ("Can't open HCA %s: %d\n", hca_name, vvrc);
1822                 goto failed;
1823         }
1824
1825         /* Channel Adapter opened */
1826         kibnal_data.kib_init = IBNAL_INIT_HCA;
1827
1828         /* register to get HCA's asynchronous events. */
1829         req_er.req_event_type = VV_ASYNC_EVENT_ALL_MASK;
1830         vvrc = vv_set_async_event_cb (kibnal_data.kib_hca, req_er,
1831                                      kibnal_async_callback);
1832         if (vvrc != vv_return_ok) {
1833                 CERROR ("Can't set HCA %s callback: %d\n", hca_name, vvrc);
1834                 goto failed; 
1835         }
1836
1837         kibnal_data.kib_init = IBNAL_INIT_ASYNC;
1838
1839         /*****************************************************/
1840
1841         vvrc = vv_hca_query(kibnal_data.kib_hca, &kibnal_data.kib_hca_attrs);
1842         if (vvrc != vv_return_ok) {
1843                 CERROR ("Can't size port attrs for %s: %d\n", hca_name, vvrc);
1844                 goto failed;
1845         }
1846
1847         kibnal_data.kib_port = -1;
1848
1849         for (i = 0; i<kibnal_data.kib_hca_attrs.port_num; i++) {
1850
1851                 int port_num = i+1;
1852                 u_int32_t tbl_count;
1853                 vv_port_attrib_t *pattr = &kibnal_data.kib_port_attr;
1854
1855                 vvrc = vv_port_query(kibnal_data.kib_hca, port_num, pattr);
1856                 if (vvrc != vv_return_ok) {
1857                         CERROR("vv_port_query failed for %s port %d: %d\n",
1858                                hca_name, port_num, vvrc);
1859                         continue;
1860                 }
1861
1862                 switch (pattr->port_state) {
1863                 case vv_state_linkDoun:
1864                         CDEBUG(D_NET, "port[%d] Down\n", port_num);
1865                         continue;
1866                 case vv_state_linkInit:
1867                         CDEBUG(D_NET, "port[%d] Init\n", port_num);
1868                         continue;
1869                 case vv_state_linkArm:
1870                         CDEBUG(D_NET, "port[%d] Armed\n", port_num);
1871                         continue;
1872                 case vv_state_linkActive:
1873                         CDEBUG(D_NET, "port[%d] Active\n", port_num);
1874
1875                         /* Found a suitable port. Get its GUID and PKEY. */
1876                         tbl_count = 1;
1877                         vvrc = vv_get_port_gid_tbl(kibnal_data.kib_hca,
1878                                                    port_num, &tbl_count,
1879                                                    &kibnal_data.kib_port_gid);
1880                         if (vvrc != vv_return_ok) {
1881                                 CERROR("vv_get_port_gid_tbl failed "
1882                                        "for %s port %d: %d\n",
1883                                        hca_name, port_num, vvrc);
1884                                 continue;
1885                         }
1886
1887                         tbl_count = 1;
1888                         vvrc = vv_get_port_partition_tbl(kibnal_data.kib_hca,
1889                                                          port_num, &tbl_count,
1890                                                          &kibnal_data.kib_port_pkey);
1891                         if (vvrc != vv_return_ok) {
1892                                 CERROR("vv_get_port_partition_tbl failed "
1893                                        "for %s port %d: %d\n",
1894                                        hca_name, port_num, vvrc);
1895                                 continue;
1896                         }
1897
1898                         kibnal_data.kib_port = port_num;
1899
1900                         break;
1901                 case vv_state_linkActDefer: /* TODO: correct? */
1902                 case vv_state_linkNoChange:
1903                         CERROR("Unexpected %s port[%d] state %d\n",
1904                                hca_name, i, pattr->port_state);
1905                         continue;
1906                 }
1907                 break;
1908         }
1909
1910         if (kibnal_data.kib_port == -1) {
1911                 CERROR ("Can't find an active port on %s\n", hca_name);
1912                 goto failed;
1913         }
1914
1915         CDEBUG(D_NET, "Using %s port %d - GID="LPX64":"LPX64"\n",
1916                hca_name, kibnal_data.kib_port,
1917                kibnal_data.kib_port_gid.scope.g.subnet,
1918                kibnal_data.kib_port_gid.scope.g.eui64);
1919
1920         /*****************************************************/
1921
1922 #if 1
1923         /* We use a pre-allocated PD */
1924         vvrc = vv_get_gen_pd_h(kibnal_data.kib_hca, &kibnal_data.kib_pd);
1925 #else
1926         vvrc = vv_pd_allocate(kibnal_data.kib_hca, &kibnal_data.kib_pd);
1927 #endif
1928         if (vvrc != vv_return_ok) {
1929                 CERROR ("Can't init PD: %d\n", vvrc);
1930                 goto failed;
1931         }
1932
1933         /* flag PD initialised */
1934         kibnal_data.kib_init = IBNAL_INIT_PD;
1935         /*****************************************************/
1936
1937         rc = kibnal_setup_tx_descs();
1938         if (rc != 0) {
1939                 CERROR ("Can't register tx descs: %d\n", rc);
1940                 goto failed;
1941         }
1942
1943         /* flag TX descs initialised */
1944         kibnal_data.kib_init = IBNAL_INIT_TXD;
1945         /*****************************************************/
1946
1947         {
1948                 __u32 nentries;
1949
1950                 vvrc = vv_cq_create(kibnal_data.kib_hca, IBNAL_CQ_ENTRIES(),
1951                                     kibnal_cq_callback,
1952                                     NULL, /* context */
1953                                     &kibnal_data.kib_cq, &nentries);
1954                 if (vvrc != 0) {
1955                         CERROR ("Can't create RX CQ: %d\n", vvrc);
1956                         goto failed;
1957                 }
1958
1959                 /* flag CQ initialised */
1960                 kibnal_data.kib_init = IBNAL_INIT_CQ;
1961
1962                 if (nentries < IBNAL_CQ_ENTRIES()) {
1963                         CERROR ("CQ only has %d entries, need %d\n",
1964                                 nentries, IBNAL_CQ_ENTRIES());
1965                         goto failed;
1966                 }
1967
1968                 vvrc = vv_request_completion_notification(kibnal_data.kib_hca,
1969                                                           kibnal_data.kib_cq,
1970                                                           vv_next_solicit_unsolicit_event);
1971                 if (vvrc != 0) {
1972                         CERROR ("Failed to re-arm completion queue: %d\n", rc);
1973                         goto failed;
1974                 }
1975         }
1976
1977         rc = kibnal_start_listener(ni);
1978         if (rc != 0) {
1979                 CERROR("Can't start listener: %d\n", rc);
1980                 goto failed;
1981         }
1982
1983         /* flag everything initialised */
1984         kibnal_data.kib_init = IBNAL_INIT_ALL;
1985         /*****************************************************/
1986
1987         return (0);
1988
1989  failed:
1990         CDEBUG(D_NET, "kibnal_startup failed\n");
1991         kibnal_shutdown (ni);
1992         return (-ENETDOWN);
1993 }
1994
/*
 * Module unload hook: deregister the LND from LNet first (so no new
 * startups can race in), then release tunables/sysctl state.
 */
void __exit
kibnal_module_fini (void)
{
        lnet_unregister_lnd(&the_kiblnd);
        kibnal_tunables_fini();
}
2001
2002 int __init
2003 kibnal_module_init (void)
2004 {
2005         int    rc;
2006
2007         vibnal_assert_wire_constants();
2008
2009         CLASSERT (offsetof(kib_msg_t, ibm_u) + sizeof(kib_connparams_t)
2010                   <= cm_REQ_priv_data_len);
2011         CLASSERT (offsetof(kib_msg_t, ibm_u) + sizeof(kib_connparams_t)
2012                   <= cm_REP_priv_data_len);
2013         CLASSERT (sizeof(kib_msg_t) <= IBNAL_MSG_SIZE);
2014 #if !IBNAL_USE_FMR
2015         CLASSERT (offsetof(kib_msg_t, ibm_u.get.ibgm_rd.rd_frags[IBNAL_MAX_RDMA_FRAGS])
2016                   <= IBNAL_MSG_SIZE);
2017         CLASSERT (offsetof(kib_msg_t, ibm_u.putack.ibpam_rd.rd_frags[IBNAL_MAX_RDMA_FRAGS])
2018                   <= IBNAL_MSG_SIZE);
2019 #endif
2020         rc = kibnal_tunables_init();
2021         if (rc != 0)
2022                 return rc;
2023
2024         lnet_register_lnd(&the_kiblnd);
2025
2026         return 0;
2027 }
2028
/* Module metadata and load/unload entry points */
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Kernel Voltaire IB LND v1.00");
MODULE_LICENSE("GPL");

module_init(kibnal_module_init);
module_exit(kibnal_module_fini);