Whamcloud - gitweb
b=16073
[fs/lustre-release.git] / lnet / klnds / iiblnd / iiblnd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2004 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  */
23
24 #include "iiblnd.h"
25
26 lnd_t the_kiblnd = {
27         .lnd_type          = IIBLND,
28         .lnd_startup       = kibnal_startup,
29         .lnd_shutdown      = kibnal_shutdown,
30         .lnd_ctl           = kibnal_ctl,
31         .lnd_send          = kibnal_send,
32         .lnd_recv          = kibnal_recv,
33         .lnd_eager_recv    = kibnal_eager_recv,
34 };
35
36 kib_data_t              kibnal_data;
37
38 __u32 
39 kibnal_cksum (void *ptr, int nob)
40 {
41         char  *c  = ptr;
42         __u32  sum = 0;
43
44         while (nob-- > 0)
45                 sum = ((sum << 1) | (sum >> 31)) + *c++;
46         
47         /* ensure I don't return 0 (== no checksum) */
48         return (sum == 0) ? 1 : sum;
49 }
50
51 void
52 kibnal_init_msg(kib_msg_t *msg, int type, int body_nob)
53 {
54         msg->ibm_type = type;
55         msg->ibm_nob  = offsetof(kib_msg_t, ibm_u) + body_nob;
56 }
57
58 void
59 kibnal_pack_msg(kib_msg_t *msg, __u32 version, int credits, 
60                 lnet_nid_t dstnid, __u64 dststamp, __u64 seq)
61 {
62         /* CAVEAT EMPTOR! all message fields not set here should have been
63          * initialised previously. */
64         msg->ibm_magic    = IBNAL_MSG_MAGIC;
65         msg->ibm_version  = version;
66         /*   ibm_type */
67         msg->ibm_credits  = credits;
68         /*   ibm_nob */
69         msg->ibm_cksum    = 0;
70         msg->ibm_srcnid   = lnet_ptlcompat_srcnid(kibnal_data.kib_ni->ni_nid,
71                                                   dstnid);
72         msg->ibm_srcstamp = kibnal_data.kib_incarnation;
73         msg->ibm_dstnid   = dstnid;
74         msg->ibm_dststamp = dststamp;
75         msg->ibm_seq      = seq;
76
77         if (*kibnal_tunables.kib_cksum) {
78                 /* NB ibm_cksum zero while computing cksum */
79                 msg->ibm_cksum = kibnal_cksum(msg, msg->ibm_nob);
80         }
81 }
82
83 void
84 kibnal_pack_connmsg(kib_msg_t *msg, __u32 version, int nob, 
85                     int type, lnet_nid_t dstnid, __u64 dststamp)
86 {
87         LASSERT (nob >= offsetof(kib_msg_t, ibm_u) + sizeof(kib_connparams_t));
88
89         memset(msg, 0, nob);
90         kibnal_init_msg(msg, type, sizeof(kib_connparams_t));
91
92         msg->ibm_u.connparams.ibcp_queue_depth = IBNAL_MSG_QUEUE_SIZE;
93         msg->ibm_u.connparams.ibcp_max_msg_size = IBNAL_MSG_SIZE;
94         msg->ibm_u.connparams.ibcp_max_frags = IBNAL_MAX_RDMA_FRAGS;
95
96         kibnal_pack_msg(msg, version, 0, dstnid, dststamp, 0);
97 }
98
99 int
100 kibnal_unpack_msg(kib_msg_t *msg, __u32 expected_version, int nob)
101 {
102         const int hdr_size = offsetof(kib_msg_t, ibm_u);
103         __u32     msg_cksum;
104         __u32     msg_version;
105         int       flip;
106         int       msg_nob;
107 #if !IBNAL_USE_FMR
108         int       i;
109         int       n;
110 #endif
111         /* 6 bytes are enough to have received magic + version */
112         if (nob < 6) {
113                 CERROR("Short message: %d\n", nob);
114                 return -EPROTO;
115         }
116
117         /* Future protocol version compatibility support!
118          * If the iiblnd-specific protocol changes, or when LNET unifies
119          * protocols over all LNDs, the initial connection will negotiate a
120          * protocol version.  If I find this, I avoid any console errors.  If
121          * my is doing connection establishment, the reject will tell the peer
122          * which version I'm running. */
123
124         if (msg->ibm_magic == IBNAL_MSG_MAGIC) {
125                 flip = 0;
126         } else if (msg->ibm_magic == __swab32(IBNAL_MSG_MAGIC)) {
127                 flip = 1;
128         } else {
129                 if (msg->ibm_magic == LNET_PROTO_MAGIC ||
130                     msg->ibm_magic == __swab32(LNET_PROTO_MAGIC))
131                         return -EPROTO;
132
133                 /* Completely out to lunch */
134                 CERROR("Bad magic: %08x\n", msg->ibm_magic);
135                 return -EPROTO;
136         }
137
138         msg_version = flip ? __swab16(msg->ibm_version) : msg->ibm_version;
139         if (expected_version == 0) {
140                 if (msg_version != IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD &&
141                     msg_version != IBNAL_MSG_VERSION)
142                         return -EPROTO;
143         } else if (msg_version != expected_version) {
144                 CERROR("Bad version: %x(%x expected)\n", 
145                        msg_version, expected_version);
146                 return -EPROTO;
147         }
148
149         if (nob < hdr_size) {
150                 CERROR("Short message: %d\n", nob);
151                 return -EPROTO;
152         }
153
154         msg_nob = flip ? __swab32(msg->ibm_nob) : msg->ibm_nob;
155         if (msg_nob > nob) {
156                 CERROR("Short message: got %d, wanted %d\n", nob, msg_nob);
157                 return -EPROTO;
158         }
159
160         /* checksum must be computed with ibm_cksum zero and BEFORE anything
161          * gets flipped */
162         msg_cksum = flip ? __swab32(msg->ibm_cksum) : msg->ibm_cksum;
163         msg->ibm_cksum = 0;
164         if (msg_cksum != 0 &&
165             msg_cksum != kibnal_cksum(msg, msg_nob)) {
166                 CERROR("Bad checksum\n");
167                 return -EPROTO;
168         }
169         msg->ibm_cksum = msg_cksum;
170         
171         if (flip) {
172                 /* leave magic unflipped as a clue to peer endianness */
173                 msg->ibm_version = msg_version;
174                 CLASSERT (sizeof(msg->ibm_type) == 1);
175                 CLASSERT (sizeof(msg->ibm_credits) == 1);
176                 msg->ibm_nob = msg_nob;
177                 __swab64s(&msg->ibm_srcnid);
178                 __swab64s(&msg->ibm_srcstamp);
179                 __swab64s(&msg->ibm_dstnid);
180                 __swab64s(&msg->ibm_dststamp);
181                 __swab64s(&msg->ibm_seq);
182         }
183         
184         if (msg->ibm_srcnid == LNET_NID_ANY) {
185                 CERROR("Bad src nid: %s\n", libcfs_nid2str(msg->ibm_srcnid));
186                 return -EPROTO;
187         }
188
189         switch (msg->ibm_type) {
190         default:
191                 CERROR("Unknown message type %x\n", msg->ibm_type);
192                 return -EPROTO;
193                 
194         case IBNAL_MSG_NOOP:
195                 break;
196
197         case IBNAL_MSG_IMMEDIATE:
198                 if (msg_nob < offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[0])) {
199                         CERROR("Short IMMEDIATE: %d(%d)\n", msg_nob,
200                                (int)offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[0]));
201                         return -EPROTO;
202                 }
203                 break;
204
205         case IBNAL_MSG_PUT_REQ:
206                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.putreq)) {
207                         CERROR("Short PUT_REQ: %d(%d)\n", msg_nob,
208                                (int)(hdr_size + sizeof(msg->ibm_u.putreq)));
209                         return -EPROTO;
210                 }
211                 break;
212
213         case IBNAL_MSG_PUT_ACK:
214                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.putack)) {
215                         CERROR("Short PUT_ACK: %d(%d)\n", msg_nob,
216                                (int)(hdr_size + sizeof(msg->ibm_u.putack)));
217                         return -EPROTO;
218                 }
219 #if IBNAL_USE_FMR
220                 if (flip) {
221                         __swab64s(&msg->ibm_u.putack.ibpam_rd.rd_addr);
222                         __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_nob);
223                         __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_key);
224                 }
225 #else
226                 if (flip) {
227                         __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_key);
228                         __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_nfrag);
229                 }
230                 
231                 n = msg->ibm_u.putack.ibpam_rd.rd_nfrag;
232                 if (n <= 0 || n > IBNAL_MAX_RDMA_FRAGS) {
233                         CERROR("Bad PUT_ACK nfrags: %d, should be 0 < n <= %d\n", 
234                                n, IBNAL_MAX_RDMA_FRAGS);
235                         return -EPROTO;
236                 }
237                 
238                 if (msg_nob < offsetof(kib_msg_t, ibm_u.putack.ibpam_rd.rd_frags[n])) {
239                         CERROR("Short PUT_ACK: %d(%d)\n", msg_nob,
240                                (int)offsetof(kib_msg_t, ibm_u.putack.ibpam_rd.rd_frags[n]));
241                         return -EPROTO;
242                 }
243
244                 if (flip) {
245                         for (i = 0; i < n; i++) {
246                                 __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_frags[i].rf_nob);
247                                 __swab64s(&msg->ibm_u.putack.ibpam_rd.rd_frags[i].rf_addr);
248                         }
249                 }
250 #endif
251                 break;
252
253         case IBNAL_MSG_GET_REQ:
254                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.get)) {
255                         CERROR("Short GET_REQ: %d(%d)\n", msg_nob,
256                                (int)(hdr_size + sizeof(msg->ibm_u.get)));
257                         return -EPROTO;
258                 }
259 #if IBNAL_USE_FMR
260                 if (flip) {
261                         __swab64s(&msg->ibm_u.get.ibgm_rd.rd_addr);
262                         __swab32s(&msg->ibm_u.get.ibgm_rd.rd_nob);
263                         __swab32s(&msg->ibm_u.get.ibgm_rd.rd_key);
264                 }
265 #else                
266                 if (flip) {
267                         __swab32s(&msg->ibm_u.get.ibgm_rd.rd_key);
268                         __swab32s(&msg->ibm_u.get.ibgm_rd.rd_nfrag);
269                 }
270
271                 n = msg->ibm_u.get.ibgm_rd.rd_nfrag;
272                 if (n <= 0 || n > IBNAL_MAX_RDMA_FRAGS) {
273                         CERROR("Bad GET_REQ nfrags: %d, should be 0 < n <= %d\n", 
274                                n, IBNAL_MAX_RDMA_FRAGS);
275                         return -EPROTO;
276                 }
277                 
278                 if (msg_nob < offsetof(kib_msg_t, ibm_u.get.ibgm_rd.rd_frags[n])) {
279                         CERROR("Short GET_REQ: %d(%d)\n", msg_nob,
280                                (int)offsetof(kib_msg_t, ibm_u.get.ibgm_rd.rd_frags[n]));
281                         return -EPROTO;
282                 }
283                 
284                 if (flip)
285                         for (i = 0; i < msg->ibm_u.get.ibgm_rd.rd_nfrag; i++) {
286                                 __swab32s(&msg->ibm_u.get.ibgm_rd.rd_frags[i].rf_nob);
287                                 __swab64s(&msg->ibm_u.get.ibgm_rd.rd_frags[i].rf_addr);
288                         }
289 #endif
290                 break;
291
292         case IBNAL_MSG_PUT_NAK:
293         case IBNAL_MSG_PUT_DONE:
294         case IBNAL_MSG_GET_DONE:
295                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.completion)) {
296                         CERROR("Short RDMA completion: %d(%d)\n", msg_nob,
297                                (int)(hdr_size + sizeof(msg->ibm_u.completion)));
298                         return -EPROTO;
299                 }
300                 if (flip)
301                         __swab32s(&msg->ibm_u.completion.ibcm_status);
302                 break;
303
304         case IBNAL_MSG_CONNREQ:
305         case IBNAL_MSG_CONNACK:
306                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.connparams)) {
307                         CERROR("Short connreq/ack: %d(%d)\n", msg_nob,
308                                (int)(hdr_size + sizeof(msg->ibm_u.connparams)));
309                         return -EPROTO;
310                 }
311                 if (flip) {
312                         __swab32s(&msg->ibm_u.connparams.ibcp_queue_depth);
313                         __swab32s(&msg->ibm_u.connparams.ibcp_max_msg_size);
314                         __swab32s(&msg->ibm_u.connparams.ibcp_max_frags);
315                 }
316                 break;
317         }
318         return 0;
319 }
320
321 IB_HANDLE
322 kibnal_create_cep(lnet_nid_t nid)
323 {
324         FSTATUS        frc;
325         __u32          u32val;
326         IB_HANDLE      cep;
327
328         cep = iba_cm_create_cep(CM_RC_TYPE);
329         if (cep == NULL) {
330                 CERROR ("Can't create CEP for %s\n",
331                         (nid == LNET_NID_ANY) ? "listener" :
332                         libcfs_nid2str(nid));
333                 return NULL;
334         }
335
336         if (nid == LNET_NID_ANY) {
337                 u32val = 1;
338                 frc = iba_cm_modify_cep(cep, CM_FLAG_ASYNC_ACCEPT,
339                                         (char *)&u32val, sizeof(u32val), 0);
340                 if (frc != FSUCCESS) {
341                         CERROR("Can't set async_accept: %d\n", frc);
342                         goto failed;
343                 }
344
345                 u32val = 0;                     /* sets system max */
346                 frc = iba_cm_modify_cep(cep, CM_FLAG_LISTEN_BACKLOG,
347                                         (char *)&u32val, sizeof(u32val), 0);
348                 if (frc != FSUCCESS) {
349                         CERROR("Can't set listen backlog: %d\n", frc);
350                         goto failed;
351                 }
352         }
353         
354         u32val = 1;
355         frc = iba_cm_modify_cep(cep, CM_FLAG_TIMEWAIT_CALLBACK,
356                                 (char *)&u32val, sizeof(u32val), 0);
357         if (frc != FSUCCESS) {
358                 CERROR("Can't set timewait_callback for %s: %d\n", 
359                         (nid == LNET_NID_ANY) ? "listener" :
360                         libcfs_nid2str(nid), frc);
361                 goto failed;
362         }
363
364         return cep;
365         
366  failed:
367         iba_cm_destroy_cep(cep);
368         return NULL;
369 }
370
371 #define IBNAL_CHECK_ADVERT 1
372 #if IBNAL_CHECK_ADVERT
373 void
374 kibnal_service_query_done (void *arg, QUERY *qry, 
375                            QUERY_RESULT_VALUES *qry_result)
376 {
377         int                    *rcp = arg;
378         FSTATUS                 frc = qry_result->Status;
379         SERVICE_RECORD_RESULTS *svc_rslt;
380         IB_SERVICE_RECORD      *svc;
381         lnet_nid_t              nid;
382
383         if (frc != FSUCCESS || qry_result->ResultDataSize == 0) {
384                 CERROR("Error checking advert: status %d data size %d\n",
385                        frc, qry_result->ResultDataSize);
386                 *rcp = -EIO;
387                 goto out;
388         }
389
390         svc_rslt = (SERVICE_RECORD_RESULTS *)qry_result->QueryResult;
391
392         if (svc_rslt->NumServiceRecords < 1) {
393                 CERROR("Check advert: %d records\n",
394                        svc_rslt->NumServiceRecords);
395                 *rcp = -ENOENT;
396                 goto out;
397         }
398
399         svc = &svc_rslt->ServiceRecords[0];
400         nid = le64_to_cpu(*kibnal_service_nid_field(svc));
401         
402         CDEBUG(D_NET, "Check advert: %s "LPX64" "LPX64":%04x\n",
403                libcfs_nid2str(nid), svc->RID.ServiceID, 
404                svc->RID.ServiceGID.Type.Global.InterfaceID, 
405                svc->RID.ServiceP_Key);
406
407         if (nid != kibnal_data.kib_ni->ni_nid) {
408                 CERROR("Check advert: Bad NID %s (%s expected)\n",
409                        libcfs_nid2str(nid),
410                        libcfs_nid2str(kibnal_data.kib_ni->ni_nid));
411                 *rcp = -EINVAL;
412                 goto out;
413         }
414
415         if (svc->RID.ServiceID != *kibnal_tunables.kib_service_number) {
416                 CERROR("Check advert: Bad ServiceID "LPX64" (%x expected)\n",
417                        svc->RID.ServiceID,
418                        *kibnal_tunables.kib_service_number);
419                 *rcp = -EINVAL;
420                 goto out;
421         }
422
423         if (svc->RID.ServiceGID.Type.Global.InterfaceID != 
424             kibnal_data.kib_port_guid) {
425                 CERROR("Check advert: Bad GUID "LPX64" ("LPX64" expected)\n",
426                        svc->RID.ServiceGID.Type.Global.InterfaceID,
427                        kibnal_data.kib_port_guid);
428                 *rcp = -EINVAL;
429                 goto out;
430         }
431
432         if (svc->RID.ServiceP_Key != kibnal_data.kib_port_pkey) {
433                 CERROR("Check advert: Bad PKEY %04x (%04x expected)\n",
434                        svc->RID.ServiceP_Key, kibnal_data.kib_port_pkey);
435                 *rcp = -EINVAL;
436                 goto out;
437         }
438
439         CDEBUG(D_NET, "Check advert OK\n");
440         *rcp = 0;
441                 
442  out:
443         up (&kibnal_data.kib_listener_signal);                
444 }
445
446 int
447 kibnal_check_advert (void)
448 {
449         /* single-threaded */
450         static QUERY               qry;
451
452         FSTATUS                    frc;
453         int                        rc;
454
455         memset (&qry, 0, sizeof(qry));
456         qry.InputType = InputTypeServiceRecord;
457         qry.OutputType = OutputTypeServiceRecord;
458         kibnal_set_service_keys(&qry.InputValue.ServiceRecordValue.ServiceRecord,
459                                 kibnal_data.kib_ni->ni_nid);
460         qry.InputValue.ServiceRecordValue.ComponentMask = KIBNAL_SERVICE_KEY_MASK;
461
462         frc = iba_sd_query_port_fabric_info(kibnal_data.kib_sd, 
463                                             kibnal_data.kib_port_guid,
464                                             &qry, 
465                                             kibnal_service_query_done,
466                                             &kibnal_data.kib_sdretry, 
467                                             &rc);
468         if (frc != FPENDING) {
469                 CERROR ("Immediate error %d checking SM service\n", frc);
470                 return -EIO;
471         }
472         
473         down (&kibnal_data.kib_listener_signal);
474         
475         if (rc != 0)
476                 CERROR ("Error %d checking SM service\n", rc);
477         return rc;
478 }
479 #else
/* Advert checking compiled out (IBNAL_CHECK_ADVERT is 0): always succeed */
int
kibnal_check_advert(void)
{
        const int rc = 0;

        return rc;
}
485 #endif
486
487 void 
488 kibnal_fill_fod(FABRIC_OPERATION_DATA *fod, FABRIC_OPERATION_TYPE type)
489 {
490         IB_SERVICE_RECORD     *svc;
491
492         memset (fod, 0, sizeof(*fod));
493         fod->Type = type;
494
495         svc = &fod->Value.ServiceRecordValue.ServiceRecord;
496         svc->RID.ServiceID = *kibnal_tunables.kib_service_number;
497         svc->RID.ServiceGID.Type.Global.InterfaceID = kibnal_data.kib_port_guid;
498         svc->RID.ServiceGID.Type.Global.SubnetPrefix = DEFAULT_SUBNET_PREFIX;
499         svc->RID.ServiceP_Key = kibnal_data.kib_port_pkey;
500         svc->ServiceLease = 0xffffffff;
501
502         kibnal_set_service_keys(svc, kibnal_data.kib_ni->ni_nid);
503 }
504
505 void
506 kibnal_service_setunset_done (void *arg, FABRIC_OPERATION_DATA *fod,
507                               FSTATUS frc, uint32 madrc)
508 {
509         *(FSTATUS *)arg = frc;
510         up (&kibnal_data.kib_listener_signal);
511 }
512
513 int
514 kibnal_advertise (void)
515 {
516         /* Single threaded here */
517         static FABRIC_OPERATION_DATA fod;
518
519         IB_SERVICE_RECORD *svc = &fod.Value.ServiceRecordValue.ServiceRecord;
520         FSTATUS            frc;
521         FSTATUS            frc2;
522
523         if (strlen(*kibnal_tunables.kib_service_name) >=
524             sizeof(svc->ServiceName)) {
525                 CERROR("Service name '%s' too long (%d chars max)\n",
526                        *kibnal_tunables.kib_service_name,
527                        (int)sizeof(svc->ServiceName) - 1);
528                 return -EINVAL;
529         }
530
531         kibnal_fill_fod(&fod, FabOpSetServiceRecord);
532
533         CDEBUG(D_NET, "Advertising service id "LPX64" %s:%s\n", 
534                svc->RID.ServiceID, svc->ServiceName, 
535                libcfs_nid2str(le64_to_cpu(*kibnal_service_nid_field(svc))));
536
537         frc = iba_sd_port_fabric_operation(kibnal_data.kib_sd,
538                                            kibnal_data.kib_port_guid,
539                                            &fod, 
540                                            kibnal_service_setunset_done, 
541                                            &kibnal_data.kib_sdretry,
542                                            &frc2);
543
544         if (frc != FSUCCESS && frc != FPENDING) {
545                 CERROR ("Immediate error %d advertising NID %s\n",
546                         frc, libcfs_nid2str(kibnal_data.kib_ni->ni_nid));
547                 return -EIO;
548         }
549
550         down (&kibnal_data.kib_listener_signal);
551
552         frc = frc2;
553         if (frc == FSUCCESS)
554                 return 0;
555         
556         CERROR ("Error %d advertising %s\n",
557                 frc, libcfs_nid2str(kibnal_data.kib_ni->ni_nid));
558         return -EIO;
559 }
560
561 void
562 kibnal_unadvertise (int expect_success)
563 {
564         /* single threaded */
565         static FABRIC_OPERATION_DATA fod;
566
567         IB_SERVICE_RECORD *svc = &fod.Value.ServiceRecordValue.ServiceRecord;
568         FSTATUS            frc;
569         FSTATUS            frc2;
570
571         LASSERT (kibnal_data.kib_ni->ni_nid != LNET_NID_ANY);
572
573         kibnal_fill_fod(&fod, FabOpDeleteServiceRecord);
574
575         CDEBUG(D_NET, "Unadvertising service %s:%s\n",
576                svc->ServiceName, 
577                libcfs_nid2str(le64_to_cpu(*kibnal_service_nid_field(svc))));
578         
579         frc = iba_sd_port_fabric_operation(kibnal_data.kib_sd,
580                                            kibnal_data.kib_port_guid,
581                                            &fod, 
582                                            kibnal_service_setunset_done, 
583                                            &kibnal_data.kib_sdretry, 
584                                            &frc2);
585         if (frc != FSUCCESS && frc != FPENDING) {
586                 CERROR ("Immediate error %d unadvertising NID %s\n",
587                         frc, libcfs_nid2str(kibnal_data.kib_ni->ni_nid));
588                 return;
589         }
590
591         down (&kibnal_data.kib_listener_signal);
592
593         CDEBUG(D_NET, "Unadvertise rc: %d\n", frc2);
594
595         if ((frc2 == FSUCCESS) == !!expect_success)
596                 return;
597
598         if (expect_success)
599                 CERROR("Error %d unadvertising NID %s\n",
600                        frc2, libcfs_nid2str(kibnal_data.kib_ni->ni_nid));
601         else
602                 CWARN("Removed conflicting NID %s\n",
603                       libcfs_nid2str(kibnal_data.kib_ni->ni_nid));
604 }
605
606 void
607 kibnal_stop_listener(int normal_shutdown)
608 {
609         /* NB this also disables peer creation and destroys all existing
610          * peers */
611         IB_HANDLE      cep = kibnal_data.kib_listener_cep;
612         unsigned long  flags;
613         FSTATUS        frc;
614
615         LASSERT (cep != NULL);
616
617         kibnal_unadvertise(normal_shutdown);
618
619         frc = iba_cm_cancel(cep);
620         if (frc != FSUCCESS && frc != FPENDING)
621                 CERROR ("Error %d stopping listener\n", frc);
622
623         down(&kibnal_data.kib_listener_signal);
624
625         frc = iba_cm_destroy_cep(cep);
626         if (frc != FSUCCESS)
627                 CERROR ("Error %d destroying listener CEP\n", frc);
628
629         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
630         /* This assignment disables peer creation */
631         kibnal_data.kib_listener_cep = NULL;
632         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
633
634         /* Start to tear down any peers created while the listener was
635          * running */
636         kibnal_del_peer(LNET_NID_ANY);
637 }
638
639 int
640 kibnal_start_listener(void)
641 {
642         /* NB this also enables peer creation */
643
644         IB_HANDLE      cep;
645         CM_LISTEN_INFO info;
646         unsigned long  flags;
647         int            rc;
648         FSTATUS        frc;
649
650         LASSERT (kibnal_data.kib_listener_cep == NULL);
651         init_MUTEX_LOCKED (&kibnal_data.kib_listener_signal);
652
653         cep = kibnal_create_cep(LNET_NID_ANY);
654         if (cep == NULL)
655                 return -ENOMEM;
656
657         memset (&info, 0, sizeof(info));
658         info.ListenAddr.EndPt.SID = *kibnal_tunables.kib_service_number;
659
660         frc = iba_cm_listen(cep, &info, kibnal_listen_callback, NULL);
661         if (frc != FSUCCESS && frc != FPENDING) {
662                 CERROR ("iba_cm_listen error: %d\n", frc);
663
664                 iba_cm_destroy_cep(cep);
665                 return -EIO;
666         }
667
668         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
669         /* This assignment enables peer creation */
670         kibnal_data.kib_listener_cep = cep;
671         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
672
673         rc = kibnal_advertise();
674         if (rc == 0)
675                 rc = kibnal_check_advert();
676
677         if (rc == 0)
678                 return 0;
679
680         kibnal_stop_listener(0);
681         return rc;
682 }
683
684 int
685 kibnal_create_peer (kib_peer_t **peerp, lnet_nid_t nid)
686 {
687         kib_peer_t    *peer;
688         unsigned long  flags;
689         int            rc;
690
691         LASSERT (nid != LNET_NID_ANY);
692
693         LIBCFS_ALLOC (peer, sizeof (*peer));
694         if (peer == NULL) {
695                 CERROR("Cannot allocate peer\n");
696                 return -ENOMEM;
697         }
698
699         memset(peer, 0, sizeof(*peer));         /* zero flags etc */
700
701         peer->ibp_nid = nid;
702         atomic_set (&peer->ibp_refcount, 1);    /* 1 ref for caller */
703
704         INIT_LIST_HEAD (&peer->ibp_list);       /* not in the peer table yet */
705         INIT_LIST_HEAD (&peer->ibp_conns);
706         INIT_LIST_HEAD (&peer->ibp_tx_queue);
707
708         peer->ibp_error = 0;
709         peer->ibp_last_alive = cfs_time_current();
710         peer->ibp_reconnect_interval = 0;       /* OK to connect at any time */
711
712         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
713         
714         if (atomic_read(&kibnal_data.kib_npeers) >=
715             *kibnal_tunables.kib_concurrent_peers) {
716                 rc = -EOVERFLOW;        /* !! but at least it distinguishes */
717         } else if (kibnal_data.kib_listener_cep == NULL) {
718                 rc = -ESHUTDOWN;        /* shutdown has started */
719         } else {
720                 rc = 0;
721                 /* npeers only grows with the global lock held */
722                 atomic_inc(&kibnal_data.kib_npeers);
723         }
724         
725         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
726
727         if (rc != 0) {
728                 CERROR("Can't create peer: %s\n", 
729                        (rc == -ESHUTDOWN) ? "shutting down" : 
730                        "too many peers");
731                 LIBCFS_FREE(peer, sizeof(*peer));
732         } else {
733                 *peerp = peer;
734         }
735         
736         return rc;
737 }
738
739 void
740 kibnal_destroy_peer (kib_peer_t *peer)
741 {
742
743         LASSERT (atomic_read (&peer->ibp_refcount) == 0);
744         LASSERT (peer->ibp_persistence == 0);
745         LASSERT (!kibnal_peer_active(peer));
746         LASSERT (!kibnal_peer_connecting(peer));
747         LASSERT (list_empty (&peer->ibp_conns));
748         LASSERT (list_empty (&peer->ibp_tx_queue));
749
750         LIBCFS_FREE (peer, sizeof (*peer));
751
752         /* NB a peer's connections keep a reference on their peer until
753          * they are destroyed, so we can be assured that _all_ state to do
754          * with this peer has been cleaned up when its refcount drops to
755          * zero. */
756         atomic_dec (&kibnal_data.kib_npeers);
757 }
758
759 /* the caller is responsible for accounting for the additional reference
760  * that this creates */
761 kib_peer_t *
762 kibnal_find_peer_locked (lnet_nid_t nid)
763 {
764         struct list_head *peer_list = kibnal_nid2peerlist (nid);
765         struct list_head *tmp;
766         kib_peer_t       *peer;
767
768         list_for_each (tmp, peer_list) {
769
770                 peer = list_entry (tmp, kib_peer_t, ibp_list);
771
772                 LASSERT (peer->ibp_persistence != 0 ||
773                          kibnal_peer_connecting(peer) ||
774                          !list_empty (&peer->ibp_conns));
775
776                 if (peer->ibp_nid != nid)
777                         continue;
778
779                 CDEBUG(D_NET, "got peer %s (%d)\n",
780                        libcfs_nid2str(nid), atomic_read (&peer->ibp_refcount));
781                 return (peer);
782         }
783         return (NULL);
784 }
785
786 void
787 kibnal_unlink_peer_locked (kib_peer_t *peer)
788 {
789         LASSERT (peer->ibp_persistence == 0);
790         LASSERT (list_empty(&peer->ibp_conns));
791
792         LASSERT (kibnal_peer_active(peer));
793         list_del_init (&peer->ibp_list);
794         /* lose peerlist's ref */
795         kibnal_peer_decref(peer);
796 }
797
798 int
799 kibnal_get_peer_info (int index, lnet_nid_t *nidp, int *persistencep)
800 {
801         kib_peer_t        *peer;
802         struct list_head  *ptmp;
803         unsigned long      flags;
804         int                i;
805
806         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
807
808         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
809
810                 list_for_each (ptmp, &kibnal_data.kib_peers[i]) {
811
812                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
813                         LASSERT (peer->ibp_persistence != 0 ||
814                                  kibnal_peer_connecting(peer) ||
815                                  !list_empty (&peer->ibp_conns));
816
817                         if (index-- > 0)
818                                 continue;
819
820                         *nidp = peer->ibp_nid;
821                         *persistencep = peer->ibp_persistence;
822
823                         read_unlock_irqrestore(&kibnal_data.kib_global_lock,
824                                                flags);
825                         return (0);
826                 }
827         }
828
829         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
830         return (-ENOENT);
831 }
832
833 int
834 kibnal_add_persistent_peer (lnet_nid_t nid)
835 {
836         unsigned long      flags;
837         kib_peer_t        *peer;
838         kib_peer_t        *peer2;
839         int                rc;
840         
841         if (nid == LNET_NID_ANY)
842                 return (-EINVAL);
843
844         rc = kibnal_create_peer(&peer, nid);
845         if (rc != 0)
846                 return rc;
847
848         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
849
850         /* I'm always called with a reference on kibnal_data.kib_ni
851          * so shutdown can't have started */
852         LASSERT (kibnal_data.kib_listener_cep != NULL);
853
854         peer2 = kibnal_find_peer_locked (nid);
855         if (peer2 != NULL) {
856                 kibnal_peer_decref (peer);
857                 peer = peer2;
858         } else {
859                 /* peer table takes existing ref on peer */
860                 list_add_tail (&peer->ibp_list,
861                                kibnal_nid2peerlist (nid));
862         }
863
864         peer->ibp_persistence++;
865         
866         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
867         return (0);
868 }
869
870 void
871 kibnal_del_peer_locked (kib_peer_t *peer)
872 {
873         struct list_head *ctmp;
874         struct list_head *cnxt;
875         kib_conn_t       *conn;
876
877         peer->ibp_persistence = 0;
878
879         if (list_empty(&peer->ibp_conns)) {
880                 kibnal_unlink_peer_locked(peer);
881         } else {
882                 list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
883                         conn = list_entry(ctmp, kib_conn_t, ibc_list);
884
885                         kibnal_close_conn_locked (conn, 0);
886                 }
887                 /* NB peer is no longer persistent; closing its last conn
888                  * unlinked it. */
889         }
890         /* NB peer now unlinked; might even be freed if the peer table had the
891          * last ref on it. */
892 }
893
894 int
895 kibnal_del_peer (lnet_nid_t nid)
896 {
897         unsigned long      flags;
898         CFS_LIST_HEAD     (zombies);
899         struct list_head  *ptmp;
900         struct list_head  *pnxt;
901         kib_peer_t        *peer;
902         int                lo;
903         int                hi;
904         int                i;
905         int                rc = -ENOENT;
906
907         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
908
909         if (nid != LNET_NID_ANY)
910                 lo = hi = kibnal_nid2peerlist(nid) - kibnal_data.kib_peers;
911         else {
912                 lo = 0;
913                 hi = kibnal_data.kib_peer_hash_size - 1;
914         }
915
916         for (i = lo; i <= hi; i++) {
917                 list_for_each_safe (ptmp, pnxt, &kibnal_data.kib_peers[i]) {
918                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
919                         LASSERT (peer->ibp_persistence != 0 ||
920                                  kibnal_peer_connecting(peer) ||
921                                  !list_empty (&peer->ibp_conns));
922
923                         if (!(nid == LNET_NID_ANY || peer->ibp_nid == nid))
924                                 continue;
925
926                         if (!list_empty(&peer->ibp_tx_queue)) {
927                                 LASSERT (list_empty(&peer->ibp_conns));
928
929                                 list_splice_init(&peer->ibp_tx_queue, &zombies);
930                         }
931
932                         kibnal_del_peer_locked (peer);
933                         rc = 0;         /* matched something */
934                 }
935         }
936
937         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
938
939         kibnal_txlist_done(&zombies, -EIO);
940
941         return (rc);
942 }
943
944 kib_conn_t *
945 kibnal_get_conn_by_idx (int index)
946 {
947         kib_peer_t        *peer;
948         struct list_head  *ptmp;
949         kib_conn_t        *conn;
950         struct list_head  *ctmp;
951         unsigned long      flags;
952         int                i;
953
954         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
955
956         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
957                 list_for_each (ptmp, &kibnal_data.kib_peers[i]) {
958
959                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
960                         LASSERT (peer->ibp_persistence != 0 ||
961                                  kibnal_peer_connecting(peer) ||
962                                  !list_empty (&peer->ibp_conns));
963
964                         list_for_each (ctmp, &peer->ibp_conns) {
965                                 if (index-- > 0)
966                                         continue;
967
968                                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
969                                 kibnal_conn_addref(conn);
970                                 read_unlock_irqrestore(&kibnal_data.kib_global_lock,
971                                                        flags);
972                                 return (conn);
973                         }
974                 }
975         }
976
977         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
978         return (NULL);
979 }
980
981 int
982 kibnal_conn_rts(kib_conn_t *conn, 
983                 __u32 qpn, __u8 resp_res, __u8 init_depth, __u32 psn)
984 {
985         IB_PATH_RECORD         *path = &conn->ibc_cvars->cv_path;
986         IB_HANDLE               qp = conn->ibc_qp;
987         IB_QP_ATTRIBUTES_MODIFY modify_attr;
988         FSTATUS                 frc;
989         int                     rc;
990
991         if (resp_res > kibnal_data.kib_hca_attrs.MaxQPResponderResources)
992                 resp_res = kibnal_data.kib_hca_attrs.MaxQPResponderResources;
993
994         if (init_depth > kibnal_data.kib_hca_attrs.MaxQPInitiatorDepth)
995                 init_depth = kibnal_data.kib_hca_attrs.MaxQPInitiatorDepth;
996
997         modify_attr = (IB_QP_ATTRIBUTES_MODIFY) {
998                 .RequestState       = QPStateReadyToRecv,
999                 .RecvPSN            = IBNAL_STARTING_PSN,
1000                 .DestQPNumber       = qpn,
1001                 .ResponderResources = resp_res,
1002                 .MinRnrTimer        = UsecToRnrNakTimer(2000), /* 20 ms */
1003                 .Attrs              = (IB_QP_ATTR_RECVPSN |
1004                                        IB_QP_ATTR_DESTQPNUMBER | 
1005                                        IB_QP_ATTR_RESPONDERRESOURCES | 
1006                                        IB_QP_ATTR_DESTAV | 
1007                                        IB_QP_ATTR_PATHMTU | 
1008                                        IB_QP_ATTR_MINRNRTIMER),
1009         };
1010         GetAVFromPath(0, path, &modify_attr.PathMTU, NULL, 
1011                       &modify_attr.DestAV);
1012
1013         frc = iba_modify_qp(qp, &modify_attr, NULL);
1014         if (frc != FSUCCESS) {
1015                 CERROR("Can't set QP %s ready to receive: %d\n",
1016                        libcfs_nid2str(conn->ibc_peer->ibp_nid), frc);
1017                 return -EIO;
1018         }
1019
1020         rc = kibnal_post_receives(conn);
1021         if (rc != 0) {
1022                 CERROR("Can't post receives for %s: %d\n",
1023                        libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
1024                 return rc;
1025         }
1026
1027         modify_attr = (IB_QP_ATTRIBUTES_MODIFY) {
1028                 .RequestState           = QPStateReadyToSend,
1029                 .FlowControl            = TRUE,
1030                 .InitiatorDepth         = init_depth,
1031                 .SendPSN                = psn,
1032                 .LocalAckTimeout        = path->PktLifeTime + 2, /* 2 or 1? */
1033                 .RetryCount             = IBNAL_RETRY,
1034                 .RnrRetryCount          = IBNAL_RNR_RETRY,
1035                 .Attrs                  = (IB_QP_ATTR_FLOWCONTROL | 
1036                                            IB_QP_ATTR_INITIATORDEPTH | 
1037                                            IB_QP_ATTR_SENDPSN | 
1038                                            IB_QP_ATTR_LOCALACKTIMEOUT | 
1039                                            IB_QP_ATTR_RETRYCOUNT | 
1040                                            IB_QP_ATTR_RNRRETRYCOUNT),
1041         };
1042
1043         frc = iba_modify_qp(qp, &modify_attr, NULL);
1044         if (frc != FSUCCESS) {
1045                 CERROR("Can't set QP %s ready to send: %d\n",
1046                        libcfs_nid2str(conn->ibc_peer->ibp_nid), frc);
1047                 return -EIO;
1048         }
1049
1050         frc = iba_query_qp(conn->ibc_qp, &conn->ibc_cvars->cv_qpattrs, NULL);
1051         if (frc != FSUCCESS) {
1052                 CERROR ("Can't query QP %s attributes: %d\n",
1053                         libcfs_nid2str(conn->ibc_peer->ibp_nid), frc);
1054                 return -EIO;
1055         }
1056         
1057         return 0;
1058 }
1059
1060 kib_conn_t *
1061 kibnal_create_conn (lnet_nid_t nid, int proto_version)
1062 {
1063         kib_conn_t  *conn;
1064         int          i;
1065         int          page_offset;
1066         int          ipage;
1067         int          rc;
1068         FSTATUS      frc;
1069         union {
1070                 IB_QP_ATTRIBUTES_CREATE    qp_create;
1071                 IB_QP_ATTRIBUTES_MODIFY    qp_attr;
1072         } params;
1073         
1074         LIBCFS_ALLOC (conn, sizeof (*conn));
1075         if (conn == NULL) {
1076                 CERROR ("Can't allocate connection for %s\n",
1077                         libcfs_nid2str(nid));
1078                 return (NULL);
1079         }
1080
1081         /* zero flags, NULL pointers etc... */
1082         memset (conn, 0, sizeof (*conn));
1083         conn->ibc_state = IBNAL_CONN_INIT_NOTHING;
1084         conn->ibc_version = proto_version;
1085
1086         INIT_LIST_HEAD (&conn->ibc_early_rxs);
1087         INIT_LIST_HEAD (&conn->ibc_tx_queue_nocred);
1088         INIT_LIST_HEAD (&conn->ibc_tx_queue);
1089         INIT_LIST_HEAD (&conn->ibc_tx_queue_rsrvd);
1090         INIT_LIST_HEAD (&conn->ibc_active_txs);
1091         spin_lock_init (&conn->ibc_lock);
1092         
1093         atomic_inc (&kibnal_data.kib_nconns);
1094         /* well not really, but I call destroy() on failure, which decrements */
1095
1096         LIBCFS_ALLOC(conn->ibc_cvars, sizeof (*conn->ibc_cvars));
1097         if (conn->ibc_cvars == NULL) {
1098                 CERROR ("Can't allocate connvars for %s\n", 
1099                         libcfs_nid2str(nid));
1100                 goto failed;
1101         }
1102         memset(conn->ibc_cvars, 0, sizeof (*conn->ibc_cvars));
1103
1104         LIBCFS_ALLOC(conn->ibc_rxs, IBNAL_RX_MSGS * sizeof (kib_rx_t));
1105         if (conn->ibc_rxs == NULL) {
1106                 CERROR("Cannot allocate RX descriptors for %s\n",
1107                        libcfs_nid2str(nid));
1108                 goto failed;
1109         }
1110         memset (conn->ibc_rxs, 0, IBNAL_RX_MSGS * sizeof(kib_rx_t));
1111
1112         rc = kibnal_alloc_pages(&conn->ibc_rx_pages, IBNAL_RX_MSG_PAGES);
1113         if (rc != 0) {
1114                 CERROR("Can't allocate RX buffers for %s\n",
1115                        libcfs_nid2str(nid));
1116                 goto failed;
1117         }
1118         
1119         for (i = ipage = page_offset = 0; i < IBNAL_RX_MSGS; i++) {
1120                 struct page *page = conn->ibc_rx_pages->ibp_pages[ipage];
1121                 kib_rx_t    *rx = &conn->ibc_rxs[i];
1122
1123                 rx->rx_conn = conn;
1124                 rx->rx_msg = (kib_msg_t *)(((char *)page_address(page)) + 
1125                              page_offset);
1126
1127                 rx->rx_hca_msg = kibnal_data.kib_whole_mem.md_addr +
1128                                  lnet_page2phys(page) + page_offset;
1129                 
1130                 page_offset += IBNAL_MSG_SIZE;
1131                 LASSERT (page_offset <= PAGE_SIZE);
1132
1133                 if (page_offset == PAGE_SIZE) {
1134                         page_offset = 0;
1135                         ipage++;
1136                         LASSERT (ipage <= IBNAL_RX_MSG_PAGES);
1137                 }
1138         }
1139
1140         params.qp_create = (IB_QP_ATTRIBUTES_CREATE) {
1141                 .Type                    = QPTypeReliableConnected,
1142                 .SendQDepth              = (1 + IBNAL_MAX_RDMA_FRAGS) *
1143                                            (*kibnal_tunables.kib_concurrent_sends),
1144                 .RecvQDepth              = IBNAL_RX_MSGS,
1145                 .SendDSListDepth         = 1,
1146                 .RecvDSListDepth         = 1,
1147                 .SendCQHandle            = kibnal_data.kib_cq,
1148                 .RecvCQHandle            = kibnal_data.kib_cq,
1149                 .PDHandle                = kibnal_data.kib_pd,
1150                 .SendSignaledCompletions = TRUE,
1151         };
1152         frc = iba_create_qp(kibnal_data.kib_hca, &params.qp_create, NULL,
1153                             &conn->ibc_qp, &conn->ibc_cvars->cv_qpattrs);
1154         if (frc != 0) {
1155                 CERROR ("Can't create QP %s: %d\n", libcfs_nid2str(nid), frc);
1156                 goto failed;
1157         }
1158
1159         /* Mark QP created */
1160         kibnal_set_conn_state(conn, IBNAL_CONN_INIT_QP);
1161
1162         params.qp_attr = (IB_QP_ATTRIBUTES_MODIFY) {
1163                 .RequestState             = QPStateInit,
1164                 .Attrs                    = (IB_QP_ATTR_PORTGUID |
1165                                              IB_QP_ATTR_PKEYINDEX |
1166                                              IB_QP_ATTR_ACCESSCONTROL),
1167                 .PortGUID                 = kibnal_data.kib_port_guid,
1168                 .PkeyIndex                = 0,
1169                 .AccessControl = { 
1170                         .s = {
1171                                 .RdmaWrite = 1,
1172                                 .RdmaRead  = 1,
1173                         },
1174                 },
1175         };
1176         frc = iba_modify_qp(conn->ibc_qp, &params.qp_attr, NULL);
1177         if (frc != 0) {
1178                 CERROR ("Can't set QP %s state to INIT: %d\n",
1179                         libcfs_nid2str(nid), frc);
1180                 goto failed;
1181         }
1182
1183         frc = iba_query_qp(conn->ibc_qp, &conn->ibc_cvars->cv_qpattrs, NULL);
1184         if (frc != FSUCCESS) {
1185                 CERROR ("Can't query QP %s attributes: %d\n",
1186                         libcfs_nid2str(nid), frc);
1187                 goto failed;
1188         }
1189
1190         /* 1 ref for caller */
1191         atomic_set (&conn->ibc_refcount, 1);
1192         CDEBUG(D_NET, "New conn %p\n", conn);
1193         return (conn);
1194         
1195  failed:
1196         kibnal_destroy_conn (conn);
1197         return (NULL);
1198 }
1199
1200 void
1201 kibnal_destroy_conn (kib_conn_t *conn)
1202 {
1203         FSTATUS frc;
1204
1205         LASSERT (!in_interrupt());
1206         
1207         CDEBUG (D_NET, "connection %s\n", 
1208                 (conn->ibc_peer) == NULL ? "<ANON>" :
1209                 libcfs_nid2str(conn->ibc_peer->ibp_nid));
1210
1211         LASSERT (atomic_read (&conn->ibc_refcount) == 0);
1212         LASSERT (list_empty(&conn->ibc_early_rxs));
1213         LASSERT (list_empty(&conn->ibc_tx_queue));
1214         LASSERT (list_empty(&conn->ibc_tx_queue_rsrvd));
1215         LASSERT (list_empty(&conn->ibc_tx_queue_nocred));
1216         LASSERT (list_empty(&conn->ibc_active_txs));
1217         LASSERT (conn->ibc_nsends_posted == 0);
1218
1219         switch (conn->ibc_state) {
1220         case IBNAL_CONN_INIT_NOTHING:
1221         case IBNAL_CONN_INIT_QP:
1222         case IBNAL_CONN_DISCONNECTED:
1223                 break;
1224
1225         default:
1226                 /* conn must either have never engaged with the CM, or have
1227                  * completely disengaged from it */
1228                 CERROR("Bad conn %s state %d\n",
1229                        (conn->ibc_peer) == NULL ? "<anon>" :
1230                        libcfs_nid2str(conn->ibc_peer->ibp_nid), conn->ibc_state);
1231                 LBUG();
1232         }
1233
1234         if (conn->ibc_cep != NULL) {
1235                 frc = iba_cm_destroy_cep(conn->ibc_cep);
1236                 if (frc != FSUCCESS)
1237                         CERROR("Error destroying CEP %p: %d\n",
1238                                conn->ibc_cep, frc);
1239         }
1240
1241         if (conn->ibc_qp != NULL) {
1242                 frc = iba_destroy_qp(conn->ibc_qp);
1243                 if (frc != FSUCCESS)
1244                         CERROR("Error destroying QP %p: %d\n",
1245                                conn->ibc_qp, frc);
1246         }
1247
1248         if (conn->ibc_rx_pages != NULL) 
1249                 kibnal_free_pages(conn->ibc_rx_pages);
1250         
1251         if (conn->ibc_rxs != NULL)
1252                 LIBCFS_FREE(conn->ibc_rxs, 
1253                             IBNAL_RX_MSGS * sizeof(kib_rx_t));
1254
1255         if (conn->ibc_cvars != NULL)
1256                 LIBCFS_FREE(conn->ibc_cvars, sizeof(*conn->ibc_cvars));
1257
1258         if (conn->ibc_peer != NULL)
1259                 kibnal_peer_decref(conn->ibc_peer);
1260
1261         LIBCFS_FREE(conn, sizeof (*conn));
1262
1263         atomic_dec(&kibnal_data.kib_nconns);
1264 }
1265
1266 int
1267 kibnal_close_peer_conns_locked (kib_peer_t *peer, int why)
1268 {
1269         kib_conn_t         *conn;
1270         struct list_head   *ctmp;
1271         struct list_head   *cnxt;
1272         int                 count = 0;
1273
1274         list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
1275                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
1276
1277                 count++;
1278                 kibnal_close_conn_locked (conn, why);
1279         }
1280
1281         return (count);
1282 }
1283
1284 int
1285 kibnal_close_stale_conns_locked (kib_peer_t *peer, __u64 incarnation)
1286 {
1287         kib_conn_t         *conn;
1288         struct list_head   *ctmp;
1289         struct list_head   *cnxt;
1290         int                 count = 0;
1291
1292         list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
1293                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
1294
1295                 if (conn->ibc_incarnation == incarnation)
1296                         continue;
1297
1298                 CDEBUG(D_NET, "Closing stale conn nid:%s incarnation:"LPX64"("LPX64")\n",
1299                        libcfs_nid2str(peer->ibp_nid), 
1300                        conn->ibc_incarnation, incarnation);
1301                 
1302                 count++;
1303                 kibnal_close_conn_locked (conn, -ESTALE);
1304         }
1305
1306         return (count);
1307 }
1308
1309 int
1310 kibnal_close_matching_conns (lnet_nid_t nid)
1311 {
1312         unsigned long       flags;
1313         kib_peer_t         *peer;
1314         struct list_head   *ptmp;
1315         struct list_head   *pnxt;
1316         int                 lo;
1317         int                 hi;
1318         int                 i;
1319         int                 count = 0;
1320
1321         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1322
1323         if (nid != LNET_NID_ANY)
1324                 lo = hi = kibnal_nid2peerlist(nid) - kibnal_data.kib_peers;
1325         else {
1326                 lo = 0;
1327                 hi = kibnal_data.kib_peer_hash_size - 1;
1328         }
1329
1330         for (i = lo; i <= hi; i++) {
1331                 list_for_each_safe (ptmp, pnxt, &kibnal_data.kib_peers[i]) {
1332
1333                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
1334                         LASSERT (peer->ibp_persistence != 0 ||
1335                                  kibnal_peer_connecting(peer) ||
1336                                  !list_empty (&peer->ibp_conns));
1337
1338                         if (!(nid == LNET_NID_ANY || nid == peer->ibp_nid))
1339                                 continue;
1340
1341                         count += kibnal_close_peer_conns_locked (peer, 0);
1342                 }
1343         }
1344
1345         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1346
1347         /* wildcards always succeed */
1348         if (nid == LNET_NID_ANY)
1349                 return (0);
1350         
1351         return (count == 0 ? -ENOENT : 0);
1352 }
1353
1354 int
1355 kibnal_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1356 {
1357         struct libcfs_ioctl_data *data = arg;
1358         int                       rc = -EINVAL;
1359         ENTRY;
1360
1361         LASSERT (ni == kibnal_data.kib_ni);
1362
1363         switch(cmd) {
1364         case IOC_LIBCFS_GET_PEER: {
1365                 lnet_nid_t   nid = 0;
1366                 int          share_count = 0;
1367
1368                 rc = kibnal_get_peer_info(data->ioc_count,
1369                                           &nid, &share_count);
1370                 data->ioc_nid   = nid;
1371                 data->ioc_count = share_count;
1372                 break;
1373         }
1374         case IOC_LIBCFS_ADD_PEER: {
1375                 rc = kibnal_add_persistent_peer (data->ioc_nid);
1376                 break;
1377         }
1378         case IOC_LIBCFS_DEL_PEER: {
1379                 rc = kibnal_del_peer (data->ioc_nid);
1380                 break;
1381         }
1382         case IOC_LIBCFS_GET_CONN: {
1383                 kib_conn_t *conn = kibnal_get_conn_by_idx (data->ioc_count);
1384
1385                 if (conn == NULL)
1386                         rc = -ENOENT;
1387                 else {
1388                         rc = 0;
1389                         data->ioc_nid = conn->ibc_peer->ibp_nid;
1390                         kibnal_conn_decref(conn);
1391                 }
1392                 break;
1393         }
1394         case IOC_LIBCFS_CLOSE_CONNECTION: {
1395                 rc = kibnal_close_matching_conns (data->ioc_nid);
1396                 break;
1397         }
1398         case IOC_LIBCFS_REGISTER_MYNID: {
1399                 if (ni->ni_nid == data->ioc_nid) {
1400                         rc = 0;
1401                 } else {
1402                         CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID: %s(%s)\n",
1403                                libcfs_nid2str(data->ioc_nid),
1404                                libcfs_nid2str(ni->ni_nid));
1405                         rc = -EINVAL;
1406                 }
1407                 break;
1408         }
1409         }
1410
1411         RETURN(rc);
1412 }
1413
1414 void
1415 kibnal_free_pages (kib_pages_t *p)
1416 {
1417         int     npages = p->ibp_npages;
1418         int     i;
1419         
1420         for (i = 0; i < npages; i++)
1421                 if (p->ibp_pages[i] != NULL)
1422                         __free_page(p->ibp_pages[i]);
1423         
1424         LIBCFS_FREE (p, offsetof(kib_pages_t, ibp_pages[npages]));
1425 }
1426
1427 int
1428 kibnal_alloc_pages (kib_pages_t **pp, int npages)
1429 {
1430         kib_pages_t   *p;
1431         int            i;
1432
1433         LIBCFS_ALLOC(p, offsetof(kib_pages_t, ibp_pages[npages]));
1434         if (p == NULL) {
1435                 CERROR ("Can't allocate buffer %d\n", npages);
1436                 return (-ENOMEM);
1437         }
1438
1439         memset (p, 0, offsetof(kib_pages_t, ibp_pages[npages]));
1440         p->ibp_npages = npages;
1441         
1442         for (i = 0; i < npages; i++) {
1443                 p->ibp_pages[i] = alloc_page (GFP_KERNEL);
1444                 if (p->ibp_pages[i] == NULL) {
1445                         CERROR ("Can't allocate page %d of %d\n", i, npages);
1446                         kibnal_free_pages(p);
1447                         return (-ENOMEM);
1448                 }
1449         }
1450
1451         *pp = p;
1452         return (0);
1453 }
1454
1455 int
1456 kibnal_alloc_tx_descs (void) 
1457 {
1458         int    i;
1459         
1460         LIBCFS_ALLOC (kibnal_data.kib_tx_descs,
1461                       IBNAL_TX_MSGS() * sizeof(kib_tx_t));
1462         if (kibnal_data.kib_tx_descs == NULL)
1463                 return -ENOMEM;
1464         
1465         memset(kibnal_data.kib_tx_descs, 0,
1466                IBNAL_TX_MSGS() * sizeof(kib_tx_t));
1467
1468         for (i = 0; i < IBNAL_TX_MSGS(); i++) {
1469                 kib_tx_t *tx = &kibnal_data.kib_tx_descs[i];
1470
1471 #if IBNAL_USE_FMR
1472                 LIBCFS_ALLOC(tx->tx_pages, LNET_MAX_IOV *
1473                              sizeof(*tx->tx_pages));
1474                 if (tx->tx_pages == NULL)
1475                         return -ENOMEM;
1476 #else
1477                 LIBCFS_ALLOC(tx->tx_wrq, 
1478                              (1 + IBNAL_MAX_RDMA_FRAGS) * 
1479                              sizeof(*tx->tx_wrq));
1480                 if (tx->tx_wrq == NULL)
1481                         return -ENOMEM;
1482                 
1483                 LIBCFS_ALLOC(tx->tx_gl, 
1484                              (1 + IBNAL_MAX_RDMA_FRAGS) * 
1485                              sizeof(*tx->tx_gl));
1486                 if (tx->tx_gl == NULL)
1487                         return -ENOMEM;
1488                 
1489                 LIBCFS_ALLOC(tx->tx_rd, 
1490                              offsetof(kib_rdma_desc_t, 
1491                                       rd_frags[IBNAL_MAX_RDMA_FRAGS]));
1492                 if (tx->tx_rd == NULL)
1493                         return -ENOMEM;
1494 #endif
1495         }
1496
1497         return 0;
1498 }
1499
1500 void
1501 kibnal_free_tx_descs (void) 
1502 {
1503         int    i;
1504
1505         if (kibnal_data.kib_tx_descs == NULL)
1506                 return;
1507
1508         for (i = 0; i < IBNAL_TX_MSGS(); i++) {
1509                 kib_tx_t *tx = &kibnal_data.kib_tx_descs[i];
1510
1511 #if IBNAL_USE_FMR
1512                 if (tx->tx_pages != NULL)
1513                         LIBCFS_FREE(tx->tx_pages, LNET_MAX_IOV *
1514                                     sizeof(*tx->tx_pages));
1515 #else
1516                 if (tx->tx_wrq != NULL)
1517                         LIBCFS_FREE(tx->tx_wrq, 
1518                                     (1 + IBNAL_MAX_RDMA_FRAGS) * 
1519                                     sizeof(*tx->tx_wrq));
1520
1521                 if (tx->tx_gl != NULL)
1522                         LIBCFS_FREE(tx->tx_gl, 
1523                                     (1 + IBNAL_MAX_RDMA_FRAGS) * 
1524                                     sizeof(*tx->tx_gl));
1525
1526                 if (tx->tx_rd != NULL)
1527                         LIBCFS_FREE(tx->tx_rd, 
1528                                     offsetof(kib_rdma_desc_t, 
1529                                              rd_frags[IBNAL_MAX_RDMA_FRAGS]));
1530 #endif
1531         }
1532
1533         LIBCFS_FREE(kibnal_data.kib_tx_descs,
1534                     IBNAL_TX_MSGS() * sizeof(kib_tx_t));
1535 }
1536
1537 int
1538 kibnal_setup_tx_descs (void)
1539 {
1540         int           ipage = 0;
1541         int           page_offset = 0;
1542         struct page  *page;
1543         kib_tx_t     *tx;
1544         int           i;
1545         int           rc;
1546
1547         /* pre-mapped messages are not bigger than 1 page */
1548         CLASSERT (IBNAL_MSG_SIZE <= PAGE_SIZE);
1549
1550         /* No fancy arithmetic when we do the buffer calculations */
1551         CLASSERT (PAGE_SIZE % IBNAL_MSG_SIZE == 0);
1552
1553         rc = kibnal_alloc_pages(&kibnal_data.kib_tx_pages,
1554                                 IBNAL_TX_MSG_PAGES());
1555         if (rc != 0)
1556                 return (rc);
1557
1558         for (i = 0; i < IBNAL_TX_MSGS(); i++) {
1559                 page = kibnal_data.kib_tx_pages->ibp_pages[ipage];
1560                 tx = &kibnal_data.kib_tx_descs[i];
1561
1562 #if IBNAL_USE_FMR
1563                 /* Allocate an FMR for this TX so it can map src/sink buffers
1564                  * for large transfers */
1565 #endif
1566                 tx->tx_msg = (kib_msg_t *)(((char *)page_address(page)) + 
1567                                             page_offset);
1568
1569                 tx->tx_hca_msg = kibnal_data.kib_whole_mem.md_addr +
1570                                  lnet_page2phys(page) + page_offset;
1571
1572                 CDEBUG(D_NET, "Tx[%d] %p->%p - "LPX64"\n", 
1573                        i, tx, tx->tx_msg, tx->tx_hca_msg);
1574
1575                 list_add (&tx->tx_list, &kibnal_data.kib_idle_txs);
1576
1577                 page_offset += IBNAL_MSG_SIZE;
1578                 LASSERT (page_offset <= PAGE_SIZE);
1579
1580                 if (page_offset == PAGE_SIZE) {
1581                         page_offset = 0;
1582                         ipage++;
1583                         LASSERT (ipage <= IBNAL_TX_MSG_PAGES());
1584                 }
1585         }
1586         
1587         return (0);
1588 }
1589
1590 int
1591 kibnal_register_all_memory(void)
1592 {
1593         /* CAVEAT EMPTOR: this assumes all physical memory is in 1 contiguous
1594          * chunk starting at 0 */
1595         struct sysinfo     si;
1596         __u64              total;
1597         __u64              total2;
1598         __u64              roundup = (128<<20);     /* round up in big chunks */
1599         IB_MR_PHYS_BUFFER  phys;
1600         IB_ACCESS_CONTROL  access;
1601         FSTATUS            frc;
1602
1603         memset(&access, 0, sizeof(access));
1604         access.s.MWBindable = 1;
1605         access.s.LocalWrite = 1;
1606         access.s.RdmaRead = 1;
1607         access.s.RdmaWrite = 1;
1608
1609         /* XXX we don't bother with first-gen cards */
1610         if (kibnal_data.kib_hca_attrs.VendorId == 0xd0b7 && 
1611             kibnal_data.kib_hca_attrs.DeviceId == 0x3101) {
1612                 CERROR("Can't register all memory on first generation HCAs\n");
1613                 return -EINVAL;
1614         }
1615
1616         si_meminfo(&si);
1617
1618         CDEBUG(D_NET, "si_meminfo: %lu/%u, num_physpages %lu/%lu\n",
1619                si.totalram, si.mem_unit, num_physpages, PAGE_SIZE);
1620
1621         total = ((__u64)si.totalram) * si.mem_unit;
1622         total2 = num_physpages * PAGE_SIZE;
1623         if (total < total2)
1624                 total = total2;
1625
1626         if (total == 0) {
1627                 CERROR("Can't determine memory size\n");
1628                 return -ENOMEM;
1629         }
1630                  
1631         roundup = (128<<20);
1632         total = (total + (roundup - 1)) & ~(roundup - 1);
1633
1634         phys.PhysAddr = 0;
1635         phys.Length = total;
1636
1637         frc = iba_register_contig_pmr(kibnal_data.kib_hca, 0, &phys, 1, 0,
1638                                       kibnal_data.kib_pd, access,
1639                                       &kibnal_data.kib_whole_mem.md_handle,
1640                                       &kibnal_data.kib_whole_mem.md_addr,
1641                                       &kibnal_data.kib_whole_mem.md_lkey,
1642                                       &kibnal_data.kib_whole_mem.md_rkey);
1643
1644         if (frc != FSUCCESS) {
1645                 CERROR("registering physical memory failed: %d\n", frc);
1646                 return -EIO;
1647         }
1648
1649         CDEBUG(D_WARNING, "registered phys mem from 0("LPX64") for "LPU64"("LPU64") -> "LPX64"\n",
1650                phys.PhysAddr, total, phys.Length, kibnal_data.kib_whole_mem.md_addr);
1651
1652         return 0;
1653 }
1654
1655 void
1656 kibnal_shutdown (lnet_ni_t *ni)
1657 {
1658         int   i;
1659         int   rc;
1660
1661         LASSERT (ni == kibnal_data.kib_ni);
1662         LASSERT (ni->ni_data == &kibnal_data);
1663        
1664         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
1665                atomic_read (&libcfs_kmemory));
1666
1667         switch (kibnal_data.kib_init) {
1668         default:
1669                 CERROR ("Unexpected state %d\n", kibnal_data.kib_init);
1670                 LBUG();
1671
1672         case IBNAL_INIT_ALL:
1673                 /* stop accepting connections, prevent new peers and start to
1674                  * tear down all existing ones... */
1675                 kibnal_stop_listener(1);
1676
1677                 /* Wait for all peer state to clean up */
1678                 i = 2;
1679                 while (atomic_read (&kibnal_data.kib_npeers) != 0) {
1680                         i++;
1681                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1682                                "waiting for %d peers to disconnect\n",
1683                                atomic_read (&kibnal_data.kib_npeers));
1684                         set_current_state (TASK_UNINTERRUPTIBLE);
1685                         schedule_timeout (HZ);
1686                 }
1687                 /* fall through */
1688
1689         case IBNAL_INIT_CQ:
1690                 rc = iba_destroy_cq(kibnal_data.kib_cq);
1691                 if (rc != 0)
1692                         CERROR ("Destroy CQ error: %d\n", rc);
1693                 /* fall through */
1694
1695         case IBNAL_INIT_TXD:
1696                 kibnal_free_pages (kibnal_data.kib_tx_pages);
1697                 /* fall through */
1698
1699         case IBNAL_INIT_MD:
1700                 rc = iba_deregister_mr(kibnal_data.kib_whole_mem.md_handle);
1701                 if (rc != FSUCCESS)
1702                         CERROR ("Deregister memory: %d\n", rc);
1703                 /* fall through */
1704
1705         case IBNAL_INIT_PD:
1706                 rc = iba_free_pd(kibnal_data.kib_pd);
1707                 if (rc != 0)
1708                         CERROR ("Destroy PD error: %d\n", rc);
1709                 /* fall through */
1710
1711         case IBNAL_INIT_SD:
1712                 rc = iba_sd_deregister(kibnal_data.kib_sd);
1713                 if (rc != 0)
1714                         CERROR ("Deregister SD error: %d\n", rc);
1715                 /* fall through */
1716
1717         case IBNAL_INIT_PORTATTRS:
1718                 LIBCFS_FREE(kibnal_data.kib_hca_attrs.PortAttributesList,
1719                             kibnal_data.kib_hca_attrs.PortAttributesListSize);
1720                 /* fall through */
1721
1722         case IBNAL_INIT_HCA:
1723                 rc = iba_close_ca(kibnal_data.kib_hca);
1724                 if (rc != 0)
1725                         CERROR ("Close HCA  error: %d\n", rc);
1726                 /* fall through */
1727
1728         case IBNAL_INIT_DATA:
1729                 LASSERT (atomic_read (&kibnal_data.kib_npeers) == 0);
1730                 LASSERT (kibnal_data.kib_peers != NULL);
1731                 for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
1732                         LASSERT (list_empty (&kibnal_data.kib_peers[i]));
1733                 }
1734                 LASSERT (atomic_read (&kibnal_data.kib_nconns) == 0);
1735                 LASSERT (list_empty (&kibnal_data.kib_connd_zombies));
1736                 LASSERT (list_empty (&kibnal_data.kib_connd_conns));
1737                 LASSERT (list_empty (&kibnal_data.kib_connd_peers));
1738
1739                 /* flag threads to terminate; wake and wait for them to die */
1740                 kibnal_data.kib_shutdown = 1;
1741                 wake_up_all (&kibnal_data.kib_sched_waitq);
1742                 wake_up_all (&kibnal_data.kib_connd_waitq);
1743
1744                 i = 2;
1745                 while (atomic_read (&kibnal_data.kib_nthreads) != 0) {
1746                         i++;
1747                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1748                                "Waiting for %d threads to terminate\n",
1749                                atomic_read (&kibnal_data.kib_nthreads));
1750                         set_current_state (TASK_INTERRUPTIBLE);
1751                         schedule_timeout (HZ);
1752                 }
1753                 /* fall through */
1754                 
1755         case IBNAL_INIT_NOTHING:
1756                 break;
1757         }
1758
1759         kibnal_free_tx_descs();
1760
1761         if (kibnal_data.kib_peers != NULL)
1762                 LIBCFS_FREE (kibnal_data.kib_peers,
1763                              sizeof (struct list_head) * 
1764                              kibnal_data.kib_peer_hash_size);
1765
1766         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
1767                atomic_read (&libcfs_kmemory));
1768
1769         kibnal_data.kib_init = IBNAL_INIT_NOTHING;
1770         PORTAL_MODULE_UNUSE;
1771 }
1772
1773 int 
1774 kibnal_get_ipif_name(char *ifname, int ifname_size, int idx)
1775 {
1776         char  *basename = *kibnal_tunables.kib_ipif_basename;
1777         int    n = strlen(basename);
1778         int    baseidx;
1779         int    m;
1780
1781         if (n == 0) {                           /* empty string */
1782                 CERROR("Empty IP interface basename specified\n");
1783                 return -EINVAL;
1784         }
1785
1786         for (m = n; m > 0; m--)                 /* find max numeric postfix */
1787                 if (sscanf(basename + m - 1, "%d", &baseidx) != 1)
1788                         break;
1789
1790         if (m == 0)                             /* just a number */
1791                 m = n;
1792
1793         if (m == n)                             /* no postfix */
1794                 baseidx = 1;                    /* default to 1 */
1795
1796         if (m >= ifname_size)
1797                 m = ifname_size - 1;
1798
1799         memcpy(ifname, basename, m);            /* copy prefix name */
1800         
1801         snprintf(ifname + m, ifname_size - m, "%d", baseidx + idx);
1802         
1803         if (strlen(ifname) == ifname_size - 1) {
1804                 CERROR("IP interface basename %s too long\n", basename);
1805                 return -EINVAL;
1806         }
1807         
1808         return 0;
1809 }
1810
1811 int
1812 kibnal_startup (lnet_ni_t *ni)
1813 {
1814         char                ipif_name[32];
1815         __u32               ip;
1816         __u32               netmask;
1817         int                 up;
1818         int                 nob;
1819         struct timeval      tv;
1820         IB_PORT_ATTRIBUTES *pattr;
1821         FSTATUS             frc;
1822         int                 rc;
1823         __u32               n;
1824         int                 i;
1825
1826         LASSERT (ni->ni_lnd == &the_kiblnd);
1827
1828         /* Only 1 instance supported */
1829         if (kibnal_data.kib_init != IBNAL_INIT_NOTHING) {
1830                 CERROR ("Only 1 instance supported\n");
1831                 return -EPERM;
1832         }
1833
1834         if (*kibnal_tunables.kib_credits > *kibnal_tunables.kib_ntx) {
1835                 CERROR ("Can't set credits(%d) > ntx(%d)\n",
1836                         *kibnal_tunables.kib_credits,
1837                         *kibnal_tunables.kib_ntx);
1838                 return -EINVAL;
1839         }
1840
1841         ni->ni_maxtxcredits = *kibnal_tunables.kib_credits;
1842         ni->ni_peertxcredits = *kibnal_tunables.kib_peercredits;
1843
1844         CLASSERT (LNET_MAX_INTERFACES > 1);
1845
1846         if (ni->ni_interfaces[0] == NULL) {
1847                 kibnal_data.kib_hca_idx = 0;
1848         } else {
1849                 /* Use the HCA specified in 'networks=' */
1850                 if (ni->ni_interfaces[1] != NULL) {
1851                         CERROR("Multiple interfaces not supported\n");
1852                         return -EPERM;
1853                 }
1854                 
1855                 /* Parse <number> into kib_hca_idx */
1856                 nob = strlen(ni->ni_interfaces[0]);
1857                 if (sscanf(ni->ni_interfaces[0], "%d%n", 
1858                            &kibnal_data.kib_hca_idx, &nob) < 1 ||
1859                     nob != strlen(ni->ni_interfaces[0])) {
1860                         CERROR("Can't parse interface '%s'\n",
1861                                ni->ni_interfaces[0]);
1862                         return -EINVAL;
1863                 }
1864         }
1865
1866         rc = kibnal_get_ipif_name(ipif_name, sizeof(ipif_name),
1867                                   kibnal_data.kib_hca_idx);
1868         if (rc != 0)
1869                 return rc;
1870         
1871         rc = libcfs_ipif_query(ipif_name, &up, &ip, &netmask);
1872         if (rc != 0) {
1873                 CERROR("Can't query IPoIB interface %s: %d\n", ipif_name, rc);
1874                 return -ENETDOWN;
1875         }
1876         
1877         if (!up) {
1878                 CERROR("Can't query IPoIB interface %s: it's down\n", ipif_name);
1879                 return -ENETDOWN;
1880         }
1881         
1882         ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ip);
1883
1884         ni->ni_data = &kibnal_data;
1885         kibnal_data.kib_ni = ni;
1886
1887         do_gettimeofday(&tv);
1888         kibnal_data.kib_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
1889
1890         PORTAL_MODULE_USE;
1891
1892         rwlock_init(&kibnal_data.kib_global_lock);
1893
1894         kibnal_data.kib_peer_hash_size = IBNAL_PEER_HASH_SIZE;
1895         LIBCFS_ALLOC (kibnal_data.kib_peers,
1896                       sizeof (struct list_head) * kibnal_data.kib_peer_hash_size);
1897         if (kibnal_data.kib_peers == NULL) {
1898                 goto failed;
1899         }
1900         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++)
1901                 INIT_LIST_HEAD(&kibnal_data.kib_peers[i]);
1902
1903         spin_lock_init (&kibnal_data.kib_connd_lock);
1904         INIT_LIST_HEAD (&kibnal_data.kib_connd_peers);
1905         INIT_LIST_HEAD (&kibnal_data.kib_connd_conns);
1906         INIT_LIST_HEAD (&kibnal_data.kib_connd_zombies);
1907         init_waitqueue_head (&kibnal_data.kib_connd_waitq);
1908
1909         spin_lock_init (&kibnal_data.kib_sched_lock);
1910         init_waitqueue_head (&kibnal_data.kib_sched_waitq);
1911
1912         spin_lock_init (&kibnal_data.kib_tx_lock);
1913         INIT_LIST_HEAD (&kibnal_data.kib_idle_txs);
1914
1915         rc = kibnal_alloc_tx_descs();
1916         if (rc != 0) {
1917                 CERROR("Can't allocate tx descs\n");
1918                 goto failed;
1919         }
1920
1921         /* lists/ptrs/locks initialised */
1922         kibnal_data.kib_init = IBNAL_INIT_DATA;
1923         /*****************************************************/
1924
1925         kibnal_data.kib_sdretry.RetryCount = *kibnal_tunables.kib_sd_retries;
1926         kibnal_data.kib_sdretry.Timeout = (*kibnal_tunables.kib_timeout * 1000)/
1927                                           *kibnal_tunables.kib_sd_retries;
1928
1929         for (i = 0; i < IBNAL_N_SCHED; i++) {
1930                 rc = kibnal_thread_start (kibnal_scheduler,
1931                                           (void *)(unsigned long)i);
1932                 if (rc != 0) {
1933                         CERROR("Can't spawn iib scheduler[%d]: %d\n",
1934                                i, rc);
1935                         goto failed;
1936                 }
1937         }
1938
1939         rc = kibnal_thread_start (kibnal_connd, NULL);
1940         if (rc != 0) {
1941                 CERROR ("Can't spawn iib connd: %d\n", rc);
1942                 goto failed;
1943         }
1944
1945         n = sizeof(kibnal_data.kib_hca_guids) /
1946             sizeof(kibnal_data.kib_hca_guids[0]);
1947         frc = iba_get_caguids(&n, kibnal_data.kib_hca_guids);
1948         if (frc != FSUCCESS) {
1949                 CERROR ("Can't get HCA guids: %d\n", frc);
1950                 goto failed;
1951         }
1952
1953         if (n == 0) {
1954                 CERROR ("No HCAs found\n");
1955                 goto failed;
1956         }
1957
1958         if (n <= kibnal_data.kib_hca_idx) {
1959                 CERROR("Invalid HCA %d requested: (must be 0 - %d inclusive)\n",
1960                        kibnal_data.kib_hca_idx, n - 1);
1961                 goto failed;
1962         }
1963         
1964         /* Infinicon has per-HCA notification callbacks */
1965         frc = iba_open_ca(kibnal_data.kib_hca_guids[kibnal_data.kib_hca_idx],
1966                             kibnal_hca_callback,
1967                             kibnal_hca_async_callback,
1968                             NULL,
1969                             &kibnal_data.kib_hca);
1970         if (frc != FSUCCESS) {
1971                 CERROR ("Can't open HCA[%d]: %d\n", 
1972                         kibnal_data.kib_hca_idx, frc);
1973                 goto failed;
1974         }
1975         
1976         /* Channel Adapter opened */
1977         kibnal_data.kib_init = IBNAL_INIT_HCA;
1978         /*****************************************************/
1979
1980         kibnal_data.kib_hca_attrs.PortAttributesList = NULL;
1981         kibnal_data.kib_hca_attrs.PortAttributesListSize = 0;
1982         frc = iba_query_ca(kibnal_data.kib_hca,
1983                            &kibnal_data.kib_hca_attrs, NULL);
1984         if (frc != FSUCCESS) {
1985                 CERROR ("Can't size port attrs: %d\n", frc);
1986                 goto failed;
1987         }
1988         
1989         LIBCFS_ALLOC(kibnal_data.kib_hca_attrs.PortAttributesList,
1990                      kibnal_data.kib_hca_attrs.PortAttributesListSize);
1991         if (kibnal_data.kib_hca_attrs.PortAttributesList == NULL)
1992                 goto failed;
1993
1994         /* Port attrs allocated */
1995         kibnal_data.kib_init = IBNAL_INIT_PORTATTRS;
1996         /*****************************************************/
1997         
1998         frc = iba_query_ca(kibnal_data.kib_hca, &kibnal_data.kib_hca_attrs,
1999                            NULL);
2000         if (frc != FSUCCESS) {
2001                 CERROR ("Can't get port attrs for HCA %d: %d\n",
2002                         kibnal_data.kib_hca_idx, frc);
2003                 goto failed;
2004         }
2005
2006         for (i = 0, pattr = kibnal_data.kib_hca_attrs.PortAttributesList;
2007              pattr != NULL;
2008              i++, pattr = pattr->Next) {
2009                 switch (pattr->PortState) {
2010                 default:
2011                         CERROR("Unexpected port[%d] state %d\n",
2012                                i, pattr->PortState);
2013                         continue;
2014                 case PortStateDown:
2015                         CDEBUG(D_NET, "port[%d] Down\n", i);
2016                         continue;
2017                 case PortStateInit:
2018                         CDEBUG(D_NET, "port[%d] Init\n", i);
2019                         continue;
2020                 case PortStateArmed:
2021                         CDEBUG(D_NET, "port[%d] Armed\n", i);
2022                         continue;
2023                         
2024                 case PortStateActive:
2025                         CDEBUG(D_NET, "port[%d] Active\n", i);
2026                         kibnal_data.kib_port = i;
2027                         kibnal_data.kib_port_guid = pattr->GUID;
2028                         kibnal_data.kib_port_pkey = pattr->PkeyTable[0];
2029                         break;
2030                 }
2031                 break;
2032         }
2033
2034         if (pattr == NULL) {
2035                 CERROR ("Can't find an active port\n");
2036                 goto failed;
2037         }
2038
2039         CDEBUG(D_NET, "got guid "LPX64"\n", kibnal_data.kib_port_guid);
2040         
2041         frc = iba_sd_register(&kibnal_data.kib_sd, NULL);
2042         if (frc != FSUCCESS) {
2043                 CERROR ("Can't register with SD: %d\n", frc);
2044                 goto failed;
2045         }
2046         
2047         /* Registered with SD OK */
2048         kibnal_data.kib_init = IBNAL_INIT_SD;
2049         /*****************************************************/
2050
2051         frc = iba_alloc_pd(kibnal_data.kib_hca, 0, &kibnal_data.kib_pd);
2052         if (frc != FSUCCESS) {
2053                 CERROR ("Can't create PD: %d\n", rc);
2054                 goto failed;
2055         }
2056         
2057         /* flag PD initialised */
2058         kibnal_data.kib_init = IBNAL_INIT_PD;
2059         /*****************************************************/
2060
2061         rc = kibnal_register_all_memory();
2062         if (rc != 0) {
2063                 CERROR ("Can't register all memory\n");
2064                 goto failed;
2065         }
2066         
2067         /* flag whole memory MD initialised */
2068         kibnal_data.kib_init = IBNAL_INIT_MD;
2069         /*****************************************************/
2070
2071         rc = kibnal_setup_tx_descs();
2072         if (rc != 0) {
2073                 CERROR ("Can't register tx descs: %d\n", rc);
2074                 goto failed;
2075         }
2076         
2077         /* flag TX descs initialised */
2078         kibnal_data.kib_init = IBNAL_INIT_TXD;
2079         /*****************************************************/
2080         
2081         frc = iba_create_cq(kibnal_data.kib_hca, IBNAL_CQ_ENTRIES(),
2082                             &kibnal_data.kib_cq, &kibnal_data.kib_cq,
2083                             &n);
2084         if (frc != FSUCCESS) {
2085                 CERROR ("Can't create RX CQ: %d\n", frc);
2086                 goto failed;
2087         }
2088
2089         /* flag CQ initialised */
2090         kibnal_data.kib_init = IBNAL_INIT_CQ;
2091         /*****************************************************/
2092         
2093         if (n < IBNAL_CQ_ENTRIES()) {
2094                 CERROR ("CQ only has %d entries: %d needed\n", 
2095                         n, IBNAL_CQ_ENTRIES());
2096                 goto failed;
2097         }
2098
2099         rc = iba_rearm_cq(kibnal_data.kib_cq, CQEventSelNextWC);
2100         if (rc != 0) {
2101                 CERROR ("Failed to re-arm completion queue: %d\n", rc);
2102                 goto failed;
2103         }
2104         
2105         rc = kibnal_start_listener();
2106         if (rc != 0) {
2107                 CERROR("Can't start listener: %d\n", rc);
2108                 goto failed;
2109         }
2110
2111         /* flag everything initialised */
2112         kibnal_data.kib_init = IBNAL_INIT_ALL;
2113         /*****************************************************/
2114
2115         return (0);
2116
2117  failed:
2118         kibnal_shutdown (ni);    
2119         return (-ENETDOWN);
2120 }
2121
2122 void __exit
2123 kibnal_module_fini (void)
2124 {
2125         lnet_unregister_lnd(&the_kiblnd);
2126         kibnal_tunables_fini();
2127 }
2128
2129 int __init
2130 kibnal_module_init (void)
2131 {
2132         int    rc;
2133
2134         if (the_lnet.ln_ptlcompat != 0) {
2135                 LCONSOLE_ERROR_MSG(0x12c, "IIB does not support portals "
2136                                    "compatibility mode\n");
2137                 return -ENODEV;
2138         }
2139         
2140         rc = kibnal_tunables_init();
2141         if (rc != 0)
2142                 return rc;
2143
2144         lnet_register_lnd(&the_kiblnd);
2145
2146         return 0;
2147 }
2148
2149 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2150 MODULE_DESCRIPTION("Kernel Infinicon IB LND v1.00");
2151 MODULE_LICENSE("GPL");
2152
2153 module_init(kibnal_module_init);
2154 module_exit(kibnal_module_fini);
2155