Whamcloud - gitweb
- landing of b_hd_cleanup_merge to HEAD.
[fs/lustre-release.git] / lustre / portals / knals / openibnal / openibnal.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2004 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  */
23
24 #include "openibnal.h"
25
26 nal_t                   koibnal_api;
27 ptl_handle_ni_t         koibnal_ni;
28 koib_data_t             koibnal_data;
29 koib_tunables_t         koibnal_tunables;
30
31 #ifdef CONFIG_SYSCTL
32 #define OPENIBNAL_SYSCTL        202
33
34 #define OPENIBNAL_SYSCTL_TIMEOUT     1
35 #define OPENIBNAL_SYSCTL_ZERO_COPY   2
36
37 static ctl_table koibnal_ctl_table[] = {
38         {OPENIBNAL_SYSCTL_TIMEOUT, "timeout", 
39          &koibnal_tunables.koib_io_timeout, sizeof (int),
40          0644, NULL, &proc_dointvec},
41         { 0 }
42 };
43
44 static ctl_table koibnal_top_ctl_table[] = {
45         {OPENIBNAL_SYSCTL, "openibnal", NULL, 0, 0555, koibnal_ctl_table},
46         { 0 }
47 };
48 #endif
49
50 void
51 print_service(struct ib_common_attrib_service *service, char *tag, int rc)
52 {
53         char name[32];
54
55         if (service == NULL) 
56         {
57                 CWARN("tag       : %s\n"
58                       "status    : %d (NULL)\n", tag, rc);
59                 return;
60         }
61         strncpy (name, service->service_name, sizeof(name)-1);
62         name[sizeof(name)-1] = 0;
63         
64         CWARN("tag       : %s\n"
65               "status    : %d\n"
66               "service id: "LPX64"\n"
67               "name      : %s\n"
68               "NID       : "LPX64"\n", tag, rc,
69               service->service_id, name, service->service_data64[0]);
70 }
71
72 void
73 koibnal_service_setunset_done (tTS_IB_CLIENT_QUERY_TID tid, int status,
74                                struct ib_common_attrib_service *service, void *arg)
75 {
76         *(int *)arg = status;
77         up (&koibnal_data.koib_nid_signal);
78 }
79
80 int
81 koibnal_advertise (void)
82 {
83         __u64   tid;
84         int     rc;
85         int     rc2;
86
87         LASSERT (koibnal_data.koib_nid != PTL_NID_ANY);
88
89         memset (&koibnal_data.koib_service, 0, 
90                 sizeof (koibnal_data.koib_service));
91         
92         koibnal_data.koib_service.service_id
93                 = koibnal_data.koib_cm_service_id;
94
95         rc = ib_cached_gid_get(koibnal_data.koib_device,
96                                koibnal_data.koib_port,
97                                0,
98                                koibnal_data.koib_service.service_gid);
99         if (rc != 0) {
100                 CERROR ("Can't get port %d GID: %d\n",
101                         koibnal_data.koib_port, rc);
102                 return (rc);
103         }
104         
105         rc = ib_cached_pkey_get(koibnal_data.koib_device,
106                                 koibnal_data.koib_port,
107                                 0,
108                                 &koibnal_data.koib_service.service_pkey);
109         if (rc != 0) {
110                 CERROR ("Can't get port %d PKEY: %d\n",
111                         koibnal_data.koib_port, rc);
112                 return (rc);
113         }
114         
115         koibnal_data.koib_service.service_lease = 0xffffffff;
116
117         koibnal_set_service_keys(&koibnal_data.koib_service, koibnal_data.koib_nid);
118
119         CDEBUG(D_NET, "Advertising service id "LPX64" %s:"LPX64"\n", 
120                koibnal_data.koib_service.service_id,
121                koibnal_data.koib_service.service_name, 
122                *koibnal_service_nid_field(&koibnal_data.koib_service));
123
124         rc = ib_service_set (koibnal_data.koib_device,
125                              koibnal_data.koib_port,
126                              &koibnal_data.koib_service,
127                              IB_SA_SERVICE_COMP_MASK_ID |
128                              IB_SA_SERVICE_COMP_MASK_GID |
129                              IB_SA_SERVICE_COMP_MASK_PKEY |
130                              IB_SA_SERVICE_COMP_MASK_LEASE |
131                              KOIBNAL_SERVICE_KEY_MASK,
132                              koibnal_tunables.koib_io_timeout * HZ,
133                              koibnal_service_setunset_done, &rc2, &tid);
134
135         if (rc == 0) {
136                 down (&koibnal_data.koib_nid_signal);
137                 rc = rc2;
138         }
139         
140         if (rc != 0)
141                 CERROR ("Error %d advertising SM service\n", rc);
142
143         return (rc);
144 }
145
146 int
147 koibnal_unadvertise (int expect_success)
148 {
149         __u64   tid;
150         int     rc;
151         int     rc2;
152
153         LASSERT (koibnal_data.koib_nid != PTL_NID_ANY);
154
155         memset (&koibnal_data.koib_service, 0,
156                 sizeof (koibnal_data.koib_service));
157
158         koibnal_set_service_keys(&koibnal_data.koib_service, koibnal_data.koib_nid);
159
160         CDEBUG(D_NET, "Unadvertising service %s:"LPX64"\n",
161                koibnal_data.koib_service.service_name,
162                *koibnal_service_nid_field(&koibnal_data.koib_service));
163
164         rc = ib_service_delete (koibnal_data.koib_device,
165                                 koibnal_data.koib_port,
166                                 &koibnal_data.koib_service,
167                                 KOIBNAL_SERVICE_KEY_MASK,
168                                 koibnal_tunables.koib_io_timeout * HZ,
169                                 koibnal_service_setunset_done, &rc2, &tid);
170         if (rc != 0) {
171                 CERROR ("Immediate error %d unadvertising NID "LPX64"\n",
172                         rc, koibnal_data.koib_nid);
173                 return (rc);
174         }
175
176         down (&koibnal_data.koib_nid_signal);
177         
178         if ((rc2 == 0) == !!expect_success)
179                 return (0);
180
181         if (expect_success)
182                 CERROR("Error %d unadvertising NID "LPX64"\n",
183                         rc, koibnal_data.koib_nid);
184         else
185                 CWARN("Removed conflicting NID "LPX64"\n",
186                       koibnal_data.koib_nid);
187
188         return (rc);
189 }
190
191 int
192 koibnal_check_advert (void)
193 {
194         __u64   tid;
195         int     rc;
196         int     rc2;
197
198         static struct ib_common_attrib_service srv;
199
200         memset (&srv, 0, sizeof (srv));
201
202         koibnal_set_service_keys(&srv, koibnal_data.koib_nid);
203
204         rc = ib_service_get (koibnal_data.koib_device, 
205                              koibnal_data.koib_port,
206                              &srv,
207                              KOIBNAL_SERVICE_KEY_MASK,
208                              koibnal_tunables.koib_io_timeout * HZ,
209                              koibnal_service_setunset_done, &rc2, 
210                              &tid);
211
212         if (rc != 0) {
213                 CERROR ("Immediate error %d checking SM service\n", rc);
214         } else {
215                 down (&koibnal_data.koib_nid_signal);
216                 rc = rc2;
217
218                 if (rc != 0)
219                         CERROR ("Error %d checking SM service\n", rc);
220         }
221
222         return (rc);
223 }
224
225 int
226 koibnal_set_mynid(ptl_nid_t nid)
227 {
228         struct timeval tv;
229         lib_ni_t      *ni = &koibnal_lib.libnal_ni;
230         int            rc;
231
232         CDEBUG(D_IOCTL, "setting mynid to "LPX64" (old nid="LPX64")\n",
233                nid, ni->ni_pid.nid);
234
235         do_gettimeofday(&tv);
236
237         down (&koibnal_data.koib_nid_mutex);
238
239         if (nid == koibnal_data.koib_nid) {
240                 /* no change of NID */
241                 up (&koibnal_data.koib_nid_mutex);
242                 return (0);
243         }
244
245         CDEBUG(D_NET, "NID "LPX64"("LPX64")\n",
246                koibnal_data.koib_nid, nid);
247         
248         if (koibnal_data.koib_nid != PTL_NID_ANY) {
249
250                 koibnal_unadvertise (1);
251
252                 rc = ib_cm_listen_stop (koibnal_data.koib_listen_handle);
253                 if (rc != 0)
254                         CERROR ("Error %d stopping listener\n", rc);
255         }
256         
257         koibnal_data.koib_nid = ni->ni_pid.nid = nid;
258         koibnal_data.koib_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
259         
260         /* Delete all existing peers and their connections after new
261          * NID/incarnation set to ensure no old connections in our brave
262          * new world. */
263         koibnal_del_peer (PTL_NID_ANY, 0);
264
265         rc = 0;
266         if (koibnal_data.koib_nid != PTL_NID_ANY) {
267                 /* New NID installed */
268
269                 /* remove any previous advert (crashed node etc) */
270                 koibnal_unadvertise(0);
271
272                 /* Assign new service number */
273                 koibnal_data.koib_cm_service_id = ib_cm_service_assign();
274                 CDEBUG(D_NET, "service_id "LPX64"\n", koibnal_data.koib_cm_service_id);
275         
276                 rc = ib_cm_listen(koibnal_data.koib_cm_service_id,
277                                   TS_IB_CM_SERVICE_EXACT_MASK,
278                                   koibnal_passive_conn_callback, NULL,
279                                   &koibnal_data.koib_listen_handle);
280                 if (rc != 0) {
281                         CERROR ("ib_cm_listen error: %d\n", rc);
282                         goto out;
283                 }
284
285                 rc = koibnal_advertise();
286
287                 koibnal_check_advert();
288         }
289         
290  out:
291         if (rc != 0) {
292                 koibnal_data.koib_nid = PTL_NID_ANY;
293                 /* remove any peers that sprung up while I failed to
294                  * advertise myself */
295                 koibnal_del_peer (PTL_NID_ANY, 0);
296         }
297
298         up (&koibnal_data.koib_nid_mutex);
299         return (0);
300 }
301
302 koib_peer_t *
303 koibnal_create_peer (ptl_nid_t nid)
304 {
305         koib_peer_t *peer;
306
307         LASSERT (nid != PTL_NID_ANY);
308
309         PORTAL_ALLOC (peer, sizeof (*peer));
310         if (peer == NULL)
311                 return (NULL);
312
313         memset(peer, 0, sizeof(*peer));         /* zero flags etc */
314
315         peer->ibp_nid = nid;
316         atomic_set (&peer->ibp_refcount, 1);    /* 1 ref for caller */
317
318         INIT_LIST_HEAD (&peer->ibp_list);       /* not in the peer table yet */
319         INIT_LIST_HEAD (&peer->ibp_conns);
320         INIT_LIST_HEAD (&peer->ibp_tx_queue);
321
322         peer->ibp_reconnect_time = jiffies;
323         peer->ibp_reconnect_interval = OPENIBNAL_MIN_RECONNECT_INTERVAL;
324
325         atomic_inc (&koibnal_data.koib_npeers);
326         return (peer);
327 }
328
329 void
330 koibnal_destroy_peer (koib_peer_t *peer)
331 {
332         CDEBUG (D_NET, "peer "LPX64" %p deleted\n", peer->ibp_nid, peer);
333
334         LASSERT (atomic_read (&peer->ibp_refcount) == 0);
335         LASSERT (peer->ibp_persistence == 0);
336         LASSERT (!koibnal_peer_active(peer));
337         LASSERT (peer->ibp_connecting == 0);
338         LASSERT (list_empty (&peer->ibp_conns));
339         LASSERT (list_empty (&peer->ibp_tx_queue));
340
341         PORTAL_FREE (peer, sizeof (*peer));
342
343         /* NB a peer's connections keep a reference on their peer until
344          * they are destroyed, so we can be assured that _all_ state to do
345          * with this peer has been cleaned up when its refcount drops to
346          * zero. */
347         atomic_dec (&koibnal_data.koib_npeers);
348 }
349
350 void
351 koibnal_put_peer (koib_peer_t *peer)
352 {
353         CDEBUG (D_OTHER, "putting peer[%p] -> "LPX64" (%d)\n",
354                 peer, peer->ibp_nid,
355                 atomic_read (&peer->ibp_refcount));
356
357         LASSERT (atomic_read (&peer->ibp_refcount) > 0);
358         if (!atomic_dec_and_test (&peer->ibp_refcount))
359                 return;
360
361         koibnal_destroy_peer (peer);
362 }
363
364 koib_peer_t *
365 koibnal_find_peer_locked (ptl_nid_t nid)
366 {
367         struct list_head *peer_list = koibnal_nid2peerlist (nid);
368         struct list_head *tmp;
369         koib_peer_t      *peer;
370
371         list_for_each (tmp, peer_list) {
372
373                 peer = list_entry (tmp, koib_peer_t, ibp_list);
374
375                 LASSERT (peer->ibp_persistence != 0 || /* persistent peer */
376                          peer->ibp_connecting != 0 || /* creating conns */
377                          !list_empty (&peer->ibp_conns));  /* active conn */
378
379                 if (peer->ibp_nid != nid)
380                         continue;
381
382                 CDEBUG(D_NET, "got peer [%p] -> "LPX64" (%d)\n",
383                        peer, nid, atomic_read (&peer->ibp_refcount));
384                 return (peer);
385         }
386         return (NULL);
387 }
388
389 koib_peer_t *
390 koibnal_get_peer (ptl_nid_t nid)
391 {
392         koib_peer_t     *peer;
393
394         read_lock (&koibnal_data.koib_global_lock);
395         peer = koibnal_find_peer_locked (nid);
396         if (peer != NULL)                       /* +1 ref for caller? */
397                 atomic_inc (&peer->ibp_refcount);
398         read_unlock (&koibnal_data.koib_global_lock);
399
400         return (peer);
401 }
402
403 void
404 koibnal_unlink_peer_locked (koib_peer_t *peer)
405 {
406         LASSERT (peer->ibp_persistence == 0);
407         LASSERT (list_empty(&peer->ibp_conns));
408
409         LASSERT (koibnal_peer_active(peer));
410         list_del_init (&peer->ibp_list);
411         /* lose peerlist's ref */
412         koibnal_put_peer (peer);
413 }
414
415 int
416 koibnal_get_peer_info (int index, ptl_nid_t *nidp, int *persistencep)
417 {
418         koib_peer_t       *peer;
419         struct list_head  *ptmp;
420         int                i;
421
422         read_lock (&koibnal_data.koib_global_lock);
423
424         for (i = 0; i < koibnal_data.koib_peer_hash_size; i++) {
425
426                 list_for_each (ptmp, &koibnal_data.koib_peers[i]) {
427                         
428                         peer = list_entry (ptmp, koib_peer_t, ibp_list);
429                         LASSERT (peer->ibp_persistence != 0 ||
430                                  peer->ibp_connecting != 0 ||
431                                  !list_empty (&peer->ibp_conns));
432
433                         if (index-- > 0)
434                                 continue;
435
436                         *nidp = peer->ibp_nid;
437                         *persistencep = peer->ibp_persistence;
438                         
439                         read_unlock (&koibnal_data.koib_global_lock);
440                         return (0);
441                 }
442         }
443
444         read_unlock (&koibnal_data.koib_global_lock);
445         return (-ENOENT);
446 }
447
448 int
449 koibnal_add_persistent_peer (ptl_nid_t nid)
450 {
451         unsigned long      flags;
452         koib_peer_t       *peer;
453         koib_peer_t       *peer2;
454         
455         if (nid == PTL_NID_ANY)
456                 return (-EINVAL);
457
458         peer = koibnal_create_peer (nid);
459         if (peer == NULL)
460                 return (-ENOMEM);
461
462         write_lock_irqsave (&koibnal_data.koib_global_lock, flags);
463
464         peer2 = koibnal_find_peer_locked (nid);
465         if (peer2 != NULL) {
466                 koibnal_put_peer (peer);
467                 peer = peer2;
468         } else {
469                 /* peer table takes existing ref on peer */
470                 list_add_tail (&peer->ibp_list,
471                                koibnal_nid2peerlist (nid));
472         }
473
474         peer->ibp_persistence++;
475         
476         write_unlock_irqrestore (&koibnal_data.koib_global_lock, flags);
477         return (0);
478 }
479
480 void
481 koibnal_del_peer_locked (koib_peer_t *peer, int single_share)
482 {
483         struct list_head *ctmp;
484         struct list_head *cnxt;
485         koib_conn_t      *conn;
486
487         if (!single_share)
488                 peer->ibp_persistence = 0;
489         else if (peer->ibp_persistence > 0)
490                 peer->ibp_persistence--;
491
492         if (peer->ibp_persistence != 0)
493                 return;
494
495         list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
496                 conn = list_entry(ctmp, koib_conn_t, ibc_list);
497
498                 koibnal_close_conn_locked (conn, 0);
499         }
500
501         /* NB peer unlinks itself when last conn is closed */
502 }
503
504 int
505 koibnal_del_peer (ptl_nid_t nid, int single_share)
506 {
507         unsigned long      flags;
508         struct list_head  *ptmp;
509         struct list_head  *pnxt;
510         koib_peer_t      *peer;
511         int                lo;
512         int                hi;
513         int                i;
514         int                rc = -ENOENT;
515
516         write_lock_irqsave (&koibnal_data.koib_global_lock, flags);
517
518         if (nid != PTL_NID_ANY)
519                 lo = hi = koibnal_nid2peerlist(nid) - koibnal_data.koib_peers;
520         else {
521                 lo = 0;
522                 hi = koibnal_data.koib_peer_hash_size - 1;
523         }
524
525         for (i = lo; i <= hi; i++) {
526                 list_for_each_safe (ptmp, pnxt, &koibnal_data.koib_peers[i]) {
527                         peer = list_entry (ptmp, koib_peer_t, ibp_list);
528                         LASSERT (peer->ibp_persistence != 0 ||
529                                  peer->ibp_connecting != 0 ||
530                                  !list_empty (&peer->ibp_conns));
531
532                         if (!(nid == PTL_NID_ANY || peer->ibp_nid == nid))
533                                 continue;
534
535                         koibnal_del_peer_locked (peer, single_share);
536                         rc = 0;         /* matched something */
537
538                         if (single_share)
539                                 goto out;
540                 }
541         }
542  out:
543         write_unlock_irqrestore (&koibnal_data.koib_global_lock, flags);
544
545         return (rc);
546 }
547
548 koib_conn_t *
549 koibnal_get_conn_by_idx (int index)
550 {
551         koib_peer_t       *peer;
552         struct list_head  *ptmp;
553         koib_conn_t       *conn;
554         struct list_head  *ctmp;
555         int                i;
556
557         read_lock (&koibnal_data.koib_global_lock);
558
559         for (i = 0; i < koibnal_data.koib_peer_hash_size; i++) {
560                 list_for_each (ptmp, &koibnal_data.koib_peers[i]) {
561
562                         peer = list_entry (ptmp, koib_peer_t, ibp_list);
563                         LASSERT (peer->ibp_persistence > 0 ||
564                                  peer->ibp_connecting != 0 ||
565                                  !list_empty (&peer->ibp_conns));
566
567                         list_for_each (ctmp, &peer->ibp_conns) {
568                                 if (index-- > 0)
569                                         continue;
570
571                                 conn = list_entry (ctmp, koib_conn_t, ibc_list);
572                                 CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
573                                        conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
574                                        atomic_read (&conn->ibc_refcount));
575                                 atomic_inc (&conn->ibc_refcount);
576                                 read_unlock (&koibnal_data.koib_global_lock);
577                                 return (conn);
578                         }
579                 }
580         }
581
582         read_unlock (&koibnal_data.koib_global_lock);
583         return (NULL);
584 }
585
586 koib_conn_t *
587 koibnal_create_conn (void)
588 {
589         koib_conn_t *conn;
590         int          i;
591         __u64        vaddr = 0;
592         __u64        vaddr_base;
593         int          page_offset;
594         int          ipage;
595         int          rc;
596         union {
597                 struct ib_qp_create_param  qp_create;
598                 struct ib_qp_attribute     qp_attr;
599         } params;
600         
601         PORTAL_ALLOC (conn, sizeof (*conn));
602         if (conn == NULL) {
603                 CERROR ("Can't allocate connection\n");
604                 return (NULL);
605         }
606
607         /* zero flags, NULL pointers etc... */
608         memset (conn, 0, sizeof (*conn));
609
610         INIT_LIST_HEAD (&conn->ibc_tx_queue);
611         INIT_LIST_HEAD (&conn->ibc_rdma_queue);
612         spin_lock_init (&conn->ibc_lock);
613         
614         atomic_inc (&koibnal_data.koib_nconns);
615         /* well not really, but I call destroy() on failure, which decrements */
616
617         PORTAL_ALLOC (conn->ibc_rxs, OPENIBNAL_RX_MSGS * sizeof (koib_rx_t));
618         if (conn->ibc_rxs == NULL)
619                 goto failed;
620         memset (conn->ibc_rxs, 0, OPENIBNAL_RX_MSGS * sizeof(koib_rx_t));
621
622         rc = koibnal_alloc_pages(&conn->ibc_rx_pages,
623                                  OPENIBNAL_RX_MSG_PAGES,
624                                  IB_ACCESS_LOCAL_WRITE);
625         if (rc != 0)
626                 goto failed;
627
628         vaddr_base = vaddr = conn->ibc_rx_pages->oibp_vaddr;
629
630         for (i = ipage = page_offset = 0; i < OPENIBNAL_RX_MSGS; i++) {
631                 struct page *page = conn->ibc_rx_pages->oibp_pages[ipage];
632                 koib_rx_t   *rx = &conn->ibc_rxs[i];
633
634                 rx->rx_conn = conn;
635                 rx->rx_vaddr = vaddr;
636                 rx->rx_msg = (koib_msg_t *)(((char *)page_address(page)) + page_offset);
637                 
638                 vaddr += OPENIBNAL_MSG_SIZE;
639                 LASSERT (vaddr <= vaddr_base + OPENIBNAL_RX_MSG_BYTES);
640                 
641                 page_offset += OPENIBNAL_MSG_SIZE;
642                 LASSERT (page_offset <= PAGE_SIZE);
643
644                 if (page_offset == PAGE_SIZE) {
645                         page_offset = 0;
646                         ipage++;
647                         LASSERT (ipage <= OPENIBNAL_RX_MSG_PAGES);
648                 }
649         }
650
651         params.qp_create = (struct ib_qp_create_param) {
652                 .limit = {
653                         /* Sends have an optional RDMA */
654                         .max_outstanding_send_request    = 2 * OPENIBNAL_MSG_QUEUE_SIZE,
655                         .max_outstanding_receive_request = OPENIBNAL_MSG_QUEUE_SIZE,
656                         .max_send_gather_element         = 1,
657                         .max_receive_scatter_element     = 1,
658                 },
659                 .pd              = koibnal_data.koib_pd,
660                 .send_queue      = koibnal_data.koib_tx_cq,
661                 .receive_queue   = koibnal_data.koib_rx_cq,
662                 .send_policy     = IB_WQ_SIGNAL_SELECTABLE,
663                 .receive_policy  = IB_WQ_SIGNAL_SELECTABLE,
664                 .rd_domain       = 0,
665                 .transport       = IB_TRANSPORT_RC,
666                 .device_specific = NULL,
667         };
668         
669         rc = ib_qp_create (&params.qp_create, &conn->ibc_qp, &conn->ibc_qpn);
670         if (rc != 0) {
671                 CERROR ("Failed to create queue pair: %d\n", rc);
672                 goto failed;
673         }
674         
675         /* Mark QP created */
676         conn->ibc_state = OPENIBNAL_CONN_INIT_QP;
677
678         params.qp_attr = (struct ib_qp_attribute) {
679                 .state             = IB_QP_STATE_INIT,
680                 .port              = koibnal_data.koib_port,
681                 .enable_rdma_read  = 1,
682                 .enable_rdma_write = 1,
683                 .valid_fields      = (IB_QP_ATTRIBUTE_STATE |
684                                       IB_QP_ATTRIBUTE_PORT |
685                                       IB_QP_ATTRIBUTE_PKEY_INDEX |
686                                       IB_QP_ATTRIBUTE_RDMA_ATOMIC_ENABLE),
687         };
688         rc = ib_qp_modify(conn->ibc_qp, &params.qp_attr);
689         if (rc != 0) {
690                 CERROR ("Failed to modify queue pair: %d\n", rc);
691                 goto failed;
692         }
693
694         /* 1 ref for caller */
695         atomic_set (&conn->ibc_refcount, 1);
696         return (conn);
697         
698  failed:
699         koibnal_destroy_conn (conn);
700         return (NULL);
701 }
702
703 void
704 koibnal_destroy_conn (koib_conn_t *conn)
705 {
706         int    rc;
707         
708         CDEBUG (D_NET, "connection %p\n", conn);
709
710         LASSERT (atomic_read (&conn->ibc_refcount) == 0);
711         LASSERT (list_empty(&conn->ibc_tx_queue));
712         LASSERT (list_empty(&conn->ibc_rdma_queue));
713         LASSERT (conn->ibc_nsends_posted == 0);
714         LASSERT (conn->ibc_connreq == NULL);
715
716         switch (conn->ibc_state) {
717         case OPENIBNAL_CONN_ZOMBIE:
718                 /* called after connection sequence initiated */
719
720         case OPENIBNAL_CONN_INIT_QP:
721                 rc = ib_qp_destroy(conn->ibc_qp);
722                 if (rc != 0)
723                         CERROR("Can't destroy QP: %d\n", rc);
724                 /* fall through */
725                 
726         case OPENIBNAL_CONN_INIT_NOTHING:
727                 break;
728
729         default:
730                 LASSERT (0);
731         }
732
733         if (conn->ibc_rx_pages != NULL) 
734                 koibnal_free_pages(conn->ibc_rx_pages);
735         
736         if (conn->ibc_rxs != NULL)
737                 PORTAL_FREE(conn->ibc_rxs, 
738                             OPENIBNAL_RX_MSGS * sizeof(koib_rx_t));
739
740         if (conn->ibc_peer != NULL)
741                 koibnal_put_peer(conn->ibc_peer);
742
743         PORTAL_FREE(conn, sizeof (*conn));
744
745         atomic_dec(&koibnal_data.koib_nconns);
746         
747         if (atomic_read (&koibnal_data.koib_nconns) == 0 &&
748             koibnal_data.koib_shutdown) {
749                 /* I just nuked the last connection on shutdown; wake up
750                  * everyone so they can exit. */
751                 wake_up_all(&koibnal_data.koib_sched_waitq);
752                 wake_up_all(&koibnal_data.koib_connd_waitq);
753         }
754 }
755
756 void
757 koibnal_put_conn (koib_conn_t *conn)
758 {
759         unsigned long flags;
760
761         CDEBUG (D_NET, "putting conn[%p] state %d -> "LPX64" (%d)\n",
762                 conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
763                 atomic_read (&conn->ibc_refcount));
764
765         LASSERT (atomic_read (&conn->ibc_refcount) > 0);
766         if (!atomic_dec_and_test (&conn->ibc_refcount))
767                 return;
768
769         /* last ref only goes on zombies */
770         LASSERT (conn->ibc_state == OPENIBNAL_CONN_ZOMBIE);
771
772         spin_lock_irqsave (&koibnal_data.koib_connd_lock, flags);
773
774         list_add (&conn->ibc_list, &koibnal_data.koib_connd_conns);
775         wake_up (&koibnal_data.koib_connd_waitq);
776
777         spin_unlock_irqrestore (&koibnal_data.koib_connd_lock, flags);
778 }
779
780 int
781 koibnal_close_peer_conns_locked (koib_peer_t *peer, int why)
782 {
783         koib_conn_t        *conn;
784         struct list_head   *ctmp;
785         struct list_head   *cnxt;
786         int                 count = 0;
787
788         list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
789                 conn = list_entry (ctmp, koib_conn_t, ibc_list);
790
791                 count++;
792                 koibnal_close_conn_locked (conn, why);
793         }
794
795         return (count);
796 }
797
798 int
799 koibnal_close_stale_conns_locked (koib_peer_t *peer, __u64 incarnation)
800 {
801         koib_conn_t        *conn;
802         struct list_head   *ctmp;
803         struct list_head   *cnxt;
804         int                 count = 0;
805
806         list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
807                 conn = list_entry (ctmp, koib_conn_t, ibc_list);
808
809                 if (conn->ibc_incarnation == incarnation)
810                         continue;
811
812                 CDEBUG(D_NET, "Closing stale conn nid:"LPX64" incarnation:"LPX64"("LPX64")\n",
813                        peer->ibp_nid, conn->ibc_incarnation, incarnation);
814                 
815                 count++;
816                 koibnal_close_conn_locked (conn, -ESTALE);
817         }
818
819         return (count);
820 }
821
822 int
823 koibnal_close_matching_conns (ptl_nid_t nid)
824 {
825         unsigned long       flags;
826         koib_peer_t        *peer;
827         struct list_head   *ptmp;
828         struct list_head   *pnxt;
829         int                 lo;
830         int                 hi;
831         int                 i;
832         int                 count = 0;
833
834         write_lock_irqsave (&koibnal_data.koib_global_lock, flags);
835
836         if (nid != PTL_NID_ANY)
837                 lo = hi = koibnal_nid2peerlist(nid) - koibnal_data.koib_peers;
838         else {
839                 lo = 0;
840                 hi = koibnal_data.koib_peer_hash_size - 1;
841         }
842
843         for (i = lo; i <= hi; i++) {
844                 list_for_each_safe (ptmp, pnxt, &koibnal_data.koib_peers[i]) {
845
846                         peer = list_entry (ptmp, koib_peer_t, ibp_list);
847                         LASSERT (peer->ibp_persistence != 0 ||
848                                  peer->ibp_connecting != 0 ||
849                                  !list_empty (&peer->ibp_conns));
850
851                         if (!(nid == PTL_NID_ANY || nid == peer->ibp_nid))
852                                 continue;
853
854                         count += koibnal_close_peer_conns_locked (peer, 0);
855                 }
856         }
857
858         write_unlock_irqrestore (&koibnal_data.koib_global_lock, flags);
859
860         /* wildcards always succeed */
861         if (nid == PTL_NID_ANY)
862                 return (0);
863         
864         return (count == 0 ? -ENOENT : 0);
865 }
866
867 int
868 koibnal_cmd(struct portals_cfg *pcfg, void * private)
869 {
870         int rc = -EINVAL;
871
872         LASSERT (pcfg != NULL);
873
874         switch(pcfg->pcfg_command) {
875         case NAL_CMD_GET_PEER: {
876                 ptl_nid_t   nid = 0;
877                 int         share_count = 0;
878
879                 rc = koibnal_get_peer_info(pcfg->pcfg_count,
880                                            &nid, &share_count);
881                 pcfg->pcfg_nid   = nid;
882                 pcfg->pcfg_size  = 0;
883                 pcfg->pcfg_id    = 0;
884                 pcfg->pcfg_misc  = 0;
885                 pcfg->pcfg_count = 0;
886                 pcfg->pcfg_wait  = share_count;
887                 break;
888         }
889         case NAL_CMD_ADD_PEER: {
890                 rc = koibnal_add_persistent_peer (pcfg->pcfg_nid);
891                 break;
892         }
893         case NAL_CMD_DEL_PEER: {
894                 rc = koibnal_del_peer (pcfg->pcfg_nid, 
895                                        /* flags == single_share */
896                                        pcfg->pcfg_flags != 0);
897                 break;
898         }
899         case NAL_CMD_GET_CONN: {
900                 koib_conn_t *conn = koibnal_get_conn_by_idx (pcfg->pcfg_count);
901
902                 if (conn == NULL)
903                         rc = -ENOENT;
904                 else {
905                         rc = 0;
906                         pcfg->pcfg_nid   = conn->ibc_peer->ibp_nid;
907                         pcfg->pcfg_id    = 0;
908                         pcfg->pcfg_misc  = 0;
909                         pcfg->pcfg_flags = 0;
910                         koibnal_put_conn (conn);
911                 }
912                 break;
913         }
914         case NAL_CMD_CLOSE_CONNECTION: {
915                 rc = koibnal_close_matching_conns (pcfg->pcfg_nid);
916                 break;
917         }
918         case NAL_CMD_REGISTER_MYNID: {
919                 if (pcfg->pcfg_nid == PTL_NID_ANY)
920                         rc = -EINVAL;
921                 else
922                         rc = koibnal_set_mynid (pcfg->pcfg_nid);
923                 break;
924         }
925         }
926
927         return rc;
928 }
929
930 void
931 koibnal_free_pages (koib_pages_t *p)
932 {
933         int     npages = p->oibp_npages;
934         int     rc;
935         int     i;
936         
937         if (p->oibp_mapped) {
938                 rc = ib_memory_deregister(p->oibp_handle);
939                 if (rc != 0)
940                         CERROR ("Deregister error: %d\n", rc);
941         }
942         
943         for (i = 0; i < npages; i++)
944                 if (p->oibp_pages[i] != NULL)
945                         __free_page(p->oibp_pages[i]);
946         
947         PORTAL_FREE (p, offsetof(koib_pages_t, oibp_pages[npages]));
948 }
949
950 int
951 koibnal_alloc_pages (koib_pages_t **pp, int npages, int access)
952 {
953         koib_pages_t               *p;
954         struct ib_physical_buffer  *phys_pages;
955         int                         i;
956         int                         rc;
957
958         PORTAL_ALLOC(p, offsetof(koib_pages_t, oibp_pages[npages]));
959         if (p == NULL) {
960                 CERROR ("Can't allocate buffer %d\n", npages);
961                 return (-ENOMEM);
962         }
963
964         memset (p, 0, offsetof(koib_pages_t, oibp_pages[npages]));
965         p->oibp_npages = npages;
966         
967         for (i = 0; i < npages; i++) {
968                 p->oibp_pages[i] = alloc_page (GFP_KERNEL);
969                 if (p->oibp_pages[i] == NULL) {
970                         CERROR ("Can't allocate page %d of %d\n", i, npages);
971                         koibnal_free_pages(p);
972                         return (-ENOMEM);
973                 }
974         }
975
976         PORTAL_ALLOC(phys_pages, npages * sizeof(*phys_pages));
977         if (phys_pages == NULL) {
978                 CERROR ("Can't allocate physarray for %d pages\n", npages);
979                 koibnal_free_pages(p);
980                 return (-ENOMEM);
981         }
982
983         for (i = 0; i < npages; i++) {
984                 phys_pages[i].size = PAGE_SIZE;
985                 phys_pages[i].address =
986                         koibnal_page2phys(p->oibp_pages[i]);
987         }
988
989         p->oibp_vaddr = 0;
990         rc = ib_memory_register_physical(koibnal_data.koib_pd,
991                                          phys_pages, npages,
992                                          &p->oibp_vaddr,
993                                          npages * PAGE_SIZE, 0,
994                                          access,
995                                          &p->oibp_handle,
996                                          &p->oibp_lkey,
997                                          &p->oibp_rkey);
998         
999         PORTAL_FREE(phys_pages, npages * sizeof(*phys_pages));
1000         
1001         if (rc != 0) {
1002                 CERROR ("Error %d mapping %d pages\n", rc, npages);
1003                 koibnal_free_pages(p);
1004                 return (rc);
1005         }
1006         
1007         p->oibp_mapped = 1;
1008         *pp = p;
1009         return (0);
1010 }
1011
1012 int
1013 koibnal_setup_tx_descs (void)
1014 {
1015         int           ipage = 0;
1016         int           page_offset = 0;
1017         __u64         vaddr;
1018         __u64         vaddr_base;
1019         struct page  *page;
1020         koib_tx_t    *tx;
1021         int           i;
1022         int           rc;
1023
1024         /* pre-mapped messages are not bigger than 1 page */
1025         LASSERT (OPENIBNAL_MSG_SIZE <= PAGE_SIZE);
1026
1027         /* No fancy arithmetic when we do the buffer calculations */
1028         LASSERT (PAGE_SIZE % OPENIBNAL_MSG_SIZE == 0);
1029
1030         rc = koibnal_alloc_pages(&koibnal_data.koib_tx_pages,
1031                                  OPENIBNAL_TX_MSG_PAGES, 
1032                                  0);            /* local read access only */
1033         if (rc != 0)
1034                 return (rc);
1035
1036         vaddr = vaddr_base = koibnal_data.koib_tx_pages->oibp_vaddr;
1037
1038         for (i = 0; i < OPENIBNAL_TX_MSGS; i++) {
1039                 page = koibnal_data.koib_tx_pages->oibp_pages[ipage];
1040                 tx = &koibnal_data.koib_tx_descs[i];
1041
1042                 memset (tx, 0, sizeof(*tx));    /* zero flags etc */
1043                 
1044                 tx->tx_msg = (koib_msg_t *)(((char *)page_address(page)) + page_offset);
1045                 tx->tx_vaddr = vaddr;
1046                 tx->tx_isnblk = (i >= OPENIBNAL_NTX);
1047                 tx->tx_mapped = KOIB_TX_UNMAPPED;
1048
1049                 CDEBUG(D_NET, "Tx[%d] %p->%p - "LPX64"\n", 
1050                        i, tx, tx->tx_msg, tx->tx_vaddr);
1051
1052                 if (tx->tx_isnblk)
1053                         list_add (&tx->tx_list, 
1054                                   &koibnal_data.koib_idle_nblk_txs);
1055                 else
1056                         list_add (&tx->tx_list, 
1057                                   &koibnal_data.koib_idle_txs);
1058
1059                 vaddr += OPENIBNAL_MSG_SIZE;
1060                 LASSERT (vaddr <= vaddr_base + OPENIBNAL_TX_MSG_BYTES);
1061
1062                 page_offset += OPENIBNAL_MSG_SIZE;
1063                 LASSERT (page_offset <= PAGE_SIZE);
1064
1065                 if (page_offset == PAGE_SIZE) {
1066                         page_offset = 0;
1067                         ipage++;
1068                         LASSERT (ipage <= OPENIBNAL_TX_MSG_PAGES);
1069                 }
1070         }
1071         
1072         return (0);
1073 }
1074
1075 void
1076 koibnal_api_shutdown (nal_t *nal)
1077 {
1078         int   i;
1079         int   rc;
1080
1081         if (nal->nal_refct != 0) {
1082                 /* This module got the first ref */
1083                 PORTAL_MODULE_UNUSE;
1084                 return;
1085         }
1086
1087         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
1088                atomic_read (&portal_kmemory));
1089
1090         LASSERT(nal == &koibnal_api);
1091
1092         switch (koibnal_data.koib_init) {
1093         default:
1094                 CERROR ("Unexpected state %d\n", koibnal_data.koib_init);
1095                 LBUG();
1096
1097         case OPENIBNAL_INIT_ALL:
1098                 /* stop calls to nal_cmd */
1099                 libcfs_nal_cmd_unregister(OPENIBNAL);
1100                 /* No new peers */
1101
1102                 /* resetting my NID to unadvertises me, removes my
1103                  * listener and nukes all current peers */
1104                 koibnal_set_mynid (PTL_NID_ANY);
1105
1106                 /* Wait for all peer state to clean up */
1107                 i = 2;
1108                 while (atomic_read (&koibnal_data.koib_npeers) != 0) {
1109                         i++;
1110                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1111                                "waiting for %d peers to close down\n",
1112                                atomic_read (&koibnal_data.koib_npeers));
1113                         set_current_state (TASK_INTERRUPTIBLE);
1114                         schedule_timeout (HZ);
1115                 }
1116                 /* fall through */
1117
1118         case OPENIBNAL_INIT_TX_CQ:
1119                 rc = ib_cq_destroy (koibnal_data.koib_tx_cq);
1120                 if (rc != 0)
1121                         CERROR ("Destroy tx CQ error: %d\n", rc);
1122                 /* fall through */
1123
1124         case OPENIBNAL_INIT_RX_CQ:
1125                 rc = ib_cq_destroy (koibnal_data.koib_rx_cq);
1126                 if (rc != 0)
1127                         CERROR ("Destroy rx CQ error: %d\n", rc);
1128                 /* fall through */
1129
1130         case OPENIBNAL_INIT_TXD:
1131                 koibnal_free_pages (koibnal_data.koib_tx_pages);
1132                 /* fall through */
1133 #if OPENIBNAL_FMR
1134         case OPENIBNAL_INIT_FMR:
1135                 rc = ib_fmr_pool_destroy (koibnal_data.koib_fmr_pool);
1136                 if (rc != 0)
1137                         CERROR ("Destroy FMR pool error: %d\n", rc);
1138                 /* fall through */
1139 #endif
1140         case OPENIBNAL_INIT_PD:
1141                 rc = ib_pd_destroy(koibnal_data.koib_pd);
1142                 if (rc != 0)
1143                         CERROR ("Destroy PD error: %d\n", rc);
1144                 /* fall through */
1145
1146         case OPENIBNAL_INIT_LIB:
1147                 lib_fini(&koibnal_lib);
1148                 /* fall through */
1149
1150         case OPENIBNAL_INIT_DATA:
1151                 /* Module refcount only gets to zero when all peers
1152                  * have been closed so all lists must be empty */
1153                 LASSERT (atomic_read (&koibnal_data.koib_npeers) == 0);
1154                 LASSERT (koibnal_data.koib_peers != NULL);
1155                 for (i = 0; i < koibnal_data.koib_peer_hash_size; i++) {
1156                         LASSERT (list_empty (&koibnal_data.koib_peers[i]));
1157                 }
1158                 LASSERT (atomic_read (&koibnal_data.koib_nconns) == 0);
1159                 LASSERT (list_empty (&koibnal_data.koib_sched_rxq));
1160                 LASSERT (list_empty (&koibnal_data.koib_sched_txq));
1161                 LASSERT (list_empty (&koibnal_data.koib_connd_conns));
1162                 LASSERT (list_empty (&koibnal_data.koib_connd_peers));
1163
1164                 /* flag threads to terminate; wake and wait for them to die */
1165                 koibnal_data.koib_shutdown = 1;
1166                 wake_up_all (&koibnal_data.koib_sched_waitq);
1167                 wake_up_all (&koibnal_data.koib_connd_waitq);
1168
1169                 i = 2;
1170                 while (atomic_read (&koibnal_data.koib_nthreads) != 0) {
1171                         i++;
1172                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1173                                "Waiting for %d threads to terminate\n",
1174                                atomic_read (&koibnal_data.koib_nthreads));
1175                         set_current_state (TASK_INTERRUPTIBLE);
1176                         schedule_timeout (HZ);
1177                 }
1178                 /* fall through */
1179                 
1180         case OPENIBNAL_INIT_NOTHING:
1181                 break;
1182         }
1183
1184         if (koibnal_data.koib_tx_descs != NULL)
1185                 PORTAL_FREE (koibnal_data.koib_tx_descs,
1186                              OPENIBNAL_TX_MSGS * sizeof(koib_tx_t));
1187
1188         if (koibnal_data.koib_peers != NULL)
1189                 PORTAL_FREE (koibnal_data.koib_peers,
1190                              sizeof (struct list_head) * 
1191                              koibnal_data.koib_peer_hash_size);
1192
1193         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
1194                atomic_read (&portal_kmemory));
1195         printk(KERN_INFO "Lustre: OpenIB NAL unloaded (final mem %d)\n",
1196                atomic_read(&portal_kmemory));
1197
1198         koibnal_data.koib_init = OPENIBNAL_INIT_NOTHING;
1199 }
1200
1201 int
1202 koibnal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
1203                      ptl_ni_limits_t *requested_limits,
1204                      ptl_ni_limits_t *actual_limits)
1205 {
1206         ptl_process_id_t  process_id;
1207         int               pkmem = atomic_read(&portal_kmemory);
1208         int               rc;
1209         int               i;
1210
1211         LASSERT (nal == &koibnal_api);
1212
1213         if (nal->nal_refct != 0) {
1214                 if (actual_limits != NULL)
1215                         *actual_limits = koibnal_lib.libnal_ni.ni_actual_limits;
1216                 /* This module got the first ref */
1217                 PORTAL_MODULE_USE;
1218                 return (PTL_OK);
1219         }
1220
1221         LASSERT (koibnal_data.koib_init == OPENIBNAL_INIT_NOTHING);
1222
1223         memset (&koibnal_data, 0, sizeof (koibnal_data)); /* zero pointers, flags etc */
1224
1225         init_MUTEX (&koibnal_data.koib_nid_mutex);
1226         init_MUTEX_LOCKED (&koibnal_data.koib_nid_signal);
1227         koibnal_data.koib_nid = PTL_NID_ANY;
1228
1229         rwlock_init(&koibnal_data.koib_global_lock);
1230
1231         koibnal_data.koib_peer_hash_size = OPENIBNAL_PEER_HASH_SIZE;
1232         PORTAL_ALLOC (koibnal_data.koib_peers,
1233                       sizeof (struct list_head) * koibnal_data.koib_peer_hash_size);
1234         if (koibnal_data.koib_peers == NULL) {
1235                 goto failed;
1236         }
1237         for (i = 0; i < koibnal_data.koib_peer_hash_size; i++)
1238                 INIT_LIST_HEAD(&koibnal_data.koib_peers[i]);
1239
1240         spin_lock_init (&koibnal_data.koib_connd_lock);
1241         INIT_LIST_HEAD (&koibnal_data.koib_connd_peers);
1242         INIT_LIST_HEAD (&koibnal_data.koib_connd_conns);
1243         init_waitqueue_head (&koibnal_data.koib_connd_waitq);
1244
1245         spin_lock_init (&koibnal_data.koib_sched_lock);
1246         INIT_LIST_HEAD (&koibnal_data.koib_sched_txq);
1247         INIT_LIST_HEAD (&koibnal_data.koib_sched_rxq);
1248         init_waitqueue_head (&koibnal_data.koib_sched_waitq);
1249
1250         spin_lock_init (&koibnal_data.koib_tx_lock);
1251         INIT_LIST_HEAD (&koibnal_data.koib_idle_txs);
1252         INIT_LIST_HEAD (&koibnal_data.koib_idle_nblk_txs);
1253         init_waitqueue_head(&koibnal_data.koib_idle_tx_waitq);
1254
1255         PORTAL_ALLOC (koibnal_data.koib_tx_descs,
1256                       OPENIBNAL_TX_MSGS * sizeof(koib_tx_t));
1257         if (koibnal_data.koib_tx_descs == NULL) {
1258                 CERROR ("Can't allocate tx descs\n");
1259                 goto failed;
1260         }
1261
1262         /* lists/ptrs/locks initialised */
1263         koibnal_data.koib_init = OPENIBNAL_INIT_DATA;
1264         /*****************************************************/
1265
1266         process_id.pid = requested_pid;
1267         process_id.nid = koibnal_data.koib_nid;
1268         
1269         rc = lib_init(&koibnal_lib, nal, process_id,
1270                       requested_limits, actual_limits);
1271         if (rc != PTL_OK) {
1272                 CERROR("lib_init failed: error %d\n", rc);
1273                 goto failed;
1274         }
1275
1276         /* lib interface initialised */
1277         koibnal_data.koib_init = OPENIBNAL_INIT_LIB;
1278         /*****************************************************/
1279
1280         for (i = 0; i < OPENIBNAL_N_SCHED; i++) {
1281                 rc = koibnal_thread_start (koibnal_scheduler, (void *)i);
1282                 if (rc != 0) {
1283                         CERROR("Can't spawn openibnal scheduler[%d]: %d\n",
1284                                i, rc);
1285                         goto failed;
1286                 }
1287         }
1288
1289         rc = koibnal_thread_start (koibnal_connd, NULL);
1290         if (rc != 0) {
1291                 CERROR ("Can't spawn openibnal connd: %d\n", rc);
1292                 goto failed;
1293         }
1294
1295         koibnal_data.koib_device = ib_device_get_by_index(0);
1296         if (koibnal_data.koib_device == NULL) {
1297                 CERROR ("Can't open ib device 0\n");
1298                 goto failed;
1299         }
1300         
1301         rc = ib_device_properties_get(koibnal_data.koib_device,
1302                                       &koibnal_data.koib_device_props);
1303         if (rc != 0) {
1304                 CERROR ("Can't get device props: %d\n", rc);
1305                 goto failed;
1306         }
1307
1308         CDEBUG(D_NET, "Max Initiator: %d Max Responder %d\n", 
1309                koibnal_data.koib_device_props.max_initiator_per_qp,
1310                koibnal_data.koib_device_props.max_responder_per_qp);
1311
1312         koibnal_data.koib_port = 0;
1313         for (i = 1; i <= 2; i++) {
1314                 rc = ib_port_properties_get(koibnal_data.koib_device, i,
1315                                             &koibnal_data.koib_port_props);
1316                 if (rc == 0) {
1317                         koibnal_data.koib_port = i;
1318                         break;
1319                 }
1320         }
1321         if (koibnal_data.koib_port == 0) {
1322                 CERROR ("Can't find a port\n");
1323                 goto failed;
1324         }
1325
1326         rc = ib_pd_create(koibnal_data.koib_device,
1327                           NULL, &koibnal_data.koib_pd);
1328         if (rc != 0) {
1329                 CERROR ("Can't create PD: %d\n", rc);
1330                 goto failed;
1331         }
1332         
1333         /* flag PD initialised */
1334         koibnal_data.koib_init = OPENIBNAL_INIT_PD;
1335         /*****************************************************/
1336 #if OPENIBNAL_FMR
1337         {
1338                 const int pool_size = OPENIBNAL_NTX + OPENIBNAL_NTX_NBLK;
1339                 struct ib_fmr_pool_param params = {
1340                         .max_pages_per_fmr = PTL_MTU/PAGE_SIZE,
1341                         .access            = (IB_ACCESS_LOCAL_WRITE |
1342                                               IB_ACCESS_REMOTE_WRITE |
1343                                               IB_ACCESS_REMOTE_READ),
1344                         .pool_size         = pool_size,
1345                         .dirty_watermark   = (pool_size * 3)/4,
1346                         .flush_function    = NULL,
1347                         .flush_arg         = NULL,
1348                         .cache             = 1,
1349                 };
1350                 rc = ib_fmr_pool_create(koibnal_data.koib_pd, &params,
1351                                         &koibnal_data.koib_fmr_pool);
1352                 if (rc != 0) {
1353                         CERROR ("Can't create FMR pool size %d: %d\n", 
1354                                 pool_size, rc);
1355                         goto failed;
1356                 }
1357         }
1358
1359         /* flag FMR pool initialised */
1360         koibnal_data.koib_init = OPENIBNAL_INIT_FMR;
1361 #endif
1362         /*****************************************************/
1363
1364         rc = koibnal_setup_tx_descs();
1365         if (rc != 0) {
1366                 CERROR ("Can't register tx descs: %d\n", rc);
1367                 goto failed;
1368         }
1369         
1370         /* flag TX descs initialised */
1371         koibnal_data.koib_init = OPENIBNAL_INIT_TXD;
1372         /*****************************************************/
1373         
1374         {
1375                 struct ib_cq_callback callback = {
1376                         .context        = OPENIBNAL_CALLBACK_CTXT,
1377                         .policy         = IB_CQ_PROVIDER_REARM,
1378                         .function       = {
1379                                 .entry  = koibnal_rx_callback,
1380                         },
1381                         .arg            = NULL,
1382                 };
1383                 int  nentries = OPENIBNAL_RX_CQ_ENTRIES;
1384                 
1385                 rc = ib_cq_create (koibnal_data.koib_device, 
1386                                    &nentries, &callback, NULL,
1387                                    &koibnal_data.koib_rx_cq);
1388                 if (rc != 0) {
1389                         CERROR ("Can't create RX CQ: %d\n", rc);
1390                         goto failed;
1391                 }
1392
1393                 /* I only want solicited events */
1394                 rc = ib_cq_request_notification(koibnal_data.koib_rx_cq, 1);
1395                 LASSERT (rc == 0);
1396         }
1397         
1398         /* flag RX CQ initialised */
1399         koibnal_data.koib_init = OPENIBNAL_INIT_RX_CQ;
1400         /*****************************************************/
1401
1402         {
1403                 struct ib_cq_callback callback = {
1404                         .context        = OPENIBNAL_CALLBACK_CTXT,
1405                         .policy         = IB_CQ_PROVIDER_REARM,
1406                         .function       = {
1407                                 .entry  = koibnal_tx_callback,
1408                         },
1409                         .arg            = NULL,
1410                 };
1411                 int  nentries = OPENIBNAL_TX_CQ_ENTRIES;
1412                 
1413                 rc = ib_cq_create (koibnal_data.koib_device, 
1414                                    &nentries, &callback, NULL,
1415                                    &koibnal_data.koib_tx_cq);
1416                 if (rc != 0) {
1417                         CERROR ("Can't create RX CQ: %d\n", rc);
1418                         goto failed;
1419                 }
1420
1421                 /* I only want solicited events */
1422                 rc = ib_cq_request_notification(koibnal_data.koib_tx_cq, 1);
1423                 LASSERT (rc == 0);
1424         }
1425                                    
1426         /* flag TX CQ initialised */
1427         koibnal_data.koib_init = OPENIBNAL_INIT_TX_CQ;
1428         /*****************************************************/
1429         
1430         rc = libcfs_nal_cmd_register(OPENIBNAL, &koibnal_cmd, NULL);
1431         if (rc != 0) {
1432                 CERROR ("Can't initialise command interface (rc = %d)\n", rc);
1433                 goto failed;
1434         }
1435
1436         /* flag everything initialised */
1437         koibnal_data.koib_init = OPENIBNAL_INIT_ALL;
1438         /*****************************************************/
1439
1440         printk(KERN_INFO "Lustre: OpenIB NAL loaded "
1441                "(initial mem %d)\n", pkmem);
1442
1443         return (PTL_OK);
1444
1445  failed:
1446         koibnal_api_shutdown (&koibnal_api);    
1447         return (PTL_FAIL);
1448 }
1449
1450 void __exit
1451 koibnal_module_fini (void)
1452 {
1453 #ifdef CONFIG_SYSCTL
1454         if (koibnal_tunables.koib_sysctl != NULL)
1455                 unregister_sysctl_table (koibnal_tunables.koib_sysctl);
1456 #endif
1457         PtlNIFini(koibnal_ni);
1458
1459         ptl_unregister_nal(OPENIBNAL);
1460 }
1461
1462 int __init
1463 koibnal_module_init (void)
1464 {
1465         int    rc;
1466
1467         /* the following must be sizeof(int) for proc_dointvec() */
1468         LASSERT(sizeof (koibnal_tunables.koib_io_timeout) == sizeof (int));
1469
1470         koibnal_api.nal_ni_init = koibnal_api_startup;
1471         koibnal_api.nal_ni_fini = koibnal_api_shutdown;
1472
1473         /* Initialise dynamic tunables to defaults once only */
1474         koibnal_tunables.koib_io_timeout = OPENIBNAL_IO_TIMEOUT;
1475
1476         rc = ptl_register_nal(OPENIBNAL, &koibnal_api);
1477         if (rc != PTL_OK) {
1478                 CERROR("Can't register OPENIBNAL: %d\n", rc);
1479                 return (-ENOMEM);               /* or something... */
1480         }
1481
1482         /* Pure gateways want the NAL started up at module load time... */
1483         rc = PtlNIInit(OPENIBNAL, LUSTRE_SRV_PTL_PID, NULL, NULL, &koibnal_ni);
1484         if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
1485                 ptl_unregister_nal(OPENIBNAL);
1486                 return (-ENODEV);
1487         }
1488         
1489 #ifdef CONFIG_SYSCTL
1490         /* Press on regardless even if registering sysctl doesn't work */
1491         koibnal_tunables.koib_sysctl = 
1492                 register_sysctl_table (koibnal_top_ctl_table, 0);
1493 #endif
1494         return (0);
1495 }
1496
1497 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1498 MODULE_DESCRIPTION("Kernel OpenIB NAL v0.01");
1499 MODULE_LICENSE("GPL");
1500
1501 module_init(koibnal_module_init);
1502 module_exit(koibnal_module_fini);
1503