/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2004 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#include "iibnal.h"

/*
 *  LIB functions follow
 */
static void
kibnal_schedule_tx_done (kib_tx_t *tx)
{
        unsigned long flags;

        spin_lock_irqsave (&kibnal_data.kib_sched_lock, flags);

        list_add_tail(&tx->tx_list, &kibnal_data.kib_sched_txq);
        wake_up (&kibnal_data.kib_sched_waitq);

        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
}
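
/* A minimal sketch (illustration only) of the consumer side of the
 * deferral above: a scheduler thread drains kib_sched_txq in process
 * context, where memory deregistration is legal.  The names follow
 * this file's conventions, but this loop is not the module's actual
 * scheduler, which lives elsewhere in this NAL. */
#if 0
static void
example_drain_sched_txq (void)
{
        kib_tx_t      *tx;
        unsigned long  flags;

        spin_lock_irqsave (&kibnal_data.kib_sched_lock, flags);
        while (!list_empty (&kibnal_data.kib_sched_txq)) {
                tx = list_entry (kibnal_data.kib_sched_txq.next,
                                 kib_tx_t, tx_list);
                list_del (&tx->tx_list);
                spin_unlock_irqrestore (&kibnal_data.kib_sched_lock, flags);

                kibnal_tx_done (tx);            /* now in thread context */

                spin_lock_irqsave (&kibnal_data.kib_sched_lock, flags);
        }
        spin_unlock_irqrestore (&kibnal_data.kib_sched_lock, flags);
}
#endif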

static void
kibnal_tx_done (kib_tx_t *tx)
{
        ptl_err_t        ptlrc = (tx->tx_status == 0) ? PTL_OK : PTL_FAIL;
        unsigned long    flags;
        int              i;
        FSTATUS          frc;
#if IBNAL_FMR
        int              rc;            /* ib_fmr_deregister() below needs this */
#endif

        LASSERT (tx->tx_sending == 0);          /* mustn't be awaiting callback */
        LASSERT (!tx->tx_passive_rdma_wait);    /* mustn't be awaiting RDMA */

        switch (tx->tx_mapped) {
        default:
                LBUG();

        case KIB_TX_UNMAPPED:
                break;

        case KIB_TX_MAPPED:
                if (in_interrupt()) {
                        /* can't deregister memory in IRQ context... */
                        kibnal_schedule_tx_done(tx);
                        return;
                }
                frc = iibt_deregister_memory(tx->tx_md.md_handle);
                LASSERT (frc == FSUCCESS);
                tx->tx_mapped = KIB_TX_UNMAPPED;
                break;

#if IBNAL_FMR
        case KIB_TX_MAPPED_FMR:
                if (in_interrupt() && tx->tx_status != 0) {
                        /* can't flush FMRs in IRQ context... */
                        kibnal_schedule_tx_done(tx);
                        return;
                }

                rc = ib_fmr_deregister(tx->tx_md.md_handle.fmr);
                LASSERT (rc == 0);

                if (tx->tx_status != 0)
                        ib_fmr_pool_force_flush(kibnal_data.kib_fmr_pool);
                tx->tx_mapped = KIB_TX_UNMAPPED;
                break;
#endif
        }

        for (i = 0; i < 2; i++) {
                /* tx may have up to 2 libmsgs to finalise */
                if (tx->tx_libmsg[i] == NULL)
                        continue;

                lib_finalize (&kibnal_lib, NULL, tx->tx_libmsg[i], ptlrc);
                tx->tx_libmsg[i] = NULL;
        }

        if (tx->tx_conn != NULL) {
                kibnal_put_conn (tx->tx_conn);
                tx->tx_conn = NULL;
        }

        tx->tx_nsp = 0;
        tx->tx_passive_rdma = 0;
        tx->tx_status = 0;

        spin_lock_irqsave (&kibnal_data.kib_tx_lock, flags);

        if (tx->tx_isnblk) {
                list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_nblk_txs);
        } else {
                list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_txs);
                wake_up (&kibnal_data.kib_idle_tx_waitq);
        }

        spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);
}

static kib_tx_t *
kibnal_get_idle_tx (int may_block)
{
        unsigned long  flags;
        kib_tx_t      *tx = NULL;
        ENTRY;

        for (;;) {
                spin_lock_irqsave (&kibnal_data.kib_tx_lock, flags);

                /* "normal" descriptor is free */
                if (!list_empty (&kibnal_data.kib_idle_txs)) {
                        tx = list_entry (kibnal_data.kib_idle_txs.next,
                                         kib_tx_t, tx_list);
                        break;
                }

                if (!may_block) {
                        /* may dip into reserve pool */
                        if (list_empty (&kibnal_data.kib_idle_nblk_txs)) {
                                CERROR ("reserved tx desc pool exhausted\n");
                                break;
                        }

                        tx = list_entry (kibnal_data.kib_idle_nblk_txs.next,
                                         kib_tx_t, tx_list);
                        break;
                }

                /* block for idle tx */
                spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);

                wait_event (kibnal_data.kib_idle_tx_waitq,
                            !list_empty (&kibnal_data.kib_idle_txs) ||
                            kibnal_data.kib_shutdown);
        }

        if (tx != NULL) {
                list_del (&tx->tx_list);

                /* Allocate a new passive RDMA completion cookie.  It might
                 * not be needed, but we've got a lock right now and we're
                 * unlikely to wrap... */
                tx->tx_passive_rdma_cookie = kibnal_data.kib_next_tx_cookie++;

                LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
                LASSERT (tx->tx_nsp == 0);
                LASSERT (tx->tx_sending == 0);
                LASSERT (tx->tx_status == 0);
                LASSERT (tx->tx_conn == NULL);
                LASSERT (!tx->tx_passive_rdma);
                LASSERT (!tx->tx_passive_rdma_wait);
                LASSERT (tx->tx_libmsg[0] == NULL);
                LASSERT (tx->tx_libmsg[1] == NULL);
        }

        spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);

        RETURN(tx);
}
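
/* Usage sketch (hypothetical caller, illustration only): thread
 * context may sleep for a "normal" descriptor; completion paths must
 * pass may_block == 0 and cope with the reserve pool running dry. */
#if 0
static kib_tx_t *
example_get_tx (int in_thread_context)
{
        kib_tx_t *tx = kibnal_get_idle_tx (in_thread_context);

        if (tx == NULL)                 /* only possible if !may_block: */
                CERROR ("no tx descs; dropping\n");     /* reserve empty */
        return tx;
}
#endif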

static int
kibnal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
{
        /* I would guess that if kibnal_get_peer (nid) == NULL,
         * and we're not routing, then 'nid' is very distant :) */
        if (nal->libnal_ni.ni_pid.nid == nid) {
                *dist = 0;
        } else {
                *dist = 1;
        }

        return 0;
}

static void
kibnal_complete_passive_rdma(kib_conn_t *conn, __u64 cookie, int status)
{
        struct list_head *ttmp;
        unsigned long     flags;
        int               idle;

        spin_lock_irqsave (&conn->ibc_lock, flags);

        list_for_each (ttmp, &conn->ibc_active_txs) {
                kib_tx_t *tx = list_entry(ttmp, kib_tx_t, tx_list);

                LASSERT (tx->tx_passive_rdma ||
                         !tx->tx_passive_rdma_wait);

                LASSERT (tx->tx_passive_rdma_wait ||
                         tx->tx_sending != 0);

                if (!tx->tx_passive_rdma_wait ||
                    tx->tx_passive_rdma_cookie != cookie)
                        continue;

                CDEBUG(D_NET, "Complete %p "LPD64": %d\n", tx, cookie, status);

                tx->tx_status = status;
                tx->tx_passive_rdma_wait = 0;
                idle = (tx->tx_sending == 0);

                if (idle)
                        list_del (&tx->tx_list);

                spin_unlock_irqrestore (&conn->ibc_lock, flags);

                /* I could be racing with tx callbacks.  It's whoever
                 * _makes_ tx idle that frees it */
                if (idle)
                        kibnal_tx_done (tx);
                return;
        }

        spin_unlock_irqrestore (&conn->ibc_lock, flags);

        CERROR ("Unmatched (late?) RDMA completion "LPX64" from "LPX64"\n",
                cookie, conn->ibc_peer->ibp_nid);
}
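
/* Hedged sketch of the cookie round trip this function resolves: the
 * passive side stamps tx_passive_rdma_cookie into its RDMA request
 * (kibnal_start_passive_rdma() below); the active peer echoes it in
 * the *_DONE completion, and the loop above finds the waiting tx by
 * that cookie.  Both fragments mirror code elsewhere in this file. */
#if 0
        /* active side, kibnal_start_active_rdma() */
        txmsg->ibm_u.completion.ibcm_cookie = rxmsg->ibm_u.rdma.ibrm_cookie;
        txmsg->ibm_u.completion.ibcm_status = status;

        /* passive side, kibnal_rx_callback() on a *_DONE message */
        kibnal_complete_passive_rdma (conn,
                                      msg->ibm_u.completion.ibcm_cookie,
                                      msg->ibm_u.completion.ibcm_status);
#endif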

static __u32
kibnal_lkey(kib_pages_t *ibp)
{
        if (kibnal_whole_mem())
                return kibnal_data.kib_md.md_lkey;

        return ibp->ibp_lkey;
}

static void
kibnal_post_rx (kib_rx_t *rx, int do_credits)
{
        kib_conn_t   *conn = rx->rx_conn;
        int           rc = 0;
        unsigned long flags;
        FSTATUS       frc;
        ENTRY;

        rx->rx_gl = (IB_LOCAL_DATASEGMENT) {
                .Address = rx->rx_vaddr,
                .Length  = IBNAL_MSG_SIZE,
                .Lkey    = kibnal_lkey(conn->ibc_rx_pages),
        };

        rx->rx_wrq = (IB_WORK_REQ) {
                .Operation              = WROpRecv,
                .DSListDepth            = 1,
                .MessageLen             = IBNAL_MSG_SIZE,
                .WorkReqId              = kibnal_ptr2wreqid(rx, 1),
                .DSList                 = &rx->rx_gl,
        };

        KIB_ASSERT_CONN_STATE_RANGE(conn, IBNAL_CONN_ESTABLISHED,
                                    IBNAL_CONN_DREP);
        LASSERT (!rx->rx_posted);
        rx->rx_posted = 1;
        mb();

        if (conn->ibc_state != IBNAL_CONN_ESTABLISHED)
                rc = -ECONNABORTED;
        else {
                frc = iibt_postrecv(conn->ibc_qp, &rx->rx_wrq);
                if (frc != FSUCCESS) {
                        CDEBUG(D_NET, "post failed %d\n", frc);
                        rc = -EINVAL;
                } else {
                        CDEBUG(D_NET, "posted rx %p\n", &rx->rx_wrq);
                }
        }

        if (rc == 0) {
                if (do_credits) {
                        spin_lock_irqsave(&conn->ibc_lock, flags);
                        conn->ibc_outstanding_credits++;
                        spin_unlock_irqrestore(&conn->ibc_lock, flags);

                        kibnal_check_sends(conn);
                }
                EXIT;
                return;
        }

        if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
                CERROR ("Error posting receive -> "LPX64": %d\n",
                        conn->ibc_peer->ibp_nid, rc);
                kibnal_close_conn (rx->rx_conn, rc);
        } else {
                CDEBUG (D_NET, "Error posting receive -> "LPX64": %d\n",
                        conn->ibc_peer->ibp_nid, rc);
        }

        /* Drop rx's ref */
        kibnal_put_conn (conn);
        EXIT;
}

#if IBNAL_CKSUM
static inline __u32 kibnal_cksum (void *ptr, int nob)
{
        char  *c  = ptr;
        __u32  sum = 0;

        while (nob-- > 0)
                sum = ((sum << 1) | (sum >> 31)) + *c++;

        return (sum);
}
#endif
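
/* The checksum above is a 1-bit rotate-and-add over the raw message
 * bytes, so it is sensitive to byte order as well as content.  A
 * hedged sketch of how the two ends pair up (mirroring
 * kibnal_check_sends() and kibnal_rx_callback()): the cksum field
 * must be zeroed before either side runs the sum. */
#if 0
        /* sender */
        tx->tx_msg->ibm_cksum = 0;
        tx->tx_msg->ibm_cksum = kibnal_cksum(tx->tx_msg, tx->tx_msg->ibm_nob);

        /* receiver */
        msg_cksum = le32_to_cpu(msg->ibm_cksum);
        msg->ibm_cksum = 0;
        if (kibnal_cksum (msg, nob) != msg_cksum)
                CERROR ("checksum mismatch\n");
#endif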

static void hexdump(char *string, void *ptr, int len)
{
        unsigned char *c = ptr;
        int i;

        return;                 /* disabled: delete this return to enable dumps */

        if (len < 0 || len > 2048) {
                printk("hexdump: bad length %d\n", len);
                return;
        }

        printk("%d bytes of '%s' from 0x%p\n", len, string, ptr);

        for (i = 0; i < len;) {
                printk("%02x", *(c++));
                i++;
                if (!(i & 15)) {
                        printk("\n");
                } else if (!(i & 1)) {
                        printk(" ");
                }
        }

        if (len & 15) {
                printk("\n");
        }
}

static void
kibnal_rx_callback (IB_WORK_COMPLETION *wc)
{
        kib_rx_t     *rx = (kib_rx_t *)kibnal_wreqid2ptr(wc->WorkReqId);
        kib_msg_t    *msg = rx->rx_msg;
        kib_conn_t   *conn = rx->rx_conn;
        int           nob = wc->Length;
        const int     base_nob = offsetof(kib_msg_t, ibm_u);
        int           credits;
        int           flipped;
        unsigned long flags;
        __u32         i;
#if IBNAL_CKSUM
        __u32         msg_cksum;
        __u32         computed_cksum;
#endif

        /* we set the QP to erroring after we've finished disconnecting,
         * maybe we should do so sooner. */
        KIB_ASSERT_CONN_STATE_RANGE(conn, IBNAL_CONN_ESTABLISHED,
                                    IBNAL_CONN_DISCONNECTED);

        CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
        LASSERT (rx->rx_posted);
        rx->rx_posted = 0;
        mb();

        /* receives complete with error in any case after we've started
         * disconnecting */
        if (conn->ibc_state > IBNAL_CONN_ESTABLISHED)
                goto failed;

        if (wc->Status != WRStatusSuccess) {
                CERROR("Rx from "LPX64" failed: %d\n",
                       conn->ibc_peer->ibp_nid, wc->Status);
                goto failed;
        }

        if (nob < base_nob) {
                CERROR ("Short rx from "LPX64": %d < expected %d\n",
                        conn->ibc_peer->ibp_nid, nob, base_nob);
                goto failed;
        }

        hexdump("rx", rx->rx_msg, sizeof(kib_msg_t));

        /* Receiver does any byte flipping if necessary... */

        if (msg->ibm_magic == IBNAL_MSG_MAGIC) {
                flipped = 0;
        } else {
                if (msg->ibm_magic != __swab32(IBNAL_MSG_MAGIC)) {
                        CERROR ("Unrecognised magic: %08x from "LPX64"\n",
                                msg->ibm_magic, conn->ibc_peer->ibp_nid);
                        goto failed;
                }
                flipped = 1;
                __swab16s (&msg->ibm_version);
                LASSERT (sizeof(msg->ibm_type) == 1);
                LASSERT (sizeof(msg->ibm_credits) == 1);
        }

        if (msg->ibm_version != IBNAL_MSG_VERSION) {
                CERROR ("Incompatible msg version %d (%d expected)\n",
                        msg->ibm_version, IBNAL_MSG_VERSION);
                goto failed;
        }

#if IBNAL_CKSUM
        if (nob != msg->ibm_nob) {
                CERROR ("Unexpected # bytes %d (%d expected)\n", nob, msg->ibm_nob);
                goto failed;
        }

        msg_cksum = le32_to_cpu(msg->ibm_cksum);
        msg->ibm_cksum = 0;
        computed_cksum = kibnal_cksum (msg, nob);

        if (msg_cksum != computed_cksum) {
                CERROR ("Checksum failure %d: (%d expected)\n",
                        computed_cksum, msg_cksum);
//                goto failed;
        }
        CDEBUG(D_NET, "cksum %x, nob %d\n", computed_cksum, nob);
#endif

        /* Have I received credits that will let me send? */
        credits = msg->ibm_credits;
        if (credits != 0) {
                spin_lock_irqsave(&conn->ibc_lock, flags);
                conn->ibc_credits += credits;
                spin_unlock_irqrestore(&conn->ibc_lock, flags);

                kibnal_check_sends(conn);
        }

        switch (msg->ibm_type) {
        case IBNAL_MSG_NOOP:
                kibnal_post_rx (rx, 1);
                return;

        case IBNAL_MSG_IMMEDIATE:
                if (nob < base_nob + sizeof (kib_immediate_msg_t)) {
                        CERROR ("Short IMMEDIATE from "LPX64": %d\n",
                                conn->ibc_peer->ibp_nid, nob);
                        goto failed;
                }
                break;
        case IBNAL_MSG_PUT_RDMA:
        case IBNAL_MSG_GET_RDMA:
                if (nob < base_nob + sizeof (kib_rdma_msg_t)) {
                        CERROR ("Short RDMA msg from "LPX64": %d\n",
                                conn->ibc_peer->ibp_nid, nob);
                        goto failed;
                }
                /* NB the in-place __swabNNs() variants; plain __swabNN()
                 * returns the swapped value and leaves the field alone */
                if (flipped)
                        __swab32s(&msg->ibm_u.rdma.ibrm_num_descs);

                CDEBUG(D_NET, "%d RDMA: cookie "LPX64":\n",
                       msg->ibm_type, msg->ibm_u.rdma.ibrm_cookie);

                if ((msg->ibm_u.rdma.ibrm_num_descs > PTL_MD_MAX_IOV) ||
                    (kib_rdma_msg_len(msg->ibm_u.rdma.ibrm_num_descs) >
                     min(nob, IBNAL_MSG_SIZE))) {
                        CERROR ("num_descs %d too large\n",
                                msg->ibm_u.rdma.ibrm_num_descs);
                        goto failed;
                }

                if (flipped)
                        __swab32s(&msg->ibm_u.rdma.rd_key);

                for (i = 0; i < msg->ibm_u.rdma.ibrm_num_descs; i++) {
                        kib_rdma_desc_t *desc = &msg->ibm_u.rdma.ibrm_desc[i];

                        if (flipped) {
                                __swab32s(&desc->rd_nob);
                                __swab64s(&desc->rd_addr);
                        }

                        CDEBUG(D_NET, "  key %x, addr "LPX64", nob %u\n",
                               msg->ibm_u.rdma.rd_key, desc->rd_addr, desc->rd_nob);
                }
                break;

        case IBNAL_MSG_PUT_DONE:
        case IBNAL_MSG_GET_DONE:
                if (nob < base_nob + sizeof (kib_completion_msg_t)) {
                        CERROR ("Short COMPLETION msg from "LPX64": %d\n",
                                conn->ibc_peer->ibp_nid, nob);
                        goto failed;
                }
                if (flipped)
                        __swab32s(&msg->ibm_u.completion.ibcm_status);

                CDEBUG(D_NET, "%d DONE: cookie "LPX64", status %d\n",
                       msg->ibm_type, msg->ibm_u.completion.ibcm_cookie,
                       msg->ibm_u.completion.ibcm_status);

                kibnal_complete_passive_rdma (conn,
                                              msg->ibm_u.completion.ibcm_cookie,
                                              msg->ibm_u.completion.ibcm_status);
                kibnal_post_rx (rx, 1);
                return;

        default:
                CERROR ("Can't parse type from "LPX64": %d\n",
                        conn->ibc_peer->ibp_nid, msg->ibm_type);
                goto failed;
        }

        /* schedule for kibnal_rx() in thread context */
        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);

        list_add_tail (&rx->rx_list, &kibnal_data.kib_sched_rxq);
        wake_up (&kibnal_data.kib_sched_waitq);

        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
        return;

 failed:
        CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
        kibnal_close_conn(conn, -ECONNABORTED);

        /* Don't re-post rx & drop its ref on conn */
        kibnal_put_conn(conn);
}
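
/* Hedged sketch of the "receiver flips" convention used above: the
 * sender writes ibm_magic in its native byte order, so a receiver
 * that sees the magic byte-swapped knows every multi-byte field
 * wants an in-place __swabNNs() before use: */
#if 0
        if (msg->ibm_magic == IBNAL_MSG_MAGIC) {
                flipped = 0;                    /* same-endian peer */
        } else if (msg->ibm_magic == __swab32(IBNAL_MSG_MAGIC)) {
                flipped = 1;                    /* other-endian peer */
                __swab16s (&msg->ibm_version);  /* ...and so on per field */
        } else {
                goto failed;                    /* not our protocol */
        }
#endif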

void
kibnal_rx (kib_rx_t *rx)
{
        kib_msg_t   *msg = rx->rx_msg;

        /* Clear flag so I can detect if I've sent an RDMA completion */
        rx->rx_rdma = 0;

        switch (msg->ibm_type) {
        case IBNAL_MSG_GET_RDMA:
                lib_parse(&kibnal_lib, &msg->ibm_u.rdma.ibrm_hdr, rx);
                /* If the incoming get was matched, I'll have initiated the
                 * RDMA and the completion message... */
                if (rx->rx_rdma)
                        break;

                /* Otherwise, I'll send a failed completion now to prevent
                 * the peer's GET blocking for the full timeout. */
                CERROR ("Completing unmatched RDMA GET from "LPX64"\n",
                        rx->rx_conn->ibc_peer->ibp_nid);
                kibnal_start_active_rdma (IBNAL_MSG_GET_DONE, -EIO,
                                          rx, NULL, 0, NULL, NULL, 0, 0);
                break;

        case IBNAL_MSG_PUT_RDMA:
                lib_parse(&kibnal_lib, &msg->ibm_u.rdma.ibrm_hdr, rx);
                if (rx->rx_rdma)
                        break;
                /* This is most unusual, since even if lib_parse() didn't
                 * match anything, it should have asked us to read (and
                 * discard) the payload.  The portals header must be
                 * inconsistent with this message type, so it's the
                 * sender's fault for sending garbage and she can time
                 * herself out... */
                CERROR ("Uncompleted RDMA PUT from "LPX64"\n",
                        rx->rx_conn->ibc_peer->ibp_nid);
                break;

        case IBNAL_MSG_IMMEDIATE:
                lib_parse(&kibnal_lib, &msg->ibm_u.immediate.ibim_hdr, rx);
                LASSERT (!rx->rx_rdma);
                break;

        default:
                LBUG();
                break;
        }

        kibnal_post_rx (rx, 1);
}

static struct page *
kibnal_kvaddr_to_page (unsigned long vaddr)
{
        struct page *page;

        if (vaddr >= VMALLOC_START &&
            vaddr < VMALLOC_END)
                page = vmalloc_to_page ((void *)vaddr);
#ifdef CONFIG_HIGHMEM
        else if (vaddr >= PKMAP_BASE &&
                 vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
                page = vmalloc_to_page ((void *)vaddr);
        /* in 2.4 ^ just walks the page tables */
#endif
        else
                page = virt_to_page (vaddr);

        if (!VALID_PAGE (page))
                page = NULL;

        return page;
}

static void
kibnal_fill_ibrm(kib_tx_t *tx, struct page *page, unsigned long page_offset,
                 unsigned long len, int active)
{
        kib_rdma_msg_t *ibrm = &tx->tx_msg->ibm_u.rdma;
        kib_rdma_desc_t *desc;

        LASSERTF(ibrm->ibrm_num_descs < PTL_MD_MAX_IOV, "%u\n",
                 ibrm->ibrm_num_descs);

        desc = &ibrm->ibrm_desc[ibrm->ibrm_num_descs];
        if (active)
                ibrm->rd_key = kibnal_data.kib_md.md_lkey;
        else
                ibrm->rd_key = kibnal_data.kib_md.md_rkey;
        desc->rd_nob = len;
        desc->rd_addr = kibnal_page2phys(page) + page_offset +
                        kibnal_data.kib_md.md_addr;

        ibrm->ibrm_num_descs++;
}

static int
kibnal_map_rdma_iov(kib_tx_t *tx, unsigned long vaddr, int nob, int active)
{
        struct page *page;
        int page_offset, len;

        while (nob > 0) {
                page = kibnal_kvaddr_to_page(vaddr);
                if (page == NULL)
                        return -EFAULT;

                page_offset = vaddr & (PAGE_SIZE - 1);
                len = min(nob, (int)PAGE_SIZE - page_offset);

                kibnal_fill_ibrm(tx, page, page_offset, len, active);
                nob -= len;
                vaddr += len;
        }
        return 0;
}

static int
kibnal_map_iov (kib_tx_t *tx, IB_ACCESS_CONTROL access,
                int niov, struct iovec *iov, int offset, int nob, int active)
{
        void   *vaddr;
        FSTATUS frc;

        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR ("Can't map multiple vaddr fragments\n");
                return (-EMSGSIZE);
        }

        /* our large contiguous iov could be backed by multiple physical
         * pages. */
        if (kibnal_whole_mem()) {
                int rc;
                tx->tx_msg->ibm_u.rdma.ibrm_num_descs = 0;
                rc = kibnal_map_rdma_iov(tx, (unsigned long)iov->iov_base +
                                         offset, nob, active);
                if (rc != 0) {
                        CERROR ("Can't map iov: %d\n", rc);
                        return rc;
                }
                return 0;
        }

        vaddr = (void *)(((unsigned long)iov->iov_base) + offset);
        tx->tx_md.md_addr = (__u64)((unsigned long)vaddr);

        frc = iibt_register_memory(kibnal_data.kib_hca, vaddr, nob,
                                   kibnal_data.kib_pd, access,
                                   &tx->tx_md.md_handle, &tx->tx_md.md_lkey,
                                   &tx->tx_md.md_rkey);
        if (frc != FSUCCESS) {
                CERROR ("Can't map vaddr %p: %d\n", vaddr, frc);
                return -EINVAL;
        }

        tx->tx_mapped = KIB_TX_MAPPED;
        return (0);
}

static int
kibnal_map_kiov (kib_tx_t *tx, IB_ACCESS_CONTROL access,
                 int nkiov, ptl_kiov_t *kiov,
                 int offset, int nob, int active)
{
        __u64                      *phys = NULL;
        int                         page_offset;
        int                         nphys;
        int                         resid;
        int                         phys_size = 0;
        FSTATUS                     frc;
        int                         i, rc = 0;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        page_offset = kiov->kiov_offset + offset;
        nphys = 1;

        if (!kibnal_whole_mem()) {
                phys_size = nkiov * sizeof (*phys);
                PORTAL_ALLOC(phys, phys_size);
                if (phys == NULL) {
                        CERROR ("Can't allocate tmp phys\n");
                        return (-ENOMEM);
                }

                phys[0] = kibnal_page2phys(kiov->kiov_page);
        } else {
                tx->tx_msg->ibm_u.rdma.ibrm_num_descs = 0;
                kibnal_fill_ibrm(tx, kiov->kiov_page, kiov->kiov_offset,
                                 kiov->kiov_len, active);
        }

        resid = nob - (kiov->kiov_len - offset);

        while (resid > 0) {
                kiov++;
                nkiov--;
                LASSERT (nkiov > 0);

                if (kiov->kiov_offset != 0 ||
                    ((resid > PAGE_SIZE) &&
                     kiov->kiov_len < PAGE_SIZE)) {
                        /* Can't have gaps */
                        CERROR ("Can't make payload contiguous in I/O VM: "
                                "page %d, offset %d, len %d\n", nphys,
                                kiov->kiov_offset, kiov->kiov_len);

                        for (i = -nphys; i < nkiov; i++) {
                                CERROR("kiov[%d] %p +%d for %d\n",
                                       i, kiov[i].kiov_page,
                                       kiov[i].kiov_offset, kiov[i].kiov_len);
                        }

                        rc = -EINVAL;
                        goto out;
                }

                if (nphys == PTL_MD_MAX_IOV) {
                        CERROR ("payload too big (%d)\n", nphys);
                        rc = -EMSGSIZE;
                        goto out;
                }

                if (!kibnal_whole_mem()) {
                        LASSERT (nphys * sizeof (*phys) < phys_size);
                        phys[nphys] = kibnal_page2phys(kiov->kiov_page);
                } else {
                        if (kib_rdma_msg_len(nphys) > IBNAL_MSG_SIZE) {
                                CERROR ("payload too big (%d)\n", nphys);
                                rc = -EMSGSIZE;
                                goto out;
                        }
                        kibnal_fill_ibrm(tx, kiov->kiov_page,
                                         kiov->kiov_offset, kiov->kiov_len,
                                         active);
                }

                nphys++;
                resid -= PAGE_SIZE;
        }

        if (kibnal_whole_mem())
                goto out;

#if 0
        CWARN ("nphys %d, nob %d, page_offset %d\n", nphys, nob, page_offset);
        for (i = 0; i < nphys; i++)
                CWARN ("   [%d] "LPX64"\n", i, phys[i]);
#endif

#if IBNAL_FMR
#error "iibnal hasn't learned about FMR yet"
        rc = ib_fmr_register_physical (kibnal_data.kib_fmr_pool,
                                       phys, nphys,
                                       &tx->tx_md.md_addr,
                                       page_offset,
                                       &tx->tx_md.md_handle.fmr,
                                       &tx->tx_md.md_lkey,
                                       &tx->tx_md.md_rkey);
#else
        frc = iibt_register_physical_memory(kibnal_data.kib_hca,
                                            IBNAL_RDMA_BASE,
                                            phys, nphys,
                                            0,          /* offset */
                                            kibnal_data.kib_pd,
                                            access,
                                            &tx->tx_md.md_handle,
                                            &tx->tx_md.md_addr,
                                            &tx->tx_md.md_lkey,
                                            &tx->tx_md.md_rkey);
#endif
        if (frc == FSUCCESS) {
                CDEBUG(D_NET, "Mapped %d pages %d bytes @ offset %d: lkey %x, rkey %x\n",
                       nphys, nob, page_offset, tx->tx_md.md_lkey, tx->tx_md.md_rkey);
#if IBNAL_FMR
                tx->tx_mapped = KIB_TX_MAPPED_FMR;
#else
                tx->tx_mapped = KIB_TX_MAPPED;
#endif
        } else {
                CERROR ("Can't map phys: %d\n", frc);
                rc = -EFAULT;
        }

 out:
        if (phys != NULL)
                PORTAL_FREE(phys, phys_size);
        return (rc);
}
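
/* Hedged distillation of the contiguity rule the loop above enforces:
 * after the first fragment, every kiov must start at page offset 0
 * and (unless it is the last one) fill a whole page, or the payload
 * cannot be presented as one virtually-contiguous region.
 * Hypothetical helper, illustration only: */
#if 0
static int
example_kiov_is_contiguous (int nkiov, ptl_kiov_t *kiov, int resid)
{
        for (kiov++, nkiov--; resid > 0; kiov++, nkiov--) {
                LASSERT (nkiov > 0);
                if (kiov->kiov_offset != 0 ||
                    (resid > PAGE_SIZE && kiov->kiov_len < PAGE_SIZE))
                        return 0;               /* gap: can't map */
                resid -= PAGE_SIZE;
        }
        return 1;
}
#endif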

static kib_conn_t *
kibnal_find_conn_locked (kib_peer_t *peer)
{
        struct list_head *tmp;

        /* just return the first connection */
        list_for_each (tmp, &peer->ibp_conns) {
                return (list_entry(tmp, kib_conn_t, ibc_list));
        }

        return (NULL);
}

void
kibnal_check_sends (kib_conn_t *conn)
{
        unsigned long   flags;
        kib_tx_t       *tx;
        int             rc;
        int             i;
        int             done;
        int             nwork;
        ENTRY;

        spin_lock_irqsave (&conn->ibc_lock, flags);

        LASSERT (conn->ibc_nsends_posted <= IBNAL_MSG_QUEUE_SIZE);

        if (list_empty(&conn->ibc_tx_queue) &&
            conn->ibc_outstanding_credits >= IBNAL_CREDIT_HIGHWATER) {
                spin_unlock_irqrestore(&conn->ibc_lock, flags);

                tx = kibnal_get_idle_tx(0);     /* don't block */
                if (tx != NULL)
                        kibnal_init_tx_msg(tx, IBNAL_MSG_NOOP, 0);

                spin_lock_irqsave(&conn->ibc_lock, flags);

                if (tx != NULL) {
                        atomic_inc(&conn->ibc_refcount);
                        kibnal_queue_tx_locked(tx, conn);
                }
        }

        while (!list_empty (&conn->ibc_tx_queue)) {
                tx = list_entry (conn->ibc_tx_queue.next, kib_tx_t, tx_list);

                /* We rely on this for QP sizing */
                LASSERT (tx->tx_nsp > 0 && tx->tx_nsp <= IBNAL_TX_MAX_SG);

                LASSERT (conn->ibc_outstanding_credits >= 0);
                LASSERT (conn->ibc_outstanding_credits <= IBNAL_MSG_QUEUE_SIZE);
                LASSERT (conn->ibc_credits >= 0);
                LASSERT (conn->ibc_credits <= IBNAL_MSG_QUEUE_SIZE);

                /* Not on ibc_rdma_queue */
                LASSERT (!tx->tx_passive_rdma_wait);

                if (conn->ibc_nsends_posted == IBNAL_MSG_QUEUE_SIZE)
                        GOTO(out, 0);

                if (conn->ibc_credits == 0)     /* no credits */
                        GOTO(out, 1);

                if (conn->ibc_credits == 1 &&   /* last credit reserved for */
                    conn->ibc_outstanding_credits == 0) /* giving back credits */
                        GOTO(out, 2);

                list_del (&tx->tx_list);

                if (tx->tx_msg->ibm_type == IBNAL_MSG_NOOP &&
                    (!list_empty(&conn->ibc_tx_queue) ||
                     conn->ibc_outstanding_credits < IBNAL_CREDIT_HIGHWATER)) {
                        /* redundant NOOP */
                        spin_unlock_irqrestore(&conn->ibc_lock, flags);
                        kibnal_tx_done(tx);
                        spin_lock_irqsave(&conn->ibc_lock, flags);
                        continue;
                }

                tx->tx_msg->ibm_credits = conn->ibc_outstanding_credits;
                conn->ibc_outstanding_credits = 0;

                conn->ibc_nsends_posted++;
                conn->ibc_credits--;

                /* we only get a tx completion for the final rdma op */
                tx->tx_sending = min(tx->tx_nsp, 2);
                tx->tx_passive_rdma_wait = tx->tx_passive_rdma;
                list_add (&tx->tx_list, &conn->ibc_active_txs);
#if IBNAL_CKSUM
                tx->tx_msg->ibm_cksum = 0;
                tx->tx_msg->ibm_cksum = kibnal_cksum(tx->tx_msg, tx->tx_msg->ibm_nob);
                CDEBUG(D_NET, "cksum %x, nob %d\n", tx->tx_msg->ibm_cksum, tx->tx_msg->ibm_nob);
#endif
                spin_unlock_irqrestore (&conn->ibc_lock, flags);

                /* NB the gap between removing tx from the queue and sending it
                 * allows message re-ordering to occur */

                LASSERT (tx->tx_nsp > 0);

                rc = -ECONNABORTED;
                nwork = 0;
                if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
                        tx->tx_status = 0;
                        /* Driver only accepts 1 item at a time */
                        for (i = 0; i < tx->tx_nsp; i++) {
                                hexdump("tx", tx->tx_msg, sizeof(kib_msg_t));
                                rc = iibt_postsend(conn->ibc_qp,
                                                   &tx->tx_wrq[i]);
                                if (rc != 0)
                                        break;
                                if (wrq_signals_completion(&tx->tx_wrq[i]))
                                        nwork++;
                                CDEBUG(D_NET, "posted tx wrq %p\n",
                                       &tx->tx_wrq[i]);
                        }
                }

                spin_lock_irqsave (&conn->ibc_lock, flags);
                if (rc != 0) {
                        /* NB credits are transferred in the actual
                         * message, which can only be the last work item */
                        conn->ibc_outstanding_credits += tx->tx_msg->ibm_credits;
                        conn->ibc_credits++;
                        conn->ibc_nsends_posted--;

                        tx->tx_status = rc;
                        tx->tx_passive_rdma_wait = 0;
                        tx->tx_sending -= tx->tx_nsp - nwork;

                        done = (tx->tx_sending == 0);
                        if (done)
                                list_del (&tx->tx_list);

                        spin_unlock_irqrestore (&conn->ibc_lock, flags);

                        if (conn->ibc_state == IBNAL_CONN_ESTABLISHED)
                                CERROR ("Error %d posting transmit to "LPX64"\n",
                                        rc, conn->ibc_peer->ibp_nid);
                        else
                                CDEBUG (D_NET, "Error %d posting transmit to "
                                        LPX64"\n", rc, conn->ibc_peer->ibp_nid);

                        kibnal_close_conn (conn, rc);

                        if (done)
                                kibnal_tx_done (tx);
                        return;
                }
        }

        EXIT;
out:
        spin_unlock_irqrestore (&conn->ibc_lock, flags);
}
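
/* Hedged summary of the credit flow driven above, with the three
 * fragments that implement it (each mirrors real code in this file):
 * a send spends one of our ibc_credits; each rx we repost owes the
 * peer a credit, piggybacked in ibm_credits of our next message (or
 * in an explicit NOOP once IBNAL_CREDIT_HIGHWATER is reached). */
#if 0
        /* spend + piggyback: kibnal_check_sends() */
        tx->tx_msg->ibm_credits = conn->ibc_outstanding_credits;
        conn->ibc_outstanding_credits = 0;
        conn->ibc_credits--;

        /* earn: kibnal_rx_callback() */
        conn->ibc_credits += msg->ibm_credits;

        /* owe: kibnal_post_rx(rx, 1) */
        conn->ibc_outstanding_credits++;
#endif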

static void
kibnal_tx_callback (IB_WORK_COMPLETION *wc)
{
        kib_tx_t     *tx = (kib_tx_t *)kibnal_wreqid2ptr(wc->WorkReqId);
        kib_conn_t   *conn;
        unsigned long flags;
        int           idle;

        conn = tx->tx_conn;
        LASSERT (conn != NULL);
        LASSERT (tx->tx_sending != 0);

        spin_lock_irqsave(&conn->ibc_lock, flags);

        CDEBUG(D_NET, "conn %p tx %p [%d/%d]: %d\n", conn, tx,
               tx->tx_sending, tx->tx_nsp, wc->Status);

        /* I could be racing with rdma completion.  Whoever makes 'tx' idle
         * gets to free it, which also drops its ref on 'conn'.  If it's
         * not me, then I take an extra ref on conn so it can't disappear
         * under me. */

        tx->tx_sending--;
        idle = (tx->tx_sending == 0) &&         /* This is the final callback */
               (!tx->tx_passive_rdma_wait);     /* Not waiting for RDMA completion */
        if (idle)
                list_del(&tx->tx_list);

        CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
               conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
               atomic_read (&conn->ibc_refcount));
        atomic_inc (&conn->ibc_refcount);

        if (tx->tx_sending == 0)
                conn->ibc_nsends_posted--;

        if (wc->Status != WRStatusSuccess &&
            tx->tx_status == 0)
                tx->tx_status = -ECONNABORTED;

        spin_unlock_irqrestore(&conn->ibc_lock, flags);

        if (idle)
                kibnal_tx_done (tx);

        if (wc->Status != WRStatusSuccess) {
                CERROR ("Tx completion to "LPX64" failed: %d\n",
                        conn->ibc_peer->ibp_nid, wc->Status);
                kibnal_close_conn (conn, -ENETDOWN);
        } else {
                /* can I shovel some more sends out the door? */
                kibnal_check_sends(conn);
        }

        kibnal_put_conn (conn);
}
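
/* Hedged sketch of the idle-race rule shared by the callback above
 * and kibnal_complete_passive_rdma(): a tx may be waiting on both
 * send completions and a passive RDMA completion, and whichever
 * callback observes it go idle (under ibc_lock) unlinks and frees
 * it.  Exactly one path wins: */
#if 0
        spin_lock_irqsave (&conn->ibc_lock, flags);
        idle = (tx->tx_sending == 0) && !tx->tx_passive_rdma_wait;
        if (idle)
                list_del (&tx->tx_list);
        spin_unlock_irqrestore (&conn->ibc_lock, flags);

        if (idle)
                kibnal_tx_done (tx);            /* only the race winner */
#endif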

void
kibnal_ca_async_callback (void *ca_arg, IB_EVENT_RECORD *ev)
{
        /* XXX flesh out.  this seems largely for async errors */
        CERROR("type: %d code: %u\n", ev->EventType, ev->EventCode);
}

void
kibnal_ca_callback (void *ca_arg, void *cq_arg)
{
        IB_HANDLE cq = *(IB_HANDLE *)cq_arg;
        IB_HANDLE ca = *(IB_HANDLE *)ca_arg;
        IB_WORK_COMPLETION wc;
        int armed = 0;

        CDEBUG(D_NET, "ca %p cq %p\n", ca, cq);

        for (;;) {
                while (iibt_cq_poll(cq, &wc) == FSUCCESS) {

                        /* We will need to rearm the CQ to avoid a potential race. */
                        armed = 0;

                        if (kibnal_wreqid_is_rx(wc.WorkReqId))
                                kibnal_rx_callback(&wc);
                        else
                                kibnal_tx_callback(&wc);
                }
                if (armed)
                        return;
                if (iibt_cq_rearm(cq, CQEventSelNextWC) != FSUCCESS) {
                        CERROR("rearm failed?\n");
                        return;
                }
                armed = 1;
        }
}
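
/* The dispatch above relies on tagging each work request id with a
 * low bit that distinguishes rx from tx.  A minimal sketch of the
 * scheme, assuming at least 2-byte pointer alignment (the real
 * kibnal_ptr2wreqid()/kibnal_wreqid2ptr()/kibnal_wreqid_is_rx()
 * helpers are defined elsewhere in this NAL): */
#if 0
static inline __u64
example_ptr2wreqid (void *ptr, int isrx)
{
        unsigned long lptr = (unsigned long)ptr;

        LASSERT ((lptr & 1UL) == 0);
        return (__u64)(lptr | (isrx ? 1UL : 0UL));
}

static inline void *
example_wreqid2ptr (__u64 wreqid)
{
        return (void *)(((unsigned long)wreqid) & ~1UL);
}

static inline int
example_wreqid_is_rx (__u64 wreqid)
{
        return (int)(wreqid & 1);
}
#endif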

void
kibnal_init_tx_msg (kib_tx_t *tx, int type, int body_nob)
{
        IB_LOCAL_DATASEGMENT *gl = &tx->tx_gl[tx->tx_nsp];
        IB_WORK_REQ          *wrq = &tx->tx_wrq[tx->tx_nsp];
        int                   fence;
        int                   nob = offsetof (kib_msg_t, ibm_u) + body_nob;

        LASSERT (tx->tx_nsp >= 0 &&
                 tx->tx_nsp < sizeof(tx->tx_wrq)/sizeof(tx->tx_wrq[0]));
        LASSERT (nob <= IBNAL_MSG_SIZE);

        tx->tx_msg->ibm_magic = IBNAL_MSG_MAGIC;
        tx->tx_msg->ibm_version = IBNAL_MSG_VERSION;
        tx->tx_msg->ibm_type = type;
#if IBNAL_CKSUM
        tx->tx_msg->ibm_nob = nob;
#endif
        /* Fence the message if it's bundled with an RDMA read */
        fence = (tx->tx_nsp > 0) &&
                (type == IBNAL_MSG_PUT_DONE);

        *gl = (IB_LOCAL_DATASEGMENT) {
                .Address = tx->tx_vaddr,
                .Length  = IBNAL_MSG_SIZE,
                .Lkey    = kibnal_lkey(kibnal_data.kib_tx_pages),
        };

        wrq->WorkReqId      = kibnal_ptr2wreqid(tx, 0);
        wrq->Operation      = WROpSend;
        wrq->DSList         = gl;
        wrq->DSListDepth    = 1;
        wrq->MessageLen     = nob;
        wrq->Req.SendRC.ImmediateData  = 0;
        wrq->Req.SendRC.Options.s.SolicitedEvent         = 1;
        wrq->Req.SendRC.Options.s.SignaledCompletion     = 1;
        wrq->Req.SendRC.Options.s.ImmediateData          = 0;
        wrq->Req.SendRC.Options.s.Fence                  = fence;

        tx->tx_nsp++;
}
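
/* Why the fence above (hedged rationale): a PUT_DONE send is queued
 * behind the RDMA read that fetched the payload and must not
 * overtake it, so the send is fenced.  A GET_DONE follows an RDMA
 * write, which later sends on the same QP cannot overtake, so it
 * needs no fence. */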

static void
kibnal_queue_tx (kib_tx_t *tx, kib_conn_t *conn)
{
        unsigned long         flags;

        spin_lock_irqsave(&conn->ibc_lock, flags);

        kibnal_queue_tx_locked (tx, conn);

        spin_unlock_irqrestore(&conn->ibc_lock, flags);

        kibnal_check_sends(conn);
}

static void
kibnal_launch_tx (kib_tx_t *tx, ptl_nid_t nid)
{
        unsigned long    flags;
        kib_peer_t      *peer;
        kib_conn_t      *conn;
        rwlock_t        *g_lock = &kibnal_data.kib_global_lock;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */

        LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */
        LASSERT (tx->tx_nsp > 0);               /* work items have been set up */

        read_lock_irqsave(g_lock, flags);

        peer = kibnal_find_peer_locked (nid);
        if (peer == NULL) {
                read_unlock_irqrestore(g_lock, flags);
                tx->tx_status = -EHOSTUNREACH;
                kibnal_tx_done (tx);
                return;
        }

        conn = kibnal_find_conn_locked (peer);
        if (conn != NULL) {
                CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
                       conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
                       atomic_read (&conn->ibc_refcount));
                atomic_inc (&conn->ibc_refcount); /* 1 ref for the tx */
                read_unlock_irqrestore(g_lock, flags);

                kibnal_queue_tx (tx, conn);
                return;
        }

        /* Making one or more connections; I'll need a write lock... */
        read_unlock(g_lock);
        write_lock(g_lock);

        peer = kibnal_find_peer_locked (nid);
        if (peer == NULL) {
                write_unlock_irqrestore (g_lock, flags);
                tx->tx_status = -EHOSTUNREACH;
                kibnal_tx_done (tx);
                return;
        }

        conn = kibnal_find_conn_locked (peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
                       conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
                       atomic_read (&conn->ibc_refcount));
                atomic_inc (&conn->ibc_refcount); /* 1 ref for the tx */
                write_unlock_irqrestore (g_lock, flags);

                kibnal_queue_tx (tx, conn);
                return;
        }

        if (peer->ibp_connecting == 0) {
                if (!time_after_eq(jiffies, peer->ibp_reconnect_time)) {
                        write_unlock_irqrestore (g_lock, flags);
                        tx->tx_status = -EHOSTUNREACH;
                        kibnal_tx_done (tx);
                        return;
                }

                peer->ibp_connecting = 1;
                kib_peer_addref(peer); /* extra ref for connd */

                spin_lock (&kibnal_data.kib_connd_lock);

                list_add_tail (&peer->ibp_connd_list,
                               &kibnal_data.kib_connd_peers);
                wake_up (&kibnal_data.kib_connd_waitq);

                spin_unlock (&kibnal_data.kib_connd_lock);
        }

        /* A connection is being established; queue the message... */
        list_add_tail (&tx->tx_list, &peer->ibp_tx_queue);

        write_unlock_irqrestore (g_lock, flags);
}
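
/* Hedged distillation of the locking pattern above: try the fast
 * path under the read lock, and if a connection must be created,
 * re-check everything after upgrading to the write lock, since the
 * world may have changed in the unlocked window: */
#if 0
        read_lock_irqsave (g_lock, flags);
        conn = kibnal_find_conn_locked (peer);
        if (conn != NULL) {
                read_unlock_irqrestore (g_lock, flags);        /* fast path */
                return;
        }
        read_unlock (g_lock);

        write_lock (g_lock);                    /* not atomic with above... */
        conn = kibnal_find_conn_locked (peer);  /* ...so look again */
        if (conn == NULL)
                peer->ibp_connecting = 1;       /* really need to connect */
        write_unlock_irqrestore (g_lock, flags);
#endif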

static ptl_err_t
kibnal_start_passive_rdma (int type, ptl_nid_t nid,
                           lib_msg_t *libmsg, ptl_hdr_t *hdr)
{
        int         nob = libmsg->md->length;
        kib_tx_t   *tx;
        kib_msg_t  *ibmsg;
        int         rc;
        IB_ACCESS_CONTROL         access = {0,};

        LASSERT (type == IBNAL_MSG_PUT_RDMA || type == IBNAL_MSG_GET_RDMA);
        LASSERT (nob > 0);
        LASSERT (!in_interrupt());              /* Mapping could block */

        access.s.MWBindable = 1;
        access.s.LocalWrite = 1;
        access.s.RdmaRead = 1;
        access.s.RdmaWrite = 1;

        tx = kibnal_get_idle_tx (1);           /* May block; caller is an app thread */
        LASSERT (tx != NULL);

        if ((libmsg->md->options & PTL_MD_KIOV) == 0)
                rc = kibnal_map_iov (tx, access,
                                     libmsg->md->md_niov,
                                     libmsg->md->md_iov.iov,
                                     0, nob, 0);
        else
                rc = kibnal_map_kiov (tx, access,
                                      libmsg->md->md_niov,
                                      libmsg->md->md_iov.kiov,
                                      0, nob, 0);

        if (rc != 0) {
                CERROR ("Can't map RDMA for "LPX64": %d\n", nid, rc);
                goto failed;
        }

        if (type == IBNAL_MSG_GET_RDMA) {
                /* reply gets finalized when tx completes */
                tx->tx_libmsg[1] = lib_create_reply_msg(&kibnal_lib,
                                                        nid, libmsg);
                if (tx->tx_libmsg[1] == NULL) {
                        CERROR ("Can't create reply for GET -> "LPX64"\n",
                                nid);
                        rc = -ENOMEM;
                        goto failed;
                }
        }

        tx->tx_passive_rdma = 1;

        ibmsg = tx->tx_msg;

        ibmsg->ibm_u.rdma.ibrm_hdr = *hdr;
        ibmsg->ibm_u.rdma.ibrm_cookie = tx->tx_passive_rdma_cookie;
        /* map_kiov already filled the rdma descs for the whole_mem case */
        if (!kibnal_whole_mem()) {
                ibmsg->ibm_u.rdma.rd_key = tx->tx_md.md_rkey;
                ibmsg->ibm_u.rdma.ibrm_desc[0].rd_addr = tx->tx_md.md_addr;
                ibmsg->ibm_u.rdma.ibrm_desc[0].rd_nob = nob;
                ibmsg->ibm_u.rdma.ibrm_num_descs = 1;
        }

        kibnal_init_tx_msg (tx, type,
                            kib_rdma_msg_len(ibmsg->ibm_u.rdma.ibrm_num_descs));

        CDEBUG(D_NET, "Passive: %p cookie "LPX64", key %x, addr "
               LPX64", nob %d\n",
               tx, tx->tx_passive_rdma_cookie, tx->tx_md.md_rkey,
               tx->tx_md.md_addr, nob);

        /* libmsg gets finalized when tx completes. */
        tx->tx_libmsg[0] = libmsg;

        kibnal_launch_tx(tx, nid);
        return (PTL_OK);

 failed:
        tx->tx_status = rc;
        kibnal_tx_done (tx);
        return (PTL_FAIL);
}
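
/* Hedged protocol summary for the passive/active split (message
 * types per this file; the active side is kibnal_start_active_rdma()
 * below).  For a PUT, the initiator stays passive:
 *
 *   initiator                              target
 *   PUT_RDMA(hdr, rd_key, addr, cookie) ->
 *                                           RDMA read of the payload
 *                                        <- PUT_DONE(cookie, status)
 *
 * For a GET the data moves the other way: the target RDMA-writes the
 * reply payload into the advertised region and sends GET_DONE. */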

void
kibnal_start_active_rdma (int type, int status,
                          kib_rx_t *rx, lib_msg_t *libmsg,
                          unsigned int niov,
                          struct iovec *iov, ptl_kiov_t *kiov,
                          size_t offset, size_t nob)
{
        kib_msg_t    *rxmsg = rx->rx_msg;
        kib_msg_t    *txmsg;
        kib_tx_t     *tx;
        IB_ACCESS_CONTROL access = {0,};
        IB_WR_OP      rdma_op;
        int           rc;
        __u32         i;

        CDEBUG(D_NET, "type %d, status %d, niov %d, offset %d, nob %d\n",
               type, status, niov, offset, nob);

        /* Called by scheduler */
        LASSERT (!in_interrupt ());

        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        /* No data if we're completing with failure */
        LASSERT (status == 0 || nob == 0);

        LASSERT (type == IBNAL_MSG_GET_DONE ||
                 type == IBNAL_MSG_PUT_DONE);

        /* Flag I'm completing the RDMA.  Even if I fail to send the
         * completion message, I will have tried my best so further
         * attempts shouldn't be tried. */
        LASSERT (!rx->rx_rdma);
        rx->rx_rdma = 1;

        if (type == IBNAL_MSG_GET_DONE) {
                rdma_op  = WROpRdmaWrite;
                LASSERT (rxmsg->ibm_type == IBNAL_MSG_GET_RDMA);
        } else {
                access.s.LocalWrite = 1;
                rdma_op  = WROpRdmaRead;
                LASSERT (rxmsg->ibm_type == IBNAL_MSG_PUT_RDMA);
        }

        tx = kibnal_get_idle_tx (0);           /* Mustn't block */
        if (tx == NULL) {
                CERROR ("tx descs exhausted on RDMA from "LPX64
                        " completing locally with failure\n",
                        rx->rx_conn->ibc_peer->ibp_nid);
                lib_finalize (&kibnal_lib, NULL, libmsg, PTL_NO_SPACE);
                return;
        }
        LASSERT (tx->tx_nsp == 0);

        if (nob == 0)
                GOTO(init_tx, 0);

        /* We actually need to transfer some data (the transfer
         * size could get truncated to zero when the incoming
         * message is matched) */
        if (kiov != NULL)
                rc = kibnal_map_kiov (tx, access, niov, kiov, offset, nob, 1);
        else
                rc = kibnal_map_iov (tx, access, niov, iov, offset, nob, 1);

        if (rc != 0) {
                CERROR ("Can't map RDMA -> "LPX64": %d\n",
                        rx->rx_conn->ibc_peer->ibp_nid, rc);
                /* We'll skip the RDMA and complete with failure. */
                status = rc;
                nob = 0;
                GOTO(init_tx, rc);
        }

        if (!kibnal_whole_mem()) {
                tx->tx_msg->ibm_u.rdma.rd_key = tx->tx_md.md_lkey;
                tx->tx_msg->ibm_u.rdma.ibrm_desc[0].rd_addr = tx->tx_md.md_addr;
                tx->tx_msg->ibm_u.rdma.ibrm_desc[0].rd_nob = nob;
                tx->tx_msg->ibm_u.rdma.ibrm_num_descs = 1;
        }

        /* XXX ugh.  different page-sized hosts. */
        if (tx->tx_msg->ibm_u.rdma.ibrm_num_descs !=
            rxmsg->ibm_u.rdma.ibrm_num_descs) {
                CERROR("tx descs (%u) != rx descs (%u)\n",
                       tx->tx_msg->ibm_u.rdma.ibrm_num_descs,
                       rxmsg->ibm_u.rdma.ibrm_num_descs);
                /* We'll skip the RDMA and complete with failure.  NB rc
                 * is still 0 from the successful mapping above, so set a
                 * real error for the completion status. */
                rc = -EINVAL;
                status = rc;
                nob = 0;
                GOTO(init_tx, rc);
        }
1435
1436         /* map_kiov filled in the rdma descs which describe our side of the
1437          * rdma transfer. */
1438         /* ibrm_num_descs was verified in rx_callback */
1439         for(i = 0; i < rxmsg->ibm_u.rdma.ibrm_num_descs; i++) {
1440                 kib_rdma_desc_t *ldesc, *rdesc; /* local, remote */
1441                 IB_LOCAL_DATASEGMENT *ds = &tx->tx_gl[i];
1442                 IB_WORK_REQ  *wrq = &tx->tx_wrq[i];
1443
1444                 ldesc = &tx->tx_msg->ibm_u.rdma.ibrm_desc[i];
1445                 rdesc = &rxmsg->ibm_u.rdma.ibrm_desc[i];
1446
1447                 ds->Address = ldesc->rd_addr;
1448                 ds->Length  = ldesc->rd_nob;
1449                 ds->Lkey    = tx->tx_msg->ibm_u.rdma.rd_key;
1450
1451                 memset(wrq, 0, sizeof(*wrq));
1452                 wrq->WorkReqId      = kibnal_ptr2wreqid(tx, 0);
1453                 wrq->Operation      = rdma_op;
1454                 wrq->DSList         = ds;
1455                 wrq->DSListDepth    = 1;
1456                 wrq->MessageLen     = ds->Length;
1457                 wrq->Req.SendRC.ImmediateData  = 0;
1458                 wrq->Req.SendRC.Options.s.SolicitedEvent         = 0;
1459                 wrq->Req.SendRC.Options.s.SignaledCompletion     = 0;
1460                 wrq->Req.SendRC.Options.s.ImmediateData          = 0;
1461                 wrq->Req.SendRC.Options.s.Fence                  = 0;
1462                 wrq->Req.SendRC.RemoteDS.Address = rdesc->rd_addr;
1463                 wrq->Req.SendRC.RemoteDS.Rkey = rxmsg->ibm_u.rdma.rd_key;
1464
1465                 /* only the last rdma post triggers tx completion */
1466                 if (i == rxmsg->ibm_u.rdma.ibrm_num_descs - 1)
1467                         wrq->Req.SendRC.Options.s.SignaledCompletion = 1;
1468
1469                 tx->tx_nsp++;
1470         }
1471
1472 init_tx:
1473         txmsg = tx->tx_msg;
1474
1475         txmsg->ibm_u.completion.ibcm_cookie = rxmsg->ibm_u.rdma.ibrm_cookie;
1476         txmsg->ibm_u.completion.ibcm_status = status;
1477         
1478         kibnal_init_tx_msg(tx, type, sizeof (kib_completion_msg_t));
1479
1480         if (status == 0 && nob != 0) {
1481                 LASSERT (tx->tx_nsp > 1);
1482                 /* RDMA: libmsg gets finalized when the tx completes.  This
1483                  * is after the completion message has been sent, which in
1484                  * turn is after the RDMA has finished. */
1485                 tx->tx_libmsg[0] = libmsg;
1486         } else {
1487                 LASSERT (tx->tx_nsp == 1);
1488                 /* No RDMA: local completion happens now! */
1489                 CWARN("No data: immediate completion\n");
1490                 lib_finalize (&kibnal_lib, NULL, libmsg,
1491                               status == 0 ? PTL_OK : PTL_FAIL);
1492         }
1493
1494         /* +1 ref for this tx... */
1495         CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1496                rx->rx_conn, rx->rx_conn->ibc_state, 
1497                rx->rx_conn->ibc_peer->ibp_nid,
1498                atomic_read (&rx->rx_conn->ibc_refcount));
1499         atomic_inc (&rx->rx_conn->ibc_refcount);
1500         /* ...and queue it up */
1501         kibnal_queue_tx(tx, rx->rx_conn);
1502 }
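/* Editor's note (illustrative, not from the original source): tx_nsp at
 * this point counts one send work request per RDMA descriptor, and
 * kibnal_init_tx_msg() appends the GET/PUT_DONE completion send as one
 * more, so the posted chain looks roughly like
 *
 *      tx_wrq[0] .. tx_wrq[n-1]   RDMA read/write (only the last signalled)
 *      tx_wrq[n]                  IBNAL_MSG_*_DONE completion message
 *
 * Because an RC queue pair executes work requests in order, the peer can
 * only receive the DONE message after all the data movement queued ahead
 * of it on the same QP has completed. */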
1503
1504 static ptl_err_t
1505 kibnal_sendmsg(lib_nal_t    *nal, 
1506                 void         *private,
1507                 lib_msg_t    *libmsg,
1508                 ptl_hdr_t    *hdr, 
1509                 int           type, 
1510                 ptl_nid_t     nid, 
1511                 ptl_pid_t     pid,
1512                 unsigned int  payload_niov, 
1513                 struct iovec *payload_iov, 
1514                 ptl_kiov_t   *payload_kiov,
1515                 size_t        payload_offset,
1516                 size_t        payload_nob)
1517 {
1518         kib_msg_t  *ibmsg;
1519         kib_tx_t   *tx;
1520         int         nob;
1521
1522         /* NB 'private' is different depending on what we're sending.... */
1523
1524         CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid:"LPX64
1525                " pid %d\n", payload_nob, payload_niov, nid, pid);
1526
1527         LASSERT (payload_nob == 0 || payload_niov > 0);
1528         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
1529
1530         /* Thread context if we're sending payload */
1531         LASSERT (!in_interrupt() || payload_niov == 0);
1532         /* payload is either all vaddrs or all pages */
1533         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1534
1535         switch (type) {
1536         default:
1537                 LBUG();
1538                 return (PTL_FAIL);
1539                 
1540         case PTL_MSG_REPLY: {
1541                 /* reply's 'private' is the incoming receive */
1542                 kib_rx_t *rx = private;
1543
1544                 /* RDMA reply expected? */
1545                 if (rx->rx_msg->ibm_type == IBNAL_MSG_GET_RDMA) {
1546                         kibnal_start_active_rdma(IBNAL_MSG_GET_DONE, 0,
1547                                                  rx, libmsg, payload_niov, 
1548                                                  payload_iov, payload_kiov,
1549                                                  payload_offset, payload_nob);
1550                         return (PTL_OK);
1551                 }
1552                 
1553                 /* Incoming message consistent with immediate reply? */
1554                 if (rx->rx_msg->ibm_type != IBNAL_MSG_IMMEDIATE) {
1555                         CERROR ("REPLY to "LPX64" bad ibm type %d!!!\n",
1556                                 nid, rx->rx_msg->ibm_type);
1557                         return (PTL_FAIL);
1558                 }
1559
1560                 /* Will it fit in a message? */
1561                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
1562                 if (nob > IBNAL_MSG_SIZE) {     /* '>' matches the GET/PUT checks */
1563                         CERROR("REPLY for "LPX64" too big (RDMA not requested): %d\n", 
1564                nid, nob);
1565                         return (PTL_FAIL);
1566                 }
1567                 break;
1568         }
1569
1570         case PTL_MSG_GET:
1571                 /* might the REPLY message be big enough to need RDMA? */
1572                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[libmsg->md->length]);
1573                 if (nob > IBNAL_MSG_SIZE)
1574                         return (kibnal_start_passive_rdma(IBNAL_MSG_GET_RDMA, 
1575                                                           nid, libmsg, hdr));
1576                 break;
1577
1578         case PTL_MSG_ACK:
1579                 LASSERT (payload_nob == 0);
1580                 break;
1581
1582         case PTL_MSG_PUT:
1583                 /* Is the payload big enough to need RDMA? */
1584                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
1585                 if (nob > IBNAL_MSG_SIZE)
1586                         return (kibnal_start_passive_rdma(IBNAL_MSG_PUT_RDMA,
1587                                                           nid, libmsg, hdr));
1588                 
1589                 break;
1590         }
1591
1592         tx = kibnal_get_idle_tx(!(type == PTL_MSG_ACK ||
1593                                   type == PTL_MSG_REPLY ||
1594                                   in_interrupt()));
1595         if (tx == NULL) {
1596                 CERROR ("Can't send %d to "LPX64": tx descs exhausted%s\n", 
1597                         type, nid, in_interrupt() ? " (intr)" : "");
1598                 return (PTL_NO_SPACE);
1599         }
1600
1601         ibmsg = tx->tx_msg;
1602         ibmsg->ibm_u.immediate.ibim_hdr = *hdr;
1603
1604         if (payload_nob > 0) {
1605                 if (payload_kiov != NULL)
1606                         lib_copy_kiov2buf(ibmsg->ibm_u.immediate.ibim_payload,
1607                                           payload_niov, payload_kiov,
1608                                           payload_offset, payload_nob);
1609                 else
1610                         lib_copy_iov2buf(ibmsg->ibm_u.immediate.ibim_payload,
1611                                          payload_niov, payload_iov,
1612                                          payload_offset, payload_nob);
1613         }
1614
1615         kibnal_init_tx_msg (tx, IBNAL_MSG_IMMEDIATE,
1616                             offsetof(kib_immediate_msg_t, 
1617                                      ibim_payload[payload_nob]));
1618
1619         /* libmsg gets finalized when tx completes */
1620         tx->tx_libmsg[0] = libmsg;
1621
1622         kibnal_launch_tx(tx, nid);
1623         return (PTL_OK);
1624 }
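/* Editor's sketch: the immediate-vs-RDMA decision above is pure arithmetic
 * on the wire-message size.  Assuming (hypothetically) IBNAL_MSG_SIZE were
 * 4096 bytes, a PUT payload is sent inline only while
 *
 *      offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob])
 *              <= IBNAL_MSG_SIZE
 *
 * i.e. payload_nob <= IBNAL_MSG_SIZE - offsetof(kib_msg_t,
 * ibm_u.immediate.ibim_payload[0]); anything larger goes through
 * kibnal_start_passive_rdma() instead, and GET applies the same test to
 * the expected REPLY length. */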
1625
1626 static ptl_err_t
1627 kibnal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
1628                ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1629                unsigned int payload_niov, struct iovec *payload_iov,
1630                size_t payload_offset, size_t payload_len)
1631 {
1632         return (kibnal_sendmsg(nal, private, cookie,
1633                                hdr, type, nid, pid,
1634                                payload_niov, payload_iov, NULL,
1635                                payload_offset, payload_len));
1636 }
1637
1638 static ptl_err_t
1639 kibnal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie, 
1640                      ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1641                      unsigned int payload_niov, ptl_kiov_t *payload_kiov, 
1642                      size_t payload_offset, size_t payload_len)
1643 {
1644         return (kibnal_sendmsg(nal, private, cookie,
1645                                hdr, type, nid, pid,
1646                                payload_niov, NULL, payload_kiov,
1647                                payload_offset, payload_len));
1648 }
1649
1650 static ptl_err_t
1651 kibnal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
1652                  unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
1653                  size_t offset, size_t mlen, size_t rlen)
1654 {
1655         kib_rx_t    *rx = private;
1656         kib_msg_t   *rxmsg = rx->rx_msg;
1657         int          msg_nob;
1658         
1659         LASSERT (mlen <= rlen);
1660         LASSERT (!in_interrupt ());
1661         /* Either all pages or all vaddrs */
1662         LASSERT (!(kiov != NULL && iov != NULL));
1663
1664         switch (rxmsg->ibm_type) {
1665         default:
1666                 LBUG();
1667                 return (PTL_FAIL);
1668                 
1669         case IBNAL_MSG_IMMEDIATE:
1670                 msg_nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[rlen]);
1671                 if (msg_nob > IBNAL_MSG_SIZE) {
1672                         CERROR ("Immediate message from "LPX64" too big: %d\n",
1673                                 rxmsg->ibm_u.immediate.ibim_hdr.src_nid, (int)rlen);
1674                         return (PTL_FAIL);
1675                 }
1676
1677                 if (kiov != NULL)
1678                         lib_copy_buf2kiov(niov, kiov, offset,
1679                                           rxmsg->ibm_u.immediate.ibim_payload,
1680                                           mlen);
1681                 else
1682                         lib_copy_buf2iov(niov, iov, offset,
1683                                          rxmsg->ibm_u.immediate.ibim_payload,
1684                                          mlen);
1685
1686                 lib_finalize (nal, NULL, libmsg, PTL_OK);
1687                 return (PTL_OK);
1688
1689         case IBNAL_MSG_GET_RDMA:
1690                 /* We get called here just to discard any junk after the
1691                  * GET hdr. */
1692                 LASSERT (libmsg == NULL);
1693                 lib_finalize (nal, NULL, libmsg, PTL_OK);
1694                 return (PTL_OK);
1695
1696         case IBNAL_MSG_PUT_RDMA:
1697                 kibnal_start_active_rdma (IBNAL_MSG_PUT_DONE, 0,
1698                                           rx, libmsg, 
1699                                           niov, iov, kiov, offset, mlen);
1700                 return (PTL_OK);
1701         }
1702 }
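/* Editor's note: receive dispatch in one line per message type --
 * IMMEDIATE copies the inline payload straight into the caller's iov/kiov
 * and finalizes; GET_RDMA has nothing to deliver here (the data moves in
 * kibnal_start_active_rdma() on the reply side); PUT_RDMA starts the
 * active RDMA read from this, the data sink, side. */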
1703
1704 static ptl_err_t
1705 kibnal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
1706               unsigned int niov, struct iovec *iov, 
1707               size_t offset, size_t mlen, size_t rlen)
1708 {
1709         return (kibnal_recvmsg (nal, private, msg, niov, iov, NULL,
1710                                 offset, mlen, rlen));
1711 }
1712
1713 static ptl_err_t
1714 kibnal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
1715                      unsigned int niov, ptl_kiov_t *kiov, 
1716                      size_t offset, size_t mlen, size_t rlen)
1717 {
1718         return (kibnal_recvmsg (nal, private, msg, niov, NULL, kiov,
1719                                 offset, mlen, rlen));
1720 }
1721
1722 /*****************************************************************************
1723  * the rest of this file concerns connection management.  active connections
1724  * start with connect_peer, passive connections start with passive_callback.
1725  * active disconnects start with conn_close, cm_callback starts passive
1726  * disconnects and contains the guts of how the disconnect state machine
1727  * progresses. 
1728  *****************************************************************************/
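/* Editor's sketch of the state machine described above, reconstructed
 * from the IBNAL_CONN_* states used in this file (the enum itself lives
 * in iibnal.h):
 *
 *   INIT_QP -> CONNECTING -> ESTABLISHED -> SEND_DREQ -> DREQ
 *                                 |                        |
 *                                 +------> DREP -----------+--> DISCONNECTED
 *
 * Active connects drive the left half via connect_peer/connect_reply;
 * cm_callback drives the disconnect half; DISCONNECTED conns are freed by
 * the connd once their refcount drains. */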
1729
1730 int
1731 kibnal_thread_start (int (*fn)(void *arg), void *arg)
1732 {
1733         long    pid = kernel_thread (fn, arg, 0);
1734
1735         if (pid < 0)
1736                 return ((int)pid);
1737
1738         atomic_inc (&kibnal_data.kib_nthreads);
1739         return (0);
1740 }
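/* Editor's example (hypothetical caller): startup code would spawn the
 * connection daemon with something like
 *
 *      rc = kibnal_thread_start (kibnal_connd, NULL);
 *      if (rc != 0)
 *              CERROR ("Can't spawn connd: %d\n", rc);
 *
 * kib_nthreads is only incremented on success, and each thread calls
 * kibnal_thread_fini() on exit so shutdown can wait for the count to
 * drop to zero. */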
1741
1742 static void
1743 kibnal_thread_fini (void)
1744 {
1745         atomic_dec (&kibnal_data.kib_nthreads);
1746 }
1747
1748 /* this can be called by anyone at any time to close a connection.  if
1749  * the connection is still established it heads to the connd to start
1750  * the disconnection in a safe context.  It has no effect if called
1751  * on a connection that is already disconnecting */
1752 void
1753 kibnal_close_conn_locked (kib_conn_t *conn, int error)
1754 {
1755         /* This just does the immediate housekeeping, and schedules the
1756          * connection for the connd to finish off.
1757          * Caller holds kib_global_lock exclusively in irq context */
1758         kib_peer_t   *peer = conn->ibc_peer;
1759
1760         KIB_ASSERT_CONN_STATE_RANGE(conn, IBNAL_CONN_CONNECTING,
1761                                     IBNAL_CONN_DISCONNECTED);
1762
1763         if (conn->ibc_state > IBNAL_CONN_ESTABLISHED)
1764                 return; /* already disconnecting */
1765
1766         CDEBUG (error == 0 ? D_NET : D_ERROR,
1767                 "closing conn to "LPX64": error %d\n", peer->ibp_nid, error);
1768
1769         if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
1770                 /* kib_connd_conns takes ibc_list's ref */
1771                 list_del (&conn->ibc_list);
1772         } else {
1773                 /* new ref for kib_connd_conns */
1774                 CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1775                        conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1776                        atomic_read (&conn->ibc_refcount));
1777                 atomic_inc (&conn->ibc_refcount);
1778         }
1779         
1780         if (list_empty (&peer->ibp_conns) &&    /* no more conns */
1781             peer->ibp_persistence == 0 &&       /* non-persistent peer */
1782             kibnal_peer_active(peer)) {         /* still in peer table */
1783                 kibnal_unlink_peer_locked (peer);
1784         }
1785
1786         conn->ibc_state = IBNAL_CONN_SEND_DREQ;
1787
1788         spin_lock (&kibnal_data.kib_connd_lock);
1789
1790         list_add_tail (&conn->ibc_list, &kibnal_data.kib_connd_conns);
1791         wake_up (&kibnal_data.kib_connd_waitq);
1792                 
1793         spin_unlock (&kibnal_data.kib_connd_lock);
1794 }
1795
1796 void
1797 kibnal_close_conn (kib_conn_t *conn, int error)
1798 {
1799         unsigned long     flags;
1800
1801         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1802
1803         kibnal_close_conn_locked (conn, error);
1804         
1805         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1806 }
1807
1808 static void
1809 kibnal_peer_connect_failed (kib_peer_t *peer, int active, int rc)
1810 {
1811         LIST_HEAD        (zombies);
1812         kib_tx_t         *tx;
1813         unsigned long     flags;
1814
1815         LASSERT (rc != 0);
1816         LASSERT (peer->ibp_reconnect_interval >= IBNAL_MIN_RECONNECT_INTERVAL);
1817
1818         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1819
1820         LASSERT (peer->ibp_connecting != 0);
1821         peer->ibp_connecting--;
1822
1823         if (peer->ibp_connecting != 0) {
1824                 /* another connection attempt under way (loopback?)... */
1825                 write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1826                 return;
1827         }
1828
1829         if (list_empty(&peer->ibp_conns)) {
1830                 /* Say when active connection can be re-attempted */
1831                 peer->ibp_reconnect_time = jiffies + peer->ibp_reconnect_interval;
1832                 /* Increase reconnection interval */
1833                 peer->ibp_reconnect_interval = MIN (peer->ibp_reconnect_interval * 2,
1834                                                     IBNAL_MAX_RECONNECT_INTERVAL);
1835         
1836                 /* Take peer's blocked transmits; I'll complete
1837                  * them with error */
1838                 while (!list_empty (&peer->ibp_tx_queue)) {
1839                         tx = list_entry (peer->ibp_tx_queue.next,
1840                                          kib_tx_t, tx_list);
1841                         
1842                         list_del (&tx->tx_list);
1843                         list_add_tail (&tx->tx_list, &zombies);
1844                 }
1845                 
1846                 if (kibnal_peer_active(peer) &&
1847                     (peer->ibp_persistence == 0)) {
1848                         /* failed connection attempt on non-persistent peer */
1849                         kibnal_unlink_peer_locked (peer);
1850                 }
1851         } else {
1852                 /* Can't have blocked transmits if there are connections */
1853                 LASSERT (list_empty(&peer->ibp_tx_queue));
1854         }
1855         
1856         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1857
1858         if (!list_empty (&zombies))
1859                 CERROR ("Deleting messages for "LPX64": connection failed\n",
1860                         peer->ibp_nid);
1861
1862         while (!list_empty (&zombies)) {
1863                 tx = list_entry (zombies.next, kib_tx_t, tx_list);
1864
1865                 list_del (&tx->tx_list);
1866                 /* complete now */
1867                 tx->tx_status = -EHOSTUNREACH;
1868                 kibnal_tx_done (tx);
1869         }
1870 }
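/* Editor's note on the backoff above: ibp_reconnect_interval doubles on
 * each failed attempt and is clamped at IBNAL_MAX_RECONNECT_INTERVAL.
 * Assuming (hypothetically) a 1-second minimum and a 60-second cap, the
 * retry delays run 1, 2, 4, 8, 16, 32, 60, 60, ... seconds; a successful
 * connect resets the interval to the minimum in kibnal_connreq_done(). */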
1871
1872 static void
1873 kibnal_connreq_done (kib_conn_t *conn, int active, int status)
1874 {
1875         int               state = conn->ibc_state;
1876         kib_peer_t       *peer = conn->ibc_peer;
1877         kib_tx_t         *tx;
1878         unsigned long     flags;
1879         int               i;
1880
1881         /* passive connection has no connreq & vice versa */
1882         LASSERTF(!active == !(conn->ibc_connreq != NULL),
1883                  "%d %p\n", active, conn->ibc_connreq);
1884         if (active) {
1885                 PORTAL_FREE (conn->ibc_connreq, sizeof (*conn->ibc_connreq));
1886                 conn->ibc_connreq = NULL;
1887         }
1888
1889         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1890
1891         LASSERT (peer->ibp_connecting != 0);
1892         
1893         if (status == 0) {                         
1894                 /* connection established... */
1895                 KIB_ASSERT_CONN_STATE(conn, IBNAL_CONN_CONNECTING);
1896                 conn->ibc_state = IBNAL_CONN_ESTABLISHED;
1897
1898                 if (!kibnal_peer_active(peer)) {
1899                         /* ...but peer deleted meantime */
1900                         status = -ECONNABORTED;
1901                 }
1902         } else {
1903                 KIB_ASSERT_CONN_STATE_RANGE(conn, IBNAL_CONN_INIT_QP,
1904                                             IBNAL_CONN_CONNECTING);
1905         }
1906
1907         if (status == 0) {
1908                 /* Everything worked! */
1909
1910                 peer->ibp_connecting--;
1911
1912                 /* +1 ref for ibc_list; caller(== CM)'s ref remains until
1913                  * the IB_CM_IDLE callback */
1914                 CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1915                        conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1916                        atomic_read (&conn->ibc_refcount));
1917                 atomic_inc (&conn->ibc_refcount);
1918                 list_add (&conn->ibc_list, &peer->ibp_conns);
1919                 
1920                 /* reset reconnect interval for next attempt */
1921                 peer->ibp_reconnect_interval = IBNAL_MIN_RECONNECT_INTERVAL;
1922
1923                 /* post blocked sends to the new connection */
1924                 spin_lock (&conn->ibc_lock);
1925                 
1926                 while (!list_empty (&peer->ibp_tx_queue)) {
1927                         tx = list_entry (peer->ibp_tx_queue.next, 
1928                                          kib_tx_t, tx_list);
1929                         
1930                         list_del (&tx->tx_list);
1931
1932                         /* +1 ref for each tx */
1933                         CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1934                                conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1935                                atomic_read (&conn->ibc_refcount));
1936                         atomic_inc (&conn->ibc_refcount);
1937                         kibnal_queue_tx_locked (tx, conn);
1938                 }
1939                 
1940                 spin_unlock (&conn->ibc_lock);
1941
1942                 /* Nuke any dangling conns from a different peer instance... */
1943                 kibnal_close_stale_conns_locked (conn->ibc_peer,
1944                                                  conn->ibc_incarnation);
1945
1946                 write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1947
1948                 /* queue up all the receives */
1949                 for (i = 0; i < IBNAL_RX_MSGS; i++) {
1950                         /* +1 ref for rx desc */
1951                         CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1952                                conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1953                                atomic_read (&conn->ibc_refcount));
1954                         atomic_inc (&conn->ibc_refcount);
1955
1956                         CDEBUG(D_NET, "RX[%d] %p->%p - "LPX64"\n",
1957                                i, &conn->ibc_rxs[i], conn->ibc_rxs[i].rx_msg,
1958                                conn->ibc_rxs[i].rx_vaddr);
1959
1960                         kibnal_post_rx (&conn->ibc_rxs[i], 0);
1961                 }
1962
1963                 kibnal_check_sends (conn);
1964                 return;
1965         }
1966
1967         /* connection failed */
1968         if (state == IBNAL_CONN_CONNECTING) {
1969                 /* schedule for connd to close */
1970                 kibnal_close_conn_locked (conn, status);
1971         } else {
1972                 /* Don't have a CM comm_id; just wait for refs to drain */
1973                 conn->ibc_state = IBNAL_CONN_DISCONNECTED;
1974         } 
1975
1976         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1977
1978         kibnal_peer_connect_failed (conn->ibc_peer, active, status);
1979
1980         /* If we didn't establish the connection we don't have to pass
1981          * through the disconnect protocol before dropping the CM ref */
1982         if (state < IBNAL_CONN_CONNECTING) 
1983                 kibnal_put_conn (conn);
1984 }
1985
1986 static int
1987 kibnal_accept (kib_conn_t **connp, IB_HANDLE *cep,
1988                 ptl_nid_t nid, __u64 incarnation, int queue_depth)
1989 {
1990         kib_conn_t    *conn = kibnal_create_conn();
1991         kib_peer_t    *peer;
1992         kib_peer_t    *peer2;
1993         unsigned long  flags;
1994
1995         if (conn == NULL)
1996                 return (-ENOMEM);
1997
1998         if (queue_depth != IBNAL_MSG_QUEUE_SIZE) {
1999                 CERROR("Can't accept "LPX64": bad queue depth %d (%d expected)\n",
2000                        nid, queue_depth, IBNAL_MSG_QUEUE_SIZE);
2001                 atomic_dec (&conn->ibc_refcount);
2002                 kibnal_destroy_conn(conn);
2003                 return (-EPROTO);
2004         }
2005         
2006         /* assume 'nid' is a new peer */
2007         peer = kibnal_create_peer (nid);
2008         if (peer == NULL) {
2009                 CDEBUG(D_NET, "--conn[%p] state %d -> "LPX64" (%d)\n",
2010                        conn, conn->ibc_state, nid, /* ibc_peer not set yet */
2011                        atomic_read (&conn->ibc_refcount));
2012                 atomic_dec (&conn->ibc_refcount);
2013                 kibnal_destroy_conn(conn);
2014                 return (-ENOMEM);
2015         }
2016         
2017         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
2018
2019         peer2 = kibnal_find_peer_locked(nid);
2020         if (peer2 == NULL) {
2021                 /* peer table takes my ref on peer */
2022                 list_add_tail (&peer->ibp_list, kibnal_nid2peerlist(nid));
2023         } else {
2024                 kib_peer_decref (peer);
2025                 peer = peer2;
2026         }
2027
2028         kib_peer_addref(peer); /* +1 ref for conn */
2029         peer->ibp_connecting++;
2030
2031         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
2032
2033         conn->ibc_peer = peer;
2034         conn->ibc_state = IBNAL_CONN_CONNECTING;
2035         /* conn->ibc_cep is set when cm_accept is called */
2036         conn->ibc_incarnation = incarnation;
2037         conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
2038
2039         *connp = conn;
2040         return (0);
2041 }
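/* Editor's note: the peer2 dance above is the usual optimistic
 * find-or-create -- allocate a peer outside the lock, then under
 * kib_global_lock either install it in the NID hash or discard it in
 * favour of the instance that won the race.  Creating first keeps the
 * allocation out of the write-locked section. */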
2042
2043 static void kibnal_set_qp_state(IB_HANDLE *qp, IB_QP_STATE state)
2044 {
2045         IB_QP_ATTRIBUTES_MODIFY modify_attr = {0,};
2046         FSTATUS frc;
2047
2048         modify_attr.RequestState = state;
2049
2050         frc = iibt_qp_modify(qp, &modify_attr, NULL);
2051         if (frc != FSUCCESS)
2052                 CERROR("couldn't set qp state to %d, error %d\n", state, frc);
2053 }
2054
2055 static void kibnal_flush_pending(kib_conn_t *conn)
2056 {
2057         LIST_HEAD        (zombies); 
2058         struct list_head *tmp;
2059         struct list_head *nxt;
2060         kib_tx_t         *tx;
2061         unsigned long     flags;
2062         int               done;
2063
2064         /* NB we wait until the connection has closed before completing
2065          * outstanding passive RDMAs so we can be sure the network can't 
2066          * touch the mapped memory any more. */
2067         KIB_ASSERT_CONN_STATE(conn, IBNAL_CONN_DISCONNECTED);
2068
2069         /* set the QP to the error state so that we get flush callbacks
2070          * on our posted receives which can then drop their conn refs */
2071         kibnal_set_qp_state(conn->ibc_qp, QPStateError);
2072
2073         spin_lock_irqsave (&conn->ibc_lock, flags);
2074
2075         /* grab passive RDMAs not waiting for the tx callback */
2076         list_for_each_safe (tmp, nxt, &conn->ibc_active_txs) {
2077                 tx = list_entry (tmp, kib_tx_t, tx_list);
2078
2079                 LASSERT (tx->tx_passive_rdma ||
2080                          !tx->tx_passive_rdma_wait);
2081
2082                 LASSERT (tx->tx_passive_rdma_wait ||
2083                          tx->tx_sending != 0);
2084
2085                 /* still waiting for tx callback? */
2086                 if (!tx->tx_passive_rdma_wait)
2087                         continue;
2088
2089                 tx->tx_status = -ECONNABORTED;
2090                 tx->tx_passive_rdma_wait = 0;
2091                 done = (tx->tx_sending == 0);
2092
2093                 if (!done)
2094                         continue;
2095
2096                 list_del (&tx->tx_list);
2097                 list_add (&tx->tx_list, &zombies);
2098         }
2099
2100         /* grab all blocked transmits */
2101         list_for_each_safe (tmp, nxt, &conn->ibc_tx_queue) {
2102                 tx = list_entry (tmp, kib_tx_t, tx_list);
2103                 
2104                 list_del (&tx->tx_list);
2105                 list_add (&tx->tx_list, &zombies);
2106         }
2107         
2108         spin_unlock_irqrestore (&conn->ibc_lock, flags);
2109
2110         while (!list_empty(&zombies)) {
2111                 tx = list_entry (zombies.next, kib_tx_t, tx_list);
2112
2113                 list_del(&tx->tx_list);
2114                 kibnal_tx_done (tx);
2115         }
2116 }
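/* Editor's note: moving the QP to QPStateError above is what forces the
 * HCA to complete every outstanding receive with a flush error; those
 * flush completions drop the per-rx conn refs, which is what eventually
 * lets the conn reach zero refs and be freed by the connd.  Waiting for
 * DISCONNECTED first guarantees the peer can no longer RDMA into memory
 * that is about to be completed and unmapped. */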
2117
2118 static void
2119 kibnal_reject (IB_HANDLE cep, uint16_t reason)
2120 {
2121         CM_REJECT_INFO *rej;
2122
2123         PORTAL_ALLOC(rej, sizeof(*rej));
2124         if (rej == NULL) /* PORTAL_ALLOC() will CERROR on failure */
2125                 return;  
2126
2127         rej->Reason = reason;
2128         iibt_cm_reject(cep, rej);
2129         PORTAL_FREE(rej, sizeof(*rej));
2130 }
2131
2132 static FSTATUS
2133 kibnal_qp_rts(IB_HANDLE qp_handle, __u32 qpn, __u8 resp_res, 
2134               IB_PATH_RECORD *path, __u8 init_depth, __u32 send_psn)
2135 {
2136         IB_QP_ATTRIBUTES_MODIFY modify_attr;
2137         FSTATUS frc;
2138         ENTRY;
2139
2140         modify_attr = (IB_QP_ATTRIBUTES_MODIFY) {
2141                 .RequestState           = QPStateReadyToRecv,
2142                 .RecvPSN                = IBNAL_STARTING_PSN,
2143                 .DestQPNumber           = qpn,
2144                 .ResponderResources     = resp_res,
2145                 .MinRnrTimer            = UsecToRnrNakTimer(2000), /* 2 ms */
2146                 .Attrs                  = (IB_QP_ATTR_RECVPSN |
2147                                            IB_QP_ATTR_DESTQPNUMBER | 
2148                                            IB_QP_ATTR_RESPONDERRESOURCES | 
2149                                            IB_QP_ATTR_DESTAV | 
2150                                            IB_QP_ATTR_PATHMTU | 
2151                                            IB_QP_ATTR_MINRNRTIMER),
2152         };
2153         GetAVFromPath(0, path, &modify_attr.PathMTU, NULL, 
2154                       &modify_attr.DestAV);
2155
2156         frc = iibt_qp_modify(qp_handle, &modify_attr, NULL);
2157         if (frc != FSUCCESS) 
2158                 RETURN(frc);
2159
2160         modify_attr = (IB_QP_ATTRIBUTES_MODIFY) {
2161                 .RequestState           = QPStateReadyToSend,
2162                 .FlowControl            = TRUE,
2163                 .InitiatorDepth         = init_depth,
2164                 .SendPSN                = send_psn,
2165                 .LocalAckTimeout        = path->PktLifeTime + 2, /* 2 or 1? */
2166                 .RetryCount             = IBNAL_RETRY,
2167                 .RnrRetryCount          = IBNAL_RNR_RETRY,
2168                 .Attrs                  = (IB_QP_ATTR_FLOWCONTROL | 
2169                                            IB_QP_ATTR_INITIATORDEPTH | 
2170                                            IB_QP_ATTR_SENDPSN | 
2171                                            IB_QP_ATTR_LOCALACKTIMEOUT | 
2172                                            IB_QP_ATTR_RETRYCOUNT | 
2173                                            IB_QP_ATTR_RNRRETRYCOUNT),
2174         };
2175
2176         frc = iibt_qp_modify(qp_handle, &modify_attr, NULL);
2177         RETURN(frc);
2178 }
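/* Editor's sketch: kibnal_qp_rts() performs the standard two-step IB
 * transition -- first to ReadyToRecv (install the remote QPN, receive
 * PSN, address vector and RNR timer so inbound traffic can be accepted),
 * then to ReadyToSend (arm retry counts, ack timeout and the outbound
 * PSN).  A caller-side shape, assuming a path record in hand:
 *
 *      frc = kibnal_qp_rts(conn->ibc_qp, remote_qpn, resp_res,
 *                          &path, init_depth, remote_psn);
 *      if (frc != FSUCCESS)
 *              ... reject the connection ...
 */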
2179
2180 static void
2181 kibnal_connect_reply (IB_HANDLE cep, CM_CONN_INFO *info, void *arg)
2182 {
2183         IB_CA_ATTRIBUTES *ca_attr = &kibnal_data.kib_hca_attrs;
2184         kib_conn_t *conn = arg;
2185         kib_wire_connreq_t *wcr;
2186         CM_REPLY_INFO *rep = &info->Info.Reply;
2187         uint16_t reason;
2188         FSTATUS frc;
2189
2190         wcr = (kib_wire_connreq_t *)info->Info.Reply.PrivateData;
2191
2192         if (wcr->wcr_magic != cpu_to_le32(IBNAL_MSG_MAGIC)) {
2193                 CERROR ("Can't connect "LPX64": bad magic %08x\n",
2194                         conn->ibc_peer->ibp_nid, le32_to_cpu(wcr->wcr_magic));
2195                 GOTO(reject, reason = RC_USER_REJ);
2196         }
2197         
2198         if (wcr->wcr_version != cpu_to_le16(IBNAL_MSG_VERSION)) {
2199                 CERROR ("Can't connect "LPX64": bad version %d\n",
2200                         conn->ibc_peer->ibp_nid, le16_to_cpu(wcr->wcr_version));
2201                 GOTO(reject, reason = RC_USER_REJ);
2202         }
2203                         
2204         if (wcr->wcr_queue_depth != cpu_to_le16(IBNAL_MSG_QUEUE_SIZE)) {
2205                 CERROR ("Can't connect "LPX64": bad queue depth %d\n",
2206                         conn->ibc_peer->ibp_nid, 
2207                         le16_to_cpu(wcr->wcr_queue_depth));
2208                 GOTO(reject, reason = RC_USER_REJ);
2209         }
2210                         
2211         if (le64_to_cpu(wcr->wcr_nid) != conn->ibc_peer->ibp_nid) {
2212                 CERROR ("Unexpected NID "LPX64" from "LPX64"\n",
2213                         le64_to_cpu(wcr->wcr_nid), conn->ibc_peer->ibp_nid);
2214                 GOTO(reject, reason = RC_USER_REJ);
2215         }
2216
2217         CDEBUG(D_NET, "Connection %p -> "LPX64" REP_RECEIVED.\n",
2218                conn, conn->ibc_peer->ibp_nid);
2219
2220         conn->ibc_incarnation = le64_to_cpu(wcr->wcr_incarnation);
2221         conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
2222
2223         frc = kibnal_qp_rts(conn->ibc_qp, rep->QPN, 
2224                             min_t(__u8, rep->ArbInitiatorDepth,
2225                                   ca_attr->MaxQPResponderResources),
2226                             &conn->ibc_connreq->cr_path, 
2227                             min_t(__u8, rep->ArbResponderResources,
2228                                   ca_attr->MaxQPInitiatorDepth),
2229                             rep->StartingPSN);
2230         if (frc != FSUCCESS) {
2231                 CERROR("Connection %p -> "LPX64" QP RTS/RTR failed: %d\n",
2232                        conn, conn->ibc_peer->ibp_nid, frc);
2233                 GOTO(reject, reason = RC_NO_QP);
2234         }
2235
2236         /* the callback arguments are ignored for an active accept */
2237         conn->ibc_connreq->cr_discarded.Status = FSUCCESS;
2238         frc = iibt_cm_accept(cep, &conn->ibc_connreq->cr_discarded, 
2239                              NULL, NULL, NULL, NULL);
2240         if (frc != FCM_CONNECT_ESTABLISHED) {
2241                 CERROR("Connection %p -> "LPX64" CMAccept failed: %d\n",
2242                        conn, conn->ibc_peer->ibp_nid, frc);
2243                 kibnal_connreq_done (conn, 1, -ECONNABORTED);
2244                 /* XXX don't call reject after accept fails? */
2245                 return;
2246         }
2247
2248         CDEBUG(D_NET, "Connection %p -> "LPX64" Established\n",
2249                conn, conn->ibc_peer->ibp_nid);
2250
2251         kibnal_connreq_done (conn, 1, 0);
2252         return;
2253
2254 reject:
2255         kibnal_reject(cep, reason);
2256         kibnal_connreq_done (conn, 1, -EPROTO);
2257 }
2258
2259 /* ib_cm.h has a wealth of information on the CM procedures */
2260 static void
2261 kibnal_cm_callback(IB_HANDLE cep, CM_CONN_INFO *info, void *arg)
2262 {
2263         kib_conn_t       *conn = arg;
2264
2265         CDEBUG(D_NET, "status 0x%x\n", info->Status);
2266
2267         /* Established Connection Notifier */
2268         switch (info->Status) {
2269         default:
2270                 CERROR("unknown status %d on Connection %p -> "LPX64"\n",
2271                        info->Status, conn, conn->ibc_peer->ibp_nid);
2272                 LBUG();
2273                 break;
2274
2275         case FCM_CONNECT_REPLY:
2276                 kibnal_connect_reply(cep, info, arg);
2277                 break;
2278
2279         case FCM_DISCONNECT_REQUEST:
2280                 /* XXX lock around these state management bits? */
2281                 if (conn->ibc_state == IBNAL_CONN_ESTABLISHED)
2282                         kibnal_close_conn (conn, 0);
2283                 conn->ibc_state = IBNAL_CONN_DREP;
2284                 iibt_cm_disconnect(conn->ibc_cep, NULL, NULL);
2285                 break;
2286
2287         /* these both guarantee that no more cm callbacks will occur */
2288         case FCM_DISCONNECTED: /* aka FCM_DISCONNECT_TIMEOUT */
2289         case FCM_DISCONNECT_REPLY:
2290                 CDEBUG(D_NET, "Connection %p -> "LPX64" disconnect done.\n",
2291                        conn, conn->ibc_peer->ibp_nid);
2292
2293                 conn->ibc_state = IBNAL_CONN_DISCONNECTED;
2294                 kibnal_flush_pending(conn);
2295                 kibnal_put_conn(conn);        /* Lose CM's ref */
2296                 break;
2297         }
2298
2299         return;
2300 }
2301
2302 static int
2303 kibnal_set_cm_flags(IB_HANDLE cep)
2304 {
2305         FSTATUS frc;
2306         uint32 value = 1;
2307
2308         frc = iibt_cm_modify_cep(cep, CM_FLAG_TIMEWAIT_CALLBACK,
2309                                  (char *)&value, sizeof(value), 0);
2310         if (frc != FSUCCESS) {
2311                 CERROR("error setting timeout callback: %d\n", frc);
2312                 return -1;
2313         }
2314
2315 #if 0
2316         frc = iibt_cm_modify_cep(cep, CM_FLAG_ASYNC_ACCEPT, (char *)&value,
2317                                  sizeof(value), 0);
2318         if (frc != FSUCCESS) {
2319                 CERROR("error setting async accept: %d\n", frc);
2320                 return -1;
2321         }
2322 #endif
2323
2324         return 0;
2325 }
2326
2327 void
2328 kibnal_listen_callback(IB_HANDLE cep, CM_CONN_INFO *info, void *arg)
2329 {
2330         IB_CA_ATTRIBUTES *ca_attr = &kibnal_data.kib_hca_attrs;
2331         IB_QP_ATTRIBUTES_QUERY *query;
2332         CM_REQUEST_INFO    *req;
2333         CM_CONN_INFO       *rep = NULL, *rcv = NULL;
2334         kib_wire_connreq_t *wcr;
2335         kib_conn_t         *conn = NULL;
2336         uint16_t            reason = 0;
2337         FSTATUS             frc;
2338         int                 rc = 0;
2339         
2340         LASSERT(cep);
2341         LASSERT(info);
2342         LASSERT(arg == NULL); /* no conn yet for passive */
2343
2344         CDEBUG(D_NET, "status 0x%x\n", info->Status);
2345
2346         req = &info->Info.Request;
2347         wcr = (kib_wire_connreq_t *)req->PrivateData;
2348
2349         CDEBUG(D_NET, "%d from "LPX64"\n", info->Status, 
2350                le64_to_cpu(wcr->wcr_nid));
2351         
2352         if (info->Status == FCM_CONNECT_CANCEL)
2353                 return;
2354         
2355         LASSERT (info->Status == FCM_CONNECT_REQUEST);
2356         
2357         if (wcr->wcr_magic != cpu_to_le32(IBNAL_MSG_MAGIC)) {
2358                 CERROR ("Can't accept: bad magic %08x\n",
2359                         le32_to_cpu(wcr->wcr_magic));
2360                 GOTO(out, reason = RC_USER_REJ);
2361         }
2362
2363         if (wcr->wcr_version != cpu_to_le16(IBNAL_MSG_VERSION)) {
2364                 CERROR ("Can't accept: bad version %d\n",
2365                         le16_to_cpu(wcr->wcr_version));
2366                 GOTO(out, reason = RC_USER_REJ);
2367         }
2368
2369         rc = kibnal_accept(&conn, cep,
2370                            le64_to_cpu(wcr->wcr_nid),
2371                            le64_to_cpu(wcr->wcr_incarnation),
2372                            le16_to_cpu(wcr->wcr_queue_depth));
2373         if (rc != 0) {
2374                 CERROR ("Can't accept "LPX64": %d\n",
2375                         le64_to_cpu(wcr->wcr_nid), rc);
2376                 GOTO(out, reason = RC_NO_RESOURCES);
2377         }
2378
2379         frc = kibnal_qp_rts(conn->ibc_qp, req->CEPInfo.QPN,
2380                             min_t(__u8, req->CEPInfo.OfferedInitiatorDepth, 
2381                                   ca_attr->MaxQPResponderResources),
2382                             &req->PathInfo.Path,
2383                             min_t(__u8, req->CEPInfo.OfferedResponderResources, 
2384                                   ca_attr->MaxQPInitiatorDepth),
2385                             req->CEPInfo.StartingPSN);
2386
2387         if (frc != FSUCCESS) {
2388                 CERROR ("Can't mark QP RTS/RTR "LPX64": %d\n",
2389                         le64_to_cpu(wcr->wcr_nid), frc);
2390                 GOTO(out, reason = RC_NO_QP);
2391         }
2392
2393         frc = iibt_qp_query(conn->ibc_qp, &conn->ibc_qp_attrs, NULL);
2394         if (frc != FSUCCESS) {
2395                 CERROR ("Couldn't query qp attributes "LPX64": %d\n",
2396                         le64_to_cpu(wcr->wcr_nid), frc);
2397                 GOTO(out, reason = RC_NO_QP);
2398         }
2399         query = &conn->ibc_qp_attrs;
2400
2401         PORTAL_ALLOC(rep, sizeof(*rep));
2402         PORTAL_ALLOC(rcv, sizeof(*rcv));
2403         if (rep == NULL || rcv == NULL) {
2404                 if (rep) PORTAL_FREE(rep, sizeof(*rep));
2405                 if (rcv) PORTAL_FREE(rcv, sizeof(*rcv));
2406                 CERROR ("can't allocate reply and receive buffers\n");
2407                 GOTO(out, reason = RC_INSUFFICIENT_RESP_RES);
2408         }
2409
2410         /* don't try to deref this into the incoming wcr :) */
2411         wcr = (kib_wire_connreq_t *)rep->Info.Reply.PrivateData;
2412
2413         rep->Info.Reply = (CM_REPLY_INFO) {
2414                 .QPN = query->QPNumber,
2415                 .QKey = query->Qkey,
2416                 .StartingPSN = query->RecvPSN,
2417                 .EndToEndFlowControl = query->FlowControl,
2418                 /* XXX Hmm. */
2419                 .ArbInitiatorDepth = query->InitiatorDepth,
2420                 .ArbResponderResources = query->ResponderResources,
2421                 .TargetAckDelay = 0,
2422                 .FailoverAccepted = 0,
2423                 .RnRRetryCount = req->CEPInfo.RnrRetryCount,
2424         };
2425                 
2426         *wcr = (kib_wire_connreq_t) {
2427                 .wcr_magic       = cpu_to_le32(IBNAL_MSG_MAGIC),
2428                 .wcr_version     = cpu_to_le16(IBNAL_MSG_VERSION),
2429                 .wcr_queue_depth = cpu_to_le16(IBNAL_MSG_QUEUE_SIZE),
2430                 .wcr_nid         = cpu_to_le64(kibnal_data.kib_nid),
2431                 .wcr_incarnation = cpu_to_le64(kibnal_data.kib_incarnation),
2432         };
2433
2434         frc = iibt_cm_accept(cep, rep, rcv, kibnal_cm_callback, conn, 
2435                              &conn->ibc_cep);
2436
2437         PORTAL_FREE(rep, sizeof(*rep));
2438         PORTAL_FREE(rcv, sizeof(*rcv));
2439
2440         if (frc != FCM_CONNECT_ESTABLISHED) {
2441                 /* XXX it seems we don't call reject after this point? */
2442                 CERROR("iibt_cm_accept() failed: %d, aborting\n", frc);
2443                 rc = -ECONNABORTED;
2444                 goto out;
2445         }
2446
2447         if (kibnal_set_cm_flags(conn->ibc_cep)) {
2448                 rc = -ECONNABORTED;
2449                 goto out;
2450         }
2451
2452         CWARN("Connection %p -> "LPX64" ESTABLISHED.\n",
2453                conn, conn->ibc_peer->ibp_nid);
2454
2455 out:
2456         if (reason) {
2457                 kibnal_reject(cep, reason);
2458                 rc = -ECONNABORTED;
2459         }
2460         if (conn != NULL) 
2461                 kibnal_connreq_done(conn, 0, rc);
2462
2463         return;
2464 }
2465
2466 static void
2467 dump_path_records(PATH_RESULTS *results)
2468 {
2469         IB_PATH_RECORD *path;
2470         int i;
2471
2472         for(i = 0; i < results->NumPathRecords; i++) {
2473                 path = &results->PathRecords[i];
2474                 CDEBUG(D_NET, "%d: sgid "LPX64":"LPX64" dgid "
2475                        LPX64":"LPX64" pkey %x\n",
2476                        i,
2477                        path->SGID.Type.Global.SubnetPrefix,
2478                        path->SGID.Type.Global.InterfaceID,
2479                        path->DGID.Type.Global.SubnetPrefix,
2480                        path->DGID.Type.Global.InterfaceID,
2481                        path->P_Key);
2482         }
2483 }
2484
2485 static void
2486 kibnal_pathreq_callback (void *arg, QUERY *query, 
2487                          QUERY_RESULT_VALUES *query_res)
2488 {
2489         IB_CA_ATTRIBUTES *ca_attr = &kibnal_data.kib_hca_attrs;
2490         kib_conn_t *conn = arg;
2491         PATH_RESULTS *path;
2492         FSTATUS frc;
2493         
2494         if (query_res->Status != FSUCCESS || query_res->ResultDataSize == 0) {
2495                 CERROR ("status %d data size %d\n", query_res->Status,
2496                         query_res->ResultDataSize);
2497                 kibnal_connreq_done (conn, 1, -EINVAL);
2498                 return;
2499         }
2500
2501         path = (PATH_RESULTS *)query_res->QueryResult;
2502
2503         if (path->NumPathRecords < 1) {
2504                 CERROR ("expected path records: %d\n", path->NumPathRecords);
2505                 kibnal_connreq_done (conn, 1, -EINVAL);
2506                 return;
2507         }
2508
2509         dump_path_records(path);
2510
2511         /* just using the first.  this is probably a horrible idea. */
2512         conn->ibc_connreq->cr_path = path->PathRecords[0];
2513
2514         conn->ibc_cep = iibt_cm_create_cep(CM_RC_TYPE);
2515         if (conn->ibc_cep == NULL) {
2516                 CERROR ("Can't create CEP\n");
2517                 kibnal_connreq_done (conn, 1, -EINVAL);
2518                 return;
2519         }
2520
2521         if (kibnal_set_cm_flags(conn->ibc_cep)) {
2522                 kibnal_connreq_done (conn, 1, -EINVAL);
2523                 return;
2524         }
2525
2526         conn->ibc_connreq->cr_wcr = (kib_wire_connreq_t) {
2527                 .wcr_magic       = cpu_to_le32(IBNAL_MSG_MAGIC),
2528                 .wcr_version     = cpu_to_le16(IBNAL_MSG_VERSION),
2529                 .wcr_queue_depth = cpu_to_le16(IBNAL_MSG_QUEUE_SIZE),
2530                 .wcr_nid         = cpu_to_le64(kibnal_data.kib_nid),
2531                 .wcr_incarnation = cpu_to_le64(kibnal_data.kib_incarnation),
2532         };
2533
2534         conn->ibc_connreq->cr_cmreq = (CM_REQUEST_INFO) {
2535                 .SID = conn->ibc_connreq->cr_service.RID.ServiceID,
2536                 .CEPInfo = (CM_CEP_INFO) { 
2537                         .CaGUID = kibnal_data.kib_hca_guids[0],
2538                         .EndToEndFlowControl = FALSE,
2539                         .PortGUID = conn->ibc_connreq->cr_path.SGID.Type.Global.InterfaceID,
2540                         .RetryCount = IBNAL_RETRY,
2541                         .RnrRetryCount = IBNAL_RNR_RETRY,
2542                         .AckTimeout = IBNAL_ACK_TIMEOUT,
2543                         .StartingPSN = IBNAL_STARTING_PSN,
2544                         .QPN = conn->ibc_qp_attrs.QPNumber,
2545                         .QKey = conn->ibc_qp_attrs.Qkey,
2546                         .OfferedResponderResources = ca_attr->MaxQPResponderResources,
2547                         .OfferedInitiatorDepth = ca_attr->MaxQPInitiatorDepth,
2548                 },
2549                 .PathInfo = (CM_CEP_PATHINFO) {
2550                         .bSubnetLocal = TRUE,
2551                         .Path = conn->ibc_connreq->cr_path,
2552                 },
2553         };
2554
2555 #if 0
2556         /* XXX set timeout just like SDP!!!*/
2557         conn->ibc_connreq->cr_path.packet_life = 13;
2558 #endif
2559         /* Flag I'm getting involved with the CM... */
2560         conn->ibc_state = IBNAL_CONN_CONNECTING;
2561
2562         CDEBUG(D_NET, "Connecting to service id "LPX64", on "LPX64"\n",
2563                conn->ibc_connreq->cr_service.RID.ServiceID, 
2564                *kibnal_service_nid_field(&conn->ibc_connreq->cr_service));
2565
2566         memset(conn->ibc_connreq->cr_cmreq.PrivateData, 0, 
2567                CM_REQUEST_INFO_USER_LEN);
2568         memcpy(conn->ibc_connreq->cr_cmreq.PrivateData, 
2569                &conn->ibc_connreq->cr_wcr, sizeof(conn->ibc_connreq->cr_wcr));
2570
2571         /* kibnal_cm_callback gets my conn ref */
2572         frc = iibt_cm_connect(conn->ibc_cep, &conn->ibc_connreq->cr_cmreq,
2573                               kibnal_cm_callback, conn);
2574         if (frc != FPENDING && frc != FSUCCESS) {
2575                 CERROR ("Connect: %d\n", frc);
2576                 /* Back out state change as connect failed */
2577                 conn->ibc_state = IBNAL_CONN_INIT_QP;
2578                 kibnal_connreq_done (conn, 1, -EINVAL);
2579         }
2580 }
2581
2582 static void
2583 dump_service_records(SERVICE_RECORD_RESULTS *results)
2584 {
2585         IB_SERVICE_RECORD *svc;
2586         int i;
2587
2588         for(i = 0; i < results->NumServiceRecords; i++) {
2589                 svc = &results->ServiceRecords[i];
2590                 CDEBUG(D_NET, "%d: sid "LPX64" gid "LPX64":"LPX64" pkey %x\n",
2591                        i,
2592                        svc->RID.ServiceID,
2593                        svc->RID.ServiceGID.Type.Global.SubnetPrefix,
2594                        svc->RID.ServiceGID.Type.Global.InterfaceID,
2595                        svc->RID.ServiceP_Key);
2596         }
2597 }
2598
2599
2600 static void
2601 kibnal_service_get_callback (void *arg, QUERY *query, 
2602                              QUERY_RESULT_VALUES *query_res)
2603 {
2604         kib_conn_t *conn = arg;
2605         SERVICE_RECORD_RESULTS *svc;
2606         COMMAND_CONTROL_PARAMETERS sd_params;
2607         QUERY   path_query;
2608         FSTATUS frc;
2609         
2610         if (query_res->Status != FSUCCESS || query_res->ResultDataSize == 0) {
2611                 CERROR ("status %d data size %d\n", query_res->Status,
2612                         query_res->ResultDataSize);
2613                 kibnal_connreq_done (conn, 1, -EINVAL);
2614                 return;
2615         }
2616
2617         svc = (SERVICE_RECORD_RESULTS *)query_res->QueryResult;
2618
2619         if (svc->NumServiceRecords < 1) {
2620                 CERROR ("%d service records\n", svc->NumServiceRecords);
2621                 kibnal_connreq_done (conn, 1, -EINVAL);
2622                 return;
2623         }
2624
2625         dump_service_records(svc);
2626
2627         conn->ibc_connreq->cr_service = svc->ServiceRecords[0];
2628
2629         CDEBUG(D_NET, "Got status %d, service id "LPX64", on "LPX64"\n",
2630                query_res->Status, conn->ibc_connreq->cr_service.RID.ServiceID,
2631                *kibnal_service_nid_field(&conn->ibc_connreq->cr_service));
2632
2633         memset(&path_query, 0, sizeof(path_query));
2634         path_query.InputType = InputTypePortGuidPair;
2635         path_query.OutputType = OutputTypePathRecord;
2636         path_query.InputValue.PortGuidPair.SourcePortGuid = kibnal_data.kib_port_guid;
2637         path_query.InputValue.PortGuidPair.DestPortGuid  = conn->ibc_connreq->cr_service.RID.ServiceGID.Type.Global.InterfaceID;
2638
2639         memset(&sd_params, 0, sizeof(sd_params));
2640         sd_params.RetryCount = IBNAL_RETRY;
2641         sd_params.Timeout = 10 * 1000;   /* wait 10 seconds */
2642
2643         /* kibnal_service_get_callback gets my conn ref */
2644
2645         frc = iibt_sd_query_port_fabric_information(kibnal_data.kib_sd,
2646                                                     kibnal_data.kib_port_guid,
2647                                                     &path_query, 
2648                                                     kibnal_pathreq_callback,
2649                                                     &sd_params, conn);
2650         if (frc == FPENDING)
2651                 return;
2652
2653         CERROR ("Path record request failed: %d\n", frc);
2654         kibnal_connreq_done (conn, 1, -EINVAL);
2655 }
2656
2657 static void
2658 kibnal_connect_peer (kib_peer_t *peer)
2659 {
2660         COMMAND_CONTROL_PARAMETERS sd_params;
2661         QUERY   query;
2662         FSTATUS frc;
2663         kib_conn_t  *conn = kibnal_create_conn();
2664
2665         LASSERT (peer->ibp_connecting != 0);
2666
2667         if (conn == NULL) {
2668                 CERROR ("Can't allocate conn\n");
2669                 kibnal_peer_connect_failed (peer, 1, -ENOMEM);
2670                 return;
2671         }
2672
2673         conn->ibc_peer = peer;
2674         kib_peer_addref(peer);
2675
2676         PORTAL_ALLOC (conn->ibc_connreq, sizeof (*conn->ibc_connreq));
2677         if (conn->ibc_connreq == NULL) {
2678                 CERROR ("Can't allocate connreq\n");
2679                 kibnal_connreq_done (conn, 1, -ENOMEM);
2680                 return;
2681         }
2682
2683         memset(conn->ibc_connreq, 0, sizeof (*conn->ibc_connreq));
2684
2685         kibnal_set_service_keys(&conn->ibc_connreq->cr_service, peer->ibp_nid);
2686
2687         memset(&query, 0, sizeof(query));
2688         query.InputType = InputTypeServiceRecord;
2689         query.OutputType = OutputTypeServiceRecord;
2690         query.InputValue.ServiceRecordValue.ServiceRecord = conn->ibc_connreq->cr_service;
2691         query.InputValue.ServiceRecordValue.ComponentMask = KIBNAL_SERVICE_KEY_MASK;
2692
2693         memset(&sd_params, 0, sizeof(sd_params));
2694         sd_params.RetryCount = IBNAL_RETRY;
2695         sd_params.Timeout = 10 * 1000;   /* wait 10 seconds */
2696
2697         /* kibnal_service_get_callback gets my conn ref */
2698         frc = iibt_sd_query_port_fabric_information(kibnal_data.kib_sd,
2699                                                     kibnal_data.kib_port_guid,
2700                                                     &query, 
2701                                                     kibnal_service_get_callback,
2702                                                     &sd_params, conn);
2703         if (frc == FPENDING)
2704                 return;
2705
2706         CERROR ("iibt_sd_query_port_fabric_information(): %d\n", frc);
2707         kibnal_connreq_done (conn, 1, frc);
2708 }
2709
2710 static int
2711 kibnal_conn_timed_out (kib_conn_t *conn)
2712 {
2713         kib_tx_t          *tx;
2714         struct list_head  *ttmp;
2715         unsigned long      flags;
2716
2717         spin_lock_irqsave (&conn->ibc_lock, flags);
2718
2719         list_for_each (ttmp, &conn->ibc_tx_queue) {
2720                 tx = list_entry (ttmp, kib_tx_t, tx_list);
2721
2722                 LASSERT (!tx->tx_passive_rdma_wait);
2723                 LASSERT (tx->tx_sending == 0);
2724
2725                 if (time_after_eq (jiffies, tx->tx_deadline)) {
2726                         spin_unlock_irqrestore (&conn->ibc_lock, flags);
2727                         return 1;
2728                 }
2729         }
2730
2731         list_for_each (ttmp, &conn->ibc_active_txs) {
2732                 tx = list_entry (ttmp, kib_tx_t, tx_list);
2733
2734                 LASSERT (tx->tx_passive_rdma ||
2735                          !tx->tx_passive_rdma_wait);
2736
2737                 LASSERT (tx->tx_passive_rdma_wait ||
2738                          tx->tx_sending != 0);
2739
2740                 if (time_after_eq (jiffies, tx->tx_deadline)) {
2741                         spin_unlock_irqrestore (&conn->ibc_lock, flags);
2742                         return 1;
2743                 }
2744         }
2745
2746         spin_unlock_irqrestore (&conn->ibc_lock, flags);
2747
2748         return 0;
2749 }
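/* Editor's note: tx_deadline is an absolute jiffies value stamped when
 * the tx is queued, presumably something like
 *
 *      tx->tx_deadline = jiffies + IBNAL_IO_TIMEOUT * HZ;
 *
 * elsewhere in the driver (exact expression assumed, not shown here), so
 * time_after_eq(jiffies, tx->tx_deadline) is safe across jiffies
 * wraparound, and a single stale tx on either queue marks the whole
 * connection as timed out. */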

static void
kibnal_check_conns (int idx)
{
        struct list_head  *peers = &kibnal_data.kib_peers[idx];
        struct list_head  *ptmp;
        kib_peer_t        *peer;
        kib_conn_t        *conn;
        struct list_head  *ctmp;
        unsigned long      flags;

 again:
        /* NB. We expect to scan all the peers and find no RDMAs timing
         * out, so we just take a shared lock while we look... */
        read_lock_irqsave(&kibnal_data.kib_global_lock, flags);

        list_for_each (ptmp, peers) {
                peer = list_entry (ptmp, kib_peer_t, ibp_list);

                list_for_each (ctmp, &peer->ibp_conns) {
                        conn = list_entry (ctmp, kib_conn_t, ibc_list);

                        KIB_ASSERT_CONN_STATE(conn, IBNAL_CONN_ESTABLISHED);

                        /* In case we have enough credits to return via a
                         * NOOP, but there were no non-blocking tx descs
                         * free to do it last time... */
                        kibnal_check_sends(conn);

                        if (!kibnal_conn_timed_out(conn))
                                continue;

                        CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
                               conn, conn->ibc_state, peer->ibp_nid,
                               atomic_read (&conn->ibc_refcount));

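                        /* Take a ref so the conn can't vanish once the
                         * global lock is dropped; released by
                         * kibnal_put_conn() below. */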
                        atomic_inc (&conn->ibc_refcount);
                        read_unlock_irqrestore(&kibnal_data.kib_global_lock,
                                               flags);

                        CERROR("Timed out RDMA with "LPX64"\n",
                               peer->ibp_nid);

                        kibnal_close_conn (conn, -ETIMEDOUT);
                        kibnal_put_conn (conn);

                        /* start again now I've dropped the lock */
                        goto again;
                }
        }

        read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);
}

static void
kib_connd_handle_state(kib_conn_t *conn)
{
        FSTATUS frc;

        switch (conn->ibc_state) {
        /* all refs have gone, free and be done with it */
        case IBNAL_CONN_DISCONNECTED:
                kibnal_destroy_conn (conn);
                return; /* avoid put_conn */

        case IBNAL_CONN_SEND_DREQ:
                frc = iibt_cm_disconnect(conn->ibc_cep, NULL, NULL);
                if (frc != FSUCCESS) /* XXX do real things */
                        CERROR("disconnect failed: %d\n", frc);
                conn->ibc_state = IBNAL_CONN_DREQ;
                break;

        /* a callback got to the conn before we did */
        case IBNAL_CONN_DREP:
                break;

        default:
                CERROR ("Bad conn %p state: %d\n", conn,
                        conn->ibc_state);
                LBUG();
                break;
        }

        /* drop ref from close_conn */
        kibnal_put_conn(conn);
}

int
kibnal_connd (void *arg)
{
        wait_queue_t       wait;
        unsigned long      flags;
        kib_conn_t        *conn;
        kib_peer_t        *peer;
        int                timeout;
        int                i;
        int                peer_index = 0;
        unsigned long      deadline = jiffies;

        kportal_daemonize ("kibnal_connd");
        kportal_blockallsigs ();

        init_waitqueue_entry (&wait, current);

        spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);

        for (;;) {
                if (!list_empty (&kibnal_data.kib_connd_conns)) {
                        conn = list_entry (kibnal_data.kib_connd_conns.next,
                                           kib_conn_t, ibc_list);
                        list_del (&conn->ibc_list);

                        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
                        kib_connd_handle_state(conn);

                        spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
                        continue;
                }

                if (!list_empty (&kibnal_data.kib_connd_peers)) {
                        peer = list_entry (kibnal_data.kib_connd_peers.next,
                                           kib_peer_t, ibp_connd_list);

                        list_del_init (&peer->ibp_connd_list);
                        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);

                        kibnal_connect_peer (peer);
                        kib_peer_decref (peer);

                        spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
                }

                /* shut down and nobody left to reap... */
                if (kibnal_data.kib_shutdown &&
                    atomic_read(&kibnal_data.kib_nconns) == 0)
                        break;

                spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);

                /* careful with the jiffy wrap... */
                while ((timeout = (int)(deadline - jiffies)) <= 0) {
                        const int n = 4;
                        const int p = 1;
                        int       chunk = kibnal_data.kib_peer_hash_size;

                        /* Time to check for RDMA timeouts on a few more
                         * peers: I do checks every 'p' seconds on a
                         * proportion of the peer table and I need to check
                         * every connection 'n' times within a timeout
                         * interval, to ensure I detect a timeout on any
                         * connection within (n+1)/n times the timeout
                         * interval. */

                        if (kibnal_tunables.kib_io_timeout > n * p)
                                chunk = (chunk * n * p) /
                                        kibnal_tunables.kib_io_timeout;
                        if (chunk == 0)
                                chunk = 1;
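                        /* For example (illustrative numbers, not values
                         * taken from this file): with a 101-bucket peer
                         * hash and kib_io_timeout = 40s,
                         * chunk = 101 * 4 * 1 / 40 = 10 buckets per
                         * 1-second pass, so the whole table is scanned
                         * roughly every 10s, i.e. ~4 times per timeout
                         * interval as intended. */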

                        for (i = 0; i < chunk; i++) {
                                kibnal_check_conns (peer_index);
                                peer_index = (peer_index + 1) %
                                             kibnal_data.kib_peer_hash_size;
                        }

                        deadline += p * HZ;
                }

                kibnal_data.kib_connd_waketime = jiffies + timeout;

                set_current_state (TASK_INTERRUPTIBLE);
                add_wait_queue (&kibnal_data.kib_connd_waitq, &wait);

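                /* Sleep only if no work arrived while the lock was
                 * dropped; wakeups come in through kib_connd_waitq. */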
                if (!kibnal_data.kib_shutdown &&
                    list_empty (&kibnal_data.kib_connd_conns) &&
                    list_empty (&kibnal_data.kib_connd_peers))
                        schedule_timeout (timeout);

                set_current_state (TASK_RUNNING);
                remove_wait_queue (&kibnal_data.kib_connd_waitq, &wait);

                spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
        }

        spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);

        kibnal_thread_fini ();
        return (0);
}

int
kibnal_scheduler(void *arg)
{
        long            id = (long)arg;
        char            name[16];
        kib_rx_t       *rx;
        kib_tx_t       *tx;
        unsigned long   flags;
        int             rc;
        int             counter = 0;
        int             did_something;

        snprintf(name, sizeof(name), "kibnal_sd_%02ld", id);
        kportal_daemonize(name);
        kportal_blockallsigs();

        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);

        for (;;) {
                did_something = 0;

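                /* Tx completions are drained in a batch below, while rxs
                 * are handled one per iteration (setting did_something),
                 * so the shutdown and resched checks run between them. */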
                while (!list_empty(&kibnal_data.kib_sched_txq)) {
                        tx = list_entry(kibnal_data.kib_sched_txq.next,
                                        kib_tx_t, tx_list);
                        list_del(&tx->tx_list);
                        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                               flags);
                        kibnal_tx_done(tx);

                        spin_lock_irqsave(&kibnal_data.kib_sched_lock,
                                          flags);
                }

                if (!list_empty(&kibnal_data.kib_sched_rxq)) {
                        rx = list_entry(kibnal_data.kib_sched_rxq.next,
                                        kib_rx_t, rx_list);
                        list_del(&rx->rx_list);
                        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                               flags);

                        kibnal_rx(rx);

                        did_something = 1;
                        spin_lock_irqsave(&kibnal_data.kib_sched_lock,
                                          flags);
                }

                /* shut down and no receives to complete... */
                if (kibnal_data.kib_shutdown &&
                    atomic_read(&kibnal_data.kib_nconns) == 0)
                        break;

                /* Nothing left to do, or we've hogged the CPU for
                 * IBNAL_RESCHED iterations: drop the lock, then either
                 * sleep for more work or just yield. */
                if (!did_something || counter++ == IBNAL_RESCHED) {
                        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                               flags);
                        counter = 0;

                        if (!did_something) {
                                rc = wait_event_interruptible(
                                        kibnal_data.kib_sched_waitq,
                                        !list_empty(&kibnal_data.kib_sched_txq) ||
                                        !list_empty(&kibnal_data.kib_sched_rxq) ||
                                        (kibnal_data.kib_shutdown &&
                                         atomic_read (&kibnal_data.kib_nconns) == 0));
                        } else {
                                our_cond_resched();
                        }

                        spin_lock_irqsave(&kibnal_data.kib_sched_lock,
                                          flags);
                }
        }

        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);

        kibnal_thread_fini();
        return (0);
}

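/* Dispatch table plugging this NAL's entry points into the portals
 * library; note the old-style GNU "label:" designated initialisers. */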
lib_nal_t kibnal_lib = {
        libnal_data:        &kibnal_data,      /* NAL private data */
        libnal_send:         kibnal_send,
        libnal_send_pages:   kibnal_send_pages,
        libnal_recv:         kibnal_recv,
        libnal_recv_pages:   kibnal_recv_pages,
        libnal_dist:         kibnal_dist
};