/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/klnds/o2iblnd/o2iblnd.c
 *
 * Author: Eric Barton <eric@bartonsoftware.com>
 */

#include "o2iblnd.h"

lnd_t the_kiblnd = {
        .lnd_type       = O2IBLND,
        .lnd_startup    = kiblnd_startup,
        .lnd_shutdown   = kiblnd_shutdown,
        .lnd_ctl        = kiblnd_ctl,
        .lnd_query      = kiblnd_query,
        .lnd_send       = kiblnd_send,
        .lnd_recv       = kiblnd_recv,
};

kib_data_t              kiblnd_data;

__u32
kiblnd_cksum (void *ptr, int nob)
{
        char  *c  = ptr;
        __u32  sum = 0;

        while (nob-- > 0)
                sum = ((sum << 1) | (sum >> 31)) + *c++;

        /* ensure I don't return 0 (== no checksum) */
        return (sum == 0) ? 1 : sum;
}
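
/* Example (illustrative only): the loop rotates the 32-bit sum left one
 * bit, then adds the next byte.  Over the two bytes 'a' (97), 'b' (98):
 *
 *     sum = ((0 << 1)  | (0 >> 31))  + 'a' =  97
 *     sum = ((97 << 1) | (97 >> 31)) + 'b' = 194 + 98 = 292
 *
 * so kiblnd_cksum("ab", 2) == 292.  A zero result is mapped to 1 because
 * ibm_cksum == 0 on the wire means "no checksum" (see kiblnd_unpack_msg). */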

void
kiblnd_init_msg (kib_msg_t *msg, int type, int body_nob)
{
        msg->ibm_type = type;
        msg->ibm_nob  = offsetof(kib_msg_t, ibm_u) + body_nob;
}

void
kiblnd_pack_msg (lnet_ni_t *ni, kib_msg_t *msg,
                 int credits, lnet_nid_t dstnid, __u64 dststamp)
{
        kib_net_t *net = ni->ni_data;

        /* CAVEAT EMPTOR! all message fields not set here should have been
         * initialised previously. */
        msg->ibm_magic    = IBLND_MSG_MAGIC;
        msg->ibm_version  = IBLND_MSG_VERSION;
        /*   ibm_type */
        msg->ibm_credits  = credits;
        /*   ibm_nob */
        msg->ibm_cksum    = 0;
        msg->ibm_srcnid   = ni->ni_nid;
        msg->ibm_srcstamp = net->ibn_incarnation;
        msg->ibm_dstnid   = dstnid;
        msg->ibm_dststamp = dststamp;

        if (*kiblnd_tunables.kib_cksum) {
                /* NB ibm_cksum zero while computing cksum */
                msg->ibm_cksum = kiblnd_cksum(msg, msg->ibm_nob);
        }
}
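
/* A minimal usage sketch (illustrative; msg, rxmsg, nob, peer_nid and
 * peer_stamp are placeholders, not driver variables):
 *
 *     kiblnd_init_msg(msg, IBLND_MSG_NOOP, 0);
 *     kiblnd_pack_msg(ni, msg, credits, peer_nid, peer_stamp);
 *             ...transmit msg; the peer receives it into rxmsg...
 *     rc = kiblnd_unpack_msg(rxmsg, nob);   (-EPROTO on any mismatch)
 *
 * Both sides compute the checksum with ibm_cksum zeroed, so a message
 * packed here always verifies in kiblnd_unpack_msg() below. */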

int
kiblnd_unpack_msg(kib_msg_t *msg, int nob)
{
        const int hdr_size = offsetof(kib_msg_t, ibm_u);
        __u32     msg_cksum;
        int       flip;
        int       msg_nob;
#if !IBLND_MAP_ON_DEMAND
        int       i;
        int       n;
#endif
        /* 6 bytes are enough to have received magic + version */
        if (nob < 6) {
                CERROR("Short message: %d\n", nob);
                return -EPROTO;
        }

        if (msg->ibm_magic == IBLND_MSG_MAGIC) {
                flip = 0;
        } else if (msg->ibm_magic == __swab32(IBLND_MSG_MAGIC)) {
                flip = 1;
        } else {
                CERROR("Bad magic: %08x\n", msg->ibm_magic);
                return -EPROTO;
        }

        if (msg->ibm_version !=
            (flip ? __swab16(IBLND_MSG_VERSION) : IBLND_MSG_VERSION)) {
                CERROR("Bad version: %d\n", msg->ibm_version);
                return -EPROTO;
        }

        if (nob < hdr_size) {
                CERROR("Short message: %d\n", nob);
                return -EPROTO;
        }

        msg_nob = flip ? __swab32(msg->ibm_nob) : msg->ibm_nob;
        if (msg_nob > nob) {
                CERROR("Short message: got %d, wanted %d\n", nob, msg_nob);
                return -EPROTO;
        }

        /* checksum must be computed with ibm_cksum zero and BEFORE anything
         * gets flipped */
        msg_cksum = flip ? __swab32(msg->ibm_cksum) : msg->ibm_cksum;
        msg->ibm_cksum = 0;
        if (msg_cksum != 0 &&
            msg_cksum != kiblnd_cksum(msg, msg_nob)) {
                CERROR("Bad checksum\n");
                return -EPROTO;
        }
        msg->ibm_cksum = msg_cksum;

        if (flip) {
                /* leave magic unflipped as a clue to peer endianness */
                __swab16s(&msg->ibm_version);
                CLASSERT (sizeof(msg->ibm_type) == 1);
                CLASSERT (sizeof(msg->ibm_credits) == 1);
                msg->ibm_nob = msg_nob;
                __swab64s(&msg->ibm_srcnid);
                __swab64s(&msg->ibm_srcstamp);
                __swab64s(&msg->ibm_dstnid);
                __swab64s(&msg->ibm_dststamp);
        }

        if (msg->ibm_srcnid == LNET_NID_ANY) {
                CERROR("Bad src nid: %s\n", libcfs_nid2str(msg->ibm_srcnid));
                return -EPROTO;
        }

        switch (msg->ibm_type) {
        default:
                CERROR("Unknown message type %x\n", msg->ibm_type);
                return -EPROTO;

        case IBLND_MSG_NOOP:
                break;

        case IBLND_MSG_IMMEDIATE:
                if (msg_nob < offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[0])) {
                        CERROR("Short IMMEDIATE: %d(%d)\n", msg_nob,
                               (int)offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[0]));
                        return -EPROTO;
                }
                break;

        case IBLND_MSG_PUT_REQ:
                if (msg_nob < hdr_size + sizeof(msg->ibm_u.putreq)) {
                        CERROR("Short PUT_REQ: %d(%d)\n", msg_nob,
                               (int)(hdr_size + sizeof(msg->ibm_u.putreq)));
                        return -EPROTO;
                }
                break;

        case IBLND_MSG_PUT_ACK:
                if (msg_nob < hdr_size + sizeof(msg->ibm_u.putack)) {
                        CERROR("Short PUT_ACK: %d(%d)\n", msg_nob,
                               (int)(hdr_size + sizeof(msg->ibm_u.putack)));
                        return -EPROTO;
                }
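                /* The RDMA descriptor layout depends on the mapping mode:
                 * with map-on-demand it is a single pre-mapped (addr, nob,
                 * key) triple; otherwise it carries an explicit fragment
                 * list whose length must be range-checked before the frags
                 * are byte-swapped below. */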
#if IBLND_MAP_ON_DEMAND
                if (flip) {
                        __swab64s(&msg->ibm_u.putack.ibpam_rd.rd_addr);
                        __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_nob);
                        __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_key);
                }
#else
                if (flip) {
                        __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_key);
                        __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_nfrags);
                }

                n = msg->ibm_u.putack.ibpam_rd.rd_nfrags;
                if (n <= 0 || n > IBLND_MAX_RDMA_FRAGS) {
                        CERROR("Bad PUT_ACK nfrags: %d, should be 0 < n <= %d\n",
                               n, IBLND_MAX_RDMA_FRAGS);
                        return -EPROTO;
                }

                if (msg_nob < offsetof(kib_msg_t, ibm_u.putack.ibpam_rd.rd_frags[n])) {
                        CERROR("Short PUT_ACK: %d(%d)\n", msg_nob,
                               (int)offsetof(kib_msg_t, ibm_u.putack.ibpam_rd.rd_frags[n]));
                        return -EPROTO;
                }

                if (flip) {
                        for (i = 0; i < n; i++) {
                                __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_frags[i].rf_nob);
                                __swab64s(&msg->ibm_u.putack.ibpam_rd.rd_frags[i].rf_addr);
                        }
                }
#endif
                break;

        case IBLND_MSG_GET_REQ:
                if (msg_nob < hdr_size + sizeof(msg->ibm_u.get)) {
                        CERROR("Short GET_REQ: %d(%d)\n", msg_nob,
                               (int)(hdr_size + sizeof(msg->ibm_u.get)));
                        return -EPROTO;
                }
#if IBLND_MAP_ON_DEMAND
                if (flip) {
                        __swab64s(&msg->ibm_u.get.ibgm_rd.rd_addr);
                        __swab32s(&msg->ibm_u.get.ibgm_rd.rd_nob);
                        __swab32s(&msg->ibm_u.get.ibgm_rd.rd_key);
                }
#else
                if (flip) {
                        __swab32s(&msg->ibm_u.get.ibgm_rd.rd_key);
                        __swab32s(&msg->ibm_u.get.ibgm_rd.rd_nfrags);
                }

                n = msg->ibm_u.get.ibgm_rd.rd_nfrags;
                if (n <= 0 || n > IBLND_MAX_RDMA_FRAGS) {
                        CERROR("Bad GET_REQ nfrags: %d, should be 0 < n <= %d\n",
                               n, IBLND_MAX_RDMA_FRAGS);
                        return -EPROTO;
                }

                if (msg_nob < offsetof(kib_msg_t, ibm_u.get.ibgm_rd.rd_frags[n])) {
                        CERROR("Short GET_REQ: %d(%d)\n", msg_nob,
                               (int)offsetof(kib_msg_t, ibm_u.get.ibgm_rd.rd_frags[n]));
                        return -EPROTO;
                }

                if (flip)
                        for (i = 0; i < msg->ibm_u.get.ibgm_rd.rd_nfrags; i++) {
                                __swab32s(&msg->ibm_u.get.ibgm_rd.rd_frags[i].rf_nob);
                                __swab64s(&msg->ibm_u.get.ibgm_rd.rd_frags[i].rf_addr);
                        }
#endif
                break;

        case IBLND_MSG_PUT_NAK:
        case IBLND_MSG_PUT_DONE:
        case IBLND_MSG_GET_DONE:
                if (msg_nob < hdr_size + sizeof(msg->ibm_u.completion)) {
                        CERROR("Short RDMA completion: %d(%d)\n", msg_nob,
                               (int)(hdr_size + sizeof(msg->ibm_u.completion)));
                        return -EPROTO;
                }
                if (flip)
                        __swab32s(&msg->ibm_u.completion.ibcm_status);
                break;

        case IBLND_MSG_CONNREQ:
        case IBLND_MSG_CONNACK:
                if (msg_nob < hdr_size + sizeof(msg->ibm_u.connparams)) {
                        CERROR("Short connreq/ack: %d(%d)\n", msg_nob,
                               (int)(hdr_size + sizeof(msg->ibm_u.connparams)));
                        return -EPROTO;
                }
                if (flip) {
                        __swab16s(&msg->ibm_u.connparams.ibcp_queue_depth);
                        __swab16s(&msg->ibm_u.connparams.ibcp_max_frags);
                        __swab32s(&msg->ibm_u.connparams.ibcp_max_msg_size);
                }
                break;
        }
        return 0;
}

int
kiblnd_create_peer (lnet_ni_t *ni, kib_peer_t **peerp, lnet_nid_t nid)
{
        kib_peer_t     *peer;
        kib_net_t      *net = ni->ni_data;
        unsigned long   flags;

        LASSERT (net != NULL);
        LASSERT (nid != LNET_NID_ANY);

        LIBCFS_ALLOC(peer, sizeof(*peer));
        if (peer == NULL) {
                CERROR("Cannot allocate peer\n");
                return -ENOMEM;
        }

        memset(peer, 0, sizeof(*peer));         /* zero flags etc */

        peer->ibp_ni = ni;
        peer->ibp_nid = nid;
        peer->ibp_error = 0;
        peer->ibp_last_alive = 0;
        atomic_set(&peer->ibp_refcount, 1);     /* 1 ref for caller */

        INIT_LIST_HEAD(&peer->ibp_list);       /* not in the peer table yet */
        INIT_LIST_HEAD(&peer->ibp_conns);
        INIT_LIST_HEAD(&peer->ibp_tx_queue);

        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);

        /* always called with a ref on ni, which prevents ni being shutdown */
        LASSERT (net->ibn_shutdown == 0);

        /* npeers only grows with the global lock held */
        atomic_inc(&net->ibn_npeers);

        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);

        *peerp = peer;
        return 0;
}

void
kiblnd_destroy_peer (kib_peer_t *peer)
{
        kib_net_t *net = peer->ibp_ni->ni_data;

        LASSERT (net != NULL);
        LASSERT (atomic_read(&peer->ibp_refcount) == 0);
        LASSERT (!kiblnd_peer_active(peer));
        LASSERT (peer->ibp_connecting == 0);
        LASSERT (peer->ibp_accepting == 0);
        LASSERT (list_empty(&peer->ibp_conns));
        LASSERT (list_empty(&peer->ibp_tx_queue));

        LIBCFS_FREE(peer, sizeof(*peer));

        /* NB a peer's connections keep a reference on their peer until
         * they are destroyed, so we can be assured that _all_ state to do
         * with this peer has been cleaned up when its refcount drops to
         * zero. */
        atomic_dec(&net->ibn_npeers);
}

void
kiblnd_destroy_dev (kib_dev_t *dev)
{
        LASSERT (dev->ibd_nnets == 0);

        if (!list_empty(&dev->ibd_list)) /* on kib_devs? */
                list_del_init(&dev->ibd_list);

        if (dev->ibd_mr != NULL)
                ib_dereg_mr(dev->ibd_mr);

        if (dev->ibd_pd != NULL)
                ib_dealloc_pd(dev->ibd_pd);

        if (dev->ibd_cmid != NULL)
                rdma_destroy_id(dev->ibd_cmid);

        LIBCFS_FREE(dev, sizeof(*dev));
}

kib_peer_t *
kiblnd_find_peer_locked (lnet_nid_t nid)
{
        /* the caller is responsible for accounting the additional reference
         * that this creates */
        struct list_head *peer_list = kiblnd_nid2peerlist(nid);
        struct list_head *tmp;
        kib_peer_t       *peer;

        list_for_each (tmp, peer_list) {

                peer = list_entry(tmp, kib_peer_t, ibp_list);

                LASSERT (peer->ibp_connecting > 0 || /* creating conns */
                         peer->ibp_accepting > 0 ||
                         !list_empty(&peer->ibp_conns));  /* active conn */

                if (peer->ibp_nid != nid)
                        continue;

                CDEBUG(D_NET, "got peer [%p] -> %s (%d)\n",
                       peer, libcfs_nid2str(nid),
                       atomic_read(&peer->ibp_refcount));
                return peer;
        }
        return NULL;
}

void
kiblnd_unlink_peer_locked (kib_peer_t *peer)
{
        LASSERT (list_empty(&peer->ibp_conns));

        LASSERT (kiblnd_peer_active(peer));
        list_del_init(&peer->ibp_list);
        /* lose peerlist's ref */
        kiblnd_peer_decref(peer);
}

int
kiblnd_get_peer_info (lnet_ni_t *ni, int index,
                      lnet_nid_t *nidp, int *count)
{
        kib_peer_t        *peer;
        struct list_head  *ptmp;
        int                i;
        unsigned long      flags;

        read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);

        for (i = 0; i < kiblnd_data.kib_peer_hash_size; i++) {

                list_for_each (ptmp, &kiblnd_data.kib_peers[i]) {

                        peer = list_entry(ptmp, kib_peer_t, ibp_list);
                        LASSERT (peer->ibp_connecting > 0 ||
                                 peer->ibp_accepting > 0 ||
                                 !list_empty(&peer->ibp_conns));

                        if (peer->ibp_ni != ni)
                                continue;

                        if (index-- > 0)
                                continue;

                        *nidp = peer->ibp_nid;
                        *count = atomic_read(&peer->ibp_refcount);

                        read_unlock_irqrestore(&kiblnd_data.kib_global_lock,
                                               flags);
                        return 0;
                }
        }

        read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
        return -ENOENT;
}

void
kiblnd_del_peer_locked (kib_peer_t *peer)
{
        struct list_head *ctmp;
        struct list_head *cnxt;
        kib_conn_t       *conn;

        if (list_empty(&peer->ibp_conns)) {
                kiblnd_unlink_peer_locked(peer);
        } else {
                list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
                        conn = list_entry(ctmp, kib_conn_t, ibc_list);

                        kiblnd_close_conn_locked(conn, 0);
                }
                /* NB closing peer's last conn unlinked it. */
        }
        /* NB peer now unlinked; might even be freed if the peer table had the
         * last ref on it. */
}

int
kiblnd_del_peer (lnet_ni_t *ni, lnet_nid_t nid)
{
        CFS_LIST_HEAD     (zombies);
        struct list_head  *ptmp;
        struct list_head  *pnxt;
        kib_peer_t        *peer;
        int                lo;
        int                hi;
        int                i;
        unsigned long      flags;
        int                rc = -ENOENT;

        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);

        if (nid != LNET_NID_ANY) {
                lo = hi = kiblnd_nid2peerlist(nid) - kiblnd_data.kib_peers;
        } else {
                lo = 0;
                hi = kiblnd_data.kib_peer_hash_size - 1;
        }

        for (i = lo; i <= hi; i++) {
                list_for_each_safe (ptmp, pnxt, &kiblnd_data.kib_peers[i]) {
                        peer = list_entry(ptmp, kib_peer_t, ibp_list);
                        LASSERT (peer->ibp_connecting > 0 ||
                                 peer->ibp_accepting > 0 ||
                                 !list_empty(&peer->ibp_conns));

                        if (peer->ibp_ni != ni)
                                continue;

                        if (!(nid == LNET_NID_ANY || peer->ibp_nid == nid))
                                continue;

                        if (!list_empty(&peer->ibp_tx_queue)) {
                                LASSERT (list_empty(&peer->ibp_conns));

                                list_splice_init(&peer->ibp_tx_queue, &zombies);
                        }

                        kiblnd_del_peer_locked(peer);
                        rc = 0;         /* matched something */
                }
        }

        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);

        kiblnd_txlist_done(ni, &zombies, -EIO);

        return rc;
}

kib_conn_t *
kiblnd_get_conn_by_idx (lnet_ni_t *ni, int index)
{
        kib_peer_t        *peer;
        struct list_head  *ptmp;
        kib_conn_t        *conn;
        struct list_head  *ctmp;
        int                i;
        unsigned long      flags;

        read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);

        for (i = 0; i < kiblnd_data.kib_peer_hash_size; i++) {
                list_for_each (ptmp, &kiblnd_data.kib_peers[i]) {

                        peer = list_entry(ptmp, kib_peer_t, ibp_list);
                        LASSERT (peer->ibp_connecting > 0 ||
                                 peer->ibp_accepting > 0 ||
                                 !list_empty(&peer->ibp_conns));

                        if (peer->ibp_ni != ni)
                                continue;

                        list_for_each (ctmp, &peer->ibp_conns) {
                                if (index-- > 0)
                                        continue;

                                conn = list_entry(ctmp, kib_conn_t, ibc_list);
                                kiblnd_conn_addref(conn);
                                read_unlock_irqrestore(&kiblnd_data.kib_global_lock,
                                                       flags);
                                return conn;
                        }
                }
        }

        read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
        return NULL;
}

void
kiblnd_debug_rx (kib_rx_t *rx)
{
        CDEBUG(D_CONSOLE, "      %p status %d msg_type %x cred %d\n",
               rx, rx->rx_status, rx->rx_msg->ibm_type,
               rx->rx_msg->ibm_credits);
}

void
kiblnd_debug_tx (kib_tx_t *tx)
{
        CDEBUG(D_CONSOLE, "      %p snd %d q %d w %d rc %d dl %lx "
               "cookie "LPX64" msg %s%s type %x cred %d\n",
               tx, tx->tx_sending, tx->tx_queued, tx->tx_waiting,
               tx->tx_status, tx->tx_deadline, tx->tx_cookie,
               tx->tx_lntmsg[0] == NULL ? "-" : "!",
               tx->tx_lntmsg[1] == NULL ? "-" : "!",
               tx->tx_msg->ibm_type, tx->tx_msg->ibm_credits);
}

void
kiblnd_debug_conn (kib_conn_t *conn)
{
        struct list_head *tmp;
        int               i;

        spin_lock(&conn->ibc_lock);

        CDEBUG(D_CONSOLE, "conn[%d] %p -> %s: \n",
               atomic_read(&conn->ibc_refcount), conn,
               libcfs_nid2str(conn->ibc_peer->ibp_nid));
        CDEBUG(D_CONSOLE, "   state %d nposted %d cred %d o_cred %d r_cred %d\n",
               conn->ibc_state, conn->ibc_nsends_posted, conn->ibc_credits,
               conn->ibc_outstanding_credits, conn->ibc_reserved_credits);
        CDEBUG(D_CONSOLE, "   comms_err %d\n", conn->ibc_comms_error);

        CDEBUG(D_CONSOLE, "   early_rxs:\n");
        list_for_each(tmp, &conn->ibc_early_rxs)
                kiblnd_debug_rx(list_entry(tmp, kib_rx_t, rx_list));

        CDEBUG(D_CONSOLE, "   tx_noops:\n");
        list_for_each(tmp, &conn->ibc_tx_noops)
                kiblnd_debug_tx(list_entry(tmp, kib_tx_t, tx_list));

        CDEBUG(D_CONSOLE, "   tx_queue_nocred:\n");
        list_for_each(tmp, &conn->ibc_tx_queue_nocred)
                kiblnd_debug_tx(list_entry(tmp, kib_tx_t, tx_list));

        CDEBUG(D_CONSOLE, "   tx_queue_rsrvd:\n");
        list_for_each(tmp, &conn->ibc_tx_queue_rsrvd)
                kiblnd_debug_tx(list_entry(tmp, kib_tx_t, tx_list));

        CDEBUG(D_CONSOLE, "   tx_queue:\n");
        list_for_each(tmp, &conn->ibc_tx_queue)
                kiblnd_debug_tx(list_entry(tmp, kib_tx_t, tx_list));

        CDEBUG(D_CONSOLE, "   active_txs:\n");
        list_for_each(tmp, &conn->ibc_active_txs)
                kiblnd_debug_tx(list_entry(tmp, kib_tx_t, tx_list));

        CDEBUG(D_CONSOLE, "   rxs:\n");
        for (i = 0; i < IBLND_RX_MSGS; i++)
                kiblnd_debug_rx(&conn->ibc_rxs[i]);

        spin_unlock(&conn->ibc_lock);
}

kib_conn_t *
kiblnd_create_conn (kib_peer_t *peer, struct rdma_cm_id *cmid, int state)
{
        /* CAVEAT EMPTOR:
         * If the new conn is created successfully it takes over the caller's
         * ref on 'peer'.  It also "owns" 'cmid' and destroys it when it itself
         * is destroyed.  On failure, the caller's ref on 'peer' remains and
         * she must dispose of 'cmid'.  (Actually I'd block forever if I tried
         * to destroy 'cmid' here since I'm called from the CM which still has
         * its ref on 'cmid'). */
        kib_conn_t             *conn;
        kib_net_t              *net = peer->ibp_ni->ni_data;
        int                     i;
        int                     page_offset;
        int                     ipage;
        int                     rc;
        struct ib_cq           *cq;
        struct ib_qp_init_attr *init_qp_attr;
        unsigned long           flags;

        LASSERT (net != NULL);
        LASSERT (!in_interrupt());

        LIBCFS_ALLOC(init_qp_attr, sizeof(*init_qp_attr));
        if (init_qp_attr == NULL) {
                CERROR("Can't allocate qp_attr for %s\n",
                       libcfs_nid2str(peer->ibp_nid));
                goto failed_0;
        }

        LIBCFS_ALLOC(conn, sizeof(*conn));
        if (conn == NULL) {
                CERROR("Can't allocate connection for %s\n",
                       libcfs_nid2str(peer->ibp_nid));
                goto failed_1;
        }

        memset(conn, 0, sizeof(*conn)); /* zero flags, NULL pointers etc... */

        conn->ibc_state = IBLND_CONN_INIT;
        conn->ibc_peer = peer;                  /* I take the caller's ref */
        cmid->context = conn;                   /* for future CM callbacks */
        conn->ibc_cmid = cmid;

        INIT_LIST_HEAD(&conn->ibc_early_rxs);
        INIT_LIST_HEAD(&conn->ibc_tx_noops);
        INIT_LIST_HEAD(&conn->ibc_tx_queue);
        INIT_LIST_HEAD(&conn->ibc_tx_queue_rsrvd);
        INIT_LIST_HEAD(&conn->ibc_tx_queue_nocred);
        INIT_LIST_HEAD(&conn->ibc_active_txs);
        spin_lock_init(&conn->ibc_lock);

        LIBCFS_ALLOC(conn->ibc_connvars, sizeof(*conn->ibc_connvars));
        if (conn->ibc_connvars == NULL) {
                CERROR("Can't allocate in-progress connection state\n");
                goto failed_2;
        }
        memset(conn->ibc_connvars, 0, sizeof(*conn->ibc_connvars));

        LIBCFS_ALLOC(conn->ibc_rxs, IBLND_RX_MSGS * sizeof(kib_rx_t));
        if (conn->ibc_rxs == NULL) {
                CERROR("Cannot allocate RX buffers\n");
                goto failed_2;
        }
        memset(conn->ibc_rxs, 0, IBLND_RX_MSGS * sizeof(kib_rx_t));

        rc = kiblnd_alloc_pages(&conn->ibc_rx_pages, IBLND_RX_MSG_PAGES);
        if (rc != 0)
                goto failed_2;

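        /* Lay the rx messages out back to back within the pages: each is
         * IBLND_MSG_SIZE bytes and PAGE_SIZE is a whole multiple of
         * IBLND_MSG_SIZE (see the CLASSERTs in kiblnd_map_tx_descs), so no
         * message straddles a page boundary. */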
        for (i = ipage = page_offset = 0; i < IBLND_RX_MSGS; i++) {
                struct page *page = conn->ibc_rx_pages->ibp_pages[ipage];
                kib_rx_t    *rx = &conn->ibc_rxs[i];

                rx->rx_conn = conn;
                rx->rx_msg = (kib_msg_t *)(((char *)page_address(page)) +
                                           page_offset);
                rx->rx_msgaddr = kiblnd_dma_map_single(cmid->device,
                                                       rx->rx_msg, IBLND_MSG_SIZE,
                                                       DMA_FROM_DEVICE);
                KIBLND_UNMAP_ADDR_SET(rx, rx_msgunmap, rx->rx_msgaddr);

                CDEBUG(D_NET,"rx %d: %p "LPX64"("LPX64")\n",
                       i, rx->rx_msg, rx->rx_msgaddr,
                       lnet_page2phys(page) + page_offset);

                page_offset += IBLND_MSG_SIZE;
                LASSERT (page_offset <= PAGE_SIZE);

                if (page_offset == PAGE_SIZE) {
                        page_offset = 0;
                        ipage++;
                        LASSERT (ipage <= IBLND_RX_MSG_PAGES);
                }
        }

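        /* NB kernels with HAVE_OFED_IB_COMP_VECTOR take an extra
         * completion-vector argument to ib_create_cq(); vector 0 is
         * requested here in that case. */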
#ifdef HAVE_OFED_IB_COMP_VECTOR
        cq = ib_create_cq(cmid->device,
                          kiblnd_cq_completion, kiblnd_cq_event, conn,
                          IBLND_CQ_ENTRIES(), 0);
#else
        cq = ib_create_cq(cmid->device,
                          kiblnd_cq_completion, kiblnd_cq_event, conn,
                          IBLND_CQ_ENTRIES());
#endif
        if (!IS_ERR(cq)) {
                conn->ibc_cq = cq;
        } else {
                CERROR("Can't create CQ: %ld\n", PTR_ERR(cq));
                goto failed_2;
        }

        rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
        if (rc != 0) {
                CERROR("Can't request completion notification: %d\n", rc);
                goto failed_2;
        }

        memset(init_qp_attr, 0, sizeof(*init_qp_attr));
        init_qp_attr->event_handler = kiblnd_qp_event;
        init_qp_attr->qp_context = conn;
        init_qp_attr->cap.max_send_wr = IBLND_SEND_WRS;
        init_qp_attr->cap.max_recv_wr = IBLND_RECV_WRS;
        init_qp_attr->cap.max_send_sge = 1;
        init_qp_attr->cap.max_recv_sge = 1;
        init_qp_attr->sq_sig_type = IB_SIGNAL_REQ_WR;
        init_qp_attr->qp_type = IB_QPT_RC;
        init_qp_attr->send_cq = cq;
        init_qp_attr->recv_cq = cq;

        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
        switch (*kiblnd_tunables.kib_ib_mtu) {
        case 0: /* don't force path MTU */
                break;
        case 256:
                cmid->route.path_rec->mtu = IB_MTU_256;
                break;
        case 512:
                cmid->route.path_rec->mtu = IB_MTU_512;
                break;
        case 1024:
                cmid->route.path_rec->mtu = IB_MTU_1024;
                break;
        case 2048:
                cmid->route.path_rec->mtu = IB_MTU_2048;
                break;
        case 4096:
                cmid->route.path_rec->mtu = IB_MTU_4096;
                break;
        default:
                LBUG();
                break;
        }
        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);

        rc = rdma_create_qp(cmid, net->ibn_dev->ibd_pd, init_qp_attr);
        if (rc != 0) {
                CERROR("Can't create QP: %d\n", rc);
                goto failed_2;
        }

        LIBCFS_FREE(init_qp_attr, sizeof(*init_qp_attr));

        /* 1 ref for caller and each rxmsg */
        atomic_set(&conn->ibc_refcount, 1 + IBLND_RX_MSGS);
        conn->ibc_nrx = IBLND_RX_MSGS;

        /* post receives */
        for (i = 0; i < IBLND_RX_MSGS; i++) {
                rc = kiblnd_post_rx(&conn->ibc_rxs[i],
                                    IBLND_POSTRX_NO_CREDIT);
                if (rc != 0) {
                        CERROR("Can't post rxmsg: %d\n", rc);

                        /* Make posted receives complete */
                        kiblnd_abort_receives(conn);

                        /* correct # of posted buffers
                         * NB locking needed now I'm racing with completion */
                        spin_lock_irqsave(&kiblnd_data.kib_sched_lock, flags);
                        conn->ibc_nrx -= IBLND_RX_MSGS - i;
                        spin_unlock_irqrestore(&kiblnd_data.kib_sched_lock,
                                               flags);

                        /* Drop my own and unused rxbuffer refcounts */
                        while (i++ <= IBLND_RX_MSGS)
                                kiblnd_conn_decref(conn);

                        return NULL;
                }
        }

        /* Init successful! */
        LASSERT (state == IBLND_CONN_ACTIVE_CONNECT ||
                 state == IBLND_CONN_PASSIVE_WAIT);
        conn->ibc_state = state;

        /* 1 more conn */
        atomic_inc(&net->ibn_nconns);
        return conn;

 failed_2:
        kiblnd_destroy_conn(conn);
 failed_1:
        LIBCFS_FREE(init_qp_attr, sizeof(*init_qp_attr));
 failed_0:
        return NULL;
}

void
kiblnd_destroy_conn (kib_conn_t *conn)
{
        struct rdma_cm_id *cmid = conn->ibc_cmid;
        kib_peer_t        *peer = conn->ibc_peer;
        int                rc;
        int                i;

        LASSERT (!in_interrupt());
        LASSERT (atomic_read(&conn->ibc_refcount) == 0);
        LASSERT (list_empty(&conn->ibc_early_rxs));
        LASSERT (list_empty(&conn->ibc_tx_noops));
        LASSERT (list_empty(&conn->ibc_tx_queue));
        LASSERT (list_empty(&conn->ibc_tx_queue_rsrvd));
        LASSERT (list_empty(&conn->ibc_tx_queue_nocred));
        LASSERT (list_empty(&conn->ibc_active_txs));
        LASSERT (conn->ibc_nsends_posted == 0);

        switch (conn->ibc_state) {
        default:
                /* conn must be completely disengaged from the network */
                LBUG();

        case IBLND_CONN_DISCONNECTED:
                /* connvars should have been freed already */
                LASSERT (conn->ibc_connvars == NULL);
                break;

        case IBLND_CONN_INIT:
                break;
        }

        if (cmid->qp != NULL)
                rdma_destroy_qp(cmid);

        if (conn->ibc_cq != NULL) {
                rc = ib_destroy_cq(conn->ibc_cq);
                if (rc != 0)
                        CWARN("Error destroying CQ: %d\n", rc);
        }

        if (conn->ibc_rx_pages != NULL) {
                LASSERT (conn->ibc_rxs != NULL);

                for (i = 0; i < IBLND_RX_MSGS; i++) {
                        kib_rx_t *rx = &conn->ibc_rxs[i];

                        LASSERT (rx->rx_nob >= 0); /* not posted */

                        kiblnd_dma_unmap_single(cmid->device,
                                                KIBLND_UNMAP_ADDR(rx, rx_msgunmap,
                                                                  rx->rx_msgaddr),
                                                IBLND_MSG_SIZE, DMA_FROM_DEVICE);
                }

                kiblnd_free_pages(conn->ibc_rx_pages);
        }

        if (conn->ibc_rxs != NULL) {
                LIBCFS_FREE(conn->ibc_rxs,
                            IBLND_RX_MSGS * sizeof(kib_rx_t));
        }

        if (conn->ibc_connvars != NULL)
                LIBCFS_FREE(conn->ibc_connvars, sizeof(*conn->ibc_connvars));

        /* See CAVEAT EMPTOR above in kiblnd_create_conn */
        if (conn->ibc_state != IBLND_CONN_INIT) {
                kib_net_t *net = peer->ibp_ni->ni_data;

                kiblnd_peer_decref(peer);
                rdma_destroy_id(cmid);
                atomic_dec(&net->ibn_nconns);
        }

        LIBCFS_FREE(conn, sizeof(*conn));
}

int
kiblnd_close_peer_conns_locked (kib_peer_t *peer, int why)
{
        kib_conn_t         *conn;
        struct list_head   *ctmp;
        struct list_head   *cnxt;
        int                 count = 0;

        list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
                conn = list_entry(ctmp, kib_conn_t, ibc_list);

                count++;
                kiblnd_close_conn_locked(conn, why);
        }

        return count;
}

int
kiblnd_close_stale_conns_locked (kib_peer_t *peer, __u64 incarnation)
{
        kib_conn_t         *conn;
        struct list_head   *ctmp;
        struct list_head   *cnxt;
        int                 count = 0;

        list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
                conn = list_entry(ctmp, kib_conn_t, ibc_list);

                if (conn->ibc_incarnation == incarnation)
                        continue;

                CDEBUG(D_NET, "Closing stale conn -> %s incarnation:"LPX64"("LPX64")\n",
                       libcfs_nid2str(peer->ibp_nid),
                       conn->ibc_incarnation, incarnation);

                count++;
                kiblnd_close_conn_locked(conn, -ESTALE);
        }

        return count;
}

int
kiblnd_close_matching_conns (lnet_ni_t *ni, lnet_nid_t nid)
{
        kib_peer_t         *peer;
        struct list_head   *ptmp;
        struct list_head   *pnxt;
        int                 lo;
        int                 hi;
        int                 i;
        unsigned long       flags;
        int                 count = 0;

        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);

        if (nid != LNET_NID_ANY)
                lo = hi = kiblnd_nid2peerlist(nid) - kiblnd_data.kib_peers;
        else {
                lo = 0;
                hi = kiblnd_data.kib_peer_hash_size - 1;
        }

        for (i = lo; i <= hi; i++) {
                list_for_each_safe (ptmp, pnxt, &kiblnd_data.kib_peers[i]) {

                        peer = list_entry(ptmp, kib_peer_t, ibp_list);
                        LASSERT (peer->ibp_connecting > 0 ||
                                 peer->ibp_accepting > 0 ||
                                 !list_empty(&peer->ibp_conns));

                        if (peer->ibp_ni != ni)
                                continue;

                        if (!(nid == LNET_NID_ANY || nid == peer->ibp_nid))
                                continue;

                        count += kiblnd_close_peer_conns_locked(peer, 0);
                }
        }

        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);

        /* wildcards always succeed */
        if (nid == LNET_NID_ANY)
                return 0;

        return (count == 0) ? -ENOENT : 0;
}

int
kiblnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
{
        struct libcfs_ioctl_data *data = arg;
        int                       rc = -EINVAL;

        switch(cmd) {
        case IOC_LIBCFS_GET_PEER: {
                lnet_nid_t   nid = 0;
                int          count = 0;

                rc = kiblnd_get_peer_info(ni, data->ioc_count,
                                          &nid, &count);
                data->ioc_nid    = nid;
                data->ioc_count  = count;
                break;
        }

        case IOC_LIBCFS_DEL_PEER: {
                rc = kiblnd_del_peer(ni, data->ioc_nid);
                break;
        }
        case IOC_LIBCFS_GET_CONN: {
                kib_conn_t *conn;

                rc = 0;
                conn = kiblnd_get_conn_by_idx(ni, data->ioc_count);
                if (conn == NULL) {
                        rc = -ENOENT;
                        break;
                }

                LASSERT (conn->ibc_cmid != NULL);
                data->ioc_nid = conn->ibc_peer->ibp_nid;
                if (conn->ibc_cmid->route.path_rec == NULL)
                        data->ioc_u32[0] = 0; /* iWarp has no path MTU */
                else
                        data->ioc_u32[0] =
                        ib_mtu_enum_to_int(conn->ibc_cmid->route.path_rec->mtu);
                kiblnd_conn_decref(conn);
                break;
        }
        case IOC_LIBCFS_CLOSE_CONNECTION: {
                rc = kiblnd_close_matching_conns(ni, data->ioc_nid);
                break;
        }

        default:
                break;
        }

        return rc;
}

void
kiblnd_query (lnet_ni_t *ni, lnet_nid_t nid, time_t *when)
{
        cfs_time_t     last_alive = 0;
        rwlock_t      *glock = &kiblnd_data.kib_global_lock;
        kib_peer_t    *peer;
        unsigned long  flags;

        read_lock_irqsave(glock, flags);

        peer = kiblnd_find_peer_locked(nid);
        if (peer != NULL) {
                LASSERT (peer->ibp_connecting > 0 || /* creating conns */
                         peer->ibp_accepting > 0 ||
                         !list_empty(&peer->ibp_conns));  /* active conn */
                last_alive = peer->ibp_last_alive;
        }

        read_unlock_irqrestore(glock, flags);

        if (last_alive != 0)
                *when = cfs_time_current_sec() -
                        cfs_duration_sec(cfs_time_current() - last_alive);

        /* peer is not persistent in hash, trigger peer creation
         * and connection establishment with a NULL tx */
        if (peer == NULL)
                kiblnd_launch_tx(ni, NULL, nid);
        return;
}

void
kiblnd_free_pages (kib_pages_t *p)
{
        int         npages = p->ibp_npages;
        int         i;

        for (i = 0; i < npages; i++)
                if (p->ibp_pages[i] != NULL)
                        __free_page(p->ibp_pages[i]);

        LIBCFS_FREE (p, offsetof(kib_pages_t, ibp_pages[npages]));
}

int
kiblnd_alloc_pages (kib_pages_t **pp, int npages)
{
        kib_pages_t   *p;
        int            i;

        LIBCFS_ALLOC(p, offsetof(kib_pages_t, ibp_pages[npages]));
        if (p == NULL) {
                CERROR("Can't allocate descriptor for %d pages\n", npages);
                return -ENOMEM;
        }

        memset(p, 0, offsetof(kib_pages_t, ibp_pages[npages]));
        p->ibp_npages = npages;

        for (i = 0; i < npages; i++) {
                p->ibp_pages[i] = alloc_page(GFP_KERNEL);
                if (p->ibp_pages[i] == NULL) {
                        CERROR("Can't allocate page %d of %d\n", i, npages);
                        kiblnd_free_pages(p);
                        return -ENOMEM;
                }
        }

        *pp = p;
        return 0;
}

void
kiblnd_free_tx_descs (lnet_ni_t *ni)
{
        int        i;
        kib_net_t *net = ni->ni_data;

        LASSERT (net != NULL);

        if (net->ibn_tx_descs != NULL) {
                for (i = 0; i < IBLND_TX_MSGS(); i++) {
                        kib_tx_t *tx = &net->ibn_tx_descs[i];

#if IBLND_MAP_ON_DEMAND
                        if (tx->tx_pages != NULL)
                                LIBCFS_FREE(tx->tx_pages, LNET_MAX_IOV *
                                            sizeof(*tx->tx_pages));
#else
                        if (tx->tx_wrq != NULL)
                                LIBCFS_FREE(tx->tx_wrq,
                                            (1 + IBLND_MAX_RDMA_FRAGS) *
                                            sizeof(*tx->tx_wrq));

                        if (tx->tx_sge != NULL)
                                LIBCFS_FREE(tx->tx_sge,
                                            (1 + IBLND_MAX_RDMA_FRAGS) *
                                            sizeof(*tx->tx_sge));

                        if (tx->tx_rd != NULL)
                                LIBCFS_FREE(tx->tx_rd,
                                            offsetof(kib_rdma_desc_t,
                                               rd_frags[IBLND_MAX_RDMA_FRAGS]));

                        if (tx->tx_frags != NULL)
                                LIBCFS_FREE(tx->tx_frags,
                                            IBLND_MAX_RDMA_FRAGS *
                                            sizeof(*tx->tx_frags));
#endif
                }

                LIBCFS_FREE(net->ibn_tx_descs,
                            IBLND_TX_MSGS() * sizeof(kib_tx_t));
        }

        if (net->ibn_tx_pages != NULL)
                kiblnd_free_pages(net->ibn_tx_pages);
}

int
kiblnd_alloc_tx_descs (lnet_ni_t *ni)
{
        int        i;
        int        rc;
        kib_net_t *net = ni->ni_data;

        LASSERT (net != NULL);

        rc = kiblnd_alloc_pages(&net->ibn_tx_pages, IBLND_TX_MSG_PAGES());

        if (rc != 0) {
                CERROR("Can't allocate tx pages\n");
                return rc;
        }

        LIBCFS_ALLOC (net->ibn_tx_descs,
                      IBLND_TX_MSGS() * sizeof(kib_tx_t));
        if (net->ibn_tx_descs == NULL) {
                CERROR("Can't allocate %d tx descriptors\n", IBLND_TX_MSGS());
                return -ENOMEM;
        }

        memset(net->ibn_tx_descs, 0,
               IBLND_TX_MSGS() * sizeof(kib_tx_t));

        for (i = 0; i < IBLND_TX_MSGS(); i++) {
                kib_tx_t *tx = &net->ibn_tx_descs[i];

#if IBLND_MAP_ON_DEMAND
                LIBCFS_ALLOC(tx->tx_pages, LNET_MAX_IOV *
                             sizeof(*tx->tx_pages));
                if (tx->tx_pages == NULL) {
                        CERROR("Can't allocate phys page vector[%d]\n",
                               LNET_MAX_IOV);
                        return -ENOMEM;
                }
#else
                LIBCFS_ALLOC(tx->tx_wrq,
                             (1 + IBLND_MAX_RDMA_FRAGS) *
                             sizeof(*tx->tx_wrq));
                if (tx->tx_wrq == NULL)
                        return -ENOMEM;

                LIBCFS_ALLOC(tx->tx_sge,
                             (1 + IBLND_MAX_RDMA_FRAGS) *
                             sizeof(*tx->tx_sge));
                if (tx->tx_sge == NULL)
                        return -ENOMEM;

                LIBCFS_ALLOC(tx->tx_rd,
                             offsetof(kib_rdma_desc_t,
                                      rd_frags[IBLND_MAX_RDMA_FRAGS]));
                if (tx->tx_rd == NULL)
                        return -ENOMEM;

                LIBCFS_ALLOC(tx->tx_frags,
                             IBLND_MAX_RDMA_FRAGS *
                             sizeof(*tx->tx_frags));
                if (tx->tx_frags == NULL)
                        return -ENOMEM;
#endif
        }

        return 0;
}

void
kiblnd_unmap_tx_descs (lnet_ni_t *ni)
{
        int             i;
        kib_tx_t       *tx;
        kib_net_t      *net = ni->ni_data;

        LASSERT (net != NULL);

        for (i = 0; i < IBLND_TX_MSGS(); i++) {
                tx = &net->ibn_tx_descs[i];

                kiblnd_dma_unmap_single(net->ibn_dev->ibd_cmid->device,
                                        KIBLND_UNMAP_ADDR(tx, tx_msgunmap,
                                                          tx->tx_msgaddr),
                                        IBLND_MSG_SIZE, DMA_TO_DEVICE);
        }
}

void
kiblnd_map_tx_descs (lnet_ni_t *ni)
{
        int             ipage = 0;
        int             page_offset = 0;
        int             i;
        struct page    *page;
        kib_tx_t       *tx;
        kib_net_t      *net = ni->ni_data;

        LASSERT (net != NULL);

        /* pre-mapped messages are not bigger than 1 page */
        CLASSERT (IBLND_MSG_SIZE <= PAGE_SIZE);

        /* No fancy arithmetic when we do the buffer calculations */
        CLASSERT (PAGE_SIZE % IBLND_MSG_SIZE == 0);

        for (i = 0; i < IBLND_TX_MSGS(); i++) {
                page = net->ibn_tx_pages->ibp_pages[ipage];
                tx = &net->ibn_tx_descs[i];

                tx->tx_msg = (kib_msg_t *)(((char *)page_address(page)) +
                                           page_offset);

                tx->tx_msgaddr = kiblnd_dma_map_single(
                        net->ibn_dev->ibd_cmid->device,
                        tx->tx_msg, IBLND_MSG_SIZE, DMA_TO_DEVICE);
                KIBLND_UNMAP_ADDR_SET(tx, tx_msgunmap, tx->tx_msgaddr);

                list_add(&tx->tx_list, &net->ibn_idle_txs);

                page_offset += IBLND_MSG_SIZE;
                LASSERT (page_offset <= PAGE_SIZE);

                if (page_offset == PAGE_SIZE) {
                        page_offset = 0;
                        ipage++;
                        LASSERT (ipage <= IBLND_TX_MSG_PAGES());
                }
        }
}

void
kiblnd_base_shutdown (void)
{
        int i;

        LASSERT (list_empty(&kiblnd_data.kib_devs));

        CDEBUG(D_MALLOC, "before LND base cleanup: kmem %d\n",
               atomic_read(&libcfs_kmemory));

        switch (kiblnd_data.kib_init) {
        default:
                LBUG();

        case IBLND_INIT_ALL:
        case IBLND_INIT_DATA:
                LASSERT (kiblnd_data.kib_peers != NULL);
                for (i = 0; i < kiblnd_data.kib_peer_hash_size; i++) {
                        LASSERT (list_empty(&kiblnd_data.kib_peers[i]));
                }
                LASSERT (list_empty(&kiblnd_data.kib_connd_zombies));
                LASSERT (list_empty(&kiblnd_data.kib_connd_conns));

                /* flag threads to terminate; wake and wait for them to die */
                kiblnd_data.kib_shutdown = 1;
                wake_up_all(&kiblnd_data.kib_sched_waitq);
                wake_up_all(&kiblnd_data.kib_connd_waitq);

                i = 2;
                while (atomic_read(&kiblnd_data.kib_nthreads) != 0) {
                        i++;
                        CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
                               "Waiting for %d threads to terminate\n",
                               atomic_read(&kiblnd_data.kib_nthreads));
                        cfs_pause(cfs_time_seconds(1));
                }

                /* fall through */

        case IBLND_INIT_NOTHING:
                break;
        }

        if (kiblnd_data.kib_peers != NULL)
                LIBCFS_FREE(kiblnd_data.kib_peers,
                            sizeof(struct list_head) *
                            kiblnd_data.kib_peer_hash_size);

        CDEBUG(D_MALLOC, "after LND base cleanup: kmem %d\n",
               atomic_read(&libcfs_kmemory));

        kiblnd_data.kib_init = IBLND_INIT_NOTHING;
        PORTAL_MODULE_UNUSE;
}

void
kiblnd_shutdown (lnet_ni_t *ni)
{
        kib_net_t        *net = ni->ni_data;
        rwlock_t         *g_lock = &kiblnd_data.kib_global_lock;
        int               i;
        unsigned long     flags;

        LASSERT(kiblnd_data.kib_init == IBLND_INIT_ALL);

        if (net == NULL)
                goto out;

        CDEBUG(D_MALLOC, "before LND net cleanup: kmem %d\n",
               atomic_read(&libcfs_kmemory));

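        /* kiblnd_create_peer() asserts ibn_shutdown == 0 under this same
         * lock, so no new peers can appear on this net once the flag is
         * set */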
1397         write_lock_irqsave(g_lock, flags);
1398         net->ibn_shutdown = 1;
1399         write_unlock_irqrestore(g_lock, flags);
1400
1401         switch (net->ibn_init) {
1402         default:
1403                 LBUG();
1404
1405         case IBLND_INIT_ALL:
1406                 /* nuke all existing peers within this net */
1407                 kiblnd_del_peer(ni, LNET_NID_ANY);
1408
1409                 /* Wait for all peer state to clean up */
1410                 i = 2;
1411                 while (atomic_read(&net->ibn_npeers) != 0) {
1412                         i++;
1413                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* 2**n? */
1414                                "%s: waiting for %d peers to disconnect\n",
1415                                libcfs_nid2str(ni->ni_nid),
1416                                atomic_read(&net->ibn_npeers));
1417                         cfs_pause(cfs_time_seconds(1));
1418                 }
1419
1420                 kiblnd_unmap_tx_descs(ni);
1421
1422                 LASSERT (net->ibn_dev->ibd_nnets > 0);
1423                 net->ibn_dev->ibd_nnets--;
1424
1425                 /* fall through */
1426
1427         case IBLND_INIT_NOTHING:
1428                 LASSERT (atomic_read(&net->ibn_nconns) == 0);
1429
1430 #if IBLND_MAP_ON_DEMAND
1431                 if (net->ibn_fmrpool != NULL)
1432                         ib_destroy_fmr_pool(net->ibn_fmrpool);
1433 #endif
1434                 if (net->ibn_dev != NULL &&
1435                     net->ibn_dev->ibd_nnets == 0)
1436                         kiblnd_destroy_dev(net->ibn_dev);
1437
1438                 break;
1439         }
1440
1441         kiblnd_free_tx_descs(ni);
1442
1443         CDEBUG(D_MALLOC, "after LND net cleanup: kmem %d\n",
1444                atomic_read(&libcfs_kmemory));
1445
1446         net->ibn_init = IBLND_INIT_NOTHING;
1447         ni->ni_data = NULL;
1448         
1449         LIBCFS_FREE(net, sizeof(*net));
1450
1451 out:
1452         if (list_empty(&kiblnd_data.kib_devs))
1453                 kiblnd_base_shutdown();
1454         return;
1455 }
1456
1457 int
1458 kiblnd_base_startup (void)
1459 {
1460         int i;
1461         int rc;
1462
1463         LASSERT (kiblnd_data.kib_init == IBLND_INIT_NOTHING);
1464
1465         PORTAL_MODULE_USE;
1466         memset(&kiblnd_data, 0, sizeof(kiblnd_data)); /* zero pointers, flags etc */
1467
1468         rwlock_init(&kiblnd_data.kib_global_lock);
1469
1470         INIT_LIST_HEAD(&kiblnd_data.kib_devs);
1471
1472         kiblnd_data.kib_peer_hash_size = IBLND_PEER_HASH_SIZE;
1473         LIBCFS_ALLOC(kiblnd_data.kib_peers,
1474                      sizeof(struct list_head) * kiblnd_data.kib_peer_hash_size);
1475         if (kiblnd_data.kib_peers == NULL) {
1476                 goto failed;
1477         }
1478         for (i = 0; i < kiblnd_data.kib_peer_hash_size; i++)
1479                 INIT_LIST_HEAD(&kiblnd_data.kib_peers[i]);
1480
1481         spin_lock_init(&kiblnd_data.kib_connd_lock);
1482         INIT_LIST_HEAD(&kiblnd_data.kib_connd_conns);
1483         INIT_LIST_HEAD(&kiblnd_data.kib_connd_zombies);
1484         init_waitqueue_head(&kiblnd_data.kib_connd_waitq);
1485
1486         spin_lock_init(&kiblnd_data.kib_sched_lock);
1487         INIT_LIST_HEAD(&kiblnd_data.kib_sched_conns);
1488         init_waitqueue_head(&kiblnd_data.kib_sched_waitq);
1489
        kiblnd_data.kib_error_qpa.qp_state = IB_QPS_ERR;

        /* lists/ptrs/locks initialised */
        kiblnd_data.kib_init = IBLND_INIT_DATA;
        /*****************************************************/

        for (i = 0; i < IBLND_N_SCHED; i++) {
                rc = kiblnd_thread_start(kiblnd_scheduler, (void *)((long)i));
                if (rc != 0) {
                        CERROR("Can't spawn o2iblnd scheduler[%d]: %d\n",
                               i, rc);
                        goto failed;
                }
        }

        rc = kiblnd_thread_start(kiblnd_connd, NULL);
        if (rc != 0) {
                CERROR("Can't spawn o2iblnd connd: %d\n", rc);
                goto failed;
        }

        /* flag everything initialised */
        kiblnd_data.kib_init = IBLND_INIT_ALL;
        /*****************************************************/

        return 0;

 failed:
        kiblnd_base_shutdown();
        return -ENETDOWN;
}

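/* Per-network startup: allocate and initialise this NI's kib_net_t, choose
 * the IPoIB interface, find or create the listening kib_dev_t for it, and
 * derive the NID from the interface's IP address.  On any failure the
 * partially-initialised state is torn down via kiblnd_shutdown(ni). */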
int
kiblnd_startup (lnet_ni_t *ni)
{
        char                     *ifname;
        kib_net_t                *net;
        kib_dev_t                *ibdev;
        struct list_head         *tmp;
        struct timeval            tv;
        int                       rc;

        LASSERT (ni->ni_lnd == &the_kiblnd);

        if (kiblnd_data.kib_init == IBLND_INIT_NOTHING) {
                rc = kiblnd_base_startup();
                if (rc != 0)
                        return rc;
        }

        LIBCFS_ALLOC(net, sizeof(*net));
        ni->ni_data = net;      /* stash it now; the 'failed' path runs
                                 * kiblnd_shutdown(), which reads ni_data */
        if (net == NULL)
                goto failed;

        memset(net, 0, sizeof(*net));

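        /* stamp this net with the current time in usecs; NB this becomes the
         * connection "incarnation" so peers can detect that we restarted */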
        do_gettimeofday(&tv);
        net->ibn_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;

        ni->ni_maxtxcredits = *kiblnd_tunables.kib_credits;
        ni->ni_peertxcredits = *kiblnd_tunables.kib_peercredits;
        ni->ni_peertimeout = *kiblnd_tunables.kib_peertimeout;

        spin_lock_init(&net->ibn_tx_lock);
        INIT_LIST_HEAD(&net->ibn_idle_txs);

        rc = kiblnd_alloc_tx_descs(ni);
        if (rc != 0) {
                CERROR("Can't allocate tx descs\n");
                goto failed;
        }

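        /* NB an lnet module option such as networks="o2ib0(ib0)" (example
         * syntax) is what populates ni_interfaces[] below */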
        if (ni->ni_interfaces[0] != NULL) {
                /* Use the IPoIB interface specified in 'networks=' */

                CLASSERT (LNET_MAX_INTERFACES > 1);
                if (ni->ni_interfaces[1] != NULL) {
                        CERROR("Multiple interfaces not supported\n");
                        goto failed;
                }

                ifname = ni->ni_interfaces[0];
        } else {
                ifname = *kiblnd_tunables.kib_default_ipif;
        }

        if (strlen(ifname) >= sizeof(ibdev->ibd_ifname)) {
                CERROR("IPoIB interface name too long: %s\n", ifname);
                goto failed;
        }

        ibdev = NULL;
        list_for_each (tmp, &kiblnd_data.kib_devs) {
                ibdev = list_entry(tmp, kib_dev_t, ibd_list);

                if (!strcmp(&ibdev->ibd_ifname[0], ifname))
                        break;

                ibdev = NULL;
        }

        if (ibdev == NULL) {
                __u32                     ip;
                __u32                     netmask;
                int                       up;
                struct rdma_cm_id        *id;
                struct ib_pd             *pd;
                struct ib_mr             *mr;
                struct sockaddr_in        addr;

                rc = libcfs_ipif_query(ifname, &up, &ip, &netmask);
                if (rc != 0) {
                        CERROR("Can't query IPoIB interface %s: %d\n",
                               ifname, rc);
                        goto failed;
                }

                if (!up) {
                        CERROR("IPoIB interface %s is down\n", ifname);
                        goto failed;
                }

                LIBCFS_ALLOC(ibdev, sizeof(*ibdev));
                if (ibdev == NULL)
                        goto failed;

                memset(ibdev, 0, sizeof(*ibdev));

                INIT_LIST_HEAD(&ibdev->ibd_list); /* not yet in kib_devs */
                ibdev->ibd_ifip = ip;
                strcpy(&ibdev->ibd_ifname[0], ifname);

                id = rdma_create_id(kiblnd_cm_callback, ibdev, RDMA_PS_TCP);
                if (!IS_ERR(id)) {
                        ibdev->ibd_cmid = id;
                } else {
                        CERROR("Can't create listen ID: %ld\n", PTR_ERR(id));
                        goto failed;
                }

                memset(&addr, 0, sizeof(addr));
                addr.sin_family      = AF_INET;
                addr.sin_port        = htons(*kiblnd_tunables.kib_service);
                addr.sin_addr.s_addr = htonl(ip);

                rc = rdma_bind_addr(id, (struct sockaddr *)&addr);
                if (rc != 0) {
                        CERROR("Can't bind to %s: %d\n", ifname, rc);
                        goto failed;
                }

                /* Binding should have assigned me an IB device */
                LASSERT (id->device != NULL);

                pd = ib_alloc_pd(id->device);
                if (!IS_ERR(pd)) {
                        ibdev->ibd_pd = pd;
                } else {
                        CERROR("Can't allocate PD: %ld\n", PTR_ERR(pd));
                        goto failed;
                }

#if IBLND_MAP_ON_DEMAND
                /* MR for sends and receives */
                mr = ib_get_dma_mr(pd, IB_ACCESS_LOCAL_WRITE);
#else
                /* MR for sends, receives _and_ RDMA...........v */
                mr = ib_get_dma_mr(pd, IB_ACCESS_LOCAL_WRITE |
                                       IB_ACCESS_REMOTE_WRITE);
#endif
                if (!IS_ERR(mr)) {
                        ibdev->ibd_mr = mr;
                } else {
                        CERROR("Can't get MR: %ld\n", PTR_ERR(mr));
                        goto failed;
                }

                rc = rdma_listen(id, 0);
                if (rc != 0) {
                        CERROR("Can't start listener: %d\n", rc);
                        goto failed;
                }

                list_add_tail(&ibdev->ibd_list,
                              &kiblnd_data.kib_devs);
        }

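        /* NB the kib_dev_t is shared: another NI on the same interface finds
         * it in kib_devs above instead of creating its own.  The NID keeps
         * its configured network number but takes its address from the
         * interface's IPoIB IP. */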
        ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ibdev->ibd_ifip);
        net->ibn_dev = ibdev;

#if IBLND_MAP_ON_DEMAND
        /* FMR pool for RDMA */
        {
                struct ib_fmr_pool      *fmrpool;
                struct ib_fmr_pool_param param = {
                        .max_pages_per_fmr = LNET_MAX_PAYLOAD/PAGE_SIZE,
                        .page_shift        = PAGE_SHIFT,
                        .access            = (IB_ACCESS_LOCAL_WRITE |
                                              IB_ACCESS_REMOTE_WRITE),
                        .pool_size         = *kiblnd_tunables.kib_fmr_pool_size,
                        .dirty_watermark   = *kiblnd_tunables.kib_fmr_flush_trigger,
                        .flush_function    = NULL,
                        .flush_arg         = NULL,
                        .cache             = *kiblnd_tunables.kib_fmr_cache};

                if (*kiblnd_tunables.kib_fmr_pool_size <
                    *kiblnd_tunables.kib_ntx) {
                        CERROR("FMR pool size (%d) must be >= ntx (%d)\n",
                               *kiblnd_tunables.kib_fmr_pool_size,
                               *kiblnd_tunables.kib_ntx);
                        goto failed;
                }

                fmrpool = ib_create_fmr_pool(ibdev->ibd_pd, &param);
                if (!IS_ERR(fmrpool)) {
                        net->ibn_fmrpool = fmrpool;
                } else {
                        CERROR("Can't create FMR pool: %ld\n",
                               PTR_ERR(fmrpool));
                        goto failed;
                }
        }
#endif

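        /* the device's PD/MR now exist, so the TX descriptors can be
         * DMA-mapped */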
        kiblnd_map_tx_descs(ni);

        ibdev->ibd_nnets++;
        net->ibn_init = IBLND_INIT_ALL;

        return 0;

failed:
        kiblnd_shutdown(ni);

        CDEBUG(D_NET, "kiblnd_startup failed\n");
        return -ENETDOWN;
}

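/* Module unload: unhook from LNET, then release the tunables */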
void __exit
kiblnd_module_fini (void)
{
        lnet_unregister_lnd(&the_kiblnd);
        kiblnd_tunables_fini();
}

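/* Module load: compile-time checks that a kib_msg_t -- including the
 * worst-case RDMA fragment descriptors -- fits in the pre-posted message
 * buffer, then set up the tunables and register with LNET */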
int __init
kiblnd_module_init (void)
{
        int    rc;

        CLASSERT (sizeof(kib_msg_t) <= IBLND_MSG_SIZE);
#if !IBLND_MAP_ON_DEMAND
        CLASSERT (offsetof(kib_msg_t, ibm_u.get.ibgm_rd.rd_frags[IBLND_MAX_RDMA_FRAGS])
                  <= IBLND_MSG_SIZE);
        CLASSERT (offsetof(kib_msg_t, ibm_u.putack.ibpam_rd.rd_frags[IBLND_MAX_RDMA_FRAGS])
                  <= IBLND_MSG_SIZE);
#endif
        rc = kiblnd_tunables_init();
        if (rc != 0)
                return rc;

        lnet_register_lnd(&the_kiblnd);

        return 0;
}

MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Kernel OpenIB gen2 LND v1.00");
MODULE_LICENSE("GPL");

module_init(kiblnd_module_init);
module_exit(kiblnd_module_fini);