/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2006 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#include "o2iblnd.h"

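/* LND descriptor: how this module plugs into LNet.  LNet dispatches
 * startup/shutdown, ioctls and message send/receive for the O2IBLND
 * network type through these callbacks (the descriptor is registered
 * with LNet via lnet_register_lnd() at module load). */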
lnd_t the_kiblnd = {
        .lnd_type       = O2IBLND,
        .lnd_startup    = kiblnd_startup,
        .lnd_shutdown   = kiblnd_shutdown,
        .lnd_ctl        = kiblnd_ctl,
        .lnd_send       = kiblnd_send,
        .lnd_recv       = kiblnd_recv,
};

kib_data_t              kiblnd_data;

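/* Wire checksum: rotate the 32-bit running sum left one bit, then add the
 * next byte.  Illustrative (hypothetical) walk-through for input bytes
 * {0x01, 0x02}:
 *     sum = rol32(0x00000000, 1) + 0x01 = 0x00000001
 *     sum = rol32(0x00000001, 1) + 0x02 = 0x00000004
 * Rotating (rather than shifting) keeps high-order bits in play, so both
 * the byte values and their positions affect the result. */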
__u32
kiblnd_cksum (void *ptr, int nob)
{
        char  *c  = ptr;
        __u32  sum = 0;

        while (nob-- > 0)
                sum = ((sum << 1) | (sum >> 31)) + *c++;

        /* ensure I don't return 0 (== no checksum) */
        return (sum == 0) ? 1 : sum;
}

void
kiblnd_init_msg (kib_msg_t *msg, int type, int body_nob)
{
        msg->ibm_type = type;
        msg->ibm_nob  = offsetof(kib_msg_t, ibm_u) + body_nob;
}

void
kiblnd_pack_msg (lnet_ni_t *ni, kib_msg_t *msg,
                 int credits, lnet_nid_t dstnid, __u64 dststamp)
{
        kib_net_t *net = ni->ni_data;

        /* CAVEAT EMPTOR! all message fields not set here should have been
         * initialised previously. */
        msg->ibm_magic    = IBLND_MSG_MAGIC;
        msg->ibm_version  = IBLND_MSG_VERSION;
        /*   ibm_type */
        msg->ibm_credits  = credits;
        /*   ibm_nob */
        msg->ibm_cksum    = 0;
        msg->ibm_srcnid   = lnet_ptlcompat_srcnid(ni->ni_nid, dstnid);
        msg->ibm_srcstamp = net->ibn_incarnation;
        msg->ibm_dstnid   = dstnid;
        msg->ibm_dststamp = dststamp;

        if (*kiblnd_tunables.kib_cksum) {
                /* NB ibm_cksum zero while computing cksum */
                msg->ibm_cksum = kiblnd_cksum(msg, msg->ibm_nob);
        }
}

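/* NB on unpacking: the sender writes ibm_magic in its native byte order,
 * so a receiver that sees the byte-swapped value knows the peer is
 * opposite-endian and must swab every multi-byte field.  The magic itself
 * is left unflipped below as a clue to the peer's endianness. */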
int
kiblnd_unpack_msg(kib_msg_t *msg, int nob)
{
        const int hdr_size = offsetof(kib_msg_t, ibm_u);
        __u32     msg_cksum;
        int       flip;
        int       msg_nob;
#if !IBLND_MAP_ON_DEMAND
        int       i;
        int       n;
#endif
        /* 6 bytes are enough to have received magic + version */
        if (nob < 6) {
                CERROR("Short message: %d\n", nob);
                return -EPROTO;
        }

        if (msg->ibm_magic == IBLND_MSG_MAGIC) {
                flip = 0;
        } else if (msg->ibm_magic == __swab32(IBLND_MSG_MAGIC)) {
                flip = 1;
        } else {
                CERROR("Bad magic: %08x\n", msg->ibm_magic);
                return -EPROTO;
        }

        if (msg->ibm_version !=
            (flip ? __swab16(IBLND_MSG_VERSION) : IBLND_MSG_VERSION)) {
                CERROR("Bad version: %d\n", msg->ibm_version);
                return -EPROTO;
        }

        if (nob < hdr_size) {
                CERROR("Short message: %d\n", nob);
                return -EPROTO;
        }

        msg_nob = flip ? __swab32(msg->ibm_nob) : msg->ibm_nob;
        if (msg_nob > nob) {
                CERROR("Short message: got %d, wanted %d\n", nob, msg_nob);
                return -EPROTO;
        }

        /* checksum must be computed with ibm_cksum zero and BEFORE anything
         * gets flipped */
        msg_cksum = flip ? __swab32(msg->ibm_cksum) : msg->ibm_cksum;
        msg->ibm_cksum = 0;
        if (msg_cksum != 0 &&
            msg_cksum != kiblnd_cksum(msg, msg_nob)) {
                CERROR("Bad checksum\n");
                return -EPROTO;
        }
        msg->ibm_cksum = msg_cksum;

        if (flip) {
                /* leave magic unflipped as a clue to peer endianness */
                __swab16s(&msg->ibm_version);
                CLASSERT (sizeof(msg->ibm_type) == 1);
                CLASSERT (sizeof(msg->ibm_credits) == 1);
                msg->ibm_nob = msg_nob;
                __swab64s(&msg->ibm_srcnid);
                __swab64s(&msg->ibm_srcstamp);
                __swab64s(&msg->ibm_dstnid);
                __swab64s(&msg->ibm_dststamp);
        }

        if (msg->ibm_srcnid == LNET_NID_ANY) {
                CERROR("Bad src nid: %s\n", libcfs_nid2str(msg->ibm_srcnid));
                return -EPROTO;
        }

        switch (msg->ibm_type) {
        default:
                CERROR("Unknown message type %x\n", msg->ibm_type);
                return -EPROTO;

        case IBLND_MSG_NOOP:
                break;

        case IBLND_MSG_IMMEDIATE:
                if (msg_nob < offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[0])) {
                        CERROR("Short IMMEDIATE: %d(%d)\n", msg_nob,
                               (int)offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[0]));
                        return -EPROTO;
                }
                break;

        case IBLND_MSG_PUT_REQ:
                if (msg_nob < hdr_size + sizeof(msg->ibm_u.putreq)) {
                        CERROR("Short PUT_REQ: %d(%d)\n", msg_nob,
                               (int)(hdr_size + sizeof(msg->ibm_u.putreq)));
                        return -EPROTO;
                }
                break;

        case IBLND_MSG_PUT_ACK:
                if (msg_nob < hdr_size + sizeof(msg->ibm_u.putack)) {
                        CERROR("Short PUT_ACK: %d(%d)\n", msg_nob,
                               (int)(hdr_size + sizeof(msg->ibm_u.putack)));
                        return -EPROTO;
                }
#if IBLND_MAP_ON_DEMAND
                if (flip) {
                        __swab64s(&msg->ibm_u.putack.ibpam_rd.rd_addr);
                        __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_nob);
                        __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_key);
                }
#else
                if (flip) {
                        __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_key);
                        __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_nfrags);
                }

                n = msg->ibm_u.putack.ibpam_rd.rd_nfrags;
                if (n <= 0 || n > IBLND_MAX_RDMA_FRAGS) {
                        CERROR("Bad PUT_ACK nfrags: %d, should be 0 < n <= %d\n",
                               n, IBLND_MAX_RDMA_FRAGS);
                        return -EPROTO;
                }

                if (msg_nob < offsetof(kib_msg_t, ibm_u.putack.ibpam_rd.rd_frags[n])) {
                        CERROR("Short PUT_ACK: %d(%d)\n", msg_nob,
                               (int)offsetof(kib_msg_t, ibm_u.putack.ibpam_rd.rd_frags[n]));
                        return -EPROTO;
                }

                if (flip) {
                        for (i = 0; i < n; i++) {
                                __swab32s(&msg->ibm_u.putack.ibpam_rd.rd_frags[i].rf_nob);
                                __swab64s(&msg->ibm_u.putack.ibpam_rd.rd_frags[i].rf_addr);
                        }
                }
#endif
                break;

        case IBLND_MSG_GET_REQ:
                if (msg_nob < hdr_size + sizeof(msg->ibm_u.get)) {
                        CERROR("Short GET_REQ: %d(%d)\n", msg_nob,
                               (int)(hdr_size + sizeof(msg->ibm_u.get)));
                        return -EPROTO;
                }
#if IBLND_MAP_ON_DEMAND
                if (flip) {
                        __swab64s(&msg->ibm_u.get.ibgm_rd.rd_addr);
                        __swab32s(&msg->ibm_u.get.ibgm_rd.rd_nob);
                        __swab32s(&msg->ibm_u.get.ibgm_rd.rd_key);
                }
#else
                if (flip) {
                        __swab32s(&msg->ibm_u.get.ibgm_rd.rd_key);
                        __swab32s(&msg->ibm_u.get.ibgm_rd.rd_nfrags);
                }

                n = msg->ibm_u.get.ibgm_rd.rd_nfrags;
                if (n <= 0 || n > IBLND_MAX_RDMA_FRAGS) {
                        CERROR("Bad GET_REQ nfrags: %d, should be 0 < n <= %d\n",
                               n, IBLND_MAX_RDMA_FRAGS);
                        return -EPROTO;
                }

                if (msg_nob < offsetof(kib_msg_t, ibm_u.get.ibgm_rd.rd_frags[n])) {
                        CERROR("Short GET_REQ: %d(%d)\n", msg_nob,
                               (int)offsetof(kib_msg_t, ibm_u.get.ibgm_rd.rd_frags[n]));
                        return -EPROTO;
                }

                if (flip)
                        for (i = 0; i < msg->ibm_u.get.ibgm_rd.rd_nfrags; i++) {
                                __swab32s(&msg->ibm_u.get.ibgm_rd.rd_frags[i].rf_nob);
                                __swab64s(&msg->ibm_u.get.ibgm_rd.rd_frags[i].rf_addr);
                        }
#endif
                break;

        case IBLND_MSG_PUT_NAK:
        case IBLND_MSG_PUT_DONE:
        case IBLND_MSG_GET_DONE:
                if (msg_nob < hdr_size + sizeof(msg->ibm_u.completion)) {
                        CERROR("Short RDMA completion: %d(%d)\n", msg_nob,
                               (int)(hdr_size + sizeof(msg->ibm_u.completion)));
                        return -EPROTO;
                }
                if (flip)
                        __swab32s(&msg->ibm_u.completion.ibcm_status);
                break;

        case IBLND_MSG_CONNREQ:
        case IBLND_MSG_CONNACK:
                if (msg_nob < hdr_size + sizeof(msg->ibm_u.connparams)) {
                        CERROR("Short connreq/ack: %d(%d)\n", msg_nob,
                               (int)(hdr_size + sizeof(msg->ibm_u.connparams)));
                        return -EPROTO;
                }
                if (flip) {
                        __swab16s(&msg->ibm_u.connparams.ibcp_queue_depth);
                        __swab16s(&msg->ibm_u.connparams.ibcp_max_frags);
                        __swab32s(&msg->ibm_u.connparams.ibcp_max_msg_size);
                }
                break;
        }
        return 0;
}

int
kiblnd_create_peer (lnet_ni_t *ni, kib_peer_t **peerp, lnet_nid_t nid)
{
        kib_peer_t     *peer;
        kib_net_t      *net = ni->ni_data;
        unsigned long   flags;

        LASSERT (net != NULL);
        LASSERT (nid != LNET_NID_ANY);

        LIBCFS_ALLOC(peer, sizeof(*peer));
        if (peer == NULL) {
                CERROR("Cannot allocate peer\n");
                return -ENOMEM;
        }

        memset(peer, 0, sizeof(*peer));         /* zero flags etc */

        peer->ibp_ni = ni;
        peer->ibp_nid = nid;
        peer->ibp_error = 0;
        peer->ibp_last_alive = cfs_time_current();
        atomic_set(&peer->ibp_refcount, 1);     /* 1 ref for caller */

        INIT_LIST_HEAD(&peer->ibp_list);       /* not in the peer table yet */
        INIT_LIST_HEAD(&peer->ibp_conns);
        INIT_LIST_HEAD(&peer->ibp_tx_queue);

        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);

        /* always called with a ref on ni, which prevents ni being shutdown */
        LASSERT (net->ibn_shutdown == 0);

        /* npeers only grows with the global lock held */
        atomic_inc(&net->ibn_npeers);

        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);

        *peerp = peer;
        return 0;
}

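/* NB peer refcounting: one ref for the creating caller, one held by the
 * peer table while the peer is hashed, and one per connection.  A peer
 * therefore reaches refcount zero only after it has been unlinked and its
 * last conn destroyed; the LASSERTs below enforce exactly that. */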
void
kiblnd_destroy_peer (kib_peer_t *peer)
{
        kib_net_t *net = peer->ibp_ni->ni_data;

        LASSERT (net != NULL);
        LASSERT (atomic_read(&peer->ibp_refcount) == 0);
        LASSERT (!kiblnd_peer_active(peer));
        LASSERT (peer->ibp_connecting == 0);
        LASSERT (peer->ibp_accepting == 0);
        LASSERT (list_empty(&peer->ibp_conns));
        LASSERT (list_empty(&peer->ibp_tx_queue));

        LIBCFS_FREE(peer, sizeof(*peer));

        /* NB a peer's connections keep a reference on their peer until
         * they are destroyed, so we can be assured that _all_ state to do
         * with this peer has been cleaned up when its refcount drops to
         * zero. */
        atomic_dec(&net->ibn_npeers);
}

void
kiblnd_destroy_dev (kib_dev_t *dev)
{
        LASSERT (dev->ibd_nnets == 0);

        if (!list_empty(&dev->ibd_list)) /* on kib_devs? */
                list_del_init(&dev->ibd_list);

        if (dev->ibd_mr != NULL)
                ib_dereg_mr(dev->ibd_mr);

        if (dev->ibd_pd != NULL)
                ib_dealloc_pd(dev->ibd_pd);

        if (dev->ibd_cmid != NULL)
                rdma_destroy_id(dev->ibd_cmid);

        LIBCFS_FREE(dev, sizeof(*dev));
}

kib_peer_t *
kiblnd_find_peer_locked (lnet_nid_t nid)
{
        /* the caller is responsible for accounting the additional reference
         * that this creates */
        struct list_head *peer_list = kiblnd_nid2peerlist(nid);
        struct list_head *tmp;
        kib_peer_t       *peer;

        list_for_each (tmp, peer_list) {

                peer = list_entry(tmp, kib_peer_t, ibp_list);

                LASSERT (peer->ibp_connecting > 0 || /* creating conns */
                         peer->ibp_accepting > 0 ||
                         !list_empty(&peer->ibp_conns));  /* active conn */

                if (peer->ibp_nid != nid)
                        continue;

                CDEBUG(D_NET, "got peer [%p] -> %s (%d)\n",
                       peer, libcfs_nid2str(nid),
                       atomic_read(&peer->ibp_refcount));
                return peer;
        }
        return NULL;
}

void
kiblnd_unlink_peer_locked (kib_peer_t *peer)
{
        LASSERT (list_empty(&peer->ibp_conns));

        LASSERT (kiblnd_peer_active(peer));
        list_del_init(&peer->ibp_list);
        /* lose peerlist's ref */
        kiblnd_peer_decref(peer);
}

int
kiblnd_get_peer_info (lnet_ni_t *ni, int index,
                      lnet_nid_t *nidp, int *count)
{
        kib_peer_t        *peer;
        struct list_head  *ptmp;
        int                i;
        unsigned long      flags;

        read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);

        for (i = 0; i < kiblnd_data.kib_peer_hash_size; i++) {

                list_for_each (ptmp, &kiblnd_data.kib_peers[i]) {

                        peer = list_entry(ptmp, kib_peer_t, ibp_list);
                        LASSERT (peer->ibp_connecting > 0 ||
                                 peer->ibp_accepting > 0 ||
                                 !list_empty(&peer->ibp_conns));

                        if (peer->ibp_ni != ni)
                                continue;

                        if (index-- > 0)
                                continue;

                        *nidp = peer->ibp_nid;
                        *count = atomic_read(&peer->ibp_refcount);

                        read_unlock_irqrestore(&kiblnd_data.kib_global_lock,
                                               flags);
                        return 0;
                }
        }

        read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
        return -ENOENT;
}

void
kiblnd_del_peer_locked (kib_peer_t *peer)
{
        struct list_head *ctmp;
        struct list_head *cnxt;
        kib_conn_t       *conn;

        if (list_empty(&peer->ibp_conns)) {
                kiblnd_unlink_peer_locked(peer);
        } else {
                list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
                        conn = list_entry(ctmp, kib_conn_t, ibc_list);

                        kiblnd_close_conn_locked(conn, 0);
                }
                /* NB closing peer's last conn unlinked it. */
        }
        /* NB peer now unlinked; might even be freed if the peer table had the
         * last ref on it. */
}

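/* NB queued txs can't be completed while the global lock is held, so
 * kiblnd_del_peer() splices each victim's ibp_tx_queue onto a local
 * 'zombies' list under the write lock and only fails them (with -EIO, via
 * kiblnd_txlist_done()) after the lock has been dropped. */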
int
kiblnd_del_peer (lnet_ni_t *ni, lnet_nid_t nid)
{
        CFS_LIST_HEAD     (zombies);
        struct list_head  *ptmp;
        struct list_head  *pnxt;
        kib_peer_t        *peer;
        int                lo;
        int                hi;
        int                i;
        unsigned long      flags;
        int                rc = -ENOENT;

        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);

        if (nid != LNET_NID_ANY) {
                lo = hi = kiblnd_nid2peerlist(nid) - kiblnd_data.kib_peers;
        } else {
                lo = 0;
                hi = kiblnd_data.kib_peer_hash_size - 1;
        }

        for (i = lo; i <= hi; i++) {
                list_for_each_safe (ptmp, pnxt, &kiblnd_data.kib_peers[i]) {
                        peer = list_entry(ptmp, kib_peer_t, ibp_list);
                        LASSERT (peer->ibp_connecting > 0 ||
                                 peer->ibp_accepting > 0 ||
                                 !list_empty(&peer->ibp_conns));

                        if (peer->ibp_ni != ni)
                                continue;

                        if (!(nid == LNET_NID_ANY || peer->ibp_nid == nid))
                                continue;

                        if (!list_empty(&peer->ibp_tx_queue)) {
                                LASSERT (list_empty(&peer->ibp_conns));

                                list_splice_init(&peer->ibp_tx_queue, &zombies);
                        }

                        kiblnd_del_peer_locked(peer);
                        rc = 0;         /* matched something */
                }
        }

        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);

        kiblnd_txlist_done(ni, &zombies, -EIO);

        return rc;
}

kib_conn_t *
kiblnd_get_conn_by_idx (lnet_ni_t *ni, int index)
{
        kib_peer_t        *peer;
        struct list_head  *ptmp;
        kib_conn_t        *conn;
        struct list_head  *ctmp;
        int                i;
        unsigned long      flags;

        read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);

        for (i = 0; i < kiblnd_data.kib_peer_hash_size; i++) {
                list_for_each (ptmp, &kiblnd_data.kib_peers[i]) {

                        peer = list_entry(ptmp, kib_peer_t, ibp_list);
                        LASSERT (peer->ibp_connecting > 0 ||
                                 peer->ibp_accepting > 0 ||
                                 !list_empty(&peer->ibp_conns));

                        if (peer->ibp_ni != ni)
                                continue;

                        list_for_each (ctmp, &peer->ibp_conns) {
                                if (index-- > 0)
                                        continue;

                                conn = list_entry(ctmp, kib_conn_t, ibc_list);
                                kiblnd_conn_addref(conn);
                                read_unlock_irqrestore(&kiblnd_data.kib_global_lock,
                                                       flags);
                                return conn;
                        }
                }
        }

        read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
        return NULL;
}

void
kiblnd_debug_rx (kib_rx_t *rx)
{
        CDEBUG(D_CONSOLE, "      %p status %d msg_type %x cred %d\n",
               rx, rx->rx_status, rx->rx_msg->ibm_type,
               rx->rx_msg->ibm_credits);
}

void
kiblnd_debug_tx (kib_tx_t *tx)
{
        CDEBUG(D_CONSOLE, "      %p snd %d q %d w %d rc %d dl %lx "
               "cookie "LPX64" msg %s%s type %x cred %d\n",
               tx, tx->tx_sending, tx->tx_queued, tx->tx_waiting,
               tx->tx_status, tx->tx_deadline, tx->tx_cookie,
               tx->tx_lntmsg[0] == NULL ? "-" : "!",
               tx->tx_lntmsg[1] == NULL ? "-" : "!",
               tx->tx_msg->ibm_type, tx->tx_msg->ibm_credits);
}

void
kiblnd_debug_conn (kib_conn_t *conn)
{
        struct list_head *tmp;
        int               i;

        spin_lock(&conn->ibc_lock);

        CDEBUG(D_CONSOLE, "conn[%d] %p -> %s: \n",
               atomic_read(&conn->ibc_refcount), conn,
               libcfs_nid2str(conn->ibc_peer->ibp_nid));
        CDEBUG(D_CONSOLE, "   state %d nposted %d cred %d o_cred %d r_cred %d\n",
               conn->ibc_state, conn->ibc_nsends_posted, conn->ibc_credits,
               conn->ibc_outstanding_credits, conn->ibc_reserved_credits);
        CDEBUG(D_CONSOLE, "   comms_err %d\n", conn->ibc_comms_error);

        CDEBUG(D_CONSOLE, "   early_rxs:\n");
        list_for_each(tmp, &conn->ibc_early_rxs)
                kiblnd_debug_rx(list_entry(tmp, kib_rx_t, rx_list));

        CDEBUG(D_CONSOLE, "   tx_noops:\n");
        list_for_each(tmp, &conn->ibc_tx_noops)
                kiblnd_debug_tx(list_entry(tmp, kib_tx_t, tx_list));

        CDEBUG(D_CONSOLE, "   tx_queue_nocred:\n");
        list_for_each(tmp, &conn->ibc_tx_queue_nocred)
                kiblnd_debug_tx(list_entry(tmp, kib_tx_t, tx_list));

        CDEBUG(D_CONSOLE, "   tx_queue_rsrvd:\n");
        list_for_each(tmp, &conn->ibc_tx_queue_rsrvd)
                kiblnd_debug_tx(list_entry(tmp, kib_tx_t, tx_list));

        CDEBUG(D_CONSOLE, "   tx_queue:\n");
        list_for_each(tmp, &conn->ibc_tx_queue)
                kiblnd_debug_tx(list_entry(tmp, kib_tx_t, tx_list));

        CDEBUG(D_CONSOLE, "   active_txs:\n");
        list_for_each(tmp, &conn->ibc_active_txs)
                kiblnd_debug_tx(list_entry(tmp, kib_tx_t, tx_list));

        CDEBUG(D_CONSOLE, "   rxs:\n");
        for (i = 0; i < IBLND_RX_MSGS; i++)
                kiblnd_debug_rx(&conn->ibc_rxs[i]);

        spin_unlock(&conn->ibc_lock);
}

kib_conn_t *
kiblnd_create_conn (kib_peer_t *peer, struct rdma_cm_id *cmid, int state)
{
        /* CAVEAT EMPTOR:
         * If the new conn is created successfully it takes over the caller's
         * ref on 'peer'.  It also "owns" 'cmid' and destroys it when it itself
         * is destroyed.  On failure, the caller's ref on 'peer' remains and
         * she must dispose of 'cmid'.  (Actually I'd block forever if I tried
         * to destroy 'cmid' here since I'm called from the CM which still has
         * its ref on 'cmid'). */
        kib_conn_t             *conn;
        kib_net_t              *net = peer->ibp_ni->ni_data;
        int                     i;
        int                     page_offset;
        int                     ipage;
        int                     rc;
        struct ib_cq           *cq;
        struct ib_qp_init_attr *init_qp_attr;
        unsigned long           flags;

        LASSERT (net != NULL);
        LASSERT (!in_interrupt());

        LIBCFS_ALLOC(init_qp_attr, sizeof(*init_qp_attr));
        if (init_qp_attr == NULL) {
                CERROR("Can't allocate qp_attr for %s\n",
                       libcfs_nid2str(peer->ibp_nid));
                goto failed_0;
        }

        LIBCFS_ALLOC(conn, sizeof(*conn));
        if (conn == NULL) {
                CERROR("Can't allocate connection for %s\n",
                       libcfs_nid2str(peer->ibp_nid));
                goto failed_1;
        }

        memset(conn, 0, sizeof(*conn)); /* zero flags, NULL pointers etc... */

        conn->ibc_state = IBLND_CONN_INIT;
        conn->ibc_peer = peer;                  /* I take the caller's ref */
        cmid->context = conn;                   /* for future CM callbacks */
        conn->ibc_cmid = cmid;

        INIT_LIST_HEAD(&conn->ibc_early_rxs);
        INIT_LIST_HEAD(&conn->ibc_tx_noops);
        INIT_LIST_HEAD(&conn->ibc_tx_queue);
        INIT_LIST_HEAD(&conn->ibc_tx_queue_rsrvd);
        INIT_LIST_HEAD(&conn->ibc_tx_queue_nocred);
        INIT_LIST_HEAD(&conn->ibc_active_txs);
        spin_lock_init(&conn->ibc_lock);

        LIBCFS_ALLOC(conn->ibc_connvars, sizeof(*conn->ibc_connvars));
        if (conn->ibc_connvars == NULL) {
                CERROR("Can't allocate in-progress connection state\n");
                goto failed_2;
        }
        memset(conn->ibc_connvars, 0, sizeof(*conn->ibc_connvars));

        LIBCFS_ALLOC(conn->ibc_rxs, IBLND_RX_MSGS * sizeof(kib_rx_t));
        if (conn->ibc_rxs == NULL) {
                CERROR("Cannot allocate RX buffers\n");
                goto failed_2;
        }
        memset(conn->ibc_rxs, 0, IBLND_RX_MSGS * sizeof(kib_rx_t));

        rc = kiblnd_alloc_pages(&conn->ibc_rx_pages, IBLND_RX_MSG_PAGES);
        if (rc != 0)
                goto failed_2;

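        /* Carve the pages just allocated into IBLND_RX_MSGS fixed-size
         * message buffers: IBLND_MSG_SIZE divides PAGE_SIZE, so
         * 'page_offset' steps through each page a whole message at a time
         * and 'ipage' advances when the page fills.  Each buffer is
         * DMA-mapped once here and stays mapped for the conn's lifetime. */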
        for (i = ipage = page_offset = 0; i < IBLND_RX_MSGS; i++) {
                struct page *page = conn->ibc_rx_pages->ibp_pages[ipage];
                kib_rx_t    *rx = &conn->ibc_rxs[i];

                rx->rx_conn = conn;
                rx->rx_msg = (kib_msg_t *)(((char *)page_address(page)) +
                                           page_offset);
                rx->rx_msgaddr = kiblnd_dma_map_single(cmid->device,
                                                       rx->rx_msg, IBLND_MSG_SIZE,
                                                       DMA_FROM_DEVICE);
                KIBLND_UNMAP_ADDR_SET(rx, rx_msgunmap, rx->rx_msgaddr);

                CDEBUG(D_NET,"rx %d: %p "LPX64"("LPX64")\n",
                       i, rx->rx_msg, rx->rx_msgaddr,
                       lnet_page2phys(page) + page_offset);

                page_offset += IBLND_MSG_SIZE;
                LASSERT (page_offset <= PAGE_SIZE);

                if (page_offset == PAGE_SIZE) {
                        page_offset = 0;
                        ipage++;
                        LASSERT (ipage <= IBLND_RX_MSG_PAGES);
                }
        }

#if (IBLND_OFED_VERSION == 1025)
        cq = ib_create_cq(cmid->device,
                          kiblnd_cq_completion, kiblnd_cq_event, conn,
                          IBLND_CQ_ENTRIES(), 0);
#else
        cq = ib_create_cq(cmid->device,
                          kiblnd_cq_completion, kiblnd_cq_event, conn,
                          IBLND_CQ_ENTRIES());
#endif
        if (!IS_ERR(cq)) {
                conn->ibc_cq = cq;
        } else {
                CERROR("Can't create CQ: %ld\n", PTR_ERR(cq));
                goto failed_2;
        }

        rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
        if (rc != 0) {
                CERROR("Can't request completion notification: %d\n", rc);
                goto failed_2;
        }

        memset(init_qp_attr, 0, sizeof(*init_qp_attr));
        init_qp_attr->event_handler = kiblnd_qp_event;
        init_qp_attr->qp_context = conn;
        init_qp_attr->cap.max_send_wr = IBLND_SEND_WRS;
        init_qp_attr->cap.max_recv_wr = IBLND_RECV_WRS;
        init_qp_attr->cap.max_send_sge = 1;
        init_qp_attr->cap.max_recv_sge = 1;
        init_qp_attr->sq_sig_type = IB_SIGNAL_REQ_WR;
        init_qp_attr->qp_type = IB_QPT_RC;
        init_qp_attr->send_cq = cq;
        init_qp_attr->recv_cq = cq;

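        /* kib_ib_mtu is a module tunable given in bytes; translate it to
         * the IB_MTU_* enum the path record expects.  Anything other than
         * 0/256/512/1024/2048/4096 hits 'default', which falls through to
         * case 0 (resetting the tunable to the path's default MTU) and
         * earns the CWARN below once the lock is dropped. */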
        rc = 0;
        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
        switch (*kiblnd_tunables.kib_ib_mtu) {
        default:
                rc = *kiblnd_tunables.kib_ib_mtu;
                /* fall through to... */
        case 0: /* set tunable to the default
                 * CAVEAT EMPTOR! this assumes the default is one of the MTUs
                 * below, otherwise we'll WARN on the next QP create */
                *kiblnd_tunables.kib_ib_mtu =
                        ib_mtu_enum_to_int(cmid->route.path_rec->mtu);
                break;
        case 256:
                cmid->route.path_rec->mtu = IB_MTU_256;
                break;
        case 512:
                cmid->route.path_rec->mtu = IB_MTU_512;
                break;
        case 1024:
                cmid->route.path_rec->mtu = IB_MTU_1024;
                break;
        case 2048:
                cmid->route.path_rec->mtu = IB_MTU_2048;
                break;
        case 4096:
                cmid->route.path_rec->mtu = IB_MTU_4096;
                break;
        }
        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);

        if (rc != 0)
                CWARN("Invalid IB MTU value %d, using default value %d\n",
                      rc, *kiblnd_tunables.kib_ib_mtu);

        rc = rdma_create_qp(cmid, net->ibn_dev->ibd_pd, init_qp_attr);
        if (rc != 0) {
                CERROR("Can't create QP: %d\n", rc);
                goto failed_2;
        }

        LIBCFS_FREE(init_qp_attr, sizeof(*init_qp_attr));

        /* 1 ref for caller and each rxmsg */
        atomic_set(&conn->ibc_refcount, 1 + IBLND_RX_MSGS);
        conn->ibc_nrx = IBLND_RX_MSGS;

        /* post receives */
        for (i = 0; i < IBLND_RX_MSGS; i++) {
                rc = kiblnd_post_rx(&conn->ibc_rxs[i],
                                    IBLND_POSTRX_NO_CREDIT);
                if (rc != 0) {
                        CERROR("Can't post rxmsg: %d\n", rc);

                        /* Make posted receives complete */
                        kiblnd_abort_receives(conn);

                        /* correct # of posted buffers
                         * NB locking needed now I'm racing with completion */
                        spin_lock_irqsave(&kiblnd_data.kib_sched_lock, flags);
                        conn->ibc_nrx -= IBLND_RX_MSGS - i;
                        spin_unlock_irqrestore(&kiblnd_data.kib_sched_lock,
                                               flags);

                        /* Drop my own and unused rxbuffer refcounts */
                        while (i++ <= IBLND_RX_MSGS)
                                kiblnd_conn_decref(conn);

                        return NULL;
                }
        }

        /* Init successful! */
        LASSERT (state == IBLND_CONN_ACTIVE_CONNECT ||
                 state == IBLND_CONN_PASSIVE_WAIT);
        conn->ibc_state = state;

        /* 1 more conn */
        atomic_inc(&net->ibn_nconns);
        return conn;

 failed_2:
        kiblnd_destroy_conn(conn);
 failed_1:
        LIBCFS_FREE(init_qp_attr, sizeof(*init_qp_attr));
 failed_0:
        return NULL;
}

void
kiblnd_destroy_conn (kib_conn_t *conn)
{
        struct rdma_cm_id *cmid = conn->ibc_cmid;
        kib_peer_t        *peer = conn->ibc_peer;
        int                rc;
        int                i;

        LASSERT (!in_interrupt());
        LASSERT (atomic_read(&conn->ibc_refcount) == 0);
        LASSERT (list_empty(&conn->ibc_early_rxs));
        LASSERT (list_empty(&conn->ibc_tx_noops));
        LASSERT (list_empty(&conn->ibc_tx_queue));
        LASSERT (list_empty(&conn->ibc_tx_queue_rsrvd));
        LASSERT (list_empty(&conn->ibc_tx_queue_nocred));
        LASSERT (list_empty(&conn->ibc_active_txs));
        LASSERT (conn->ibc_nsends_posted == 0);

        switch (conn->ibc_state) {
        default:
                /* conn must be completely disengaged from the network */
                LBUG();

        case IBLND_CONN_DISCONNECTED:
                /* connvars should have been freed already */
                LASSERT (conn->ibc_connvars == NULL);
                break;

        case IBLND_CONN_INIT:
                break;
        }

        if (conn->ibc_cmid->qp != NULL)
                rdma_destroy_qp(conn->ibc_cmid);

        if (conn->ibc_cq != NULL) {
                rc = ib_destroy_cq(conn->ibc_cq);
                if (rc != 0)
                        CWARN("Error destroying CQ: %d\n", rc);
        }

        if (conn->ibc_rx_pages != NULL) {
                LASSERT (conn->ibc_rxs != NULL);

                for (i = 0; i < IBLND_RX_MSGS; i++) {
                        kib_rx_t *rx = &conn->ibc_rxs[i];

                        LASSERT (rx->rx_nob >= 0); /* not posted */

                        kiblnd_dma_unmap_single(conn->ibc_cmid->device,
                                                KIBLND_UNMAP_ADDR(rx, rx_msgunmap,
                                                                  rx->rx_msgaddr),
                                                IBLND_MSG_SIZE, DMA_FROM_DEVICE);
                }

                kiblnd_free_pages(conn->ibc_rx_pages);
        }

        if (conn->ibc_rxs != NULL) {
                LIBCFS_FREE(conn->ibc_rxs,
                            IBLND_RX_MSGS * sizeof(kib_rx_t));
        }

        if (conn->ibc_connvars != NULL)
                LIBCFS_FREE(conn->ibc_connvars, sizeof(*conn->ibc_connvars));

        /* See CAVEAT EMPTOR above in kiblnd_create_conn */
        if (conn->ibc_state != IBLND_CONN_INIT) {
                kib_net_t *net = peer->ibp_ni->ni_data;

                kiblnd_peer_decref(peer);
                rdma_destroy_id(cmid);
                atomic_dec(&net->ibn_nconns);
        }

        LIBCFS_FREE(conn, sizeof(*conn));
}

int
kiblnd_close_peer_conns_locked (kib_peer_t *peer, int why)
{
        kib_conn_t         *conn;
        struct list_head   *ctmp;
        struct list_head   *cnxt;
        int                 count = 0;

        list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
                conn = list_entry(ctmp, kib_conn_t, ibc_list);

                count++;
                kiblnd_close_conn_locked(conn, why);
        }

        return count;
}

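/* NB each conn is stamped with the incarnation its peer advertised at
 * connect time (a startup timestamp; see kiblnd_startup()).  A peer that
 * has rebooted presents a new incarnation, so any conn carrying an older
 * stamp is stale and is closed with -ESTALE. */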
int
kiblnd_close_stale_conns_locked (kib_peer_t *peer, __u64 incarnation)
{
        kib_conn_t         *conn;
        struct list_head   *ctmp;
        struct list_head   *cnxt;
        int                 count = 0;

        list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
                conn = list_entry(ctmp, kib_conn_t, ibc_list);

                if (conn->ibc_incarnation == incarnation)
                        continue;

                CDEBUG(D_NET, "Closing stale conn -> %s incarnation:"LPX64"("LPX64")\n",
                       libcfs_nid2str(peer->ibp_nid),
                       conn->ibc_incarnation, incarnation);

                count++;
                kiblnd_close_conn_locked(conn, -ESTALE);
        }

        return count;
}

int
kiblnd_close_matching_conns (lnet_ni_t *ni, lnet_nid_t nid)
{
        kib_peer_t         *peer;
        struct list_head   *ptmp;
        struct list_head   *pnxt;
        int                 lo;
        int                 hi;
        int                 i;
        unsigned long       flags;
        int                 count = 0;

        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);

        if (nid != LNET_NID_ANY)
                lo = hi = kiblnd_nid2peerlist(nid) - kiblnd_data.kib_peers;
        else {
                lo = 0;
                hi = kiblnd_data.kib_peer_hash_size - 1;
        }

        for (i = lo; i <= hi; i++) {
                list_for_each_safe (ptmp, pnxt, &kiblnd_data.kib_peers[i]) {

                        peer = list_entry(ptmp, kib_peer_t, ibp_list);
                        LASSERT (peer->ibp_connecting > 0 ||
                                 peer->ibp_accepting > 0 ||
                                 !list_empty(&peer->ibp_conns));

                        if (peer->ibp_ni != ni)
                                continue;

                        if (!(nid == LNET_NID_ANY || nid == peer->ibp_nid))
                                continue;

                        count += kiblnd_close_peer_conns_locked(peer, 0);
                }
        }

        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);

        /* wildcards always succeed */
        if (nid == LNET_NID_ANY)
                return 0;

        return (count == 0) ? -ENOENT : 0;
}

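/* Handlers for the libcfs ioctls used by userspace management tools
 * (e.g. lctl) to list peers/connections and delete/close them.  GET_PEER
 * and GET_CONN are index-based: userspace iterates ioc_count = 0, 1, 2...
 * until it gets back -ENOENT. */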
int
kiblnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
{
        struct libcfs_ioctl_data *data = arg;
        int                       rc = -EINVAL;

        switch(cmd) {
        case IOC_LIBCFS_GET_PEER: {
                lnet_nid_t   nid = 0;
                int          count = 0;

                rc = kiblnd_get_peer_info(ni, data->ioc_count,
                                          &nid, &count);
                data->ioc_nid    = nid;
                data->ioc_count  = count;
                break;
        }

        case IOC_LIBCFS_DEL_PEER: {
                rc = kiblnd_del_peer(ni, data->ioc_nid);
                break;
        }
        case IOC_LIBCFS_GET_CONN: {
                kib_conn_t *conn = kiblnd_get_conn_by_idx(ni, data->ioc_count);

                if (conn == NULL) {
                        rc = -ENOENT;
                } else {
                        // kiblnd_debug_conn(conn);
                        rc = 0;
                        data->ioc_nid = conn->ibc_peer->ibp_nid;
                        kiblnd_conn_decref(conn);
                }
                break;
        }
        case IOC_LIBCFS_CLOSE_CONNECTION: {
                rc = kiblnd_close_matching_conns(ni, data->ioc_nid);
                break;
        }

        default:
                break;
        }

        return rc;
}

void
kiblnd_free_pages (kib_pages_t *p)
{
        int         npages = p->ibp_npages;
        int         i;

        for (i = 0; i < npages; i++)
                if (p->ibp_pages[i] != NULL)
                        __free_page(p->ibp_pages[i]);

        LIBCFS_FREE (p, offsetof(kib_pages_t, ibp_pages[npages]));
}

int
kiblnd_alloc_pages (kib_pages_t **pp, int npages)
{
        kib_pages_t   *p;
        int            i;

        LIBCFS_ALLOC(p, offsetof(kib_pages_t, ibp_pages[npages]));
        if (p == NULL) {
                CERROR("Can't allocate descriptor for %d pages\n", npages);
                return -ENOMEM;
        }

        memset(p, 0, offsetof(kib_pages_t, ibp_pages[npages]));
        p->ibp_npages = npages;

        for (i = 0; i < npages; i++) {
                p->ibp_pages[i] = alloc_page(GFP_KERNEL);
                if (p->ibp_pages[i] == NULL) {
                        CERROR("Can't allocate page %d of %d\n", i, npages);
                        kiblnd_free_pages(p);
                        return -ENOMEM;
                }
        }

        *pp = p;
        return 0;
}

void
kiblnd_free_tx_descs (lnet_ni_t *ni)
{
        int        i;
        kib_net_t *net = ni->ni_data;

        LASSERT (net != NULL);

        if (net->ibn_tx_descs != NULL) {
                for (i = 0; i < IBLND_TX_MSGS(); i++) {
                        kib_tx_t *tx = &net->ibn_tx_descs[i];

#if IBLND_MAP_ON_DEMAND
                        if (tx->tx_pages != NULL)
                                LIBCFS_FREE(tx->tx_pages, LNET_MAX_IOV *
                                            sizeof(*tx->tx_pages));
#else
                        if (tx->tx_wrq != NULL)
                                LIBCFS_FREE(tx->tx_wrq,
                                            (1 + IBLND_MAX_RDMA_FRAGS) *
                                            sizeof(*tx->tx_wrq));

                        if (tx->tx_sge != NULL)
                                LIBCFS_FREE(tx->tx_sge,
                                            (1 + IBLND_MAX_RDMA_FRAGS) *
                                            sizeof(*tx->tx_sge));

                        if (tx->tx_rd != NULL)
                                LIBCFS_FREE(tx->tx_rd,
                                            offsetof(kib_rdma_desc_t,
                                               rd_frags[IBLND_MAX_RDMA_FRAGS]));

                        if (tx->tx_frags != NULL)
                                LIBCFS_FREE(tx->tx_frags,
                                            IBLND_MAX_RDMA_FRAGS *
                                            sizeof(*tx->tx_frags));
#endif
                }

                LIBCFS_FREE(net->ibn_tx_descs,
                            IBLND_TX_MSGS() * sizeof(kib_tx_t));
        }

        if (net->ibn_tx_pages != NULL)
                kiblnd_free_pages(net->ibn_tx_pages);
}

int
kiblnd_alloc_tx_descs (lnet_ni_t *ni)
{
        int        i;
        int        rc;
        kib_net_t *net = ni->ni_data;

        LASSERT (net != NULL);

        rc = kiblnd_alloc_pages(&net->ibn_tx_pages, IBLND_TX_MSG_PAGES());

        if (rc != 0) {
                CERROR("Can't allocate tx pages\n");
                return rc;
        }

        LIBCFS_ALLOC (net->ibn_tx_descs,
                      IBLND_TX_MSGS() * sizeof(kib_tx_t));
        if (net->ibn_tx_descs == NULL) {
                CERROR("Can't allocate %d tx descriptors\n", IBLND_TX_MSGS());
                return -ENOMEM;
        }

        memset(net->ibn_tx_descs, 0,
               IBLND_TX_MSGS() * sizeof(kib_tx_t));

        for (i = 0; i < IBLND_TX_MSGS(); i++) {
                kib_tx_t *tx = &net->ibn_tx_descs[i];

#if IBLND_MAP_ON_DEMAND
                LIBCFS_ALLOC(tx->tx_pages, LNET_MAX_IOV *
                             sizeof(*tx->tx_pages));
                if (tx->tx_pages == NULL) {
                        CERROR("Can't allocate phys page vector[%d]\n",
                               LNET_MAX_IOV);
                        return -ENOMEM;
                }
#else
                LIBCFS_ALLOC(tx->tx_wrq,
                             (1 + IBLND_MAX_RDMA_FRAGS) *
                             sizeof(*tx->tx_wrq));
                if (tx->tx_wrq == NULL)
                        return -ENOMEM;

                LIBCFS_ALLOC(tx->tx_sge,
                             (1 + IBLND_MAX_RDMA_FRAGS) *
                             sizeof(*tx->tx_sge));
                if (tx->tx_sge == NULL)
                        return -ENOMEM;

                LIBCFS_ALLOC(tx->tx_rd,
                             offsetof(kib_rdma_desc_t,
                                      rd_frags[IBLND_MAX_RDMA_FRAGS]));
                if (tx->tx_rd == NULL)
                        return -ENOMEM;

                LIBCFS_ALLOC(tx->tx_frags,
                             IBLND_MAX_RDMA_FRAGS *
                             sizeof(*tx->tx_frags));
                if (tx->tx_frags == NULL)
                        return -ENOMEM;
#endif
        }

        return 0;
}

void
kiblnd_unmap_tx_descs (lnet_ni_t *ni)
{
        int             i;
        kib_tx_t       *tx;
        kib_net_t      *net = ni->ni_data;

        LASSERT (net != NULL);

        for (i = 0; i < IBLND_TX_MSGS(); i++) {
                tx = &net->ibn_tx_descs[i];

                kiblnd_dma_unmap_single(net->ibn_dev->ibd_cmid->device,
                                        KIBLND_UNMAP_ADDR(tx, tx_msgunmap,
                                                          tx->tx_msgaddr),
                                        IBLND_MSG_SIZE, DMA_TO_DEVICE);
        }
}

void
kiblnd_map_tx_descs (lnet_ni_t *ni)
{
        int             ipage = 0;
        int             page_offset = 0;
        int             i;
        struct page    *page;
        kib_tx_t       *tx;
        kib_net_t      *net = ni->ni_data;

        LASSERT (net != NULL);

        /* pre-mapped messages are not bigger than 1 page */
        CLASSERT (IBLND_MSG_SIZE <= PAGE_SIZE);

        /* No fancy arithmetic when we do the buffer calculations */
        CLASSERT (PAGE_SIZE % IBLND_MSG_SIZE == 0);

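        /* The CLASSERTs above guarantee an exact fit: each page holds
         * PAGE_SIZE / IBLND_MSG_SIZE messages with no remainder.  E.g. with
         * 4K pages and a (hypothetical) 2K message size, tx 0 and tx 1
         * would share page 0, tx 2 and tx 3 page 1, and so on; the loop
         * below just advances page_offset and ipage accordingly. */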
1254         for (i = 0; i < IBLND_TX_MSGS(); i++) {
1255                 page = net->ibn_tx_pages->ibp_pages[ipage];
1256                 tx = &net->ibn_tx_descs[i];
1257
1258                 tx->tx_msg = (kib_msg_t *)(((char *)page_address(page)) +
1259                                            page_offset);
1260
1261                 tx->tx_msgaddr = kiblnd_dma_map_single(
1262                         net->ibn_dev->ibd_cmid->device,
1263                         tx->tx_msg, IBLND_MSG_SIZE, DMA_TO_DEVICE);
1264                 KIBLND_UNMAP_ADDR_SET(tx, tx_msgunmap, tx->tx_msgaddr);
1265
1266                 list_add(&tx->tx_list, &net->ibn_idle_txs);
1267
1268                 page_offset += IBLND_MSG_SIZE;
1269                 LASSERT (page_offset <= PAGE_SIZE);
1270
1271                 if (page_offset == PAGE_SIZE) {
1272                         page_offset = 0;
1273                         ipage++;
1274                         LASSERT (ipage <= IBLND_TX_MSG_PAGES());
1275                 }
1276         }
1277 }
1278
1279 void
1280 kiblnd_base_shutdown (void)
1281 {
1282         int i;
1283
1284         LASSERT (list_empty(&kiblnd_data.kib_devs));
1285
1286         CDEBUG(D_MALLOC, "before LND base cleanup: kmem %d\n",
1287                atomic_read(&libcfs_kmemory));
1288
1289         switch (kiblnd_data.kib_init) {
1290         default:
1291                 LBUG();
1292
1293         case IBLND_INIT_ALL:
1294         case IBLND_INIT_DATA:
1295                 LASSERT (kiblnd_data.kib_peers != NULL);
1296                 for (i = 0; i < kiblnd_data.kib_peer_hash_size; i++) {
1297                         LASSERT (list_empty(&kiblnd_data.kib_peers[i]));
1298                 }
1299                 LASSERT (list_empty(&kiblnd_data.kib_connd_zombies));
1300                 LASSERT (list_empty(&kiblnd_data.kib_connd_conns));
1301
1302                 /* flag threads to terminate; wake and wait for them to die */
1303                 kiblnd_data.kib_shutdown = 1;
1304                 wake_up_all(&kiblnd_data.kib_sched_waitq);
1305                 wake_up_all(&kiblnd_data.kib_connd_waitq);
1306
1307                 i = 2;
1308                 while (atomic_read(&kiblnd_data.kib_nthreads) != 0) {
1309                         i++;
1310                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1311                                "Waiting for %d threads to terminate\n",
1312                                atomic_read(&kiblnd_data.kib_nthreads));
1313                         cfs_pause(cfs_time_seconds(1));
1314                 }
1315
1316                 /* fall through */
1317
1318         case IBLND_INIT_NOTHING:
1319                 break;
1320         }
1321
1322         if (kiblnd_data.kib_peers != NULL)
1323                 LIBCFS_FREE(kiblnd_data.kib_peers,
1324                             sizeof(struct list_head) *
1325                             kiblnd_data.kib_peer_hash_size);
1326
1327         CDEBUG(D_MALLOC, "after LND base cleanup: kmem %d\n",
1328                atomic_read(&libcfs_kmemory));
1329
1330         kiblnd_data.kib_init = IBLND_INIT_NOTHING;
1331         PORTAL_MODULE_UNUSE;
1332 }
1333
1334 void
1335 kiblnd_shutdown (lnet_ni_t *ni)
1336 {
1337         kib_net_t        *net = ni->ni_data;
1338         rwlock_t         *g_lock = &kiblnd_data.kib_global_lock;
1339         int               i;
1340         unsigned long     flags;
1341
1342         LASSERT(kiblnd_data.kib_init == IBLND_INIT_ALL);
1343
1344         if (net == NULL)
1345                 goto out;
1346
1347         CDEBUG(D_MALLOC, "before LND net cleanup: kmem %d\n",
1348                atomic_read(&libcfs_kmemory));
1349
1350         write_lock_irqsave(g_lock, flags);
1351         net->ibn_shutdown = 1;
1352         write_unlock_irqrestore(g_lock, flags);
1353
1354         switch (net->ibn_init) {
1355         default:
1356                 LBUG();
1357
1358         case IBLND_INIT_ALL:
1359                 /* nuke all existing peers within this net */
1360                 kiblnd_del_peer(ni, LNET_NID_ANY);
1361
1362                 /* Wait for all peer state to clean up */
1363                 i = 2;
1364                 while (atomic_read(&net->ibn_npeers) != 0) {
1365                         i++;
1366                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* 2**n? */
1367                                "%s: waiting for %d peers to disconnect\n",
1368                                libcfs_nid2str(ni->ni_nid),
1369                                atomic_read(&net->ibn_npeers));
1370                         cfs_pause(cfs_time_seconds(1));
1371                 }
1372
1373                 kiblnd_unmap_tx_descs(ni);
1374
1375                 LASSERT (net->ibn_dev->ibd_nnets > 0);
1376                 net->ibn_dev->ibd_nnets--;
1377
1378                 /* fall through */
1379
1380         case IBLND_INIT_NOTHING:
1381                 LASSERT (atomic_read(&net->ibn_nconns) == 0);
1382
1383 #if IBLND_MAP_ON_DEMAND
1384                 if (net->ibn_fmrpool != NULL)
1385                         ib_destroy_fmr_pool(net->ibn_fmrpool);
1386 #endif
1387                 if (net->ibn_dev != NULL &&
1388                     net->ibn_dev->ibd_nnets == 0)
1389                         kiblnd_destroy_dev(net->ibn_dev);
1390
1391                 break;
1392         }
1393
1394         kiblnd_free_tx_descs(ni);
1395
1396         CDEBUG(D_MALLOC, "after LND net cleanup: kmem %d\n",
1397                atomic_read(&libcfs_kmemory));
1398
1399         net->ibn_init = IBLND_INIT_NOTHING;
1400         ni->ni_data = NULL;
1401         
1402         LIBCFS_FREE(net, sizeof(*net));
1403
1404 out:
1405         if (list_empty(&kiblnd_data.kib_devs))
1406                 kiblnd_base_shutdown();
1407         return;
1408 }
1409
1410 int
1411 kiblnd_base_startup (void)
1412 {
1413         int               rc;
1414         int               i;
1415
1416         LASSERT (kiblnd_data.kib_init == IBLND_INIT_NOTHING);
1417
1418         if (*kiblnd_tunables.kib_credits > *kiblnd_tunables.kib_ntx) {
1419                 CERROR("Can't set credits(%d) > ntx(%d)\n",
1420                        *kiblnd_tunables.kib_credits,
1421                        *kiblnd_tunables.kib_ntx);
1422                 return -EINVAL;
1423         }
1424
1425         PORTAL_MODULE_USE;
1426         memset(&kiblnd_data, 0, sizeof(kiblnd_data)); /* zero pointers, flags etc */
1427
1428         rwlock_init(&kiblnd_data.kib_global_lock);
1429
1430         INIT_LIST_HEAD(&kiblnd_data.kib_devs);
1431
1432         kiblnd_data.kib_peer_hash_size = IBLND_PEER_HASH_SIZE;
1433         LIBCFS_ALLOC(kiblnd_data.kib_peers,
1434                      sizeof(struct list_head) * kiblnd_data.kib_peer_hash_size);
1435         if (kiblnd_data.kib_peers == NULL) {
1436                 goto failed;
1437         }
1438         for (i = 0; i < kiblnd_data.kib_peer_hash_size; i++)
1439                 INIT_LIST_HEAD(&kiblnd_data.kib_peers[i]);
1440
1441         spin_lock_init(&kiblnd_data.kib_connd_lock);
1442         INIT_LIST_HEAD(&kiblnd_data.kib_connd_conns);
1443         INIT_LIST_HEAD(&kiblnd_data.kib_connd_zombies);
1444         init_waitqueue_head(&kiblnd_data.kib_connd_waitq);
1445
1446         spin_lock_init(&kiblnd_data.kib_sched_lock);
1447         INIT_LIST_HEAD(&kiblnd_data.kib_sched_conns);
1448         init_waitqueue_head(&kiblnd_data.kib_sched_waitq);
1449
1450         kiblnd_data.kib_error_qpa.qp_state = IB_QPS_ERR;
1451
1452         /* lists/ptrs/locks initialised */
1453         kiblnd_data.kib_init = IBLND_INIT_DATA;
1454         /*****************************************************/
1455
1456         for (i = 0; i < IBLND_N_SCHED; i++) {
1457                 rc = kiblnd_thread_start(kiblnd_scheduler, (void *)((long)i));
1458                 if (rc != 0) {
1459                         CERROR("Can't spawn o2iblnd scheduler[%d]: %d\n",
1460                                i, rc);
1461                         goto failed;
1462                 }
1463         }
1464
1465         rc = kiblnd_thread_start(kiblnd_connd, NULL);
1466         if (rc != 0) {
1467                 CERROR("Can't spawn o2iblnd connd: %d\n", rc);
1468                 goto failed;
1469         }
1470
1471         /* flag everything initialised */
1472         kiblnd_data.kib_init = IBLND_INIT_ALL;
1473         /*****************************************************/
1474
1475         return 0;
1476
1477  failed:
1478         kiblnd_base_shutdown();
1479         return -ENETDOWN;
1480 }
1481
int
kiblnd_startup (lnet_ni_t *ni)
{
        char                     *ifname;
        kib_net_t                *net;
        kib_dev_t                *ibdev;
        struct list_head         *tmp;
        struct timeval            tv;
        int                       rc;

        LASSERT (ni->ni_lnd == &the_kiblnd);

        if (kiblnd_data.kib_init == IBLND_INIT_NOTHING) {
                rc = kiblnd_base_startup();
                if (rc != 0)
                        return rc;
        }

        LIBCFS_ALLOC(net, sizeof(*net));
        ni->ni_data = net;
        if (net == NULL)
                goto failed;

        memset(net, 0, sizeof(*net));

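        /* The incarnation stamp (microseconds since the epoch) goes into
         * every message header (see kiblnd_pack_msg()); a changed stamp
         * tells peers this node has restarted. */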
        do_gettimeofday(&tv);
        net->ibn_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;

        ni->ni_maxtxcredits = *kiblnd_tunables.kib_credits;
        ni->ni_peertxcredits = *kiblnd_tunables.kib_peercredits;

        spin_lock_init(&net->ibn_tx_lock);
        INIT_LIST_HEAD(&net->ibn_idle_txs);

        rc = kiblnd_alloc_tx_descs(ni);
        if (rc != 0) {
                CERROR("Can't allocate tx descs\n");
                goto failed;
        }

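        /* Pick the IPoIB interface: the one named in the 'networks='
         * module parameter if any, otherwise the configured default
         * (the kib_default_ipif tunable, normally "ib0"). */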
        if (ni->ni_interfaces[0] != NULL) {
                /* Use the IPoIB interface specified in 'networks=' */

                CLASSERT (LNET_MAX_INTERFACES > 1);
                if (ni->ni_interfaces[1] != NULL) {
                        CERROR("Multiple interfaces not supported\n");
                        goto failed;
                }

                ifname = ni->ni_interfaces[0];
        } else {
                ifname = *kiblnd_tunables.kib_default_ipif;
        }

        if (strlen(ifname) >= sizeof(ibdev->ibd_ifname)) {
                CERROR("IPoIB interface name too long: %s\n", ifname);
                goto failed;
        }

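        /* Multiple NIs may run over the same interface; if one has been
         * set up already, share its kib_dev_t (and hence its CM ID, PD
         * and MR) instead of creating another. */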
        ibdev = NULL;
        list_for_each (tmp, &kiblnd_data.kib_devs) {
                ibdev = list_entry(tmp, kib_dev_t, ibd_list);

                if (!strcmp(&ibdev->ibd_ifname[0], ifname))
                        break;

                ibdev = NULL;
        }

        if (ibdev == NULL) {
                __u32                     ip;
                __u32                     netmask;
                int                       up;
                struct rdma_cm_id        *id;
                struct ib_pd             *pd;
                struct ib_mr             *mr;
                struct sockaddr_in        addr;

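                /* First NI on this interface: check the interface is up,
                 * create a listening RDMA CM ID bound to its IP (binding
                 * also selects the HCA), then allocate the PD and DMA MR
                 * the connections will use. */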
                rc = libcfs_ipif_query(ifname, &up, &ip, &netmask);
                if (rc != 0) {
                        CERROR("Can't query IPoIB interface %s: %d\n",
                               ifname, rc);
                        goto failed;
                }

                if (!up) {
                        CERROR("IPoIB interface %s is down\n", ifname);
                        goto failed;
                }

                LIBCFS_ALLOC(ibdev, sizeof(*ibdev));
                if (ibdev == NULL)
                        goto failed;

                memset(ibdev, 0, sizeof(*ibdev));

                INIT_LIST_HEAD(&ibdev->ibd_list); /* not yet in kib_devs */
                ibdev->ibd_ifip = ip;
                strcpy(&ibdev->ibd_ifname[0], ifname);

                id = rdma_create_id(kiblnd_cm_callback, ibdev, RDMA_PS_TCP);
                if (!IS_ERR(id)) {
                        ibdev->ibd_cmid = id;
                } else {
                        CERROR("Can't create listen ID: %ld\n", PTR_ERR(id));
                        goto failed;
                }

                memset(&addr, 0, sizeof(addr));
                addr.sin_family      = AF_INET;
                addr.sin_port        = htons(*kiblnd_tunables.kib_service);
                addr.sin_addr.s_addr = htonl(ip);

                rc = rdma_bind_addr(id, (struct sockaddr *)&addr);
                if (rc != 0) {
                        CERROR("Can't bind to %s: %d\n", ifname, rc);
                        goto failed;
                }

                /* Binding should have assigned me an IB device */
                LASSERT (id->device != NULL);

                pd = ib_alloc_pd(id->device);
                if (!IS_ERR(pd)) {
                        ibdev->ibd_pd = pd;
                } else {
                        CERROR("Can't allocate PD: %ld\n", PTR_ERR(pd));
                        goto failed;
                }

#if IBLND_MAP_ON_DEMAND
                /* MR for sends and receives */
                mr = ib_get_dma_mr(pd, IB_ACCESS_LOCAL_WRITE);
#else
                /* MR for sends, receives _and_ RDMA: peers write directly
                 * into our buffers, so remote write access is required */
                mr = ib_get_dma_mr(pd, IB_ACCESS_LOCAL_WRITE |
                                       IB_ACCESS_REMOTE_WRITE);
#endif
                if (!IS_ERR(mr)) {
                        ibdev->ibd_mr = mr;
                } else {
                        CERROR("Can't get MR: %ld\n", PTR_ERR(mr));
                        goto failed;
                }

                rc = rdma_listen(id, 0);
                if (rc != 0) {
                        CERROR("Can't start listener: %d\n", rc);
                        goto failed;
                }

                list_add_tail(&ibdev->ibd_list,
                              &kiblnd_data.kib_devs);
        }

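        /* An o2ib NID encodes the interface's IPv4 address in its low
         * 32 bits; keep the network number the NI was configured with. */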
        ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ibdev->ibd_ifip);
        net->ibn_dev = ibdev;

#if IBLND_MAP_ON_DEMAND
        /* FMR pool for RDMA */
        {
                struct ib_fmr_pool      *fmrpool;
                struct ib_fmr_pool_param param = {
                        .max_pages_per_fmr = LNET_MAX_PAYLOAD/PAGE_SIZE,
                        .page_shift        = PAGE_SHIFT,
                        .access            = (IB_ACCESS_LOCAL_WRITE |
                                              IB_ACCESS_REMOTE_WRITE),
                        .pool_size         = *kiblnd_tunables.kib_fmr_pool_size,
                        .dirty_watermark   = *kiblnd_tunables.kib_fmr_flush_trigger,
                        .flush_function    = NULL,
                        .flush_arg         = NULL,
                        .cache             = *kiblnd_tunables.kib_fmr_cache};

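                /* Every in-flight tx may hold one FMR mapping, so a pool
                 * smaller than ntx risks stalling sends waiting for an
                 * FMR to come free. */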
                if (*kiblnd_tunables.kib_fmr_pool_size <
                    *kiblnd_tunables.kib_ntx) {
                        CERROR("Can't set fmr pool size (%d) < ntx(%d)\n",
                               *kiblnd_tunables.kib_fmr_pool_size,
                               *kiblnd_tunables.kib_ntx);
                        goto failed;
                }

                fmrpool = ib_create_fmr_pool(ibdev->ibd_pd, &param);
                if (!IS_ERR(fmrpool)) {
                        net->ibn_fmrpool = fmrpool;
                } else {
                        CERROR("Can't create FMR pool: %ld\n",
                               PTR_ERR(fmrpool));
                        goto failed;
                }
        }
#endif

        kiblnd_map_tx_descs(ni);

        ibdev->ibd_nnets++;
        net->ibn_init = IBLND_INIT_ALL;

        return 0;

failed:
        kiblnd_shutdown(ni);

        CDEBUG(D_NET, "kiblnd_startup failed\n");
        return -ENETDOWN;
}

void __exit
kiblnd_module_fini (void)
{
        lnet_unregister_lnd(&the_kiblnd);
        kiblnd_tunables_fini();
}

int __init
kiblnd_module_init (void)
{
        int    rc;

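        /* Compile-time checks: a kib_msg_t, including the largest
         * possible RDMA descriptor, must fit in the pre-posted message
         * buffer (IBLND_MSG_SIZE). */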
        CLASSERT (sizeof(kib_msg_t) <= IBLND_MSG_SIZE);
#if !IBLND_MAP_ON_DEMAND
        CLASSERT (offsetof(kib_msg_t, ibm_u.get.ibgm_rd.rd_frags[IBLND_MAX_RDMA_FRAGS])
                  <= IBLND_MSG_SIZE);
        CLASSERT (offsetof(kib_msg_t, ibm_u.putack.ibpam_rd.rd_frags[IBLND_MAX_RDMA_FRAGS])
                  <= IBLND_MSG_SIZE);
#endif
        rc = kiblnd_tunables_init();
        if (rc != 0)
                return rc;

        lnet_register_lnd(&the_kiblnd);

        return 0;
}

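/* With the module loaded, a node selects this LND through its LNet
 * configuration, e.g. 'options lnet networks="o2ib(ib0)"' in
 * modprobe.conf (a typical setup; the interface name is illustrative). */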
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Kernel OpenIB gen2 LND v1.00");
MODULE_LICENSE("GPL");

module_init(kiblnd_module_init);
module_exit(kiblnd_module_fini);