Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lnet / klnds / iiblnd / iiblnd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2004 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  */
23
24 #include "iiblnd.h"
25
26 lnd_t the_kiblnd = {
27         .lnd_type          = IIBLND,
28         .lnd_startup       = kibnal_startup,
29         .lnd_shutdown      = kibnal_shutdown,
30         .lnd_ctl           = kibnal_ctl,
31         .lnd_send          = kibnal_send,
32         .lnd_recv          = kibnal_recv,
33         .lnd_eager_recv    = kibnal_eager_recv,
34 };
35
36 kib_data_t              kibnal_data;
37
/* Rotate-left-by-one-and-add checksum over the first 'nob' bytes at 'ptr'.
 * A non-positive 'nob' checksums nothing.  The result is never 0, because
 * 0 is reserved to mean "no checksum". */
__u32
kibnal_cksum (void *ptr, int nob)
{
        char  *bytes = ptr;
        __u32  acc   = 0;
        int    i;

        for (i = 0; i < nob; i++)
                acc = ((acc << 1) | (acc >> 31)) + bytes[i];

        /* map a computed 0 onto 1 so the caller never sees "no checksum" */
        if (acc == 0)
                return 1;

        return acc;
}
50
51 void
52 kibnal_init_msg(kib_msg_t *msg, int type, int body_nob)
53 {
54         msg->ibm_type = type;
55         msg->ibm_nob  = offsetof(kib_msg_t, ibm_u) + body_nob;
56 }
57
58 void
59 kibnal_pack_msg(kib_msg_t *msg, __u32 version, int credits, 
60                 lnet_nid_t dstnid, __u64 dststamp, __u64 seq)
61 {
62         /* CAVEAT EMPTOR! all message fields not set here should have been
63          * initialised previously. */
64         msg->ibm_magic    = IBNAL_MSG_MAGIC;
65         msg->ibm_version  = version;
66         /*   ibm_type */
67         msg->ibm_credits  = credits;
68         /*   ibm_nob */
69         msg->ibm_cksum    = 0;
70         msg->ibm_srcnid   = lnet_ptlcompat_srcnid(kibnal_data.kib_ni->ni_nid,
71                                                   dstnid);
72         msg->ibm_srcstamp = kibnal_data.kib_incarnation;
73         msg->ibm_dstnid   = dstnid;
74         msg->ibm_dststamp = dststamp;
75         msg->ibm_seq      = seq;
76
77         if (*kibnal_tunables.kib_cksum) {
78                 /* NB ibm_cksum zero while computing cksum */
79                 msg->ibm_cksum = kibnal_cksum(msg, msg->ibm_nob);
80         }
81 }
82
83 void
84 kibnal_pack_connmsg(kib_msg_t *msg, __u32 version, int nob, 
85                     int type, lnet_nid_t dstnid, __u64 dststamp)
86 {
87         LASSERT (nob >= offsetof(kib_msg_t, ibm_u) + sizeof(kib_connparams_t));
88
89         memset(msg, 0, nob);
90         kibnal_init_msg(msg, type, sizeof(kib_connparams_t));
91
92         msg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;
93         msg->ibm_u.connparams.ibcp_max_msg_size = IBNAL_MSG_SIZE;
94         msg->ibm_u.connparams.ibcp_max_frags = IBNAL_MAX_RDMA_FRAGS;
95
96         kibnal_pack_msg(msg, version, 0, dstnid, dststamp, 0);
97 }
98
99 int
100 kibnal_unpack_msg(kib_msg_t *msg, __u32 expected_version, int nob)
101 {
102         const int hdr_size = offsetof(kib_msg_t, ibm_u);
103         __u32     msg_cksum;
104         __u32     msg_version;
105         int       flip;
106         int       msg_nob;
107 #if !IBNAL_USE_FMR
108         int       i;
109         int       n;
110 #endif
111         /* 6 bytes are enough to have received magic + version */
112         if (nob < 6) {
113                 CERROR("Short message: %d\n", nob);
114                 return -EPROTO;
115         }
116
117         /* Future protocol version compatibility support!
118          * If the iiblnd-specific protocol changes, or when LNET unifies
119          * protocols over all LNDs, the initial connection will negotiate a
120          * protocol version.  If I find this, I avoid any console errors.  If
121          * my is doing connection establishment, the reject will tell the peer
122          * which version I'm running. */
123
124         if (msg->ibm_magic == IBNAL_MSG_MAGIC) {
125                 flip = 0;
126         } else if (msg->ibm_magic == __swab32(IBNAL_MSG_MAGIC)) {
127                 flip = 1;
128         } else {
129                 if (msg->ibm_magic == LNET_PROTO_MAGIC ||
130                     msg->ibm_magic == __swab32(LNET_PROTO_MAGIC))
131                         return -EPROTO;
132
133                 /* Completely out to lunch */
134                 CERROR("Bad magic: %08x\n", msg->ibm_magic);
135                 return -EPROTO;
136         }
137
138         msg_version = flip ? __swab16(msg->ibm_version) : msg->ibm_version;
139         if (expected_version == 0) {
140                 if (msg_version != IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD &&
141                     msg_version != IBNAL_MSG_VERSION)
142                         return -EPROTO;
143         } else if (msg_version != expected_version) {
144                 CERROR("Bad version: %x(%x expected)\n", 
145                        msg_version, expected_version);
146                 return -EPROTO;
147         }
148
149         if (nob < hdr_size) {
150                 CERROR("Short message: %d\n", nob);
151                 return -EPROTO;
152         }
153
154         msg_nob = flip ? __swab32(msg->ibm_nob) : msg->ibm_nob;
155         if (msg_nob > nob) {
156                 CERROR("Short message: got %d, wanted %d\n", nob, msg_nob);
157                 return -EPROTO;
158         }
159
160         /* checksum must be computed with ibm_cksum zero and BEFORE anything
161          * gets flipped */
162         msg_cksum = flip ? __swab32(msg->ibm_cksum) : msg->ibm_cksum;
163         msg->ibm_cksum = 0;
164         if (msg_cksum != 0 &&
165             msg_cksum != kibnal_cksum(msg, msg_nob)) {
166                 CERROR("Bad checksum\n");
167                 return -EPROTO;
168         }
169         msg->ibm_cksum = msg_cksum;
170         
171         if (flip) {
172                 /* leave magic unflipped as a clue to peer endianness */
173                 msg->ibm_version = msg_version;
174                 CLASSERT (sizeof(msg->ibm_type) == 1);
175                 CLASSERT (sizeof(msg->ibm_credits) == 1);
176                 msg->ibm_nob = msg_nob;
177                 __swab64s(&msg->ibm_srcnid);
178                 __swab64s(&msg->ibm_srcstamp);
179                 __swab64s(&msg->ibm_dstnid);
180                 __swab64s(&msg->ibm_dststamp);
181                 __swab64s(&msg->ibm_seq);
182         }
183         
184         if (msg->ibm_srcnid == LNET_NID_ANY) {
185                 CERROR("Bad src nid: %s\n", libcfs_nid2str(msg->ibm_srcnid));
186                 return -EPROTO;
187         }
188
189         switch (msg->ibm_type) {
190         default:
191                 CERROR("Unknown message type %x\n", msg->ibm_type);
192                 return -EPROTO;
193                 
194         case IBNAL_MSG_NOOP:
195                 break;
196
197         case IBNAL_MSG_IMMEDIATE:
198                 if (msg_nob < offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[0])) {
199                         CERROR("Short IMMEDIATE: %d(%d)\n", msg_nob,
200                                (int)offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[0]));
201                         return -EPROTO;
202                 }
203                 break;
204
205         case IBNAL_MSG_PUT_REQ:
206                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.putreq)) {
207                         CERROR("Short PUT_REQ: %d(%d)\n", msg_nob,
208                                (int)(hdr_size + sizeof(msg->ibm_u.putreq)));
209                         return -EPROTO;
210                 }
211                 break;
212
213         case IBNAL_MSG_PUT_ACK:
214                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.putack)) {
215                         CERROR("Short PUT_ACK: %d(%d)\n", msg_nob,
216                                (int)(hdr_size + sizeof(msg->ibm_u.putack)));
217                         return -EPROTO;
218                 }
219 #if IBNAL_USE_FMR
220                 if (flip) {
221                         __swab64s(&msg->ibm_u.putack.ibpam_rd.rd_addr);
222                         __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_nob);
223                         __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_key);
224                 }
225 #else
226                 if (flip) {
227                         __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_key);
228                         __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_nfrag);
229                 }
230                 
231                 n = msg->ibm_u.putack.ibpam_rd.rd_nfrag;
232                 if (n <= 0 || n > IBNAL_MAX_RDMA_FRAGS) {
233                         CERROR("Bad PUT_ACK nfrags: %d, should be 0 < n <= %d\n", 
234                                n, IBNAL_MAX_RDMA_FRAGS);
235                         return -EPROTO;
236                 }
237                 
238                 if (msg_nob < offsetof(kib_msg_t, ibm_u.putack.ibpam_rd.rd_frags[n])) {
239                         CERROR("Short PUT_ACK: %d(%d)\n", msg_nob,
240                                (int)offsetof(kib_msg_t, ibm_u.putack.ibpam_rd.rd_frags[n]));
241                         return -EPROTO;
242                 }
243
244                 if (flip) {
245                         for (i = 0; i < n; i++) {
246                                 __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_frags[i].rf_nob);
247                                 __swab64s(&msg->ibm_u.putack.ibpam_rd.rd_frags[i].rf_addr);
248                         }
249                 }
250 #endif
251                 break;
252
253         case IBNAL_MSG_GET_REQ:
254                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.get)) {
255                         CERROR("Short GET_REQ: %d(%d)\n", msg_nob,
256                                (int)(hdr_size + sizeof(msg->ibm_u.get)));
257                         return -EPROTO;
258                 }
259 #if IBNAL_USE_FMR
260                 if (flip) {
261                         __swab64s(&msg->ibm_u.get.ibgm_rd.rd_addr);
262                         __swab32s(&msg->ibm_u.get.ibgm_rd.rd_nob);
263                         __swab32s(&msg->ibm_u.get.ibgm_rd.rd_key);
264                 }
265 #else                
266                 if (flip) {
267                         __swab32s(&msg->ibm_u.get.ibgm_rd.rd_key);
268                         __swab32s(&msg->ibm_u.get.ibgm_rd.rd_nfrag);
269                 }
270
271                 n = msg->ibm_u.get.ibgm_rd.rd_nfrag;
272                 if (n <= 0 || n > IBNAL_MAX_RDMA_FRAGS) {
273                         CERROR("Bad GET_REQ nfrags: %d, should be 0 < n <= %d\n", 
274                                n, IBNAL_MAX_RDMA_FRAGS);
275                         return -EPROTO;
276                 }
277                 
278                 if (msg_nob < offsetof(kib_msg_t, ibm_u.get.ibgm_rd.rd_frags[n])) {
279                         CERROR("Short GET_REQ: %d(%d)\n", msg_nob,
280                                (int)offsetof(kib_msg_t, ibm_u.get.ibgm_rd.rd_frags[n]));
281                         return -EPROTO;
282                 }
283                 
284                 if (flip)
285                         for (i = 0; i < msg->ibm_u.get.ibgm_rd.rd_nfrag; i++) {
286                                 __swab32s(&msg->ibm_u.get.ibgm_rd.rd_frags[i].rf_nob);
287                                 __swab64s(&msg->ibm_u.get.ibgm_rd.rd_frags[i].rf_addr);
288                         }
289 #endif
290                 break;
291
292         case IBNAL_MSG_PUT_NAK:
293         case IBNAL_MSG_PUT_DONE:
294         case IBNAL_MSG_GET_DONE:
295                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.completion)) {
296                         CERROR("Short RDMA completion: %d(%d)\n", msg_nob,
297                                (int)(hdr_size + sizeof(msg->ibm_u.completion)));
298                         return -EPROTO;
299                 }
300                 if (flip)
301                         __swab32s(&msg->ibm_u.completion.ibcm_status);
302                 break;
303
304         case IBNAL_MSG_CONNREQ:
305         case IBNAL_MSG_CONNACK:
306                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.connparams)) {
307                         CERROR("Short connreq/ack: %d(%d)\n", msg_nob,
308                                (int)(hdr_size + sizeof(msg->ibm_u.connparams)));
309                         return -EPROTO;
310                 }
311                 if (flip) {
312                         __swab32s(&msg->ibm_u.connparams.ibcp_queue_depth);
313                         __swab32s(&msg->ibm_u.connparams.ibcp_max_msg_size);
314                         __swab32s(&msg->ibm_u.connparams.ibcp_max_frags);
315                 }
316                 break;
317         }
318         return 0;
319 }
320
321 IB_HANDLE
322 kibnal_create_cep(lnet_nid_t nid)
323 {
324         FSTATUS        frc;
325         __u32          u32val;
326         IB_HANDLE      cep;
327
328         cep = iba_cm_create_cep(CM_RC_TYPE);
329         if (cep == NULL) {
330                 CERROR ("Can't create CEP for %s\n",
331                         (nid == LNET_NID_ANY) ? "listener" :
332                         libcfs_nid2str(nid));
333                 return NULL;
334         }
335
336         if (nid == LNET_NID_ANY) {
337                 u32val = 1;
338                 frc = iba_cm_modify_cep(cep, CM_FLAG_ASYNC_ACCEPT,
339                                         (char *)&u32val, sizeof(u32val), 0);
340                 if (frc != FSUCCESS) {
341                         CERROR("Can't set async_accept: %d\n", frc);
342                         goto failed;
343                 }
344
345                 u32val = 0;                     /* sets system max */
346                 frc = iba_cm_modify_cep(cep, CM_FLAG_LISTEN_BACKLOG,
347                                         (char *)&u32val, sizeof(u32val), 0);
348                 if (frc != FSUCCESS) {
349                         CERROR("Can't set listen backlog: %d\n", frc);
350                         goto failed;
351                 }
352         }
353         
354         u32val = 1;
355         frc = iba_cm_modify_cep(cep, CM_FLAG_TIMEWAIT_CALLBACK,
356                                 (char *)&u32val, sizeof(u32val), 0);
357         if (frc != FSUCCESS) {
358                 CERROR("Can't set timewait_callback for %s: %d\n", 
359                         (nid == LNET_NID_ANY) ? "listener" :
360                         libcfs_nid2str(nid), frc);
361                 goto failed;
362         }
363
364         return cep;
365         
366  failed:
367         iba_cm_destroy_cep(cep);
368         return NULL;
369 }
370
371 #define IBNAL_CHECK_ADVERT 1
372 #if IBNAL_CHECK_ADVERT
373 void
374 kibnal_service_query_done (void *arg, QUERY *qry, 
375                            QUERY_RESULT_VALUES *qry_result)
376 {
377         int                    *rcp = arg;
378         FSTATUS                 frc = qry_result->Status;
379         SERVICE_RECORD_RESULTS *svc_rslt;
380         IB_SERVICE_RECORD      *svc;
381         lnet_nid_t              nid;
382
383         if (frc != FSUCCESS || qry_result->ResultDataSize == 0) {
384                 CERROR("Error checking advert: status %d data size %d\n",
385                        frc, qry_result->ResultDataSize);
386                 *rcp = -EIO;
387                 goto out;
388         }
389
390         svc_rslt = (SERVICE_RECORD_RESULTS *)qry_result->QueryResult;
391
392         if (svc_rslt->NumServiceRecords < 1) {
393                 CERROR("Check advert: %d records\n",
394                        svc_rslt->NumServiceRecords);
395                 *rcp = -ENOENT;
396                 goto out;
397         }
398
399         svc = &svc_rslt->ServiceRecords[0];
400         nid = le64_to_cpu(*kibnal_service_nid_field(svc));
401         
402         CDEBUG(D_NET, "Check advert: %s "LPX64" "LPX64":%04x\n",
403                libcfs_nid2str(nid), svc->RID.ServiceID, 
404                svc->RID.ServiceGID.Type.Global.InterfaceID, 
405                svc->RID.ServiceP_Key);
406
407         if (nid != kibnal_data.kib_ni->ni_nid) {
408                 CERROR("Check advert: Bad NID %s (%s expected)\n",
409                        libcfs_nid2str(nid),
410                        libcfs_nid2str(kibnal_data.kib_ni->ni_nid));
411                 *rcp = -EINVAL;
412                 goto out;
413         }
414
415         if (svc->RID.ServiceID != *kibnal_tunables.kib_service_number) {
416                 CERROR("Check advert: Bad ServiceID "LPX64" (%x expected)\n",
417                        svc->RID.ServiceID,
418                        *kibnal_tunables.kib_service_number);
419                 *rcp = -EINVAL;
420                 goto out;
421         }
422
423         if (svc->RID.ServiceGID.Type.Global.InterfaceID != 
424             kibnal_data.kib_port_guid) {
425                 CERROR("Check advert: Bad GUID "LPX64" ("LPX64" expected)\n",
426                        svc->RID.ServiceGID.Type.Global.InterfaceID,
427                        kibnal_data.kib_port_guid);
428                 *rcp = -EINVAL;
429                 goto out;
430         }
431
432         if (svc->RID.ServiceP_Key != kibnal_data.kib_port_pkey) {
433                 CERROR("Check advert: Bad PKEY %04x (%04x expected)\n",
434                        svc->RID.ServiceP_Key, kibnal_data.kib_port_pkey);
435                 *rcp = -EINVAL;
436                 goto out;
437         }
438
439         CDEBUG(D_NET, "Check advert OK\n");
440         *rcp = 0;
441                 
442  out:
443         up (&kibnal_data.kib_listener_signal);                
444 }
445
446 int
447 kibnal_check_advert (void)
448 {
449         /* single-threaded */
450         static QUERY               qry;
451
452         FSTATUS                    frc;
453         int                        rc;
454
455         memset (&qry, 0, sizeof(qry));
456         qry.InputType = InputTypeServiceRecord;
457         qry.OutputType = OutputTypeServiceRecord;
458         kibnal_set_service_keys(&qry.InputValue.ServiceRecordValue.ServiceRecord,
459                                 kibnal_data.kib_ni->ni_nid);
460         qry.InputValue.ServiceRecordValue.ComponentMask = KIBNAL_SERVICE_KEY_MASK;
461
462         frc = iba_sd_query_port_fabric_info(kibnal_data.kib_sd, 
463                                             kibnal_data.kib_port_guid,
464                                             &qry, 
465                                             kibnal_service_query_done,
466                                             &kibnal_data.kib_sdretry, 
467                                             &rc);
468         if (frc != FPENDING) {
469                 CERROR ("Immediate error %d checking SM service\n", frc);
470                 return -EIO;
471         }
472         
473         down (&kibnal_data.kib_listener_signal);
474         
475         if (rc != 0)
476                 CERROR ("Error %d checking SM service\n", rc);
477         return rc;
478 }
479 #else
/* Advert checking compiled out: always report success. */
int
kibnal_check_advert(void)
{
        const int rc = 0;

        return rc;
}
485 #endif
486
487 void 
488 kibnal_fill_fod(FABRIC_OPERATION_DATA *fod, FABRIC_OPERATION_TYPE type)
489 {
490         IB_SERVICE_RECORD     *svc;
491
492         memset (fod, 0, sizeof(*fod));
493         fod->Type = type;
494
495         svc = &fod->Value.ServiceRecordValue.ServiceRecord;
496         svc->RID.ServiceID = *kibnal_tunables.kib_service_number;
497         svc->RID.ServiceGID.Type.Global.InterfaceID = kibnal_data.kib_port_guid;
498         svc->RID.ServiceGID.Type.Global.SubnetPrefix = DEFAULT_SUBNET_PREFIX;
499         svc->RID.ServiceP_Key = kibnal_data.kib_port_pkey;
500         svc->ServiceLease = 0xffffffff;
501
502         kibnal_set_service_keys(svc, kibnal_data.kib_ni->ni_nid);
503 }
504
505 void
506 kibnal_service_setunset_done (void *arg, FABRIC_OPERATION_DATA *fod,
507                               FSTATUS frc, uint32 madrc)
508 {
509         *(FSTATUS *)arg = frc;
510         up (&kibnal_data.kib_listener_signal);
511 }
512
513 int
514 kibnal_advertise (void)
515 {
516         /* Single threaded here */
517         static FABRIC_OPERATION_DATA fod;
518
519         IB_SERVICE_RECORD *svc = &fod.Value.ServiceRecordValue.ServiceRecord;
520         FSTATUS            frc;
521         FSTATUS            frc2;
522
523         if (strlen(*kibnal_tunables.kib_service_name) >=
524             sizeof(svc->ServiceName)) {
525                 CERROR("Service name '%s' too long (%d chars max)\n",
526                        *kibnal_tunables.kib_service_name,
527                        (int)sizeof(svc->ServiceName) - 1);
528                 return -EINVAL;
529         }
530
531         kibnal_fill_fod(&fod, FabOpSetServiceRecord);
532
533         CDEBUG(D_NET, "Advertising service id "LPX64" %s:%s\n", 
534                svc->RID.ServiceID, svc->ServiceName, 
535                libcfs_nid2str(le64_to_cpu(*kibnal_service_nid_field(svc))));
536
537         frc = iba_sd_port_fabric_operation(kibnal_data.kib_sd,
538                                            kibnal_data.kib_port_guid,
539                                            &fod, 
540                                            kibnal_service_setunset_done, 
541                                            &kibnal_data.kib_sdretry,
542                                            &frc2);
543
544         if (frc != FSUCCESS && frc != FPENDING) {
545                 CERROR ("Immediate error %d advertising NID %s\n",
546                         frc, libcfs_nid2str(kibnal_data.kib_ni->ni_nid));
547                 return -EIO;
548         }
549
550         down (&kibnal_data.kib_listener_signal);
551
552         frc = frc2;
553         if (frc == FSUCCESS)
554                 return 0;
555         
556         CERROR ("Error %d advertising %s\n",
557                 frc, libcfs_nid2str(kibnal_data.kib_ni->ni_nid));
558         return -EIO;
559 }
560
561 void
562 kibnal_unadvertise (int expect_success)
563 {
564         /* single threaded */
565         static FABRIC_OPERATION_DATA fod;
566
567         IB_SERVICE_RECORD *svc = &fod.Value.ServiceRecordValue.ServiceRecord;
568         FSTATUS            frc;
569         FSTATUS            frc2;
570
571         LASSERT (kibnal_data.kib_ni->ni_nid != LNET_NID_ANY);
572
573         kibnal_fill_fod(&fod, FabOpDeleteServiceRecord);
574
575         CDEBUG(D_NET, "Unadvertising service %s:%s\n",
576                svc->ServiceName, 
577                libcfs_nid2str(le64_to_cpu(*kibnal_service_nid_field(svc))));
578         
579         frc = iba_sd_port_fabric_operation(kibnal_data.kib_sd,
580                                            kibnal_data.kib_port_guid,
581                                            &fod, 
582                                            kibnal_service_setunset_done, 
583                                            &kibnal_data.kib_sdretry, 
584                                            &frc2);
585         if (frc != FSUCCESS && frc != FPENDING) {
586                 CERROR ("Immediate error %d unadvertising NID %s\n",
587                         frc, libcfs_nid2str(kibnal_data.kib_ni->ni_nid));
588                 return;
589         }
590
591         down (&kibnal_data.kib_listener_signal);
592
593         CDEBUG(D_NET, "Unadvertise rc: %d\n", frc2);
594
595         if ((frc2 == FSUCCESS) == !!expect_success)
596                 return;
597
598         if (expect_success)
599                 CERROR("Error %d unadvertising NID %s\n",
600                        frc2, libcfs_nid2str(kibnal_data.kib_ni->ni_nid));
601         else
602                 CWARN("Removed conflicting NID %s\n",
603                       libcfs_nid2str(kibnal_data.kib_ni->ni_nid));
604 }
605
606 void
607 kibnal_stop_listener(int normal_shutdown)
608 {
609         /* NB this also disables peer creation and destroys all existing
610          * peers */
611         IB_HANDLE      cep = kibnal_data.kib_listener_cep;
612         unsigned long  flags;
613         FSTATUS        frc;
614
615         LASSERT (cep != NULL);
616
617         kibnal_unadvertise(normal_shutdown);
618
619         frc = iba_cm_cancel(cep);
620         if (frc != FSUCCESS && frc != FPENDING)
621                 CERROR ("Error %d stopping listener\n", frc);
622
623         down(&kibnal_data.kib_listener_signal);
624
625         frc = iba_cm_destroy_cep(cep);
626         if (frc != FSUCCESS)
627                 CERROR ("Error %d destroying listener CEP\n", frc);
628
629         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
630         /* This assignment disables peer creation */
631         kibnal_data.kib_listener_cep = NULL;
632         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
633
634         /* Start to tear down any peers created while the listener was
635          * running */
636         kibnal_del_peer(LNET_NID_ANY);
637 }
638
639 int
640 kibnal_start_listener(void)
641 {
642         /* NB this also enables peer creation */
643
644         IB_HANDLE      cep;
645         CM_LISTEN_INFO info;
646         unsigned long  flags;
647         int            rc;
648         FSTATUS        frc;
649
650         LASSERT (kibnal_data.kib_listener_cep == NULL);
651         init_MUTEX_LOCKED (&kibnal_data.kib_listener_signal);
652
653         cep = kibnal_create_cep(LNET_NID_ANY);
654         if (cep == NULL)
655                 return -ENOMEM;
656
657         memset (&info, 0, sizeof(info));
658         info.ListenAddr.EndPt.SID = *kibnal_tunables.kib_service_number;
659
660         frc = iba_cm_listen(cep, &info, kibnal_listen_callback, NULL);
661         if (frc != FSUCCESS && frc != FPENDING) {
662                 CERROR ("iba_cm_listen error: %d\n", frc);
663
664                 iba_cm_destroy_cep(cep);
665                 return -EIO;
666         }
667
668         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
669         /* This assignment enables peer creation */
670         kibnal_data.kib_listener_cep = cep;
671         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
672
673         rc = kibnal_advertise();
674         if (rc == 0)
675                 rc = kibnal_check_advert();
676
677         if (rc == 0)
678                 return 0;
679
680         kibnal_stop_listener(0);
681         return rc;
682 }
683
684 int
685 kibnal_create_peer (kib_peer_t **peerp, lnet_nid_t nid)
686 {
687         kib_peer_t    *peer;
688         unsigned long  flags;
689         int            rc;
690
691         LASSERT (nid != LNET_NID_ANY);
692
693         LIBCFS_ALLOC (peer, sizeof (*peer));
694         if (peer == NULL) {
695                 CERROR("Cannot allocate peer\n");
696                 return -ENOMEM;
697         }
698
699         memset(peer, 0, sizeof(*peer));         /* zero flags etc */
700
701         peer->ibp_nid = nid;
702         atomic_set (&peer->ibp_refcount, 1);    /* 1 ref for caller */
703
704         INIT_LIST_HEAD (&peer->ibp_list);       /* not in the peer table yet */
705         INIT_LIST_HEAD (&peer->ibp_conns);
706         INIT_LIST_HEAD (&peer->ibp_tx_queue);
707
708         peer->ibp_error = 0;
709         peer->ibp_last_alive = cfs_time_current();
710         peer->ibp_reconnect_interval = 0;       /* OK to connect at any time */
711
712         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
713         
714         if (atomic_read(&kibnal_data.kib_npeers) >=
715             *kibnal_tunables.kib_concurrent_peers) {
716                 rc = -EOVERFLOW;        /* !! but at least it distinguishes */
717         } else if (kibnal_data.kib_listener_cep == NULL) {
718                 rc = -ESHUTDOWN;        /* shutdown has started */
719         } else {
720                 rc = 0;
721                 /* npeers only grows with the global lock held */
722                 atomic_inc(&kibnal_data.kib_npeers);
723         }
724         
725         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
726
727         if (rc != 0) {
728                 CERROR("Can't create peer: %s\n", 
729                        (rc == -ESHUTDOWN) ? "shutting down" : 
730                        "too many peers");
731                 LIBCFS_FREE(peer, sizeof(*peer));
732         } else {
733                 *peerp = peer;
734         }
735         
736         return rc;
737 }
738
739 void
740 kibnal_destroy_peer (kib_peer_t *peer)
741 {
742
743         LASSERT (atomic_read (&peer->ibp_refcount) == 0);
744         LASSERT (peer->ibp_persistence == 0);
745         LASSERT (!kibnal_peer_active(peer));
746         LASSERT (!kibnal_peer_connecting(peer));
747         LASSERT (list_empty (&peer->ibp_conns));
748         LASSERT (list_empty (&peer->ibp_tx_queue));
749
750         LIBCFS_FREE (peer, sizeof (*peer));
751
752         /* NB a peer's connections keep a reference on their peer until
753          * they are destroyed, so we can be assured that _all_ state to do
754          * with this peer has been cleaned up when its refcount drops to
755          * zero. */
756         atomic_dec (&kibnal_data.kib_npeers);
757 }
758
759 /* the caller is responsible for accounting for the additional reference
760  * that this creates */
761 kib_peer_t *
762 kibnal_find_peer_locked (lnet_nid_t nid)
763 {
764         struct list_head *peer_list = kibnal_nid2peerlist (nid);
765         struct list_head *tmp;
766         kib_peer_t       *peer;
767
768         list_for_each (tmp, peer_list) {
769
770                 peer = list_entry (tmp, kib_peer_t, ibp_list);
771
772                 LASSERT (peer->ibp_persistence != 0 ||
773                          kibnal_peer_connecting(peer) ||
774                          !list_empty (&peer->ibp_conns));
775
776                 if (peer->ibp_nid != nid)
777                         continue;
778
779                 CDEBUG(D_NET, "got peer %s (%d)\n",
780                        libcfs_nid2str(nid), atomic_read (&peer->ibp_refcount));
781                 return (peer);
782         }
783         return (NULL);
784 }
785
786 void
787 kibnal_unlink_peer_locked (kib_peer_t *peer)
788 {
789         LASSERT (peer->ibp_persistence == 0);
790         LASSERT (list_empty(&peer->ibp_conns));
791
792         LASSERT (kibnal_peer_active(peer));
793         list_del_init (&peer->ibp_list);
794         /* lose peerlist's ref */
795         kibnal_peer_decref(peer);
796 }
797
798 int
799 kibnal_get_peer_info (int index, lnet_nid_t *nidp, int *persistencep)
800 {
801         kib_peer_t        *peer;
802         struct list_head  *ptmp;
803         unsigned long      flags;
804         int                i;
805
806         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
807
808         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
809
810                 list_for_each (ptmp, &kibnal_data.kib_peers[i]) {
811
812                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
813                         LASSERT (peer->ibp_persistence != 0 ||
814                                  kibnal_peer_connecting(peer) ||
815                                  !list_empty (&peer->ibp_conns));
816
817                         if (index-- > 0)
818                                 continue;
819
820                         *nidp = peer->ibp_nid;
821                         *persistencep = peer->ibp_persistence;
822
823                         read_unlock_irqrestore(&kibnal_data.kib_global_lock,
824                                                flags);
825                         return (0);
826                 }
827         }
828
829         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
830         return (-ENOENT);
831 }
832
833 int
834 kibnal_add_persistent_peer (lnet_nid_t nid)
835 {
836         unsigned long      flags;
837         kib_peer_t        *peer;
838         kib_peer_t        *peer2;
839         int                rc;
840         
841         if (nid == LNET_NID_ANY)
842                 return (-EINVAL);
843
844         rc = kibnal_create_peer(&peer, nid);
845         if (rc != 0)
846                 return rc;
847
848         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
849
850         peer2 = kibnal_find_peer_locked (nid);
851         if (peer2 != NULL) {
852                 kibnal_peer_decref (peer);
853                 peer = peer2;
854         } else {
855                 /* peer table takes existing ref on peer */
856                 list_add_tail (&peer->ibp_list,
857                                kibnal_nid2peerlist (nid));
858         }
859
860         peer->ibp_persistence++;
861         
862         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
863         return (0);
864 }
865
866 void
867 kibnal_del_peer_locked (kib_peer_t *peer)
868 {
869         struct list_head *ctmp;
870         struct list_head *cnxt;
871         kib_conn_t       *conn;
872
873         peer->ibp_persistence = 0;
874
875         if (list_empty(&peer->ibp_conns)) {
876                 kibnal_unlink_peer_locked(peer);
877         } else {
878                 list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
879                         conn = list_entry(ctmp, kib_conn_t, ibc_list);
880
881                         kibnal_close_conn_locked (conn, 0);
882                 }
883                 /* NB peer is no longer persistent; closing its last conn
884                  * unlinked it. */
885         }
886         /* NB peer now unlinked; might even be freed if the peer table had the
887          * last ref on it. */
888 }
889
890 int
891 kibnal_del_peer (lnet_nid_t nid)
892 {
893         unsigned long      flags;
894         CFS_LIST_HEAD     (zombies);
895         struct list_head  *ptmp;
896         struct list_head  *pnxt;
897         kib_peer_t        *peer;
898         int                lo;
899         int                hi;
900         int                i;
901         int                rc = -ENOENT;
902
903         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
904
905         if (nid != LNET_NID_ANY)
906                 lo = hi = kibnal_nid2peerlist(nid) - kibnal_data.kib_peers;
907         else {
908                 lo = 0;
909                 hi = kibnal_data.kib_peer_hash_size - 1;
910         }
911
912         for (i = lo; i <= hi; i++) {
913                 list_for_each_safe (ptmp, pnxt, &kibnal_data.kib_peers[i]) {
914                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
915                         LASSERT (peer->ibp_persistence != 0 ||
916                                  kibnal_peer_connecting(peer) ||
917                                  !list_empty (&peer->ibp_conns));
918
919                         if (!(nid == LNET_NID_ANY || peer->ibp_nid == nid))
920                                 continue;
921
922                         if (!list_empty(&peer->ibp_tx_queue)) {
923                                 LASSERT (list_empty(&peer->ibp_conns));
924
925                                 list_splice_init(&peer->ibp_tx_queue, &zombies);
926                         }
927
928                         kibnal_del_peer_locked (peer);
929                         rc = 0;         /* matched something */
930                 }
931         }
932
933         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
934
935         kibnal_txlist_done(&zombies, -EIO);
936
937         return (rc);
938 }
939
940 kib_conn_t *
941 kibnal_get_conn_by_idx (int index)
942 {
943         kib_peer_t        *peer;
944         struct list_head  *ptmp;
945         kib_conn_t        *conn;
946         struct list_head  *ctmp;
947         unsigned long      flags;
948         int                i;
949
950         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
951
952         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
953                 list_for_each (ptmp, &kibnal_data.kib_peers[i]) {
954
955                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
956                         LASSERT (peer->ibp_persistence != 0 ||
957                                  kibnal_peer_connecting(peer) ||
958                                  !list_empty (&peer->ibp_conns));
959
960                         list_for_each (ctmp, &peer->ibp_conns) {
961                                 if (index-- > 0)
962                                         continue;
963
964                                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
965                                 kibnal_conn_addref(conn);
966                                 read_unlock_irqrestore(&kibnal_data.kib_global_lock,
967                                                        flags);
968                                 return (conn);
969                         }
970                 }
971         }
972
973         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
974         return (NULL);
975 }
976
977 int
978 kibnal_conn_rts(kib_conn_t *conn, 
979                 __u32 qpn, __u8 resp_res, __u8 init_depth, __u32 psn)
980 {
981         IB_PATH_RECORD         *path = &conn->ibc_cvars->cv_path;
982         IB_HANDLE               qp = conn->ibc_qp;
983         IB_QP_ATTRIBUTES_MODIFY modify_attr;
984         FSTATUS                 frc;
985         int                     rc;
986
987         if (resp_res > kibnal_data.kib_hca_attrs.MaxQPResponderResources)
988                 resp_res = kibnal_data.kib_hca_attrs.MaxQPResponderResources;
989
990         if (init_depth > kibnal_data.kib_hca_attrs.MaxQPInitiatorDepth)
991                 init_depth = kibnal_data.kib_hca_attrs.MaxQPInitiatorDepth;
992
993         modify_attr = (IB_QP_ATTRIBUTES_MODIFY) {
994                 .RequestState       = QPStateReadyToRecv,
995                 .RecvPSN            = IBNAL_STARTING_PSN,
996                 .DestQPNumber       = qpn,
997                 .ResponderResources = resp_res,
998                 .MinRnrTimer        = UsecToRnrNakTimer(2000), /* 20 ms */
999                 .Attrs              = (IB_QP_ATTR_RECVPSN |
1000                                        IB_QP_ATTR_DESTQPNUMBER | 
1001                                        IB_QP_ATTR_RESPONDERRESOURCES | 
1002                                        IB_QP_ATTR_DESTAV | 
1003                                        IB_QP_ATTR_PATHMTU | 
1004                                        IB_QP_ATTR_MINRNRTIMER),
1005         };
1006         GetAVFromPath(0, path, &modify_attr.PathMTU, NULL, 
1007                       &modify_attr.DestAV);
1008
1009         frc = iba_modify_qp(qp, &modify_attr, NULL);
1010         if (frc != FSUCCESS) {
1011                 CERROR("Can't set QP %s ready to receive: %d\n",
1012                        libcfs_nid2str(conn->ibc_peer->ibp_nid), frc);
1013                 return -EIO;
1014         }
1015
1016         rc = kibnal_post_receives(conn);
1017         if (rc != 0) {
1018                 CERROR("Can't post receives for %s: %d\n",
1019                        libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
1020                 return rc;
1021         }
1022
1023         modify_attr = (IB_QP_ATTRIBUTES_MODIFY) {
1024                 .RequestState           = QPStateReadyToSend,
1025                 .FlowControl            = TRUE,
1026                 .InitiatorDepth         = init_depth,
1027                 .SendPSN                = psn,
1028                 .LocalAckTimeout        = path->PktLifeTime + 2, /* 2 or 1? */
1029                 .RetryCount             = IBNAL_RETRY,
1030                 .RnrRetryCount          = IBNAL_RNR_RETRY,
1031                 .Attrs                  = (IB_QP_ATTR_FLOWCONTROL | 
1032                                            IB_QP_ATTR_INITIATORDEPTH | 
1033                                            IB_QP_ATTR_SENDPSN | 
1034                                            IB_QP_ATTR_LOCALACKTIMEOUT | 
1035                                            IB_QP_ATTR_RETRYCOUNT | 
1036                                            IB_QP_ATTR_RNRRETRYCOUNT),
1037         };
1038
1039         frc = iba_modify_qp(qp, &modify_attr, NULL);
1040         if (frc != FSUCCESS) {
1041                 CERROR("Can't set QP %s ready to send: %d\n",
1042                        libcfs_nid2str(conn->ibc_peer->ibp_nid), frc);
1043                 return -EIO;
1044         }
1045
1046         frc = iba_query_qp(conn->ibc_qp, &conn->ibc_cvars->cv_qpattrs, NULL);
1047         if (frc != FSUCCESS) {
1048                 CERROR ("Can't query QP %s attributes: %d\n",
1049                         libcfs_nid2str(conn->ibc_peer->ibp_nid), frc);
1050                 return -EIO;
1051         }
1052         
1053         return 0;
1054 }
1055
1056 kib_conn_t *
1057 kibnal_create_conn (lnet_nid_t nid, int proto_version)
1058 {
1059         kib_conn_t  *conn;
1060         int          i;
1061         int          page_offset;
1062         int          ipage;
1063         int          rc;
1064         FSTATUS      frc;
1065         union {
1066                 IB_QP_ATTRIBUTES_CREATE    qp_create;
1067                 IB_QP_ATTRIBUTES_MODIFY    qp_attr;
1068         } params;
1069         
1070         LIBCFS_ALLOC (conn, sizeof (*conn));
1071         if (conn == NULL) {
1072                 CERROR ("Can't allocate connection for %s\n",
1073                         libcfs_nid2str(nid));
1074                 return (NULL);
1075         }
1076
1077         /* zero flags, NULL pointers etc... */
1078         memset (conn, 0, sizeof (*conn));
1079         conn->ibc_state = IBNAL_CONN_INIT_NOTHING;
1080         conn->ibc_version = proto_version;
1081
1082         INIT_LIST_HEAD (&conn->ibc_early_rxs);
1083         INIT_LIST_HEAD (&conn->ibc_tx_queue_nocred);
1084         INIT_LIST_HEAD (&conn->ibc_tx_queue);
1085         INIT_LIST_HEAD (&conn->ibc_tx_queue_rsrvd);
1086         INIT_LIST_HEAD (&conn->ibc_active_txs);
1087         spin_lock_init (&conn->ibc_lock);
1088         
1089         atomic_inc (&kibnal_data.kib_nconns);
1090         /* well not really, but I call destroy() on failure, which decrements */
1091
1092         LIBCFS_ALLOC(conn->ibc_cvars, sizeof (*conn->ibc_cvars));
1093         if (conn->ibc_cvars == NULL) {
1094                 CERROR ("Can't allocate connvars for %s\n", 
1095                         libcfs_nid2str(nid));
1096                 goto failed;
1097         }
1098         memset(conn->ibc_cvars, 0, sizeof (*conn->ibc_cvars));
1099
1100         LIBCFS_ALLOC(conn->ibc_rxs, IBNAL_RX_MSGS * sizeof (kib_rx_t));
1101         if (conn->ibc_rxs == NULL) {
1102                 CERROR("Cannot allocate RX descriptors for %s\n",
1103                        libcfs_nid2str(nid));
1104                 goto failed;
1105         }
1106         memset (conn->ibc_rxs, 0, IBNAL_RX_MSGS * sizeof(kib_rx_t));
1107
1108         rc = kibnal_alloc_pages(&conn->ibc_rx_pages, IBNAL_RX_MSG_PAGES);
1109         if (rc != 0) {
1110                 CERROR("Can't allocate RX buffers for %s\n",
1111                        libcfs_nid2str(nid));
1112                 goto failed;
1113         }
1114         
1115         for (i = ipage = page_offset = 0; i < IBNAL_RX_MSGS; i++) {
1116                 struct page *page = conn->ibc_rx_pages->ibp_pages[ipage];
1117                 kib_rx_t    *rx = &conn->ibc_rxs[i];
1118
1119                 rx->rx_conn = conn;
1120                 rx->rx_msg = (kib_msg_t *)(((char *)page_address(page)) + 
1121                              page_offset);
1122
1123                 rx->rx_hca_msg = kibnal_data.kib_whole_mem.md_addr +
1124                                  lnet_page2phys(page) + page_offset;
1125                 
1126                 page_offset += IBNAL_MSG_SIZE;
1127                 LASSERT (page_offset <= PAGE_SIZE);
1128
1129                 if (page_offset == PAGE_SIZE) {
1130                         page_offset = 0;
1131                         ipage++;
1132                         LASSERT (ipage <= IBNAL_RX_MSG_PAGES);
1133                 }
1134         }
1135
1136         params.qp_create = (IB_QP_ATTRIBUTES_CREATE) {
1137                 .Type                    = QPTypeReliableConnected,
1138                 .SendQDepth              = (1 + IBNAL_MAX_RDMA_FRAGS) *
1139                                            (*kibnal_tunables.kib_concurrent_sends),
1140                 .RecvQDepth              = IBNAL_RX_MSGS,
1141                 .SendDSListDepth         = 1,
1142                 .RecvDSListDepth         = 1,
1143                 .SendCQHandle            = kibnal_data.kib_cq,
1144                 .RecvCQHandle            = kibnal_data.kib_cq,
1145                 .PDHandle                = kibnal_data.kib_pd,
1146                 .SendSignaledCompletions = TRUE,
1147         };
1148         frc = iba_create_qp(kibnal_data.kib_hca, &params.qp_create, NULL,
1149                             &conn->ibc_qp, &conn->ibc_cvars->cv_qpattrs);
1150         if (frc != 0) {
1151                 CERROR ("Can't create QP %s: %d\n", libcfs_nid2str(nid), frc);
1152                 goto failed;
1153         }
1154
1155         /* Mark QP created */
1156         kibnal_set_conn_state(conn, IBNAL_CONN_INIT_QP);
1157
1158         params.qp_attr = (IB_QP_ATTRIBUTES_MODIFY) {
1159                 .RequestState             = QPStateInit,
1160                 .Attrs                    = (IB_QP_ATTR_PORTGUID |
1161                                              IB_QP_ATTR_PKEYINDEX |
1162                                              IB_QP_ATTR_ACCESSCONTROL),
1163                 .PortGUID                 = kibnal_data.kib_port_guid,
1164                 .PkeyIndex                = 0,
1165                 .AccessControl = { 
1166                         .s = {
1167                                 .RdmaWrite = 1,
1168                                 .RdmaRead  = 1,
1169                         },
1170                 },
1171         };
1172         frc = iba_modify_qp(conn->ibc_qp, &params.qp_attr, NULL);
1173         if (frc != 0) {
1174                 CERROR ("Can't set QP %s state to INIT: %d\n",
1175                         libcfs_nid2str(nid), frc);
1176                 goto failed;
1177         }
1178
1179         frc = iba_query_qp(conn->ibc_qp, &conn->ibc_cvars->cv_qpattrs, NULL);
1180         if (frc != FSUCCESS) {
1181                 CERROR ("Can't query QP %s attributes: %d\n",
1182                         libcfs_nid2str(nid), frc);
1183                 goto failed;
1184         }
1185
1186         /* 1 ref for caller */
1187         atomic_set (&conn->ibc_refcount, 1);
1188         CDEBUG(D_NET, "New conn %p\n", conn);
1189         return (conn);
1190         
1191  failed:
1192         kibnal_destroy_conn (conn);
1193         return (NULL);
1194 }
1195
1196 void
1197 kibnal_destroy_conn (kib_conn_t *conn)
1198 {
1199         FSTATUS frc;
1200
1201         LASSERT (!in_interrupt());
1202         
1203         CDEBUG (D_NET, "connection %s\n", 
1204                 (conn->ibc_peer) == NULL ? "<ANON>" :
1205                 libcfs_nid2str(conn->ibc_peer->ibp_nid));
1206
1207         LASSERT (atomic_read (&conn->ibc_refcount) == 0);
1208         LASSERT (list_empty(&conn->ibc_early_rxs));
1209         LASSERT (list_empty(&conn->ibc_tx_queue));
1210         LASSERT (list_empty(&conn->ibc_tx_queue_rsrvd));
1211         LASSERT (list_empty(&conn->ibc_tx_queue_nocred));
1212         LASSERT (list_empty(&conn->ibc_active_txs));
1213         LASSERT (conn->ibc_nsends_posted == 0);
1214
1215         switch (conn->ibc_state) {
1216         case IBNAL_CONN_INIT_NOTHING:
1217         case IBNAL_CONN_INIT_QP:
1218         case IBNAL_CONN_DISCONNECTED:
1219                 break;
1220
1221         default:
1222                 /* conn must either have never engaged with the CM, or have
1223                  * completely disengaged from it */
1224                 CERROR("Bad conn %s state %d\n",
1225                        (conn->ibc_peer) == NULL ? "<anon>" :
1226                        libcfs_nid2str(conn->ibc_peer->ibp_nid), conn->ibc_state);
1227                 LBUG();
1228         }
1229
1230         if (conn->ibc_cep != NULL) {
1231                 frc = iba_cm_destroy_cep(conn->ibc_cep);
1232                 if (frc != FSUCCESS)
1233                         CERROR("Error destroying CEP %p: %d\n",
1234                                conn->ibc_cep, frc);
1235         }
1236
1237         if (conn->ibc_qp != NULL) {
1238                 frc = iba_destroy_qp(conn->ibc_qp);
1239                 if (frc != FSUCCESS)
1240                         CERROR("Error destroying QP %p: %d\n",
1241                                conn->ibc_qp, frc);
1242         }
1243
1244         if (conn->ibc_rx_pages != NULL) 
1245                 kibnal_free_pages(conn->ibc_rx_pages);
1246         
1247         if (conn->ibc_rxs != NULL)
1248                 LIBCFS_FREE(conn->ibc_rxs, 
1249                             IBNAL_RX_MSGS * sizeof(kib_rx_t));
1250
1251         if (conn->ibc_cvars != NULL)
1252                 LIBCFS_FREE(conn->ibc_cvars, sizeof(*conn->ibc_cvars));
1253
1254         if (conn->ibc_peer != NULL)
1255                 kibnal_peer_decref(conn->ibc_peer);
1256
1257         LIBCFS_FREE(conn, sizeof (*conn));
1258
1259         atomic_dec(&kibnal_data.kib_nconns);
1260 }
1261
1262 int
1263 kibnal_close_peer_conns_locked (kib_peer_t *peer, int why)
1264 {
1265         kib_conn_t         *conn;
1266         struct list_head   *ctmp;
1267         struct list_head   *cnxt;
1268         int                 count = 0;
1269
1270         list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
1271                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
1272
1273                 count++;
1274                 kibnal_close_conn_locked (conn, why);
1275         }
1276
1277         return (count);
1278 }
1279
1280 int
1281 kibnal_close_stale_conns_locked (kib_peer_t *peer, __u64 incarnation)
1282 {
1283         kib_conn_t         *conn;
1284         struct list_head   *ctmp;
1285         struct list_head   *cnxt;
1286         int                 count = 0;
1287
1288         list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
1289                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
1290
1291                 if (conn->ibc_incarnation == incarnation)
1292                         continue;
1293
1294                 CDEBUG(D_NET, "Closing stale conn nid:%s incarnation:"LPX64"("LPX64")\n",
1295                        libcfs_nid2str(peer->ibp_nid), 
1296                        conn->ibc_incarnation, incarnation);
1297                 
1298                 count++;
1299                 kibnal_close_conn_locked (conn, -ESTALE);
1300         }
1301
1302         return (count);
1303 }
1304
1305 int
1306 kibnal_close_matching_conns (lnet_nid_t nid)
1307 {
1308         unsigned long       flags;
1309         kib_peer_t         *peer;
1310         struct list_head   *ptmp;
1311         struct list_head   *pnxt;
1312         int                 lo;
1313         int                 hi;
1314         int                 i;
1315         int                 count = 0;
1316
1317         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1318
1319         if (nid != LNET_NID_ANY)
1320                 lo = hi = kibnal_nid2peerlist(nid) - kibnal_data.kib_peers;
1321         else {
1322                 lo = 0;
1323                 hi = kibnal_data.kib_peer_hash_size - 1;
1324         }
1325
1326         for (i = lo; i <= hi; i++) {
1327                 list_for_each_safe (ptmp, pnxt, &kibnal_data.kib_peers[i]) {
1328
1329                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
1330                         LASSERT (peer->ibp_persistence != 0 ||
1331                                  kibnal_peer_connecting(peer) ||
1332                                  !list_empty (&peer->ibp_conns));
1333
1334                         if (!(nid == LNET_NID_ANY || nid == peer->ibp_nid))
1335                                 continue;
1336
1337                         count += kibnal_close_peer_conns_locked (peer, 0);
1338                 }
1339         }
1340
1341         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1342
1343         /* wildcards always succeed */
1344         if (nid == LNET_NID_ANY)
1345                 return (0);
1346         
1347         return (count == 0 ? -ENOENT : 0);
1348 }
1349
1350 int
1351 kibnal_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1352 {
1353         struct libcfs_ioctl_data *data = arg;
1354         int                       rc = -EINVAL;
1355         ENTRY;
1356
1357         LASSERT (ni == kibnal_data.kib_ni);
1358
1359         switch(cmd) {
1360         case IOC_LIBCFS_GET_PEER: {
1361                 lnet_nid_t   nid = 0;
1362                 int          share_count = 0;
1363
1364                 rc = kibnal_get_peer_info(data->ioc_count,
1365                                           &nid, &share_count);
1366                 data->ioc_nid   = nid;
1367                 data->ioc_count = share_count;
1368                 break;
1369         }
1370         case IOC_LIBCFS_ADD_PEER: {
1371                 rc = kibnal_add_persistent_peer (data->ioc_nid);
1372                 break;
1373         }
1374         case IOC_LIBCFS_DEL_PEER: {
1375                 rc = kibnal_del_peer (data->ioc_nid);
1376                 break;
1377         }
1378         case IOC_LIBCFS_GET_CONN: {
1379                 kib_conn_t *conn = kibnal_get_conn_by_idx (data->ioc_count);
1380
1381                 if (conn == NULL)
1382                         rc = -ENOENT;
1383                 else {
1384                         rc = 0;
1385                         data->ioc_nid = conn->ibc_peer->ibp_nid;
1386                         kibnal_conn_decref(conn);
1387                 }
1388                 break;
1389         }
1390         case IOC_LIBCFS_CLOSE_CONNECTION: {
1391                 rc = kibnal_close_matching_conns (data->ioc_nid);
1392                 break;
1393         }
1394         case IOC_LIBCFS_REGISTER_MYNID: {
1395                 if (ni->ni_nid == data->ioc_nid) {
1396                         rc = 0;
1397                 } else {
1398                         CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID: %s(%s)\n",
1399                                libcfs_nid2str(data->ioc_nid),
1400                                libcfs_nid2str(ni->ni_nid));
1401                         rc = -EINVAL;
1402                 }
1403                 break;
1404         }
1405         }
1406
1407         RETURN(rc);
1408 }
1409
1410 void
1411 kibnal_free_pages (kib_pages_t *p)
1412 {
1413         int     npages = p->ibp_npages;
1414         int     i;
1415         
1416         for (i = 0; i < npages; i++)
1417                 if (p->ibp_pages[i] != NULL)
1418                         __free_page(p->ibp_pages[i]);
1419         
1420         LIBCFS_FREE (p, offsetof(kib_pages_t, ibp_pages[npages]));
1421 }
1422
1423 int
1424 kibnal_alloc_pages (kib_pages_t **pp, int npages)
1425 {
1426         kib_pages_t   *p;
1427         int            i;
1428
1429         LIBCFS_ALLOC(p, offsetof(kib_pages_t, ibp_pages[npages]));
1430         if (p == NULL) {
1431                 CERROR ("Can't allocate buffer %d\n", npages);
1432                 return (-ENOMEM);
1433         }
1434
1435         memset (p, 0, offsetof(kib_pages_t, ibp_pages[npages]));
1436         p->ibp_npages = npages;
1437         
1438         for (i = 0; i < npages; i++) {
1439                 p->ibp_pages[i] = alloc_page (GFP_KERNEL);
1440                 if (p->ibp_pages[i] == NULL) {
1441                         CERROR ("Can't allocate page %d of %d\n", i, npages);
1442                         kibnal_free_pages(p);
1443                         return (-ENOMEM);
1444                 }
1445         }
1446
1447         *pp = p;
1448         return (0);
1449 }
1450
1451 int
1452 kibnal_alloc_tx_descs (void) 
1453 {
1454         int    i;
1455         
1456         LIBCFS_ALLOC (kibnal_data.kib_tx_descs,
1457                       IBNAL_TX_MSGS() * sizeof(kib_tx_t));
1458         if (kibnal_data.kib_tx_descs == NULL)
1459                 return -ENOMEM;
1460         
1461         memset(kibnal_data.kib_tx_descs, 0,
1462                IBNAL_TX_MSGS() * sizeof(kib_tx_t));
1463
1464         for (i = 0; i < IBNAL_TX_MSGS(); i++) {
1465                 kib_tx_t *tx = &kibnal_data.kib_tx_descs[i];
1466
1467 #if IBNAL_USE_FMR
1468                 LIBCFS_ALLOC(tx->tx_pages, LNET_MAX_IOV *
1469                              sizeof(*tx->tx_pages));
1470                 if (tx->tx_pages == NULL)
1471                         return -ENOMEM;
1472 #else
1473                 LIBCFS_ALLOC(tx->tx_wrq, 
1474                              (1 + IBNAL_MAX_RDMA_FRAGS) * 
1475                              sizeof(*tx->tx_wrq));
1476                 if (tx->tx_wrq == NULL)
1477                         return -ENOMEM;
1478                 
1479                 LIBCFS_ALLOC(tx->tx_gl, 
1480                              (1 + IBNAL_MAX_RDMA_FRAGS) * 
1481                              sizeof(*tx->tx_gl));
1482                 if (tx->tx_gl == NULL)
1483                         return -ENOMEM;
1484                 
1485                 LIBCFS_ALLOC(tx->tx_rd, 
1486                              offsetof(kib_rdma_desc_t, 
1487                                       rd_frags[IBNAL_MAX_RDMA_FRAGS]));
1488                 if (tx->tx_rd == NULL)
1489                         return -ENOMEM;
1490 #endif
1491         }
1492
1493         return 0;
1494 }
1495
1496 void
1497 kibnal_free_tx_descs (void) 
1498 {
1499         int    i;
1500
1501         if (kibnal_data.kib_tx_descs == NULL)
1502                 return;
1503
1504         for (i = 0; i < IBNAL_TX_MSGS(); i++) {
1505                 kib_tx_t *tx = &kibnal_data.kib_tx_descs[i];
1506
1507 #if IBNAL_USE_FMR
1508                 if (tx->tx_pages != NULL)
1509                         LIBCFS_FREE(tx->tx_pages, LNET_MAX_IOV *
1510                                     sizeof(*tx->tx_pages));
1511 #else
1512                 if (tx->tx_wrq != NULL)
1513                         LIBCFS_FREE(tx->tx_wrq, 
1514                                     (1 + IBNAL_MAX_RDMA_FRAGS) * 
1515                                     sizeof(*tx->tx_wrq));
1516
1517                 if (tx->tx_gl != NULL)
1518                         LIBCFS_FREE(tx->tx_gl, 
1519                                     (1 + IBNAL_MAX_RDMA_FRAGS) * 
1520                                     sizeof(*tx->tx_gl));
1521
1522                 if (tx->tx_rd != NULL)
1523                         LIBCFS_FREE(tx->tx_rd, 
1524                                     offsetof(kib_rdma_desc_t, 
1525                                              rd_frags[IBNAL_MAX_RDMA_FRAGS]));
1526 #endif
1527         }
1528
1529         LIBCFS_FREE(kibnal_data.kib_tx_descs,
1530                     IBNAL_TX_MSGS() * sizeof(kib_tx_t));
1531 }
1532
1533 int
1534 kibnal_setup_tx_descs (void)
1535 {
1536         int           ipage = 0;
1537         int           page_offset = 0;
1538         struct page  *page;
1539         kib_tx_t     *tx;
1540         int           i;
1541         int           rc;
1542
1543         /* pre-mapped messages are not bigger than 1 page */
1544         CLASSERT (IBNAL_MSG_SIZE <= PAGE_SIZE);
1545
1546         /* No fancy arithmetic when we do the buffer calculations */
1547         CLASSERT (PAGE_SIZE % IBNAL_MSG_SIZE == 0);
1548
1549         rc = kibnal_alloc_pages(&kibnal_data.kib_tx_pages,
1550                                 IBNAL_TX_MSG_PAGES());
1551         if (rc != 0)
1552                 return (rc);
1553
1554         for (i = 0; i < IBNAL_TX_MSGS(); i++) {
1555                 page = kibnal_data.kib_tx_pages->ibp_pages[ipage];
1556                 tx = &kibnal_data.kib_tx_descs[i];
1557
1558 #if IBNAL_USE_FMR
1559                 /* Allocate an FMR for this TX so it can map src/sink buffers
1560                  * for large transfers */
1561 #endif
1562                 tx->tx_msg = (kib_msg_t *)(((char *)page_address(page)) + 
1563                                             page_offset);
1564
1565                 tx->tx_hca_msg = kibnal_data.kib_whole_mem.md_addr +
1566                                  lnet_page2phys(page) + page_offset;
1567
1568                 CDEBUG(D_NET, "Tx[%d] %p->%p - "LPX64"\n", 
1569                        i, tx, tx->tx_msg, tx->tx_hca_msg);
1570
1571                 list_add (&tx->tx_list, &kibnal_data.kib_idle_txs);
1572
1573                 page_offset += IBNAL_MSG_SIZE;
1574                 LASSERT (page_offset <= PAGE_SIZE);
1575
1576                 if (page_offset == PAGE_SIZE) {
1577                         page_offset = 0;
1578                         ipage++;
1579                         LASSERT (ipage <= IBNAL_TX_MSG_PAGES());
1580                 }
1581         }
1582         
1583         return (0);
1584 }
1585
1586 int
1587 kibnal_register_all_memory(void)
1588 {
1589         /* CAVEAT EMPTOR: this assumes all physical memory is in 1 contiguous
1590          * chunk starting at 0 */
1591         struct sysinfo     si;
1592         __u64              total;
1593         __u64              total2;
1594         __u64              roundup = (128<<20);     /* round up in big chunks */
1595         IB_MR_PHYS_BUFFER  phys;
1596         IB_ACCESS_CONTROL  access;
1597         FSTATUS            frc;
1598
1599         memset(&access, 0, sizeof(access));
1600         access.s.MWBindable = 1;
1601         access.s.LocalWrite = 1;
1602         access.s.RdmaRead = 1;
1603         access.s.RdmaWrite = 1;
1604
1605         /* XXX we don't bother with first-gen cards */
1606         if (kibnal_data.kib_hca_attrs.VendorId == 0xd0b7 && 
1607             kibnal_data.kib_hca_attrs.DeviceId == 0x3101) {
1608                 CERROR("Can't register all memory on first generation HCAs\n");
1609                 return -EINVAL;
1610         }
1611
1612         si_meminfo(&si);
1613
1614         CDEBUG(D_NET, "si_meminfo: %lu/%u, num_physpages %lu/%lu\n",
1615                si.totalram, si.mem_unit, num_physpages, PAGE_SIZE);
1616
1617         total = ((__u64)si.totalram) * si.mem_unit;
1618         total2 = num_physpages * PAGE_SIZE;
1619         if (total < total2)
1620                 total = total2;
1621
1622         if (total == 0) {
1623                 CERROR("Can't determine memory size\n");
1624                 return -ENOMEM;
1625         }
1626                  
1627         roundup = (128<<20);
1628         total = (total + (roundup - 1)) & ~(roundup - 1);
1629
1630         phys.PhysAddr = 0;
1631         phys.Length = total;
1632
1633         frc = iba_register_contig_pmr(kibnal_data.kib_hca, 0, &phys, 1, 0,
1634                                       kibnal_data.kib_pd, access,
1635                                       &kibnal_data.kib_whole_mem.md_handle,
1636                                       &kibnal_data.kib_whole_mem.md_addr,
1637                                       &kibnal_data.kib_whole_mem.md_lkey,
1638                                       &kibnal_data.kib_whole_mem.md_rkey);
1639
1640         if (frc != FSUCCESS) {
1641                 CERROR("registering physical memory failed: %d\n", frc);
1642                 return -EIO;
1643         }
1644
1645         CDEBUG(D_WARNING, "registered phys mem from 0("LPX64") for "LPU64"("LPU64") -> "LPX64"\n",
1646                phys.PhysAddr, total, phys.Length, kibnal_data.kib_whole_mem.md_addr);
1647
1648         return 0;
1649 }
1650
1651 void
1652 kibnal_shutdown (lnet_ni_t *ni)
1653 {
1654         int   i;
1655         int   rc;
1656
1657         LASSERT (ni == kibnal_data.kib_ni);
1658         LASSERT (ni->ni_data == &kibnal_data);
1659        
1660         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
1661                atomic_read (&libcfs_kmemory));
1662
1663         switch (kibnal_data.kib_init) {
1664         default:
1665                 CERROR ("Unexpected state %d\n", kibnal_data.kib_init);
1666                 LBUG();
1667
1668         case IBNAL_INIT_ALL:
1669                 /* stop accepting connections, prevent new peers and start to
1670                  * tear down all existing ones... */
1671                 kibnal_stop_listener(1);
1672
1673                 /* Wait for all peer state to clean up */
1674                 i = 2;
1675                 while (atomic_read (&kibnal_data.kib_npeers) != 0) {
1676                         i++;
1677                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1678                                "waiting for %d peers to disconnect\n",
1679                                atomic_read (&kibnal_data.kib_npeers));
1680                         set_current_state (TASK_UNINTERRUPTIBLE);
1681                         schedule_timeout (HZ);
1682                 }
1683                 /* fall through */
1684
1685         case IBNAL_INIT_CQ:
1686                 rc = iba_destroy_cq(kibnal_data.kib_cq);
1687                 if (rc != 0)
1688                         CERROR ("Destroy CQ error: %d\n", rc);
1689                 /* fall through */
1690
1691         case IBNAL_INIT_TXD:
1692                 kibnal_free_pages (kibnal_data.kib_tx_pages);
1693                 /* fall through */
1694
1695         case IBNAL_INIT_MD:
1696                 rc = iba_deregister_mr(kibnal_data.kib_whole_mem.md_handle);
1697                 if (rc != FSUCCESS)
1698                         CERROR ("Deregister memory: %d\n", rc);
1699                 /* fall through */
1700
1701         case IBNAL_INIT_PD:
1702                 rc = iba_free_pd(kibnal_data.kib_pd);
1703                 if (rc != 0)
1704                         CERROR ("Destroy PD error: %d\n", rc);
1705                 /* fall through */
1706
1707         case IBNAL_INIT_SD:
1708                 rc = iba_sd_deregister(kibnal_data.kib_sd);
1709                 if (rc != 0)
1710                         CERROR ("Deregister SD error: %d\n", rc);
1711                 /* fall through */
1712
1713         case IBNAL_INIT_PORTATTRS:
1714                 LIBCFS_FREE(kibnal_data.kib_hca_attrs.PortAttributesList,
1715                             kibnal_data.kib_hca_attrs.PortAttributesListSize);
1716                 /* fall through */
1717
1718         case IBNAL_INIT_HCA:
1719                 rc = iba_close_ca(kibnal_data.kib_hca);
1720                 if (rc != 0)
1721                         CERROR ("Close HCA  error: %d\n", rc);
1722                 /* fall through */
1723
1724         case IBNAL_INIT_DATA:
1725                 LASSERT (atomic_read (&kibnal_data.kib_npeers) == 0);
1726                 LASSERT (kibnal_data.kib_peers != NULL);
1727                 for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
1728                         LASSERT (list_empty (&kibnal_data.kib_peers[i]));
1729                 }
1730                 LASSERT (atomic_read (&kibnal_data.kib_nconns) == 0);
1731                 LASSERT (list_empty (&kibnal_data.kib_connd_zombies));
1732                 LASSERT (list_empty (&kibnal_data.kib_connd_conns));
1733                 LASSERT (list_empty (&kibnal_data.kib_connd_peers));
1734
1735                 /* flag threads to terminate; wake and wait for them to die */
1736                 kibnal_data.kib_shutdown = 1;
1737                 wake_up_all (&kibnal_data.kib_sched_waitq);
1738                 wake_up_all (&kibnal_data.kib_connd_waitq);
1739
1740                 i = 2;
1741                 while (atomic_read (&kibnal_data.kib_nthreads) != 0) {
1742                         i++;
1743                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1744                                "Waiting for %d threads to terminate\n",
1745                                atomic_read (&kibnal_data.kib_nthreads));
1746                         set_current_state (TASK_INTERRUPTIBLE);
1747                         schedule_timeout (HZ);
1748                 }
1749                 /* fall through */
1750                 
1751         case IBNAL_INIT_NOTHING:
1752                 break;
1753         }
1754
1755         kibnal_free_tx_descs();
1756
1757         if (kibnal_data.kib_peers != NULL)
1758                 LIBCFS_FREE (kibnal_data.kib_peers,
1759                              sizeof (struct list_head) * 
1760                              kibnal_data.kib_peer_hash_size);
1761
1762         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
1763                atomic_read (&libcfs_kmemory));
1764
1765         kibnal_data.kib_init = IBNAL_INIT_NOTHING;
1766         PORTAL_MODULE_UNUSE;
1767 }
1768
1769 int 
1770 kibnal_get_ipif_name(char *ifname, int ifname_size, int idx)
1771 {
1772         char  *basename = *kibnal_tunables.kib_ipif_basename;
1773         int    n = strlen(basename);
1774         int    baseidx;
1775         int    m;
1776
1777         if (n == 0) {                           /* empty string */
1778                 CERROR("Empty IP interface basename specified\n");
1779                 return -EINVAL;
1780         }
1781
1782         for (m = n; m > 0; m--)                 /* find max numeric postfix */
1783                 if (sscanf(basename + m - 1, "%d", &baseidx) != 1)
1784                         break;
1785
1786         if (m == 0)                             /* just a number */
1787                 m = n;
1788
1789         if (m == n)                             /* no postfix */
1790                 baseidx = 1;                    /* default to 1 */
1791
1792         if (m >= ifname_size)
1793                 m = ifname_size - 1;
1794
1795         memcpy(ifname, basename, m);            /* copy prefix name */
1796         
1797         snprintf(ifname + m, ifname_size - m, "%d", baseidx + idx);
1798         
1799         if (strlen(ifname) == ifname_size - 1) {
1800                 CERROR("IP interface basename %s too long\n", basename);
1801                 return -EINVAL;
1802         }
1803         
1804         return 0;
1805 }
1806
1807 int
1808 kibnal_startup (lnet_ni_t *ni)
1809 {
1810         char                ipif_name[32];
1811         __u32               ip;
1812         __u32               netmask;
1813         int                 up;
1814         int                 nob;
1815         struct timeval      tv;
1816         IB_PORT_ATTRIBUTES *pattr;
1817         FSTATUS             frc;
1818         int                 rc;
1819         __u32               n;
1820         int                 i;
1821
1822         LASSERT (ni->ni_lnd == &the_kiblnd);
1823
1824         /* Only 1 instance supported */
1825         if (kibnal_data.kib_init != IBNAL_INIT_NOTHING) {
1826                 CERROR ("Only 1 instance supported\n");
1827                 return -EPERM;
1828         }
1829
1830         if (*kibnal_tunables.kib_credits > *kibnal_tunables.kib_ntx) {
1831                 CERROR ("Can't set credits(%d) > ntx(%d)\n",
1832                         *kibnal_tunables.kib_credits,
1833                         *kibnal_tunables.kib_ntx);
1834                 return -EINVAL;
1835         }
1836
1837         ni->ni_maxtxcredits = *kibnal_tunables.kib_credits;
1838         ni->ni_peertxcredits = *kibnal_tunables.kib_peercredits;
1839
1840         CLASSERT (LNET_MAX_INTERFACES > 1);
1841
1842         if (ni->ni_interfaces[0] == NULL) {
1843                 kibnal_data.kib_hca_idx = 0;
1844         } else {
1845                 /* Use the HCA specified in 'networks=' */
1846                 if (ni->ni_interfaces[1] != NULL) {
1847                         CERROR("Multiple interfaces not supported\n");
1848                         return -EPERM;
1849                 }
1850                 
1851                 /* Parse <number> into kib_hca_idx */
1852                 nob = strlen(ni->ni_interfaces[0]);
1853                 if (sscanf(ni->ni_interfaces[0], "%d%n", 
1854                            &kibnal_data.kib_hca_idx, &nob) < 1 ||
1855                     nob != strlen(ni->ni_interfaces[0])) {
1856                         CERROR("Can't parse interface '%s'\n",
1857                                ni->ni_interfaces[0]);
1858                         return -EINVAL;
1859                 }
1860         }
1861
1862         rc = kibnal_get_ipif_name(ipif_name, sizeof(ipif_name),
1863                                   kibnal_data.kib_hca_idx);
1864         if (rc != 0)
1865                 return rc;
1866         
1867         rc = libcfs_ipif_query(ipif_name, &up, &ip, &netmask);
1868         if (rc != 0) {
1869                 CERROR("Can't query IPoIB interface %s: %d\n", ipif_name, rc);
1870                 return -ENETDOWN;
1871         }
1872         
1873         if (!up) {
1874                 CERROR("Can't query IPoIB interface %s: it's down\n", ipif_name);
1875                 return -ENETDOWN;
1876         }
1877         
1878         ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ip);
1879
1880         ni->ni_data = &kibnal_data;
1881         kibnal_data.kib_ni = ni;
1882
1883         do_gettimeofday(&tv);
1884         kibnal_data.kib_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
1885
1886         PORTAL_MODULE_USE;
1887
1888         rwlock_init(&kibnal_data.kib_global_lock);
1889
1890         kibnal_data.kib_peer_hash_size = IBNAL_PEER_HASH_SIZE;
1891         LIBCFS_ALLOC (kibnal_data.kib_peers,
1892                       sizeof (struct list_head) * kibnal_data.kib_peer_hash_size);
1893         if (kibnal_data.kib_peers == NULL) {
1894                 goto failed;
1895         }
1896         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++)
1897                 INIT_LIST_HEAD(&kibnal_data.kib_peers[i]);
1898
1899         spin_lock_init (&kibnal_data.kib_connd_lock);
1900         INIT_LIST_HEAD (&kibnal_data.kib_connd_peers);
1901         INIT_LIST_HEAD (&kibnal_data.kib_connd_conns);
1902         INIT_LIST_HEAD (&kibnal_data.kib_connd_zombies);
1903         init_waitqueue_head (&kibnal_data.kib_connd_waitq);
1904
1905         spin_lock_init (&kibnal_data.kib_sched_lock);
1906         init_waitqueue_head (&kibnal_data.kib_sched_waitq);
1907
1908         spin_lock_init (&kibnal_data.kib_tx_lock);
1909         INIT_LIST_HEAD (&kibnal_data.kib_idle_txs);
1910
1911         rc = kibnal_alloc_tx_descs();
1912         if (rc != 0) {
1913                 CERROR("Can't allocate tx descs\n");
1914                 goto failed;
1915         }
1916
1917         /* lists/ptrs/locks initialised */
1918         kibnal_data.kib_init = IBNAL_INIT_DATA;
1919         /*****************************************************/
1920
1921         kibnal_data.kib_sdretry.RetryCount = *kibnal_tunables.kib_sd_retries;
1922         kibnal_data.kib_sdretry.Timeout = (*kibnal_tunables.kib_timeout * 1000)/
1923                                           *kibnal_tunables.kib_sd_retries;
1924
1925         for (i = 0; i < IBNAL_N_SCHED; i++) {
1926                 rc = kibnal_thread_start (kibnal_scheduler,
1927                                           (void *)(unsigned long)i);
1928                 if (rc != 0) {
1929                         CERROR("Can't spawn iib scheduler[%d]: %d\n",
1930                                i, rc);
1931                         goto failed;
1932                 }
1933         }
1934
1935         rc = kibnal_thread_start (kibnal_connd, NULL);
1936         if (rc != 0) {
1937                 CERROR ("Can't spawn iib connd: %d\n", rc);
1938                 goto failed;
1939         }
1940
1941         n = sizeof(kibnal_data.kib_hca_guids) /
1942             sizeof(kibnal_data.kib_hca_guids[0]);
1943         frc = iba_get_caguids(&n, kibnal_data.kib_hca_guids);
1944         if (frc != FSUCCESS) {
1945                 CERROR ("Can't get HCA guids: %d\n", frc);
1946                 goto failed;
1947         }
1948
1949         if (n == 0) {
1950                 CERROR ("No HCAs found\n");
1951                 goto failed;
1952         }
1953
1954         if (n <= kibnal_data.kib_hca_idx) {
1955                 CERROR("Invalid HCA %d requested: (must be 0 - %d inclusive)\n",
1956                        kibnal_data.kib_hca_idx, n - 1);
1957                 goto failed;
1958         }
1959         
1960         /* Infinicon has per-HCA notification callbacks */
1961         frc = iba_open_ca(kibnal_data.kib_hca_guids[kibnal_data.kib_hca_idx],
1962                             kibnal_hca_callback,
1963                             kibnal_hca_async_callback,
1964                             NULL,
1965                             &kibnal_data.kib_hca);
1966         if (frc != FSUCCESS) {
1967                 CERROR ("Can't open HCA[%d]: %d\n", 
1968                         kibnal_data.kib_hca_idx, frc);
1969                 goto failed;
1970         }
1971         
1972         /* Channel Adapter opened */
1973         kibnal_data.kib_init = IBNAL_INIT_HCA;
1974         /*****************************************************/
1975
1976         kibnal_data.kib_hca_attrs.PortAttributesList = NULL;
1977         kibnal_data.kib_hca_attrs.PortAttributesListSize = 0;
1978         frc = iba_query_ca(kibnal_data.kib_hca,
1979                            &kibnal_data.kib_hca_attrs, NULL);
1980         if (frc != FSUCCESS) {
1981                 CERROR ("Can't size port attrs: %d\n", frc);
1982                 goto failed;
1983         }
1984         
1985         LIBCFS_ALLOC(kibnal_data.kib_hca_attrs.PortAttributesList,
1986                      kibnal_data.kib_hca_attrs.PortAttributesListSize);
1987         if (kibnal_data.kib_hca_attrs.PortAttributesList == NULL)
1988                 goto failed;
1989
1990         /* Port attrs allocated */
1991         kibnal_data.kib_init = IBNAL_INIT_PORTATTRS;
1992         /*****************************************************/
1993         
1994         frc = iba_query_ca(kibnal_data.kib_hca, &kibnal_data.kib_hca_attrs,
1995                            NULL);
1996         if (frc != FSUCCESS) {
1997                 CERROR ("Can't get port attrs for HCA %d: %d\n",
1998                         kibnal_data.kib_hca_idx, frc);
1999                 goto failed;
2000         }
2001
2002         for (i = 0, pattr = kibnal_data.kib_hca_attrs.PortAttributesList;
2003              pattr != NULL;
2004              i++, pattr = pattr->Next) {
2005                 switch (pattr->PortState) {
2006                 default:
2007                         CERROR("Unexpected port[%d] state %d\n",
2008                                i, pattr->PortState);
2009                         continue;
2010                 case PortStateDown:
2011                         CDEBUG(D_NET, "port[%d] Down\n", i);
2012                         continue;
2013                 case PortStateInit:
2014                         CDEBUG(D_NET, "port[%d] Init\n", i);
2015                         continue;
2016                 case PortStateArmed:
2017                         CDEBUG(D_NET, "port[%d] Armed\n", i);
2018                         continue;
2019                         
2020                 case PortStateActive:
2021                         CDEBUG(D_NET, "port[%d] Active\n", i);
2022                         kibnal_data.kib_port = i;
2023                         kibnal_data.kib_port_guid = pattr->GUID;
2024                         kibnal_data.kib_port_pkey = pattr->PkeyTable[0];
2025                         break;
2026                 }
2027                 break;
2028         }
2029
2030         if (pattr == NULL) {
2031                 CERROR ("Can't find an active port\n");
2032                 goto failed;
2033         }
2034
2035         CDEBUG(D_NET, "got guid "LPX64"\n", kibnal_data.kib_port_guid);
2036         
2037         frc = iba_sd_register(&kibnal_data.kib_sd, NULL);
2038         if (frc != FSUCCESS) {
2039                 CERROR ("Can't register with SD: %d\n", frc);
2040                 goto failed;
2041         }
2042         
2043         /* Registered with SD OK */
2044         kibnal_data.kib_init = IBNAL_INIT_SD;
2045         /*****************************************************/
2046
2047         frc = iba_alloc_pd(kibnal_data.kib_hca, 0, &kibnal_data.kib_pd);
2048         if (frc != FSUCCESS) {
2049                 CERROR ("Can't create PD: %d\n", rc);
2050                 goto failed;
2051         }
2052         
2053         /* flag PD initialised */
2054         kibnal_data.kib_init = IBNAL_INIT_PD;
2055         /*****************************************************/
2056
2057         rc = kibnal_register_all_memory();
2058         if (rc != 0) {
2059                 CERROR ("Can't register all memory\n");
2060                 goto failed;
2061         }
2062         
2063         /* flag whole memory MD initialised */
2064         kibnal_data.kib_init = IBNAL_INIT_MD;
2065         /*****************************************************/
2066
2067         rc = kibnal_setup_tx_descs();
2068         if (rc != 0) {
2069                 CERROR ("Can't register tx descs: %d\n", rc);
2070                 goto failed;
2071         }
2072         
2073         /* flag TX descs initialised */
2074         kibnal_data.kib_init = IBNAL_INIT_TXD;
2075         /*****************************************************/
2076         
2077         frc = iba_create_cq(kibnal_data.kib_hca, IBNAL_CQ_ENTRIES(),
2078                             &kibnal_data.kib_cq, &kibnal_data.kib_cq,
2079                             &n);
2080         if (frc != FSUCCESS) {
2081                 CERROR ("Can't create RX CQ: %d\n", frc);
2082                 goto failed;
2083         }
2084
2085         /* flag CQ initialised */
2086         kibnal_data.kib_init = IBNAL_INIT_CQ;
2087         /*****************************************************/
2088         
2089         if (n < IBNAL_CQ_ENTRIES()) {
2090                 CERROR ("CQ only has %d entries: %d needed\n", 
2091                         n, IBNAL_CQ_ENTRIES());
2092                 goto failed;
2093         }
2094
2095         rc = iba_rearm_cq(kibnal_data.kib_cq, CQEventSelNextWC);
2096         if (rc != 0) {
2097                 CERROR ("Failed to re-arm completion queue: %d\n", rc);
2098                 goto failed;
2099         }
2100         
2101         rc = kibnal_start_listener();
2102         if (rc != 0) {
2103                 CERROR("Can't start listener: %d\n", rc);
2104                 goto failed;
2105         }
2106
2107         /* flag everything initialised */
2108         kibnal_data.kib_init = IBNAL_INIT_ALL;
2109         /*****************************************************/
2110
2111         return (0);
2112
2113  failed:
2114         kibnal_shutdown (ni);    
2115         return (-ENETDOWN);
2116 }
2117
2118 void __exit
2119 kibnal_module_fini (void)
2120 {
2121         lnet_unregister_lnd(&the_kiblnd);
2122         kibnal_tunables_fini();
2123 }
2124
2125 int __init
2126 kibnal_module_init (void)
2127 {
2128         int    rc;
2129
2130         if (the_lnet.ln_ptlcompat != 0) {
2131                 LCONSOLE_ERROR("IIB does not support portals compatibility mode\n");
2132                 return -ENODEV;
2133         }
2134         
2135         rc = kibnal_tunables_init();
2136         if (rc != 0)
2137                 return rc;
2138
2139         lnet_register_lnd(&the_kiblnd);
2140
2141         return 0;
2142 }
2143
/* Module metadata */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Kernel Infinicon IB LND v1.00");
MODULE_LICENSE("GPL");

/* Register kibnal_module_init/kibnal_module_fini as the module's init and
 * exit functions */
module_init(kibnal_module_init);
module_exit(kibnal_module_fini);
2150