Whamcloud - gitweb
b=10778,i=eeb:
[fs/lustre-release.git] / lnet / klnds / viblnd / viblnd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2004 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  *   Author: Frank Zago <fzago@systemfabricworks.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  */
24
25 #include "viblnd.h"
26
/* LND operations table for this driver: binds the VIBLND type to the
 * kibnal_* entry points for startup/shutdown, control, send and the two
 * receive paths. */
lnd_t the_kiblnd = {
        .lnd_type       = VIBLND,
        .lnd_startup    = kibnal_startup,
        .lnd_shutdown   = kibnal_shutdown,
        .lnd_ctl        = kibnal_ctl,
        .lnd_send       = kibnal_send,
        .lnd_recv       = kibnal_recv,
        .lnd_eager_recv = kibnal_eager_recv,
};

/* Driver-wide state (listen handle, peer table, global lock, ...) used by
 * every function in this file. */
kib_data_t              kibnal_data;
38
/* Pins the viblnd wire protocol: message-type constants plus the size and
 * field offsets of every structure that travels between peers.  The body is
 * generated by 'wirecheck' (see below) and consists solely of CLASSERTs, so
 * any change to a constant or to a wire structure's layout trips an
 * assertion here.
 * NOTE(review): CLASSERT is presumably the libcfs compile-time assertion;
 * confirm against libcfs before relying on "no runtime cost". */
void vibnal_assert_wire_constants (void)
{
        /* Wire protocol assertions generated by 'wirecheck'
         * running on Linux robert 2.6.11-1.27_FC3 #1 Tue May 17 20:27:37 EDT 2005 i686 athlon i386 G
         * with gcc version 3.4.3 20050227 (Red Hat 3.4.3-22.fc3) */


        /* Constants... */
        CLASSERT (IBNAL_MSG_MAGIC == 0x0be91b91);
        CLASSERT (IBNAL_MSG_VERSION == 0x11);
        CLASSERT (IBNAL_MSG_CONNREQ == 0xc0);
        CLASSERT (IBNAL_MSG_CONNACK == 0xc1);
        CLASSERT (IBNAL_MSG_NOOP == 0xd0);
        CLASSERT (IBNAL_MSG_IMMEDIATE == 0xd1);
        CLASSERT (IBNAL_MSG_PUT_REQ == 0xd2);
        CLASSERT (IBNAL_MSG_PUT_NAK == 0xd3);
        CLASSERT (IBNAL_MSG_PUT_ACK == 0xd4);
        CLASSERT (IBNAL_MSG_PUT_DONE == 0xd5);
        CLASSERT (IBNAL_MSG_GET_REQ == 0xd6);
        CLASSERT (IBNAL_MSG_GET_DONE == 0xd7);

        /* Checks for struct kib_connparams_t */
        CLASSERT ((int)sizeof(kib_connparams_t) == 12);
        CLASSERT ((int)offsetof(kib_connparams_t, ibcp_queue_depth) == 0);
        CLASSERT ((int)sizeof(((kib_connparams_t *)0)->ibcp_queue_depth) == 4);
        CLASSERT ((int)offsetof(kib_connparams_t, ibcp_max_msg_size) == 4);
        CLASSERT ((int)sizeof(((kib_connparams_t *)0)->ibcp_max_msg_size) == 4);
        CLASSERT ((int)offsetof(kib_connparams_t, ibcp_max_frags) == 8);
        CLASSERT ((int)sizeof(((kib_connparams_t *)0)->ibcp_max_frags) == 4);

        /* Checks for struct kib_immediate_msg_t */
        CLASSERT ((int)sizeof(kib_immediate_msg_t) == 72);
        CLASSERT ((int)offsetof(kib_immediate_msg_t, ibim_hdr) == 0);
        CLASSERT ((int)sizeof(((kib_immediate_msg_t *)0)->ibim_hdr) == 72);
        /* payload starts right after the 72-byte header: 72 + 13 == 85 */
        CLASSERT ((int)offsetof(kib_immediate_msg_t, ibim_payload[13]) == 85);
        CLASSERT ((int)sizeof(((kib_immediate_msg_t *)0)->ibim_payload[13]) == 1);
        /* the kib_rdma_desc_t layout checked below is the IBNAL_USE_FMR one */
        CLASSERT (IBNAL_USE_FMR == 1);

        /* Checks for struct kib_rdma_desc_t */
        CLASSERT ((int)sizeof(kib_rdma_desc_t) == 16);
        CLASSERT ((int)offsetof(kib_rdma_desc_t, rd_addr) == 0);
        CLASSERT ((int)sizeof(((kib_rdma_desc_t *)0)->rd_addr) == 8);
        CLASSERT ((int)offsetof(kib_rdma_desc_t, rd_nob) == 8);
        CLASSERT ((int)sizeof(((kib_rdma_desc_t *)0)->rd_nob) == 4);
        CLASSERT ((int)offsetof(kib_rdma_desc_t, rd_key) == 12);
        CLASSERT ((int)sizeof(((kib_rdma_desc_t *)0)->rd_key) == 4);

        /* Checks for struct kib_putreq_msg_t */
        CLASSERT ((int)sizeof(kib_putreq_msg_t) == 80);
        CLASSERT ((int)offsetof(kib_putreq_msg_t, ibprm_hdr) == 0);
        CLASSERT ((int)sizeof(((kib_putreq_msg_t *)0)->ibprm_hdr) == 72);
        CLASSERT ((int)offsetof(kib_putreq_msg_t, ibprm_cookie) == 72);
        CLASSERT ((int)sizeof(((kib_putreq_msg_t *)0)->ibprm_cookie) == 8);

        /* Checks for struct kib_putack_msg_t */
        CLASSERT ((int)sizeof(kib_putack_msg_t) == 32);
        CLASSERT ((int)offsetof(kib_putack_msg_t, ibpam_src_cookie) == 0);
        CLASSERT ((int)sizeof(((kib_putack_msg_t *)0)->ibpam_src_cookie) == 8);
        CLASSERT ((int)offsetof(kib_putack_msg_t, ibpam_dst_cookie) == 8);
        CLASSERT ((int)sizeof(((kib_putack_msg_t *)0)->ibpam_dst_cookie) == 8);
        CLASSERT ((int)offsetof(kib_putack_msg_t, ibpam_rd) == 16);
        CLASSERT ((int)sizeof(((kib_putack_msg_t *)0)->ibpam_rd) == 16);

        /* Checks for struct kib_get_msg_t */
        CLASSERT ((int)sizeof(kib_get_msg_t) == 96);
        CLASSERT ((int)offsetof(kib_get_msg_t, ibgm_hdr) == 0);
        CLASSERT ((int)sizeof(((kib_get_msg_t *)0)->ibgm_hdr) == 72);
        CLASSERT ((int)offsetof(kib_get_msg_t, ibgm_cookie) == 72);
        CLASSERT ((int)sizeof(((kib_get_msg_t *)0)->ibgm_cookie) == 8);
        CLASSERT ((int)offsetof(kib_get_msg_t, ibgm_rd) == 80);
        CLASSERT ((int)sizeof(((kib_get_msg_t *)0)->ibgm_rd) == 16);

        /* Checks for struct kib_completion_msg_t */
        CLASSERT ((int)sizeof(kib_completion_msg_t) == 12);
        CLASSERT ((int)offsetof(kib_completion_msg_t, ibcm_cookie) == 0);
        CLASSERT ((int)sizeof(((kib_completion_msg_t *)0)->ibcm_cookie) == 8);
        CLASSERT ((int)offsetof(kib_completion_msg_t, ibcm_status) == 8);
        CLASSERT ((int)sizeof(((kib_completion_msg_t *)0)->ibcm_status) == 4);

        /* Checks for struct kib_msg_t */
        CLASSERT ((int)sizeof(kib_msg_t) == 152);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_magic) == 0);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_magic) == 4);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_version) == 4);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_version) == 2);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_type) == 6);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_type) == 1);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_credits) == 7);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_credits) == 1);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_nob) == 8);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_nob) == 4);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_cksum) == 12);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_cksum) == 4);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_srcnid) == 16);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_srcnid) == 8);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_srcstamp) == 24);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_srcstamp) == 8);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_dstnid) == 32);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_dstnid) == 8);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_dststamp) == 40);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_dststamp) == 8);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_seq) == 48);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_seq) == 8);
        /* every member of the ibm_u union starts at the end of the
         * 56-byte common header */
        CLASSERT ((int)offsetof(kib_msg_t, ibm_u.connparams) == 56);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_u.connparams) == 12);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_u.immediate) == 56);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_u.immediate) == 72);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_u.putreq) == 56);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_u.putreq) == 80);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_u.putack) == 56);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_u.putack) == 32);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_u.get) == 56);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_u.get) == 96);
        CLASSERT ((int)offsetof(kib_msg_t, ibm_u.completion) == 56);
        CLASSERT ((int)sizeof(((kib_msg_t *)0)->ibm_u.completion) == 12);
}
155
/* Compute the message checksum over 'nob' bytes starting at 'ptr'.
 *
 * The running sum is rotated left by one bit and the next byte is added,
 * once per byte.  Zero is reserved on the wire to mean "no checksum", so a
 * sum that comes out as zero is reported as 1 instead.  A non-positive
 * 'nob' therefore yields 1.
 *
 * NOTE(review): bytes are read through plain 'char', so values >= 0x80 are
 * sign-extended only where 'char' is signed -- confirm both ends of a
 * connection agree before changing the pointer type. */
__u32
kibnal_cksum (void *ptr, int nob)
{
        char  *bytes = ptr;
        __u32  acc   = 0;
        int    i;

        for (i = 0; i < nob; i++) {
                /* rotate left by one, then fold in the next byte */
                __u32 rotated = (acc << 1) | (acc >> 31);

                acc = rotated + bytes[i];
        }

        /* never hand back 0: that value means "not checksummed" */
        if (acc != 0)
                return acc;

        return 1;
}
168
169 void
170 kibnal_init_msg(kib_msg_t *msg, int type, int body_nob)
171 {
172         msg->ibm_type = type;
173         msg->ibm_nob  = offsetof(kib_msg_t, ibm_u) + body_nob;
174 }
175
176 void
177 kibnal_pack_msg(kib_msg_t *msg, __u32 version, int credits, 
178                 lnet_nid_t dstnid, __u64 dststamp, __u64 seq)
179 {
180         /* CAVEAT EMPTOR! all message fields not set here should have been
181          * initialised previously. */
182         msg->ibm_magic    = IBNAL_MSG_MAGIC;
183         msg->ibm_version  = version;
184         /*   ibm_type */
185         msg->ibm_credits  = credits;
186         /*   ibm_nob */
187         msg->ibm_cksum    = 0;
188         msg->ibm_srcnid   = lnet_ptlcompat_srcnid(kibnal_data.kib_ni->ni_nid,
189                                                   dstnid);
190         msg->ibm_srcstamp = kibnal_data.kib_incarnation;
191         msg->ibm_dstnid   = dstnid;
192         msg->ibm_dststamp = dststamp;
193         msg->ibm_seq      = seq;
194
195         if (*kibnal_tunables.kib_cksum) {
196                 /* NB ibm_cksum zero while computing cksum */
197                 msg->ibm_cksum = kibnal_cksum(msg, msg->ibm_nob);
198         }
199 }
200
201 int
202 kibnal_unpack_msg(kib_msg_t *msg, __u32 expected_version, int nob)
203 {
204         const int hdr_size = offsetof(kib_msg_t, ibm_u);
205         __u32     msg_cksum;
206         __u32     msg_version;
207         int       flip;
208         int       msg_nob;
209 #if !IBNAL_USE_FMR
210         int       i;
211         int       n;
212 #endif
213         /* 6 bytes are enough to have received magic + version */
214         if (nob < 6) {
215                 CERROR("Short message: %d\n", nob);
216                 return -EPROTO;
217         }
218
219         /* Future protocol version compatibility support!
220          * If the viblnd-specific protocol changes, or when LNET unifies
221          * protocols over all LNDs, the initial connection will negotiate a
222          * protocol version.  If I find this, I avoid any console errors.  If
223          * my is doing connection establishment, the reject will tell the peer
224          * which version I'm running. */
225
226         if (msg->ibm_magic == IBNAL_MSG_MAGIC) {
227                 flip = 0;
228         } else if (msg->ibm_magic == __swab32(IBNAL_MSG_MAGIC)) {
229                 flip = 1;
230         } else {
231                 if (msg->ibm_magic == LNET_PROTO_MAGIC ||
232                     msg->ibm_magic == __swab32(LNET_PROTO_MAGIC))
233                         return -EPROTO;
234
235                 /* Completely out to lunch */
236                 CERROR("Bad magic: %08x\n", msg->ibm_magic);
237                 return -EPROTO;
238         }
239
240         msg_version = flip ? __swab16(msg->ibm_version) : msg->ibm_version;
241         if (expected_version == 0) {
242                 if (msg_version != IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD &&
243                     msg_version != IBNAL_MSG_VERSION)
244                         return -EPROTO;
245         } else if (msg_version != expected_version) {
246                 CERROR("Bad version: %x(%x expected)\n", 
247                        msg_version, expected_version);
248                 return -EPROTO;
249         }
250
251         if (nob < hdr_size) {
252                 CERROR("Short message: %d\n", nob);
253                 return -EPROTO;
254         }
255
256         msg_nob = flip ? __swab32(msg->ibm_nob) : msg->ibm_nob;
257         if (msg_nob > nob) {
258                 CERROR("Short message: got %d, wanted %d\n", nob, msg_nob);
259                 return -EPROTO;
260         }
261
262         /* checksum must be computed with ibm_cksum zero and BEFORE anything
263          * gets flipped */
264         msg_cksum = flip ? __swab32(msg->ibm_cksum) : msg->ibm_cksum;
265         msg->ibm_cksum = 0;
266         if (msg_cksum != 0 &&
267             msg_cksum != kibnal_cksum(msg, msg_nob)) {
268                 CERROR("Bad checksum\n");
269                 return -EPROTO;
270         }
271         msg->ibm_cksum = msg_cksum;
272         
273         if (flip) {
274                 /* leave magic unflipped as a clue to peer endianness */
275                 msg->ibm_version = msg_version;
276                 CLASSERT (sizeof(msg->ibm_type) == 1);
277                 CLASSERT (sizeof(msg->ibm_credits) == 1);
278                 msg->ibm_nob = msg_nob;
279                 __swab64s(&msg->ibm_srcnid);
280                 __swab64s(&msg->ibm_srcstamp);
281                 __swab64s(&msg->ibm_dstnid);
282                 __swab64s(&msg->ibm_dststamp);
283                 __swab64s(&msg->ibm_seq);
284         }
285         
286         if (msg->ibm_srcnid == LNET_NID_ANY) {
287                 CERROR("Bad src nid: %s\n", libcfs_nid2str(msg->ibm_srcnid));
288                 return -EPROTO;
289         }
290
291         switch (msg->ibm_type) {
292         default:
293                 CERROR("Unknown message type %x\n", msg->ibm_type);
294                 return -EPROTO;
295                 
296         case IBNAL_MSG_NOOP:
297                 break;
298
299         case IBNAL_MSG_IMMEDIATE:
300                 if (msg_nob < offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[0])) {
301                         CERROR("Short IMMEDIATE: %d(%d)\n", msg_nob,
302                                (int)offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[0]));
303                         return -EPROTO;
304                 }
305                 break;
306
307         case IBNAL_MSG_PUT_REQ:
308                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.putreq)) {
309                         CERROR("Short PUT_REQ: %d(%d)\n", msg_nob,
310                                (int)(hdr_size + sizeof(msg->ibm_u.putreq)));
311                         return -EPROTO;
312                 }
313                 break;
314
315         case IBNAL_MSG_PUT_ACK:
316                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.putack)) {
317                         CERROR("Short PUT_ACK: %d(%d)\n", msg_nob,
318                                (int)(hdr_size + sizeof(msg->ibm_u.putack)));
319                         return -EPROTO;
320                 }
321 #if IBNAL_USE_FMR
322                 if (flip) {
323                         __swab64s(&msg->ibm_u.putack.ibpam_rd.rd_addr);
324                         __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_nob);
325                         __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_key);
326                 }
327 #else
328                 if (flip) {
329                         __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_key);
330                         __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_nfrag);
331                 }
332                 
333                 n = msg->ibm_u.putack.ibpam_rd.rd_nfrag;
334                 if (n <= 0 || n > IBNAL_MAX_RDMA_FRAGS) {
335                         CERROR("Bad PUT_ACK nfrags: %d, should be 0 < n <= %d\n", 
336                                n, IBNAL_MAX_RDMA_FRAGS);
337                         return -EPROTO;
338                 }
339                 
340                 if (msg_nob < offsetof(kib_msg_t, ibm_u.putack.ibpam_rd.rd_frags[n])) {
341                         CERROR("Short PUT_ACK: %d(%d)\n", msg_nob,
342                                (int)offsetof(kib_msg_t, ibm_u.putack.ibpam_rd.rd_frags[n]));
343                         return -EPROTO;
344                 }
345
346                 if (flip) {
347                         for (i = 0; i < n; i++) {
348                                 __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_frags[i].rf_nob);
349                                 __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_frags[i].rf_addr_lo);
350                                 __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_frags[i].rf_addr_hi);
351                         }
352                 }
353 #endif
354                 break;
355
356         case IBNAL_MSG_GET_REQ:
357                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.get)) {
358                         CERROR("Short GET_REQ: %d(%d)\n", msg_nob,
359                                (int)(hdr_size + sizeof(msg->ibm_u.get)));
360                         return -EPROTO;
361                 }
362 #if IBNAL_USE_FMR
363                 if (flip) {
364                         __swab64s(&msg->ibm_u.get.ibgm_rd.rd_addr);
365                         __swab32s(&msg->ibm_u.get.ibgm_rd.rd_nob);
366                         __swab32s(&msg->ibm_u.get.ibgm_rd.rd_key);
367                 }
368 #else                
369                 if (flip) {
370                         __swab32s(&msg->ibm_u.get.ibgm_rd.rd_key);
371                         __swab32s(&msg->ibm_u.get.ibgm_rd.rd_nfrag);
372                 }
373
374                 n = msg->ibm_u.get.ibgm_rd.rd_nfrag;
375                 if (n <= 0 || n > IBNAL_MAX_RDMA_FRAGS) {
376                         CERROR("Bad GET_REQ nfrags: %d, should be 0 < n <= %d\n", 
377                                n, IBNAL_MAX_RDMA_FRAGS);
378                         return -EPROTO;
379                 }
380                 
381                 if (msg_nob < offsetof(kib_msg_t, ibm_u.get.ibgm_rd.rd_frags[n])) {
382                         CERROR("Short GET_REQ: %d(%d)\n", msg_nob,
383                                (int)offsetof(kib_msg_t, ibm_u.get.ibgm_rd.rd_frags[n]));
384                         return -EPROTO;
385                 }
386                 
387                 if (flip)
388                         for (i = 0; i < msg->ibm_u.get.ibgm_rd.rd_nfrag; i++) {
389                                 __swab32s(&msg->ibm_u.get.ibgm_rd.rd_frags[i].rf_nob);
390                                 __swab32s(&msg->ibm_u.get.ibgm_rd.rd_frags[i].rf_addr_lo);
391                                 __swab32s(&msg->ibm_u.get.ibgm_rd.rd_frags[i].rf_addr_hi);
392                         }
393 #endif
394                 break;
395
396         case IBNAL_MSG_PUT_NAK:
397         case IBNAL_MSG_PUT_DONE:
398         case IBNAL_MSG_GET_DONE:
399                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.completion)) {
400                         CERROR("Short RDMA completion: %d(%d)\n", msg_nob,
401                                (int)(hdr_size + sizeof(msg->ibm_u.completion)));
402                         return -EPROTO;
403                 }
404                 if (flip)
405                         __swab32s(&msg->ibm_u.completion.ibcm_status);
406                 break;
407
408         case IBNAL_MSG_CONNREQ:
409         case IBNAL_MSG_CONNACK:
410                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.connparams)) {
411                         CERROR("Short connreq/ack: %d(%d)\n", msg_nob,
412                                (int)(hdr_size + sizeof(msg->ibm_u.connparams)));
413                         return -EPROTO;
414                 }
415                 if (flip) {
416                         __swab32s(&msg->ibm_u.connparams.ibcp_queue_depth);
417                         __swab32s(&msg->ibm_u.connparams.ibcp_max_msg_size);
418                         __swab32s(&msg->ibm_u.connparams.ibcp_max_frags);
419                 }
420                 break;
421         }
422         return 0;
423 }
424
425 int
426 kibnal_start_listener (lnet_ni_t *ni)
427 {
428         static cm_listen_data_t info;
429
430         cm_return_t      cmrc;
431
432         LASSERT (kibnal_data.kib_listen_handle == NULL);
433
434         kibnal_data.kib_listen_handle = 
435                 cm_create_cep(cm_cep_transp_rc);
436         if (kibnal_data.kib_listen_handle == NULL) {
437                 CERROR ("Can't create listen CEP\n");
438                 return -ENOMEM;
439         }
440
441         CDEBUG(D_NET, "Created CEP %p for listening\n", 
442                kibnal_data.kib_listen_handle);
443
444         memset(&info, 0, sizeof(info));
445         info.listen_addr.end_pt.sid = 
446                 (__u64)(*kibnal_tunables.kib_service_number);
447
448         cmrc = cm_listen(kibnal_data.kib_listen_handle, &info,
449                          kibnal_listen_callback, NULL);
450         if (cmrc == cm_stat_success)
451                 return 0;
452         
453         CERROR ("cm_listen error: %d\n", cmrc);
454
455         cmrc = cm_destroy_cep(kibnal_data.kib_listen_handle);
456         LASSERT (cmrc == cm_stat_success);
457
458         kibnal_data.kib_listen_handle = NULL;
459         return -EINVAL;
460 }
461
462 void
463 kibnal_stop_listener(lnet_ni_t *ni)
464 {
465         cm_return_t      cmrc;
466
467         LASSERT (kibnal_data.kib_listen_handle != NULL);
468         
469         cmrc = cm_cancel(kibnal_data.kib_listen_handle);
470         if (cmrc != cm_stat_success)
471                 CERROR ("Error %d stopping listener\n", cmrc);
472
473         cfs_pause(cfs_time_seconds(1)/10);   /* ensure no more callbacks */
474         
475         cmrc = cm_destroy_cep(kibnal_data.kib_listen_handle);
476         if (cmrc != vv_return_ok)
477                 CERROR ("Error %d destroying CEP\n", cmrc);
478
479         kibnal_data.kib_listen_handle = NULL;
480 }
481
482 int
483 kibnal_create_peer (kib_peer_t **peerp, lnet_nid_t nid)
484 {
485         kib_peer_t     *peer;
486         unsigned long   flags;
487         int             rc;
488
489         LASSERT (nid != LNET_NID_ANY);
490
491         LIBCFS_ALLOC(peer, sizeof (*peer));
492         if (peer == NULL) {
493                 CERROR("Cannot allocate peer\n");
494                 return -ENOMEM;
495         }
496
497         memset(peer, 0, sizeof(*peer));         /* zero flags etc */
498
499         peer->ibp_nid = nid;
500         atomic_set (&peer->ibp_refcount, 1);    /* 1 ref for caller */
501
502         INIT_LIST_HEAD (&peer->ibp_list);       /* not in the peer table yet */
503         INIT_LIST_HEAD (&peer->ibp_conns);
504         INIT_LIST_HEAD (&peer->ibp_tx_queue);
505
506         peer->ibp_error = 0;
507         peer->ibp_last_alive = cfs_time_current();
508         peer->ibp_reconnect_interval = 0;       /* OK to connect at any time */
509
510         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
511
512         if (atomic_read(&kibnal_data.kib_npeers) >=
513             *kibnal_tunables.kib_concurrent_peers) {
514                 rc = -EOVERFLOW;        /* !! but at least it distinguishes */
515         } else if (kibnal_data.kib_listen_handle == NULL) {
516                 rc = -ESHUTDOWN;        /* shutdown has started */
517         } else {
518                 rc = 0;
519                 /* npeers only grows with the global lock held */
520                 atomic_inc(&kibnal_data.kib_npeers);
521         }
522         
523         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
524
525         if (rc != 0) {
526                 CERROR("Can't create peer: %s\n", 
527                        (rc == -ESHUTDOWN) ? "shutting down" : 
528                        "too many peers");
529                 LIBCFS_FREE(peer, sizeof(*peer));
530         } else {
531                 *peerp = peer;
532         }
533         
534         return rc;
535 }
536
/* Free a peer and release its slot in the kib_npeers count.
 *
 * The assertions spell out the preconditions: no references remain, the
 * peer is neither persistent nor linked in the peer table, no connection
 * attempt is in progress in either direction, and it has no connections or
 * queued transmits. */
void
kibnal_destroy_peer (kib_peer_t *peer)
{
        LASSERT (atomic_read (&peer->ibp_refcount) == 0);
        LASSERT (peer->ibp_persistence == 0);
        LASSERT (!kibnal_peer_active(peer));
        LASSERT (peer->ibp_connecting == 0);
        LASSERT (peer->ibp_accepting == 0);
        LASSERT (list_empty (&peer->ibp_conns));
        LASSERT (list_empty (&peer->ibp_tx_queue));
        
        LIBCFS_FREE (peer, sizeof (*peer));

        /* NB a peer's connections keep a reference on their peer until
         * they are destroyed, so we can be assured that _all_ state to do
         * with this peer has been cleaned up when its refcount drops to
         * zero. */
        atomic_dec(&kibnal_data.kib_npeers);
}
556
557 kib_peer_t *
558 kibnal_find_peer_locked (lnet_nid_t nid)
559 {
560         /* the caller is responsible for accounting the additional reference
561          * that this creates */
562         struct list_head *peer_list = kibnal_nid2peerlist (nid);
563         struct list_head *tmp;
564         kib_peer_t       *peer;
565
566         list_for_each (tmp, peer_list) {
567
568                 peer = list_entry (tmp, kib_peer_t, ibp_list);
569
570                 LASSERT (peer->ibp_persistence != 0 || /* persistent peer */
571                          peer->ibp_connecting != 0 || /* creating conns */
572                          peer->ibp_accepting != 0 ||
573                          !list_empty (&peer->ibp_conns));  /* active conn */
574
575                 if (peer->ibp_nid != nid)
576                         continue;
577
578                 CDEBUG(D_NET, "got peer [%p] -> %s (%d)\n",
579                        peer, libcfs_nid2str(nid),
580                        atomic_read (&peer->ibp_refcount));
581                 return (peer);
582         }
583         return (NULL);
584 }
585
/* Remove a peer from the peer table and drop the table's reference on it.
 *
 * The peer must currently be in the table, must not be persistent and must
 * have no connections.  Since the table's reference is dropped here, the
 * caller must not assume the peer is still valid afterwards unless it
 * holds its own reference. */
void
kibnal_unlink_peer_locked (kib_peer_t *peer)
{
        LASSERT (peer->ibp_persistence == 0);
        LASSERT (list_empty(&peer->ibp_conns));

        LASSERT (kibnal_peer_active(peer));
        list_del_init (&peer->ibp_list);
        /* lose peerlist's ref */
        kibnal_peer_decref(peer);
}
597
598 int
599 kibnal_get_peer_info (int index, lnet_nid_t *nidp, __u32 *ipp,
600                       int *persistencep)
601 {
602         kib_peer_t        *peer;
603         struct list_head  *ptmp;
604         int                i;
605         unsigned long      flags;
606
607         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
608
609         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
610
611                 list_for_each (ptmp, &kibnal_data.kib_peers[i]) {
612
613                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
614                         LASSERT (peer->ibp_persistence != 0 ||
615                                  peer->ibp_connecting != 0 ||
616                                  peer->ibp_accepting != 0 ||
617                                  !list_empty (&peer->ibp_conns));
618
619                         if (index-- > 0)
620                                 continue;
621
622                         *nidp = peer->ibp_nid;
623                         *ipp = peer->ibp_ip;
624                         *persistencep = peer->ibp_persistence;
625
626                         read_unlock_irqrestore(&kibnal_data.kib_global_lock,
627                                                flags);
628                         return (0);
629                 }
630         }
631
632         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
633         return (-ENOENT);
634 }
635
636 int
637 kibnal_add_persistent_peer (lnet_nid_t nid, __u32 ip)
638 {
639         kib_peer_t        *peer;
640         kib_peer_t        *peer2;
641         unsigned long      flags;
642         int                rc;
643
644         CDEBUG(D_NET, "%s at %u.%u.%u.%u\n",
645                libcfs_nid2str(nid), HIPQUAD(ip));
646         
647         if (nid == LNET_NID_ANY)
648                 return (-EINVAL);
649
650         rc = kibnal_create_peer(&peer, nid);
651         if (rc != 0)
652                 return rc;
653
654         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
655
656         /* I'm always called with a reference on kibnal_data.kib_ni
657          * so shutdown can't have started */
658         LASSERT (kibnal_data.kib_listen_handle != NULL);
659
660         peer2 = kibnal_find_peer_locked (nid);
661         if (peer2 != NULL) {
662                 kibnal_peer_decref (peer);
663                 peer = peer2;
664         } else {
665                 /* peer table takes existing ref on peer */
666                 list_add_tail (&peer->ibp_list,
667                                kibnal_nid2peerlist (nid));
668         }
669
670         peer->ibp_ip = ip;
671         peer->ibp_persistence++;
672         
673         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
674         return (0);
675 }
676
677 void
678 kibnal_del_peer_locked (kib_peer_t *peer)
679 {
680         struct list_head *ctmp;
681         struct list_head *cnxt;
682         kib_conn_t       *conn;
683
684         peer->ibp_persistence = 0;
685
686         if (list_empty(&peer->ibp_conns)) {
687                 kibnal_unlink_peer_locked(peer);
688         } else {
689                 list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
690                         conn = list_entry(ctmp, kib_conn_t, ibc_list);
691
692                         kibnal_close_conn_locked (conn, 0);
693                 }
694                 /* NB peer is no longer persistent; closing its last conn
695                  * unlinked it. */
696         }
697         /* NB peer now unlinked; might even be freed if the peer table had the
698          * last ref on it. */
699 }
700
701 int
702 kibnal_del_peer (lnet_nid_t nid)
703 {
704         CFS_LIST_HEAD     (zombies);
705         struct list_head  *ptmp;
706         struct list_head  *pnxt;
707         kib_peer_t        *peer;
708         int                lo;
709         int                hi;
710         int                i;
711         unsigned long      flags;
712         int                rc = -ENOENT;
713
714         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
715
716         if (nid != LNET_NID_ANY)
717                 lo = hi = kibnal_nid2peerlist(nid) - kibnal_data.kib_peers;
718         else {
719                 lo = 0;
720                 hi = kibnal_data.kib_peer_hash_size - 1;
721         }
722
723         for (i = lo; i <= hi; i++) {
724                 list_for_each_safe (ptmp, pnxt, &kibnal_data.kib_peers[i]) {
725                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
726                         LASSERT (peer->ibp_persistence != 0 ||
727                                  peer->ibp_connecting != 0 ||
728                                  peer->ibp_accepting != 0 ||
729                                  !list_empty (&peer->ibp_conns));
730
731                         if (!(nid == LNET_NID_ANY || peer->ibp_nid == nid))
732                                 continue;
733
734                         if (!list_empty(&peer->ibp_tx_queue)) {
735                                 LASSERT (list_empty(&peer->ibp_conns));
736
737                                 list_splice_init(&peer->ibp_tx_queue, &zombies);
738                         }
739
740                         kibnal_del_peer_locked (peer);
741                         rc = 0;         /* matched something */
742                 }
743         }
744
745         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
746
747         kibnal_txlist_done(&zombies, -EIO);
748
749         return (rc);
750 }
751
752 kib_conn_t *
753 kibnal_get_conn_by_idx (int index)
754 {
755         kib_peer_t        *peer;
756         struct list_head  *ptmp;
757         kib_conn_t        *conn;
758         struct list_head  *ctmp;
759         int                i;
760         unsigned long      flags;
761
762         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
763
764         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
765                 list_for_each (ptmp, &kibnal_data.kib_peers[i]) {
766
767                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
768                         LASSERT (peer->ibp_persistence > 0 ||
769                                  peer->ibp_connecting != 0 ||
770                                  peer->ibp_accepting != 0 ||
771                                  !list_empty (&peer->ibp_conns));
772
773                         list_for_each (ctmp, &peer->ibp_conns) {
774                                 if (index-- > 0)
775                                         continue;
776
777                                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
778                                 kibnal_conn_addref(conn);
779                                 read_unlock_irqrestore(&kibnal_data.kib_global_lock,
780                                                        flags);
781                                 return (conn);
782                         }
783                 }
784         }
785
786         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
787         return (NULL);
788 }
789
790 void
791 kibnal_debug_rx (kib_rx_t *rx)
792 {
793         CDEBUG(D_CONSOLE, "      %p nob %d msg_type %x "
794                "cred %d seq "LPD64"\n",
795                rx, rx->rx_nob, rx->rx_msg->ibm_type,
796                rx->rx_msg->ibm_credits, rx->rx_msg->ibm_seq);
797 }
798
799 void
800 kibnal_debug_tx (kib_tx_t *tx)
801 {
802         CDEBUG(D_CONSOLE, "      %p snd %d q %d w %d rc %d dl %lx "
803                "cookie "LPX64" msg %s%s type %x cred %d seq "LPD64"\n",
804                tx, tx->tx_sending, tx->tx_queued, tx->tx_waiting,
805                tx->tx_status, tx->tx_deadline, tx->tx_cookie,
806                tx->tx_lntmsg[0] == NULL ? "-" : "!",
807                tx->tx_lntmsg[1] == NULL ? "-" : "!",
808                tx->tx_msg->ibm_type, tx->tx_msg->ibm_credits,
809                tx->tx_msg->ibm_seq);
810 }
811
812 void
813 kibnal_debug_conn (kib_conn_t *conn)
814 {
815         struct list_head *tmp;
816         int               i;
817         
818         spin_lock(&conn->ibc_lock);
819         
820         CDEBUG(D_CONSOLE, "conn[%d] %p -> %s: \n", 
821                atomic_read(&conn->ibc_refcount), conn, 
822                libcfs_nid2str(conn->ibc_peer->ibp_nid));
823         CDEBUG(D_CONSOLE, "   txseq "LPD64" rxseq "LPD64" state %d \n",
824                conn->ibc_txseq, conn->ibc_rxseq, conn->ibc_state);
825         CDEBUG(D_CONSOLE, "   nposted %d cred %d o_cred %d r_cred %d\n",
826                conn->ibc_nsends_posted, conn->ibc_credits, 
827                conn->ibc_outstanding_credits, conn->ibc_reserved_credits);
828         CDEBUG(D_CONSOLE, "   disc %d comms_err %d\n",
829                conn->ibc_disconnect, conn->ibc_comms_error);
830
831         CDEBUG(D_CONSOLE, "   early_rxs:\n");
832         list_for_each(tmp, &conn->ibc_early_rxs)
833                 kibnal_debug_rx(list_entry(tmp, kib_rx_t, rx_list));
834         
835         CDEBUG(D_CONSOLE, "   tx_queue_nocred:\n");
836         list_for_each(tmp, &conn->ibc_tx_queue_nocred)
837                 kibnal_debug_tx(list_entry(tmp, kib_tx_t, tx_list));
838
839         CDEBUG(D_CONSOLE, "   tx_queue_rsrvd:\n");
840         list_for_each(tmp, &conn->ibc_tx_queue_rsrvd)
841                 kibnal_debug_tx(list_entry(tmp, kib_tx_t, tx_list));
842
843         CDEBUG(D_CONSOLE, "   tx_queue:\n");
844         list_for_each(tmp, &conn->ibc_tx_queue)
845                 kibnal_debug_tx(list_entry(tmp, kib_tx_t, tx_list));
846         
847         CDEBUG(D_CONSOLE, "   active_txs:\n");
848         list_for_each(tmp, &conn->ibc_active_txs)
849                 kibnal_debug_tx(list_entry(tmp, kib_tx_t, tx_list));
850         
851         CDEBUG(D_CONSOLE, "   rxs:\n");
852         for (i = 0; i < IBNAL_RX_MSGS; i++)
853                 kibnal_debug_rx(&conn->ibc_rxs[i]);
854
855         spin_unlock(&conn->ibc_lock);
856 }
857
858 int
859 kibnal_set_qp_state (kib_conn_t *conn, vv_qp_state_t new_state)
860 {
861         static vv_qp_attr_t attr;
862         
863         kib_connvars_t   *cv = conn->ibc_connvars;
864         vv_return_t       vvrc;
865         
866         /* Only called by connd => static OK */
867         LASSERT (!in_interrupt());
868         LASSERT (current == kibnal_data.kib_connd);
869
870         memset(&attr, 0, sizeof(attr));
871         
872         switch (new_state) {
873         default:
874                 LBUG();
875                 
876         case vv_qp_state_init: {
877                 struct vv_qp_modify_init_st *init = &attr.modify.params.init;
878
879                 init->p_key_indx     = cv->cv_pkey_index;
880                 init->phy_port_num   = cv->cv_port;
881                 init->q_key          = IBNAL_QKEY; /* XXX but VV_QP_AT_Q_KEY not set! */
882                 init->access_control = vv_acc_r_mem_read |
883                                        vv_acc_r_mem_write; /* XXX vv_acc_l_mem_write ? */
884
885                 attr.modify.vv_qp_attr_mask = VV_QP_AT_P_KEY_IX | 
886                                               VV_QP_AT_PHY_PORT_NUM |
887                                               VV_QP_AT_ACCESS_CON_F;
888                 break;
889         }
890         case vv_qp_state_rtr: {
891                 struct vv_qp_modify_rtr_st *rtr = &attr.modify.params.rtr;
892                 vv_add_vec_t               *av  = &rtr->remote_add_vec;
893
894                 av->dlid                      = cv->cv_path.dlid;
895                 av->grh_flag                  = (!IBNAL_LOCAL_SUB);
896                 av->max_static_rate           = IBNAL_R_2_STATIC_RATE(cv->cv_path.rate);
897                 av->service_level             = cv->cv_path.sl;
898                 av->source_path_bit           = IBNAL_SOURCE_PATH_BIT;
899                 av->pmtu                      = cv->cv_path.mtu;
900                 av->rnr_retry_count           = cv->cv_rnr_count;
901                 av->global_dest.traffic_class = cv->cv_path.traffic_class;
902                 av->global_dest.hope_limit    = cv->cv_path.hop_limut;
903                 av->global_dest.flow_lable    = cv->cv_path.flow_label;
904                 av->global_dest.s_gid_index   = cv->cv_sgid_index;
905                 // XXX other av fields zero?
906
907                 rtr->destanation_qp            = cv->cv_remote_qpn;
908                 rtr->receive_psn               = cv->cv_rxpsn;
909                 rtr->responder_rdma_r_atom_num = IBNAL_OUS_DST_RD;
910                 rtr->opt_min_rnr_nak_timer     = *kibnal_tunables.kib_rnr_nak_timer;
911
912
913                 // XXX sdp sets VV_QP_AT_OP_F but no actual optional options
914                 attr.modify.vv_qp_attr_mask = VV_QP_AT_ADD_VEC | 
915                                               VV_QP_AT_DEST_QP |
916                                               VV_QP_AT_R_PSN | 
917                                               VV_QP_AT_MIN_RNR_NAK_T |
918                                               VV_QP_AT_RESP_RDMA_ATOM_OUT_NUM |
919                                               VV_QP_AT_OP_F;
920                 break;
921         }
922         case vv_qp_state_rts: {
923                 struct vv_qp_modify_rts_st *rts = &attr.modify.params.rts;
924
925                 rts->send_psn                 = cv->cv_txpsn;
926                 rts->local_ack_timeout        = *kibnal_tunables.kib_local_ack_timeout;
927                 rts->retry_num                = *kibnal_tunables.kib_retry_cnt;
928                 rts->rnr_num                  = *kibnal_tunables.kib_rnr_cnt;
929                 rts->dest_out_rdma_r_atom_num = IBNAL_OUS_DST_RD;
930                 
931                 attr.modify.vv_qp_attr_mask = VV_QP_AT_S_PSN |
932                                               VV_QP_AT_L_ACK_T |
933                                               VV_QP_AT_RETRY_NUM |
934                                               VV_QP_AT_RNR_NUM |
935                                               VV_QP_AT_DEST_RDMA_ATOM_OUT_NUM;
936                 break;
937         }
938         case vv_qp_state_error:
939         case vv_qp_state_reset:
940                 attr.modify.vv_qp_attr_mask = 0;
941                 break;
942         }
943                 
944         attr.modify.qp_modify_into_state = new_state;
945         attr.modify.vv_qp_attr_mask |= VV_QP_AT_STATE;
946         
947         vvrc = vv_qp_modify(kibnal_data.kib_hca, conn->ibc_qp, &attr, NULL);
948         if (vvrc != vv_return_ok) {
949                 CERROR("Can't modify qp -> %s state to %d: %d\n", 
950                        libcfs_nid2str(conn->ibc_peer->ibp_nid),
951                        new_state, vvrc);
952                 return -EIO;
953         }
954         
955         return 0;
956 }
957
958 kib_conn_t *
959 kibnal_create_conn (cm_cep_handle_t cep)
960 {
961         kib_conn_t   *conn;
962         int           i;
963         int           page_offset;
964         int           ipage;
965         vv_return_t   vvrc;
966         int           rc;
967
968         static vv_qp_attr_t  reqattr;
969         static vv_qp_attr_t  rspattr;
970
971         /* Only the connd creates conns => single threaded */
972         LASSERT(!in_interrupt());
973         LASSERT(current == kibnal_data.kib_connd);
974         
975         LIBCFS_ALLOC(conn, sizeof (*conn));
976         if (conn == NULL) {
977                 CERROR ("Can't allocate connection\n");
978                 return (NULL);
979         }
980
981         /* zero flags, NULL pointers etc... */
982         memset (conn, 0, sizeof (*conn));
983
984         conn->ibc_version = IBNAL_MSG_VERSION;  /* Use latest version at first */
985
986         INIT_LIST_HEAD (&conn->ibc_early_rxs);
987         INIT_LIST_HEAD (&conn->ibc_tx_queue_nocred);
988         INIT_LIST_HEAD (&conn->ibc_tx_queue);
989         INIT_LIST_HEAD (&conn->ibc_tx_queue_rsrvd);
990         INIT_LIST_HEAD (&conn->ibc_active_txs);
991         spin_lock_init (&conn->ibc_lock);
992         
993         atomic_inc (&kibnal_data.kib_nconns);
994         /* well not really, but I call destroy() on failure, which decrements */
995
996         conn->ibc_cep = cep;
997
998         LIBCFS_ALLOC(conn->ibc_connvars, sizeof(*conn->ibc_connvars));
999         if (conn->ibc_connvars == NULL) {
1000                 CERROR("Can't allocate in-progress connection state\n");
1001                 goto failed;
1002         }
1003         memset (conn->ibc_connvars, 0, sizeof(*conn->ibc_connvars));
1004         /* Random seed for QP sequence number */
1005         get_random_bytes(&conn->ibc_connvars->cv_rxpsn,
1006                          sizeof(conn->ibc_connvars->cv_rxpsn));
1007
1008         LIBCFS_ALLOC(conn->ibc_rxs, IBNAL_RX_MSGS * sizeof (kib_rx_t));
1009         if (conn->ibc_rxs == NULL) {
1010                 CERROR("Cannot allocate RX buffers\n");
1011                 goto failed;
1012         }
1013         memset (conn->ibc_rxs, 0, IBNAL_RX_MSGS * sizeof(kib_rx_t));
1014
1015         rc = kibnal_alloc_pages(&conn->ibc_rx_pages, IBNAL_RX_MSG_PAGES, 1);
1016         if (rc != 0)
1017                 goto failed;
1018
1019         for (i = ipage = page_offset = 0; i < IBNAL_RX_MSGS; i++) {
1020                 struct page    *page = conn->ibc_rx_pages->ibp_pages[ipage];
1021                 kib_rx_t       *rx = &conn->ibc_rxs[i];
1022                 vv_mem_reg_h_t  mem_h;
1023                 vv_r_key_t      r_key;
1024
1025                 rx->rx_conn = conn;
1026                 rx->rx_msg = (kib_msg_t *)(((char *)page_address(page)) + 
1027                              page_offset);
1028
1029                 vvrc = vv_get_gen_mr_attrib(kibnal_data.kib_hca,
1030                                             rx->rx_msg,
1031                                             IBNAL_MSG_SIZE,
1032                                             &mem_h,
1033                                             &rx->rx_lkey,
1034                                             &r_key);
1035                 LASSERT (vvrc == vv_return_ok);
1036
1037                 CDEBUG(D_NET, "Rx[%d] %p->%p[%x]\n", i, rx, 
1038                        rx->rx_msg, rx->rx_lkey);
1039
1040                 page_offset += IBNAL_MSG_SIZE;
1041                 LASSERT (page_offset <= PAGE_SIZE);
1042
1043                 if (page_offset == PAGE_SIZE) {
1044                         page_offset = 0;
1045                         ipage++;
1046                         LASSERT (ipage <= IBNAL_RX_MSG_PAGES);
1047                 }
1048         }
1049
1050         memset(&reqattr, 0, sizeof(reqattr));
1051
1052         reqattr.create.qp_type                    = vv_qp_type_r_conn;
1053         reqattr.create.cq_send_h                  = kibnal_data.kib_cq;
1054         reqattr.create.cq_receive_h               = kibnal_data.kib_cq;
1055         reqattr.create.send_max_outstand_wr       = (1 + IBNAL_MAX_RDMA_FRAGS) * 
1056                                                     (*kibnal_tunables.kib_concurrent_sends);
1057         reqattr.create.receive_max_outstand_wr    = IBNAL_RX_MSGS;
1058         reqattr.create.max_scatgat_per_send_wr    = 1;
1059         reqattr.create.max_scatgat_per_receive_wr = 1;
1060         reqattr.create.signaling_type             = vv_selectable_signaling;
1061         reqattr.create.pd_h                       = kibnal_data.kib_pd;
1062         reqattr.create.recv_solicited_events      = vv_selectable_signaling; // vv_signal_all;
1063
1064         vvrc = vv_qp_create(kibnal_data.kib_hca, &reqattr, NULL,
1065                             &conn->ibc_qp, &rspattr);
1066         if (vvrc != vv_return_ok) {
1067                 CERROR ("Failed to create queue pair: %d\n", vvrc);
1068                 goto failed;
1069         }
1070
1071         /* Mark QP created */
1072         conn->ibc_state = IBNAL_CONN_INIT_QP;
1073         conn->ibc_connvars->cv_local_qpn = rspattr.create_return.qp_num;
1074
1075         if (rspattr.create_return.receive_max_outstand_wr < 
1076             IBNAL_RX_MSGS ||
1077             rspattr.create_return.send_max_outstand_wr < 
1078             (1 + IBNAL_MAX_RDMA_FRAGS) * (*kibnal_tunables.kib_concurrent_sends)) {
1079                 CERROR("Insufficient rx/tx work items: wanted %d/%d got %d/%d\n",
1080                        IBNAL_RX_MSGS, 
1081                        (1 + IBNAL_MAX_RDMA_FRAGS) * 
1082                        (*kibnal_tunables.kib_concurrent_sends),
1083                        rspattr.create_return.receive_max_outstand_wr,
1084                        rspattr.create_return.send_max_outstand_wr);
1085                 goto failed;
1086         }
1087
1088         /* Mark init complete */
1089         conn->ibc_state = IBNAL_CONN_INIT;
1090
1091         /* 1 ref for caller */
1092         atomic_set (&conn->ibc_refcount, 1);
1093         return (conn);
1094         
1095  failed:
1096         kibnal_destroy_conn (conn);
1097         return (NULL);
1098 }
1099
1100 void
1101 kibnal_destroy_conn (kib_conn_t *conn)
1102 {
1103         vv_return_t vvrc;
1104
1105         /* Only the connd does this (i.e. single threaded) */
1106         LASSERT (!in_interrupt());
1107         LASSERT (current == kibnal_data.kib_connd);
1108         
1109         CDEBUG (D_NET, "connection %p\n", conn);
1110
1111         LASSERT (atomic_read (&conn->ibc_refcount) == 0);
1112         LASSERT (list_empty(&conn->ibc_early_rxs));
1113         LASSERT (list_empty(&conn->ibc_tx_queue));
1114         LASSERT (list_empty(&conn->ibc_tx_queue_rsrvd));
1115         LASSERT (list_empty(&conn->ibc_tx_queue_nocred));
1116         LASSERT (list_empty(&conn->ibc_active_txs));
1117         LASSERT (conn->ibc_nsends_posted == 0);
1118
1119         switch (conn->ibc_state) {
1120         default:
1121                 /* conn must be completely disengaged from the network */
1122                 LBUG();
1123
1124         case IBNAL_CONN_DISCONNECTED:
1125                 /* connvars should have been freed already */
1126                 LASSERT (conn->ibc_connvars == NULL);
1127                 /* fall through */
1128
1129         case IBNAL_CONN_INIT:
1130                 vvrc = cm_destroy_cep(conn->ibc_cep);
1131                 LASSERT (vvrc == vv_return_ok);
1132                 /* fall through */
1133
1134         case IBNAL_CONN_INIT_QP:
1135                 kibnal_set_qp_state(conn, vv_qp_state_reset);
1136                 vvrc = vv_qp_destroy(kibnal_data.kib_hca, conn->ibc_qp);
1137                 if (vvrc != vv_return_ok)
1138                         CERROR("Can't destroy QP: %d\n", vvrc);
1139                 /* fall through */
1140                 
1141         case IBNAL_CONN_INIT_NOTHING:
1142                 break;
1143         }
1144
1145         if (conn->ibc_rx_pages != NULL) 
1146                 kibnal_free_pages(conn->ibc_rx_pages);
1147
1148         if (conn->ibc_rxs != NULL)
1149                 LIBCFS_FREE(conn->ibc_rxs, 
1150                             IBNAL_RX_MSGS * sizeof(kib_rx_t));
1151
1152         if (conn->ibc_connvars != NULL)
1153                 LIBCFS_FREE(conn->ibc_connvars, sizeof(*conn->ibc_connvars));
1154
1155         if (conn->ibc_peer != NULL)
1156                 kibnal_peer_decref(conn->ibc_peer);
1157
1158         LIBCFS_FREE(conn, sizeof (*conn));
1159
1160         atomic_dec(&kibnal_data.kib_nconns);
1161 }
1162
1163 int
1164 kibnal_close_peer_conns_locked (kib_peer_t *peer, int why)
1165 {
1166         kib_conn_t         *conn;
1167         struct list_head   *ctmp;
1168         struct list_head   *cnxt;
1169         int                 count = 0;
1170
1171         list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
1172                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
1173
1174                 count++;
1175                 kibnal_close_conn_locked (conn, why);
1176         }
1177
1178         return (count);
1179 }
1180
1181 int
1182 kibnal_close_stale_conns_locked (kib_peer_t *peer, __u64 incarnation)
1183 {
1184         kib_conn_t         *conn;
1185         struct list_head   *ctmp;
1186         struct list_head   *cnxt;
1187         int                 count = 0;
1188
1189         list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
1190                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
1191
1192                 if (conn->ibc_incarnation == incarnation)
1193                         continue;
1194
1195                 CDEBUG(D_NET, "Closing stale conn -> %s incarnation:"LPX64"("LPX64")\n",
1196                        libcfs_nid2str(peer->ibp_nid),
1197                        conn->ibc_incarnation, incarnation);
1198                 
1199                 count++;
1200                 kibnal_close_conn_locked (conn, -ESTALE);
1201         }
1202
1203         return (count);
1204 }
1205
1206 int
1207 kibnal_close_matching_conns (lnet_nid_t nid)
1208 {
1209         kib_peer_t         *peer;
1210         struct list_head   *ptmp;
1211         struct list_head   *pnxt;
1212         int                 lo;
1213         int                 hi;
1214         int                 i;
1215         unsigned long       flags;
1216         int                 count = 0;
1217
1218         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1219
1220         if (nid != LNET_NID_ANY)
1221                 lo = hi = kibnal_nid2peerlist(nid) - kibnal_data.kib_peers;
1222         else {
1223                 lo = 0;
1224                 hi = kibnal_data.kib_peer_hash_size - 1;
1225         }
1226
1227         for (i = lo; i <= hi; i++) {
1228                 list_for_each_safe (ptmp, pnxt, &kibnal_data.kib_peers[i]) {
1229
1230                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
1231                         LASSERT (peer->ibp_persistence != 0 ||
1232                                  peer->ibp_connecting != 0 ||
1233                                  peer->ibp_accepting != 0 ||
1234                                  !list_empty (&peer->ibp_conns));
1235
1236                         if (!(nid == LNET_NID_ANY || nid == peer->ibp_nid))
1237                                 continue;
1238
1239                         count += kibnal_close_peer_conns_locked (peer, 0);
1240                 }
1241         }
1242
1243         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1244
1245         /* wildcards always succeed */
1246         if (nid == LNET_NID_ANY)
1247                 return (0);
1248         
1249         return (count == 0 ? -ENOENT : 0);
1250 }
1251
1252 int
1253 kibnal_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1254 {
1255         struct libcfs_ioctl_data *data = arg;
1256         int                       rc = -EINVAL;
1257
1258         LASSERT (ni == kibnal_data.kib_ni);
1259
1260         switch(cmd) {
1261         case IOC_LIBCFS_GET_PEER: {
1262                 lnet_nid_t   nid = 0;
1263                 __u32        ip = 0;
1264                 int          share_count = 0;
1265
1266                 rc = kibnal_get_peer_info(data->ioc_count,
1267                                           &nid, &ip, &share_count);
1268                 data->ioc_nid    = nid;
1269                 data->ioc_count  = share_count;
1270                 data->ioc_u32[0] = ip;
1271                 data->ioc_u32[1] = *kibnal_tunables.kib_service_number; /* port */
1272                 break;
1273         }
1274         case IOC_LIBCFS_ADD_PEER: {
1275                 rc = kibnal_add_persistent_peer (data->ioc_nid,
1276                                                  data->ioc_u32[0]); /* IP */
1277                 break;
1278         }
1279         case IOC_LIBCFS_DEL_PEER: {
1280                 rc = kibnal_del_peer (data->ioc_nid);
1281                 break;
1282         }
1283         case IOC_LIBCFS_GET_CONN: {
1284                 kib_conn_t *conn = kibnal_get_conn_by_idx (data->ioc_count);
1285
1286                 if (conn == NULL)
1287                         rc = -ENOENT;
1288                 else {
1289                         // kibnal_debug_conn(conn);
1290                         rc = 0;
1291                         data->ioc_nid = conn->ibc_peer->ibp_nid;
1292                         kibnal_conn_decref(conn);
1293                 }
1294                 break;
1295         }
1296         case IOC_LIBCFS_CLOSE_CONNECTION: {
1297                 rc = kibnal_close_matching_conns (data->ioc_nid);
1298                 break;
1299         }
1300         case IOC_LIBCFS_REGISTER_MYNID: {
1301                 if (ni->ni_nid == data->ioc_nid) {
1302                         rc = 0;
1303                 } else {
1304                         CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID: %s(%s)\n",
1305                                libcfs_nid2str(data->ioc_nid),
1306                                libcfs_nid2str(ni->ni_nid));
1307                         rc = -EINVAL;
1308                 }
1309                 break;
1310         }
1311         }
1312
1313         return rc;
1314 }
1315
1316 void
1317 kibnal_free_pages (kib_pages_t *p)
1318 {
1319         int         npages = p->ibp_npages;
1320         int         i;
1321         
1322         for (i = 0; i < npages; i++)
1323                 if (p->ibp_pages[i] != NULL)
1324                         __free_page(p->ibp_pages[i]);
1325         
1326         LIBCFS_FREE (p, offsetof(kib_pages_t, ibp_pages[npages]));
1327 }
1328
1329 int
1330 kibnal_alloc_pages (kib_pages_t **pp, int npages, int allow_write)
1331 {
1332         kib_pages_t   *p;
1333         int            i;
1334
1335         LIBCFS_ALLOC(p, offsetof(kib_pages_t, ibp_pages[npages]));
1336         if (p == NULL) {
1337                 CERROR ("Can't allocate buffer %d\n", npages);
1338                 return (-ENOMEM);
1339         }
1340
1341         memset (p, 0, offsetof(kib_pages_t, ibp_pages[npages]));
1342         p->ibp_npages = npages;
1343         
1344         for (i = 0; i < npages; i++) {
1345                 p->ibp_pages[i] = alloc_page (GFP_KERNEL);
1346                 if (p->ibp_pages[i] == NULL) {
1347                         CERROR ("Can't allocate page %d of %d\n", i, npages);
1348                         kibnal_free_pages(p);
1349                         return (-ENOMEM);
1350                 }
1351         }
1352
1353         *pp = p;
1354         return (0);
1355 }
1356
1357 int
1358 kibnal_alloc_tx_descs (void) 
1359 {
1360         int    i;
1361         
1362         LIBCFS_ALLOC (kibnal_data.kib_tx_descs,
1363                       IBNAL_TX_MSGS() * sizeof(kib_tx_t));
1364         if (kibnal_data.kib_tx_descs == NULL)
1365                 return -ENOMEM;
1366         
1367         memset(kibnal_data.kib_tx_descs, 0,
1368                IBNAL_TX_MSGS() * sizeof(kib_tx_t));
1369
1370         for (i = 0; i < IBNAL_TX_MSGS(); i++) {
1371                 kib_tx_t *tx = &kibnal_data.kib_tx_descs[i];
1372
1373 #if IBNAL_USE_FMR
1374                 LIBCFS_ALLOC(tx->tx_pages, LNET_MAX_IOV *
1375                              sizeof(*tx->tx_pages));
1376                 if (tx->tx_pages == NULL)
1377                         return -ENOMEM;
1378 #else
1379                 LIBCFS_ALLOC(tx->tx_wrq, 
1380                              (1 + IBNAL_MAX_RDMA_FRAGS) * 
1381                              sizeof(*tx->tx_wrq));
1382                 if (tx->tx_wrq == NULL)
1383                         return -ENOMEM;
1384                 
1385                 LIBCFS_ALLOC(tx->tx_gl, 
1386                              (1 + IBNAL_MAX_RDMA_FRAGS) * 
1387                              sizeof(*tx->tx_gl));
1388                 if (tx->tx_gl == NULL)
1389                         return -ENOMEM;
1390                 
1391                 LIBCFS_ALLOC(tx->tx_rd, 
1392                              offsetof(kib_rdma_desc_t, 
1393                                       rd_frags[IBNAL_MAX_RDMA_FRAGS]));
1394                 if (tx->tx_rd == NULL)
1395                         return -ENOMEM;
1396 #endif
1397         }
1398
1399         return 0;
1400 }
1401
1402 void
1403 kibnal_free_tx_descs (void) 
1404 {
1405         int    i;
1406
1407         if (kibnal_data.kib_tx_descs == NULL)
1408                 return;
1409
1410         for (i = 0; i < IBNAL_TX_MSGS(); i++) {
1411                 kib_tx_t *tx = &kibnal_data.kib_tx_descs[i];
1412
1413 #if IBNAL_USE_FMR
1414                 if (tx->tx_pages != NULL)
1415                         LIBCFS_FREE(tx->tx_pages, LNET_MAX_IOV *
1416                                     sizeof(*tx->tx_pages));
1417 #else
1418                 if (tx->tx_wrq != NULL)
1419                         LIBCFS_FREE(tx->tx_wrq, 
1420                                     (1 + IBNAL_MAX_RDMA_FRAGS) * 
1421                                     sizeof(*tx->tx_wrq));
1422
1423                 if (tx->tx_gl != NULL)
1424                         LIBCFS_FREE(tx->tx_gl, 
1425                                     (1 + IBNAL_MAX_RDMA_FRAGS) * 
1426                                     sizeof(*tx->tx_gl));
1427
1428                 if (tx->tx_rd != NULL)
1429                         LIBCFS_FREE(tx->tx_rd, 
1430                                     offsetof(kib_rdma_desc_t, 
1431                                              rd_frags[IBNAL_MAX_RDMA_FRAGS]));
1432 #endif
1433         }
1434
1435         LIBCFS_FREE(kibnal_data.kib_tx_descs,
1436                     IBNAL_TX_MSGS() * sizeof(kib_tx_t));
1437 }
1438
#if IBNAL_USE_FMR
/*
 * Free the FMR handles of the first 'n' transmit descriptors.  A failure to
 * free one is logged as a warning and the remaining handles are still
 * processed.
 */
void
kibnal_free_fmrs (int n)
{
        int i;

        for (i = 0; i < n; i++) {
                kib_tx_t    *tx = &kibnal_data.kib_tx_descs[i];
                vv_return_t  vvrc;

                vvrc = vv_free_fmr(kibnal_data.kib_hca,
                                   tx->tx_md.md_fmrhandle);
                if (vvrc == vv_return_ok)
                        continue;

                CWARN("vv_free_fmr[%d]: %d\n", i, vvrc);
        }
}
#endif
1457
1458 int
1459 kibnal_setup_tx_descs (void)
1460 {
1461         int             ipage = 0;
1462         int             page_offset = 0;
1463         struct page    *page;
1464         kib_tx_t       *tx;
1465         vv_mem_reg_h_t  mem_h;
1466         vv_r_key_t      rkey;
1467         vv_return_t     vvrc;
1468         int             i;
1469         int             rc;
1470 #if IBNAL_USE_FMR
1471         vv_fmr_t        fmr_props;
1472 #endif
1473
1474         /* pre-mapped messages are not bigger than 1 page */
1475         CLASSERT (IBNAL_MSG_SIZE <= PAGE_SIZE);
1476
1477         /* No fancy arithmetic when we do the buffer calculations */
1478         CLASSERT (PAGE_SIZE % IBNAL_MSG_SIZE == 0);
1479
1480         rc = kibnal_alloc_pages(&kibnal_data.kib_tx_pages, 
1481                                 IBNAL_TX_MSG_PAGES(), 0);
1482         if (rc != 0)
1483                 return (rc);
1484
1485         for (i = 0; i < IBNAL_TX_MSGS(); i++) {
1486                 page = kibnal_data.kib_tx_pages->ibp_pages[ipage];
1487                 tx = &kibnal_data.kib_tx_descs[i];
1488
1489 #if IBNAL_USE_FMR
1490                 memset(&fmr_props, 0, sizeof(fmr_props));
1491                 fmr_props.pd_hndl              = kibnal_data.kib_pd;
1492                 fmr_props.acl                  = (vv_acc_r_mem_write |
1493                                                   vv_acc_l_mem_write);
1494                 fmr_props.max_pages            = LNET_MAX_IOV;
1495                 fmr_props.log2_page_sz         = PAGE_SHIFT;
1496                 fmr_props.max_outstanding_maps = *kibnal_tunables.kib_fmr_remaps;
1497                 
1498                 vvrc = vv_alloc_fmr(kibnal_data.kib_hca,
1499                                     &fmr_props,
1500                                     &tx->tx_md.md_fmrhandle);
1501                 if (vvrc != vv_return_ok) {
1502                         CERROR("Can't allocate fmr %d: %d\n", i, vvrc);
1503                         
1504                         kibnal_free_fmrs(i);
1505                         kibnal_free_pages (kibnal_data.kib_tx_pages);
1506                         return -ENOMEM;
1507                 }
1508
1509                 tx->tx_md.md_fmrcount = *kibnal_tunables.kib_fmr_remaps;
1510                 tx->tx_md.md_active   = 0;
1511 #endif
1512                 tx->tx_msg = (kib_msg_t *)(((char *)page_address(page)) + 
1513                                            page_offset);
1514
1515                 vvrc = vv_get_gen_mr_attrib(kibnal_data.kib_hca,
1516                                             tx->tx_msg,
1517                                             IBNAL_MSG_SIZE,
1518                                             &mem_h,
1519                                             &tx->tx_lkey,
1520                                             &rkey);
1521                 LASSERT (vvrc == vv_return_ok);
1522
1523                 CDEBUG(D_NET, "Tx[%d] %p->%p[%x]\n", i, tx, 
1524                        tx->tx_msg, tx->tx_lkey);
1525
1526                 list_add (&tx->tx_list, &kibnal_data.kib_idle_txs);
1527
1528                 page_offset += IBNAL_MSG_SIZE;
1529                 LASSERT (page_offset <= PAGE_SIZE);
1530
1531                 if (page_offset == PAGE_SIZE) {
1532                         page_offset = 0;
1533                         ipage++;
1534                         LASSERT (ipage <= IBNAL_TX_MSG_PAGES());
1535                 }
1536         }
1537         
1538         return (0);
1539 }
1540
1541 void
1542 kibnal_shutdown (lnet_ni_t *ni)
1543 {
1544         int           i;
1545         vv_return_t   vvrc;
1546
1547         LASSERT (ni == kibnal_data.kib_ni);
1548         LASSERT (ni->ni_data == &kibnal_data);
1549         
1550         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
1551                atomic_read (&libcfs_kmemory));
1552
1553         switch (kibnal_data.kib_init) {
1554
1555         case IBNAL_INIT_ALL:
1556                 /* stop accepting connections and prevent new peers */
1557                 kibnal_stop_listener(ni);
1558
1559                 /* nuke all existing peers */
1560                 kibnal_del_peer(LNET_NID_ANY);
1561
1562                 /* Wait for all peer state to clean up */
1563                 i = 2;
1564                 while (atomic_read(&kibnal_data.kib_npeers) != 0) {
1565                         i++;
1566                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* 2**n? */
1567                                "waiting for %d peers to disconnect\n",
1568                                atomic_read(&kibnal_data.kib_npeers));
1569                         cfs_pause(cfs_time_seconds(1));
1570                 }
1571                 /* fall through */
1572
1573         case IBNAL_INIT_CQ:
1574                 vvrc = vv_cq_destroy(kibnal_data.kib_hca, kibnal_data.kib_cq);
1575                 if (vvrc != vv_return_ok)
1576                         CERROR ("Destroy CQ error: %d\n", vvrc);
1577                 /* fall through */
1578
1579         case IBNAL_INIT_TXD:
1580                 kibnal_free_pages (kibnal_data.kib_tx_pages);
1581 #if IBNAL_USE_FMR
1582                 kibnal_free_fmrs(IBNAL_TX_MSGS());
1583 #endif
1584                 /* fall through */
1585
1586         case IBNAL_INIT_PD:
1587 #if 0
1588                 /* Only deallocate a PD if we actually allocated one */
1589                 vvrc = vv_pd_deallocate(kibnal_data.kib_hca,
1590                                         kibnal_data.kib_pd);
1591                 if (vvrc != vv_return_ok)
1592                         CERROR ("Destroy PD error: %d\n", vvrc);
1593 #endif
1594                 /* fall through */
1595
1596         case IBNAL_INIT_ASYNC:
1597                 vvrc = vv_dell_async_event_cb (kibnal_data.kib_hca,
1598                                               kibnal_async_callback);
1599                 if (vvrc != vv_return_ok)
1600                         CERROR("vv_dell_async_event_cb error: %d\n", vvrc);
1601                         
1602                 /* fall through */
1603
1604         case IBNAL_INIT_HCA:
1605                 vvrc = vv_hca_close(kibnal_data.kib_hca);
1606                 if (vvrc != vv_return_ok)
1607                         CERROR ("Close HCA  error: %d\n", vvrc);
1608                 /* fall through */
1609
1610         case IBNAL_INIT_DATA:
1611                 LASSERT (atomic_read(&kibnal_data.kib_npeers) == 0);
1612                 LASSERT (kibnal_data.kib_peers != NULL);
1613                 for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
1614                         LASSERT (list_empty (&kibnal_data.kib_peers[i]));
1615                 }
1616                 LASSERT (atomic_read (&kibnal_data.kib_nconns) == 0);
1617                 LASSERT (list_empty (&kibnal_data.kib_connd_zombies));
1618                 LASSERT (list_empty (&kibnal_data.kib_connd_conns));
1619                 LASSERT (list_empty (&kibnal_data.kib_connd_pcreqs));
1620                 LASSERT (list_empty (&kibnal_data.kib_connd_peers));
1621
1622                 /* flag threads to terminate; wake and wait for them to die */
1623                 kibnal_data.kib_shutdown = 1;
1624                 wake_up_all (&kibnal_data.kib_sched_waitq);
1625                 wake_up_all (&kibnal_data.kib_connd_waitq);
1626
1627                 i = 2;
1628                 while (atomic_read (&kibnal_data.kib_nthreads) != 0) {
1629                         i++;
1630                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1631                                "Waiting for %d threads to terminate\n",
1632                                atomic_read (&kibnal_data.kib_nthreads));
1633                         cfs_pause(cfs_time_seconds(1));
1634                 }
1635                 /* fall through */
1636                 
1637         case IBNAL_INIT_NOTHING:
1638                 break;
1639         }
1640
1641         kibnal_free_tx_descs();
1642
1643         if (kibnal_data.kib_peers != NULL)
1644                 LIBCFS_FREE (kibnal_data.kib_peers,
1645                              sizeof (struct list_head) * 
1646                              kibnal_data.kib_peer_hash_size);
1647
1648         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
1649                atomic_read (&libcfs_kmemory));
1650
1651         kibnal_data.kib_init = IBNAL_INIT_NOTHING;
1652         PORTAL_MODULE_UNUSE;
1653 }
1654
1655 int
1656 kibnal_startup (lnet_ni_t *ni)
1657 {
1658         char                      scratch[32];
1659         char                      ipif_name[32];
1660         char                     *hca_name;
1661         __u32                     ip;
1662         __u32                     netmask;
1663         int                       up;
1664         int                       nob;
1665         int                       devno;
1666         struct timeval            tv;
1667         int                       rc;
1668         int                       i;
1669         vv_request_event_record_t req_er;
1670         vv_return_t               vvrc;
1671
1672         LASSERT (ni->ni_lnd == &the_kiblnd);
1673
1674         /* Only 1 instance supported */
1675         if (kibnal_data.kib_init != IBNAL_INIT_NOTHING) {
1676                 CERROR ("Only 1 instance supported\n");
1677                 return -EPERM;
1678         }
1679
1680         if (*kibnal_tunables.kib_credits > *kibnal_tunables.kib_ntx) {
1681                 CERROR ("Can't set credits(%d) > ntx(%d)\n",
1682                         *kibnal_tunables.kib_credits,
1683                         *kibnal_tunables.kib_ntx);
1684                 return -EINVAL;
1685         }
1686
1687         ni->ni_maxtxcredits = *kibnal_tunables.kib_credits;
1688         ni->ni_peertxcredits = *kibnal_tunables.kib_peercredits;
1689
1690         CLASSERT (LNET_MAX_INTERFACES > 1);
1691         
1692         if (ni->ni_interfaces[0] != NULL) {
1693                 /* Use the HCA specified in 'networks=' */
1694
1695                 if (ni->ni_interfaces[1] != NULL) {
1696                         CERROR("Multiple interfaces not supported\n");
1697                         return -EPERM;
1698                 }
1699
1700                 /* Parse <hca base name><number> */
1701                 hca_name = ni->ni_interfaces[0];
1702                 nob = strlen(*kibnal_tunables.kib_hca_basename);
1703                 
1704                 if (strncmp(hca_name, *kibnal_tunables.kib_hca_basename, nob) ||
1705                     sscanf(hca_name + nob, "%d%n", &devno, &nob) < 1) {
1706                         CERROR("Unrecognised HCA %s\n", hca_name);
1707                         return -EINVAL;
1708                 }
1709
1710         } else {
1711                 /* Use <hca base name>0 */
1712                 devno = 0;
1713
1714                 hca_name = scratch;
1715                 snprintf(hca_name, sizeof(scratch), "%s%d",
1716                          *kibnal_tunables.kib_hca_basename, devno);
1717                 if (strlen(hca_name) == sizeof(scratch) - 1) {
1718                         CERROR("HCA name %s truncated\n", hca_name);
1719                         return -EINVAL;
1720                 }
1721         }
1722
1723         /* Find IP address from <ipif base name><hca number> */
1724         snprintf(ipif_name, sizeof(ipif_name), "%s%d",
1725                  *kibnal_tunables.kib_ipif_basename, devno);
1726         if (strlen(ipif_name) == sizeof(ipif_name) - 1) {
1727                 CERROR("IPoIB interface name %s truncated\n", ipif_name);
1728                 return -EINVAL;
1729         }
1730         
1731         rc = libcfs_ipif_query(ipif_name, &up, &ip, &netmask);
1732         if (rc != 0) {
1733                 CERROR("Can't query IPoIB interface %s: %d\n", ipif_name, rc);
1734                 return -ENETDOWN;
1735         }
1736         
1737         if (!up) {
1738                 CERROR("Can't query IPoIB interface %s: it's down\n", ipif_name);
1739                 return -ENETDOWN;
1740         }
1741         
1742         ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ip);
1743         
1744         PORTAL_MODULE_USE;
1745         memset (&kibnal_data, 0, sizeof (kibnal_data)); /* zero pointers, flags etc */
1746
1747         kibnal_data.kib_ni = ni;
1748         ni->ni_data = &kibnal_data;
1749         
1750         do_gettimeofday(&tv);
1751         kibnal_data.kib_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
1752
1753         rwlock_init(&kibnal_data.kib_global_lock);
1754
1755         kibnal_data.kib_peer_hash_size = IBNAL_PEER_HASH_SIZE;
1756         LIBCFS_ALLOC (kibnal_data.kib_peers,
1757                       sizeof (struct list_head) * kibnal_data.kib_peer_hash_size);
1758         if (kibnal_data.kib_peers == NULL) {
1759                 goto failed;
1760         }
1761         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++)
1762                 INIT_LIST_HEAD(&kibnal_data.kib_peers[i]);
1763
1764         spin_lock_init (&kibnal_data.kib_connd_lock);
1765         INIT_LIST_HEAD (&kibnal_data.kib_connd_peers);
1766         INIT_LIST_HEAD (&kibnal_data.kib_connd_pcreqs);
1767         INIT_LIST_HEAD (&kibnal_data.kib_connd_conns);
1768         INIT_LIST_HEAD (&kibnal_data.kib_connd_zombies);
1769         init_waitqueue_head (&kibnal_data.kib_connd_waitq);
1770
1771         spin_lock_init (&kibnal_data.kib_sched_lock);
1772         init_waitqueue_head (&kibnal_data.kib_sched_waitq);
1773
1774         spin_lock_init (&kibnal_data.kib_tx_lock);
1775         INIT_LIST_HEAD (&kibnal_data.kib_idle_txs);
1776
1777         rc = kibnal_alloc_tx_descs();
1778         if (rc != 0) {
1779                 CERROR("Can't allocate tx descs\n");
1780                 goto failed;
1781         }
1782         
1783         /* lists/ptrs/locks initialised */
1784         kibnal_data.kib_init = IBNAL_INIT_DATA;
1785         /*****************************************************/
1786
1787         for (i = 0; i < IBNAL_N_SCHED; i++) {
1788                 rc = kibnal_thread_start (kibnal_scheduler, (void *)((long)i));
1789                 if (rc != 0) {
1790                         CERROR("Can't spawn vibnal scheduler[%d]: %d\n",
1791                                i, rc);
1792                         goto failed;
1793                 }
1794         }
1795
1796         rc = kibnal_thread_start (kibnal_connd, NULL);
1797         if (rc != 0) {
1798                 CERROR ("Can't spawn vibnal connd: %d\n", rc);
1799                 goto failed;
1800         }
1801
1802         vvrc = vv_hca_open(hca_name, NULL, &kibnal_data.kib_hca);
1803         if (vvrc != vv_return_ok) {
1804                 CERROR ("Can't open HCA %s: %d\n", hca_name, vvrc);
1805                 goto failed;
1806         }
1807
1808         /* Channel Adapter opened */
1809         kibnal_data.kib_init = IBNAL_INIT_HCA;
1810
1811         /* register to get HCA's asynchronous events. */
1812         req_er.req_event_type = VV_ASYNC_EVENT_ALL_MASK;
1813         vvrc = vv_set_async_event_cb (kibnal_data.kib_hca, req_er,
1814                                      kibnal_async_callback);
1815         if (vvrc != vv_return_ok) {
1816                 CERROR ("Can't set HCA %s callback: %d\n", hca_name, vvrc);
1817                 goto failed; 
1818         }
1819
1820         kibnal_data.kib_init = IBNAL_INIT_ASYNC;
1821
1822         /*****************************************************/
1823
1824         vvrc = vv_hca_query(kibnal_data.kib_hca, &kibnal_data.kib_hca_attrs);
1825         if (vvrc != vv_return_ok) {
1826                 CERROR ("Can't size port attrs for %s: %d\n", hca_name, vvrc);
1827                 goto failed;
1828         }
1829
1830         kibnal_data.kib_port = -1;
1831
1832         for (i = 0; i<kibnal_data.kib_hca_attrs.port_num; i++) {
1833
1834                 int port_num = i+1;
1835                 u_int32_t tbl_count;
1836                 vv_port_attrib_t *pattr = &kibnal_data.kib_port_attr;
1837
1838                 vvrc = vv_port_query(kibnal_data.kib_hca, port_num, pattr);
1839                 if (vvrc != vv_return_ok) {
1840                         CERROR("vv_port_query failed for %s port %d: %d\n",
1841                                hca_name, port_num, vvrc);
1842                         continue;
1843                 }
1844
1845                 switch (pattr->port_state) {
1846                 case vv_state_linkDoun:
1847                         CDEBUG(D_NET, "port[%d] Down\n", port_num);
1848                         continue;
1849                 case vv_state_linkInit:
1850                         CDEBUG(D_NET, "port[%d] Init\n", port_num);
1851                         continue;
1852                 case vv_state_linkArm:
1853                         CDEBUG(D_NET, "port[%d] Armed\n", port_num);
1854                         continue;
1855                 case vv_state_linkActive:
1856                         CDEBUG(D_NET, "port[%d] Active\n", port_num);
1857
1858                         /* Found a suitable port. Get its GUID and PKEY. */
1859                         tbl_count = 1;
1860                         vvrc = vv_get_port_gid_tbl(kibnal_data.kib_hca, 
1861                                                    port_num, &tbl_count,
1862                                                    &kibnal_data.kib_port_gid);
1863                         if (vvrc != vv_return_ok) {
1864                                 CERROR("vv_get_port_gid_tbl failed "
1865                                        "for %s port %d: %d\n", 
1866                                        hca_name, port_num, vvrc);
1867                                 continue;
1868                         }
1869
1870                         tbl_count = 1;
1871                         vvrc = vv_get_port_partition_tbl(kibnal_data.kib_hca, 
1872                                                          port_num, &tbl_count,
1873                                                          &kibnal_data.kib_port_pkey);
1874                         if (vvrc != vv_return_ok) {
1875                                 CERROR("vv_get_port_partition_tbl failed "
1876                                        "for %s port %d: %d\n",
1877                                        hca_name, port_num, vvrc);
1878                                 continue;
1879                         }
1880
1881                         kibnal_data.kib_port = port_num;
1882
1883                         break;
1884                 case vv_state_linkActDefer: /* TODO: correct? */
1885                 case vv_state_linkNoChange:
1886                         CERROR("Unexpected %s port[%d] state %d\n",
1887                                hca_name, i, pattr->port_state);
1888                         continue;
1889                 }
1890                 break;
1891         }
1892
1893         if (kibnal_data.kib_port == -1) {
1894                 CERROR ("Can't find an active port on %s\n", hca_name);
1895                 goto failed;
1896         }
1897
1898         CDEBUG(D_NET, "Using %s port %d - GID="LPX64":"LPX64"\n",
1899                hca_name, kibnal_data.kib_port, 
1900                kibnal_data.kib_port_gid.scope.g.subnet, 
1901                kibnal_data.kib_port_gid.scope.g.eui64);
1902         
1903         /*****************************************************/
1904
1905 #if 1
1906         /* We use a pre-allocated PD */
1907         vvrc = vv_get_gen_pd_h(kibnal_data.kib_hca, &kibnal_data.kib_pd);
1908 #else
1909         vvrc = vv_pd_allocate(kibnal_data.kib_hca, &kibnal_data.kib_pd);
1910 #endif
1911         if (vvrc != vv_return_ok) {
1912                 CERROR ("Can't init PD: %d\n", vvrc);
1913                 goto failed;
1914         }
1915         
1916         /* flag PD initialised */
1917         kibnal_data.kib_init = IBNAL_INIT_PD;
1918         /*****************************************************/
1919
1920         rc = kibnal_setup_tx_descs();
1921         if (rc != 0) {
1922                 CERROR ("Can't register tx descs: %d\n", rc);
1923                 goto failed;
1924         }
1925         
1926         /* flag TX descs initialised */
1927         kibnal_data.kib_init = IBNAL_INIT_TXD;
1928         /*****************************************************/
1929
1930         {
1931                 uint32_t nentries;
1932
1933                 vvrc = vv_cq_create(kibnal_data.kib_hca, IBNAL_CQ_ENTRIES(),
1934                                     kibnal_cq_callback, 
1935                                     NULL, /* context */
1936                                     &kibnal_data.kib_cq, &nentries);
1937                 if (vvrc != 0) {
1938                         CERROR ("Can't create RX CQ: %d\n", vvrc);
1939                         goto failed;
1940                 }
1941
1942                 /* flag CQ initialised */
1943                 kibnal_data.kib_init = IBNAL_INIT_CQ;
1944
1945                 if (nentries < IBNAL_CQ_ENTRIES()) {
1946                         CERROR ("CQ only has %d entries, need %d\n", 
1947                                 nentries, IBNAL_CQ_ENTRIES());
1948                         goto failed;
1949                 }
1950
1951                 vvrc = vv_request_completion_notification(kibnal_data.kib_hca, 
1952                                                           kibnal_data.kib_cq, 
1953                                                           vv_next_solicit_unsolicit_event);
1954                 if (vvrc != 0) {
1955                         CERROR ("Failed to re-arm completion queue: %d\n", rc);
1956                         goto failed;
1957                 }
1958         }
1959
1960         rc = kibnal_start_listener(ni);
1961         if (rc != 0) {
1962                 CERROR("Can't start listener: %d\n", rc);
1963                 goto failed;
1964         }
1965         
1966         /* flag everything initialised */
1967         kibnal_data.kib_init = IBNAL_INIT_ALL;
1968         /*****************************************************/
1969
1970         return (0);
1971
1972  failed:
1973         CDEBUG(D_NET, "kibnal_startup failed\n");
1974         kibnal_shutdown (ni);    
1975         return (-ENETDOWN);
1976 }
1977
1978 void __exit
1979 kibnal_module_fini (void)
1980 {
1981         lnet_unregister_lnd(&the_kiblnd);
1982         kibnal_tunables_fini();
1983 }
1984
1985 int __init
1986 kibnal_module_init (void)
1987 {
1988         int    rc;
1989
1990         vibnal_assert_wire_constants();
1991
1992         CLASSERT (offsetof(kib_msg_t, ibm_u) + sizeof(kib_connparams_t) 
1993                   <= cm_REQ_priv_data_len);
1994         CLASSERT (offsetof(kib_msg_t, ibm_u) + sizeof(kib_connparams_t) 
1995                   <= cm_REP_priv_data_len);
1996         CLASSERT (sizeof(kib_msg_t) <= IBNAL_MSG_SIZE);
1997 #if !IBNAL_USE_FMR
1998         CLASSERT (offsetof(kib_msg_t, ibm_u.get.ibgm_rd.rd_frags[IBNAL_MAX_RDMA_FRAGS])
1999                   <= IBNAL_MSG_SIZE);
2000         CLASSERT (offsetof(kib_msg_t, ibm_u.putack.ibpam_rd.rd_frags[IBNAL_MAX_RDMA_FRAGS])
2001                   <= IBNAL_MSG_SIZE);
2002 #endif
2003         rc = kibnal_tunables_init();
2004         if (rc != 0)
2005                 return rc;
2006
2007         lnet_register_lnd(&the_kiblnd);
2008
2009         return 0;
2010 }
2011
/* Module metadata: author, description and licence */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Kernel Voltaire IB LND v1.00");
MODULE_LICENSE("GPL");

/* Hook kibnal_module_init/kibnal_module_fini up as the module's
 * initialisation and exit functions */
module_init(kibnal_module_init);
module_exit(kibnal_module_fini);
2018