Whamcloud - gitweb
b=16098
[fs/lustre-release.git] / lnet / klnds / iiblnd / iiblnd.c
1 /*
2  * -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
3  * vim:expandtab:shiftwidth=8:tabstop=8:
4  *
5  * GPL HEADER START
6  *
7  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8  *
9  * This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License version 2 only,
11  * as published by the Free Software Foundation.
12  *
13  * This program is distributed in the hope that it will be useful, but
14  * WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * General Public License version 2 for more details (a copy is included
17  * in the LICENSE file that accompanied this code).
18  *
19  * You should have received a copy of the GNU General Public License
20  * version 2 along with this program; If not, see [sun.com URL with a
21  * copy of GPLv2].
22  *
23  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
24  * CA 95054 USA or visit www.sun.com if you need additional information or
25  * have any questions.
26  *
27  * GPL HEADER END
28  */
29 /*
30  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
31  * Use is subject to license terms.
32  */
33 /*
34  * This file is part of Lustre, http://www.lustre.org/
35  * Lustre is a trademark of Sun Microsystems, Inc.
36  *
37  * lnet/klnds/iiblnd/iiblnd.c
38  *
39  * Author: Eric Barton <eric@bartonsoftware.com>
40  */
41
42 #include "iiblnd.h"
43
44 lnd_t the_kiblnd = {
45         .lnd_type          = IIBLND,
46         .lnd_startup       = kibnal_startup,
47         .lnd_shutdown      = kibnal_shutdown,
48         .lnd_ctl           = kibnal_ctl,
49         .lnd_send          = kibnal_send,
50         .lnd_recv          = kibnal_recv,
51         .lnd_eager_recv    = kibnal_eager_recv,
52 };
53
54 kib_data_t              kibnal_data;
55
56 __u32 
57 kibnal_cksum (void *ptr, int nob)
58 {
59         char  *c  = ptr;
60         __u32  sum = 0;
61
62         while (nob-- > 0)
63                 sum = ((sum << 1) | (sum >> 31)) + *c++;
64         
65         /* ensure I don't return 0 (== no checksum) */
66         return (sum == 0) ? 1 : sum;
67 }
68
69 void
70 kibnal_init_msg(kib_msg_t *msg, int type, int body_nob)
71 {
72         msg->ibm_type = type;
73         msg->ibm_nob  = offsetof(kib_msg_t, ibm_u) + body_nob;
74 }
75
76 void
77 kibnal_pack_msg(kib_msg_t *msg, __u32 version, int credits, 
78                 lnet_nid_t dstnid, __u64 dststamp, __u64 seq)
79 {
80         /* CAVEAT EMPTOR! all message fields not set here should have been
81          * initialised previously. */
82         msg->ibm_magic    = IBNAL_MSG_MAGIC;
83         msg->ibm_version  = version;
84         /*   ibm_type */
85         msg->ibm_credits  = credits;
86         /*   ibm_nob */
87         msg->ibm_cksum    = 0;
88         msg->ibm_srcnid   = lnet_ptlcompat_srcnid(kibnal_data.kib_ni->ni_nid,
89                                                   dstnid);
90         msg->ibm_srcstamp = kibnal_data.kib_incarnation;
91         msg->ibm_dstnid   = dstnid;
92         msg->ibm_dststamp = dststamp;
93         msg->ibm_seq      = seq;
94
95         if (*kibnal_tunables.kib_cksum) {
96                 /* NB ibm_cksum zero while computing cksum */
97                 msg->ibm_cksum = kibnal_cksum(msg, msg->ibm_nob);
98         }
99 }
100
101 void
102 kibnal_pack_connmsg(kib_msg_t *msg, __u32 version, int nob, 
103                     int type, lnet_nid_t dstnid, __u64 dststamp)
104 {
105         LASSERT (nob >= offsetof(kib_msg_t, ibm_u) + sizeof(kib_connparams_t));
106
107         memset(msg, 0, nob);
108         kibnal_init_msg(msg, type, sizeof(kib_connparams_t));
109
110         msg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;
111         msg->ibm_u.connparams.ibcp_max_msg_size = IBNAL_MSG_SIZE;
112         msg->ibm_u.connparams.ibcp_max_frags = IBNAL_MAX_RDMA_FRAGS;
113
114         kibnal_pack_msg(msg, version, 0, dstnid, dststamp, 0);
115 }
116
117 int
118 kibnal_unpack_msg(kib_msg_t *msg, __u32 expected_version, int nob)
119 {
120         const int hdr_size = offsetof(kib_msg_t, ibm_u);
121         __u32     msg_cksum;
122         __u32     msg_version;
123         int       flip;
124         int       msg_nob;
125 #if !IBNAL_USE_FMR
126         int       i;
127         int       n;
128 #endif
129         /* 6 bytes are enough to have received magic + version */
130         if (nob < 6) {
131                 CERROR("Short message: %d\n", nob);
132                 return -EPROTO;
133         }
134
135         /* Future protocol version compatibility support!
136          * If the iiblnd-specific protocol changes, or when LNET unifies
137          * protocols over all LNDs, the initial connection will negotiate a
138          * protocol version.  If I find this, I avoid any console errors.  If
139          * my is doing connection establishment, the reject will tell the peer
140          * which version I'm running. */
141
142         if (msg->ibm_magic == IBNAL_MSG_MAGIC) {
143                 flip = 0;
144         } else if (msg->ibm_magic == __swab32(IBNAL_MSG_MAGIC)) {
145                 flip = 1;
146         } else {
147                 if (msg->ibm_magic == LNET_PROTO_MAGIC ||
148                     msg->ibm_magic == __swab32(LNET_PROTO_MAGIC))
149                         return -EPROTO;
150
151                 /* Completely out to lunch */
152                 CERROR("Bad magic: %08x\n", msg->ibm_magic);
153                 return -EPROTO;
154         }
155
156         msg_version = flip ? __swab16(msg->ibm_version) : msg->ibm_version;
157         if (expected_version == 0) {
158                 if (msg_version != IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD &&
159                     msg_version != IBNAL_MSG_VERSION)
160                         return -EPROTO;
161         } else if (msg_version != expected_version) {
162                 CERROR("Bad version: %x(%x expected)\n", 
163                        msg_version, expected_version);
164                 return -EPROTO;
165         }
166
167         if (nob < hdr_size) {
168                 CERROR("Short message: %d\n", nob);
169                 return -EPROTO;
170         }
171
172         msg_nob = flip ? __swab32(msg->ibm_nob) : msg->ibm_nob;
173         if (msg_nob > nob) {
174                 CERROR("Short message: got %d, wanted %d\n", nob, msg_nob);
175                 return -EPROTO;
176         }
177
178         /* checksum must be computed with ibm_cksum zero and BEFORE anything
179          * gets flipped */
180         msg_cksum = flip ? __swab32(msg->ibm_cksum) : msg->ibm_cksum;
181         msg->ibm_cksum = 0;
182         if (msg_cksum != 0 &&
183             msg_cksum != kibnal_cksum(msg, msg_nob)) {
184                 CERROR("Bad checksum\n");
185                 return -EPROTO;
186         }
187         msg->ibm_cksum = msg_cksum;
188         
189         if (flip) {
190                 /* leave magic unflipped as a clue to peer endianness */
191                 msg->ibm_version = msg_version;
192                 CLASSERT (sizeof(msg->ibm_type) == 1);
193                 CLASSERT (sizeof(msg->ibm_credits) == 1);
194                 msg->ibm_nob = msg_nob;
195                 __swab64s(&msg->ibm_srcnid);
196                 __swab64s(&msg->ibm_srcstamp);
197                 __swab64s(&msg->ibm_dstnid);
198                 __swab64s(&msg->ibm_dststamp);
199                 __swab64s(&msg->ibm_seq);
200         }
201         
202         if (msg->ibm_srcnid == LNET_NID_ANY) {
203                 CERROR("Bad src nid: %s\n", libcfs_nid2str(msg->ibm_srcnid));
204                 return -EPROTO;
205         }
206
207         switch (msg->ibm_type) {
208         default:
209                 CERROR("Unknown message type %x\n", msg->ibm_type);
210                 return -EPROTO;
211                 
212         case IBNAL_MSG_NOOP:
213                 break;
214
215         case IBNAL_MSG_IMMEDIATE:
216                 if (msg_nob < offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[0])) {
217                         CERROR("Short IMMEDIATE: %d(%d)\n", msg_nob,
218                                (int)offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[0]));
219                         return -EPROTO;
220                 }
221                 break;
222
223         case IBNAL_MSG_PUT_REQ:
224                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.putreq)) {
225                         CERROR("Short PUT_REQ: %d(%d)\n", msg_nob,
226                                (int)(hdr_size + sizeof(msg->ibm_u.putreq)));
227                         return -EPROTO;
228                 }
229                 break;
230
231         case IBNAL_MSG_PUT_ACK:
232                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.putack)) {
233                         CERROR("Short PUT_ACK: %d(%d)\n", msg_nob,
234                                (int)(hdr_size + sizeof(msg->ibm_u.putack)));
235                         return -EPROTO;
236                 }
237 #if IBNAL_USE_FMR
238                 if (flip) {
239                         __swab64s(&msg->ibm_u.putack.ibpam_rd.rd_addr);
240                         __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_nob);
241                         __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_key);
242                 }
243 #else
244                 if (flip) {
245                         __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_key);
246                         __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_nfrag);
247                 }
248                 
249                 n = msg->ibm_u.putack.ibpam_rd.rd_nfrag;
250                 if (n <= 0 || n > IBNAL_MAX_RDMA_FRAGS) {
251                         CERROR("Bad PUT_ACK nfrags: %d, should be 0 < n <= %d\n", 
252                                n, IBNAL_MAX_RDMA_FRAGS);
253                         return -EPROTO;
254                 }
255                 
256                 if (msg_nob < offsetof(kib_msg_t, ibm_u.putack.ibpam_rd.rd_frags[n])) {
257                         CERROR("Short PUT_ACK: %d(%d)\n", msg_nob,
258                                (int)offsetof(kib_msg_t, ibm_u.putack.ibpam_rd.rd_frags[n]));
259                         return -EPROTO;
260                 }
261
262                 if (flip) {
263                         for (i = 0; i < n; i++) {
264                                 __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_frags[i].rf_nob);
265                                 __swab64s(&msg->ibm_u.putack.ibpam_rd.rd_frags[i].rf_addr);
266                         }
267                 }
268 #endif
269                 break;
270
271         case IBNAL_MSG_GET_REQ:
272                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.get)) {
273                         CERROR("Short GET_REQ: %d(%d)\n", msg_nob,
274                                (int)(hdr_size + sizeof(msg->ibm_u.get)));
275                         return -EPROTO;
276                 }
277 #if IBNAL_USE_FMR
278                 if (flip) {
279                         __swab64s(&msg->ibm_u.get.ibgm_rd.rd_addr);
280                         __swab32s(&msg->ibm_u.get.ibgm_rd.rd_nob);
281                         __swab32s(&msg->ibm_u.get.ibgm_rd.rd_key);
282                 }
283 #else                
284                 if (flip) {
285                         __swab32s(&msg->ibm_u.get.ibgm_rd.rd_key);
286                         __swab32s(&msg->ibm_u.get.ibgm_rd.rd_nfrag);
287                 }
288
289                 n = msg->ibm_u.get.ibgm_rd.rd_nfrag;
290                 if (n <= 0 || n > IBNAL_MAX_RDMA_FRAGS) {
291                         CERROR("Bad GET_REQ nfrags: %d, should be 0 < n <= %d\n", 
292                                n, IBNAL_MAX_RDMA_FRAGS);
293                         return -EPROTO;
294                 }
295                 
296                 if (msg_nob < offsetof(kib_msg_t, ibm_u.get.ibgm_rd.rd_frags[n])) {
297                         CERROR("Short GET_REQ: %d(%d)\n", msg_nob,
298                                (int)offsetof(kib_msg_t, ibm_u.get.ibgm_rd.rd_frags[n]));
299                         return -EPROTO;
300                 }
301                 
302                 if (flip)
303                         for (i = 0; i < msg->ibm_u.get.ibgm_rd.rd_nfrag; i++) {
304                                 __swab32s(&msg->ibm_u.get.ibgm_rd.rd_frags[i].rf_nob);
305                                 __swab64s(&msg->ibm_u.get.ibgm_rd.rd_frags[i].rf_addr);
306                         }
307 #endif
308                 break;
309
310         case IBNAL_MSG_PUT_NAK:
311         case IBNAL_MSG_PUT_DONE:
312         case IBNAL_MSG_GET_DONE:
313                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.completion)) {
314                         CERROR("Short RDMA completion: %d(%d)\n", msg_nob,
315                                (int)(hdr_size + sizeof(msg->ibm_u.completion)));
316                         return -EPROTO;
317                 }
318                 if (flip)
319                         __swab32s(&msg->ibm_u.completion.ibcm_status);
320                 break;
321
322         case IBNAL_MSG_CONNREQ:
323         case IBNAL_MSG_CONNACK:
324                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.connparams)) {
325                         CERROR("Short connreq/ack: %d(%d)\n", msg_nob,
326                                (int)(hdr_size + sizeof(msg->ibm_u.connparams)));
327                         return -EPROTO;
328                 }
329                 if (flip) {
330                         __swab32s(&msg->ibm_u.connparams.ibcp_queue_depth);
331                         __swab32s(&msg->ibm_u.connparams.ibcp_max_msg_size);
332                         __swab32s(&msg->ibm_u.connparams.ibcp_max_frags);
333                 }
334                 break;
335         }
336         return 0;
337 }
338
339 IB_HANDLE
340 kibnal_create_cep(lnet_nid_t nid)
341 {
342         FSTATUS        frc;
343         __u32          u32val;
344         IB_HANDLE      cep;
345
346         cep = iba_cm_create_cep(CM_RC_TYPE);
347         if (cep == NULL) {
348                 CERROR ("Can't create CEP for %s\n",
349                         (nid == LNET_NID_ANY) ? "listener" :
350                         libcfs_nid2str(nid));
351                 return NULL;
352         }
353
354         if (nid == LNET_NID_ANY) {
355                 u32val = 1;
356                 frc = iba_cm_modify_cep(cep, CM_FLAG_ASYNC_ACCEPT,
357                                         (char *)&u32val, sizeof(u32val), 0);
358                 if (frc != FSUCCESS) {
359                         CERROR("Can't set async_accept: %d\n", frc);
360                         goto failed;
361                 }
362
363                 u32val = 0;                     /* sets system max */
364                 frc = iba_cm_modify_cep(cep, CM_FLAG_LISTEN_BACKLOG,
365                                         (char *)&u32val, sizeof(u32val), 0);
366                 if (frc != FSUCCESS) {
367                         CERROR("Can't set listen backlog: %d\n", frc);
368                         goto failed;
369                 }
370         }
371         
372         u32val = 1;
373         frc = iba_cm_modify_cep(cep, CM_FLAG_TIMEWAIT_CALLBACK,
374                                 (char *)&u32val, sizeof(u32val), 0);
375         if (frc != FSUCCESS) {
376                 CERROR("Can't set timewait_callback for %s: %d\n", 
377                         (nid == LNET_NID_ANY) ? "listener" :
378                         libcfs_nid2str(nid), frc);
379                 goto failed;
380         }
381
382         return cep;
383         
384  failed:
385         iba_cm_destroy_cep(cep);
386         return NULL;
387 }
388
389 #define IBNAL_CHECK_ADVERT 1
390 #if IBNAL_CHECK_ADVERT
391 void
392 kibnal_service_query_done (void *arg, QUERY *qry, 
393                            QUERY_RESULT_VALUES *qry_result)
394 {
395         int                    *rcp = arg;
396         FSTATUS                 frc = qry_result->Status;
397         SERVICE_RECORD_RESULTS *svc_rslt;
398         IB_SERVICE_RECORD      *svc;
399         lnet_nid_t              nid;
400
401         if (frc != FSUCCESS || qry_result->ResultDataSize == 0) {
402                 CERROR("Error checking advert: status %d data size %d\n",
403                        frc, qry_result->ResultDataSize);
404                 *rcp = -EIO;
405                 goto out;
406         }
407
408         svc_rslt = (SERVICE_RECORD_RESULTS *)qry_result->QueryResult;
409
410         if (svc_rslt->NumServiceRecords < 1) {
411                 CERROR("Check advert: %d records\n",
412                        svc_rslt->NumServiceRecords);
413                 *rcp = -ENOENT;
414                 goto out;
415         }
416
417         svc = &svc_rslt->ServiceRecords[0];
418         nid = le64_to_cpu(*kibnal_service_nid_field(svc));
419         
420         CDEBUG(D_NET, "Check advert: %s "LPX64" "LPX64":%04x\n",
421                libcfs_nid2str(nid), svc->RID.ServiceID, 
422                svc->RID.ServiceGID.Type.Global.InterfaceID, 
423                svc->RID.ServiceP_Key);
424
425         if (nid != kibnal_data.kib_ni->ni_nid) {
426                 CERROR("Check advert: Bad NID %s (%s expected)\n",
427                        libcfs_nid2str(nid),
428                        libcfs_nid2str(kibnal_data.kib_ni->ni_nid));
429                 *rcp = -EINVAL;
430                 goto out;
431         }
432
433         if (svc->RID.ServiceID != *kibnal_tunables.kib_service_number) {
434                 CERROR("Check advert: Bad ServiceID "LPX64" (%x expected)\n",
435                        svc->RID.ServiceID,
436                        *kibnal_tunables.kib_service_number);
437                 *rcp = -EINVAL;
438                 goto out;
439         }
440
441         if (svc->RID.ServiceGID.Type.Global.InterfaceID != 
442             kibnal_data.kib_port_guid) {
443                 CERROR("Check advert: Bad GUID "LPX64" ("LPX64" expected)\n",
444                        svc->RID.ServiceGID.Type.Global.InterfaceID,
445                        kibnal_data.kib_port_guid);
446                 *rcp = -EINVAL;
447                 goto out;
448         }
449
450         if (svc->RID.ServiceP_Key != kibnal_data.kib_port_pkey) {
451                 CERROR("Check advert: Bad PKEY %04x (%04x expected)\n",
452                        svc->RID.ServiceP_Key, kibnal_data.kib_port_pkey);
453                 *rcp = -EINVAL;
454                 goto out;
455         }
456
457         CDEBUG(D_NET, "Check advert OK\n");
458         *rcp = 0;
459                 
460  out:
461         up (&kibnal_data.kib_listener_signal);                
462 }
463
464 int
465 kibnal_check_advert (void)
466 {
467         /* single-threaded */
468         static QUERY               qry;
469
470         FSTATUS                    frc;
471         int                        rc;
472
473         memset (&qry, 0, sizeof(qry));
474         qry.InputType = InputTypeServiceRecord;
475         qry.OutputType = OutputTypeServiceRecord;
476         kibnal_set_service_keys(&qry.InputValue.ServiceRecordValue.ServiceRecord,
477                                 kibnal_data.kib_ni->ni_nid);
478         qry.InputValue.ServiceRecordValue.ComponentMask = KIBNAL_SERVICE_KEY_MASK;
479
480         frc = iba_sd_query_port_fabric_info(kibnal_data.kib_sd, 
481                                             kibnal_data.kib_port_guid,
482                                             &qry, 
483                                             kibnal_service_query_done,
484                                             &kibnal_data.kib_sdretry, 
485                                             &rc);
486         if (frc != FPENDING) {
487                 CERROR ("Immediate error %d checking SM service\n", frc);
488                 return -EIO;
489         }
490         
491         down (&kibnal_data.kib_listener_signal);
492         
493         if (rc != 0)
494                 CERROR ("Error %d checking SM service\n", rc);
495         return rc;
496 }
497 #else
/* Advert checking is compiled out: always report success */
int
kibnal_check_advert(void)
{
        return 0;
}
503 #endif
504
505 void 
506 kibnal_fill_fod(FABRIC_OPERATION_DATA *fod, FABRIC_OPERATION_TYPE type)
507 {
508         IB_SERVICE_RECORD     *svc;
509
510         memset (fod, 0, sizeof(*fod));
511         fod->Type = type;
512
513         svc = &fod->Value.ServiceRecordValue.ServiceRecord;
514         svc->RID.ServiceID = *kibnal_tunables.kib_service_number;
515         svc->RID.ServiceGID.Type.Global.InterfaceID = kibnal_data.kib_port_guid;
516         svc->RID.ServiceGID.Type.Global.SubnetPrefix = DEFAULT_SUBNET_PREFIX;
517         svc->RID.ServiceP_Key = kibnal_data.kib_port_pkey;
518         svc->ServiceLease = 0xffffffff;
519
520         kibnal_set_service_keys(svc, kibnal_data.kib_ni->ni_nid);
521 }
522
523 void
524 kibnal_service_setunset_done (void *arg, FABRIC_OPERATION_DATA *fod,
525                               FSTATUS frc, uint32 madrc)
526 {
527         *(FSTATUS *)arg = frc;
528         up (&kibnal_data.kib_listener_signal);
529 }
530
531 int
532 kibnal_advertise (void)
533 {
534         /* Single threaded here */
535         static FABRIC_OPERATION_DATA fod;
536
537         IB_SERVICE_RECORD *svc = &fod.Value.ServiceRecordValue.ServiceRecord;
538         FSTATUS            frc;
539         FSTATUS            frc2;
540
541         if (strlen(*kibnal_tunables.kib_service_name) >=
542             sizeof(svc->ServiceName)) {
543                 CERROR("Service name '%s' too long (%d chars max)\n",
544                        *kibnal_tunables.kib_service_name,
545                        (int)sizeof(svc->ServiceName) - 1);
546                 return -EINVAL;
547         }
548
549         kibnal_fill_fod(&fod, FabOpSetServiceRecord);
550
551         CDEBUG(D_NET, "Advertising service id "LPX64" %s:%s\n", 
552                svc->RID.ServiceID, svc->ServiceName, 
553                libcfs_nid2str(le64_to_cpu(*kibnal_service_nid_field(svc))));
554
555         frc = iba_sd_port_fabric_operation(kibnal_data.kib_sd,
556                                            kibnal_data.kib_port_guid,
557                                            &fod, 
558                                            kibnal_service_setunset_done, 
559                                            &kibnal_data.kib_sdretry,
560                                            &frc2);
561
562         if (frc != FSUCCESS && frc != FPENDING) {
563                 CERROR ("Immediate error %d advertising NID %s\n",
564                         frc, libcfs_nid2str(kibnal_data.kib_ni->ni_nid));
565                 return -EIO;
566         }
567
568         down (&kibnal_data.kib_listener_signal);
569
570         frc = frc2;
571         if (frc == FSUCCESS)
572                 return 0;
573         
574         CERROR ("Error %d advertising %s\n",
575                 frc, libcfs_nid2str(kibnal_data.kib_ni->ni_nid));
576         return -EIO;
577 }
578
579 void
580 kibnal_unadvertise (int expect_success)
581 {
582         /* single threaded */
583         static FABRIC_OPERATION_DATA fod;
584
585         IB_SERVICE_RECORD *svc = &fod.Value.ServiceRecordValue.ServiceRecord;
586         FSTATUS            frc;
587         FSTATUS            frc2;
588
589         LASSERT (kibnal_data.kib_ni->ni_nid != LNET_NID_ANY);
590
591         kibnal_fill_fod(&fod, FabOpDeleteServiceRecord);
592
593         CDEBUG(D_NET, "Unadvertising service %s:%s\n",
594                svc->ServiceName, 
595                libcfs_nid2str(le64_to_cpu(*kibnal_service_nid_field(svc))));
596         
597         frc = iba_sd_port_fabric_operation(kibnal_data.kib_sd,
598                                            kibnal_data.kib_port_guid,
599                                            &fod, 
600                                            kibnal_service_setunset_done, 
601                                            &kibnal_data.kib_sdretry, 
602                                            &frc2);
603         if (frc != FSUCCESS && frc != FPENDING) {
604                 CERROR ("Immediate error %d unadvertising NID %s\n",
605                         frc, libcfs_nid2str(kibnal_data.kib_ni->ni_nid));
606                 return;
607         }
608
609         down (&kibnal_data.kib_listener_signal);
610
611         CDEBUG(D_NET, "Unadvertise rc: %d\n", frc2);
612
613         if ((frc2 == FSUCCESS) == !!expect_success)
614                 return;
615
616         if (expect_success)
617                 CERROR("Error %d unadvertising NID %s\n",
618                        frc2, libcfs_nid2str(kibnal_data.kib_ni->ni_nid));
619         else
620                 CWARN("Removed conflicting NID %s\n",
621                       libcfs_nid2str(kibnal_data.kib_ni->ni_nid));
622 }
623
624 void
625 kibnal_stop_listener(int normal_shutdown)
626 {
627         /* NB this also disables peer creation and destroys all existing
628          * peers */
629         IB_HANDLE      cep = kibnal_data.kib_listener_cep;
630         unsigned long  flags;
631         FSTATUS        frc;
632
633         LASSERT (cep != NULL);
634
635         kibnal_unadvertise(normal_shutdown);
636
637         frc = iba_cm_cancel(cep);
638         if (frc != FSUCCESS && frc != FPENDING)
639                 CERROR ("Error %d stopping listener\n", frc);
640
641         down(&kibnal_data.kib_listener_signal);
642
643         frc = iba_cm_destroy_cep(cep);
644         if (frc != FSUCCESS)
645                 CERROR ("Error %d destroying listener CEP\n", frc);
646
647         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
648         /* This assignment disables peer creation */
649         kibnal_data.kib_listener_cep = NULL;
650         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
651
652         /* Start to tear down any peers created while the listener was
653          * running */
654         kibnal_del_peer(LNET_NID_ANY);
655 }
656
657 int
658 kibnal_start_listener(void)
659 {
660         /* NB this also enables peer creation */
661
662         IB_HANDLE      cep;
663         CM_LISTEN_INFO info;
664         unsigned long  flags;
665         int            rc;
666         FSTATUS        frc;
667
668         LASSERT (kibnal_data.kib_listener_cep == NULL);
669         init_MUTEX_LOCKED (&kibnal_data.kib_listener_signal);
670
671         cep = kibnal_create_cep(LNET_NID_ANY);
672         if (cep == NULL)
673                 return -ENOMEM;
674
675         memset (&info, 0, sizeof(info));
676         info.ListenAddr.EndPt.SID = *kibnal_tunables.kib_service_number;
677
678         frc = iba_cm_listen(cep, &info, kibnal_listen_callback, NULL);
679         if (frc != FSUCCESS && frc != FPENDING) {
680                 CERROR ("iba_cm_listen error: %d\n", frc);
681
682                 iba_cm_destroy_cep(cep);
683                 return -EIO;
684         }
685
686         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
687         /* This assignment enables peer creation */
688         kibnal_data.kib_listener_cep = cep;
689         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
690
691         rc = kibnal_advertise();
692         if (rc == 0)
693                 rc = kibnal_check_advert();
694
695         if (rc == 0)
696                 return 0;
697
698         kibnal_stop_listener(0);
699         return rc;
700 }
701
702 int
703 kibnal_create_peer (kib_peer_t **peerp, lnet_nid_t nid)
704 {
705         kib_peer_t    *peer;
706         unsigned long  flags;
707         int            rc;
708
709         LASSERT (nid != LNET_NID_ANY);
710
711         LIBCFS_ALLOC (peer, sizeof (*peer));
712         if (peer == NULL) {
713                 CERROR("Cannot allocate peer\n");
714                 return -ENOMEM;
715         }
716
717         memset(peer, 0, sizeof(*peer));         /* zero flags etc */
718
719         peer->ibp_nid = nid;
720         atomic_set (&peer->ibp_refcount, 1);    /* 1 ref for caller */
721
722         INIT_LIST_HEAD (&peer->ibp_list);       /* not in the peer table yet */
723         INIT_LIST_HEAD (&peer->ibp_conns);
724         INIT_LIST_HEAD (&peer->ibp_tx_queue);
725
726         peer->ibp_error = 0;
727         peer->ibp_last_alive = cfs_time_current();
728         peer->ibp_reconnect_interval = 0;       /* OK to connect at any time */
729
730         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
731         
732         if (atomic_read(&kibnal_data.kib_npeers) >=
733             *kibnal_tunables.kib_concurrent_peers) {
734                 rc = -EOVERFLOW;        /* !! but at least it distinguishes */
735         } else if (kibnal_data.kib_listener_cep == NULL) {
736                 rc = -ESHUTDOWN;        /* shutdown has started */
737         } else {
738                 rc = 0;
739                 /* npeers only grows with the global lock held */
740                 atomic_inc(&kibnal_data.kib_npeers);
741         }
742         
743         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
744
745         if (rc != 0) {
746                 CERROR("Can't create peer: %s\n", 
747                        (rc == -ESHUTDOWN) ? "shutting down" : 
748                        "too many peers");
749                 LIBCFS_FREE(peer, sizeof(*peer));
750         } else {
751                 *peerp = peer;
752         }
753         
754         return rc;
755 }
756
757 void
758 kibnal_destroy_peer (kib_peer_t *peer)
759 {
760
761         LASSERT (atomic_read (&peer->ibp_refcount) == 0);
762         LASSERT (peer->ibp_persistence == 0);
763         LASSERT (!kibnal_peer_active(peer));
764         LASSERT (!kibnal_peer_connecting(peer));
765         LASSERT (list_empty (&peer->ibp_conns));
766         LASSERT (list_empty (&peer->ibp_tx_queue));
767
768         LIBCFS_FREE (peer, sizeof (*peer));
769
770         /* NB a peer's connections keep a reference on their peer until
771          * they are destroyed, so we can be assured that _all_ state to do
772          * with this peer has been cleaned up when its refcount drops to
773          * zero. */
774         atomic_dec (&kibnal_data.kib_npeers);
775 }
776
777 /* the caller is responsible for accounting for the additional reference
778  * that this creates */
779 kib_peer_t *
780 kibnal_find_peer_locked (lnet_nid_t nid)
781 {
782         struct list_head *peer_list = kibnal_nid2peerlist (nid);
783         struct list_head *tmp;
784         kib_peer_t       *peer;
785
786         list_for_each (tmp, peer_list) {
787
788                 peer = list_entry (tmp, kib_peer_t, ibp_list);
789
790                 LASSERT (peer->ibp_persistence != 0 ||
791                          kibnal_peer_connecting(peer) ||
792                          !list_empty (&peer->ibp_conns));
793
794                 if (peer->ibp_nid != nid)
795                         continue;
796
797                 CDEBUG(D_NET, "got peer %s (%d)\n",
798                        libcfs_nid2str(nid), atomic_read (&peer->ibp_refcount));
799                 return (peer);
800         }
801         return (NULL);
802 }
803
804 void
805 kibnal_unlink_peer_locked (kib_peer_t *peer)
806 {
807         LASSERT (peer->ibp_persistence == 0);
808         LASSERT (list_empty(&peer->ibp_conns));
809
810         LASSERT (kibnal_peer_active(peer));
811         list_del_init (&peer->ibp_list);
812         /* lose peerlist's ref */
813         kibnal_peer_decref(peer);
814 }
815
816 int
817 kibnal_get_peer_info (int index, lnet_nid_t *nidp, int *persistencep)
818 {
819         kib_peer_t        *peer;
820         struct list_head  *ptmp;
821         unsigned long      flags;
822         int                i;
823
824         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
825
826         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
827
828                 list_for_each (ptmp, &kibnal_data.kib_peers[i]) {
829
830                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
831                         LASSERT (peer->ibp_persistence != 0 ||
832                                  kibnal_peer_connecting(peer) ||
833                                  !list_empty (&peer->ibp_conns));
834
835                         if (index-- > 0)
836                                 continue;
837
838                         *nidp = peer->ibp_nid;
839                         *persistencep = peer->ibp_persistence;
840
841                         read_unlock_irqrestore(&kibnal_data.kib_global_lock,
842                                                flags);
843                         return (0);
844                 }
845         }
846
847         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
848         return (-ENOENT);
849 }
850
851 int
852 kibnal_add_persistent_peer (lnet_nid_t nid)
853 {
854         unsigned long      flags;
855         kib_peer_t        *peer;
856         kib_peer_t        *peer2;
857         int                rc;
858         
859         if (nid == LNET_NID_ANY)
860                 return (-EINVAL);
861
862         rc = kibnal_create_peer(&peer, nid);
863         if (rc != 0)
864                 return rc;
865
866         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
867
868         /* I'm always called with a reference on kibnal_data.kib_ni
869          * so shutdown can't have started */
870         LASSERT (kibnal_data.kib_listener_cep != NULL);
871
872         peer2 = kibnal_find_peer_locked (nid);
873         if (peer2 != NULL) {
874                 kibnal_peer_decref (peer);
875                 peer = peer2;
876         } else {
877                 /* peer table takes existing ref on peer */
878                 list_add_tail (&peer->ibp_list,
879                                kibnal_nid2peerlist (nid));
880         }
881
882         peer->ibp_persistence++;
883         
884         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
885         return (0);
886 }
887
888 void
889 kibnal_del_peer_locked (kib_peer_t *peer)
890 {
891         struct list_head *ctmp;
892         struct list_head *cnxt;
893         kib_conn_t       *conn;
894
895         peer->ibp_persistence = 0;
896
897         if (list_empty(&peer->ibp_conns)) {
898                 kibnal_unlink_peer_locked(peer);
899         } else {
900                 list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
901                         conn = list_entry(ctmp, kib_conn_t, ibc_list);
902
903                         kibnal_close_conn_locked (conn, 0);
904                 }
905                 /* NB peer is no longer persistent; closing its last conn
906                  * unlinked it. */
907         }
908         /* NB peer now unlinked; might even be freed if the peer table had the
909          * last ref on it. */
910 }
911
912 int
913 kibnal_del_peer (lnet_nid_t nid)
914 {
915         unsigned long      flags;
916         CFS_LIST_HEAD     (zombies);
917         struct list_head  *ptmp;
918         struct list_head  *pnxt;
919         kib_peer_t        *peer;
920         int                lo;
921         int                hi;
922         int                i;
923         int                rc = -ENOENT;
924
925         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
926
927         if (nid != LNET_NID_ANY)
928                 lo = hi = kibnal_nid2peerlist(nid) - kibnal_data.kib_peers;
929         else {
930                 lo = 0;
931                 hi = kibnal_data.kib_peer_hash_size - 1;
932         }
933
934         for (i = lo; i <= hi; i++) {
935                 list_for_each_safe (ptmp, pnxt, &kibnal_data.kib_peers[i]) {
936                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
937                         LASSERT (peer->ibp_persistence != 0 ||
938                                  kibnal_peer_connecting(peer) ||
939                                  !list_empty (&peer->ibp_conns));
940
941                         if (!(nid == LNET_NID_ANY || peer->ibp_nid == nid))
942                                 continue;
943
944                         if (!list_empty(&peer->ibp_tx_queue)) {
945                                 LASSERT (list_empty(&peer->ibp_conns));
946
947                                 list_splice_init(&peer->ibp_tx_queue, &zombies);
948                         }
949
950                         kibnal_del_peer_locked (peer);
951                         rc = 0;         /* matched something */
952                 }
953         }
954
955         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
956
957         kibnal_txlist_done(&zombies, -EIO);
958
959         return (rc);
960 }
961
962 kib_conn_t *
963 kibnal_get_conn_by_idx (int index)
964 {
965         kib_peer_t        *peer;
966         struct list_head  *ptmp;
967         kib_conn_t        *conn;
968         struct list_head  *ctmp;
969         unsigned long      flags;
970         int                i;
971
972         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
973
974         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
975                 list_for_each (ptmp, &kibnal_data.kib_peers[i]) {
976
977                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
978                         LASSERT (peer->ibp_persistence != 0 ||
979                                  kibnal_peer_connecting(peer) ||
980                                  !list_empty (&peer->ibp_conns));
981
982                         list_for_each (ctmp, &peer->ibp_conns) {
983                                 if (index-- > 0)
984                                         continue;
985
986                                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
987                                 kibnal_conn_addref(conn);
988                                 read_unlock_irqrestore(&kibnal_data.kib_global_lock,
989                                                        flags);
990                                 return (conn);
991                         }
992                 }
993         }
994
995         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
996         return (NULL);
997 }
998
999 int
1000 kibnal_conn_rts(kib_conn_t *conn, 
1001                 __u32 qpn, __u8 resp_res, __u8 init_depth, __u32 psn)
1002 {
1003         IB_PATH_RECORD         *path = &conn->ibc_cvars->cv_path;
1004         IB_HANDLE               qp = conn->ibc_qp;
1005         IB_QP_ATTRIBUTES_MODIFY modify_attr;
1006         FSTATUS                 frc;
1007         int                     rc;
1008
1009         if (resp_res > kibnal_data.kib_hca_attrs.MaxQPResponderResources)
1010                 resp_res = kibnal_data.kib_hca_attrs.MaxQPResponderResources;
1011
1012         if (init_depth > kibnal_data.kib_hca_attrs.MaxQPInitiatorDepth)
1013                 init_depth = kibnal_data.kib_hca_attrs.MaxQPInitiatorDepth;
1014
1015         modify_attr = (IB_QP_ATTRIBUTES_MODIFY) {
1016                 .RequestState       = QPStateReadyToRecv,
1017                 .RecvPSN            = IBNAL_STARTING_PSN,
1018                 .DestQPNumber       = qpn,
1019                 .ResponderResources = resp_res,
1020                 .MinRnrTimer        = UsecToRnrNakTimer(2000), /* 20 ms */
1021                 .Attrs              = (IB_QP_ATTR_RECVPSN |
1022                                        IB_QP_ATTR_DESTQPNUMBER | 
1023                                        IB_QP_ATTR_RESPONDERRESOURCES | 
1024                                        IB_QP_ATTR_DESTAV | 
1025                                        IB_QP_ATTR_PATHMTU | 
1026                                        IB_QP_ATTR_MINRNRTIMER),
1027         };
1028         GetAVFromPath(0, path, &modify_attr.PathMTU, NULL, 
1029                       &modify_attr.DestAV);
1030
1031         frc = iba_modify_qp(qp, &modify_attr, NULL);
1032         if (frc != FSUCCESS) {
1033                 CERROR("Can't set QP %s ready to receive: %d\n",
1034                        libcfs_nid2str(conn->ibc_peer->ibp_nid), frc);
1035                 return -EIO;
1036         }
1037
1038         rc = kibnal_post_receives(conn);
1039         if (rc != 0) {
1040                 CERROR("Can't post receives for %s: %d\n",
1041                        libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
1042                 return rc;
1043         }
1044
1045         modify_attr = (IB_QP_ATTRIBUTES_MODIFY) {
1046                 .RequestState           = QPStateReadyToSend,
1047                 .FlowControl            = TRUE,
1048                 .InitiatorDepth         = init_depth,
1049                 .SendPSN                = psn,
1050                 .LocalAckTimeout        = path->PktLifeTime + 2, /* 2 or 1? */
1051                 .RetryCount             = IBNAL_RETRY,
1052                 .RnrRetryCount          = IBNAL_RNR_RETRY,
1053                 .Attrs                  = (IB_QP_ATTR_FLOWCONTROL | 
1054                                            IB_QP_ATTR_INITIATORDEPTH | 
1055                                            IB_QP_ATTR_SENDPSN | 
1056                                            IB_QP_ATTR_LOCALACKTIMEOUT | 
1057                                            IB_QP_ATTR_RETRYCOUNT | 
1058                                            IB_QP_ATTR_RNRRETRYCOUNT),
1059         };
1060
1061         frc = iba_modify_qp(qp, &modify_attr, NULL);
1062         if (frc != FSUCCESS) {
1063                 CERROR("Can't set QP %s ready to send: %d\n",
1064                        libcfs_nid2str(conn->ibc_peer->ibp_nid), frc);
1065                 return -EIO;
1066         }
1067
1068         frc = iba_query_qp(conn->ibc_qp, &conn->ibc_cvars->cv_qpattrs, NULL);
1069         if (frc != FSUCCESS) {
1070                 CERROR ("Can't query QP %s attributes: %d\n",
1071                         libcfs_nid2str(conn->ibc_peer->ibp_nid), frc);
1072                 return -EIO;
1073         }
1074         
1075         return 0;
1076 }
1077
1078 kib_conn_t *
1079 kibnal_create_conn (lnet_nid_t nid, int proto_version)
1080 {
1081         kib_conn_t  *conn;
1082         int          i;
1083         int          page_offset;
1084         int          ipage;
1085         int          rc;
1086         FSTATUS      frc;
1087         union {
1088                 IB_QP_ATTRIBUTES_CREATE    qp_create;
1089                 IB_QP_ATTRIBUTES_MODIFY    qp_attr;
1090         } params;
1091         
1092         LIBCFS_ALLOC (conn, sizeof (*conn));
1093         if (conn == NULL) {
1094                 CERROR ("Can't allocate connection for %s\n",
1095                         libcfs_nid2str(nid));
1096                 return (NULL);
1097         }
1098
1099         /* zero flags, NULL pointers etc... */
1100         memset (conn, 0, sizeof (*conn));
1101         conn->ibc_state = IBNAL_CONN_INIT_NOTHING;
1102         conn->ibc_version = proto_version;
1103
1104         INIT_LIST_HEAD (&conn->ibc_early_rxs);
1105         INIT_LIST_HEAD (&conn->ibc_tx_queue_nocred);
1106         INIT_LIST_HEAD (&conn->ibc_tx_queue);
1107         INIT_LIST_HEAD (&conn->ibc_tx_queue_rsrvd);
1108         INIT_LIST_HEAD (&conn->ibc_active_txs);
1109         spin_lock_init (&conn->ibc_lock);
1110         
1111         atomic_inc (&kibnal_data.kib_nconns);
1112         /* well not really, but I call destroy() on failure, which decrements */
1113
1114         LIBCFS_ALLOC(conn->ibc_cvars, sizeof (*conn->ibc_cvars));
1115         if (conn->ibc_cvars == NULL) {
1116                 CERROR ("Can't allocate connvars for %s\n", 
1117                         libcfs_nid2str(nid));
1118                 goto failed;
1119         }
1120         memset(conn->ibc_cvars, 0, sizeof (*conn->ibc_cvars));
1121
1122         LIBCFS_ALLOC(conn->ibc_rxs, IBNAL_RX_MSGS * sizeof (kib_rx_t));
1123         if (conn->ibc_rxs == NULL) {
1124                 CERROR("Cannot allocate RX descriptors for %s\n",
1125                        libcfs_nid2str(nid));
1126                 goto failed;
1127         }
1128         memset (conn->ibc_rxs, 0, IBNAL_RX_MSGS * sizeof(kib_rx_t));
1129
1130         rc = kibnal_alloc_pages(&conn->ibc_rx_pages, IBNAL_RX_MSG_PAGES);
1131         if (rc != 0) {
1132                 CERROR("Can't allocate RX buffers for %s\n",
1133                        libcfs_nid2str(nid));
1134                 goto failed;
1135         }
1136         
1137         for (i = ipage = page_offset = 0; i < IBNAL_RX_MSGS; i++) {
1138                 struct page *page = conn->ibc_rx_pages->ibp_pages[ipage];
1139                 kib_rx_t    *rx = &conn->ibc_rxs[i];
1140
1141                 rx->rx_conn = conn;
1142                 rx->rx_msg = (kib_msg_t *)(((char *)page_address(page)) + 
1143                              page_offset);
1144
1145                 rx->rx_hca_msg = kibnal_data.kib_whole_mem.md_addr +
1146                                  lnet_page2phys(page) + page_offset;
1147                 
1148                 page_offset += IBNAL_MSG_SIZE;
1149                 LASSERT (page_offset <= PAGE_SIZE);
1150
1151                 if (page_offset == PAGE_SIZE) {
1152                         page_offset = 0;
1153                         ipage++;
1154                         LASSERT (ipage <= IBNAL_RX_MSG_PAGES);
1155                 }
1156         }
1157
1158         params.qp_create = (IB_QP_ATTRIBUTES_CREATE) {
1159                 .Type                    = QPTypeReliableConnected,
1160                 .SendQDepth              = (1 + IBNAL_MAX_RDMA_FRAGS) *
1161                                            (*kibnal_tunables.kib_concurrent_sends),
1162                 .RecvQDepth              = IBNAL_RX_MSGS,
1163                 .SendDSListDepth         = 1,
1164                 .RecvDSListDepth         = 1,
1165                 .SendCQHandle            = kibnal_data.kib_cq,
1166                 .RecvCQHandle            = kibnal_data.kib_cq,
1167                 .PDHandle                = kibnal_data.kib_pd,
1168                 .SendSignaledCompletions = TRUE,
1169         };
1170         frc = iba_create_qp(kibnal_data.kib_hca, &params.qp_create, NULL,
1171                             &conn->ibc_qp, &conn->ibc_cvars->cv_qpattrs);
1172         if (frc != 0) {
1173                 CERROR ("Can't create QP %s: %d\n", libcfs_nid2str(nid), frc);
1174                 goto failed;
1175         }
1176
1177         /* Mark QP created */
1178         kibnal_set_conn_state(conn, IBNAL_CONN_INIT_QP);
1179
1180         params.qp_attr = (IB_QP_ATTRIBUTES_MODIFY) {
1181                 .RequestState             = QPStateInit,
1182                 .Attrs                    = (IB_QP_ATTR_PORTGUID |
1183                                              IB_QP_ATTR_PKEYINDEX |
1184                                              IB_QP_ATTR_ACCESSCONTROL),
1185                 .PortGUID                 = kibnal_data.kib_port_guid,
1186                 .PkeyIndex                = 0,
1187                 .AccessControl = { 
1188                         .s = {
1189                                 .RdmaWrite = 1,
1190                                 .RdmaRead  = 1,
1191                         },
1192                 },
1193         };
1194         frc = iba_modify_qp(conn->ibc_qp, &params.qp_attr, NULL);
1195         if (frc != 0) {
1196                 CERROR ("Can't set QP %s state to INIT: %d\n",
1197                         libcfs_nid2str(nid), frc);
1198                 goto failed;
1199         }
1200
1201         frc = iba_query_qp(conn->ibc_qp, &conn->ibc_cvars->cv_qpattrs, NULL);
1202         if (frc != FSUCCESS) {
1203                 CERROR ("Can't query QP %s attributes: %d\n",
1204                         libcfs_nid2str(nid), frc);
1205                 goto failed;
1206         }
1207
1208         /* 1 ref for caller */
1209         atomic_set (&conn->ibc_refcount, 1);
1210         CDEBUG(D_NET, "New conn %p\n", conn);
1211         return (conn);
1212         
1213  failed:
1214         kibnal_destroy_conn (conn);
1215         return (NULL);
1216 }
1217
1218 void
1219 kibnal_destroy_conn (kib_conn_t *conn)
1220 {
1221         FSTATUS frc;
1222
1223         LASSERT (!in_interrupt());
1224         
1225         CDEBUG (D_NET, "connection %s\n", 
1226                 (conn->ibc_peer) == NULL ? "<ANON>" :
1227                 libcfs_nid2str(conn->ibc_peer->ibp_nid));
1228
1229         LASSERT (atomic_read (&conn->ibc_refcount) == 0);
1230         LASSERT (list_empty(&conn->ibc_early_rxs));
1231         LASSERT (list_empty(&conn->ibc_tx_queue));
1232         LASSERT (list_empty(&conn->ibc_tx_queue_rsrvd));
1233         LASSERT (list_empty(&conn->ibc_tx_queue_nocred));
1234         LASSERT (list_empty(&conn->ibc_active_txs));
1235         LASSERT (conn->ibc_nsends_posted == 0);
1236
1237         switch (conn->ibc_state) {
1238         case IBNAL_CONN_INIT_NOTHING:
1239         case IBNAL_CONN_INIT_QP:
1240         case IBNAL_CONN_DISCONNECTED:
1241                 break;
1242
1243         default:
1244                 /* conn must either have never engaged with the CM, or have
1245                  * completely disengaged from it */
1246                 CERROR("Bad conn %s state %d\n",
1247                        (conn->ibc_peer) == NULL ? "<anon>" :
1248                        libcfs_nid2str(conn->ibc_peer->ibp_nid), conn->ibc_state);
1249                 LBUG();
1250         }
1251
1252         if (conn->ibc_cep != NULL) {
1253                 frc = iba_cm_destroy_cep(conn->ibc_cep);
1254                 if (frc != FSUCCESS)
1255                         CERROR("Error destroying CEP %p: %d\n",
1256                                conn->ibc_cep, frc);
1257         }
1258
1259         if (conn->ibc_qp != NULL) {
1260                 frc = iba_destroy_qp(conn->ibc_qp);
1261                 if (frc != FSUCCESS)
1262                         CERROR("Error destroying QP %p: %d\n",
1263                                conn->ibc_qp, frc);
1264         }
1265
1266         if (conn->ibc_rx_pages != NULL) 
1267                 kibnal_free_pages(conn->ibc_rx_pages);
1268         
1269         if (conn->ibc_rxs != NULL)
1270                 LIBCFS_FREE(conn->ibc_rxs, 
1271                             IBNAL_RX_MSGS * sizeof(kib_rx_t));
1272
1273         if (conn->ibc_cvars != NULL)
1274                 LIBCFS_FREE(conn->ibc_cvars, sizeof(*conn->ibc_cvars));
1275
1276         if (conn->ibc_peer != NULL)
1277                 kibnal_peer_decref(conn->ibc_peer);
1278
1279         LIBCFS_FREE(conn, sizeof (*conn));
1280
1281         atomic_dec(&kibnal_data.kib_nconns);
1282 }
1283
1284 int
1285 kibnal_close_peer_conns_locked (kib_peer_t *peer, int why)
1286 {
1287         kib_conn_t         *conn;
1288         struct list_head   *ctmp;
1289         struct list_head   *cnxt;
1290         int                 count = 0;
1291
1292         list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
1293                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
1294
1295                 count++;
1296                 kibnal_close_conn_locked (conn, why);
1297         }
1298
1299         return (count);
1300 }
1301
1302 int
1303 kibnal_close_stale_conns_locked (kib_peer_t *peer, __u64 incarnation)
1304 {
1305         kib_conn_t         *conn;
1306         struct list_head   *ctmp;
1307         struct list_head   *cnxt;
1308         int                 count = 0;
1309
1310         list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
1311                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
1312
1313                 if (conn->ibc_incarnation == incarnation)
1314                         continue;
1315
1316                 CDEBUG(D_NET, "Closing stale conn nid:%s incarnation:"LPX64"("LPX64")\n",
1317                        libcfs_nid2str(peer->ibp_nid), 
1318                        conn->ibc_incarnation, incarnation);
1319                 
1320                 count++;
1321                 kibnal_close_conn_locked (conn, -ESTALE);
1322         }
1323
1324         return (count);
1325 }
1326
1327 int
1328 kibnal_close_matching_conns (lnet_nid_t nid)
1329 {
1330         unsigned long       flags;
1331         kib_peer_t         *peer;
1332         struct list_head   *ptmp;
1333         struct list_head   *pnxt;
1334         int                 lo;
1335         int                 hi;
1336         int                 i;
1337         int                 count = 0;
1338
1339         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1340
1341         if (nid != LNET_NID_ANY)
1342                 lo = hi = kibnal_nid2peerlist(nid) - kibnal_data.kib_peers;
1343         else {
1344                 lo = 0;
1345                 hi = kibnal_data.kib_peer_hash_size - 1;
1346         }
1347
1348         for (i = lo; i <= hi; i++) {
1349                 list_for_each_safe (ptmp, pnxt, &kibnal_data.kib_peers[i]) {
1350
1351                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
1352                         LASSERT (peer->ibp_persistence != 0 ||
1353                                  kibnal_peer_connecting(peer) ||
1354                                  !list_empty (&peer->ibp_conns));
1355
1356                         if (!(nid == LNET_NID_ANY || nid == peer->ibp_nid))
1357                                 continue;
1358
1359                         count += kibnal_close_peer_conns_locked (peer, 0);
1360                 }
1361         }
1362
1363         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1364
1365         /* wildcards always succeed */
1366         if (nid == LNET_NID_ANY)
1367                 return (0);
1368         
1369         return (count == 0 ? -ENOENT : 0);
1370 }
1371
1372 int
1373 kibnal_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1374 {
1375         struct libcfs_ioctl_data *data = arg;
1376         int                       rc = -EINVAL;
1377         ENTRY;
1378
1379         LASSERT (ni == kibnal_data.kib_ni);
1380
1381         switch(cmd) {
1382         case IOC_LIBCFS_GET_PEER: {
1383                 lnet_nid_t   nid = 0;
1384                 int          share_count = 0;
1385
1386                 rc = kibnal_get_peer_info(data->ioc_count,
1387                                           &nid, &share_count);
1388                 data->ioc_nid   = nid;
1389                 data->ioc_count = share_count;
1390                 break;
1391         }
1392         case IOC_LIBCFS_ADD_PEER: {
1393                 rc = kibnal_add_persistent_peer (data->ioc_nid);
1394                 break;
1395         }
1396         case IOC_LIBCFS_DEL_PEER: {
1397                 rc = kibnal_del_peer (data->ioc_nid);
1398                 break;
1399         }
1400         case IOC_LIBCFS_GET_CONN: {
1401                 kib_conn_t *conn = kibnal_get_conn_by_idx (data->ioc_count);
1402
1403                 if (conn == NULL)
1404                         rc = -ENOENT;
1405                 else {
1406                         rc = 0;
1407                         data->ioc_nid = conn->ibc_peer->ibp_nid;
1408                         kibnal_conn_decref(conn);
1409                 }
1410                 break;
1411         }
1412         case IOC_LIBCFS_CLOSE_CONNECTION: {
1413                 rc = kibnal_close_matching_conns (data->ioc_nid);
1414                 break;
1415         }
1416         case IOC_LIBCFS_REGISTER_MYNID: {
1417                 if (ni->ni_nid == data->ioc_nid) {
1418                         rc = 0;
1419                 } else {
1420                         CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID: %s(%s)\n",
1421                                libcfs_nid2str(data->ioc_nid),
1422                                libcfs_nid2str(ni->ni_nid));
1423                         rc = -EINVAL;
1424                 }
1425                 break;
1426         }
1427         }
1428
1429         RETURN(rc);
1430 }
1431
1432 void
1433 kibnal_free_pages (kib_pages_t *p)
1434 {
1435         int     npages = p->ibp_npages;
1436         int     i;
1437         
1438         for (i = 0; i < npages; i++)
1439                 if (p->ibp_pages[i] != NULL)
1440                         __free_page(p->ibp_pages[i]);
1441         
1442         LIBCFS_FREE (p, offsetof(kib_pages_t, ibp_pages[npages]));
1443 }
1444
1445 int
1446 kibnal_alloc_pages (kib_pages_t **pp, int npages)
1447 {
1448         kib_pages_t   *p;
1449         int            i;
1450
1451         LIBCFS_ALLOC(p, offsetof(kib_pages_t, ibp_pages[npages]));
1452         if (p == NULL) {
1453                 CERROR ("Can't allocate buffer %d\n", npages);
1454                 return (-ENOMEM);
1455         }
1456
1457         memset (p, 0, offsetof(kib_pages_t, ibp_pages[npages]));
1458         p->ibp_npages = npages;
1459         
1460         for (i = 0; i < npages; i++) {
1461                 p->ibp_pages[i] = alloc_page (GFP_KERNEL);
1462                 if (p->ibp_pages[i] == NULL) {
1463                         CERROR ("Can't allocate page %d of %d\n", i, npages);
1464                         kibnal_free_pages(p);
1465                         return (-ENOMEM);
1466                 }
1467         }
1468
1469         *pp = p;
1470         return (0);
1471 }
1472
1473 int
1474 kibnal_alloc_tx_descs (void) 
1475 {
1476         int    i;
1477         
1478         LIBCFS_ALLOC (kibnal_data.kib_tx_descs,
1479                       IBNAL_TX_MSGS() * sizeof(kib_tx_t));
1480         if (kibnal_data.kib_tx_descs == NULL)
1481                 return -ENOMEM;
1482         
1483         memset(kibnal_data.kib_tx_descs, 0,
1484                IBNAL_TX_MSGS() * sizeof(kib_tx_t));
1485
1486         for (i = 0; i < IBNAL_TX_MSGS(); i++) {
1487                 kib_tx_t *tx = &kibnal_data.kib_tx_descs[i];
1488
1489 #if IBNAL_USE_FMR
1490                 LIBCFS_ALLOC(tx->tx_pages, LNET_MAX_IOV *
1491                              sizeof(*tx->tx_pages));
1492                 if (tx->tx_pages == NULL)
1493                         return -ENOMEM;
1494 #else
1495                 LIBCFS_ALLOC(tx->tx_wrq, 
1496                              (1 + IBNAL_MAX_RDMA_FRAGS) * 
1497                              sizeof(*tx->tx_wrq));
1498                 if (tx->tx_wrq == NULL)
1499                         return -ENOMEM;
1500                 
1501                 LIBCFS_ALLOC(tx->tx_gl, 
1502                              (1 + IBNAL_MAX_RDMA_FRAGS) * 
1503                              sizeof(*tx->tx_gl));
1504                 if (tx->tx_gl == NULL)
1505                         return -ENOMEM;
1506                 
1507                 LIBCFS_ALLOC(tx->tx_rd, 
1508                              offsetof(kib_rdma_desc_t, 
1509                                       rd_frags[IBNAL_MAX_RDMA_FRAGS]));
1510                 if (tx->tx_rd == NULL)
1511                         return -ENOMEM;
1512 #endif
1513         }
1514
1515         return 0;
1516 }
1517
1518 void
1519 kibnal_free_tx_descs (void) 
1520 {
1521         int    i;
1522
1523         if (kibnal_data.kib_tx_descs == NULL)
1524                 return;
1525
1526         for (i = 0; i < IBNAL_TX_MSGS(); i++) {
1527                 kib_tx_t *tx = &kibnal_data.kib_tx_descs[i];
1528
1529 #if IBNAL_USE_FMR
1530                 if (tx->tx_pages != NULL)
1531                         LIBCFS_FREE(tx->tx_pages, LNET_MAX_IOV *
1532                                     sizeof(*tx->tx_pages));
1533 #else
1534                 if (tx->tx_wrq != NULL)
1535                         LIBCFS_FREE(tx->tx_wrq, 
1536                                     (1 + IBNAL_MAX_RDMA_FRAGS) * 
1537                                     sizeof(*tx->tx_wrq));
1538
1539                 if (tx->tx_gl != NULL)
1540                         LIBCFS_FREE(tx->tx_gl, 
1541                                     (1 + IBNAL_MAX_RDMA_FRAGS) * 
1542                                     sizeof(*tx->tx_gl));
1543
1544                 if (tx->tx_rd != NULL)
1545                         LIBCFS_FREE(tx->tx_rd, 
1546                                     offsetof(kib_rdma_desc_t, 
1547                                              rd_frags[IBNAL_MAX_RDMA_FRAGS]));
1548 #endif
1549         }
1550
1551         LIBCFS_FREE(kibnal_data.kib_tx_descs,
1552                     IBNAL_TX_MSGS() * sizeof(kib_tx_t));
1553 }
1554
1555 int
1556 kibnal_setup_tx_descs (void)
1557 {
1558         int           ipage = 0;
1559         int           page_offset = 0;
1560         struct page  *page;
1561         kib_tx_t     *tx;
1562         int           i;
1563         int           rc;
1564
1565         /* pre-mapped messages are not bigger than 1 page */
1566         CLASSERT (IBNAL_MSG_SIZE <= PAGE_SIZE);
1567
1568         /* No fancy arithmetic when we do the buffer calculations */
1569         CLASSERT (PAGE_SIZE % IBNAL_MSG_SIZE == 0);
1570
1571         rc = kibnal_alloc_pages(&kibnal_data.kib_tx_pages,
1572                                 IBNAL_TX_MSG_PAGES());
1573         if (rc != 0)
1574                 return (rc);
1575
1576         for (i = 0; i < IBNAL_TX_MSGS(); i++) {
1577                 page = kibnal_data.kib_tx_pages->ibp_pages[ipage];
1578                 tx = &kibnal_data.kib_tx_descs[i];
1579
1580 #if IBNAL_USE_FMR
1581                 /* Allocate an FMR for this TX so it can map src/sink buffers
1582                  * for large transfers */
1583 #endif
1584                 tx->tx_msg = (kib_msg_t *)(((char *)page_address(page)) + 
1585                                             page_offset);
1586
1587                 tx->tx_hca_msg = kibnal_data.kib_whole_mem.md_addr +
1588                                  lnet_page2phys(page) + page_offset;
1589
1590                 CDEBUG(D_NET, "Tx[%d] %p->%p - "LPX64"\n", 
1591                        i, tx, tx->tx_msg, tx->tx_hca_msg);
1592
1593                 list_add (&tx->tx_list, &kibnal_data.kib_idle_txs);
1594
1595                 page_offset += IBNAL_MSG_SIZE;
1596                 LASSERT (page_offset <= PAGE_SIZE);
1597
1598                 if (page_offset == PAGE_SIZE) {
1599                         page_offset = 0;
1600                         ipage++;
1601                         LASSERT (ipage <= IBNAL_TX_MSG_PAGES());
1602                 }
1603         }
1604         
1605         return (0);
1606 }
1607
1608 int
1609 kibnal_register_all_memory(void)
1610 {
1611         /* CAVEAT EMPTOR: this assumes all physical memory is in 1 contiguous
1612          * chunk starting at 0 */
1613         struct sysinfo     si;
1614         __u64              total;
1615         __u64              total2;
1616         __u64              roundup = (128<<20);     /* round up in big chunks */
1617         IB_MR_PHYS_BUFFER  phys;
1618         IB_ACCESS_CONTROL  access;
1619         FSTATUS            frc;
1620
1621         memset(&access, 0, sizeof(access));
1622         access.s.MWBindable = 1;
1623         access.s.LocalWrite = 1;
1624         access.s.RdmaRead = 1;
1625         access.s.RdmaWrite = 1;
1626
1627         /* XXX we don't bother with first-gen cards */
1628         if (kibnal_data.kib_hca_attrs.VendorId == 0xd0b7 && 
1629             kibnal_data.kib_hca_attrs.DeviceId == 0x3101) {
1630                 CERROR("Can't register all memory on first generation HCAs\n");
1631                 return -EINVAL;
1632         }
1633
1634         si_meminfo(&si);
1635
1636         CDEBUG(D_NET, "si_meminfo: %lu/%u, num_physpages %lu/%lu\n",
1637                si.totalram, si.mem_unit, num_physpages, PAGE_SIZE);
1638
1639         total = ((__u64)si.totalram) * si.mem_unit;
1640         total2 = num_physpages * PAGE_SIZE;
1641         if (total < total2)
1642                 total = total2;
1643
1644         if (total == 0) {
1645                 CERROR("Can't determine memory size\n");
1646                 return -ENOMEM;
1647         }
1648                  
1649         roundup = (128<<20);
1650         total = (total + (roundup - 1)) & ~(roundup - 1);
1651
1652         phys.PhysAddr = 0;
1653         phys.Length = total;
1654
1655         frc = iba_register_contig_pmr(kibnal_data.kib_hca, 0, &phys, 1, 0,
1656                                       kibnal_data.kib_pd, access,
1657                                       &kibnal_data.kib_whole_mem.md_handle,
1658                                       &kibnal_data.kib_whole_mem.md_addr,
1659                                       &kibnal_data.kib_whole_mem.md_lkey,
1660                                       &kibnal_data.kib_whole_mem.md_rkey);
1661
1662         if (frc != FSUCCESS) {
1663                 CERROR("registering physical memory failed: %d\n", frc);
1664                 return -EIO;
1665         }
1666
1667         CDEBUG(D_WARNING, "registered phys mem from 0("LPX64") for "LPU64"("LPU64") -> "LPX64"\n",
1668                phys.PhysAddr, total, phys.Length, kibnal_data.kib_whole_mem.md_addr);
1669
1670         return 0;
1671 }
1672
1673 void
1674 kibnal_shutdown (lnet_ni_t *ni)
1675 {
1676         int   i;
1677         int   rc;
1678
1679         LASSERT (ni == kibnal_data.kib_ni);
1680         LASSERT (ni->ni_data == &kibnal_data);
1681        
1682         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
1683                atomic_read (&libcfs_kmemory));
1684
1685         switch (kibnal_data.kib_init) {
1686         default:
1687                 CERROR ("Unexpected state %d\n", kibnal_data.kib_init);
1688                 LBUG();
1689
1690         case IBNAL_INIT_ALL:
1691                 /* stop accepting connections, prevent new peers and start to
1692                  * tear down all existing ones... */
1693                 kibnal_stop_listener(1);
1694
1695                 /* Wait for all peer state to clean up */
1696                 i = 2;
1697                 while (atomic_read (&kibnal_data.kib_npeers) != 0) {
1698                         i++;
1699                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1700                                "waiting for %d peers to disconnect\n",
1701                                atomic_read (&kibnal_data.kib_npeers));
1702                         set_current_state (TASK_UNINTERRUPTIBLE);
1703                         schedule_timeout (HZ);
1704                 }
1705                 /* fall through */
1706
1707         case IBNAL_INIT_CQ:
1708                 rc = iba_destroy_cq(kibnal_data.kib_cq);
1709                 if (rc != 0)
1710                         CERROR ("Destroy CQ error: %d\n", rc);
1711                 /* fall through */
1712
1713         case IBNAL_INIT_TXD:
1714                 kibnal_free_pages (kibnal_data.kib_tx_pages);
1715                 /* fall through */
1716
1717         case IBNAL_INIT_MD:
1718                 rc = iba_deregister_mr(kibnal_data.kib_whole_mem.md_handle);
1719                 if (rc != FSUCCESS)
1720                         CERROR ("Deregister memory: %d\n", rc);
1721                 /* fall through */
1722
1723         case IBNAL_INIT_PD:
1724                 rc = iba_free_pd(kibnal_data.kib_pd);
1725                 if (rc != 0)
1726                         CERROR ("Destroy PD error: %d\n", rc);
1727                 /* fall through */
1728
1729         case IBNAL_INIT_SD:
1730                 rc = iba_sd_deregister(kibnal_data.kib_sd);
1731                 if (rc != 0)
1732                         CERROR ("Deregister SD error: %d\n", rc);
1733                 /* fall through */
1734
1735         case IBNAL_INIT_PORTATTRS:
1736                 LIBCFS_FREE(kibnal_data.kib_hca_attrs.PortAttributesList,
1737                             kibnal_data.kib_hca_attrs.PortAttributesListSize);
1738                 /* fall through */
1739
1740         case IBNAL_INIT_HCA:
1741                 rc = iba_close_ca(kibnal_data.kib_hca);
1742                 if (rc != 0)
1743                         CERROR ("Close HCA  error: %d\n", rc);
1744                 /* fall through */
1745
1746         case IBNAL_INIT_DATA:
1747                 LASSERT (atomic_read (&kibnal_data.kib_npeers) == 0);
1748                 LASSERT (kibnal_data.kib_peers != NULL);
1749                 for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
1750                         LASSERT (list_empty (&kibnal_data.kib_peers[i]));
1751                 }
1752                 LASSERT (atomic_read (&kibnal_data.kib_nconns) == 0);
1753                 LASSERT (list_empty (&kibnal_data.kib_connd_zombies));
1754                 LASSERT (list_empty (&kibnal_data.kib_connd_conns));
1755                 LASSERT (list_empty (&kibnal_data.kib_connd_peers));
1756
1757                 /* flag threads to terminate; wake and wait for them to die */
1758                 kibnal_data.kib_shutdown = 1;
1759                 wake_up_all (&kibnal_data.kib_sched_waitq);
1760                 wake_up_all (&kibnal_data.kib_connd_waitq);
1761
1762                 i = 2;
1763                 while (atomic_read (&kibnal_data.kib_nthreads) != 0) {
1764                         i++;
1765                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1766                                "Waiting for %d threads to terminate\n",
1767                                atomic_read (&kibnal_data.kib_nthreads));
1768                         set_current_state (TASK_INTERRUPTIBLE);
1769                         schedule_timeout (HZ);
1770                 }
1771                 /* fall through */
1772                 
1773         case IBNAL_INIT_NOTHING:
1774                 break;
1775         }
1776
1777         kibnal_free_tx_descs();
1778
1779         if (kibnal_data.kib_peers != NULL)
1780                 LIBCFS_FREE (kibnal_data.kib_peers,
1781                              sizeof (struct list_head) * 
1782                              kibnal_data.kib_peer_hash_size);
1783
1784         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
1785                atomic_read (&libcfs_kmemory));
1786
1787         kibnal_data.kib_init = IBNAL_INIT_NOTHING;
1788         PORTAL_MODULE_UNUSE;
1789 }
1790
1791 int 
1792 kibnal_get_ipif_name(char *ifname, int ifname_size, int idx)
1793 {
1794         char  *basename = *kibnal_tunables.kib_ipif_basename;
1795         int    n = strlen(basename);
1796         int    baseidx;
1797         int    m;
1798
1799         if (n == 0) {                           /* empty string */
1800                 CERROR("Empty IP interface basename specified\n");
1801                 return -EINVAL;
1802         }
1803
1804         for (m = n; m > 0; m--)                 /* find max numeric postfix */
1805                 if (sscanf(basename + m - 1, "%d", &baseidx) != 1)
1806                         break;
1807
1808         if (m == 0)                             /* just a number */
1809                 m = n;
1810
1811         if (m == n)                             /* no postfix */
1812                 baseidx = 1;                    /* default to 1 */
1813
1814         if (m >= ifname_size)
1815                 m = ifname_size - 1;
1816
1817         memcpy(ifname, basename, m);            /* copy prefix name */
1818         
1819         snprintf(ifname + m, ifname_size - m, "%d", baseidx + idx);
1820         
1821         if (strlen(ifname) == ifname_size - 1) {
1822                 CERROR("IP interface basename %s too long\n", basename);
1823                 return -EINVAL;
1824         }
1825         
1826         return 0;
1827 }
1828
1829 int
1830 kibnal_startup (lnet_ni_t *ni)
1831 {
1832         char                ipif_name[32];
1833         __u32               ip;
1834         __u32               netmask;
1835         int                 up;
1836         int                 nob;
1837         struct timeval      tv;
1838         IB_PORT_ATTRIBUTES *pattr;
1839         FSTATUS             frc;
1840         int                 rc;
1841         __u32               n;
1842         int                 i;
1843
1844         LASSERT (ni->ni_lnd == &the_kiblnd);
1845
1846         /* Only 1 instance supported */
1847         if (kibnal_data.kib_init != IBNAL_INIT_NOTHING) {
1848                 CERROR ("Only 1 instance supported\n");
1849                 return -EPERM;
1850         }
1851
1852         if (*kibnal_tunables.kib_credits > *kibnal_tunables.kib_ntx) {
1853                 CERROR ("Can't set credits(%d) > ntx(%d)\n",
1854                         *kibnal_tunables.kib_credits,
1855                         *kibnal_tunables.kib_ntx);
1856                 return -EINVAL;
1857         }
1858
1859         ni->ni_maxtxcredits = *kibnal_tunables.kib_credits;
1860         ni->ni_peertxcredits = *kibnal_tunables.kib_peercredits;
1861
1862         CLASSERT (LNET_MAX_INTERFACES > 1);
1863
1864         if (ni->ni_interfaces[0] == NULL) {
1865                 kibnal_data.kib_hca_idx = 0;
1866         } else {
1867                 /* Use the HCA specified in 'networks=' */
1868                 if (ni->ni_interfaces[1] != NULL) {
1869                         CERROR("Multiple interfaces not supported\n");
1870                         return -EPERM;
1871                 }
1872                 
1873                 /* Parse <number> into kib_hca_idx */
1874                 nob = strlen(ni->ni_interfaces[0]);
1875                 if (sscanf(ni->ni_interfaces[0], "%d%n", 
1876                            &kibnal_data.kib_hca_idx, &nob) < 1 ||
1877                     nob != strlen(ni->ni_interfaces[0])) {
1878                         CERROR("Can't parse interface '%s'\n",
1879                                ni->ni_interfaces[0]);
1880                         return -EINVAL;
1881                 }
1882         }
1883
1884         rc = kibnal_get_ipif_name(ipif_name, sizeof(ipif_name),
1885                                   kibnal_data.kib_hca_idx);
1886         if (rc != 0)
1887                 return rc;
1888         
1889         rc = libcfs_ipif_query(ipif_name, &up, &ip, &netmask);
1890         if (rc != 0) {
1891                 CERROR("Can't query IPoIB interface %s: %d\n", ipif_name, rc);
1892                 return -ENETDOWN;
1893         }
1894         
1895         if (!up) {
1896                 CERROR("Can't query IPoIB interface %s: it's down\n", ipif_name);
1897                 return -ENETDOWN;
1898         }
1899         
1900         ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ip);
1901
1902         ni->ni_data = &kibnal_data;
1903         kibnal_data.kib_ni = ni;
1904
1905         do_gettimeofday(&tv);
1906         kibnal_data.kib_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
1907
1908         PORTAL_MODULE_USE;
1909
1910         rwlock_init(&kibnal_data.kib_global_lock);
1911
1912         kibnal_data.kib_peer_hash_size = IBNAL_PEER_HASH_SIZE;
1913         LIBCFS_ALLOC (kibnal_data.kib_peers,
1914                       sizeof (struct list_head) * kibnal_data.kib_peer_hash_size);
1915         if (kibnal_data.kib_peers == NULL) {
1916                 goto failed;
1917         }
1918         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++)
1919                 INIT_LIST_HEAD(&kibnal_data.kib_peers[i]);
1920
1921         spin_lock_init (&kibnal_data.kib_connd_lock);
1922         INIT_LIST_HEAD (&kibnal_data.kib_connd_peers);
1923         INIT_LIST_HEAD (&kibnal_data.kib_connd_conns);
1924         INIT_LIST_HEAD (&kibnal_data.kib_connd_zombies);
1925         init_waitqueue_head (&kibnal_data.kib_connd_waitq);
1926
1927         spin_lock_init (&kibnal_data.kib_sched_lock);
1928         init_waitqueue_head (&kibnal_data.kib_sched_waitq);
1929
1930         spin_lock_init (&kibnal_data.kib_tx_lock);
1931         INIT_LIST_HEAD (&kibnal_data.kib_idle_txs);
1932
1933         rc = kibnal_alloc_tx_descs();
1934         if (rc != 0) {
1935                 CERROR("Can't allocate tx descs\n");
1936                 goto failed;
1937         }
1938
1939         /* lists/ptrs/locks initialised */
1940         kibnal_data.kib_init = IBNAL_INIT_DATA;
1941         /*****************************************************/
1942
1943         kibnal_data.kib_sdretry.RetryCount = *kibnal_tunables.kib_sd_retries;
1944         kibnal_data.kib_sdretry.Timeout = (*kibnal_tunables.kib_timeout * 1000)/
1945                                           *kibnal_tunables.kib_sd_retries;
1946
1947         for (i = 0; i < IBNAL_N_SCHED; i++) {
1948                 rc = kibnal_thread_start (kibnal_scheduler,
1949                                           (void *)(unsigned long)i);
1950                 if (rc != 0) {
1951                         CERROR("Can't spawn iib scheduler[%d]: %d\n",
1952                                i, rc);
1953                         goto failed;
1954                 }
1955         }
1956
1957         rc = kibnal_thread_start (kibnal_connd, NULL);
1958         if (rc != 0) {
1959                 CERROR ("Can't spawn iib connd: %d\n", rc);
1960                 goto failed;
1961         }
1962
1963         n = sizeof(kibnal_data.kib_hca_guids) /
1964             sizeof(kibnal_data.kib_hca_guids[0]);
1965         frc = iba_get_caguids(&n, kibnal_data.kib_hca_guids);
1966         if (frc != FSUCCESS) {
1967                 CERROR ("Can't get HCA guids: %d\n", frc);
1968                 goto failed;
1969         }
1970
1971         if (n == 0) {
1972                 CERROR ("No HCAs found\n");
1973                 goto failed;
1974         }
1975
1976         if (n <= kibnal_data.kib_hca_idx) {
1977                 CERROR("Invalid HCA %d requested: (must be 0 - %d inclusive)\n",
1978                        kibnal_data.kib_hca_idx, n - 1);
1979                 goto failed;
1980         }
1981         
1982         /* Infinicon has per-HCA notification callbacks */
1983         frc = iba_open_ca(kibnal_data.kib_hca_guids[kibnal_data.kib_hca_idx],
1984                             kibnal_hca_callback,
1985                             kibnal_hca_async_callback,
1986                             NULL,
1987                             &kibnal_data.kib_hca);
1988         if (frc != FSUCCESS) {
1989                 CERROR ("Can't open HCA[%d]: %d\n", 
1990                         kibnal_data.kib_hca_idx, frc);
1991                 goto failed;
1992         }
1993         
1994         /* Channel Adapter opened */
1995         kibnal_data.kib_init = IBNAL_INIT_HCA;
1996         /*****************************************************/
1997
1998         kibnal_data.kib_hca_attrs.PortAttributesList = NULL;
1999         kibnal_data.kib_hca_attrs.PortAttributesListSize = 0;
2000         frc = iba_query_ca(kibnal_data.kib_hca,
2001                            &kibnal_data.kib_hca_attrs, NULL);
2002         if (frc != FSUCCESS) {
2003                 CERROR ("Can't size port attrs: %d\n", frc);
2004                 goto failed;
2005         }
2006         
2007         LIBCFS_ALLOC(kibnal_data.kib_hca_attrs.PortAttributesList,
2008                      kibnal_data.kib_hca_attrs.PortAttributesListSize);
2009         if (kibnal_data.kib_hca_attrs.PortAttributesList == NULL)
2010                 goto failed;
2011
2012         /* Port attrs allocated */
2013         kibnal_data.kib_init = IBNAL_INIT_PORTATTRS;
2014         /*****************************************************/
2015         
2016         frc = iba_query_ca(kibnal_data.kib_hca, &kibnal_data.kib_hca_attrs,
2017                            NULL);
2018         if (frc != FSUCCESS) {
2019                 CERROR ("Can't get port attrs for HCA %d: %d\n",
2020                         kibnal_data.kib_hca_idx, frc);
2021                 goto failed;
2022         }
2023
2024         for (i = 0, pattr = kibnal_data.kib_hca_attrs.PortAttributesList;
2025              pattr != NULL;
2026              i++, pattr = pattr->Next) {
2027                 switch (pattr->PortState) {
2028                 default:
2029                         CERROR("Unexpected port[%d] state %d\n",
2030                                i, pattr->PortState);
2031                         continue;
2032                 case PortStateDown:
2033                         CDEBUG(D_NET, "port[%d] Down\n", i);
2034                         continue;
2035                 case PortStateInit:
2036                         CDEBUG(D_NET, "port[%d] Init\n", i);
2037                         continue;
2038                 case PortStateArmed:
2039                         CDEBUG(D_NET, "port[%d] Armed\n", i);
2040                         continue;
2041                         
2042                 case PortStateActive:
2043                         CDEBUG(D_NET, "port[%d] Active\n", i);
2044                         kibnal_data.kib_port = i;
2045                         kibnal_data.kib_port_guid = pattr->GUID;
2046                         kibnal_data.kib_port_pkey = pattr->PkeyTable[0];
2047                         break;
2048                 }
2049                 break;
2050         }
2051
2052         if (pattr == NULL) {
2053                 CERROR ("Can't find an active port\n");
2054                 goto failed;
2055         }
2056
2057         CDEBUG(D_NET, "got guid "LPX64"\n", kibnal_data.kib_port_guid);
2058         
2059         frc = iba_sd_register(&kibnal_data.kib_sd, NULL);
2060         if (frc != FSUCCESS) {
2061                 CERROR ("Can't register with SD: %d\n", frc);
2062                 goto failed;
2063         }
2064         
2065         /* Registered with SD OK */
2066         kibnal_data.kib_init = IBNAL_INIT_SD;
2067         /*****************************************************/
2068
2069         frc = iba_alloc_pd(kibnal_data.kib_hca, 0, &kibnal_data.kib_pd);
2070         if (frc != FSUCCESS) {
2071                 CERROR ("Can't create PD: %d\n", rc);
2072                 goto failed;
2073         }
2074         
2075         /* flag PD initialised */
2076         kibnal_data.kib_init = IBNAL_INIT_PD;
2077         /*****************************************************/
2078
2079         rc = kibnal_register_all_memory();
2080         if (rc != 0) {
2081                 CERROR ("Can't register all memory\n");
2082                 goto failed;
2083         }
2084         
2085         /* flag whole memory MD initialised */
2086         kibnal_data.kib_init = IBNAL_INIT_MD;
2087         /*****************************************************/
2088
2089         rc = kibnal_setup_tx_descs();
2090         if (rc != 0) {
2091                 CERROR ("Can't register tx descs: %d\n", rc);
2092                 goto failed;
2093         }
2094         
2095         /* flag TX descs initialised */
2096         kibnal_data.kib_init = IBNAL_INIT_TXD;
2097         /*****************************************************/
2098         
2099         frc = iba_create_cq(kibnal_data.kib_hca, IBNAL_CQ_ENTRIES(),
2100                             &kibnal_data.kib_cq, &kibnal_data.kib_cq,
2101                             &n);
2102         if (frc != FSUCCESS) {
2103                 CERROR ("Can't create RX CQ: %d\n", frc);
2104                 goto failed;
2105         }
2106
2107         /* flag CQ initialised */
2108         kibnal_data.kib_init = IBNAL_INIT_CQ;
2109         /*****************************************************/
2110         
2111         if (n < IBNAL_CQ_ENTRIES()) {
2112                 CERROR ("CQ only has %d entries: %d needed\n", 
2113                         n, IBNAL_CQ_ENTRIES());
2114                 goto failed;
2115         }
2116
2117         rc = iba_rearm_cq(kibnal_data.kib_cq, CQEventSelNextWC);
2118         if (rc != 0) {
2119                 CERROR ("Failed to re-arm completion queue: %d\n", rc);
2120                 goto failed;
2121         }
2122         
2123         rc = kibnal_start_listener();
2124         if (rc != 0) {
2125                 CERROR("Can't start listener: %d\n", rc);
2126                 goto failed;
2127         }
2128
2129         /* flag everything initialised */
2130         kibnal_data.kib_init = IBNAL_INIT_ALL;
2131         /*****************************************************/
2132
2133         return (0);
2134
2135  failed:
2136         kibnal_shutdown (ni);    
2137         return (-ENETDOWN);
2138 }
2139
2140 void __exit
2141 kibnal_module_fini (void)
2142 {
2143         lnet_unregister_lnd(&the_kiblnd);
2144         kibnal_tunables_fini();
2145 }
2146
2147 int __init
2148 kibnal_module_init (void)
2149 {
2150         int    rc;
2151
2152         if (the_lnet.ln_ptlcompat != 0) {
2153                 LCONSOLE_ERROR_MSG(0x12c, "IIB does not support portals "
2154                                    "compatibility mode\n");
2155                 return -ENODEV;
2156         }
2157         
2158         rc = kibnal_tunables_init();
2159         if (rc != 0)
2160                 return rc;
2161
2162         lnet_register_lnd(&the_kiblnd);
2163
2164         return 0;
2165 }
2166
2167 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2168 MODULE_DESCRIPTION("Kernel Infinicon IB LND v1.00");
2169 MODULE_LICENSE("GPL");
2170
2171 module_init(kibnal_module_init);
2172 module_exit(kibnal_module_fini);