Whamcloud - gitweb
* 5630 fix takes ibnal global lock at raised IRQ priority
[fs/lustre-release.git] / lnet / klnds / viblnd / viblnd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2004 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  *   Author: Frank Zago <fzago@systemfabricworks.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  */
24
25 #include "vibnal.h"
26
27 nal_t                   kibnal_api;
28 ptl_handle_ni_t         kibnal_ni;
29 kib_tunables_t          kibnal_tunables;
30
31 kib_data_t              kibnal_data = {
32         .kib_service_id = IBNAL_SERVICE_NUMBER,
33 };
34
#ifdef CONFIG_SYSCTL
/* sysctl ids: the "vibnal" directory and the one tunable inside it */
#define IBNAL_SYSCTL             202

#define IBNAL_SYSCTL_TIMEOUT     1

/* Leaf table: a single "timeout" integer (mode 0644) backed by
 * kibnal_tunables.kib_io_timeout and handled by proc_dointvec. */
static ctl_table kibnal_ctl_table[] = {
        {
                IBNAL_SYSCTL_TIMEOUT,
                "timeout",
                &kibnal_tunables.kib_io_timeout,
                sizeof (int),
                0644,
                NULL,
                &proc_dointvec
        },
        { 0 }
};

/* Top-level table: the "vibnal" directory (mode 0555) whose child is the
 * leaf table above. */
static ctl_table kibnal_top_ctl_table[] = {
        {
                IBNAL_SYSCTL,
                "vibnal",
                NULL,
                0,
                0555,
                kibnal_ctl_table
        },
        { 0 }
};
#endif
52
#ifdef unused
/* Debug helper (compiled out unless 'unused' is defined): log the tag and
 * status, plus the service id, name and NID when a record is supplied. */
void
print_service(IB_SERVICE_RECORD *service, char *tag, int rc)
{
        char svc_name[32];

        if (service != NULL) {
                /* bounded copy of the name; terminate it explicitly since
                 * strncpy does not guarantee a trailing NUL */
                strncpy (svc_name, service->ServiceName, sizeof(svc_name)-1);
                svc_name[sizeof(svc_name)-1] = 0;

                CWARN("tag       : %s\n"
                      "status    : %d\n"
                      "service id: "LPX64"\n"
                      "name      : %s\n"
                      "NID       : "LPX64"\n", tag, rc,
                      service->RID.ServiceID, svc_name,
                      *kibnal_service_nid_field(service));
                return;
        }

        /* no record to describe */
        CWARN("tag       : %s\n"
              "status    : %d (NULL)\n", tag, rc);
}
#endif
77
78 /* 
79  * method is SUBN_ADM_SET, SUBN_ADM_GET, SUBN_ADM_DELETE. Tables not supported.
80  * nid is the nid to advertize/query/unadvertize
81  */
82 static void fill_sa_request(struct sa_request *request, int method, ptl_nid_t nid)
83 {
84         gsi_dtgrm_t *dtgrm = request->dtgrm_req;
85         sa_mad_v2_t *mad = (sa_mad_v2_t *) dtgrm->mad;
86         ib_service_record_v2_t *sr = (ib_service_record_v2_t *) mad->payload;
87         
88         memset(mad, 0, MAD_BLOCK_SIZE);
89
90         request->mad = mad;
91
92         dtgrm->rlid = kibnal_data.kib_port_attr.port_sma_address_info.sm_lid;
93         dtgrm->sl = kibnal_data.kib_port_attr.port_sma_address_info.service_level;
94
95         mad->hdr.base_ver = MAD_IB_BASE_VERSION;
96         mad->hdr.class = MAD_CLASS_SUBN_ADM;
97         mad->hdr.class_ver = 2;
98         mad->hdr.m.ms.method = method;
99         mad->hdr.attrib_id = SA_SERVICE_RECORD; /* something(?) will swap that field */
100
101                 /* Note: the transaction ID is set by the Voltaire stack if it is 0. */
102
103         /* TODO: change the 40 to sizeof(something) */
104         mad->payload_len = cpu_to_be32(0x40 /*header size */  +
105                                        sizeof (ib_service_record_v2_t));
106
107
108         mad->component_mask = cpu_to_be64(
109                                           (1ull << 0)  |        /* service_id       */
110                                           (1ull << 2)  |        /* service_pkey     */
111                                           (1ull << 6)  |        /* service_name     */
112                                           (1ull << 7)  |        /* service_data8[0] */
113                                           (1ull << 8)  |        /* service_data8[1] */
114                                           (1ull << 9)  |        /* service_data8[2] */
115                                           (1ull << 10) |        /* service_data8[3] */
116                                           (1ull << 11) |        /* service_data8[4] */
117                                           (1ull << 12) |        /* service_data8[5] */
118                                           (1ull << 13) |        /* service_data8[6] */
119                                           (1ull << 14)      /* service_data8[7] */
120                                           );
121
122         sr->service_id = cpu_to_be64(kibnal_data.kib_service_id);
123         sr->service_pkey = cpu_to_be16(kibnal_data.kib_port_pkey);
124
125         /* Set the service name and the data (bytes 0 to 7) in data8 */
126         kibnal_set_service_keys(sr, nid);
127
128         if (method == SUBN_ADM_SET) {
129                 mad->component_mask |= cpu_to_be64(
130                                                    (1ull << 1) |        /* service_gid       */
131                                                    (1ull << 4)          /* service_lease     */
132                                                    );
133
134                 sr->service_gid = kibnal_data.kib_port_gid;
135                 gid_swap(&sr->service_gid);
136                 sr->service_lease = cpu_to_be32(0xffffffff);
137         }
138
139         CDEBUG(D_NET, "SA request %02x for service id "LPX64" %s:"LPX64"\n",
140                mad->hdr.m.ms.method,
141                sr->service_id, 
142                sr->service_name,
143                *kibnal_service_nid_field(sr));
144 }
145
146 /* Do an advertizement operation: 
147  *   SUBN_ADM_GET = 0x01 (i.e. query),
148  *   SUBN_ADM_SET = 0x02 (i.e. advertize),
149  *   SUBN_ADM_DELETE = 0x15 (i.e. un-advertize).
150  * If callback is NULL, the function is synchronous (and context is ignored).
151  */
152 int kibnal_advertize_op(ptl_nid_t nid, int op, sa_request_cb_t callback, void *context)
153 {
154         struct sa_request *request;
155         int ret;
156
157         LASSERT (kibnal_data.kib_nid != PTL_NID_ANY);
158
159         CDEBUG(D_NET, "kibnal_advertize_op: nid="LPX64", op=%d\n", nid, op);
160
161         request = alloc_sa_request();
162         if (request == NULL) {
163                 CERROR("Cannot allocate a SA request");
164                 return -ENOMEM;
165         }
166                 
167         fill_sa_request(request, op, nid);
168
169         if (callback) {
170                 request->callback = callback;
171                 request->context = context;
172         } else {
173                 init_completion(&request->signal);
174         }
175
176         ret = vibnal_start_sa_request(request);
177         if (ret) {
178                 CERROR("vibnal_send_sa failed: %d\n", ret);
179                 free_sa_request(request);
180         } else {
181                 if (callback) {
182                         /* Return. The callback will have to free the SA request. */
183                         ret = 0;
184                 } else {
185                         wait_for_completion(&request->signal);
186
187                         ret = request->status;
188
189                         if (ret != 0) {
190                                 CERROR ("Error %d in advertising operation %d for NID "LPX64"\n",
191                                         ret, op, kibnal_data.kib_nid);
192                         }
193                         
194                         free_sa_request(request);
195                 }
196         }
197
198         return ret;
199 }
200
201 static int
202 kibnal_set_mynid(ptl_nid_t nid)
203 {
204         struct timeval tv;
205         lib_ni_t      *ni = &kibnal_lib.libnal_ni;
206         int            rc;
207         vv_return_t    retval;
208
209         CDEBUG(D_IOCTL, "setting mynid to "LPX64" (old nid="LPX64")\n",
210                nid, ni->ni_pid.nid);
211
212         do_gettimeofday(&tv);
213
214         down (&kibnal_data.kib_nid_mutex);
215
216         if (nid == kibnal_data.kib_nid) {
217                 /* no change of NID */
218                 up (&kibnal_data.kib_nid_mutex);
219                 return (0);
220         }
221
222         CDEBUG(D_NET, "NID "LPX64"("LPX64")\n",
223                kibnal_data.kib_nid, nid);
224
225         /* Unsubscribes the current NID */
226         if (kibnal_data.kib_nid != PTL_NID_ANY) {
227
228                 rc = kibnal_advertize_op(kibnal_data.kib_nid, SUBN_ADM_DELETE, NULL, NULL);
229
230                 if (rc) {
231                         CERROR("Error %d unadvertising NID "LPX64"\n",
232                                rc, kibnal_data.kib_nid);
233                 }
234         }
235         
236         kibnal_data.kib_nid = ni->ni_pid.nid = nid;
237         kibnal_data.kib_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
238
239         /* Destroys the current endpoint, if any. */
240         if (kibnal_data.kib_cep) {
241                 retval = cm_cancel(kibnal_data.kib_cep);
242                 if (retval)
243                         CERROR ("Error %d stopping listener\n", retval);
244         
245                 retval = cm_destroy_cep(kibnal_data.kib_cep);
246                 if (retval)
247                         CERROR ("Error %d destroying CEP\n", retval);
248         
249                 kibnal_data.kib_cep = NULL;
250         }
251         
252         /* Delete all existing peers and their connections after new
253          * NID/incarnation set to ensure no old connections in our brave
254          * new world. */
255         kibnal_del_peer (PTL_NID_ANY, 0);
256
257         if (kibnal_data.kib_nid == PTL_NID_ANY) {
258                 /* No new NID to install. The driver is shuting down. */
259                 up (&kibnal_data.kib_nid_mutex);
260                 return (0);
261         }
262
263         /* remove any previous advert (crashed node etc) */
264         kibnal_advertize_op(kibnal_data.kib_nid, SUBN_ADM_DELETE, NULL, NULL);
265
266         kibnal_data.kib_cep = cm_create_cep(cm_cep_transp_rc);
267         if (kibnal_data.kib_cep == NULL) {
268                 CERROR ("Can't create CEP\n");
269                 rc = -ENOMEM;
270         } else {
271                 cm_return_t cmret;
272                 cm_listen_data_t info;
273
274                 CDEBUG(D_NET, "Created CEP %p for listening\n", kibnal_data.kib_cep);
275
276                 memset(&info, 0, sizeof(info));
277                 info.listen_addr.end_pt.sid = kibnal_data.kib_service_id;
278
279                 cmret = cm_listen(kibnal_data.kib_cep, &info,
280                                   kibnal_listen_callback, NULL);
281                 if (cmret) {
282                         CERROR ("cm_listen error: %d\n", cmret);
283                         rc = -EINVAL;
284                 } else {
285                         rc = 0;
286                 }
287         }
288         
289         if (rc == 0) {
290                 rc = kibnal_advertize_op(kibnal_data.kib_nid, SUBN_ADM_SET, NULL, NULL);
291                 if (rc == 0) {
292 #ifdef IBNAL_CHECK_ADVERT
293                         kibnal_advertize_op(kibnal_data.kib_nid, SUBN_ADM_GET, NULL, NULL);
294 #endif
295                         up (&kibnal_data.kib_nid_mutex);
296                         return (0);
297                 }
298                 
299                 retval = cm_cancel (kibnal_data.kib_cep);
300                 if (retval)
301                         CERROR("cm_cancel failed: %d\n", retval);
302
303                 retval = cm_destroy_cep (kibnal_data.kib_cep);
304                 if (retval)
305                         CERROR("cm_destroy_cep failed: %d\n", retval);
306
307                 /* remove any peers that sprung up while I failed to
308                  * advertise myself */
309                 kibnal_del_peer (PTL_NID_ANY, 0);
310         }
311
312         kibnal_data.kib_nid = PTL_NID_ANY;
313         up (&kibnal_data.kib_nid_mutex);
314         return (rc);
315 }
316
317 kib_peer_t *
318 kibnal_create_peer (ptl_nid_t nid)
319 {
320         kib_peer_t *peer;
321
322         LASSERT (nid != PTL_NID_ANY);
323
324         PORTAL_ALLOC(peer, sizeof (*peer));
325         if (peer == NULL) {
326                 CERROR("Canot allocate perr\n");
327                 return (NULL);
328         }
329
330         memset(peer, 0, sizeof(*peer));         /* zero flags etc */
331
332         peer->ibp_nid = nid;
333         atomic_set (&peer->ibp_refcount, 1);    /* 1 ref for caller */
334
335         INIT_LIST_HEAD (&peer->ibp_list);       /* not in the peer table yet */
336         INIT_LIST_HEAD (&peer->ibp_conns);
337         INIT_LIST_HEAD (&peer->ibp_tx_queue);
338
339         peer->ibp_reconnect_time = jiffies;
340         peer->ibp_reconnect_interval = IBNAL_MIN_RECONNECT_INTERVAL;
341
342         atomic_inc (&kibnal_data.kib_npeers);
343         return (peer);
344 }
345
346 void
347 kibnal_destroy_peer (kib_peer_t *peer)
348 {
349
350         LASSERT (atomic_read (&peer->ibp_refcount) == 0);
351         LASSERT (peer->ibp_persistence == 0);
352         LASSERT (!kibnal_peer_active(peer));
353         LASSERT (peer->ibp_connecting == 0);
354         LASSERT (list_empty (&peer->ibp_conns));
355         LASSERT (list_empty (&peer->ibp_tx_queue));
356         
357         PORTAL_FREE (peer, sizeof (*peer));
358
359         /* NB a peer's connections keep a reference on their peer until
360          * they are destroyed, so we can be assured that _all_ state to do
361          * with this peer has been cleaned up when its refcount drops to
362          * zero. */
363         atomic_dec (&kibnal_data.kib_npeers);
364 }
365
366 /* the caller is responsible for accounting for the additional reference
367  * that this creates */
368 kib_peer_t *
369 kibnal_find_peer_locked (ptl_nid_t nid)
370 {
371         struct list_head *peer_list = kibnal_nid2peerlist (nid);
372         struct list_head *tmp;
373         kib_peer_t       *peer;
374
375         list_for_each (tmp, peer_list) {
376
377                 peer = list_entry (tmp, kib_peer_t, ibp_list);
378
379                 LASSERT (peer->ibp_persistence != 0 || /* persistent peer */
380                          peer->ibp_connecting != 0 || /* creating conns */
381                          !list_empty (&peer->ibp_conns));  /* active conn */
382
383                 if (peer->ibp_nid != nid)
384                         continue;
385
386                 CDEBUG(D_NET, "got peer [%p] -> "LPX64" (%d)\n",
387                        peer, nid, atomic_read (&peer->ibp_refcount));
388                 return (peer);
389         }
390         return (NULL);
391 }
392
393 kib_peer_t *
394 kibnal_get_peer (ptl_nid_t nid)
395 {
396         kib_peer_t     *peer;
397         unsigned long   flags;
398
399         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
400         peer = kibnal_find_peer_locked (nid);
401         if (peer != NULL)                       /* +1 ref for caller? */
402                 kib_peer_addref(peer);
403         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
404
405         return (peer);
406 }
407
408 void
409 kibnal_unlink_peer_locked (kib_peer_t *peer)
410 {
411         LASSERT (peer->ibp_persistence == 0);
412         LASSERT (list_empty(&peer->ibp_conns));
413
414         LASSERT (kibnal_peer_active(peer));
415         list_del_init (&peer->ibp_list);
416         /* lose peerlist's ref */
417         kib_peer_decref(peer);
418 }
419
420 static int
421 kibnal_get_peer_info (int index, ptl_nid_t *nidp, int *persistencep)
422 {
423         kib_peer_t        *peer;
424         struct list_head  *ptmp;
425         unsigned long      flags;
426         int                i;
427
428         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
429
430         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
431
432                 list_for_each (ptmp, &kibnal_data.kib_peers[i]) {
433
434                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
435                         LASSERT (peer->ibp_persistence != 0 ||
436                                  peer->ibp_connecting != 0 ||
437                                  !list_empty (&peer->ibp_conns));
438
439                         if (index-- > 0)
440                                 continue;
441
442                         *nidp = peer->ibp_nid;
443                         *persistencep = peer->ibp_persistence;
444
445                         read_unlock_irqrestore(&kibnal_data.kib_global_lock,
446                                                flags);
447                         return (0);
448                 }
449         }
450
451         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
452         return (-ENOENT);
453 }
454
455 static int
456 kibnal_add_persistent_peer (ptl_nid_t nid)
457 {
458         unsigned long      flags;
459         kib_peer_t        *peer;
460         kib_peer_t        *peer2;
461         
462         if (nid == PTL_NID_ANY)
463                 return (-EINVAL);
464
465         peer = kibnal_create_peer (nid);
466         if (peer == NULL)
467                 return (-ENOMEM);
468
469         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
470
471         peer2 = kibnal_find_peer_locked (nid);
472         if (peer2 != NULL) {
473                 kib_peer_decref (peer);
474                 peer = peer2;
475         } else {
476                 /* peer table takes existing ref on peer */
477                 list_add_tail (&peer->ibp_list,
478                                kibnal_nid2peerlist (nid));
479         }
480
481         peer->ibp_persistence++;
482         
483         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
484         return (0);
485 }
486
487 static void
488 kibnal_del_peer_locked (kib_peer_t *peer, int single_share)
489 {
490         struct list_head *ctmp;
491         struct list_head *cnxt;
492         kib_conn_t       *conn;
493
494         if (!single_share)
495                 peer->ibp_persistence = 0;
496         else if (peer->ibp_persistence > 0)
497                 peer->ibp_persistence--;
498
499         if (peer->ibp_persistence != 0)
500                 return;
501
502         if (list_empty(&peer->ibp_conns)) {
503                 kibnal_unlink_peer_locked(peer);
504         } else {
505                 list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
506                         conn = list_entry(ctmp, kib_conn_t, ibc_list);
507
508                         kibnal_close_conn_locked (conn, 0);
509                 }
510                 /* NB peer is no longer persistent; closing its last conn
511                  * unlinked it. */
512         }
513         /* NB peer now unlinked; might even be freed if the peer table had the
514          * last ref on it. */
515 }
516
517 int
518 kibnal_del_peer (ptl_nid_t nid, int single_share)
519 {
520         unsigned long      flags;
521         struct list_head  *ptmp;
522         struct list_head  *pnxt;
523         kib_peer_t        *peer;
524         int                lo;
525         int                hi;
526         int                i;
527         int                rc = -ENOENT;
528
529         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
530
531         if (nid != PTL_NID_ANY)
532                 lo = hi = kibnal_nid2peerlist(nid) - kibnal_data.kib_peers;
533         else {
534                 lo = 0;
535                 hi = kibnal_data.kib_peer_hash_size - 1;
536         }
537
538         for (i = lo; i <= hi; i++) {
539                 list_for_each_safe (ptmp, pnxt, &kibnal_data.kib_peers[i]) {
540                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
541                         LASSERT (peer->ibp_persistence != 0 ||
542                                  peer->ibp_connecting != 0 ||
543                                  !list_empty (&peer->ibp_conns));
544
545                         if (!(nid == PTL_NID_ANY || peer->ibp_nid == nid))
546                                 continue;
547
548                         kibnal_del_peer_locked (peer, single_share);
549                         rc = 0;         /* matched something */
550
551                         if (single_share)
552                                 goto out;
553                 }
554         }
555  out:
556         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
557
558         return (rc);
559 }
560
561 static kib_conn_t *
562 kibnal_get_conn_by_idx (int index)
563 {
564         kib_peer_t        *peer;
565         struct list_head  *ptmp;
566         kib_conn_t        *conn;
567         struct list_head  *ctmp;
568         unsigned long      flags;
569         int                i;
570
571         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
572
573         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
574                 list_for_each (ptmp, &kibnal_data.kib_peers[i]) {
575
576                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
577                         LASSERT (peer->ibp_persistence > 0 ||
578                                  peer->ibp_connecting != 0 ||
579                                  !list_empty (&peer->ibp_conns));
580
581                         list_for_each (ctmp, &peer->ibp_conns) {
582                                 if (index-- > 0)
583                                         continue;
584
585                                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
586                                 CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
587                                        conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
588                                        atomic_read (&conn->ibc_refcount));
589                                 atomic_inc (&conn->ibc_refcount);
590                                 read_unlock_irqrestore(&kibnal_data.kib_global_lock,
591                                                        flags);
592                                 return (conn);
593                         }
594                 }
595         }
596
597         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
598         return (NULL);
599 }
600
601 kib_conn_t *
602 kibnal_create_conn (void)
603 {
604         kib_conn_t  *conn;
605         int          i;
606         __u64        vaddr = 0;
607         __u64        vaddr_base;
608         int          page_offset;
609         int          ipage;
610         vv_qp_attr_t qp_attr;
611         vv_return_t  retval;
612         int          rc;
613         void        *qp_context;
614         
615         PORTAL_ALLOC(conn, sizeof (*conn));
616         if (conn == NULL) {
617                 CERROR ("Can't allocate connection\n");
618                 return (NULL);
619         }
620
621         /* zero flags, NULL pointers etc... */
622         memset (conn, 0, sizeof (*conn));
623
624         INIT_LIST_HEAD (&conn->ibc_tx_queue);
625         INIT_LIST_HEAD (&conn->ibc_active_txs);
626         spin_lock_init (&conn->ibc_lock);
627         
628         atomic_inc (&kibnal_data.kib_nconns);
629         /* well not really, but I call destroy() on failure, which decrements */
630
631         PORTAL_ALLOC(conn->ibc_rxs, IBNAL_RX_MSGS * sizeof (kib_rx_t));
632         if (conn->ibc_rxs == NULL) {
633                 CERROR("Cannot allocate RX buffers\n");
634                 goto failed;
635         }
636         memset (conn->ibc_rxs, 0, IBNAL_RX_MSGS * sizeof(kib_rx_t));
637
638         rc = kibnal_alloc_pages(&conn->ibc_rx_pages, IBNAL_RX_MSG_PAGES, 1);
639         if (rc != 0)
640                 goto failed;
641
642         vaddr_base = vaddr = conn->ibc_rx_pages->ibp_vaddr;
643
644         for (i = ipage = page_offset = 0; i < IBNAL_RX_MSGS; i++) {
645                 struct page *page = conn->ibc_rx_pages->ibp_pages[ipage];
646                 kib_rx_t   *rx = &conn->ibc_rxs[i];
647
648                 rx->rx_conn = conn;
649                 rx->rx_msg = (kib_msg_t *)(((char *)page_address(page)) + 
650                              page_offset);
651
652                 if (kibnal_whole_mem()) {
653                         void *newaddr;
654                         vv_mem_reg_h_t mem_h;
655                         vv_r_key_t r_key;
656
657                         /* Voltaire stack already registers the whole
658                          * memory, so use that API. */
659                         retval = vv_get_gen_mr_attrib(kibnal_data.kib_hca,
660                                                       rx->rx_msg,
661                                                       IBNAL_MSG_SIZE,
662                                                       &mem_h,
663                                                       &rx->l_key,
664                                                       &r_key);
665                         if (retval) {
666                                 CERROR("vv_get_gen_mr_attrib failed: %d", retval);
667                                 /* TODO: free pages? */
668                                 goto failed;
669                         }
670                 }
671                 
672                 vaddr += IBNAL_MSG_SIZE;
673                 LASSERT (vaddr <= vaddr_base + IBNAL_RX_MSG_BYTES);
674                 
675                 page_offset += IBNAL_MSG_SIZE;
676                 LASSERT (page_offset <= PAGE_SIZE);
677
678                 if (page_offset == PAGE_SIZE) {
679                         page_offset = 0;
680                         ipage++;
681                         LASSERT (ipage <= IBNAL_RX_MSG_PAGES);
682                 }
683         }
684
685         qp_attr = (vv_qp_attr_t) {
686                 .create.qp_type          = vv_qp_type_r_conn,
687                 .create.cq_send_h        = kibnal_data.kib_cq,
688                 .create.cq_receive_h     = kibnal_data.kib_cq,
689                 .create.send_max_outstand_wr = IBNAL_TX_MAX_SG * 
690                                            IBNAL_MSG_QUEUE_SIZE,
691                 .create.receive_max_outstand_wr = IBNAL_MSG_QUEUE_SIZE,
692                 .create.max_scatgat_per_send_wr = 1,
693                 .create.max_scatgat_per_receive_wr = 1,
694                 .create.signaling_type   = vv_selectable_signaling, /* TODO: correct? */
695                 .create.pd_h             = kibnal_data.kib_pd,
696                 .create.recv_solicited_events = vv_signal_all,
697         };
698         retval = vv_qp_create(kibnal_data.kib_hca, &qp_attr, NULL,
699                               &conn->ibc_qp, &conn->ibc_qp_attrs);
700         if (retval != 0) {
701                 CERROR ("Failed to create queue pair: %d\n", retval);
702                 goto failed;
703         }
704
705         /* Mark QP created */
706         conn->ibc_state = IBNAL_CONN_INIT_QP;
707
708         qp_attr = (vv_qp_attr_t) {
709                 .modify.qp_modify_into_state = vv_qp_state_init,
710                 .modify.vv_qp_attr_mask      = VV_QP_AT_STATE | VV_QP_AT_PHY_PORT_NUM | VV_QP_AT_P_KEY_IX | VV_QP_AT_ACCESS_CON_F,
711                 .modify.qp_type              = vv_qp_type_r_conn,
712
713                 .modify.params.init.p_key_indx      = 0,
714                 .modify.params.init.phy_port_num    = kibnal_data.kib_port,
715                 .modify.params.init.access_control  = vv_acc_r_mem_write | vv_acc_r_mem_read,
716         };
717         retval = vv_qp_modify(kibnal_data.kib_hca, conn->ibc_qp, &qp_attr, &conn->ibc_qp_attrs);
718         if (retval != 0) {
719                 CERROR ("Failed to modify queue pair: %d\n", retval);
720                 goto failed;
721         }
722
723         retval = vv_qp_query(kibnal_data.kib_hca, conn->ibc_qp, &qp_context, &conn->ibc_qp_attrs);
724         if (retval) {
725                 CERROR ("Failed to query queue pair: %d\n", retval);
726                 goto failed;
727         }
728
729         /* 1 ref for caller */
730         atomic_set (&conn->ibc_refcount, 1);
731         return (conn);
732         
733  failed:
734         kibnal_destroy_conn (conn);
735         return (NULL);
736 }
737
738 void
739 kibnal_destroy_conn (kib_conn_t *conn)
740 {
741         vv_return_t retval;
742         
743         CDEBUG (D_NET, "connection %p\n", conn);
744
745         LASSERT (atomic_read (&conn->ibc_refcount) == 0);
746         LASSERT (list_empty(&conn->ibc_tx_queue));
747         LASSERT (list_empty(&conn->ibc_active_txs));
748         LASSERT (conn->ibc_nsends_posted == 0);
749         LASSERT (conn->ibc_connreq == NULL);
750
751         switch (conn->ibc_state) {
752         case IBNAL_CONN_DISCONNECTED:
753                 /* called after connection sequence initiated */
754                 /* fall through */
755
756         case IBNAL_CONN_INIT_QP:
757                 /* _destroy includes an implicit Reset of the QP which 
758                  * discards posted work */
759                 retval = vv_qp_destroy(kibnal_data.kib_hca, conn->ibc_qp);
760                 if (retval)
761                         CERROR("Can't destroy QP: %d\n", retval);
762                 /* fall through */
763                 
764         case IBNAL_CONN_INIT_NOTHING:
765                 break;
766
767         default:
768                 LASSERT (0);
769         }
770
771         if (conn->ibc_cep != NULL) {
772                 retval = cm_destroy_cep(conn->ibc_cep);
773                 if (retval)
774                         CERROR("Can't destroy CEP %p: %d\n", conn->ibc_cep, 
775                                retval);
776         }
777
778         if (conn->ibc_rx_pages != NULL) 
779                 kibnal_free_pages(conn->ibc_rx_pages);
780         
781         if (conn->ibc_rxs != NULL)
782                 PORTAL_FREE(conn->ibc_rxs, 
783                             IBNAL_RX_MSGS * sizeof(kib_rx_t));
784
785         if (conn->ibc_peer != NULL)
786                 kib_peer_decref(conn->ibc_peer);
787
788         PORTAL_FREE(conn, sizeof (*conn));
789
790         atomic_dec(&kibnal_data.kib_nconns);
791         
792         if (atomic_read (&kibnal_data.kib_nconns) == 0 &&
793             kibnal_data.kib_shutdown) {
794                 /* I just nuked the last connection on shutdown; wake up
795                  * everyone so they can exit. */
796                 wake_up_all(&kibnal_data.kib_sched_waitq);
797                 wake_up_all(&kibnal_data.kib_connd_waitq);
798         }
799 }
800
801 void
802 kibnal_put_conn (kib_conn_t *conn)
803 {
804         unsigned long flags;
805
806         CDEBUG (D_NET, "putting conn[%p] state %d -> "LPX64" (%d)\n",
807                 conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
808                 atomic_read (&conn->ibc_refcount));
809
810         LASSERT (atomic_read (&conn->ibc_refcount) > 0);
811         if (!atomic_dec_and_test (&conn->ibc_refcount))
812                 return;
813
814         /* must disconnect before dropping the final ref */
815         LASSERT (conn->ibc_state == IBNAL_CONN_DISCONNECTED);
816
817         spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
818
819         list_add (&conn->ibc_list, &kibnal_data.kib_connd_conns);
820         wake_up (&kibnal_data.kib_connd_waitq);
821
822         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
823 }
824
825 static int
826 kibnal_close_peer_conns_locked (kib_peer_t *peer, int why)
827 {
828         kib_conn_t         *conn;
829         struct list_head   *ctmp;
830         struct list_head   *cnxt;
831         int                 count = 0;
832
833         list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
834                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
835
836                 count++;
837                 kibnal_close_conn_locked (conn, why);
838         }
839
840         return (count);
841 }
842
843 int
844 kibnal_close_stale_conns_locked (kib_peer_t *peer, __u64 incarnation)
845 {
846         kib_conn_t         *conn;
847         struct list_head   *ctmp;
848         struct list_head   *cnxt;
849         int                 count = 0;
850
851         list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
852                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
853
854                 if (conn->ibc_incarnation == incarnation)
855                         continue;
856
857                 CDEBUG(D_NET, "Closing stale conn nid:"LPX64" incarnation:"LPX64"("LPX64")\n",
858                        peer->ibp_nid, conn->ibc_incarnation, incarnation);
859                 
860                 count++;
861                 kibnal_close_conn_locked (conn, -ESTALE);
862         }
863
864         return (count);
865 }
866
867 static int
868 kibnal_close_matching_conns (ptl_nid_t nid)
869 {
870         unsigned long       flags;
871         kib_peer_t         *peer;
872         struct list_head   *ptmp;
873         struct list_head   *pnxt;
874         int                 lo;
875         int                 hi;
876         int                 i;
877         int                 count = 0;
878
879         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
880
881         if (nid != PTL_NID_ANY)
882                 lo = hi = kibnal_nid2peerlist(nid) - kibnal_data.kib_peers;
883         else {
884                 lo = 0;
885                 hi = kibnal_data.kib_peer_hash_size - 1;
886         }
887
888         for (i = lo; i <= hi; i++) {
889                 list_for_each_safe (ptmp, pnxt, &kibnal_data.kib_peers[i]) {
890
891                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
892                         LASSERT (peer->ibp_persistence != 0 ||
893                                  peer->ibp_connecting != 0 ||
894                                  !list_empty (&peer->ibp_conns));
895
896                         if (!(nid == PTL_NID_ANY || nid == peer->ibp_nid))
897                                 continue;
898
899                         count += kibnal_close_peer_conns_locked (peer, 0);
900                 }
901         }
902
903         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
904
905         /* wildcards always succeed */
906         if (nid == PTL_NID_ANY)
907                 return (0);
908         
909         return (count == 0 ? -ENOENT : 0);
910 }
911
912 static int
913 kibnal_cmd(struct portals_cfg *pcfg, void * private)
914 {
915         int rc = -EINVAL;
916         ENTRY;
917
918         LASSERT (pcfg != NULL);
919
920         switch(pcfg->pcfg_command) {
921         case NAL_CMD_GET_PEER: {
922                 ptl_nid_t   nid = 0;
923                 int         share_count = 0;
924
925                 rc = kibnal_get_peer_info(pcfg->pcfg_count,
926                                           &nid, &share_count);
927                 pcfg->pcfg_nid   = nid;
928                 pcfg->pcfg_size  = 0;
929                 pcfg->pcfg_id    = 0;
930                 pcfg->pcfg_misc  = 0;
931                 pcfg->pcfg_count = 0;
932                 pcfg->pcfg_wait  = share_count;
933                 break;
934         }
935         case NAL_CMD_ADD_PEER: {
936                 rc = kibnal_add_persistent_peer (pcfg->pcfg_nid);
937                 break;
938         }
939         case NAL_CMD_DEL_PEER: {
940                 rc = kibnal_del_peer (pcfg->pcfg_nid, 
941                                        /* flags == single_share */
942                                        pcfg->pcfg_flags != 0);
943                 break;
944         }
945         case NAL_CMD_GET_CONN: {
946                 kib_conn_t *conn = kibnal_get_conn_by_idx (pcfg->pcfg_count);
947
948                 if (conn == NULL)
949                         rc = -ENOENT;
950                 else {
951                         rc = 0;
952                         pcfg->pcfg_nid   = conn->ibc_peer->ibp_nid;
953                         pcfg->pcfg_id    = 0;
954                         pcfg->pcfg_misc  = 0;
955                         pcfg->pcfg_flags = 0;
956                         kibnal_put_conn (conn);
957                 }
958                 break;
959         }
960         case NAL_CMD_CLOSE_CONNECTION: {
961                 rc = kibnal_close_matching_conns (pcfg->pcfg_nid);
962                 break;
963         }
964         case NAL_CMD_REGISTER_MYNID: {
965                 if (pcfg->pcfg_nid == PTL_NID_ANY)
966                         rc = -EINVAL;
967                 else
968                         rc = kibnal_set_mynid (pcfg->pcfg_nid);
969                 break;
970         }
971         }
972
973         RETURN(rc);
974 }
975
976 void
977 kibnal_free_pages (kib_pages_t *p)
978 {
979         int     npages = p->ibp_npages;
980         vv_return_t retval;
981         int     i;
982         
983         if (p->ibp_mapped) {
984                 retval = vv_mem_region_destroy(kibnal_data.kib_hca, p->ibp_handle);
985                 if (retval != 0)
986                         CERROR ("Deregister error: %d\n", retval);
987         }
988         
989         for (i = 0; i < npages; i++)
990                 if (p->ibp_pages[i] != NULL)
991                         __free_page(p->ibp_pages[i]);
992         
993         PORTAL_FREE (p, offsetof(kib_pages_t, ibp_pages[npages]));
994 }
995
996 int
997 kibnal_alloc_pages (kib_pages_t **pp, int npages, int allow_write)
998 {
999         kib_pages_t   *p;
1000         vv_phy_list_t  phys_pages;
1001         vv_phy_buf_t  *phys_buf;
1002         int            i;
1003         vv_return_t    retval;
1004
1005         PORTAL_ALLOC(p, offsetof(kib_pages_t, ibp_pages[npages]));
1006         if (p == NULL) {
1007                 CERROR ("Can't allocate buffer %d\n", npages);
1008                 return (-ENOMEM);
1009         }
1010
1011         memset (p, 0, offsetof(kib_pages_t, ibp_pages[npages]));
1012         p->ibp_npages = npages;
1013         
1014         for (i = 0; i < npages; i++) {
1015                 p->ibp_pages[i] = alloc_page (GFP_KERNEL);
1016                 if (p->ibp_pages[i] == NULL) {
1017                         CERROR ("Can't allocate page %d of %d\n", i, npages);
1018                         kibnal_free_pages(p);
1019                         return (-ENOMEM);
1020                 }
1021         }
1022
1023         if (kibnal_whole_mem())
1024                 goto out;
1025
1026         PORTAL_ALLOC(phys_buf, npages * sizeof(vv_phy_buf_t));
1027         if (phys_buf == NULL) {
1028                 CERROR ("Can't allocate phys_buf for %d pages\n", npages);
1029                 /* XXX free ibp_pages? */
1030                 kibnal_free_pages(p);
1031                 return (-ENOMEM);
1032         }
1033
1034         phys_pages.number_of_buff = npages;
1035         phys_pages.phy_list = phys_buf;
1036
1037         /* if we were using the _contig_ registration variant we would have
1038          * an array of PhysAddr/Length pairs, but the discontiguous variant
1039          * just takes the PhysAddr */
1040         for (i = 0; i < npages; i++) {
1041                 phys_buf[i].start = kibnal_page2phys(p->ibp_pages[i]);
1042                 phys_buf[i].size = PAGE_SIZE;
1043         }
1044
1045         retval = vv_phy_mem_region_register(kibnal_data.kib_hca,
1046                                             &phys_pages,
1047                                             0, /* requested vaddr */
1048                                             npages * PAGE_SIZE,
1049                                             0, /* offset */
1050                                             kibnal_data.kib_pd,
1051                                             vv_acc_l_mem_write | vv_acc_r_mem_write | vv_acc_r_mem_read | vv_acc_mem_bind, /* TODO: translated as-is, but seems incorrect or too much */
1052                                             &p->ibp_handle, &p->ibp_vaddr,                                           
1053                                             &p->ibp_lkey, &p->ibp_rkey);
1054         
1055         PORTAL_FREE(phys_buf, npages * sizeof(vv_phy_buf_t));
1056         
1057         if (retval) {
1058                 CERROR ("Error %d mapping %d pages\n", retval, npages);
1059                 kibnal_free_pages(p);
1060                 return (-ENOMEM);
1061         }
1062
1063         CDEBUG(D_NET, "registered %d pages; handle: %x vaddr "LPX64" "
1064                       "lkey %x rkey %x\n", npages, p->ibp_handle,
1065                       p->ibp_vaddr, p->ibp_lkey, p->ibp_rkey);
1066         
1067         p->ibp_mapped = 1;
1068 out:
1069         *pp = p;
1070         return (0);
1071 }
1072
1073 static int
1074 kibnal_setup_tx_descs (void)
1075 {
1076         int           ipage = 0;
1077         int           page_offset = 0;
1078         __u64         vaddr;
1079         __u64         vaddr_base;
1080         struct page  *page;
1081         kib_tx_t     *tx;
1082         int           i;
1083         int           rc;
1084
1085         /* pre-mapped messages are not bigger than 1 page */
1086         LASSERT (IBNAL_MSG_SIZE <= PAGE_SIZE);
1087
1088         /* No fancy arithmetic when we do the buffer calculations */
1089         LASSERT (PAGE_SIZE % IBNAL_MSG_SIZE == 0);
1090
1091         rc = kibnal_alloc_pages(&kibnal_data.kib_tx_pages, IBNAL_TX_MSG_PAGES, 
1092                                 0);
1093         if (rc != 0)
1094                 return (rc);
1095
1096         /* ignored for the whole_mem case */
1097         vaddr = vaddr_base = kibnal_data.kib_tx_pages->ibp_vaddr;
1098
1099         for (i = 0; i < IBNAL_TX_MSGS; i++) {
1100                 page = kibnal_data.kib_tx_pages->ibp_pages[ipage];
1101                 tx = &kibnal_data.kib_tx_descs[i];
1102
1103                 memset (tx, 0, sizeof(*tx));    /* zero flags etc */
1104                 
1105                 tx->tx_msg = (kib_msg_t *)(((char *)page_address(page)) + 
1106                                            page_offset);
1107
1108                 if (kibnal_whole_mem()) {
1109                         void *newaddr;
1110                         vv_mem_reg_h_t mem_h;
1111                         vv_return_t  retval;
1112
1113                         /* Voltaire stack already registers the whole
1114                          * memory, so use that API. */
1115                         retval = vv_get_gen_mr_attrib(kibnal_data.kib_hca,
1116                                                       tx->tx_msg,
1117                                                       IBNAL_MSG_SIZE,
1118                                                       &mem_h,
1119                                                       &tx->l_key,
1120                                                       &tx->r_key);
1121                         if (retval) {
1122                                 CERROR("vv_get_gen_mr_attrib failed: %d", retval);
1123                                 /* TODO: free pages? */
1124                                 /* TODO: return. */
1125                         }
1126                 }
1127
1128                 tx->tx_isnblk = (i >= IBNAL_NTX);
1129                 tx->tx_mapped = KIB_TX_UNMAPPED;
1130
1131                 CDEBUG(D_NET, "Tx[%d] %p->%p\n", i, tx, tx->tx_msg);
1132
1133                 if (tx->tx_isnblk)
1134                         list_add (&tx->tx_list, 
1135                                   &kibnal_data.kib_idle_nblk_txs);
1136                 else
1137                         list_add (&tx->tx_list, 
1138                                   &kibnal_data.kib_idle_txs);
1139
1140                 vaddr += IBNAL_MSG_SIZE;
1141                 LASSERT (vaddr <= vaddr_base + IBNAL_TX_MSG_BYTES);
1142
1143                 page_offset += IBNAL_MSG_SIZE;
1144                 LASSERT (page_offset <= PAGE_SIZE);
1145
1146                 if (page_offset == PAGE_SIZE) {
1147                         page_offset = 0;
1148                         ipage++;
1149                         LASSERT (ipage <= IBNAL_TX_MSG_PAGES);
1150                 }
1151         }
1152         
1153         return (0);
1154 }
1155
1156 static void
1157 kibnal_api_shutdown (nal_t *nal)
1158 {
1159         int   i;
1160         int   rc;
1161         vv_return_t retval;
1162
1163         if (nal->nal_refct != 0) {
1164                 /* This module got the first ref */
1165                 PORTAL_MODULE_UNUSE;
1166                 return;
1167         }
1168
1169         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
1170                atomic_read (&portal_kmemory));
1171
1172         LASSERT(nal == &kibnal_api);
1173
1174         switch (kibnal_data.kib_init) {
1175
1176         case IBNAL_INIT_ALL:
1177                 /* stop calls to nal_cmd */
1178                 libcfs_nal_cmd_unregister(VIBNAL);
1179                 /* No new peers */
1180
1181                 /* resetting my NID to unadvertises me, removes my
1182                  * listener and nukes all current peers */
1183                 kibnal_set_mynid (PTL_NID_ANY);
1184
1185                 /* Wait for all peer state to clean up (crazy) */
1186                 i = 2;
1187                 while (atomic_read (&kibnal_data.kib_npeers) != 0) {
1188                         i++;
1189                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1190                                "waiting for %d peers to disconnect (can take a few seconds)\n",
1191                                atomic_read (&kibnal_data.kib_npeers));
1192                         set_current_state (TASK_UNINTERRUPTIBLE);
1193                         schedule_timeout (HZ);
1194                 }
1195                 /* fall through */
1196
1197         case IBNAL_INIT_CQ:
1198                 retval = vv_cq_destroy(kibnal_data.kib_hca, kibnal_data.kib_cq);
1199                 if (retval)
1200                         CERROR ("Destroy CQ error: %d\n", retval);
1201                 /* fall through */
1202
1203         case IBNAL_INIT_TXD:
1204                 kibnal_free_pages (kibnal_data.kib_tx_pages);
1205                 /* fall through */
1206
1207 #if IBNAL_FMR
1208         case IBNAL_INIT_FMR:
1209                 rc = ib_fmr_pool_destroy (kibnal_data.kib_fmr_pool);
1210                 if (rc != 0)
1211                         CERROR ("Destroy FMR pool error: %d\n", rc);
1212                 /* fall through */
1213 #endif
1214         case IBNAL_INIT_PD:
1215 #if IBNAL_WHOLE_MEM==0
1216                 retval = vv_pd_deallocate(kibnal_data.kib_hca, kibnal_data.kib_pd);
1217                 if (retval != 0)
1218                         CERROR ("Destroy PD error: %d\n", retval);
1219 #endif
1220                 /* fall through */
1221
1222         case IBNAL_INIT_GSI:
1223                 retval = gsi_deregister_class(kibnal_data.gsi_handle);
1224                 if (retval != 0)
1225                         CERROR ("GSI deregister failed: %d\n", retval);
1226                 /* fall through */
1227
1228         case IBNAL_INIT_GSI_POOL:
1229                 gsi_dtgrm_pool_destroy(kibnal_data.gsi_pool_handle);
1230                 /* fall through */
1231
1232         case IBNAL_INIT_PORT:
1233                 /* XXX ??? */
1234                 /* fall through */
1235
1236         case IBNAL_INIT_ASYNC:
1237                 retval = vv_dell_async_event_cb (kibnal_data.kib_hca,
1238                                                  kibnal_ca_async_callback);
1239                 if (retval)
1240                         CERROR("deregister asynchronous call back error: %d\n", retval);
1241                         
1242                 /* fall through */
1243
1244         case IBNAL_INIT_HCA:
1245                 retval = vv_hca_close(kibnal_data.kib_hca);
1246                 if (retval != 0)
1247                         CERROR ("Close HCA  error: %d\n", retval);
1248                 /* fall through */
1249
1250         case IBNAL_INIT_LIB:
1251                 lib_fini(&kibnal_lib);
1252                 /* fall through */
1253
1254         case IBNAL_INIT_DATA:
1255                 /* Module refcount only gets to zero when all peers
1256                  * have been closed so all lists must be empty */
1257                 LASSERT (atomic_read (&kibnal_data.kib_npeers) == 0);
1258                 LASSERT (kibnal_data.kib_peers != NULL);
1259                 for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
1260                         LASSERT (list_empty (&kibnal_data.kib_peers[i]));
1261                 }
1262                 LASSERT (atomic_read (&kibnal_data.kib_nconns) == 0);
1263                 LASSERT (list_empty (&kibnal_data.kib_sched_rxq));
1264                 LASSERT (list_empty (&kibnal_data.kib_sched_txq));
1265                 LASSERT (list_empty (&kibnal_data.kib_connd_conns));
1266                 LASSERT (list_empty (&kibnal_data.kib_connd_peers));
1267
1268                 /* flag threads to terminate; wake and wait for them to die */
1269                 kibnal_data.kib_shutdown = 1;
1270                 wake_up_all (&kibnal_data.kib_sched_waitq);
1271                 wake_up_all (&kibnal_data.kib_connd_waitq);
1272
1273                 i = 2;
1274                 while (atomic_read (&kibnal_data.kib_nthreads) != 0) {
1275                         i++;
1276                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1277                                "Waiting for %d threads to terminate\n",
1278                                atomic_read (&kibnal_data.kib_nthreads));
1279                         set_current_state (TASK_INTERRUPTIBLE);
1280                         schedule_timeout (HZ);
1281                 }
1282                 /* fall through */
1283                 
1284         case IBNAL_INIT_NOTHING:
1285                 break;
1286         }
1287
1288         if (kibnal_data.kib_tx_descs != NULL)
1289                 PORTAL_FREE (kibnal_data.kib_tx_descs,
1290                              IBNAL_TX_MSGS * sizeof(kib_tx_t));
1291
1292         if (kibnal_data.kib_peers != NULL)
1293                 PORTAL_FREE (kibnal_data.kib_peers,
1294                              sizeof (struct list_head) * 
1295                              kibnal_data.kib_peer_hash_size);
1296
1297         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
1298                atomic_read (&portal_kmemory));
1299         printk(KERN_INFO "Lustre: Voltaire IB NAL unloaded (final mem %d)\n",
1300                atomic_read(&portal_kmemory));
1301
1302         kibnal_data.kib_init = IBNAL_INIT_NOTHING;
1303 }
1304
1305 #define roundup_power(val, power) \
1306         ( (val + (__u64)(power - 1)) & ~((__u64)(power - 1)) )
1307
1308 /* this isn't very portable or sturdy in the face of funny mem/bus configs */
1309 static __u64 max_phys_mem(void)
1310 {
1311         struct sysinfo si;
1312         __u64 ret;
1313
1314         si_meminfo(&si);
1315         ret = (__u64)max(si.totalram, max_mapnr) * si.mem_unit;
1316         return roundup_power(ret, 128 * 1024 * 1024);
1317
1318 #undef roundup_power
1319
1320 static int
1321 kibnal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
1322                      ptl_ni_limits_t *requested_limits,
1323                      ptl_ni_limits_t *actual_limits)
1324 {
1325         ptl_process_id_t    process_id;
1326         int                 pkmem = atomic_read(&portal_kmemory);
1327         int                 rc;
1328         int                 i;
1329         vv_request_event_record_t req_er;
1330         vv_return_t         retval;
1331
1332         LASSERT (nal == &kibnal_api);
1333
1334         if (nal->nal_refct != 0) {
1335                 if (actual_limits != NULL)
1336                         *actual_limits = kibnal_lib.libnal_ni.ni_actual_limits;
1337                 /* This module got the first ref */
1338                 PORTAL_MODULE_USE;
1339                 return (PTL_OK);
1340         }
1341
1342         LASSERT (kibnal_data.kib_init == IBNAL_INIT_NOTHING);
1343
1344         init_MUTEX (&kibnal_data.kib_nid_mutex);
1345         kibnal_data.kib_nid = PTL_NID_ANY;
1346
1347         rwlock_init(&kibnal_data.kib_global_lock);
1348
1349         kibnal_data.kib_peer_hash_size = IBNAL_PEER_HASH_SIZE;
1350         PORTAL_ALLOC (kibnal_data.kib_peers,
1351                       sizeof (struct list_head) * kibnal_data.kib_peer_hash_size);
1352         if (kibnal_data.kib_peers == NULL) {
1353                 goto failed;
1354         }
1355         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++)
1356                 INIT_LIST_HEAD(&kibnal_data.kib_peers[i]);
1357
1358         spin_lock_init (&kibnal_data.kib_connd_lock);
1359         INIT_LIST_HEAD (&kibnal_data.kib_connd_peers);
1360         INIT_LIST_HEAD (&kibnal_data.kib_connd_conns);
1361         init_waitqueue_head (&kibnal_data.kib_connd_waitq);
1362
1363         spin_lock_init (&kibnal_data.kib_sched_lock);
1364         INIT_LIST_HEAD (&kibnal_data.kib_sched_txq);
1365         INIT_LIST_HEAD (&kibnal_data.kib_sched_rxq);
1366         init_waitqueue_head (&kibnal_data.kib_sched_waitq);
1367
1368         spin_lock_init (&kibnal_data.kib_tx_lock);
1369         INIT_LIST_HEAD (&kibnal_data.kib_idle_txs);
1370         INIT_LIST_HEAD (&kibnal_data.kib_idle_nblk_txs);
1371         init_waitqueue_head(&kibnal_data.kib_idle_tx_waitq);
1372
1373         INIT_LIST_HEAD (&kibnal_data.gsi_pending);
1374         init_MUTEX (&kibnal_data.gsi_mutex);
1375
1376         PORTAL_ALLOC (kibnal_data.kib_tx_descs,
1377                       IBNAL_TX_MSGS * sizeof(kib_tx_t));
1378         if (kibnal_data.kib_tx_descs == NULL) {
1379                 CERROR ("Can't allocate tx descs\n");
1380                 goto failed;
1381         }
1382
1383         /* lists/ptrs/locks initialised */
1384         kibnal_data.kib_init = IBNAL_INIT_DATA;
1385         /*****************************************************/
1386
1387         process_id.pid = requested_pid;
1388         process_id.nid = kibnal_data.kib_nid;
1389         
1390         rc = lib_init(&kibnal_lib, nal, process_id,
1391                       requested_limits, actual_limits);
1392         if (rc != PTL_OK) {
1393                 CERROR("lib_init failed: error %d\n", rc);
1394                 goto failed;
1395         }
1396
1397         /* lib interface initialised */
1398         kibnal_data.kib_init = IBNAL_INIT_LIB;
1399         /*****************************************************/
1400
1401         for (i = 0; i < IBNAL_N_SCHED; i++) {
1402                 rc = kibnal_thread_start (kibnal_scheduler, (void *)i);
1403                 if (rc != 0) {
1404                         CERROR("Can't spawn vibnal scheduler[%d]: %d\n",
1405                                i, rc);
1406                         goto failed;
1407                 }
1408         }
1409
1410         rc = kibnal_thread_start (kibnal_connd, NULL);
1411         if (rc != 0) {
1412                 CERROR ("Can't spawn vibnal connd: %d\n", rc);
1413                 goto failed;
1414         }
1415
1416         /* TODO: apparently only one adapter is supported */
1417         retval = vv_hca_open("ANY_HCA", NULL, &kibnal_data.kib_hca);
1418         if (retval) {
1419                 CERROR ("Can't open CA: %d\n", retval);
1420                 goto failed;
1421         }
1422
1423         /* Channel Adapter opened */
1424         kibnal_data.kib_init = IBNAL_INIT_HCA;
1425
1426         /* register to get HCA's asynchronous events. */
1427         req_er.req_event_type = VV_ASYNC_EVENT_ALL_MASK;
1428         retval = vv_set_async_event_cb (kibnal_data.kib_hca,
1429                                         req_er,
1430                                         kibnal_ca_async_callback);
1431
1432         if (retval) {
1433                 CERROR ("Can't open CA: %d\n", retval);
1434                 goto failed; 
1435         }
1436
1437         kibnal_data.kib_init = IBNAL_INIT_ASYNC;
1438
1439         /*****************************************************/
1440
1441         retval = vv_hca_query(kibnal_data.kib_hca,
1442                              &kibnal_data.kib_hca_attrs);
1443         if (retval) {
1444                 CERROR ("Can't size port attrs: %d\n", retval);
1445                 goto failed;
1446         }
1447
1448         kibnal_data.kib_port = -1;
1449
1450         for (i = 0; i<kibnal_data.kib_hca_attrs.port_num; i++) {
1451
1452                 int port_num = i+1;
1453                 u_int32_t tbl_count;
1454                 vv_port_attrib_t *pattr = &kibnal_data.kib_port_attr;
1455
1456                 retval = vv_port_query(kibnal_data.kib_hca, port_num, pattr);
1457                 if (retval) {
1458                         CERROR("vv_port_query failed for port %d: %d\n", port_num, retval);
1459                         continue;
1460                 }
1461
1462                 switch (pattr->port_state) {
1463                 case vv_state_linkDoun:
1464                         CDEBUG(D_NET, "port[%d] Down\n", port_num);
1465                         continue;
1466                 case vv_state_linkInit:
1467                         CDEBUG(D_NET, "port[%d] Init\n", port_num);
1468                         continue;
1469                 case vv_state_linkArm:
1470                         CDEBUG(D_NET, "port[%d] Armed\n", port_num);
1471                         continue;
1472                 case vv_state_linkActive:
1473                         CDEBUG(D_NET, "port[%d] Active\n", port_num);
1474
1475                         /* Found a suitable port. Get its GUID and PKEY. */
1476                         kibnal_data.kib_port = port_num;
1477                         
1478                         tbl_count = 1;
1479                         retval = vv_get_port_gid_tbl(kibnal_data.kib_hca, port_num, &tbl_count, &kibnal_data.kib_port_gid);
1480                         if (retval) {
1481                                 CERROR("vv_get_port_gid_tbl failed for port %d: %d\n", port_num, retval);
1482                                 continue;
1483                         }
1484
1485                         tbl_count = 1;
1486                         retval = vv_get_port_partition_tbl (kibnal_data.kib_hca, port_num, &tbl_count, &kibnal_data.kib_port_pkey);
1487                         if (retval) {
1488                                 CERROR("vv_get_port_partition_tbl failed for port %d: %d\n", port_num, retval);
1489                                 continue;
1490                         }
1491
1492                         break;
1493                 case vv_state_linkActDefer: /* TODO: correct? */
1494                 case vv_state_linkNoChange:
1495                         CERROR("Unexpected port[%d] state %d\n",
1496                                i, pattr->port_state);
1497                         continue;
1498                 }
1499                 break;
1500         }
1501
1502         if (kibnal_data.kib_port == -1) {
1503                 CERROR ("Can't find an active port\n");
1504                 goto failed;
1505         }
1506
1507         CDEBUG(D_NET, "Using port %d - GID="LPX64":"LPX64"\n",
1508                kibnal_data.kib_port, kibnal_data.kib_port_gid.scope.g.subnet, kibnal_data.kib_port_gid.scope.g.eui64);
1509         CDEBUG(D_NET, "got guid "LPX64"\n", cpu_to_le64(kibnal_data.kib_port_gid.scope.g.eui64));
1510         
1511         /* Active port found */
1512         kibnal_data.kib_init = IBNAL_INIT_PORT;
1513         /*****************************************************/
1514
1515         /* Prepare things to be able to send/receive MADS */
1516         retval = gsi_dtgrm_pool_create(IBNAL_CONCURRENT_PEERS, &kibnal_data.gsi_pool_handle);
1517         if (retval) {
1518                 CERROR("Could not create GSI pool: %d\n", retval);
1519                 goto failed;
1520         }
1521         kibnal_data.kib_init = IBNAL_INIT_GSI_POOL;
1522
1523         retval = gsi_register_class(MAD_CLASS_SUBN_ADM, /* TODO: correct? */
1524                                 2,      /* version */
1525                                 "ANY_HCA",
1526 #ifdef GSI_PASS_PORT_NUM
1527                                 kibnal_data.kib_port,
1528 #endif                   
1529                                 0, 0,
1530                                 vibnal_mad_sent_cb,     vibnal_mad_received_cb,
1531                                 NULL, &kibnal_data.gsi_handle);
1532         if (retval) {
1533                 CERROR("Cannot register GSI class: %d\n", retval);
1534                 goto failed;
1535         }
1536
1537         kibnal_data.kib_init = IBNAL_INIT_GSI;
1538         /*****************************************************/
1539
1540 #if IBNAL_WHOLE_MEM==0
1541         retval = vv_pd_allocate(kibnal_data.kib_hca, &kibnal_data.kib_pd);
1542 #else
1543         retval = vv_get_gen_pd_h(kibnal_data.kib_hca, &kibnal_data.kib_pd);
1544 #endif
1545         if (retval) {
1546                 CERROR ("Can't create PD: %d\n", retval);
1547                 goto failed;
1548         }
1549         
1550         /* flag PD initialised */
1551         kibnal_data.kib_init = IBNAL_INIT_PD;
1552         /*****************************************************/
1553
1554 #if IBNAL_FMR
1555         {
1556                 const int pool_size = IBNAL_NTX + IBNAL_NTX_NBLK;
1557                 struct ib_fmr_pool_param params = {
1558                         .max_pages_per_fmr = PTL_MTU/PAGE_SIZE,
1559                         .access            = (IB_ACCESS_LOCAL_WRITE |
1560                                               IB_ACCESS_REMOTE_WRITE |
1561                                               IB_ACCESS_REMOTE_READ),
1562                         .pool_size         = pool_size,
1563                         .dirty_watermark   = (pool_size * 3)/4,
1564                         .flush_function    = NULL,
1565                         .flush_arg         = NULL,
1566                         .cache             = 1,
1567                 };
1568                 rc = ib_fmr_pool_create(kibnal_data.kib_pd, &params,
1569                                         &kibnal_data.kib_fmr_pool);
1570                 if (rc != 0) {
1571                         CERROR ("Can't create FMR pool size %d: %d\n", 
1572                                 pool_size, rc);
1573                         goto failed;
1574                 }
1575         }
1576
1577         /* flag FMR pool initialised */
1578         kibnal_data.kib_init = IBNAL_INIT_FMR;
1579 #endif
1580
1581         /*****************************************************/
1582
1583         rc = kibnal_setup_tx_descs();
1584         if (rc != 0) {
1585                 CERROR ("Can't register tx descs: %d\n", rc);
1586                 goto failed;
1587         }
1588         
1589         /* flag TX descs initialised */
1590         kibnal_data.kib_init = IBNAL_INIT_TXD;
1591         /*****************************************************/
1592         {
1593                 uint32_t nentries;
1594
1595                 retval = vv_cq_create(kibnal_data.kib_hca, IBNAL_CQ_ENTRIES,
1596                                       kibnal_ca_callback, 
1597                                       NULL, /* context */
1598                                       &kibnal_data.kib_cq, &nentries);
1599                 if (retval) {
1600                         CERROR ("Can't create RX CQ: %d\n", retval);
1601                         goto failed;
1602                 }
1603
1604                 /* flag CQ initialised */
1605                 kibnal_data.kib_init = IBNAL_INIT_CQ;
1606
1607                 if (nentries < IBNAL_CQ_ENTRIES) {
1608                         CERROR ("CQ only has %d entries, need %d\n", 
1609                                 nentries, IBNAL_CQ_ENTRIES);
1610                         goto failed;
1611                 }
1612
1613                 retval = vv_request_completion_notification(kibnal_data.kib_hca, kibnal_data.kib_cq, vv_next_solicit_unsolicit_event);
1614                 if (retval != 0) {
1615                         CERROR ("Failed to re-arm completion queue: %d\n", rc);
1616                         goto failed;
1617                 }
1618         }
1619         
1620         /*****************************************************/
1621
1622         rc = libcfs_nal_cmd_register(VIBNAL, &kibnal_cmd, NULL);
1623         if (rc != 0) {
1624                 CERROR ("Can't initialise command interface (rc = %d)\n", rc);
1625                 goto failed;
1626         }
1627
1628         /* flag everything initialised */
1629         kibnal_data.kib_init = IBNAL_INIT_ALL;
1630         /*****************************************************/
1631
1632         printk(KERN_INFO "Lustre: Voltaire IB NAL loaded "
1633                "(initial mem %d)\n", pkmem);
1634
1635         return (PTL_OK);
1636
1637  failed:
1638         CDEBUG(D_NET, "kibnal_api_startup failed\n");
1639         kibnal_api_shutdown (&kibnal_api);    
1640         return (PTL_FAIL);
1641 }
1642
1643 void __exit
1644 kibnal_module_fini (void)
1645 {
1646 #ifdef CONFIG_SYSCTL
1647         if (kibnal_tunables.kib_sysctl != NULL)
1648                 unregister_sysctl_table (kibnal_tunables.kib_sysctl);
1649 #endif
1650         PtlNIFini(kibnal_ni);
1651
1652         ptl_unregister_nal(VIBNAL);
1653 }
1654
1655 int __init
1656 kibnal_module_init (void)
1657 {
1658         int    rc;
1659
1660         if (sizeof(kib_wire_connreq_t) > cm_REQ_priv_data_len) {
1661                 CERROR("sizeof(kib_wire_connreq_t) > cm_REQ_priv_data_len\n");
1662                 return -EINVAL;
1663         }
1664
1665         /* the following must be sizeof(int) for proc_dointvec() */
1666         if (sizeof (kibnal_tunables.kib_io_timeout) != sizeof (int)) {
1667                 CERROR("sizeof (kibnal_tunables.kib_io_timeout) != sizeof (int)\n");
1668                 return -EINVAL;
1669         }
1670
1671         kibnal_api.nal_ni_init = kibnal_api_startup;
1672         kibnal_api.nal_ni_fini = kibnal_api_shutdown;
1673
1674         /* Initialise dynamic tunables to defaults once only */
1675         kibnal_tunables.kib_io_timeout = IBNAL_IO_TIMEOUT;
1676
1677         rc = ptl_register_nal(VIBNAL, &kibnal_api);
1678         if (rc != PTL_OK) {
1679                 CERROR("Can't register IBNAL: %d\n", rc);
1680                 return (-ENOMEM);               /* or something... */
1681         }
1682
1683         /* Pure gateways want the NAL started up at module load time... */
1684         rc = PtlNIInit(VIBNAL, LUSTRE_SRV_PTL_PID, NULL, NULL, &kibnal_ni);
1685         if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
1686                 ptl_unregister_nal(VIBNAL);
1687                 return (-ENODEV);
1688         }
1689         
1690 #ifdef CONFIG_SYSCTL
1691         /* Press on regardless even if registering sysctl doesn't work */
1692         kibnal_tunables.kib_sysctl = 
1693                 register_sysctl_table (kibnal_top_ctl_table, 0);
1694 #endif
1695         return (0);
1696 }
1697
1698 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1699 MODULE_DESCRIPTION("Kernel Voltaire IB NAL v0.01");
1700 MODULE_LICENSE("GPL");
1701
1702 module_init(kibnal_module_init);
1703 module_exit(kibnal_module_fini);
1704