Whamcloud - gitweb
* First cut working vibnal
[fs/lustre-release.git] / lnet / klnds / viblnd / viblnd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2004 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  *   Author: Frank Zago <fzago@systemfabricworks.com>
7  *
8  *   This file is part of Lustre, http://www.lustre.org.
9  *
10  *   Lustre is free software; you can redistribute it and/or
11  *   modify it under the terms of version 2 of the GNU General Public
12  *   License as published by the Free Software Foundation.
13  *
14  *   Lustre is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *   You should have received a copy of the GNU General Public License
20  *   along with Lustre; if not, write to the Free Software
21  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  *
23  */
24
25 #include "vibnal.h"
26
/* Module-global NAL state.  Definitions live here; presumably declared
 * extern in vibnal.h — confirm against the header. */
nal_t                   kibnal_api;       /* NAL method table exported to portals */
ptl_handle_ni_t         kibnal_ni;        /* network interface handle */
kib_data_t              kibnal_data;      /* all shared NAL runtime state */
kib_tunables_t          kibnal_tunables;  /* runtime-tunable parameters (see sysctl below) */
31
#ifdef CONFIG_SYSCTL
/* Root sysctl binary id for this NAL's directory */
#define IBNAL_SYSCTL             202

/* Child id: I/O timeout tunable */
#define IBNAL_SYSCTL_TIMEOUT     1

/* "timeout" entry: plain int, mode 0644, standard integer handler.
 * NB positional ctl_table initializers — layout is kernel-version
 * specific, so the field order here matters. */
static ctl_table kibnal_ctl_table[] = {
        {IBNAL_SYSCTL_TIMEOUT, "timeout", 
         &kibnal_tunables.kib_io_timeout, sizeof (int),
         0644, NULL, &proc_dointvec},
        { 0 }
};

/* Top-level "vibnal" directory (mode 0555) holding the table above */
static ctl_table kibnal_top_ctl_table[] = {
        {IBNAL_SYSCTL, "vibnal", NULL, 0, 0555, kibnal_ctl_table},
        { 0 }
};
#endif
49
/* Sleep uninterruptibly for 'ticks' jiffies.
 * NB the task state must be set BEFORE schedule_timeout(), otherwise the
 * call returns immediately — do not reorder these two statements. */
void
kibnal_pause(int ticks)
{
        set_current_state(TASK_UNINTERRUPTIBLE);
        schedule_timeout(ticks);
}
56
57 __u32 
58 kibnal_cksum (void *ptr, int nob)
59 {
60         char  *c  = ptr;
61         __u32  sum = 0;
62
63         while (nob-- > 0)
64                 sum = ((sum << 1) | (sum >> 31)) + *c++;
65
66         /* ensure I don't return 0 (== no checksum) */
67         return (sum == 0) ? 1 : sum;
68 }
69
/* Set the message type and total on-wire length (header + 'body_nob'
 * bytes of the ibm_u payload union).  All other header fields are filled
 * in later by kibnal_pack_msg(). */
void
kibnal_init_msg(kib_msg_t *msg, int type, int body_nob)
{
        msg->ibm_type = type;
        msg->ibm_nob  = offsetof(kib_msg_t, ibm_u) + body_nob;
}
76
/* Stamp the common header fields of an outgoing message: magic, version,
 * credits, addressing and incarnation stamps, and (optionally) checksum.
 * ibm_type and ibm_nob must have been set already (kibnal_init_msg). */
void
kibnal_pack_msg(kib_msg_t *msg, int credits, ptl_nid_t dstnid, __u64 dststamp)
{
        /* CAVEAT EMPTOR! all message fields not set here should have been
         * initialised previously. */
        msg->ibm_magic    = IBNAL_MSG_MAGIC;
        msg->ibm_version  = IBNAL_MSG_VERSION;
        /*   ibm_type */
        msg->ibm_credits  = credits;
        /*   ibm_nob */
        msg->ibm_cksum    = 0;          /* must be zero while checksumming */
        msg->ibm_srcnid   = kibnal_lib.libnal_ni.ni_pid.nid;
        msg->ibm_srcstamp = kibnal_data.kib_incarnation;
        msg->ibm_dstnid   = dstnid;
        msg->ibm_dststamp = dststamp;
#if IBNAL_CKSUM
        /* NB ibm_cksum zero while computing cksum */
        msg->ibm_cksum    = kibnal_cksum(msg, msg->ibm_nob);
#endif
}
97
98 int
99 kibnal_unpack_msg(kib_msg_t *msg, int nob)
100 {
101         const int hdr_size = offsetof(kib_msg_t, ibm_u);
102         __u32     msg_cksum;
103         int       flip;
104         int       msg_nob;
105         int       i;
106         int       n;
107
108         /* 6 bytes are enough to have received magic + version */
109         if (nob < 6) {
110                 CERROR("Short message: %d\n", nob);
111                 return -EPROTO;
112         }
113
114         if (msg->ibm_magic == IBNAL_MSG_MAGIC) {
115                 flip = 0;
116         } else if (msg->ibm_magic == __swab32(IBNAL_MSG_MAGIC)) {
117                 flip = 1;
118         } else {
119                 CERROR("Bad magic: %08x\n", msg->ibm_magic);
120                 return -EPROTO;
121         }
122
123         if (msg->ibm_version != 
124             (flip ? __swab16(IBNAL_MSG_VERSION) : IBNAL_MSG_VERSION)) {
125                 CERROR("Bad version: %d\n", msg->ibm_version);
126                 return -EPROTO;
127         }
128
129         if (nob < hdr_size) {
130                 CERROR("Short message: %d\n", nob);
131                 return -EPROTO;
132         }
133
134         msg_nob = flip ? __swab32(msg->ibm_nob) : msg->ibm_nob;
135         if (msg_nob > nob) {
136                 CERROR("Short message: got %d, wanted %d\n", nob, msg_nob);
137                 return -EPROTO;
138         }
139
140         /* checksum must be computed with ibm_cksum zero and BEFORE anything
141          * gets flipped */
142         msg_cksum = flip ? __swab32(msg->ibm_cksum) : msg->ibm_cksum;
143         msg->ibm_cksum = 0;
144         if (msg_cksum != 0 &&
145             msg_cksum != kibnal_cksum(msg, msg_nob)) {
146                 CERROR("Bad checksum\n");
147                 return -EPROTO;
148         }
149         msg->ibm_cksum = msg_cksum;
150         
151         if (flip) {
152                 /* leave magic unflipped as a clue to peer endianness */
153                 __swab16s(&msg->ibm_version);
154                 CLASSERT (sizeof(msg->ibm_type) == 1);
155                 CLASSERT (sizeof(msg->ibm_credits) == 1);
156                 msg->ibm_nob = msg_nob;
157                 __swab64s(&msg->ibm_srcnid);
158                 __swab64s(&msg->ibm_srcstamp);
159                 __swab64s(&msg->ibm_dstnid);
160                 __swab64s(&msg->ibm_dststamp);
161         }
162         
163         if (msg->ibm_srcnid == PTL_NID_ANY) {
164                 CERROR("Bad src nid: "LPX64"\n", msg->ibm_srcnid);
165                 return -EPROTO;
166         }
167
168         switch (msg->ibm_type) {
169         default:
170                 CERROR("Unknown message type %x\n", msg->ibm_type);
171                 return -EPROTO;
172                 
173         case IBNAL_MSG_NOOP:
174                 break;
175
176         case IBNAL_MSG_IMMEDIATE:
177                 if (msg_nob < offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[0])) {
178                         CERROR("Short IMMEDIATE: %d(%d)\n", msg_nob,
179                                (int)offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[0]));
180                         return -EPROTO;
181                 }
182                 break;
183
184         case IBNAL_MSG_PUT_REQ:
185                 /* CAVEAT EMPTOR!  We don't actually put ibprm_rd on the wire;
186                  * it's just there to remember the source buffers while we wait
187                  * for the PUT_ACK */
188                 if (msg_nob < offsetof(kib_msg_t, ibm_u.putreq.ibprm_rd)) {
189                         CERROR("Short PUT_REQ: %d(%d)\n", msg_nob,
190                                (int)(hdr_size + sizeof(msg->ibm_u.putreq)));
191                         return -EPROTO;
192                 }
193                 break;
194
195         case IBNAL_MSG_PUT_ACK:
196                 if (msg_nob < offsetof(kib_msg_t, ibm_u.putack.ibpam_rd.rd_frags[0])) {
197                         CERROR("Short PUT_ACK: %d(%d)\n", msg_nob,
198                                (int)offsetof(kib_msg_t, ibm_u.putack.ibpam_rd.rd_frags[0]));
199                         return -EPROTO;
200                 }
201
202                 if (flip) {
203                         __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_key);
204                         __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_nfrag);
205                 }
206                 
207                 n = msg->ibm_u.putack.ibpam_rd.rd_nfrag;
208                 if (n <= 0 || n > IBNAL_MAX_RDMA_FRAGS) {
209                         CERROR("Bad PUT_ACK nfrags: %d, should be 0 < n <= %d\n", 
210                                n, IBNAL_MAX_RDMA_FRAGS);
211                         return -EPROTO;
212                 }
213                 
214                 if (msg_nob < offsetof(kib_msg_t, ibm_u.putack.ibpam_rd.rd_frags[n])) {
215                         CERROR("Short PUT_ACK: %d(%d)\n", msg_nob,
216                                (int)offsetof(kib_msg_t, ibm_u.putack.ibpam_rd.rd_frags[n]));
217                         return -EPROTO;
218                 }
219
220                 if (flip)
221                         for (i = 0; i < n; i++) {
222                                 __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_frags[i].rf_nob);
223                                 __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_frags[i].rf_addr_lo);
224                                 __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_frags[i].rf_addr_hi);
225                         }
226                 break;
227
228         case IBNAL_MSG_GET_REQ:
229                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.get)) {
230                         CERROR("Short GET_REQ: %d(%d)\n", msg_nob,
231                                (int)(hdr_size + sizeof(msg->ibm_u.get)));
232                         return -EPROTO;
233                 }
234                 if (flip) {
235                         __swab32s(&msg->ibm_u.get.ibgm_rd.rd_key);
236                         __swab32s(&msg->ibm_u.get.ibgm_rd.rd_nfrag);
237                 }
238
239                 n = msg->ibm_u.get.ibgm_rd.rd_nfrag;
240                 if (n <= 0 || n > IBNAL_MAX_RDMA_FRAGS) {
241                         CERROR("Bad GET_REQ nfrags: %d, should be 0 < n <= %d\n", 
242                                n, IBNAL_MAX_RDMA_FRAGS);
243                         return -EPROTO;
244                 }
245                 
246                 if (msg_nob < offsetof(kib_msg_t, ibm_u.get.ibgm_rd.rd_frags[n])) {
247                         CERROR("Short GET_REQ: %d(%d)\n", msg_nob,
248                                (int)offsetof(kib_msg_t, ibm_u.get.ibgm_rd.rd_frags[n]));
249                         return -EPROTO;
250                 }
251                 
252                 if (flip)
253                         for (i = 0; i < msg->ibm_u.get.ibgm_rd.rd_nfrag; i++) {
254                                 __swab32s(&msg->ibm_u.get.ibgm_rd.rd_frags[i].rf_nob);
255                                 __swab32s(&msg->ibm_u.get.ibgm_rd.rd_frags[i].rf_addr_lo);
256                                 __swab32s(&msg->ibm_u.get.ibgm_rd.rd_frags[i].rf_addr_hi);
257                         }
258                 break;
259
260         case IBNAL_MSG_PUT_NAK:
261         case IBNAL_MSG_PUT_DONE:
262         case IBNAL_MSG_GET_DONE:
263                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.completion)) {
264                         CERROR("Short RDMA completion: %d(%d)\n", msg_nob,
265                                (int)(hdr_size + sizeof(msg->ibm_u.completion)));
266                         return -EPROTO;
267                 }
268                 if (flip)
269                         __swab32s(&msg->ibm_u.completion.ibcm_status);
270                 break;
271
272         case IBNAL_MSG_CONNREQ:
273         case IBNAL_MSG_CONNACK:
274                 if (msg_nob < hdr_size + sizeof(msg->ibm_u.connparams)) {
275                         CERROR("Short connreq/ack: %d(%d)\n", msg_nob,
276                                (int)(hdr_size + sizeof(msg->ibm_u.connparams)));
277                         return -EPROTO;
278                 }
279                 if (flip) {
280                         __swab32s(&msg->ibm_u.connparams.ibcp_queue_depth);
281                         __swab32s(&msg->ibm_u.connparams.ibcp_max_msg_size);
282                         __swab32s(&msg->ibm_u.connparams.ibcp_max_frags);
283                 }
284                 break;
285         }
286         return 0;
287 }
288
289 int
290 kibnal_set_mynid(ptl_nid_t nid)
291 {
292         static cm_listen_data_t info;           /* protected by kib_nid_mutex */
293
294         lib_ni_t        *ni = &kibnal_lib.libnal_ni;
295         int              rc;
296         cm_return_t      cmrc;
297
298         CDEBUG(D_IOCTL, "setting mynid to "LPX64" (old nid="LPX64")\n",
299                nid, ni->ni_pid.nid);
300
301         down (&kibnal_data.kib_nid_mutex);
302
303         if (nid == ni->ni_pid.nid) {
304                 /* no change of NID */
305                 up (&kibnal_data.kib_nid_mutex);
306                 return (0);
307         }
308
309         CDEBUG(D_NET, "NID "LPX64"("LPX64")\n", ni->ni_pid.nid, nid);
310
311         if (kibnal_data.kib_listen_handle != NULL) {
312                 cmrc = cm_cancel(kibnal_data.kib_listen_handle);
313                 if (cmrc != cm_stat_success)
314                         CERROR ("Error %d stopping listener\n", cmrc);
315
316                 kibnal_pause(HZ/10);            /* ensure no more callbacks */
317         
318                 cmrc = cm_destroy_cep(kibnal_data.kib_listen_handle);
319                 if (cmrc != vv_return_ok)
320                         CERROR ("Error %d destroying CEP\n", cmrc);
321
322                 kibnal_data.kib_listen_handle = NULL;
323         }
324
325         /* Change NID.  NB queued passive connection requests (if any) will be
326          * rejected with an incorrect destination NID */
327         ni->ni_pid.nid = nid;
328         kibnal_data.kib_incarnation++;
329         mb();
330
331         /* Delete all existing peers and their connections after new
332          * NID/incarnation set to ensure no old connections in our brave
333          * new world. */
334         kibnal_del_peer (PTL_NID_ANY, 0);
335
336         if (ni->ni_pid.nid != PTL_NID_ANY) {    /* got a new NID to install */
337                 kibnal_data.kib_listen_handle = 
338                         cm_create_cep(cm_cep_transp_rc);
339                 if (kibnal_data.kib_listen_handle == NULL) {
340                         CERROR ("Can't create listen CEP\n");
341                         rc = -ENOMEM;
342                         goto failed_0;
343                 }
344
345                 CDEBUG(D_NET, "Created CEP %p for listening\n", 
346                        kibnal_data.kib_listen_handle);
347
348                 memset(&info, 0, sizeof(info));
349                 info.listen_addr.end_pt.sid = kibnal_data.kib_svc_id;
350
351                 cmrc = cm_listen(kibnal_data.kib_listen_handle, &info,
352                                  kibnal_listen_callback, NULL);
353                 if (cmrc != 0) {
354                         CERROR ("cm_listen error: %d\n", cmrc);
355                         rc = -EINVAL;
356                         goto failed_1;
357                 }
358         }
359
360         up (&kibnal_data.kib_nid_mutex);
361         return (0);
362
363  failed_1:
364         cmrc = cm_destroy_cep(kibnal_data.kib_listen_handle);
365         LASSERT (cmrc == cm_stat_success);
366         kibnal_data.kib_listen_handle = NULL;
367  failed_0:
368         ni->ni_pid.nid = PTL_NID_ANY;
369         kibnal_data.kib_incarnation++;
370         mb();
371         kibnal_del_peer (PTL_NID_ANY, 0);
372         up (&kibnal_data.kib_nid_mutex);
373         return rc;
374 }
375
376 kib_peer_t *
377 kibnal_create_peer (ptl_nid_t nid)
378 {
379         kib_peer_t *peer;
380
381         LASSERT (nid != PTL_NID_ANY);
382
383         PORTAL_ALLOC(peer, sizeof (*peer));
384         if (peer == NULL) {
385                 CERROR("Canot allocate perr\n");
386                 return (NULL);
387         }
388
389         memset(peer, 0, sizeof(*peer));         /* zero flags etc */
390
391         peer->ibp_nid = nid;
392         atomic_set (&peer->ibp_refcount, 1);    /* 1 ref for caller */
393
394         INIT_LIST_HEAD (&peer->ibp_list);       /* not in the peer table yet */
395         INIT_LIST_HEAD (&peer->ibp_conns);
396         INIT_LIST_HEAD (&peer->ibp_tx_queue);
397
398         peer->ibp_reconnect_time = jiffies;
399         peer->ibp_reconnect_interval = IBNAL_MIN_RECONNECT_INTERVAL;
400
401         atomic_inc (&kibnal_data.kib_npeers);
402         if (atomic_read(&kibnal_data.kib_npeers) <= IBNAL_CONCURRENT_PEERS)
403                 return peer;
404         
405         CERROR("Too many peers: CQ will overflow\n");
406         kibnal_peer_decref(peer);
407         return NULL;
408 }
409
/* Final teardown of a peer whose last reference has been dropped.
 * All asserts verify the peer is fully disengaged before freeing. */
void
kibnal_destroy_peer (kib_peer_t *peer)
{

        LASSERT (atomic_read (&peer->ibp_refcount) == 0);
        LASSERT (peer->ibp_persistence == 0);
        LASSERT (!kibnal_peer_active(peer));
        LASSERT (peer->ibp_connecting == 0);
        LASSERT (list_empty (&peer->ibp_conns));
        LASSERT (list_empty (&peer->ibp_tx_queue));
        
        PORTAL_FREE (peer, sizeof (*peer));

        /* NB a peer's connections keep a reference on their peer until
         * they are destroyed, so we can be assured that _all_ state to do
         * with this peer has been cleaned up when its refcount drops to
         * zero. */
        atomic_dec (&kibnal_data.kib_npeers);
}
429
430 /* the caller is responsible for accounting for the additional reference
431  * that this creates */
432 kib_peer_t *
433 kibnal_find_peer_locked (ptl_nid_t nid)
434 {
435         struct list_head *peer_list = kibnal_nid2peerlist (nid);
436         struct list_head *tmp;
437         kib_peer_t       *peer;
438
439         list_for_each (tmp, peer_list) {
440
441                 peer = list_entry (tmp, kib_peer_t, ibp_list);
442
443                 LASSERT (peer->ibp_persistence != 0 || /* persistent peer */
444                          peer->ibp_connecting != 0 || /* creating conns */
445                          !list_empty (&peer->ibp_conns));  /* active conn */
446
447                 if (peer->ibp_nid != nid)
448                         continue;
449
450                 CDEBUG(D_NET, "got peer [%p] -> "LPX64" (%d)\n",
451                        peer, nid, atomic_read (&peer->ibp_refcount));
452                 return (peer);
453         }
454         return (NULL);
455 }
456
/* Remove a non-persistent, connectionless peer from the peer table and
 * drop the table's reference.  Caller holds the global lock (per the
 * _locked convention). */
void
kibnal_unlink_peer_locked (kib_peer_t *peer)
{
        LASSERT (peer->ibp_persistence == 0);
        LASSERT (list_empty(&peer->ibp_conns));

        LASSERT (kibnal_peer_active(peer));
        list_del_init (&peer->ibp_list);
        /* lose peerlist's ref */
        kibnal_peer_decref(peer);
}
468
469 int
470 kibnal_get_peer_info (int index, ptl_nid_t *nidp, __u32 *ipp,
471                       int *persistencep)
472 {
473         kib_peer_t        *peer;
474         struct list_head  *ptmp;
475         int                i;
476         unsigned long      flags;
477
478         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
479
480         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
481
482                 list_for_each (ptmp, &kibnal_data.kib_peers[i]) {
483
484                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
485                         LASSERT (peer->ibp_persistence != 0 ||
486                                  peer->ibp_connecting != 0 ||
487                                  !list_empty (&peer->ibp_conns));
488
489                         if (index-- > 0)
490                                 continue;
491
492                         *nidp = peer->ibp_nid;
493                         *ipp = peer->ibp_ip;
494                         *persistencep = peer->ibp_persistence;
495
496                         read_unlock_irqrestore(&kibnal_data.kib_global_lock,
497                                                flags);
498                         return (0);
499                 }
500         }
501
502         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
503         return (-ENOENT);
504 }
505
506 int
507 kibnal_add_persistent_peer (ptl_nid_t nid, __u32 ip)
508 {
509         kib_peer_t        *peer;
510         kib_peer_t        *peer2;
511         unsigned long      flags;
512
513         CDEBUG(D_NET, LPX64"@%08x\n", nid, ip);
514         
515         if (nid == PTL_NID_ANY)
516                 return (-EINVAL);
517
518         peer = kibnal_create_peer (nid);
519         if (peer == NULL)
520                 return (-ENOMEM);
521
522         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
523
524         peer2 = kibnal_find_peer_locked (nid);
525         if (peer2 != NULL) {
526                 kibnal_peer_decref (peer);
527                 peer = peer2;
528         } else {
529                 /* peer table takes existing ref on peer */
530                 list_add_tail (&peer->ibp_list,
531                                kibnal_nid2peerlist (nid));
532         }
533
534         peer->ibp_ip = ip;
535         peer->ibp_persistence++;
536         
537         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
538         return (0);
539 }
540
541 void
542 kibnal_del_peer_locked (kib_peer_t *peer, int single_share)
543 {
544         struct list_head *ctmp;
545         struct list_head *cnxt;
546         kib_conn_t       *conn;
547
548         if (!single_share)
549                 peer->ibp_persistence = 0;
550         else if (peer->ibp_persistence > 0)
551                 peer->ibp_persistence--;
552
553         if (peer->ibp_persistence != 0)
554                 return;
555
556         if (list_empty(&peer->ibp_conns)) {
557                 kibnal_unlink_peer_locked(peer);
558         } else {
559                 list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
560                         conn = list_entry(ctmp, kib_conn_t, ibc_list);
561
562                         kibnal_close_conn_locked (conn, 0);
563                 }
564                 /* NB peer is no longer persistent; closing its last conn
565                  * unlinked it. */
566         }
567         /* NB peer now unlinked; might even be freed if the peer table had the
568          * last ref on it. */
569 }
570
571 int
572 kibnal_del_peer (ptl_nid_t nid, int single_share)
573 {
574         struct list_head  *ptmp;
575         struct list_head  *pnxt;
576         kib_peer_t        *peer;
577         int                lo;
578         int                hi;
579         int                i;
580         unsigned long      flags;
581         int                rc = -ENOENT;
582
583         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
584
585         if (nid != PTL_NID_ANY)
586                 lo = hi = kibnal_nid2peerlist(nid) - kibnal_data.kib_peers;
587         else {
588                 lo = 0;
589                 hi = kibnal_data.kib_peer_hash_size - 1;
590         }
591
592         for (i = lo; i <= hi; i++) {
593                 list_for_each_safe (ptmp, pnxt, &kibnal_data.kib_peers[i]) {
594                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
595                         LASSERT (peer->ibp_persistence != 0 ||
596                                  peer->ibp_connecting != 0 ||
597                                  !list_empty (&peer->ibp_conns));
598
599                         if (!(nid == PTL_NID_ANY || peer->ibp_nid == nid))
600                                 continue;
601
602                         kibnal_del_peer_locked (peer, single_share);
603                         rc = 0;         /* matched something */
604
605                         if (single_share)
606                                 goto out;
607                 }
608         }
609  out:
610         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
611         return (rc);
612 }
613
614 kib_conn_t *
615 kibnal_get_conn_by_idx (int index)
616 {
617         kib_peer_t        *peer;
618         struct list_head  *ptmp;
619         kib_conn_t        *conn;
620         struct list_head  *ctmp;
621         int                i;
622         unsigned long      flags;
623
624         read_lock_irqsave(&kibnal_data.kib_global_lock, flags);
625
626         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
627                 list_for_each (ptmp, &kibnal_data.kib_peers[i]) {
628
629                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
630                         LASSERT (peer->ibp_persistence > 0 ||
631                                  peer->ibp_connecting != 0 ||
632                                  !list_empty (&peer->ibp_conns));
633
634                         list_for_each (ctmp, &peer->ibp_conns) {
635                                 if (index-- > 0)
636                                         continue;
637
638                                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
639                                 kibnal_conn_addref(conn);
640                                 read_unlock_irqrestore(&kibnal_data.kib_global_lock,
641                                                        flags);
642                                 return (conn);
643                         }
644                 }
645         }
646
647         read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
648         return (NULL);
649 }
650
/* Drive the connection's QP through a state transition (init/rtr/rts/
 * error/reset), filling in the Voltaire verbs attribute block required
 * for each target state.  Returns 0 or -EIO on verbs failure.
 * NB several vv_* field names below (hope_limit, flow_lable, destanation_qp)
 * are misspelled in the vendor API itself — do not "fix" them. */
int
kibnal_set_qp_state (kib_conn_t *conn, vv_qp_state_t new_state)
{
        static vv_qp_attr_t attr;       /* too big for the stack; see below */
        
        kib_connvars_t   *cv = conn->ibc_connvars;
        vv_return_t       vvrc;
        
        /* Only called by connd => static OK */
        LASSERT (!in_interrupt());
        LASSERT (current == kibnal_data.kib_connd);

        memset(&attr, 0, sizeof(attr));
        
        switch (new_state) {
        default:
                LBUG();
                
        case vv_qp_state_init: {
                struct vv_qp_modify_init_st *init = &attr.modify.params.init;

                init->p_key_indx     = cv->cv_pkey_index;
                init->phy_port_num   = cv->cv_port;
                init->q_key          = IBNAL_QKEY; /* XXX but VV_QP_AT_Q_KEY not set! */
                init->access_control = vv_acc_r_mem_read |
                                       vv_acc_r_mem_write; /* XXX vv_acc_l_mem_write ? */

                attr.modify.vv_qp_attr_mask = VV_QP_AT_P_KEY_IX | 
                                              VV_QP_AT_PHY_PORT_NUM |
                                              VV_QP_AT_ACCESS_CON_F;
                break;
        }
        case vv_qp_state_rtr: {
                /* ready-to-receive: program the remote address vector from
                 * the path record obtained during connection establishment */
                struct vv_qp_modify_rtr_st *rtr = &attr.modify.params.rtr;
                vv_add_vec_t               *av  = &rtr->remote_add_vec;

                av->dlid                      = cv->cv_path.dlid;
                av->grh_flag                  = (!IBNAL_LOCAL_SUB);
                av->max_static_rate           = IBNAL_R_2_STATIC_RATE(cv->cv_path.rate);
                av->service_level             = cv->cv_path.sl;
                av->source_path_bit           = IBNAL_SOURCE_PATH_BIT;
                av->pmtu                      = cv->cv_path.mtu;
                av->rnr_retry_count           = cv->cv_rnr_count;
                av->global_dest.traffic_class = cv->cv_path.traffic_class;
                av->global_dest.hope_limit    = cv->cv_path.hop_limut;
                av->global_dest.flow_lable    = cv->cv_path.flow_label;
                av->global_dest.s_gid_index   = cv->cv_sgid_index;
                // XXX other av fields zero?

                rtr->destanation_qp            = cv->cv_remote_qpn;
                rtr->receive_psn               = cv->cv_rxpsn;
                rtr->responder_rdma_r_atom_num = IBNAL_OUS_DST_RD;

                // XXX ? rtr->opt_min_rnr_nak_timer = 16;


                // XXX sdp sets VV_QP_AT_OP_F but no actual optional options
                attr.modify.vv_qp_attr_mask = VV_QP_AT_ADD_VEC | 
                                              VV_QP_AT_DEST_QP |
                                              VV_QP_AT_R_PSN | 
                                              VV_QP_AT_MIN_RNR_NAK_T |
                                              VV_QP_AT_RESP_RDMA_ATOM_OUT_NUM |
                                              VV_QP_AT_OP_F;
                break;
        }
        case vv_qp_state_rts: {
                /* ready-to-send: transmit PSN plus retry/timeout policy */
                struct vv_qp_modify_rts_st *rts = &attr.modify.params.rts;

                rts->send_psn                 = cv->cv_txpsn;
                rts->local_ack_timeout        = IBNAL_LOCAL_ACK_TIMEOUT;
                rts->retry_num                = IBNAL_RETRY_CNT;
                rts->rnr_num                  = IBNAL_RNR_CNT;
                rts->dest_out_rdma_r_atom_num = IBNAL_OUS_DST_RD;
                
                attr.modify.vv_qp_attr_mask = VV_QP_AT_S_PSN |
                                              VV_QP_AT_L_ACK_T |
                                              VV_QP_AT_RETRY_NUM |
                                              VV_QP_AT_RNR_NUM |
                                              VV_QP_AT_DEST_RDMA_ATOM_OUT_NUM;
                break;
        }
        case vv_qp_state_error:
        case vv_qp_state_reset:
                /* no extra attributes needed; just the state change */
                attr.modify.vv_qp_attr_mask = 0;
                break;
        }
                
        attr.modify.qp_modify_into_state = new_state;
        attr.modify.vv_qp_attr_mask |= VV_QP_AT_STATE;
        
        vvrc = vv_qp_modify(kibnal_data.kib_hca, conn->ibc_qp, &attr, NULL);
        if (vvrc != vv_return_ok) {
                CERROR("Can't modify qp -> "LPX64" state to %d: %d\n", 
                       conn->ibc_peer->ibp_nid, new_state, vvrc);
                return -EIO;
        }
        
        return 0;
}
750
751 kib_conn_t *
752 kibnal_create_conn (cm_cep_handle_t cep)
753 {
754         kib_conn_t   *conn;
755         int           i;
756         __u64         vaddr = 0;
757         __u64         vaddr_base;
758         int           page_offset;
759         int           ipage;
760         vv_return_t   vvrc;
761         int           rc;
762
763         static vv_qp_attr_t  reqattr;
764         static vv_qp_attr_t  rspattr;
765
766         /* Only the connd creates conns => single threaded */
767         LASSERT(!in_interrupt());
768         LASSERT(current == kibnal_data.kib_connd);
769         
770         PORTAL_ALLOC(conn, sizeof (*conn));
771         if (conn == NULL) {
772                 CERROR ("Can't allocate connection\n");
773                 return (NULL);
774         }
775
776         /* zero flags, NULL pointers etc... */
777         memset (conn, 0, sizeof (*conn));
778
779         INIT_LIST_HEAD (&conn->ibc_early_rxs);
780         INIT_LIST_HEAD (&conn->ibc_tx_queue);
781         INIT_LIST_HEAD (&conn->ibc_active_txs);
782         spin_lock_init (&conn->ibc_lock);
783         
784         atomic_inc (&kibnal_data.kib_nconns);
785         /* well not really, but I call destroy() on failure, which decrements */
786
787         conn->ibc_cep = cep;
788
789         PORTAL_ALLOC(conn->ibc_connvars, sizeof(*conn->ibc_connvars));
790         if (conn->ibc_connvars == NULL) {
791                 CERROR("Can't allocate in-progress connection state\n");
792                 goto failed;
793         }
794         memset (conn->ibc_connvars, 0, sizeof(*conn->ibc_connvars));
795         /* Random seed for QP sequence number */
796         get_random_bytes(&conn->ibc_connvars->cv_rxpsn,
797                          sizeof(conn->ibc_connvars->cv_rxpsn));
798
799         PORTAL_ALLOC(conn->ibc_rxs, IBNAL_RX_MSGS * sizeof (kib_rx_t));
800         if (conn->ibc_rxs == NULL) {
801                 CERROR("Cannot allocate RX buffers\n");
802                 goto failed;
803         }
804         memset (conn->ibc_rxs, 0, IBNAL_RX_MSGS * sizeof(kib_rx_t));
805
806         rc = kibnal_alloc_pages(&conn->ibc_rx_pages, IBNAL_RX_MSG_PAGES, 1);
807         if (rc != 0)
808                 goto failed;
809
810         vaddr_base = vaddr = conn->ibc_rx_pages->ibp_vaddr;
811
812         for (i = ipage = page_offset = 0; i < IBNAL_RX_MSGS; i++) {
813                 struct page *page = conn->ibc_rx_pages->ibp_pages[ipage];
814                 kib_rx_t   *rx = &conn->ibc_rxs[i];
815
816                 rx->rx_conn = conn;
817                 rx->rx_msg = (kib_msg_t *)(((char *)page_address(page)) + 
818                              page_offset);
819
820 #if IBNAL_WHOLE_MEM
821                 {
822                         vv_mem_reg_h_t  mem_h;
823                         vv_r_key_t      r_key;
824
825                         /* Voltaire stack already registers the whole
826                          * memory, so use that API. */
827                         vvrc = vv_get_gen_mr_attrib(kibnal_data.kib_hca,
828                                                     rx->rx_msg,
829                                                     IBNAL_MSG_SIZE,
830                                                     &mem_h,
831                                                     &rx->rx_lkey,
832                                                     &r_key);
833                         LASSERT (vvrc == vv_return_ok);
834                 }
835 #else
836                 rx->rx_vaddr = vaddr;
837 #endif                
838                 CDEBUG(D_NET, "Rx[%d] %p->%p[%x:"LPX64"]\n", i, rx, 
839                        rx->rx_msg, KIBNAL_RX_LKEY(rx), KIBNAL_RX_VADDR(rx));
840
841                 vaddr += IBNAL_MSG_SIZE;
842                 LASSERT (vaddr <= vaddr_base + IBNAL_RX_MSG_BYTES);
843                 
844                 page_offset += IBNAL_MSG_SIZE;
845                 LASSERT (page_offset <= PAGE_SIZE);
846
847                 if (page_offset == PAGE_SIZE) {
848                         page_offset = 0;
849                         ipage++;
850                         LASSERT (ipage <= IBNAL_RX_MSG_PAGES);
851                 }
852         }
853
854         memset(&reqattr, 0, sizeof(reqattr));
855
856         reqattr.create.qp_type                    = vv_qp_type_r_conn;
857         reqattr.create.cq_send_h                  = kibnal_data.kib_cq;
858         reqattr.create.cq_receive_h               = kibnal_data.kib_cq;
859         reqattr.create.send_max_outstand_wr       = (1 + IBNAL_MAX_RDMA_FRAGS) * 
860                                                     IBNAL_MSG_QUEUE_SIZE;
861         reqattr.create.receive_max_outstand_wr    = IBNAL_RX_MSGS;
862         reqattr.create.max_scatgat_per_send_wr    = 1;
863         reqattr.create.max_scatgat_per_receive_wr = 1;
864         reqattr.create.signaling_type             = vv_selectable_signaling;
865         reqattr.create.pd_h                       = kibnal_data.kib_pd;
866         reqattr.create.recv_solicited_events      = vv_selectable_signaling; // vv_signal_all;
867
868         vvrc = vv_qp_create(kibnal_data.kib_hca, &reqattr, NULL,
869                             &conn->ibc_qp, &rspattr);
870         if (vvrc != vv_return_ok) {
871                 CERROR ("Failed to create queue pair: %d\n", vvrc);
872                 goto failed;
873         }
874
875         /* Mark QP created */
876         conn->ibc_state = IBNAL_CONN_INIT;
877         conn->ibc_connvars->cv_local_qpn = rspattr.create_return.qp_num;
878
879         if (rspattr.create_return.receive_max_outstand_wr < 
880             IBNAL_MSG_QUEUE_SIZE ||
881             rspattr.create_return.send_max_outstand_wr < 
882             (1 + IBNAL_MAX_RDMA_FRAGS) * IBNAL_MSG_QUEUE_SIZE) {
883                 CERROR("Insufficient rx/tx work items: wanted %d/%d got %d/%d\n",
884                        IBNAL_MSG_QUEUE_SIZE, 
885                        (1 + IBNAL_MAX_RDMA_FRAGS) * IBNAL_MSG_QUEUE_SIZE,
886                        rspattr.create_return.receive_max_outstand_wr,
887                        rspattr.create_return.send_max_outstand_wr);
888                 goto failed;
889         }
890
891         /* 1 ref for caller */
892         atomic_set (&conn->ibc_refcount, 1);
893         return (conn);
894         
895  failed:
896         kibnal_destroy_conn (conn);
897         return (NULL);
898 }
899
/* Free a connection and every resource it still owns.  The conn must be
 * completely disengaged from the network (refcount 0, no queued or
 * active txs); conn->ibc_state records how far construction got, so the
 * switch below knows which resources exist and need tearing down.
 * Runs only in the connd thread => single threaded, no locking needed. */
void
kibnal_destroy_conn (kib_conn_t *conn)
{
        vv_return_t vvrc;

        /* Only the connd does this (i.e. single threaded) */
        LASSERT (!in_interrupt());
        LASSERT (current == kibnal_data.kib_connd);
        
        CDEBUG (D_NET, "connection %p\n", conn);

        /* caller has dropped the last reference */
        LASSERT (atomic_read (&conn->ibc_refcount) == 0);
        LASSERT (list_empty(&conn->ibc_early_rxs));
        LASSERT (list_empty(&conn->ibc_tx_queue));
        LASSERT (list_empty(&conn->ibc_active_txs));
        LASSERT (conn->ibc_nsends_posted == 0);

        /* undo per-state setup; cases intentionally cascade downwards */
        switch (conn->ibc_state) {
        default:
                /* conn must be completely disengaged from the network */
                LBUG();

        case IBNAL_CONN_DISCONNECTED:
                /* connvars should have been freed already */
                LASSERT (conn->ibc_connvars == NULL);
                /* fall through */

        case IBNAL_CONN_INIT:
                /* QP was created: quiesce it, then destroy it */
                kibnal_set_qp_state(conn, vv_qp_state_reset);
                vvrc = vv_qp_destroy(kibnal_data.kib_hca, conn->ibc_qp);
                if (vvrc != vv_return_ok)
                        CERROR("Can't destroy QP: %d\n", vvrc);
                /* fall through */
                
        case IBNAL_CONN_INIT_NOTHING:
                break;
        }

        if (conn->ibc_rx_pages != NULL) 
                kibnal_free_pages(conn->ibc_rx_pages);

        if (conn->ibc_rxs != NULL)
                PORTAL_FREE(conn->ibc_rxs, 
                            IBNAL_RX_MSGS * sizeof(kib_rx_t));

        if (conn->ibc_connvars != NULL)
                PORTAL_FREE(conn->ibc_connvars, sizeof(*conn->ibc_connvars));

        if (conn->ibc_peer != NULL)
                kibnal_peer_decref(conn->ibc_peer);

        /* NOTE(review): the CM endpoint is destroyed unconditionally — this
         * assumes every conn reaching here carries a valid ibc_cep (it is
         * assigned at creation); confirm for early-failure paths */
        vvrc = cm_destroy_cep(conn->ibc_cep);
        LASSERT (vvrc == vv_return_ok);

        PORTAL_FREE(conn, sizeof (*conn));

        /* balances the atomic_inc done when the conn was created */
        atomic_dec(&kibnal_data.kib_nconns);
}
958
959 int
960 kibnal_close_peer_conns_locked (kib_peer_t *peer, int why)
961 {
962         kib_conn_t         *conn;
963         struct list_head   *ctmp;
964         struct list_head   *cnxt;
965         int                 count = 0;
966
967         list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
968                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
969
970                 count++;
971                 kibnal_close_conn_locked (conn, why);
972         }
973
974         return (count);
975 }
976
977 int
978 kibnal_close_stale_conns_locked (kib_peer_t *peer, __u64 incarnation)
979 {
980         kib_conn_t         *conn;
981         struct list_head   *ctmp;
982         struct list_head   *cnxt;
983         int                 count = 0;
984
985         list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
986                 conn = list_entry (ctmp, kib_conn_t, ibc_list);
987
988                 if (conn->ibc_incarnation == incarnation)
989                         continue;
990
991                 CDEBUG(D_NET, "Closing stale conn nid:"LPX64" incarnation:"LPX64"("LPX64")\n",
992                        peer->ibp_nid, conn->ibc_incarnation, incarnation);
993                 
994                 count++;
995                 kibnal_close_conn_locked (conn, -ESTALE);
996         }
997
998         return (count);
999 }
1000
1001 int
1002 kibnal_close_matching_conns (ptl_nid_t nid)
1003 {
1004         kib_peer_t         *peer;
1005         struct list_head   *ptmp;
1006         struct list_head   *pnxt;
1007         int                 lo;
1008         int                 hi;
1009         int                 i;
1010         unsigned long       flags;
1011         int                 count = 0;
1012
1013         write_lock_irqsave(&kibnal_data.kib_global_lock, flags);
1014
1015         if (nid != PTL_NID_ANY)
1016                 lo = hi = kibnal_nid2peerlist(nid) - kibnal_data.kib_peers;
1017         else {
1018                 lo = 0;
1019                 hi = kibnal_data.kib_peer_hash_size - 1;
1020         }
1021
1022         for (i = lo; i <= hi; i++) {
1023                 list_for_each_safe (ptmp, pnxt, &kibnal_data.kib_peers[i]) {
1024
1025                         peer = list_entry (ptmp, kib_peer_t, ibp_list);
1026                         LASSERT (peer->ibp_persistence != 0 ||
1027                                  peer->ibp_connecting != 0 ||
1028                                  !list_empty (&peer->ibp_conns));
1029
1030                         if (!(nid == PTL_NID_ANY || nid == peer->ibp_nid))
1031                                 continue;
1032
1033                         count += kibnal_close_peer_conns_locked (peer, 0);
1034                 }
1035         }
1036
1037         write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
1038
1039         /* wildcards always succeed */
1040         if (nid == PTL_NID_ANY)
1041                 return (0);
1042         
1043         return (count == 0 ? -ENOENT : 0);
1044 }
1045
1046 int
1047 kibnal_cmd(struct portals_cfg *pcfg, void * private)
1048 {
1049         int rc = -EINVAL;
1050
1051         LASSERT (pcfg != NULL);
1052
1053         switch(pcfg->pcfg_command) {
1054         case NAL_CMD_GET_PEER: {
1055                 ptl_nid_t   nid = 0;
1056                 __u32       ip = 0;
1057                 int         share_count = 0;
1058
1059                 rc = kibnal_get_peer_info(pcfg->pcfg_count,
1060                                           &nid, &ip, &share_count);
1061                 pcfg->pcfg_nid   = nid;
1062                 pcfg->pcfg_size  = 0;
1063                 pcfg->pcfg_id    = ip;
1064                 pcfg->pcfg_misc  = IBNAL_SERVICE_NUMBER; /* port */
1065                 pcfg->pcfg_count = 0;
1066                 pcfg->pcfg_wait  = share_count;
1067                 break;
1068         }
1069         case NAL_CMD_ADD_PEER: {
1070                 rc = kibnal_add_persistent_peer (pcfg->pcfg_nid,
1071                                                  pcfg->pcfg_id); /* IP */
1072                 break;
1073         }
1074         case NAL_CMD_DEL_PEER: {
1075                 rc = kibnal_del_peer (pcfg->pcfg_nid, 
1076                                        /* flags == single_share */
1077                                        pcfg->pcfg_flags != 0);
1078                 break;
1079         }
1080         case NAL_CMD_GET_CONN: {
1081                 kib_conn_t *conn = kibnal_get_conn_by_idx (pcfg->pcfg_count);
1082
1083                 if (conn == NULL)
1084                         rc = -ENOENT;
1085                 else {
1086                         rc = 0;
1087                         pcfg->pcfg_nid   = conn->ibc_peer->ibp_nid;
1088                         pcfg->pcfg_id    = 0;
1089                         pcfg->pcfg_misc  = 0;
1090                         pcfg->pcfg_flags = 0;
1091                         kibnal_conn_decref(conn);
1092                 }
1093                 break;
1094         }
1095         case NAL_CMD_CLOSE_CONNECTION: {
1096                 rc = kibnal_close_matching_conns (pcfg->pcfg_nid);
1097                 break;
1098         }
1099         case NAL_CMD_REGISTER_MYNID: {
1100                 if (pcfg->pcfg_nid == PTL_NID_ANY)
1101                         rc = -EINVAL;
1102                 else
1103                         rc = kibnal_set_mynid (pcfg->pcfg_nid);
1104                 break;
1105         }
1106         }
1107
1108         return rc;
1109 }
1110
1111 void
1112 kibnal_free_pages (kib_pages_t *p)
1113 {
1114         int         npages = p->ibp_npages;
1115         vv_return_t vvrc;
1116         int         i;
1117         
1118         if (p->ibp_mapped) {
1119                 vvrc = vv_mem_region_destroy(kibnal_data.kib_hca, 
1120                                              p->ibp_handle);
1121                 if (vvrc != vv_return_ok)
1122                         CERROR ("Deregister error: %d\n", vvrc);
1123         }
1124         
1125         for (i = 0; i < npages; i++)
1126                 if (p->ibp_pages[i] != NULL)
1127                         __free_page(p->ibp_pages[i]);
1128         
1129         PORTAL_FREE (p, offsetof(kib_pages_t, ibp_pages[npages]));
1130 }
1131
/* Allocate 'npages' kernel pages wrapped in a kib_pages_t and, unless
 * IBNAL_WHOLE_MEM is set (where the Voltaire stack pre-registers all
 * memory), register them as one physical memory region with the HCA.
 * On success sets *pp and returns 0; on failure returns -ENOMEM/-EFAULT
 * with everything cleaned up.
 * NOTE(review): 'allow_write' is currently ignored — registration always
 * uses VV_ACCESS_CONTROL_MASK_SET_ALL; confirm this is intentional. */
int
kibnal_alloc_pages (kib_pages_t **pp, int npages, int allow_write)
{
        kib_pages_t   *p;
        int            i;
#if !IBNAL_WHOLE_MEM
        vv_phy_list_t            vv_phys;
        vv_phy_buf_t            *phys_pages;
        vv_return_t              vvrc;
        vv_access_con_bit_mask_t access;
#endif

        /* descriptor and page-pointer array in a single allocation */
        PORTAL_ALLOC(p, offsetof(kib_pages_t, ibp_pages[npages]));
        if (p == NULL) {
                CERROR ("Can't allocate buffer %d\n", npages);
                return (-ENOMEM);
        }

        memset (p, 0, offsetof(kib_pages_t, ibp_pages[npages]));
        p->ibp_npages = npages;
        
        for (i = 0; i < npages; i++) {
                p->ibp_pages[i] = alloc_page (GFP_KERNEL);
                if (p->ibp_pages[i] == NULL) {
                        CERROR ("Can't allocate page %d of %d\n", i, npages);
                        /* also frees the pages allocated so far */
                        kibnal_free_pages(p);
                        return (-ENOMEM);
                }
        }

#if !IBNAL_WHOLE_MEM
        /* build the physical buffer list the registration call expects */
        PORTAL_ALLOC(phys_pages, npages * sizeof(*phys_pages));
        if (phys_pages == NULL) {
                CERROR ("Can't allocate physarray for %d pages\n", npages);
                kibnal_free_pages(p);
                return (-ENOMEM);
        }

        vv_phys.number_of_buff = npages;
        vv_phys.phy_list = phys_pages;

        for (i = 0; i < npages; i++) {
                phys_pages[i].size = PAGE_SIZE;
                phys_pages[i].start = 
                        kibnal_page2phys(p->ibp_pages[i]);
        }

        VV_ACCESS_CONTROL_MASK_SET_ALL(access);
        
        vvrc = vv_phy_mem_region_register(kibnal_data.kib_hca,
                                          &vv_phys,
                                          0, /* requested vaddr */
                                          npages * PAGE_SIZE, 0, /* offset */
                                          kibnal_data.kib_pd,
                                          access,
                                          &p->ibp_handle, 
                                          &p->ibp_vaddr,                                           
                                          &p->ibp_lkey, 
                                          &p->ibp_rkey);
        
        /* scratch list no longer needed once registered */
        PORTAL_FREE(phys_pages, npages * sizeof(*phys_pages));
        
        if (vvrc != vv_return_ok) {
                CERROR ("Error %d mapping %d pages\n", vvrc, npages);
                kibnal_free_pages(p);
                return (-EFAULT);
        }

        CDEBUG(D_NET, "registered %d pages; handle: %x vaddr "LPX64" "
               "lkey %x rkey %x\n", npages, p->ibp_handle,
               p->ibp_vaddr, p->ibp_lkey, p->ibp_rkey);
        
        /* tells kibnal_free_pages() to deregister on cleanup */
        p->ibp_mapped = 1;
#endif
        *pp = p;
        return (0);
}
1209
1210 int
1211 kibnal_alloc_tx_descs (void) 
1212 {
1213         int    i;
1214         
1215         PORTAL_ALLOC (kibnal_data.kib_tx_descs,
1216                       IBNAL_TX_MSGS * sizeof(kib_tx_t));
1217         if (kibnal_data.kib_tx_descs == NULL)
1218                 return -ENOMEM;
1219         
1220         memset(kibnal_data.kib_tx_descs, 0,
1221                IBNAL_TX_MSGS * sizeof(kib_tx_t));
1222
1223         for (i = 0; i < IBNAL_TX_MSGS; i++) {
1224                 kib_tx_t *tx = &kibnal_data.kib_tx_descs[i];
1225
1226                 PORTAL_ALLOC(tx->tx_wrq, 
1227                              (1 + IBNAL_MAX_RDMA_FRAGS) * 
1228                              sizeof(*tx->tx_wrq));
1229                 if (tx->tx_wrq == NULL)
1230                         return -ENOMEM;
1231                 
1232                 PORTAL_ALLOC(tx->tx_gl, 
1233                              (1 + IBNAL_MAX_RDMA_FRAGS) * 
1234                              sizeof(*tx->tx_gl));
1235                 if (tx->tx_gl == NULL)
1236                         return -ENOMEM;
1237                 
1238                 PORTAL_ALLOC(tx->tx_rd, 
1239                              offsetof(kib_rdma_desc_t, 
1240                                       rd_frags[IBNAL_MAX_RDMA_FRAGS]));
1241                 if (tx->tx_rd == NULL)
1242                         return -ENOMEM;
1243         }
1244
1245         return 0;
1246 }
1247
1248 void
1249 kibnal_free_tx_descs (void) 
1250 {
1251         int    i;
1252
1253         if (kibnal_data.kib_tx_descs == NULL)
1254                 return;
1255
1256         for (i = 0; i < IBNAL_TX_MSGS; i++) {
1257                 kib_tx_t *tx = &kibnal_data.kib_tx_descs[i];
1258
1259                 if (tx->tx_wrq != NULL)
1260                         PORTAL_FREE(tx->tx_wrq, 
1261                                     (1 + IBNAL_MAX_RDMA_FRAGS) * 
1262                                     sizeof(*tx->tx_wrq));
1263
1264                 if (tx->tx_gl != NULL)
1265                         PORTAL_FREE(tx->tx_gl, 
1266                                     (1 + IBNAL_MAX_RDMA_FRAGS) * 
1267                                     sizeof(*tx->tx_gl));
1268
1269                 if (tx->tx_rd != NULL)
1270                         PORTAL_FREE(tx->tx_rd, 
1271                                     offsetof(kib_rdma_desc_t, 
1272                                              rd_frags[IBNAL_MAX_RDMA_FRAGS]));
1273         }
1274
1275         PORTAL_FREE(kibnal_data.kib_tx_descs,
1276                     IBNAL_TX_MSGS * sizeof(kib_tx_t));
1277 }
1278
/* Carve the pre-allocated tx message pages into per-descriptor message
 * buffers, record each buffer's mapping key/vaddr, and queue every tx
 * on the appropriate idle list (blocking vs non-blocking).
 * Returns 0 on success or the error from kibnal_alloc_pages(). */
int
kibnal_setup_tx_descs (void)
{
        int           ipage = 0;
        int           page_offset = 0;
        __u64         vaddr;
        __u64         vaddr_base;
        struct page  *page;
        kib_tx_t     *tx;
        int           i;
        int           rc;

        /* pre-mapped messages are not bigger than 1 page */
        CLASSERT (IBNAL_MSG_SIZE <= PAGE_SIZE);

        /* No fancy arithmetic when we do the buffer calculations */
        CLASSERT (PAGE_SIZE % IBNAL_MSG_SIZE == 0);

        rc = kibnal_alloc_pages(&kibnal_data.kib_tx_pages, IBNAL_TX_MSG_PAGES, 
                                0);
        if (rc != 0)
                return (rc);

        /* ignored for the whole_mem case */
        vaddr = vaddr_base = kibnal_data.kib_tx_pages->ibp_vaddr;

        for (i = 0; i < IBNAL_TX_MSGS; i++) {
                page = kibnal_data.kib_tx_pages->ibp_pages[ipage];
                tx = &kibnal_data.kib_tx_descs[i];

                /* next IBNAL_MSG_SIZE slot of the current page */
                tx->tx_msg = (kib_msg_t *)(((char *)page_address(page)) + 
                                           page_offset);
#if IBNAL_WHOLE_MEM
                {
                        vv_mem_reg_h_t  mem_h;
                        vv_r_key_t      rkey;
                        vv_return_t     vvrc;

                        /* Voltaire stack already registers the whole
                         * memory, so use that API. */
                        vvrc = vv_get_gen_mr_attrib(kibnal_data.kib_hca,
                                                    tx->tx_msg,
                                                    IBNAL_MSG_SIZE,
                                                    &mem_h,
                                                    &tx->tx_lkey,
                                                    &rkey);
                        LASSERT (vvrc == vv_return_ok);
                }
#else
                tx->tx_vaddr = vaddr;
#endif
                /* descriptors beyond the first IBNAL_NTX serve
                 * non-blocking callers */
                tx->tx_isnblk = (i >= IBNAL_NTX);
                tx->tx_mapped = KIB_TX_UNMAPPED;

                CDEBUG(D_NET, "Tx[%d] %p->%p[%x:"LPX64"]\n", i, tx, 
                       tx->tx_msg, KIBNAL_TX_LKEY(tx), KIBNAL_TX_VADDR(tx));

                if (tx->tx_isnblk)
                        list_add (&tx->tx_list, 
                                  &kibnal_data.kib_idle_nblk_txs);
                else
                        list_add (&tx->tx_list, 
                                  &kibnal_data.kib_idle_txs);

                vaddr += IBNAL_MSG_SIZE;
                LASSERT (vaddr <= vaddr_base + IBNAL_TX_MSG_BYTES);

                /* advance within the page; step to the next page when full */
                page_offset += IBNAL_MSG_SIZE;
                LASSERT (page_offset <= PAGE_SIZE);

                if (page_offset == PAGE_SIZE) {
                        page_offset = 0;
                        ipage++;
                        LASSERT (ipage <= IBNAL_TX_MSG_PAGES);
                }
        }
        
        return (0);
}
1358
/* Tear the NAL down in reverse order of initialisation.
 * kibnal_data.kib_init records how far startup got; the switch falls
 * through from the most-complete state to IBNAL_INIT_NOTHING so each
 * stage is undone exactly once regardless of where startup stopped. */
void
kibnal_api_shutdown (nal_t *nal)
{
        int         i;
        vv_return_t vvrc;

        if (nal->nal_refct != 0) {
                /* This module got the first ref */
                PORTAL_MODULE_UNUSE;
                return;
        }

        CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
               atomic_read (&portal_kmemory));

        LASSERT(nal == &kibnal_api);

        switch (kibnal_data.kib_init) {

        case IBNAL_INIT_ALL:
                /* stop calls to nal_cmd */
                libcfs_nal_cmd_unregister(VIBNAL);
                /* No new peers */

                /* resetting my NID removes my listener and nukes all current
                 * peers and their connections */
                kibnal_set_mynid (PTL_NID_ANY);

                /* Wait for all peer state to clean up */
                i = 2;
                while (atomic_read (&kibnal_data.kib_npeers) != 0) {
                        i++;
                        CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
                               "waiting for %d peers to disconnect\n",
                               atomic_read (&kibnal_data.kib_npeers));
                        set_current_state (TASK_UNINTERRUPTIBLE);
                        schedule_timeout (HZ);
                }
                /* fall through */

        case IBNAL_INIT_CQ:
                vvrc = vv_cq_destroy(kibnal_data.kib_hca, kibnal_data.kib_cq);
                if (vvrc != vv_return_ok)
                        CERROR ("Destroy CQ error: %d\n", vvrc);
                /* fall through */

        case IBNAL_INIT_TXD:
                kibnal_free_pages (kibnal_data.kib_tx_pages);
                /* fall through */

        case IBNAL_INIT_PD:
#if !IBNAL_WHOLE_MEM
                /* only allocated when we register memory ourselves */
                vvrc = vv_pd_deallocate(kibnal_data.kib_hca,
                                        kibnal_data.kib_pd);
                if (vvrc != vv_return_ok)
                        CERROR ("Destroy PD error: %d\n", vvrc);
#endif
                /* fall through */

        case IBNAL_INIT_ASYNC:
                /* stop receiving HCA asynchronous event callbacks */
                vvrc = vv_dell_async_event_cb (kibnal_data.kib_hca,
                                              kibnal_async_callback);
                if (vvrc != vv_return_ok)
                        CERROR("vv_dell_async_event_cb error: %d\n", vvrc);
                        
                /* fall through */

        case IBNAL_INIT_HCA:
                vvrc = vv_hca_close(kibnal_data.kib_hca);
                if (vvrc != vv_return_ok)
                        CERROR ("Close HCA  error: %d\n", vvrc);
                /* fall through */

        case IBNAL_INIT_LIB:
                lib_fini(&kibnal_lib);
                /* fall through */

        case IBNAL_INIT_DATA:
                /* by now everything should be idle and empty */
                LASSERT (atomic_read (&kibnal_data.kib_npeers) == 0);
                LASSERT (kibnal_data.kib_peers != NULL);
                for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {
                        LASSERT (list_empty (&kibnal_data.kib_peers[i]));
                }
                LASSERT (atomic_read (&kibnal_data.kib_nconns) == 0);
                LASSERT (list_empty (&kibnal_data.kib_sched_rxq));
                LASSERT (list_empty (&kibnal_data.kib_sched_txq));
                LASSERT (list_empty (&kibnal_data.kib_connd_zombies));
                LASSERT (list_empty (&kibnal_data.kib_connd_conns));
                LASSERT (list_empty (&kibnal_data.kib_connd_pcreqs));
                LASSERT (list_empty (&kibnal_data.kib_connd_peers));

                /* flag threads to terminate; wake and wait for them to die */
                kibnal_data.kib_shutdown = 1;
                wake_up_all (&kibnal_data.kib_sched_waitq);
                wake_up_all (&kibnal_data.kib_connd_waitq);

                i = 2;
                while (atomic_read (&kibnal_data.kib_nthreads) != 0) {
                        i++;
                        CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
                               "Waiting for %d threads to terminate\n",
                               atomic_read (&kibnal_data.kib_nthreads));
                        /* NOTE(review): INTERRUPTIBLE here vs UNINTERRUPTIBLE
                         * in the peer wait above — presumably intentional;
                         * confirm */
                        set_current_state (TASK_INTERRUPTIBLE);
                        schedule_timeout (HZ);
                }
                /* fall through */
                
        case IBNAL_INIT_NOTHING:
                break;
        }

        /* safe even if tx descs were never (or only partially) allocated */
        kibnal_free_tx_descs();

        if (kibnal_data.kib_peers != NULL)
                PORTAL_FREE (kibnal_data.kib_peers,
                             sizeof (struct list_head) * 
                             kibnal_data.kib_peer_hash_size);

        CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
               atomic_read (&portal_kmemory));
        printk(KERN_INFO "Lustre: Voltaire IB NAL unloaded (final mem %d)\n",
               atomic_read(&portal_kmemory));

        kibnal_data.kib_init = IBNAL_INIT_NOTHING;
}
1484
1485 int
1486 kibnal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
1487                      ptl_ni_limits_t *requested_limits,
1488                      ptl_ni_limits_t *actual_limits)
1489 {
1490         struct timeval            tv;
1491         ptl_process_id_t          process_id;
1492         int                       pkmem = atomic_read(&portal_kmemory);
1493         int                       rc;
1494         int                       i;
1495         vv_request_event_record_t req_er;
1496         vv_return_t               vvrc;
1497
1498         LASSERT (nal == &kibnal_api);
1499
1500         if (nal->nal_refct != 0) {
1501                 if (actual_limits != NULL)
1502                         *actual_limits = kibnal_lib.libnal_ni.ni_actual_limits;
1503                 /* This module got the first ref */
1504                 PORTAL_MODULE_USE;
1505                 return (PTL_OK);
1506         }
1507
1508         LASSERT (kibnal_data.kib_init == IBNAL_INIT_NOTHING);
1509         memset (&kibnal_data, 0, sizeof (kibnal_data)); /* zero pointers, flags etc */
1510         
1511         do_gettimeofday(&tv);
1512         kibnal_data.kib_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
1513         kibnal_data.kib_svc_id = IBNAL_SERVICE_NUMBER;
1514
1515         init_MUTEX (&kibnal_data.kib_nid_mutex);
1516
1517         rwlock_init(&kibnal_data.kib_global_lock);
1518
1519         kibnal_data.kib_peer_hash_size = IBNAL_PEER_HASH_SIZE;
1520         PORTAL_ALLOC (kibnal_data.kib_peers,
1521                       sizeof (struct list_head) * kibnal_data.kib_peer_hash_size);
1522         if (kibnal_data.kib_peers == NULL) {
1523                 goto failed;
1524         }
1525         for (i = 0; i < kibnal_data.kib_peer_hash_size; i++)
1526                 INIT_LIST_HEAD(&kibnal_data.kib_peers[i]);
1527
1528         spin_lock_init (&kibnal_data.kib_connd_lock);
1529         INIT_LIST_HEAD (&kibnal_data.kib_connd_peers);
1530         INIT_LIST_HEAD (&kibnal_data.kib_connd_pcreqs);
1531         INIT_LIST_HEAD (&kibnal_data.kib_connd_conns);
1532         INIT_LIST_HEAD (&kibnal_data.kib_connd_zombies);
1533         init_waitqueue_head (&kibnal_data.kib_connd_waitq);
1534
1535         spin_lock_init (&kibnal_data.kib_sched_lock);
1536         INIT_LIST_HEAD (&kibnal_data.kib_sched_txq);
1537         INIT_LIST_HEAD (&kibnal_data.kib_sched_rxq);
1538         init_waitqueue_head (&kibnal_data.kib_sched_waitq);
1539
1540         spin_lock_init (&kibnal_data.kib_tx_lock);
1541         INIT_LIST_HEAD (&kibnal_data.kib_idle_txs);
1542         INIT_LIST_HEAD (&kibnal_data.kib_idle_nblk_txs);
1543         init_waitqueue_head(&kibnal_data.kib_idle_tx_waitq);
1544
1545         rc = kibnal_alloc_tx_descs();
1546         if (rc != 0) {
1547                 CERROR("Can't allocate tx descs\n");
1548                 goto failed;
1549         }
1550         
1551         /* lists/ptrs/locks initialised */
1552         kibnal_data.kib_init = IBNAL_INIT_DATA;
1553         /*****************************************************/
1554
1555         process_id.pid = requested_pid;
1556         process_id.nid = PTL_NID_ANY;
1557         
1558         rc = lib_init(&kibnal_lib, nal, process_id,
1559                       requested_limits, actual_limits);
1560         if (rc != PTL_OK) {
1561                 CERROR("lib_init failed: error %d\n", rc);
1562                 goto failed;
1563         }
1564
1565         /* lib interface initialised */
1566         kibnal_data.kib_init = IBNAL_INIT_LIB;
1567         /*****************************************************/
1568
1569         for (i = 0; i < IBNAL_N_SCHED; i++) {
1570                 rc = kibnal_thread_start (kibnal_scheduler, (void *)((long)i));
1571                 if (rc != 0) {
1572                         CERROR("Can't spawn vibnal scheduler[%d]: %d\n",
1573                                i, rc);
1574                         goto failed;
1575                 }
1576         }
1577
1578         rc = kibnal_thread_start (kibnal_connd, NULL);
1579         if (rc != 0) {
1580                 CERROR ("Can't spawn vibnal connd: %d\n", rc);
1581                 goto failed;
1582         }
1583
1584         /* TODO: apparently only one adapter is supported */
1585         vvrc = vv_hca_open("ANY_HCA", NULL, &kibnal_data.kib_hca);
1586         if (vvrc != vv_return_ok) {
1587                 CERROR ("Can't open CA: %d\n", vvrc);
1588                 goto failed;
1589         }
1590
1591         /* Channel Adapter opened */
1592         kibnal_data.kib_init = IBNAL_INIT_HCA;
1593
1594         /* register to get HCA's asynchronous events. */
1595         req_er.req_event_type = VV_ASYNC_EVENT_ALL_MASK;
1596         vvrc = vv_set_async_event_cb (kibnal_data.kib_hca, req_er,
1597                                      kibnal_async_callback);
1598         if (vvrc != vv_return_ok) {
1599                 CERROR ("Can't open CA: %d\n", vvrc);
1600                 goto failed; 
1601         }
1602
1603         kibnal_data.kib_init = IBNAL_INIT_ASYNC;
1604
1605         /*****************************************************/
1606
1607         vvrc = vv_hca_query(kibnal_data.kib_hca, &kibnal_data.kib_hca_attrs);
1608         if (vvrc != vv_return_ok) {
1609                 CERROR ("Can't size port attrs: %d\n", vvrc);
1610                 goto failed;
1611         }
1612
1613         kibnal_data.kib_port = -1;
1614
1615         for (i = 0; i<kibnal_data.kib_hca_attrs.port_num; i++) {
1616
1617                 int port_num = i+1;
1618                 u_int32_t tbl_count;
1619                 vv_port_attrib_t *pattr = &kibnal_data.kib_port_attr;
1620
1621                 vvrc = vv_port_query(kibnal_data.kib_hca, port_num, pattr);
1622                 if (vvrc != vv_return_ok) {
1623                         CERROR("vv_port_query failed for port %d: %d\n",
1624                                port_num, vvrc);
1625                         continue;
1626                 }
1627
1628                 switch (pattr->port_state) {
1629                 case vv_state_linkDoun:
1630                         CDEBUG(D_NET, "port[%d] Down\n", port_num);
1631                         continue;
1632                 case vv_state_linkInit:
1633                         CDEBUG(D_NET, "port[%d] Init\n", port_num);
1634                         continue;
1635                 case vv_state_linkArm:
1636                         CDEBUG(D_NET, "port[%d] Armed\n", port_num);
1637                         continue;
1638                 case vv_state_linkActive:
1639                         CDEBUG(D_NET, "port[%d] Active\n", port_num);
1640
1641                         /* Found a suitable port. Get its GUID and PKEY. */
1642                         kibnal_data.kib_port = port_num;
1643                         
1644                         tbl_count = 1;
1645                         vvrc = vv_get_port_gid_tbl(kibnal_data.kib_hca, 
1646                                                    port_num, &tbl_count,
1647                                                    &kibnal_data.kib_port_gid);
1648                         if (vvrc != vv_return_ok) {
1649                                 CERROR("vv_get_port_gid_tbl failed "
1650                                        "for port %d: %d\n", port_num, vvrc);
1651                                 continue;
1652                         }
1653
1654                         tbl_count = 1;
1655                         vvrc = vv_get_port_partition_tbl(kibnal_data.kib_hca, 
1656                                                         port_num, &tbl_count,
1657                                                         &kibnal_data.kib_port_pkey);
1658                         if (vvrc != vv_return_ok) {
1659                                 CERROR("vv_get_port_partition_tbl failed "
1660                                        "for port %d: %d\n", port_num, vvrc);
1661                                 continue;
1662                         }
1663
1664                         break;
1665                 case vv_state_linkActDefer: /* TODO: correct? */
1666                 case vv_state_linkNoChange:
1667                         CERROR("Unexpected port[%d] state %d\n",
1668                                i, pattr->port_state);
1669                         continue;
1670                 }
1671                 break;
1672         }
1673
1674         if (kibnal_data.kib_port == -1) {
1675                 CERROR ("Can't find an active port\n");
1676                 goto failed;
1677         }
1678
1679         CDEBUG(D_NET, "Using port %d - GID="LPX64":"LPX64"\n",
1680                kibnal_data.kib_port, 
1681                kibnal_data.kib_port_gid.scope.g.subnet, 
1682                kibnal_data.kib_port_gid.scope.g.eui64);
1683         
1684         /*****************************************************/
1685
1686 #if !IBNAL_WHOLE_MEM
1687         vvrc = vv_pd_allocate(kibnal_data.kib_hca, &kibnal_data.kib_pd);
1688 #else
1689         vvrc = vv_get_gen_pd_h(kibnal_data.kib_hca, &kibnal_data.kib_pd);
1690 #endif
1691         if (vvrc != 0) {
1692                 CERROR ("Can't create PD: %d\n", vvrc);
1693                 goto failed;
1694         }
1695         
1696         /* flag PD initialised */
1697         kibnal_data.kib_init = IBNAL_INIT_PD;
1698         /*****************************************************/
1699
1700         rc = kibnal_setup_tx_descs();
1701         if (rc != 0) {
1702                 CERROR ("Can't register tx descs: %d\n", rc);
1703                 goto failed;
1704         }
1705         
1706         /* flag TX descs initialised */
1707         kibnal_data.kib_init = IBNAL_INIT_TXD;
1708         /*****************************************************/
1709         {
1710                 uint32_t nentries;
1711
1712                 vvrc = vv_cq_create(kibnal_data.kib_hca, IBNAL_CQ_ENTRIES,
1713                                     kibnal_cq_callback, 
1714                                     NULL, /* context */
1715                                     &kibnal_data.kib_cq, &nentries);
1716                 if (vvrc != 0) {
1717                         CERROR ("Can't create RX CQ: %d\n", vvrc);
1718                         goto failed;
1719                 }
1720
1721                 /* flag CQ initialised */
1722                 kibnal_data.kib_init = IBNAL_INIT_CQ;
1723
1724                 if (nentries < IBNAL_CQ_ENTRIES) {
1725                         CERROR ("CQ only has %d entries, need %d\n", 
1726                                 nentries, IBNAL_CQ_ENTRIES);
1727                         goto failed;
1728                 }
1729
1730                 vvrc = vv_request_completion_notification(kibnal_data.kib_hca, 
1731                                                           kibnal_data.kib_cq, 
1732                                                           vv_next_solicit_unsolicit_event);
1733                 if (vvrc != 0) {
1734                         CERROR ("Failed to re-arm completion queue: %d\n", rc);
1735                         goto failed;
1736                 }
1737         }
1738         
1739         /*****************************************************/
1740
1741         rc = libcfs_nal_cmd_register(VIBNAL, &kibnal_cmd, NULL);
1742         if (rc != 0) {
1743                 CERROR ("Can't initialise command interface (rc = %d)\n", rc);
1744                 goto failed;
1745         }
1746
1747         /* flag everything initialised */
1748         kibnal_data.kib_init = IBNAL_INIT_ALL;
1749         /*****************************************************/
1750
1751         printk(KERN_INFO "Lustre: Voltaire IB NAL loaded "
1752                "(initial mem %d)\n", pkmem);
1753
1754         return (PTL_OK);
1755
1756  failed:
1757         CDEBUG(D_NET, "kibnal_api_startup failed\n");
1758         kibnal_api_shutdown (&kibnal_api);    
1759         return (PTL_FAIL);
1760 }
1761
1762 void __exit
1763 kibnal_module_fini (void)
1764 {
1765 #ifdef CONFIG_SYSCTL
1766         if (kibnal_tunables.kib_sysctl != NULL)
1767                 unregister_sysctl_table (kibnal_tunables.kib_sysctl);
1768 #endif
1769         PtlNIFini(kibnal_ni);
1770
1771         ptl_unregister_nal(VIBNAL);
1772 }
1773
1774 int __init
1775 kibnal_module_init (void)
1776 {
1777         int    rc;
1778
1779         CLASSERT (offsetof(kib_msg_t, ibm_u) + sizeof(kib_connparams_t) 
1780                   <= cm_REQ_priv_data_len);
1781         CLASSERT (offsetof(kib_msg_t, ibm_u) + sizeof(kib_connparams_t) 
1782                   <= cm_REP_priv_data_len);
1783         CLASSERT (offsetof(kib_msg_t, ibm_u.get.ibgm_rd.rd_frags[IBNAL_MAX_RDMA_FRAGS])
1784                   <= IBNAL_MSG_SIZE);
1785         CLASSERT (offsetof(kib_msg_t, ibm_u.putack.ibpam_rd.rd_frags[IBNAL_MAX_RDMA_FRAGS])
1786                   <= IBNAL_MSG_SIZE);
1787         
1788         /* the following must be sizeof(int) for proc_dointvec() */
1789         CLASSERT (sizeof (kibnal_tunables.kib_io_timeout) == sizeof (int));
1790
1791         kibnal_api.nal_ni_init = kibnal_api_startup;
1792         kibnal_api.nal_ni_fini = kibnal_api_shutdown;
1793
1794         /* Initialise dynamic tunables to defaults once only */
1795         kibnal_tunables.kib_io_timeout = IBNAL_IO_TIMEOUT;
1796
1797         rc = ptl_register_nal(VIBNAL, &kibnal_api);
1798         if (rc != PTL_OK) {
1799                 CERROR("Can't register IBNAL: %d\n", rc);
1800                 return (-ENOMEM);               /* or something... */
1801         }
1802
1803         /* Pure gateways want the NAL started up at module load time... */
1804         rc = PtlNIInit(VIBNAL, LUSTRE_SRV_PTL_PID, NULL, NULL, &kibnal_ni);
1805         if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
1806                 ptl_unregister_nal(VIBNAL);
1807                 return (-ENODEV);
1808         }
1809         
1810 #ifdef CONFIG_SYSCTL
1811         /* Press on regardless even if registering sysctl doesn't work */
1812         kibnal_tunables.kib_sysctl = 
1813                 register_sysctl_table (kibnal_top_ctl_table, 0);
1814 #endif
1815         return (0);
1816 }
1817
/* Standard Linux module metadata and the load/unload entry points */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Kernel Voltaire IB NAL v0.01");
MODULE_LICENSE("GPL");

module_init(kibnal_module_init);
module_exit(kibnal_module_fini);
1824