Whamcloud - gitweb
* landing portals b1_4_sfw on HEAD
[fs/lustre-release.git] / lnet / klnds / viblnd / viblnd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2004 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  *   Author: Frank Zago <fzago@systemfabricworks.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  */
24
25 #include "vibnal.h"
26
/* NAL interface object and network-interface handle registered with
 * the portals library */
nal_t                   kibnal_api;
ptl_handle_ni_t         kibnal_ni;
kib_tunables_t          kibnal_tunables;

/* Global driver state; the IB service id is fixed at compile time and
 * used both for advertising with the SA and for cm_listen() */
kib_data_t              kibnal_data = {
        .kib_service_id = IBNAL_SERVICE_NUMBER,
};
34
#ifdef CONFIG_SYSCTL
/* sysctl binary ids: 202 is the root of the "vibnal" tree */
#define IBNAL_SYSCTL             202

#define IBNAL_SYSCTL_TIMEOUT     1

/* Leaf table: exposes the I/O timeout tunable as a read-write int
 * (presumably seconds -- TODO confirm against kib_io_timeout users) */
static ctl_table kibnal_ctl_table[] = {
        {IBNAL_SYSCTL_TIMEOUT, "timeout", 
         &kibnal_tunables.kib_io_timeout, sizeof (int),
         0644, NULL, &proc_dointvec},
        { 0 }
};

/* Top-level table: mounts the leaf table under "vibnal" */
static ctl_table kibnal_top_ctl_table[] = {
        {IBNAL_SYSCTL, "vibnal", NULL, 0, 0555, kibnal_ctl_table},
        { 0 }
};
#endif
52
53 #ifdef unused
void
print_service(IB_SERVICE_RECORD *service, char *tag, int rc)
{
        /* Debug helper: dump an IB service record (id, name, NID) tagged
         * with 'tag' and status 'rc'; tolerates service == NULL. */
        char name[32];

        if (service == NULL) 
        {
                CWARN("tag       : %s\n"
                      "status    : %d (NULL)\n", tag, rc);
                return;
        }
        /* take a local copy of the service name and force NUL-termination
         * (the record's name field need not be terminated) */
        strncpy (name, service->ServiceName, sizeof(name)-1);
        name[sizeof(name)-1] = 0;
        
        CWARN("tag       : %s\n"
              "status    : %d\n"
              "service id: "LPX64"\n"
              "name      : %s\n"
              "NID       : "LPX64"\n", tag, rc,
              service->RID.ServiceID, name,
              *kibnal_service_nid_field(service));
}
76 #endif
77
78 /* 
79  * method is SUBN_ADM_SET, SUBN_ADM_GET, SUBN_ADM_DELETE. Tables not supported.
80  * nid is the nid to advertize/query/unadvertize
81  */
static void fill_sa_request(struct sa_request *request, int method, ptl_nid_t nid)
{
        /* Build a Subnet Administration MAD inside request->dtgrm_req for
         * advertising (SET), querying (GET) or removing (DELETE) the
         * service record that maps our service id/name to 'nid'. */
        gsi_dtgrm_t *dtgrm = request->dtgrm_req;
        sa_mad_v2_t *mad = (sa_mad_v2_t *) dtgrm->mad;
        ib_service_record_v2_t *sr = (ib_service_record_v2_t *) mad->payload;
        
        memset(mad, 0, MAD_BLOCK_SIZE);

        request->mad = mad;

        /* address the datagram to the subnet manager */
        dtgrm->rlid = kibnal_data.kib_port_attr.port_sma_address_info.sm_lid;
        dtgrm->sl = kibnal_data.kib_port_attr.port_sma_address_info.service_level;

        mad->hdr.base_ver = MAD_IB_BASE_VERSION;
        mad->hdr.class = MAD_CLASS_SUBN_ADM;
        mad->hdr.class_ver = 2;
        mad->hdr.m.ms.method = method;
        mad->hdr.attrib_id = SA_SERVICE_RECORD; /* something(?) will swap that field */

                /* Note: the transaction ID is set by the Voltaire stack if it is 0. */

        /* TODO: change the 40 to sizeof(something) */
        mad->payload_len = cpu_to_be32(0x40 /*header size */  +
                                       sizeof (ib_service_record_v2_t));


        /* Component mask selects which service-record fields the SA
         * should match/store: id, pkey, name and data8[0..7]. */
        mad->component_mask = cpu_to_be64(
                                          (1ull << 0)  |        /* service_id       */
                                          (1ull << 2)  |        /* service_pkey     */
                                          (1ull << 6)  |        /* service_name     */
                                          (1ull << 7)  |        /* service_data8[0] */
                                          (1ull << 8)  |        /* service_data8[1] */
                                          (1ull << 9)  |        /* service_data8[2] */
                                          (1ull << 10) |        /* service_data8[3] */
                                          (1ull << 11) |        /* service_data8[4] */
                                          (1ull << 12) |        /* service_data8[5] */
                                          (1ull << 13) |        /* service_data8[6] */
                                          (1ull << 14)      /* service_data8[7] */
                                          );

        sr->service_id = cpu_to_be64(kibnal_data.kib_service_id);
        sr->service_pkey = cpu_to_be16(kibnal_data.kib_port_pkey);

        /* Set the service name and the data (bytes 0 to 7) in data8 */
        kibnal_set_service_keys(sr, nid);

        if (method == SUBN_ADM_SET) {
                /* advertising: also supply our GID and an infinite lease */
                mad->component_mask |= cpu_to_be64(
                                                   (1ull << 1) |        /* service_gid       */
                                                   (1ull << 4)          /* service_lease     */
                                                   );

                sr->service_gid = kibnal_data.kib_port_gid;
                gid_swap(&sr->service_gid);
                sr->service_lease = cpu_to_be32(0xffffffff);
        }

        CDEBUG(D_NET, "SA request %02x for service id "LPX64" %s:"LPX64"\n",
               mad->hdr.m.ms.method,
               sr->service_id, 
               sr->service_name,
               *kibnal_service_nid_field(sr));
}
145
146 /* Do an advertizement operation: 
147  *   SUBN_ADM_GET = 0x01 (i.e. query),
148  *   SUBN_ADM_SET = 0x02 (i.e. advertize),
149  *   SUBN_ADM_DELETE = 0x15 (i.e. un-advertize).
150  * If callback is NULL, the function is synchronous (and context is ignored).
151  */
152 int kibnal_advertize_op(ptl_nid_t nid, int op, sa_request_cb_t callback, void *context)
153 {
154         struct sa_request *request;
155         int ret;
156
157         LASSERT (kibnal_data.kib_nid != PTL_NID_ANY);
158
159         CDEBUG(D_NET, "kibnal_advertize_op: nid="LPX64", op=%d\n", nid, op);
160
161         request = alloc_sa_request();
162         if (request == NULL) {
163                 CERROR("Cannot allocate a SA request");
164                 return -ENOMEM;
165         }
166                 
167         fill_sa_request(request, op, nid);
168
169         if (callback) {
170                 request->callback = callback;
171                 request->context = context;
172         } else {
173                 init_completion(&request->signal);
174         }
175
176         ret = vibnal_start_sa_request(request);
177         if (ret) {
178                 CERROR("vibnal_send_sa failed: %d\n", ret);
179                 free_sa_request(request);
180         } else {
181                 if (callback) {
182                         /* Return. The callback will have to free the SA request. */
183                         ret = 0;
184                 } else {
185                         wait_for_completion(&request->signal);
186
187                         ret = request->status;
188
189                         if (ret != 0) {
190                                 CERROR ("Error %d in advertising operation %d for NID "LPX64"\n",
191                                         ret, op, kibnal_data.kib_nid);
192                         }
193                         
194                         free_sa_request(request);
195                 }
196         }
197
198         return ret;
199 }
200
static int
kibnal_set_mynid(ptl_nid_t nid)
{
        /* Install a new NID under kib_nid_mutex: unadvertise the old NID,
         * bump the incarnation, destroy the listening CEP and all peers,
         * then (unless shutting down) create a new listener and advertise
         * the new NID with the SA, rolling the listener back on failure.
         * Returns 0 on success or a negative errno. */
        struct timeval tv;
        lib_ni_t      *ni = &kibnal_lib.libnal_ni;
        int            rc;
        vv_return_t    retval;

        CDEBUG(D_IOCTL, "setting mynid to "LPX64" (old nid="LPX64")\n",
               nid, ni->ni_pid.nid);

        do_gettimeofday(&tv);

        down (&kibnal_data.kib_nid_mutex);

        if (nid == kibnal_data.kib_nid) {
                /* no change of NID */
                up (&kibnal_data.kib_nid_mutex);
                return (0);
        }

        CDEBUG(D_NET, "NID "LPX64"("LPX64")\n",
               kibnal_data.kib_nid, nid);

        /* Unsubscribes the current NID */
        if (kibnal_data.kib_nid != PTL_NID_ANY) {

                rc = kibnal_advertize_op(kibnal_data.kib_nid, SUBN_ADM_DELETE, NULL, NULL);

                if (rc) {
                        CERROR("Error %d unadvertising NID "LPX64"\n",
                               rc, kibnal_data.kib_nid);
                }
        }
        
        /* microsecond timestamp == new incarnation, so peers can detect
         * that any old connection state is stale */
        kibnal_data.kib_nid = ni->ni_pid.nid = nid;
        kibnal_data.kib_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;

        /* Destroys the current endpoint, if any. */
        if (kibnal_data.kib_cep) {
                retval = cm_cancel(kibnal_data.kib_cep);
                if (retval)
                        CERROR ("Error %d stopping listener\n", retval);
        
                retval = cm_destroy_cep(kibnal_data.kib_cep);
                if (retval)
                        CERROR ("Error %d destroying CEP\n", retval);
        
                kibnal_data.kib_cep = NULL;
        }
        
        /* Delete all existing peers and their connections after new
         * NID/incarnation set to ensure no old connections in our brave
         * new world. */
        kibnal_del_peer (PTL_NID_ANY, 0);

        if (kibnal_data.kib_nid == PTL_NID_ANY) {
                /* No new NID to install. The driver is shuting down. */
                up (&kibnal_data.kib_nid_mutex);
                return (0);
        }

        /* remove any previous advert (crashed node etc) */
        kibnal_advertize_op(kibnal_data.kib_nid, SUBN_ADM_DELETE, NULL, NULL);

        kibnal_data.kib_cep = cm_create_cep(cm_cep_transp_rc);
        if (kibnal_data.kib_cep == NULL) {
                CERROR ("Can't create CEP\n");
                rc = -ENOMEM;
        } else {
                cm_return_t cmret;
                cm_listen_data_t info;

                CDEBUG(D_NET, "Created CEP %p for listening\n", kibnal_data.kib_cep);

                memset(&info, 0, sizeof(info));
                info.listen_addr.end_pt.sid = kibnal_data.kib_service_id;

                cmret = cm_listen(kibnal_data.kib_cep, &info,
                                  kibnal_listen_callback, NULL);
                if (cmret) {
                        CERROR ("cm_listen error: %d\n", cmret);
                        rc = -EINVAL;
                } else {
                        rc = 0;
                }
        }
        
        if (rc == 0) {
                rc = kibnal_advertize_op(kibnal_data.kib_nid, SUBN_ADM_SET, NULL, NULL);
                if (rc == 0) {
#ifdef IBNAL_CHECK_ADVERT
                        /* read the advert back to sanity-check it */
                        kibnal_advertize_op(kibnal_data.kib_nid, SUBN_ADM_GET, NULL, NULL);
#endif
                        up (&kibnal_data.kib_nid_mutex);
                        return (0);
                }
                
                /* advertising failed: tear down the listener created above */
                retval = cm_cancel (kibnal_data.kib_cep);
                if (retval)
                        CERROR("cm_cancel failed: %d\n", retval);

                retval = cm_destroy_cep (kibnal_data.kib_cep);
                if (retval)
                        CERROR("cm_destroy_cep failed: %d\n", retval);

                /* remove any peers that sprung up while I failed to
                 * advertise myself */
                kibnal_del_peer (PTL_NID_ANY, 0);
        }

        /* failure: leave no NID installed */
        kibnal_data.kib_nid = PTL_NID_ANY;
        up (&kibnal_data.kib_nid_mutex);
        return (rc);
}
316
317 kib_peer_t *
318 kibnal_create_peer (ptl_nid_t nid)
319 {
320         kib_peer_t *peer;
321
322         LASSERT (nid != PTL_NID_ANY);
323
324         PORTAL_ALLOC(peer, sizeof (*peer));
325         if (peer == NULL) {
326                 CERROR("Canot allocate perr\n");
327                 return (NULL);
328         }
329
330         memset(peer, 0, sizeof(*peer));         /* zero flags etc */
331
332         peer->ibp_nid = nid;
333         atomic_set (&peer->ibp_refcount, 1);    /* 1 ref for caller */
334
335         INIT_LIST_HEAD (&peer->ibp_list);       /* not in the peer table yet */
336         INIT_LIST_HEAD (&peer->ibp_conns);
337         INIT_LIST_HEAD (&peer->ibp_tx_queue);
338
339         peer->ibp_reconnect_time = jiffies;
340         peer->ibp_reconnect_interval = IBNAL_MIN_RECONNECT_INTERVAL;
341
342         atomic_inc (&kibnal_data.kib_npeers);
343         return (peer);
344 }
345
void
kibnal_destroy_peer (kib_peer_t *peer)
{
        /* Final teardown: the peer's refcount has reached zero, so it
         * must already be unlinked, non-persistent, not connecting, and
         * have no connections or queued transmits. */
        LASSERT (atomic_read (&peer->ibp_refcount) == 0);
        LASSERT (peer->ibp_persistence == 0);
        LASSERT (!kibnal_peer_active(peer));
        LASSERT (peer->ibp_connecting == 0);
        LASSERT (list_empty (&peer->ibp_conns));
        LASSERT (list_empty (&peer->ibp_tx_queue));
        
        PORTAL_FREE (peer, sizeof (*peer));

        /* NB a peer's connections keep a reference on their peer until
         * they are destroyed, so we can be assured that _all_ state to do
         * with this peer has been cleaned up when its refcount drops to
         * zero. */
        atomic_dec (&kibnal_data.kib_npeers);
}
365
366 /* the caller is responsible for accounting for the additional reference
367  * that this creates */
368 kib_peer_t *
369 kibnal_find_peer_locked (ptl_nid_t nid)
370 {
371         struct list_head *peer_list = kibnal_nid2peerlist (nid);
372         struct list_head *tmp;
373         kib_peer_t       *peer;
374
375         list_for_each (tmp, peer_list) {
376
377                 peer = list_entry (tmp, kib_peer_t, ibp_list);
378
379                 LASSERT (peer->ibp_persistence != 0 || /* persistent peer */
380                          peer->ibp_connecting != 0 || /* creating conns */
381                          !list_empty (&peer->ibp_conns));  /* active conn */
382
383                 if (peer->ibp_nid != nid)
384                         continue;
385
386                 CDEBUG(D_NET, "got peer [%p] -> "LPX64" (%d)\n",
387                        peer, nid, atomic_read (&peer->ibp_refcount));
388                 return (peer);
389         }
390         return (NULL);
391 }
392
393 kib_peer_t *
394 kibnal_get_peer (ptl_nid_t nid)
395 {
396         kib_peer_t     *peer;
397
398         read_lock (&kibnal_data.kib_global_lock);
399         peer = kibnal_find_peer_locked (nid);
400         if (peer != NULL)                       /* +1 ref for caller? */
401                 kib_peer_addref(peer);
402         read_unlock (&kibnal_data.kib_global_lock);
403
404         return (peer);
405 }
406
407 void
408 kibnal_unlink_peer_locked (kib_peer_t *peer)
409 {
410         LASSERT (peer->ibp_persistence == 0);
411         LASSERT (list_empty(&peer->ibp_conns));
412
413         LASSERT (kibnal_peer_active(peer));
414         list_del_init (&peer->ibp_list);
415         /* lose peerlist's ref */
416         kib_peer_decref(peer);
417 }
418
419 static int
420 kibnal_get_peer_info (int index, ptl_nid_t *nidp, int *persistencep)
421 {
422         kib_peer_t        *peer;
423         struct list_head  *ptmp;
424         int                i;
425
426         read_lock (&kibnal_data.kib_global_lock);
427
428         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
429
430                 list_for_each (ptmp, &kibnal_data.kib_peers[i]) {
431
432                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
433                         LASSERT (peer->ibp_persistence != 0 ||
434                                  peer->ibp_connecting != 0 ||
435                                  !list_empty (&peer->ibp_conns));
436
437                         if (index-- > 0)
438                                 continue;
439
440                         *nidp = peer->ibp_nid;
441                         *persistencep = peer->ibp_persistence;
442
443                         read_unlock (&kibnal_data.kib_global_lock);
444                         return (0);
445                 }
446         }
447
448         read_unlock (&kibnal_data.kib_global_lock);
449         return (-ENOENT);
450 }
451
452 static int
453 kibnal_add_persistent_peer (ptl_nid_t nid)
454 {
455         unsigned long      flags;
456         kib_peer_t        *peer;
457         kib_peer_t        *peer2;
458         
459         if (nid == PTL_NID_ANY)
460                 return (-EINVAL);
461
462         peer = kibnal_create_peer (nid);
463         if (peer == NULL)
464                 return (-ENOMEM);
465
466         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
467
468         peer2 = kibnal_find_peer_locked (nid);
469         if (peer2 != NULL) {
470                 kib_peer_decref (peer);
471                 peer = peer2;
472         } else {
473                 /* peer table takes existing ref on peer */
474                 list_add_tail (&peer->ibp_list,
475                                kibnal_nid2peerlist (nid));
476         }
477
478         peer->ibp_persistence++;
479         
480         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
481         return (0);
482 }
483
484 static void
485 kibnal_del_peer_locked (kib_peer_t *peer, int single_share)
486 {
487         struct list_head *ctmp;
488         struct list_head *cnxt;
489         kib_conn_t       *conn;
490
491         if (!single_share)
492                 peer->ibp_persistence = 0;
493         else if (peer->ibp_persistence > 0)
494                 peer->ibp_persistence--;
495
496         if (peer->ibp_persistence != 0)
497                 return;
498
499         list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
500                 conn = list_entry(ctmp, kib_conn_t, ibc_list);
501
502                 kibnal_close_conn_locked (conn, 0);
503         }
504
505         /* NB peer unlinks itself when last conn is closed */
506 }
507
508 int
509 kibnal_del_peer (ptl_nid_t nid, int single_share)
510 {
511         unsigned long      flags;
512         struct list_head  *ptmp;
513         struct list_head  *pnxt;
514         kib_peer_t        *peer;
515         int                lo;
516         int                hi;
517         int                i;
518         int                rc = -ENOENT;
519
520         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
521
522         if (nid != PTL_NID_ANY)
523                 lo = hi = kibnal_nid2peerlist(nid) - kibnal_data.kib_peers;
524         else {
525                 lo = 0;
526                 hi = kibnal_data.kib_peer_hash_size - 1;
527         }
528
529         for (i = lo; i <= hi; i++) {
530                 list_for_each_safe (ptmp, pnxt, &kibnal_data.kib_peers[i]) {
531                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
532                         LASSERT (peer->ibp_persistence != 0 ||
533                                  peer->ibp_connecting != 0 ||
534                                  !list_empty (&peer->ibp_conns));
535
536                         if (!(nid == PTL_NID_ANY || peer->ibp_nid == nid))
537                                 continue;
538
539                         kibnal_del_peer_locked (peer, single_share);
540                         rc = 0;         /* matched something */
541
542                         if (single_share)
543                                 goto out;
544                 }
545         }
546  out:
547         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
548
549         return (rc);
550 }
551
static kib_conn_t *
kibnal_get_conn_by_idx (int index)
{
        /* Return the index'th connection in the table (iteration order)
         * with a reference held for the caller, or NULL if index is out
         * of range.  The ref is taken under the lock so the conn can't
         * vanish before the caller sees it. */
        kib_peer_t        *peer;
        struct list_head  *ptmp;
        kib_conn_t        *conn;
        struct list_head  *ctmp;
        int                i;

        read_lock (&kibnal_data.kib_global_lock);

        for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
                list_for_each (ptmp, &kibnal_data.kib_peers[i]) {

                        peer = list_entry (ptmp, kib_peer_t, ibp_list);
                        LASSERT (peer->ibp_persistence > 0 ||
                                 peer->ibp_connecting != 0 ||
                                 !list_empty (&peer->ibp_conns));

                        list_for_each (ctmp, &peer->ibp_conns) {
                                if (index-- > 0)
                                        continue;

                                conn = list_entry (ctmp, kib_conn_t, ibc_list);
                                CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
                                       conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
                                       atomic_read (&conn->ibc_refcount));
                                /* +1 ref for the caller, before dropping the lock */
                                atomic_inc (&conn->ibc_refcount);
                                read_unlock (&kibnal_data.kib_global_lock);
                                return (conn);
                        }
                }
        }

        read_unlock (&kibnal_data.kib_global_lock);
        return (NULL);
}
589
kib_conn_t *
kibnal_create_conn (void)
{
        /* Allocate a connection: RX message buffers (optionally pre-
         * registered via the whole-memory API), a queue pair, and move
         * the QP to the INIT state.  Returns the conn with one reference
         * for the caller, or NULL on any failure (partially-built state
         * is torn down via kibnal_destroy_conn). */
        kib_conn_t  *conn;
        int          i;
        __u64        vaddr = 0;
        __u64        vaddr_base;
        int          page_offset;
        int          ipage;
        vv_qp_attr_t qp_attr;
        vv_return_t  retval;
        int          rc;
        void        *qp_context;
        
        PORTAL_ALLOC(conn, sizeof (*conn));
        if (conn == NULL) {
                CERROR ("Can't allocate connection\n");
                return (NULL);
        }

        /* zero flags, NULL pointers etc... */
        memset (conn, 0, sizeof (*conn));

        INIT_LIST_HEAD (&conn->ibc_tx_queue);
        INIT_LIST_HEAD (&conn->ibc_active_txs);
        spin_lock_init (&conn->ibc_lock);
        
        atomic_inc (&kibnal_data.kib_nconns);
        /* well not really, but I call destroy() on failure, which decrements */

        PORTAL_ALLOC(conn->ibc_rxs, IBNAL_RX_MSGS * sizeof (kib_rx_t));
        if (conn->ibc_rxs == NULL) {
                CERROR("Cannot allocate RX buffers\n");
                goto failed;
        }
        memset (conn->ibc_rxs, 0, IBNAL_RX_MSGS * sizeof(kib_rx_t));

        rc = kibnal_alloc_pages(&conn->ibc_rx_pages, IBNAL_RX_MSG_PAGES, 1);
        if (rc != 0)
                goto failed;

        vaddr_base = vaddr = conn->ibc_rx_pages->ibp_vaddr;

        /* Carve the RX pages into IBNAL_MSG_SIZE message slots, tracking
         * both the page/offset (for the kernel address) and the vaddr */
        for (i = ipage = page_offset = 0; i < IBNAL_RX_MSGS; i++) {
                struct page *page = conn->ibc_rx_pages->ibp_pages[ipage];
                kib_rx_t   *rx = &conn->ibc_rxs[i];

                rx->rx_conn = conn;
                rx->rx_msg = (kib_msg_t *)(((char *)page_address(page)) + 
                             page_offset);

                if (kibnal_whole_mem()) {
                        void *newaddr;
                        vv_mem_reg_h_t mem_h;
                        vv_r_key_t r_key;

                        /* Voltaire stack already registers the whole
                         * memory, so use that API. */
                        retval = vv_get_gen_mr_attrib(kibnal_data.kib_hca,
                                                      rx->rx_msg,
                                                      IBNAL_MSG_SIZE,
                                                      &mem_h,
                                                      &rx->l_key,
                                                      &r_key);
                        if (retval) {
                                CERROR("vv_get_gen_mr_attrib failed: %d", retval);
                                /* TODO: free pages? */
                                goto failed;
                        }
                }
                
                vaddr += IBNAL_MSG_SIZE;
                LASSERT (vaddr <= vaddr_base + IBNAL_RX_MSG_BYTES);
                
                page_offset += IBNAL_MSG_SIZE;
                LASSERT (page_offset <= PAGE_SIZE);

                if (page_offset == PAGE_SIZE) {
                        page_offset = 0;
                        ipage++;
                        LASSERT (ipage <= IBNAL_RX_MSG_PAGES);
                }
        }

        /* Create a reliable-connected QP sharing the single completion
         * queue for both send and receive work */
        qp_attr = (vv_qp_attr_t) {
                .create.qp_type          = vv_qp_type_r_conn,
                .create.cq_send_h        = kibnal_data.kib_cq,
                .create.cq_receive_h     = kibnal_data.kib_cq,
                .create.send_max_outstand_wr = IBNAL_TX_MAX_SG * 
                                           IBNAL_MSG_QUEUE_SIZE,
                .create.receive_max_outstand_wr = IBNAL_MSG_QUEUE_SIZE,
                .create.max_scatgat_per_send_wr = 1,
                .create.max_scatgat_per_receive_wr = 1,
                .create.signaling_type   = vv_selectable_signaling, /* TODO: correct? */
                .create.pd_h             = kibnal_data.kib_pd,
                .create.recv_solicited_events = vv_signal_all,
        };
        retval = vv_qp_create(kibnal_data.kib_hca, &qp_attr, NULL,
                              &conn->ibc_qp, &conn->ibc_qp_attrs);
        if (retval != 0) {
                CERROR ("Failed to create queue pair: %d\n", retval);
                goto failed;
        }

        /* Mark QP created */
        conn->ibc_state = IBNAL_CONN_INIT_QP;

        /* Transition the QP RESET -> INIT with RDMA read/write enabled */
        qp_attr = (vv_qp_attr_t) {
                .modify.qp_modify_into_state = vv_qp_state_init,
                .modify.vv_qp_attr_mask      = VV_QP_AT_STATE | VV_QP_AT_PHY_PORT_NUM | VV_QP_AT_P_KEY_IX | VV_QP_AT_ACCESS_CON_F,
                .modify.qp_type              = vv_qp_type_r_conn,

                .modify.params.init.p_key_indx      = 0,
                .modify.params.init.phy_port_num    = kibnal_data.kib_port,
                .modify.params.init.access_control  = vv_acc_r_mem_write | vv_acc_r_mem_read,
        };
        retval = vv_qp_modify(kibnal_data.kib_hca, conn->ibc_qp, &qp_attr, &conn->ibc_qp_attrs);
        if (retval != 0) {
                CERROR ("Failed to modify queue pair: %d\n", retval);
                goto failed;
        }

        /* refresh the cached QP attributes */
        retval = vv_qp_query(kibnal_data.kib_hca, conn->ibc_qp, &qp_context, &conn->ibc_qp_attrs);
        if (retval) {
                CERROR ("Failed to query queue pair: %d\n", retval);
                goto failed;
        }

        /* 1 ref for caller */
        atomic_set (&conn->ibc_refcount, 1);
        return (conn);
        
 failed:
        kibnal_destroy_conn (conn);
        return (NULL);
}
726
void
kibnal_destroy_conn (kib_conn_t *conn)
{
        /* Free a connection whose refcount is zero, tearing down only
         * what its ibc_state says was built (the switch falls through
         * from most-built to least-built). */
        vv_return_t retval;
        
        CDEBUG (D_NET, "connection %p\n", conn);

        LASSERT (atomic_read (&conn->ibc_refcount) == 0);
        LASSERT (list_empty(&conn->ibc_tx_queue));
        LASSERT (list_empty(&conn->ibc_active_txs));
        LASSERT (conn->ibc_nsends_posted == 0);
        LASSERT (conn->ibc_connreq == NULL);

        switch (conn->ibc_state) {
        case IBNAL_CONN_DISCONNECTED:
                /* called after connection sequence initiated */
                /* fall through */

        case IBNAL_CONN_INIT_QP:
                /* _destroy includes an implicit Reset of the QP which 
                 * discards posted work */
                retval = vv_qp_destroy(kibnal_data.kib_hca, conn->ibc_qp);
                if (retval)
                        CERROR("Can't destroy QP: %d\n", retval);
                /* fall through */
                
        case IBNAL_CONN_INIT_NOTHING:
                break;

        default:
                /* any other state is a teardown-order bug */
                LASSERT (0);
        }

        if (conn->ibc_cep != NULL) {
                retval = cm_destroy_cep(conn->ibc_cep);
                if (retval)
                        CERROR("Can't destroy CEP %p: %d\n", conn->ibc_cep, 
                               retval);
        }

        if (conn->ibc_rx_pages != NULL) 
                kibnal_free_pages(conn->ibc_rx_pages);
        
        if (conn->ibc_rxs != NULL)
                PORTAL_FREE(conn->ibc_rxs, 
                            IBNAL_RX_MSGS * sizeof(kib_rx_t));

        /* drop the conn's reference on its peer, if it ever got one */
        if (conn->ibc_peer != NULL)
                kib_peer_decref(conn->ibc_peer);

        PORTAL_FREE(conn, sizeof (*conn));

        atomic_dec(&kibnal_data.kib_nconns);
        
        if (atomic_read (&kibnal_data.kib_nconns) == 0 &&
            kibnal_data.kib_shutdown) {
                /* I just nuked the last connection on shutdown; wake up
                 * everyone so they can exit. */
                wake_up_all(&kibnal_data.kib_sched_waitq);
                wake_up_all(&kibnal_data.kib_connd_waitq);
        }
}
789
void
kibnal_put_conn (kib_conn_t *conn)
{
        /* Drop one reference on 'conn'.  Whoever drops the last ref does
         * not free it directly: the conn is handed to the connd thread
         * for final destruction. */
        unsigned long flags;

        CDEBUG (D_NET, "putting conn[%p] state %d -> "LPX64" (%d)\n",
                conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
                atomic_read (&conn->ibc_refcount));

        LASSERT (atomic_read (&conn->ibc_refcount) > 0);
        if (!atomic_dec_and_test (&conn->ibc_refcount))
                return;

        /* must disconnect before dropping the final ref */
        LASSERT (conn->ibc_state == IBNAL_CONN_DISCONNECTED);

        spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);

        /* queue for the connd thread and kick it awake */
        list_add (&conn->ibc_list, &kibnal_data.kib_connd_conns);
        wake_up (&kibnal_data.kib_connd_waitq);

        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
}
813
814 static int
815 kibnal_close_peer_conns_locked (kib_peer_t *peer, int why)
816 {
817         kib_conn_t         *conn;
818         struct list_head   *ctmp;
819         struct list_head   *cnxt;
820         int                 count = 0;
821
822         list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
823                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
824
825                 count++;
826                 kibnal_close_conn_locked (conn, why);
827         }
828
829         return (count);
830 }
831
832 int
833 kibnal_close_stale_conns_locked (kib_peer_t *peer, __u64 incarnation)
834 {
835         kib_conn_t         *conn;
836         struct list_head   *ctmp;
837         struct list_head   *cnxt;
838         int                 count = 0;
839
840         list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
841                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
842
843                 if (conn->ibc_incarnation == incarnation)
844                         continue;
845
846                 CDEBUG(D_NET, "Closing stale conn nid:"LPX64" incarnation:"LPX64"("LPX64")\n",
847                        peer->ibp_nid, conn->ibc_incarnation, incarnation);
848                 
849                 count++;
850                 kibnal_close_conn_locked (conn, -ESTALE);
851         }
852
853         return (count);
854 }
855
static int
kibnal_close_matching_conns (ptl_nid_t nid)
{
        /* Close all connections to 'nid', or to every peer when nid is
         * PTL_NID_ANY.  Returns 0 on success, -ENOENT when a specific
         * nid had no connections to close. */
        unsigned long       flags;
        kib_peer_t         *peer;
        struct list_head   *ptmp;
        struct list_head   *pnxt;
        int                 lo;
        int                 hi;
        int                 i;
        int                 count = 0;

        write_lock_irqsave (&kibnal_data.kib_global_lock, flags);

        /* restrict the scan to the one hash bucket a specific nid lives
         * in; a wildcard walks the whole peer table */
        if (nid != PTL_NID_ANY)
                lo = hi = kibnal_nid2peerlist(nid) - kibnal_data.kib_peers;
        else {
                lo = 0;
                hi = kibnal_data.kib_peer_hash_size - 1;
        }

        for (i = lo; i <= hi; i++) {
                list_for_each_safe (ptmp, pnxt, &kibnal_data.kib_peers[i]) {

                        peer = list_entry (ptmp, kib_peer_t, ibp_list);
                        /* a hashed peer must have some reason to exist */
                        LASSERT (peer->ibp_persistence != 0 ||
                                 peer->ibp_connecting != 0 ||
                                 !list_empty (&peer->ibp_conns));

                        if (!(nid == PTL_NID_ANY || nid == peer->ibp_nid))
                                continue;

                        count += kibnal_close_peer_conns_locked (peer, 0);
                }
        }

        write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);

        /* wildcards always succeed */
        if (nid == PTL_NID_ANY)
                return (0);
        
        return (count == 0 ? -ENOENT : 0);
}
900
static int
kibnal_cmd(struct portals_cfg *pcfg, void * private)
{
        /* Dispatch a portals configuration command (arriving via the
         * libcfs nal_cmd interface) to the matching peer/connection
         * operation.  Unrecognised commands return -EINVAL. */
        int rc = -EINVAL;
        ENTRY;

        LASSERT (pcfg != NULL);

        switch(pcfg->pcfg_command) {
        case NAL_CMD_GET_PEER: {
                /* report the pcfg_count'th peer's NID and share count
                 * back through the pcfg fields */
                ptl_nid_t   nid = 0;
                int         share_count = 0;

                rc = kibnal_get_peer_info(pcfg->pcfg_count,
                                          &nid, &share_count);
                pcfg->pcfg_nid   = nid;
                pcfg->pcfg_size  = 0;
                pcfg->pcfg_id    = 0;
                pcfg->pcfg_misc  = 0;
                pcfg->pcfg_count = 0;
                pcfg->pcfg_wait  = share_count;
                break;
        }
        case NAL_CMD_ADD_PEER: {
                rc = kibnal_add_persistent_peer (pcfg->pcfg_nid);
                break;
        }
        case NAL_CMD_DEL_PEER: {
                rc = kibnal_del_peer (pcfg->pcfg_nid, 
                                       /* flags == single_share */
                                       pcfg->pcfg_flags != 0);
                break;
        }
        case NAL_CMD_GET_CONN: {
                /* look up the pcfg_count'th connection; the lookup takes
                 * a ref which must be dropped once pcfg is filled in */
                kib_conn_t *conn = kibnal_get_conn_by_idx (pcfg->pcfg_count);

                if (conn == NULL)
                        rc = -ENOENT;
                else {
                        rc = 0;
                        pcfg->pcfg_nid   = conn->ibc_peer->ibp_nid;
                        pcfg->pcfg_id    = 0;
                        pcfg->pcfg_misc  = 0;
                        pcfg->pcfg_flags = 0;
                        kibnal_put_conn (conn);
                }
                break;
        }
        case NAL_CMD_CLOSE_CONNECTION: {
                rc = kibnal_close_matching_conns (pcfg->pcfg_nid);
                break;
        }
        case NAL_CMD_REGISTER_MYNID: {
                /* a real NID is required to advertise ourselves */
                if (pcfg->pcfg_nid == PTL_NID_ANY)
                        rc = -EINVAL;
                else
                        rc = kibnal_set_mynid (pcfg->pcfg_nid);
                break;
        }
        }

        RETURN(rc);
}
964
965 void
966 kibnal_free_pages (kib_pages_t *p)
967 {
968         int     npages = p->ibp_npages;
969         vv_return_t retval;
970         int     i;
971         
972         if (p->ibp_mapped) {
973                 retval = vv_mem_region_destroy(kibnal_data.kib_hca, p->ibp_handle);
974                 if (retval != 0)
975                         CERROR ("Deregister error: %d\n", retval);
976         }
977         
978         for (i = 0; i < npages; i++)
979                 if (p->ibp_pages[i] != NULL)
980                         __free_page(p->ibp_pages[i]);
981         
982         PORTAL_FREE (p, offsetof(kib_pages_t, ibp_pages[npages]));
983 }
984
int
kibnal_alloc_pages (kib_pages_t **pp, int npages, int allow_write)
{
        /* Allocate a descriptor plus 'npages' pages and, unless
         * whole-memory registration is in use, register them with the
         * HCA.  On success *pp is set and 0 returned; on any failure
         * everything allocated so far is freed and -ENOMEM returned.
         * NOTE(review): 'allow_write' is currently unused — the access
         * flags below are hard-coded; confirm against callers. */
        kib_pages_t   *p;
        vv_phy_list_t  phys_pages;
        vv_phy_buf_t  *phys_buf;
        int            i;
        vv_return_t    retval;

        PORTAL_ALLOC(p, offsetof(kib_pages_t, ibp_pages[npages]));
        if (p == NULL) {
                CERROR ("Can't allocate buffer %d\n", npages);
                return (-ENOMEM);
        }

        memset (p, 0, offsetof(kib_pages_t, ibp_pages[npages]));
        p->ibp_npages = npages;
        
        for (i = 0; i < npages; i++) {
                p->ibp_pages[i] = alloc_page (GFP_KERNEL);
                if (p->ibp_pages[i] == NULL) {
                        CERROR ("Can't allocate page %d of %d\n", i, npages);
                        /* frees the pages allocated so far plus 'p' */
                        kibnal_free_pages(p);
                        return (-ENOMEM);
                }
        }

        /* whole-memory mode: the stack has everything registered
         * already, so no per-buffer mapping is needed */
        if (kibnal_whole_mem())
                goto out;

        PORTAL_ALLOC(phys_buf, npages * sizeof(vv_phy_buf_t));
        if (phys_buf == NULL) {
                CERROR ("Can't allocate phys_buf for %d pages\n", npages);
                /* kibnal_free_pages() releases ibp_pages and 'p' */
                kibnal_free_pages(p);
                return (-ENOMEM);
        }

        phys_pages.number_of_buff = npages;
        phys_pages.phy_list = phys_buf;

        /* if we were using the _contig_ registration variant we would have
         * an array of PhysAddr/Length pairs, but the discontiguous variant
         * just takes the PhysAddr */
        for (i = 0; i < npages; i++) {
                phys_buf[i].start = kibnal_page2phys(p->ibp_pages[i]);
                phys_buf[i].size = PAGE_SIZE;
        }

        retval = vv_phy_mem_region_register(kibnal_data.kib_hca,
                                            &phys_pages,
                                            0, /* requested vaddr */
                                            npages * PAGE_SIZE,
                                            0, /* offset */
                                            kibnal_data.kib_pd,
                                            vv_acc_l_mem_write | vv_acc_r_mem_write | vv_acc_r_mem_read | vv_acc_mem_bind, /* TODO: translated as-is, but seems incorrect or too much */
                                            &p->ibp_handle, &p->ibp_vaddr,                                           
                                            &p->ibp_lkey, &p->ibp_rkey);
        
        /* the physical buffer list is only needed during registration */
        PORTAL_FREE(phys_buf, npages * sizeof(vv_phy_buf_t));
        
        if (retval) {
                CERROR ("Error %d mapping %d pages\n", retval, npages);
                kibnal_free_pages(p);
                return (-ENOMEM);
        }

        CDEBUG(D_NET, "registered %d pages; handle: %x vaddr "LPX64" "
                      "lkey %x rkey %x\n", npages, p->ibp_handle,
                      p->ibp_vaddr, p->ibp_lkey, p->ibp_rkey);
        
        /* tells kibnal_free_pages() to deregister on teardown */
        p->ibp_mapped = 1;
out:
        *pp = p;
        return (0);
}
1061
1062 static int
1063 kibnal_setup_tx_descs (void)
1064 {
1065         int           ipage = 0;
1066         int           page_offset = 0;
1067         __u64         vaddr;
1068         __u64         vaddr_base;
1069         struct page  *page;
1070         kib_tx_t     *tx;
1071         int           i;
1072         int           rc;
1073
1074         /* pre-mapped messages are not bigger than 1 page */
1075         LASSERT (IBNAL_MSG_SIZE <= PAGE_SIZE);
1076
1077         /* No fancy arithmetic when we do the buffer calculations */
1078         LASSERT (PAGE_SIZE % IBNAL_MSG_SIZE == 0);
1079
1080         rc = kibnal_alloc_pages(&kibnal_data.kib_tx_pages, IBNAL_TX_MSG_PAGES, 
1081                                 0);
1082         if (rc != 0)
1083                 return (rc);
1084
1085         /* ignored for the whole_mem case */
1086         vaddr = vaddr_base = kibnal_data.kib_tx_pages->ibp_vaddr;
1087
1088         for (i = 0; i < IBNAL_TX_MSGS; i++) {
1089                 page = kibnal_data.kib_tx_pages->ibp_pages[ipage];
1090                 tx = &kibnal_data.kib_tx_descs[i];
1091
1092                 memset (tx, 0, sizeof(*tx));    /* zero flags etc */
1093                 
1094                 tx->tx_msg = (kib_msg_t *)(((char *)page_address(page)) + 
1095                                            page_offset);
1096
1097                 if (kibnal_whole_mem()) {
1098                         void *newaddr;
1099                         vv_mem_reg_h_t mem_h;
1100                         vv_return_t  retval;
1101
1102                         /* Voltaire stack already registers the whole
1103                          * memory, so use that API. */
1104                         retval = vv_get_gen_mr_attrib(kibnal_data.kib_hca,
1105                                                       tx->tx_msg,
1106                                                       IBNAL_MSG_SIZE,
1107                                                       &mem_h,
1108                                                       &tx->l_key,
1109                                                       &tx->r_key);
1110                         if (retval) {
1111                                 CERROR("vv_get_gen_mr_attrib failed: %d", retval);
1112                                 /* TODO: free pages? */
1113                                 /* TODO: return. */
1114                         }
1115                 }
1116
1117                 tx->tx_isnblk = (i >= IBNAL_NTX);
1118                 tx->tx_mapped = KIB_TX_UNMAPPED;
1119
1120                 CDEBUG(D_NET, "Tx[%d] %p->%p\n", i, tx, tx->tx_msg);
1121
1122                 if (tx->tx_isnblk)
1123                         list_add (&tx->tx_list, 
1124                                   &kibnal_data.kib_idle_nblk_txs);
1125                 else
1126                         list_add (&tx->tx_list, 
1127                                   &kibnal_data.kib_idle_txs);
1128
1129                 vaddr += IBNAL_MSG_SIZE;
1130                 LASSERT (vaddr <= vaddr_base + IBNAL_TX_MSG_BYTES);
1131
1132                 page_offset += IBNAL_MSG_SIZE;
1133                 LASSERT (page_offset <= PAGE_SIZE);
1134
1135                 if (page_offset == PAGE_SIZE) {
1136                         page_offset = 0;
1137                         ipage++;
1138                         LASSERT (ipage <= IBNAL_TX_MSG_PAGES);
1139                 }
1140         }
1141         
1142         return (0);
1143 }
1144
static void
kibnal_api_shutdown (nal_t *nal)
{
        /* Tear the NAL down.  kibnal_data.kib_init records how far
         * startup got; every case below falls through so teardown
         * releases exactly the resources that were acquired, in reverse
         * order of kibnal_api_startup(). */
        int   i;
        int   rc;             /* only used when IBNAL_FMR is enabled */
        vv_return_t retval;

        if (nal->nal_refct != 0) {
                /* This module got the first ref */
                PORTAL_MODULE_UNUSE;
                return;
        }

        CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
               atomic_read (&portal_kmemory));

        LASSERT(nal == &kibnal_api);

        switch (kibnal_data.kib_init) {

        case IBNAL_INIT_ALL:
                /* stop calls to nal_cmd */
                libcfs_nal_cmd_unregister(VIBNAL);
                /* No new peers */

                /* resetting my NID to unadvertises me, removes my
                 * listener and nukes all current peers */
                kibnal_set_mynid (PTL_NID_ANY);

                /* Wait for all peer state to clean up (crazy) */
                i = 2;
                while (atomic_read (&kibnal_data.kib_npeers) != 0) {
                        i++;
                        CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
                               "waiting for %d peers to disconnect (can take a few seconds)\n",
                               atomic_read (&kibnal_data.kib_npeers));
                        set_current_state (TASK_UNINTERRUPTIBLE);
                        schedule_timeout (HZ);
                }
                /* fall through */

        case IBNAL_INIT_CQ:
                retval = vv_cq_destroy(kibnal_data.kib_hca, kibnal_data.kib_cq);
                if (retval)
                        CERROR ("Destroy CQ error: %d\n", retval);
                /* fall through */

        case IBNAL_INIT_TXD:
                kibnal_free_pages (kibnal_data.kib_tx_pages);
                /* fall through */

#if IBNAL_FMR
        case IBNAL_INIT_FMR:
                rc = ib_fmr_pool_destroy (kibnal_data.kib_fmr_pool);
                if (rc != 0)
                        CERROR ("Destroy FMR pool error: %d\n", rc);
                /* fall through */
#endif
        case IBNAL_INIT_PD:
                /* whole-mem mode uses the stack's general PD, which is
                 * not ours to deallocate */
#if IBNAL_WHOLE_MEM==0
                retval = vv_pd_deallocate(kibnal_data.kib_hca, kibnal_data.kib_pd);
                if (retval != 0)
                        CERROR ("Destroy PD error: %d\n", retval);
#endif
                /* fall through */

        case IBNAL_INIT_GSI:
                retval = gsi_deregister_class(kibnal_data.gsi_handle);
                if (retval != 0)
                        CERROR ("GSI deregister failed: %d\n", retval);
                /* fall through */

        case IBNAL_INIT_GSI_POOL:
                gsi_dtgrm_pool_destroy(kibnal_data.gsi_pool_handle);
                /* fall through */

        case IBNAL_INIT_PORT:
                /* XXX ??? */
                /* fall through */

        case IBNAL_INIT_ASYNC:
                retval = vv_dell_async_event_cb (kibnal_data.kib_hca,
                                                 kibnal_ca_async_callback);
                if (retval)
                        CERROR("deregister asynchronous call back error: %d\n", retval);
                        
                /* fall through */

        case IBNAL_INIT_HCA:
                retval = vv_hca_close(kibnal_data.kib_hca);
                if (retval != 0)
                        CERROR ("Close HCA  error: %d\n", retval);
                /* fall through */

        case IBNAL_INIT_LIB:
                lib_fini(&kibnal_lib);
                /* fall through */

        case IBNAL_INIT_DATA:
                /* Module refcount only gets to zero when all peers
                 * have been closed so all lists must be empty */
                LASSERT (atomic_read (&kibnal_data.kib_npeers) == 0);
                LASSERT (kibnal_data.kib_peers != NULL);
                for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
                        LASSERT (list_empty (&kibnal_data.kib_peers[i]));
                }
                LASSERT (atomic_read (&kibnal_data.kib_nconns) == 0);
                LASSERT (list_empty (&kibnal_data.kib_sched_rxq));
                LASSERT (list_empty (&kibnal_data.kib_sched_txq));
                LASSERT (list_empty (&kibnal_data.kib_connd_conns));
                LASSERT (list_empty (&kibnal_data.kib_connd_peers));

                /* flag threads to terminate; wake and wait for them to die */
                kibnal_data.kib_shutdown = 1;
                wake_up_all (&kibnal_data.kib_sched_waitq);
                wake_up_all (&kibnal_data.kib_connd_waitq);

                i = 2;
                while (atomic_read (&kibnal_data.kib_nthreads) != 0) {
                        i++;
                        CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
                               "Waiting for %d threads to terminate\n",
                               atomic_read (&kibnal_data.kib_nthreads));
                        set_current_state (TASK_INTERRUPTIBLE);
                        schedule_timeout (HZ);
                }
                /* fall through */
                
        case IBNAL_INIT_NOTHING:
                break;
        }

        /* free the tables allocated before kib_init reached INIT_DATA */
        if (kibnal_data.kib_tx_descs != NULL)
                PORTAL_FREE (kibnal_data.kib_tx_descs,
                             IBNAL_TX_MSGS * sizeof(kib_tx_t));

        if (kibnal_data.kib_peers != NULL)
                PORTAL_FREE (kibnal_data.kib_peers,
                             sizeof (struct list_head) * 
                             kibnal_data.kib_peer_hash_size);

        CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
               atomic_read (&portal_kmemory));
        printk(KERN_INFO "Lustre: Voltaire IB NAL unloaded (final mem %d)\n",
               atomic_read(&portal_kmemory));

        kibnal_data.kib_init = IBNAL_INIT_NOTHING;
}
1293
1294 #define roundup_power(val, power) \
1295         ( (val + (__u64)(power - 1)) & ~((__u64)(power - 1)) )
1296
1297 /* this isn't very portable or sturdy in the face of funny mem/bus configs */
1298 static __u64 max_phys_mem(void)
1299 {
1300         struct sysinfo si;
1301         __u64 ret;
1302
1303         si_meminfo(&si);
1304         ret = (__u64)max(si.totalram, max_mapnr) * si.mem_unit;
1305         return roundup_power(ret, 128 * 1024 * 1024);
1306
1307 #undef roundup_power
1308
1309 static int
1310 kibnal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
1311                      ptl_ni_limits_t *requested_limits,
1312                      ptl_ni_limits_t *actual_limits)
1313 {
1314         ptl_process_id_t    process_id;
1315         int                 pkmem = atomic_read(&portal_kmemory);
1316         int                 rc;
1317         int                 i;
1318         vv_request_event_record_t req_er;
1319         vv_return_t         retval;
1320
1321         LASSERT (nal == &kibnal_api);
1322
1323         if (nal->nal_refct != 0) {
1324                 if (actual_limits != NULL)
1325                         *actual_limits = kibnal_lib.libnal_ni.ni_actual_limits;
1326                 /* This module got the first ref */
1327                 PORTAL_MODULE_USE;
1328                 return (PTL_OK);
1329         }
1330
1331         LASSERT (kibnal_data.kib_init == IBNAL_INIT_NOTHING);
1332
1333         init_MUTEX (&kibnal_data.kib_nid_mutex);
1334         kibnal_data.kib_nid = PTL_NID_ANY;
1335
1336         rwlock_init(&kibnal_data.kib_global_lock);
1337
1338         kibnal_data.kib_peer_hash_size = IBNAL_PEER_HASH_SIZE;
1339         PORTAL_ALLOC (kibnal_data.kib_peers,
1340                       sizeof (struct list_head) * kibnal_data.kib_peer_hash_size);
1341         if (kibnal_data.kib_peers == NULL) {
1342                 goto failed;
1343         }
1344         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++)
1345                 INIT_LIST_HEAD(&kibnal_data.kib_peers[i]);
1346
1347         spin_lock_init (&kibnal_data.kib_connd_lock);
1348         INIT_LIST_HEAD (&kibnal_data.kib_connd_peers);
1349         INIT_LIST_HEAD (&kibnal_data.kib_connd_conns);
1350         init_waitqueue_head (&kibnal_data.kib_connd_waitq);
1351
1352         spin_lock_init (&kibnal_data.kib_sched_lock);
1353         INIT_LIST_HEAD (&kibnal_data.kib_sched_txq);
1354         INIT_LIST_HEAD (&kibnal_data.kib_sched_rxq);
1355         init_waitqueue_head (&kibnal_data.kib_sched_waitq);
1356
1357         spin_lock_init (&kibnal_data.kib_tx_lock);
1358         INIT_LIST_HEAD (&kibnal_data.kib_idle_txs);
1359         INIT_LIST_HEAD (&kibnal_data.kib_idle_nblk_txs);
1360         init_waitqueue_head(&kibnal_data.kib_idle_tx_waitq);
1361
1362         INIT_LIST_HEAD (&kibnal_data.gsi_pending);
1363         init_MUTEX (&kibnal_data.gsi_mutex);
1364
1365         PORTAL_ALLOC (kibnal_data.kib_tx_descs,
1366                       IBNAL_TX_MSGS * sizeof(kib_tx_t));
1367         if (kibnal_data.kib_tx_descs == NULL) {
1368                 CERROR ("Can't allocate tx descs\n");
1369                 goto failed;
1370         }
1371
1372         /* lists/ptrs/locks initialised */
1373         kibnal_data.kib_init = IBNAL_INIT_DATA;
1374         /*****************************************************/
1375
1376         process_id.pid = requested_pid;
1377         process_id.nid = kibnal_data.kib_nid;
1378         
1379         rc = lib_init(&kibnal_lib, nal, process_id,
1380                       requested_limits, actual_limits);
1381         if (rc != PTL_OK) {
1382                 CERROR("lib_init failed: error %d\n", rc);
1383                 goto failed;
1384         }
1385
1386         /* lib interface initialised */
1387         kibnal_data.kib_init = IBNAL_INIT_LIB;
1388         /*****************************************************/
1389
1390         for (i = 0; i < IBNAL_N_SCHED; i++) {
1391                 rc = kibnal_thread_start (kibnal_scheduler, (void *)i);
1392                 if (rc != 0) {
1393                         CERROR("Can't spawn vibnal scheduler[%d]: %d\n",
1394                                i, rc);
1395                         goto failed;
1396                 }
1397         }
1398
1399         rc = kibnal_thread_start (kibnal_connd, NULL);
1400         if (rc != 0) {
1401                 CERROR ("Can't spawn vibnal connd: %d\n", rc);
1402                 goto failed;
1403         }
1404
1405         /* TODO: apparently only one adapter is supported */
1406         retval = vv_hca_open("ANY_HCA", NULL, &kibnal_data.kib_hca);
1407         if (retval) {
1408                 CERROR ("Can't open CA: %d\n", retval);
1409                 goto failed;
1410         }
1411
1412         /* Channel Adapter opened */
1413         kibnal_data.kib_init = IBNAL_INIT_HCA;
1414
1415         /* register to get HCA's asynchronous events. */
1416         req_er.req_event_type = VV_ASYNC_EVENT_ALL_MASK;
1417         retval = vv_set_async_event_cb (kibnal_data.kib_hca,
1418                                         req_er,
1419                                         kibnal_ca_async_callback);
1420
1421         if (retval) {
1422                 CERROR ("Can't open CA: %d\n", retval);
1423                 goto failed; 
1424         }
1425
1426         kibnal_data.kib_init = IBNAL_INIT_ASYNC;
1427
1428         /*****************************************************/
1429
1430         retval = vv_hca_query(kibnal_data.kib_hca,
1431                              &kibnal_data.kib_hca_attrs);
1432         if (retval) {
1433                 CERROR ("Can't size port attrs: %d\n", retval);
1434                 goto failed;
1435         }
1436
1437         kibnal_data.kib_port = -1;
1438
1439         for (i = 0; i<kibnal_data.kib_hca_attrs.port_num; i++) {
1440
1441                 int port_num = i+1;
1442                 u_int32_t tbl_count;
1443                 vv_port_attrib_t *pattr = &kibnal_data.kib_port_attr;
1444
1445                 retval = vv_port_query(kibnal_data.kib_hca, port_num, pattr);
1446                 if (retval) {
1447                         CERROR("vv_port_query failed for port %d: %d\n", port_num, retval);
1448                         continue;
1449                 }
1450
1451                 switch (pattr->port_state) {
1452                 case vv_state_linkDoun:
1453                         CDEBUG(D_NET, "port[%d] Down\n", port_num);
1454                         continue;
1455                 case vv_state_linkInit:
1456                         CDEBUG(D_NET, "port[%d] Init\n", port_num);
1457                         continue;
1458                 case vv_state_linkArm:
1459                         CDEBUG(D_NET, "port[%d] Armed\n", port_num);
1460                         continue;
1461                 case vv_state_linkActive:
1462                         CDEBUG(D_NET, "port[%d] Active\n", port_num);
1463
1464                         /* Found a suitable port. Get its GUID and PKEY. */
1465                         kibnal_data.kib_port = port_num;
1466                         
1467                         tbl_count = 1;
1468                         retval = vv_get_port_gid_tbl(kibnal_data.kib_hca, port_num, &tbl_count, &kibnal_data.kib_port_gid);
1469                         if (retval) {
1470                                 CERROR("vv_get_port_gid_tbl failed for port %d: %d\n", port_num, retval);
1471                                 continue;
1472                         }
1473
1474                         tbl_count = 1;
1475                         retval = vv_get_port_partition_tbl (kibnal_data.kib_hca, port_num, &tbl_count, &kibnal_data.kib_port_pkey);
1476                         if (retval) {
1477                                 CERROR("vv_get_port_partition_tbl failed for port %d: %d\n", port_num, retval);
1478                                 continue;
1479                         }
1480
1481                         break;
1482                 case vv_state_linkActDefer: /* TODO: correct? */
1483                 case vv_state_linkNoChange:
1484                         CERROR("Unexpected port[%d] state %d\n",
1485                                i, pattr->port_state);
1486                         continue;
1487                 }
1488                 break;
1489         }
1490
1491         if (kibnal_data.kib_port == -1) {
1492                 CERROR ("Can't find an active port\n");
1493                 goto failed;
1494         }
1495
1496         CDEBUG(D_NET, "Using port %d - GID="LPX64":"LPX64"\n",
1497                kibnal_data.kib_port, kibnal_data.kib_port_gid.scope.g.subnet, kibnal_data.kib_port_gid.scope.g.eui64);
1498         CDEBUG(D_NET, "got guid "LPX64"\n", cpu_to_le64(kibnal_data.kib_port_gid.scope.g.eui64));
1499         
1500         /* Active port found */
1501         kibnal_data.kib_init = IBNAL_INIT_PORT;
1502         /*****************************************************/
1503
1504         /* Prepare things to be able to send/receive MADS */
1505         retval = gsi_dtgrm_pool_create(IBNAL_CONCURRENT_PEERS, &kibnal_data.gsi_pool_handle);
1506         if (retval) {
1507                 CERROR("Could not create GSI pool: %d\n", retval);
1508                 goto failed;
1509         }
1510         kibnal_data.kib_init = IBNAL_INIT_GSI_POOL;
1511
1512         retval = gsi_register_class(MAD_CLASS_SUBN_ADM, /* TODO: correct? */
1513                                 2,      /* version */
1514                                 "ANY_HCA",
1515 #ifdef GSI_PASS_PORT_NUM
1516                                 kibnal_data.kib_port,
1517 #endif                   
1518                                 0, 0,
1519                                 vibnal_mad_sent_cb,     vibnal_mad_received_cb,
1520                                 NULL, &kibnal_data.gsi_handle);
1521         if (retval) {
1522                 CERROR("Cannot register GSI class: %d\n", retval);
1523                 goto failed;
1524         }
1525
1526         kibnal_data.kib_init = IBNAL_INIT_GSI;
1527         /*****************************************************/
1528
1529 #if IBNAL_WHOLE_MEM==0
1530         retval = vv_pd_allocate(kibnal_data.kib_hca, &kibnal_data.kib_pd);
1531 #else
1532         retval = vv_get_gen_pd_h(kibnal_data.kib_hca, &kibnal_data.kib_pd);
1533 #endif
1534         if (retval) {
1535                 CERROR ("Can't create PD: %d\n", retval);
1536                 goto failed;
1537         }
1538         
1539         /* flag PD initialised */
1540         kibnal_data.kib_init = IBNAL_INIT_PD;
1541         /*****************************************************/
1542
1543 #if IBNAL_FMR
1544         {
1545                 const int pool_size = IBNAL_NTX + IBNAL_NTX_NBLK;
1546                 struct ib_fmr_pool_param params = {
1547                         .max_pages_per_fmr = PTL_MTU/PAGE_SIZE,
1548                         .access            = (IB_ACCESS_LOCAL_WRITE |
1549                                               IB_ACCESS_REMOTE_WRITE |
1550                                               IB_ACCESS_REMOTE_READ),
1551                         .pool_size         = pool_size,
1552                         .dirty_watermark   = (pool_size * 3)/4,
1553                         .flush_function    = NULL,
1554                         .flush_arg         = NULL,
1555                         .cache             = 1,
1556                 };
1557                 rc = ib_fmr_pool_create(kibnal_data.kib_pd, &params,
1558                                         &kibnal_data.kib_fmr_pool);
1559                 if (rc != 0) {
1560                         CERROR ("Can't create FMR pool size %d: %d\n", 
1561                                 pool_size, rc);
1562                         goto failed;
1563                 }
1564         }
1565
1566         /* flag FMR pool initialised */
1567         kibnal_data.kib_init = IBNAL_INIT_FMR;
1568 #endif
1569
1570         /*****************************************************/
1571
1572         rc = kibnal_setup_tx_descs();
1573         if (rc != 0) {
1574                 CERROR ("Can't register tx descs: %d\n", rc);
1575                 goto failed;
1576         }
1577         
1578         /* flag TX descs initialised */
1579         kibnal_data.kib_init = IBNAL_INIT_TXD;
1580         /*****************************************************/
1581         {
1582                 uint32_t nentries;
1583
1584                 retval = vv_cq_create(kibnal_data.kib_hca, IBNAL_CQ_ENTRIES,
1585                                       kibnal_ca_callback, 
1586                                       NULL, /* context */
1587                                       &kibnal_data.kib_cq, &nentries);
1588                 if (retval) {
1589                         CERROR ("Can't create RX CQ: %d\n", retval);
1590                         goto failed;
1591                 }
1592
1593                 /* flag CQ initialised */
1594                 kibnal_data.kib_init = IBNAL_INIT_CQ;
1595
1596                 if (nentries < IBNAL_CQ_ENTRIES) {
1597                         CERROR ("CQ only has %d entries, need %d\n", 
1598                                 nentries, IBNAL_CQ_ENTRIES);
1599                         goto failed;
1600                 }
1601
1602                 retval = vv_request_completion_notification(kibnal_data.kib_hca, kibnal_data.kib_cq, vv_next_solicit_unsolicit_event);
1603                 if (retval != 0) {
1604                         CERROR ("Failed to re-arm completion queue: %d\n", rc);
1605                         goto failed;
1606                 }
1607         }
1608         
1609         /*****************************************************/
1610
1611         rc = libcfs_nal_cmd_register(VIBNAL, &kibnal_cmd, NULL);
1612         if (rc != 0) {
1613                 CERROR ("Can't initialise command interface (rc = %d)\n", rc);
1614                 goto failed;
1615         }
1616
1617         /* flag everything initialised */
1618         kibnal_data.kib_init = IBNAL_INIT_ALL;
1619         /*****************************************************/
1620
1621         printk(KERN_INFO "Lustre: Voltaire IB NAL loaded "
1622                "(initial mem %d)\n", pkmem);
1623
1624         return (PTL_OK);
1625
1626  failed:
1627         CDEBUG(D_NET, "kibnal_api_startup failed\n");
1628         kibnal_api_shutdown (&kibnal_api);    
1629         return (PTL_FAIL);
1630 }
1631
/* Module unload hook: tear down the NAL in strict reverse order of
 * kibnal_module_init().  Order matters: remove the sysctl entry first
 * (so no tunable writes race the shutdown), then shut down the network
 * interface, then deregister the NAL itself. */
void __exit
kibnal_module_fini (void)
{
#ifdef CONFIG_SYSCTL
        /* Only registered if register_sysctl_table() succeeded at init;
         * init presses on regardless, so NULL is a normal case here. */
        if (kibnal_tunables.kib_sysctl != NULL)
                unregister_sysctl_table (kibnal_tunables.kib_sysctl);
#endif
        PtlNIFini(kibnal_ni);

        ptl_unregister_nal(VIBNAL);
}
1643
1644 int __init
1645 kibnal_module_init (void)
1646 {
1647         int    rc;
1648
1649         if (sizeof(kib_wire_connreq_t) > cm_REQ_priv_data_len) {
1650                 CERROR("sizeof(kib_wire_connreq_t) > cm_REQ_priv_data_len\n");
1651                 return -EINVAL;
1652         }
1653
1654         /* the following must be sizeof(int) for proc_dointvec() */
1655         if (sizeof (kibnal_tunables.kib_io_timeout) != sizeof (int)) {
1656                 CERROR("sizeof (kibnal_tunables.kib_io_timeout) != sizeof (int)\n");
1657                 return -EINVAL;
1658         }
1659
1660         kibnal_api.nal_ni_init = kibnal_api_startup;
1661         kibnal_api.nal_ni_fini = kibnal_api_shutdown;
1662
1663         /* Initialise dynamic tunables to defaults once only */
1664         kibnal_tunables.kib_io_timeout = IBNAL_IO_TIMEOUT;
1665
1666         rc = ptl_register_nal(VIBNAL, &kibnal_api);
1667         if (rc != PTL_OK) {
1668                 CERROR("Can't register IBNAL: %d\n", rc);
1669                 return (-ENOMEM);               /* or something... */
1670         }
1671
1672         /* Pure gateways want the NAL started up at module load time... */
1673         rc = PtlNIInit(VIBNAL, LUSTRE_SRV_PTL_PID, NULL, NULL, &kibnal_ni);
1674         if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
1675                 ptl_unregister_nal(VIBNAL);
1676                 return (-ENODEV);
1677         }
1678         
1679 #ifdef CONFIG_SYSCTL
1680         /* Press on regardless even if registering sysctl doesn't work */
1681         kibnal_tunables.kib_sysctl = 
1682                 register_sysctl_table (kibnal_top_ctl_table, 0);
1683 #endif
1684         return (0);
1685 }
1686
/* Module metadata and entry points: kibnal_module_init() runs at module
 * load, kibnal_module_fini() at unload. */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Kernel Voltaire IB NAL v0.01");
MODULE_LICENSE("GPL");

module_init(kibnal_module_init);
module_exit(kibnal_module_fini);
1693