lustre/portals/knals/iibnal/iibnal_cb.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2004 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#include "iibnal.h"

/*
 *  LIB functions follow
 *
 */
static void
kibnal_schedule_tx_done (kib_tx_t *tx)
{
        unsigned long flags;

        spin_lock_irqsave (&kibnal_data.kib_sched_lock, flags);

        list_add_tail(&tx->tx_list, &kibnal_data.kib_sched_txq);
        wake_up (&kibnal_data.kib_sched_waitq);

        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
}

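/* Tear down a completed tx: unmap its buffer, finalise up to two lib
 * messages, drop its connection ref and return it to the appropriate
 * idle pool.  Memory can't be deregistered in IRQ context, so callers
 * in interrupt context get bounced to the scheduler thread via
 * kibnal_schedule_tx_done() above. */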
static void
kibnal_tx_done (kib_tx_t *tx)
{
        ptl_err_t        ptlrc = (tx->tx_status == 0) ? PTL_OK : PTL_FAIL;
        unsigned long    flags;
        int              i;
#if IBNAL_FMR
        int              rc;
#endif
        FSTATUS          frc;

        LASSERT (tx->tx_sending == 0);          /* mustn't be awaiting callback */
        LASSERT (!tx->tx_passive_rdma_wait);    /* mustn't be awaiting RDMA */

        switch (tx->tx_mapped) {
        default:
                LBUG();

        case KIB_TX_UNMAPPED:
                break;

        case KIB_TX_MAPPED:
                if (in_interrupt()) {
                        /* can't deregister memory in IRQ context... */
                        kibnal_schedule_tx_done(tx);
                        return;
                }
                frc = iibt_deregister_memory(tx->tx_md.md_handle);
                LASSERT (frc == FSUCCESS);
                tx->tx_mapped = KIB_TX_UNMAPPED;
                break;

#if IBNAL_FMR
        case KIB_TX_MAPPED_FMR:
                if (in_interrupt() && tx->tx_status != 0) {
                        /* can't flush FMRs in IRQ context... */
                        kibnal_schedule_tx_done(tx);
                        return;
                }

                rc = ib_fmr_deregister(tx->tx_md.md_handle.fmr);
                LASSERT (rc == 0);

                if (tx->tx_status != 0)
                        ib_fmr_pool_force_flush(kibnal_data.kib_fmr_pool);
                tx->tx_mapped = KIB_TX_UNMAPPED;
                break;
#endif
        }

        for (i = 0; i < 2; i++) {
                /* tx may have up to 2 libmsgs to finalise */
                if (tx->tx_libmsg[i] == NULL)
                        continue;

                lib_finalize (&kibnal_lib, NULL, tx->tx_libmsg[i], ptlrc);
                tx->tx_libmsg[i] = NULL;
        }

        if (tx->tx_conn != NULL) {
                kibnal_put_conn (tx->tx_conn);
                tx->tx_conn = NULL;
        }

        tx->tx_nsp = 0;
        tx->tx_passive_rdma = 0;
        tx->tx_status = 0;

        spin_lock_irqsave (&kibnal_data.kib_tx_lock, flags);

        if (tx->tx_isnblk) {
                list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_nblk_txs);
        } else {
                list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_txs);
                wake_up (&kibnal_data.kib_idle_tx_waitq);
        }

        spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);
}

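/* Grab an idle tx descriptor.  There are two pools: the normal pool,
 * which blocking callers sleep on, and a reserved pool that only
 * non-blocking callers (completions and the scheduler) may dip into
 * when the normal pool is empty.  Illustrative calling convention (the
 * actual call sites pass a literal flag):
 *
 *     tx = kibnal_get_idle_tx(!in_interrupt());
 */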
static kib_tx_t *
kibnal_get_idle_tx (int may_block)
{
        unsigned long  flags;
        kib_tx_t      *tx = NULL;
        ENTRY;

        for (;;) {
                spin_lock_irqsave (&kibnal_data.kib_tx_lock, flags);

                /* "normal" descriptor is free */
                if (!list_empty (&kibnal_data.kib_idle_txs)) {
                        tx = list_entry (kibnal_data.kib_idle_txs.next,
                                         kib_tx_t, tx_list);
                        break;
                }

                if (!may_block) {
                        /* may dip into reserve pool */
                        if (list_empty (&kibnal_data.kib_idle_nblk_txs)) {
                                CERROR ("reserved tx desc pool exhausted\n");
                                break;
                        }

                        tx = list_entry (kibnal_data.kib_idle_nblk_txs.next,
                                         kib_tx_t, tx_list);
                        break;
                }

                /* block for idle tx */
                spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);

                wait_event (kibnal_data.kib_idle_tx_waitq,
                            !list_empty (&kibnal_data.kib_idle_txs) ||
                            kibnal_data.kib_shutdown);
        }

        if (tx != NULL) {
                list_del (&tx->tx_list);

                /* Allocate a new passive RDMA completion cookie.  It might
                 * not be needed, but we've got a lock right now and we're
                 * unlikely to wrap... */
                tx->tx_passive_rdma_cookie = kibnal_data.kib_next_tx_cookie++;

                LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
                LASSERT (tx->tx_nsp == 0);
                LASSERT (tx->tx_sending == 0);
                LASSERT (tx->tx_status == 0);
                LASSERT (tx->tx_conn == NULL);
                LASSERT (!tx->tx_passive_rdma);
                LASSERT (!tx->tx_passive_rdma_wait);
                LASSERT (tx->tx_libmsg[0] == NULL);
                LASSERT (tx->tx_libmsg[1] == NULL);
        }

        spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);

        RETURN(tx);
}

180
181 static int
182 kibnal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
183 {
184         /* I would guess that if kibnal_get_peer (nid) == NULL,
185            and we're not routing, then 'nid' is very distant :) */
186         if ( nal->libnal_ni.ni_pid.nid == nid ) {
187                 *dist = 0;
188         } else {
189                 *dist = 1;
190         }
191
192         return 0;
193 }
194
195 static void
196 kibnal_complete_passive_rdma(kib_conn_t *conn, __u64 cookie, int status)
197 {
198         struct list_head *ttmp;
199         unsigned long     flags;
200         int               idle;
201
202         spin_lock_irqsave (&conn->ibc_lock, flags);
203
204         list_for_each (ttmp, &conn->ibc_active_txs) {
205                 kib_tx_t *tx = list_entry(ttmp, kib_tx_t, tx_list);
206
207                 LASSERT (tx->tx_passive_rdma ||
208                          !tx->tx_passive_rdma_wait);
209
210                 LASSERT (tx->tx_passive_rdma_wait ||
211                          tx->tx_sending != 0);
212
213                 if (!tx->tx_passive_rdma_wait ||
214                     tx->tx_passive_rdma_cookie != cookie)
215                         continue;
216
217                 CDEBUG(D_NET, "Complete %p "LPD64": %d\n", tx, cookie, status);
218
219                 tx->tx_status = status;
220                 tx->tx_passive_rdma_wait = 0;
221                 idle = (tx->tx_sending == 0);
222
223                 if (idle)
224                         list_del (&tx->tx_list);
225
226                 spin_unlock_irqrestore (&conn->ibc_lock, flags);
227
228                 /* I could be racing with tx callbacks.  It's whoever
229                  * _makes_ tx idle that frees it */
230                 if (idle)
231                         kibnal_tx_done (tx);
232                 return;
233         }
234                 
235         spin_unlock_irqrestore (&conn->ibc_lock, flags);
236
237         CERROR ("Unmatched (late?) RDMA completion "LPX64" from "LPX64"\n",
238                 cookie, conn->ibc_peer->ibp_nid);
239 }
240
241 static __u32
242 kibnal_lkey(kib_pages_t *ibp)
243 {
244         if (kibnal_whole_mem())
245                 return kibnal_data.kib_md.md_lkey;
246
247         return ibp->ibp_lkey;
248 }
249
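/* Repost a receive buffer.  'do_credits' is set when the buffer is
 * returned after consuming a message; the credit owed back to the peer
 * is recorded in ibc_outstanding_credits and kibnal_check_sends() gets a
 * chance to piggyback it on an outgoing message (or a NOOP). */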
static void
kibnal_post_rx (kib_rx_t *rx, int do_credits)
{
        kib_conn_t   *conn = rx->rx_conn;
        int           rc = 0;
        unsigned long flags;
        FSTATUS       frc;
        ENTRY;

        rx->rx_gl = (IB_LOCAL_DATASEGMENT) {
                .Address = rx->rx_vaddr,
                .Length  = IBNAL_MSG_SIZE,
                .Lkey    = kibnal_lkey(conn->ibc_rx_pages),
        };

        rx->rx_wrq = (IB_WORK_REQ) {
                .Operation              = WROpRecv,
                .DSListDepth            = 1,
                .MessageLen             = IBNAL_MSG_SIZE,
                .WorkReqId              = kibnal_ptr2wreqid(rx, 1),
                .DSList                 = &rx->rx_gl,
        };

        KIB_ASSERT_CONN_STATE_RANGE(conn, IBNAL_CONN_ESTABLISHED,
                                    IBNAL_CONN_DREP);
        LASSERT (!rx->rx_posted);
        rx->rx_posted = 1;
        mb();

        if (conn->ibc_state != IBNAL_CONN_ESTABLISHED)
                rc = -ECONNABORTED;
        else {
                frc = iibt_postrecv(conn->ibc_qp, &rx->rx_wrq);
                if (frc != FSUCCESS) {
                        CDEBUG(D_NET, "post failed %d\n", frc);
                        rc = -EINVAL;
                } else {
                        CDEBUG(D_NET, "posted rx %p\n", &rx->rx_wrq);
                }
        }

        if (rc == 0) {
                if (do_credits) {
                        spin_lock_irqsave(&conn->ibc_lock, flags);
                        conn->ibc_outstanding_credits++;
                        spin_unlock_irqrestore(&conn->ibc_lock, flags);

                        kibnal_check_sends(conn);
                }
                EXIT;
                return;
        }

        if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
                CERROR ("Error posting receive -> "LPX64": %d\n",
                        conn->ibc_peer->ibp_nid, rc);
                kibnal_close_conn (rx->rx_conn, rc);
        } else {
                CDEBUG (D_NET, "Error posting receive -> "LPX64": %d\n",
                        conn->ibc_peer->ibp_nid, rc);
        }

        /* Drop rx's ref */
        kibnal_put_conn (conn);
        EXIT;
}

#if IBNAL_CKSUM
static inline __u32 kibnal_cksum (void *ptr, int nob)
{
        char  *c  = ptr;
        __u32  sum = 0;

        while (nob-- > 0)
                sum = ((sum << 1) | (sum >> 31)) + *c++;

        return (sum);
}
#endif
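
/* NB the checksum (when enabled) covers the whole message with ibm_cksum
 * itself zeroed: the sender (see kibnal_check_sends()) zeroes the field
 * before summing, and the receiver in kibnal_rx_callback() does the same
 * before comparing. */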

/* Debugging aid, disabled by default: remove the early return to dump
 * message buffers as they pass through. */
static void hexdump(char *string, void *ptr, int len)
{
        unsigned char *c = ptr;
        int i;

        return;

        if (len < 0 || len > 2048) {
                printk("hexdump: bad length %d\n", len);
                return;
        }

        printk("%d bytes of '%s' from 0x%p\n", len, string, ptr);

        for (i = 0; i < len;) {
                printk("%02x",*(c++));
                i++;
                if (!(i & 15)) {
                        printk("\n");
                } else if (!(i&1)) {
                        printk(" ");
                }
        }

        if(len & 15) {
                printk("\n");
        }
}

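/* Completion demultiplexing: work request ids carry a kib_rx_t/kib_tx_t
 * pointer tagged with a discriminating bit (see the kibnal_ptr2wreqid()
 * and kibnal_wreqid2ptr() calls here and in kibnal_ca_callback()).  A
 * minimal sketch of the presumed encoding in iibnal.h, assuming the
 * descriptors are at least 2-byte aligned so the low bit is free:
 *
 *     wreqid = (__u64)(unsigned long)ptr | (isrx ? 1 : 0);
 */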
static void
kibnal_rx_callback (IB_WORK_COMPLETION *wc)
{
        kib_rx_t     *rx = (kib_rx_t *)kibnal_wreqid2ptr(wc->WorkReqId);
        kib_msg_t    *msg = rx->rx_msg;
        kib_conn_t   *conn = rx->rx_conn;
        int           nob = wc->Length;
        const int     base_nob = offsetof(kib_msg_t, ibm_u);
        int           credits;
        int           flipped;
        unsigned long flags;
        __u32         i;
#if IBNAL_CKSUM
        __u32         msg_cksum;
        __u32         computed_cksum;
#endif

        /* we set the QP to erroring after we've finished disconnecting,
         * maybe we should do so sooner. */
        KIB_ASSERT_CONN_STATE_RANGE(conn, IBNAL_CONN_ESTABLISHED,
                                    IBNAL_CONN_DISCONNECTED);

        CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
        LASSERT (rx->rx_posted);
        rx->rx_posted = 0;
        mb();

        /* receives complete with error in any case after we've started
         * disconnecting */
        if (conn->ibc_state > IBNAL_CONN_ESTABLISHED)
                goto failed;

        if (wc->Status != WRStatusSuccess) {
                CERROR("Rx from "LPX64" failed: %d\n",
                       conn->ibc_peer->ibp_nid, wc->Status);
                goto failed;
        }

        if (nob < base_nob) {
                CERROR ("Short rx from "LPX64": %d < expected %d\n",
                        conn->ibc_peer->ibp_nid, nob, base_nob);
                goto failed;
        }

        hexdump("rx", rx->rx_msg, sizeof(kib_msg_t));

        /* Receiver does any byte flipping if necessary... */

        if (msg->ibm_magic == IBNAL_MSG_MAGIC) {
                flipped = 0;
        } else {
                if (msg->ibm_magic != __swab32(IBNAL_MSG_MAGIC)) {
                        CERROR ("Unrecognised magic: %08x from "LPX64"\n",
                                msg->ibm_magic, conn->ibc_peer->ibp_nid);
                        goto failed;
                }
                flipped = 1;
                __swab16s (&msg->ibm_version);
                LASSERT (sizeof(msg->ibm_type) == 1);
                LASSERT (sizeof(msg->ibm_credits) == 1);
        }

        if (msg->ibm_version != IBNAL_MSG_VERSION) {
                CERROR ("Incompatible msg version %d (%d expected)\n",
                        msg->ibm_version, IBNAL_MSG_VERSION);
                goto failed;
        }

#if IBNAL_CKSUM
        if (nob != msg->ibm_nob) {
                CERROR ("Unexpected # bytes %d (%d expected)\n", nob, msg->ibm_nob);
                goto failed;
        }

        msg_cksum = le32_to_cpu(msg->ibm_cksum);
        msg->ibm_cksum = 0;
        computed_cksum = kibnal_cksum (msg, nob);

        if (msg_cksum != computed_cksum) {
                CERROR ("Checksum failure %d: (%d expected)\n",
                        computed_cksum, msg_cksum);
//                goto failed;
        }
        CDEBUG(D_NET, "cksum %x, nob %d\n", computed_cksum, nob);
#endif

        /* Have I received credits that will let me send? */
        credits = msg->ibm_credits;
        if (credits != 0) {
                spin_lock_irqsave(&conn->ibc_lock, flags);
                conn->ibc_credits += credits;
                spin_unlock_irqrestore(&conn->ibc_lock, flags);

                kibnal_check_sends(conn);
        }

        switch (msg->ibm_type) {
        case IBNAL_MSG_NOOP:
                kibnal_post_rx (rx, 1);
                return;

        case IBNAL_MSG_IMMEDIATE:
                if (nob < base_nob + sizeof (kib_immediate_msg_t)) {
                        CERROR ("Short IMMEDIATE from "LPX64": %d\n",
                                conn->ibc_peer->ibp_nid, nob);
                        goto failed;
                }
                break;

        case IBNAL_MSG_PUT_RDMA:
        case IBNAL_MSG_GET_RDMA:
                if (nob < base_nob + sizeof (kib_rdma_msg_t)) {
                        CERROR ("Short RDMA msg from "LPX64": %d\n",
                                conn->ibc_peer->ibp_nid, nob);
                        goto failed;
                }
                /* NB byte-swap in place; plain __swab32() would discard
                 * its result */
                if (flipped)
                        __swab32s(&msg->ibm_u.rdma.ibrm_num_descs);

                CDEBUG(D_NET, "%d RDMA: cookie "LPX64":\n",
                       msg->ibm_type, msg->ibm_u.rdma.ibrm_cookie);

                if ((msg->ibm_u.rdma.ibrm_num_descs > PTL_MD_MAX_IOV) ||
                    (kib_rdma_msg_len(msg->ibm_u.rdma.ibrm_num_descs) >
                     min(nob, IBNAL_MSG_SIZE))) {
                        CERROR ("num_descs %d too large\n",
                                msg->ibm_u.rdma.ibrm_num_descs);
                        goto failed;
                }

                for(i = 0; i < msg->ibm_u.rdma.ibrm_num_descs; i++) {
                        kib_rdma_desc_t *desc = &msg->ibm_u.rdma.ibrm_desc[i];

                        if (flipped) {
                                __swab32s(&desc->rd_key);
                                __swab32s(&desc->rd_nob);
                                __swab64s(&desc->rd_addr);
                        }

                        CDEBUG(D_NET, "  key %x, " "addr "LPX64", nob %u\n",
                               desc->rd_key, desc->rd_addr, desc->rd_nob);
                }
                break;

        case IBNAL_MSG_PUT_DONE:
        case IBNAL_MSG_GET_DONE:
                if (nob < base_nob + sizeof (kib_completion_msg_t)) {
                        CERROR ("Short COMPLETION msg from "LPX64": %d\n",
                                conn->ibc_peer->ibp_nid, nob);
                        goto failed;
                }
                if (flipped)
                        __swab32s(&msg->ibm_u.completion.ibcm_status);

                CDEBUG(D_NET, "%d DONE: cookie "LPX64", status %d\n",
                       msg->ibm_type, msg->ibm_u.completion.ibcm_cookie,
                       msg->ibm_u.completion.ibcm_status);

                kibnal_complete_passive_rdma (conn,
                                              msg->ibm_u.completion.ibcm_cookie,
                                              msg->ibm_u.completion.ibcm_status);
                kibnal_post_rx (rx, 1);
                return;

        default:
                CERROR ("Can't parse type from "LPX64": %d\n",
                        conn->ibc_peer->ibp_nid, msg->ibm_type);
                goto failed;
        }

        /* schedule for kibnal_rx() in thread context */
        spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);

        list_add_tail (&rx->rx_list, &kibnal_data.kib_sched_rxq);
        wake_up (&kibnal_data.kib_sched_waitq);

        spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
        return;

 failed:
        CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
        kibnal_close_conn(conn, -ECONNABORTED);

        /* Don't re-post rx & drop its ref on conn */
        kibnal_put_conn(conn);
}

void
kibnal_rx (kib_rx_t *rx)
{
        kib_msg_t   *msg = rx->rx_msg;

        /* Clear flag so I can detect if I've sent an RDMA completion */
        rx->rx_rdma = 0;

        switch (msg->ibm_type) {
        case IBNAL_MSG_GET_RDMA:
                lib_parse(&kibnal_lib, &msg->ibm_u.rdma.ibrm_hdr, rx);
                /* If the incoming get was matched, I'll have initiated the
                 * RDMA and the completion message... */
                if (rx->rx_rdma)
                        break;

                /* Otherwise, I'll send a failed completion now to prevent
                 * the peer's GET blocking for the full timeout. */
                CERROR ("Completing unmatched RDMA GET from "LPX64"\n",
                        rx->rx_conn->ibc_peer->ibp_nid);
                kibnal_start_active_rdma (IBNAL_MSG_GET_DONE, -EIO,
                                          rx, NULL, 0, NULL, NULL, 0, 0);
                break;

        case IBNAL_MSG_PUT_RDMA:
                lib_parse(&kibnal_lib, &msg->ibm_u.rdma.ibrm_hdr, rx);
                if (rx->rx_rdma)
                        break;
                /* This is most unusual, since even if lib_parse() didn't
                 * match anything, it should have asked us to read (and
                 * discard) the payload.  The portals header must be
                 * inconsistent with this message type, so it's the
                 * sender's fault for sending garbage and she can time
                 * herself out... */
                CERROR ("Uncompleted RDMA PUT from "LPX64"\n",
                        rx->rx_conn->ibc_peer->ibp_nid);
                break;

        case IBNAL_MSG_IMMEDIATE:
                lib_parse(&kibnal_lib, &msg->ibm_u.immediate.ibim_hdr, rx);
                LASSERT (!rx->rx_rdma);
                break;

        default:
                LBUG();
                break;
        }

        kibnal_post_rx (rx, 1);
}

static struct page *
kibnal_kvaddr_to_page (unsigned long vaddr)
{
        struct page *page;

        if (vaddr >= VMALLOC_START &&
            vaddr < VMALLOC_END)
                page = vmalloc_to_page ((void *)vaddr);
#ifdef CONFIG_HIGHMEM
        else if (vaddr >= PKMAP_BASE &&
                 vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
                page = vmalloc_to_page ((void *)vaddr);
        /* in 2.4 ^ just walks the page tables */
#endif
        else
                page = virt_to_page (vaddr);

        if (!VALID_PAGE (page))
                page = NULL;

        return page;
}

static void
kibnal_fill_ibrm(kib_tx_t *tx, struct page *page, unsigned long page_offset,
                 unsigned long len, int active)
{
        kib_rdma_msg_t *ibrm = &tx->tx_msg->ibm_u.rdma;
        kib_rdma_desc_t *desc;

        LASSERTF(ibrm->ibrm_num_descs < PTL_MD_MAX_IOV, "%u\n",
                 ibrm->ibrm_num_descs);

        desc = &ibrm->ibrm_desc[ibrm->ibrm_num_descs];
        if (active)
                desc->rd_key = kibnal_data.kib_md.md_lkey;
        else
                desc->rd_key = kibnal_data.kib_md.md_rkey;
        desc->rd_nob = len;
        desc->rd_addr = kibnal_page2phys(page) + page_offset +
                        kibnal_data.kib_md.md_addr;

        ibrm->ibrm_num_descs++;
}

static int
kibnal_map_rdma_iov(kib_tx_t *tx, unsigned long vaddr, int nob, int active)
{
        struct page *page;
        int page_offset, len;

        while (nob > 0) {
                page = kibnal_kvaddr_to_page(vaddr);
                if (page == NULL)
                        return -EFAULT;

                page_offset = vaddr & (PAGE_SIZE - 1);
                len = min(nob, (int)PAGE_SIZE - page_offset);

                kibnal_fill_ibrm(tx, page, page_offset, len, active);
                nob -= len;
                vaddr += len;
        }
        return 0;
}

static int
kibnal_map_iov (kib_tx_t *tx, IB_ACCESS_CONTROL access,
                 int niov, struct iovec *iov, int offset, int nob, int active)
{
        void   *vaddr;
        FSTATUS frc;

        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR ("Can't map multiple vaddr fragments\n");
                return (-EMSGSIZE);
        }

        /* our large contiguous iov could be backed by multiple physical
         * pages. */
        if (kibnal_whole_mem()) {
                int rc;
                tx->tx_msg->ibm_u.rdma.ibrm_num_descs = 0;
                rc = kibnal_map_rdma_iov(tx, (unsigned long)iov->iov_base +
                                         offset, nob, active);
                if (rc != 0) {
                        CERROR ("Can't map iov: %d\n", rc);
                        return rc;
                }
                return 0;
        }

        vaddr = (void *)(((unsigned long)iov->iov_base) + offset);
        tx->tx_md.md_addr = (__u64)((unsigned long)vaddr);

        frc = iibt_register_memory(kibnal_data.kib_hca, vaddr, nob,
                                   kibnal_data.kib_pd, access,
                                   &tx->tx_md.md_handle, &tx->tx_md.md_lkey,
                                   &tx->tx_md.md_rkey);
        if (frc != FSUCCESS) {
                CERROR ("Can't map vaddr %p: %d\n", vaddr, frc);
                return -EINVAL;
        }

        tx->tx_mapped = KIB_TX_MAPPED;
        return (0);
}

static int
kibnal_map_kiov (kib_tx_t *tx, IB_ACCESS_CONTROL access,
                  int nkiov, ptl_kiov_t *kiov,
                  int offset, int nob, int active)
{
        __u64                      *phys = NULL;
        int                         page_offset;
        int                         nphys;
        int                         resid;
        int                         phys_size = 0;
        FSTATUS                     frc;
        int                         i, rc = 0;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        page_offset = kiov->kiov_offset + offset;
        nphys = 1;

        if (!kibnal_whole_mem()) {
                phys_size = nkiov * sizeof (*phys);
                PORTAL_ALLOC(phys, phys_size);
                if (phys == NULL) {
                        CERROR ("Can't allocate tmp phys\n");
                        return (-ENOMEM);
                }

                phys[0] = kibnal_page2phys(kiov->kiov_page);
        } else {
                tx->tx_msg->ibm_u.rdma.ibrm_num_descs = 0;
                kibnal_fill_ibrm(tx, kiov->kiov_page, kiov->kiov_offset,
                                 kiov->kiov_len, active);
        }

        resid = nob - (kiov->kiov_len - offset);

        while (resid > 0) {
                kiov++;
                nkiov--;
                LASSERT (nkiov > 0);

                if (kiov->kiov_offset != 0 ||
                    ((resid > PAGE_SIZE) &&
                     kiov->kiov_len < PAGE_SIZE)) {
                        /* Can't have gaps */
                        CERROR ("Can't make payload contiguous in I/O VM: "
                                "page %d, offset %d, len %d\n", nphys,
                                kiov->kiov_offset, kiov->kiov_len);

                        for (i = -nphys; i < nkiov; i++)
                                CERROR("kiov[%d] %p +%d for %d\n",
                                       i, kiov[i].kiov_page,
                                       kiov[i].kiov_offset,
                                       kiov[i].kiov_len);

                        rc = -EINVAL;
                        goto out;
                }

                if (nphys == PTL_MD_MAX_IOV) {
                        CERROR ("payload too big (%d)\n", nphys);
                        rc = -EMSGSIZE;
                        goto out;
                }

                if (!kibnal_whole_mem()) {
                        LASSERT (nphys * sizeof (*phys) < phys_size);
                        phys[nphys] = kibnal_page2phys(kiov->kiov_page);
                } else {
                        if (kib_rdma_msg_len(nphys) > IBNAL_MSG_SIZE) {
                                CERROR ("payload too big (%d)\n", nphys);
                                rc = -EMSGSIZE;
                                goto out;
                        }
                        kibnal_fill_ibrm(tx, kiov->kiov_page,
                                         kiov->kiov_offset, kiov->kiov_len,
                                         active);
                }

                nphys++;
                resid -= PAGE_SIZE;
        }

        if (kibnal_whole_mem())
                goto out;

#if 0
        CWARN ("nphys %d, nob %d, page_offset %d\n", nphys, nob, page_offset);
        for (i = 0; i < nphys; i++)
                CWARN ("   [%d] "LPX64"\n", i, phys[i]);
#endif

#if IBNAL_FMR
#error "iibnal hasn't learned about FMR yet"
        rc = ib_fmr_register_physical (kibnal_data.kib_fmr_pool,
                                       phys, nphys,
                                       &tx->tx_md.md_addr,
                                       page_offset,
                                       &tx->tx_md.md_handle.fmr,
                                       &tx->tx_md.md_lkey,
                                       &tx->tx_md.md_rkey);
#else
        frc = iibt_register_physical_memory(kibnal_data.kib_hca,
                                            IBNAL_RDMA_BASE,
                                            phys, nphys,
                                            0,          /* offset */
                                            kibnal_data.kib_pd,
                                            access,
                                            &tx->tx_md.md_handle,
                                            &tx->tx_md.md_addr,
                                            &tx->tx_md.md_lkey,
                                            &tx->tx_md.md_rkey);
#endif
        if (frc == FSUCCESS) {
                CDEBUG(D_NET, "Mapped %d pages %d bytes @ offset %d: lkey %x, rkey %x\n",
                       nphys, nob, page_offset, tx->tx_md.md_lkey, tx->tx_md.md_rkey);
#if IBNAL_FMR
                tx->tx_mapped = KIB_TX_MAPPED_FMR;
#else
                tx->tx_mapped = KIB_TX_MAPPED;
#endif
        } else {
                /* NB report the failing FSTATUS, not 'rc' (still 0 here) */
                CERROR ("Can't map phys: %d\n", frc);
                rc = -EFAULT;
        }

 out:
        if (phys != NULL)
                PORTAL_FREE(phys, phys_size);
        return (rc);
}

static kib_conn_t *
kibnal_find_conn_locked (kib_peer_t *peer)
{
        struct list_head *tmp;

        /* just return the first connection */
        list_for_each (tmp, &peer->ibp_conns) {
                return (list_entry(tmp, kib_conn_t, ibc_list));
        }

        return (NULL);
}

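/* Push queued sends.  Flow control is credit-based: ibc_credits counts
 * sends we may post (each consumes a posted receive buffer at the peer),
 * while ibc_outstanding_credits counts buffers we have reposted locally
 * and owe back, piggybacked in ibm_credits on the next outgoing message.
 * If IBNAL_CREDIT_HIGHWATER credits pile up with nothing queued, a NOOP
 * is sent purely to return them; the last send credit is only spent on a
 * message that itself carries credits back, so the peer can't be starved
 * of them. */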
void
kibnal_check_sends (kib_conn_t *conn)
{
        unsigned long   flags;
        kib_tx_t       *tx;
        int             rc;
        int             i;
        int             done;
        int             nwork;
        ENTRY;

        spin_lock_irqsave (&conn->ibc_lock, flags);

        LASSERT (conn->ibc_nsends_posted <= IBNAL_MSG_QUEUE_SIZE);

        if (list_empty(&conn->ibc_tx_queue) &&
            conn->ibc_outstanding_credits >= IBNAL_CREDIT_HIGHWATER) {
                spin_unlock_irqrestore(&conn->ibc_lock, flags);

                tx = kibnal_get_idle_tx(0);     /* don't block */
                if (tx != NULL)
                        kibnal_init_tx_msg(tx, IBNAL_MSG_NOOP, 0);

                spin_lock_irqsave(&conn->ibc_lock, flags);

                if (tx != NULL) {
                        atomic_inc(&conn->ibc_refcount);
                        kibnal_queue_tx_locked(tx, conn);
                }
        }

        while (!list_empty (&conn->ibc_tx_queue)) {
                tx = list_entry (conn->ibc_tx_queue.next, kib_tx_t, tx_list);

                /* We rely on this for QP sizing */
                LASSERT (tx->tx_nsp > 0 && tx->tx_nsp <= IBNAL_TX_MAX_SG);

                LASSERT (conn->ibc_outstanding_credits >= 0);
                LASSERT (conn->ibc_outstanding_credits <= IBNAL_MSG_QUEUE_SIZE);
                LASSERT (conn->ibc_credits >= 0);
                LASSERT (conn->ibc_credits <= IBNAL_MSG_QUEUE_SIZE);

                /* Not on ibc_rdma_queue */
                LASSERT (!tx->tx_passive_rdma_wait);

                if (conn->ibc_nsends_posted == IBNAL_MSG_QUEUE_SIZE)
                        GOTO(out, 0);

                if (conn->ibc_credits == 0)     /* no credits */
                        GOTO(out, 1);

                if (conn->ibc_credits == 1 &&   /* last credit reserved for */
                    conn->ibc_outstanding_credits == 0) /* giving back credits */
                        GOTO(out, 2);

                list_del (&tx->tx_list);

                if (tx->tx_msg->ibm_type == IBNAL_MSG_NOOP &&
                    (!list_empty(&conn->ibc_tx_queue) ||
                     conn->ibc_outstanding_credits < IBNAL_CREDIT_HIGHWATER)) {
                        /* redundant NOOP */
                        spin_unlock_irqrestore(&conn->ibc_lock, flags);
                        kibnal_tx_done(tx);
                        spin_lock_irqsave(&conn->ibc_lock, flags);
                        continue;
                }

                tx->tx_msg->ibm_credits = conn->ibc_outstanding_credits;
                conn->ibc_outstanding_credits = 0;

                conn->ibc_nsends_posted++;
                conn->ibc_credits--;

                /* we only get a tx completion for the final rdma op */
                tx->tx_sending = min(tx->tx_nsp, 2);
                tx->tx_passive_rdma_wait = tx->tx_passive_rdma;
                list_add (&tx->tx_list, &conn->ibc_active_txs);
#if IBNAL_CKSUM
                tx->tx_msg->ibm_cksum = 0;
                tx->tx_msg->ibm_cksum = kibnal_cksum(tx->tx_msg, tx->tx_msg->ibm_nob);
                CDEBUG(D_NET, "cksum %x, nob %d\n", tx->tx_msg->ibm_cksum, tx->tx_msg->ibm_nob);
#endif
                spin_unlock_irqrestore (&conn->ibc_lock, flags);

                /* NB the gap between removing tx from the queue and sending it
                 * allows message re-ordering to occur */

                LASSERT (tx->tx_nsp > 0);

                rc = -ECONNABORTED;
                nwork = 0;
                if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
                        tx->tx_status = 0;
                        /* Driver only accepts 1 item at a time */
                        for (i = 0; i < tx->tx_nsp; i++) {
                                hexdump("tx", tx->tx_msg, sizeof(kib_msg_t));
                                rc = iibt_postsend(conn->ibc_qp,
                                                   &tx->tx_wrq[i]);
                                if (rc != 0)
                                        break;
                                if (wrq_signals_completion(&tx->tx_wrq[i]))
                                        nwork++;
                                CDEBUG(D_NET, "posted tx wrq %p\n",
                                       &tx->tx_wrq[i]);
                        }
                }

                spin_lock_irqsave (&conn->ibc_lock, flags);
                if (rc != 0) {
                        /* NB credits are transferred in the actual
                         * message, which can only be the last work item */
                        conn->ibc_outstanding_credits += tx->tx_msg->ibm_credits;
                        conn->ibc_credits++;
                        conn->ibc_nsends_posted--;

                        tx->tx_status = rc;
                        tx->tx_passive_rdma_wait = 0;
                        tx->tx_sending -= tx->tx_nsp - nwork;

                        done = (tx->tx_sending == 0);
                        if (done)
                                list_del (&tx->tx_list);

                        spin_unlock_irqrestore (&conn->ibc_lock, flags);

                        if (conn->ibc_state == IBNAL_CONN_ESTABLISHED)
                                CERROR ("Error %d posting transmit to "LPX64"\n",
                                        rc, conn->ibc_peer->ibp_nid);
                        else
                                CDEBUG (D_NET, "Error %d posting transmit to "
                                        LPX64"\n", rc, conn->ibc_peer->ibp_nid);

                        kibnal_close_conn (conn, rc);

                        if (done)
                                kibnal_tx_done (tx);
                        return;
                }
        }

        EXIT;
out:
        spin_unlock_irqrestore (&conn->ibc_lock, flags);
}

static void
kibnal_tx_callback (IB_WORK_COMPLETION *wc)
{
        kib_tx_t     *tx = (kib_tx_t *)kibnal_wreqid2ptr(wc->WorkReqId);
        kib_conn_t   *conn;
        unsigned long flags;
        int           idle;

        conn = tx->tx_conn;
        LASSERT (conn != NULL);
        LASSERT (tx->tx_sending != 0);

        spin_lock_irqsave(&conn->ibc_lock, flags);

        CDEBUG(D_NET, "conn %p tx %p [%d/%d]: %d\n", conn, tx,
               tx->tx_sending, tx->tx_nsp, wc->Status);

        /* I could be racing with rdma completion.  Whoever makes 'tx' idle
         * gets to free it, which also drops its ref on 'conn'.  If it's
         * not me, then I take an extra ref on conn so it can't disappear
         * under me. */

        tx->tx_sending--;
        idle = (tx->tx_sending == 0) &&         /* This is the final callback */
               (!tx->tx_passive_rdma_wait);     /* Not waiting for RDMA completion */
        if (idle)
                list_del(&tx->tx_list);

        CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
               conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
               atomic_read (&conn->ibc_refcount));
        atomic_inc (&conn->ibc_refcount);

        if (tx->tx_sending == 0)
                conn->ibc_nsends_posted--;

        if (wc->Status != WRStatusSuccess &&
            tx->tx_status == 0)
                tx->tx_status = -ECONNABORTED;

        spin_unlock_irqrestore(&conn->ibc_lock, flags);

        if (idle)
                kibnal_tx_done (tx);

        if (wc->Status != WRStatusSuccess) {
                CERROR ("Tx completion to "LPX64" failed: %d\n",
                        conn->ibc_peer->ibp_nid, wc->Status);
                kibnal_close_conn (conn, -ENETDOWN);
        } else {
                /* can I shovel some more sends out the door? */
                kibnal_check_sends(conn);
        }

        kibnal_put_conn (conn);
}

void
kibnal_ca_async_callback (void *ca_arg, IB_EVENT_RECORD *ev)
{
        /* XXX flesh out.  this seems largely for async errors */
        CERROR("type: %d code: %u\n", ev->EventType, ev->EventCode);
}

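/* CQ completion handler.  Poll until the CQ drains, rearm it, then poll
 * one more time: a completion arriving between the final poll and the
 * rearm would otherwise be stranded until the next CQ event. */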
void
kibnal_ca_callback (void *ca_arg, void *cq_arg)
{
        IB_HANDLE cq = *(IB_HANDLE *)cq_arg;
        IB_HANDLE ca = *(IB_HANDLE *)ca_arg;
        IB_WORK_COMPLETION wc;
        int armed = 0;

        CDEBUG(D_NET, "ca %p cq %p\n", ca, cq);

        for(;;) {
                while (iibt_cq_poll(cq, &wc) == FSUCCESS) {
                        if (kibnal_wreqid_is_rx(wc.WorkReqId))
                                kibnal_rx_callback(&wc);
                        else
                                kibnal_tx_callback(&wc);
                }
                if (armed)
                        return;
                if (iibt_cq_rearm(cq, CQEventSelNextWC) != FSUCCESS) {
                        CERROR("rearm failed?\n");
                        return;
                }
                armed = 1;
        }
}

void
kibnal_init_tx_msg (kib_tx_t *tx, int type, int body_nob)
{
        IB_LOCAL_DATASEGMENT *gl = &tx->tx_gl[tx->tx_nsp];
        IB_WORK_REQ          *wrq = &tx->tx_wrq[tx->tx_nsp];
        int                   fence;
        int                   nob = offsetof (kib_msg_t, ibm_u) + body_nob;

        LASSERT (tx->tx_nsp >= 0 &&
                 tx->tx_nsp < sizeof(tx->tx_wrq)/sizeof(tx->tx_wrq[0]));
        LASSERT (nob <= IBNAL_MSG_SIZE);

        tx->tx_msg->ibm_magic = IBNAL_MSG_MAGIC;
        tx->tx_msg->ibm_version = IBNAL_MSG_VERSION;
        tx->tx_msg->ibm_type = type;
#if IBNAL_CKSUM
        tx->tx_msg->ibm_nob = nob;
#endif
        /* Fence the message if it's bundled with an RDMA read */
        fence = (tx->tx_nsp > 0) &&
                (type == IBNAL_MSG_PUT_DONE);

        *gl = (IB_LOCAL_DATASEGMENT) {
                .Address = tx->tx_vaddr,
                .Length  = IBNAL_MSG_SIZE,
                .Lkey    = kibnal_lkey(kibnal_data.kib_tx_pages),
        };

        wrq->WorkReqId      = kibnal_ptr2wreqid(tx, 0);
        wrq->Operation      = WROpSend;
        wrq->DSList         = gl;
        wrq->DSListDepth    = 1;
        wrq->MessageLen     = nob;
        wrq->Req.SendRC.ImmediateData  = 0;
        wrq->Req.SendRC.Options.s.SolicitedEvent         = 1;
        wrq->Req.SendRC.Options.s.SignaledCompletion     = 1;
        wrq->Req.SendRC.Options.s.ImmediateData          = 0;
        wrq->Req.SendRC.Options.s.Fence                  = fence;

        tx->tx_nsp++;
}

static void
kibnal_queue_tx (kib_tx_t *tx, kib_conn_t *conn)
{
        unsigned long         flags;

        spin_lock_irqsave(&conn->ibc_lock, flags);

        kibnal_queue_tx_locked (tx, conn);

        spin_unlock_irqrestore(&conn->ibc_lock, flags);

        kibnal_check_sends(conn);
}

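/* Commit a tx to a peer.  The fast path takes the global lock read-only
 * to find an existing connection; failing that, the lock is retaken for
 * writing and the lookups repeated (state may have changed while the
 * lock was dropped) before queueing the tx on the peer and waking the
 * connection daemon. */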
static void
kibnal_launch_tx (kib_tx_t *tx, ptl_nid_t nid)
{
        unsigned long    flags;
        kib_peer_t      *peer;
        kib_conn_t      *conn;
        rwlock_t        *g_lock = &kibnal_data.kib_global_lock;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */

        LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */
        LASSERT (tx->tx_nsp > 0);               /* work items have been set up */

        read_lock (g_lock);

        peer = kibnal_find_peer_locked (nid);
        if (peer == NULL) {
                read_unlock (g_lock);
                tx->tx_status = -EHOSTUNREACH;
                kibnal_tx_done (tx);
                return;
        }

        conn = kibnal_find_conn_locked (peer);
        if (conn != NULL) {
                CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
                       conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
                       atomic_read (&conn->ibc_refcount));
                atomic_inc (&conn->ibc_refcount); /* 1 ref for the tx */
                read_unlock (g_lock);

                kibnal_queue_tx (tx, conn);
                return;
        }

        /* Making one or more connections; I'll need a write lock... */
        read_unlock (g_lock);
        write_lock_irqsave (g_lock, flags);

        peer = kibnal_find_peer_locked (nid);
        if (peer == NULL) {
                write_unlock_irqrestore (g_lock, flags);
                tx->tx_status = -EHOSTUNREACH;
                kibnal_tx_done (tx);
                return;
        }

        conn = kibnal_find_conn_locked (peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
                       conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
                       atomic_read (&conn->ibc_refcount));
                atomic_inc (&conn->ibc_refcount); /* 1 ref for the tx */
                write_unlock_irqrestore (g_lock, flags);

                kibnal_queue_tx (tx, conn);
                return;
        }

        if (peer->ibp_connecting == 0) {
                if (!time_after_eq(jiffies, peer->ibp_reconnect_time)) {
                        write_unlock_irqrestore (g_lock, flags);
                        tx->tx_status = -EHOSTUNREACH;
                        kibnal_tx_done (tx);
                        return;
                }

                peer->ibp_connecting = 1;
                kib_peer_addref(peer); /* extra ref for connd */

                spin_lock (&kibnal_data.kib_connd_lock);

                list_add_tail (&peer->ibp_connd_list,
                               &kibnal_data.kib_connd_peers);
                wake_up (&kibnal_data.kib_connd_waitq);

                spin_unlock (&kibnal_data.kib_connd_lock);
        }

        /* A connection is being established; queue the message... */
        list_add_tail (&tx->tx_list, &peer->ibp_tx_queue);

        write_unlock_irqrestore (g_lock, flags);
}

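/* Passive RDMA: the initiator of a PUT or GET registers its buffer and
 * advertises the resulting rkey/addr descriptors, plus a unique cookie,
 * in a PUT_RDMA/GET_RDMA message.  The peer then drives the transfer
 * actively (kibnal_start_active_rdma() below) and returns the cookie in
 * a PUT_DONE/GET_DONE completion, which kibnal_complete_passive_rdma()
 * matches against the conn's list of active txs. */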
static ptl_err_t
kibnal_start_passive_rdma (int type, ptl_nid_t nid,
                            lib_msg_t *libmsg, ptl_hdr_t *hdr)
{
        int         nob = libmsg->md->length;
        kib_tx_t   *tx;
        kib_msg_t  *ibmsg;
        int         rc;
        IB_ACCESS_CONTROL         access = {0,};

        LASSERT (type == IBNAL_MSG_PUT_RDMA || type == IBNAL_MSG_GET_RDMA);
        LASSERT (nob > 0);
        LASSERT (!in_interrupt());              /* Mapping could block */

        access.s.MWBindable = 1;
        access.s.LocalWrite = 1;
        access.s.RdmaRead = 1;
        access.s.RdmaWrite = 1;

        tx = kibnal_get_idle_tx (1);           /* May block; caller is an app thread */
        LASSERT (tx != NULL);

        if ((libmsg->md->options & PTL_MD_KIOV) == 0)
                rc = kibnal_map_iov (tx, access,
                                     libmsg->md->md_niov,
                                     libmsg->md->md_iov.iov,
                                     0, nob, 0);
        else
                rc = kibnal_map_kiov (tx, access,
                                      libmsg->md->md_niov,
                                      libmsg->md->md_iov.kiov,
                                      0, nob, 0);

        if (rc != 0) {
                CERROR ("Can't map RDMA for "LPX64": %d\n", nid, rc);
                goto failed;
        }

        if (type == IBNAL_MSG_GET_RDMA) {
                /* reply gets finalized when tx completes */
                tx->tx_libmsg[1] = lib_create_reply_msg(&kibnal_lib,
                                                        nid, libmsg);
                if (tx->tx_libmsg[1] == NULL) {
                        CERROR ("Can't create reply for GET -> "LPX64"\n",
                                nid);
                        rc = -ENOMEM;
                        goto failed;
                }
        }

        tx->tx_passive_rdma = 1;

        ibmsg = tx->tx_msg;

        ibmsg->ibm_u.rdma.ibrm_hdr = *hdr;
        ibmsg->ibm_u.rdma.ibrm_cookie = tx->tx_passive_rdma_cookie;
        /* map_kiov already filled the rdma descs for the whole_mem case */
        if (!kibnal_whole_mem()) {
                ibmsg->ibm_u.rdma.ibrm_desc[0].rd_key = tx->tx_md.md_rkey;
                ibmsg->ibm_u.rdma.ibrm_desc[0].rd_addr = tx->tx_md.md_addr;
                ibmsg->ibm_u.rdma.ibrm_desc[0].rd_nob = nob;
                ibmsg->ibm_u.rdma.ibrm_num_descs = 1;
        }

        kibnal_init_tx_msg (tx, type,
                            kib_rdma_msg_len(ibmsg->ibm_u.rdma.ibrm_num_descs));

        CDEBUG(D_NET, "Passive: %p cookie "LPX64", key %x, addr "
               LPX64", nob %d\n",
               tx, tx->tx_passive_rdma_cookie, tx->tx_md.md_rkey,
               tx->tx_md.md_addr, nob);

        /* libmsg gets finalized when tx completes. */
        tx->tx_libmsg[0] = libmsg;

        kibnal_launch_tx(tx, nid);
        return (PTL_OK);

 failed:
        tx->tx_status = rc;
        kibnal_tx_done (tx);
        return (PTL_FAIL);
}

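/* Active RDMA: build one RDMA read/write work request per descriptor
 * pair (our local descs against the peer's advertised ones), followed by
 * the PUT_DONE/GET_DONE completion message.  Only the final RDMA request
 * asks for signalled completion, so there is a single callback for the
 * whole transfer. */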
1335 void
1336 kibnal_start_active_rdma (int type, int status,
1337                            kib_rx_t *rx, lib_msg_t *libmsg, 
1338                            unsigned int niov,
1339                            struct iovec *iov, ptl_kiov_t *kiov,
1340                            size_t offset, size_t nob)
1341 {
1342         kib_msg_t    *rxmsg = rx->rx_msg;
1343         kib_msg_t    *txmsg;
1344         kib_tx_t     *tx;
1345         IB_ACCESS_CONTROL access = {0,};
1346         IB_WR_OP      rdma_op;
1347         int           rc;
1348         __u32         i;
1349
1350         CDEBUG(D_NET, "type %d, status %d, niov %d, offset %d, nob %d\n",
1351                type, status, niov, offset, nob);
1352
1353         /* Called by scheduler */
1354         LASSERT (!in_interrupt ());
1355
1356         /* Either all pages or all vaddrs */
1357         LASSERT (!(kiov != NULL && iov != NULL));
1358
1359         /* No data if we're completing with failure */
1360         LASSERT (status == 0 || nob == 0);
1361
1362         LASSERT (type == IBNAL_MSG_GET_DONE ||
1363                  type == IBNAL_MSG_PUT_DONE);
1364
1365         /* Flag I'm completing the RDMA.  Even if I fail to send the
1366          * completion message, I will have tried my best so further
1367          * attempts shouldn't be tried. */
1368         LASSERT (!rx->rx_rdma);
1369         rx->rx_rdma = 1;
1370
1371         if (type == IBNAL_MSG_GET_DONE) {
1372                 rdma_op  = WROpRdmaWrite;
1373                 LASSERT (rxmsg->ibm_type == IBNAL_MSG_GET_RDMA);
1374         } else {
1375                 access.s.LocalWrite = 1;
1376                 rdma_op  = WROpRdmaRead;
1377                 LASSERT (rxmsg->ibm_type == IBNAL_MSG_PUT_RDMA);
1378         }
1379
1380         tx = kibnal_get_idle_tx (0);           /* Mustn't block */
1381         if (tx == NULL) {
1382                 CERROR ("tx descs exhausted on RDMA from "LPX64
1383                         " completing locally with failure\n",
1384                         rx->rx_conn->ibc_peer->ibp_nid);
1385                 lib_finalize (&kibnal_lib, NULL, libmsg, PTL_NO_SPACE);
1386                 return;
1387         }
1388         LASSERT (tx->tx_nsp == 0);
1389                         
1390         if (nob == 0) 
1391                 GOTO(init_tx, 0);
1392
1393         /* We actually need to transfer some data (the transfer
1394          * size could get truncated to zero when the incoming
1395          * message is matched) */
1396         if (kiov != NULL)
1397                 rc = kibnal_map_kiov (tx, access, niov, kiov, offset, nob, 1);
1398         else
1399                 rc = kibnal_map_iov (tx, access, niov, iov, offset, nob, 1);
1400         
1401         if (rc != 0) {
1402                 CERROR ("Can't map RDMA -> "LPX64": %d\n", 
1403                         rx->rx_conn->ibc_peer->ibp_nid, rc);
1404                 /* We'll skip the RDMA and complete with failure. */
1405                 status = rc;
1406                 nob = 0;
1407                 GOTO(init_tx, rc);
1408         } 
1409
1410         if (!kibnal_whole_mem()) {
1411                 tx->tx_msg->ibm_u.rdma.ibrm_desc[0].rd_key = tx->tx_md.md_lkey;
1412                 tx->tx_msg->ibm_u.rdma.ibrm_desc[0].rd_addr = tx->tx_md.md_addr;
1413                 tx->tx_msg->ibm_u.rdma.ibrm_desc[0].rd_nob = nob;
1414                 tx->tx_msg->ibm_u.rdma.ibrm_num_descs = 1;
1415         }
1416
1417         /* XXX ugh.  different page-sized hosts. */ 
1418         if (tx->tx_msg->ibm_u.rdma.ibrm_num_descs !=
1419             rxmsg->ibm_u.rdma.ibrm_num_descs) {
1420                 CERROR("tx descs (%u) != rx descs (%u)\n", 
1421                        tx->tx_msg->ibm_u.rdma.ibrm_num_descs,
1422                        rxmsg->ibm_u.rdma.ibrm_num_descs);
1423                 /* We'll skip the RDMA and complete with failure. */
1424                 status = rc;
1425                 nob = 0;
1426                 GOTO(init_tx, rc);
1427         }
1428
1429         /* map_kiov filled in the rdma descs which describe our side of the
1430          * rdma transfer. */
1431         /* ibrm_num_descs was verified in rx_callback */
1432         for(i = 0; i < rxmsg->ibm_u.rdma.ibrm_num_descs; i++) {
1433                 kib_rdma_desc_t *ldesc, *rdesc; /* local, remote */
1434                 IB_LOCAL_DATASEGMENT *ds = &tx->tx_gl[i];
1435                 IB_WORK_REQ  *wrq = &tx->tx_wrq[i];
1436
1437                 ldesc = &tx->tx_msg->ibm_u.rdma.ibrm_desc[i];
1438                 rdesc = &rxmsg->ibm_u.rdma.ibrm_desc[i];
1439
1440                 ds->Address = ldesc->rd_addr;
1441                 ds->Length  = ldesc->rd_nob;
1442                 ds->Lkey    = ldesc->rd_key;
1443
1444                 memset(wrq, 0, sizeof(*wrq));
1445                 wrq->WorkReqId      = kibnal_ptr2wreqid(tx, 0);
1446                 wrq->Operation      = rdma_op;
1447                 wrq->DSList         = ds;
1448                 wrq->DSListDepth    = 1;
1449                 wrq->MessageLen     = ds->Length;
1450                 wrq->Req.SendRC.ImmediateData  = 0;
1451                 wrq->Req.SendRC.Options.s.SolicitedEvent         = 0;
1452                 wrq->Req.SendRC.Options.s.SignaledCompletion     = 0;
1453                 wrq->Req.SendRC.Options.s.ImmediateData          = 0;
1454                 wrq->Req.SendRC.Options.s.Fence                  = 0;
1455                 wrq->Req.SendRC.RemoteDS.Address = rdesc->rd_addr;
1456                 wrq->Req.SendRC.RemoteDS.Rkey = rdesc->rd_key;
1457
1458                 /* only the last rdma post triggers tx completion */
1459                 if (i == rxmsg->ibm_u.rdma.ibrm_num_descs - 1)
1460                         wrq->Req.SendRC.Options.s.SignaledCompletion = 1;
1461
1462                 tx->tx_nsp++;
1463         }
1464
1465 init_tx:
1466         txmsg = tx->tx_msg;
1467
1468         txmsg->ibm_u.completion.ibcm_cookie = rxmsg->ibm_u.rdma.ibrm_cookie;
1469         txmsg->ibm_u.completion.ibcm_status = status;
1470         
1471         kibnal_init_tx_msg(tx, type, sizeof (kib_completion_msg_t));
1472
1473         if (status == 0 && nob != 0) {
1474                 LASSERT (tx->tx_nsp > 1);
1475                 /* RDMA: libmsg gets finalized when the tx completes.  This
1476                  * is after the completion message has been sent, which in
1477                  * turn is after the RDMA has finished. */
1478                 tx->tx_libmsg[0] = libmsg;
1479         } else {
1480                 LASSERT (tx->tx_nsp == 1);
1481                 /* No RDMA: local completion happens now! */
1482                 CDEBUG(D_WARNING, "No data: immediate completion\n");
1483                 lib_finalize (&kibnal_lib, NULL, libmsg,
1484                               status == 0 ? PTL_OK : PTL_FAIL);
1485         }
1486
1487         /* +1 ref for this tx... */
1488         CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1489                rx->rx_conn, rx->rx_conn->ibc_state, 
1490                rx->rx_conn->ibc_peer->ibp_nid,
1491                atomic_read (&rx->rx_conn->ibc_refcount));
1492         atomic_inc (&rx->rx_conn->ibc_refcount);
1493         /* ...and queue it up */
1494         kibnal_queue_tx(tx, rx->rx_conn);
1495 }
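
/*
 * The WorkReqId stored in each work request above comes from
 * kibnal_ptr2wreqid(), defined in iibnal.h (not shown here).  The idiom
 * tags a descriptor pointer with a flag bit so the completion handler
 * can tell tx from rx work requests.  A minimal user-space sketch of
 * that idiom, assuming (as the NAL presumably does) that descriptors
 * are at least 2-byte aligned so the low bit is free:
 */
#include <assert.h>
#include <stdint.h>

static uint64_t sketch_ptr2wreqid (void *ptr, int isrx)
{
        unsigned long lptr = (unsigned long)ptr;

        assert ((lptr & 1UL) == 0);     /* low bit must be free for the flag */
        return (uint64_t)(lptr | (isrx ? 1UL : 0UL));
}

static void *sketch_wreqid2ptr (uint64_t wreqid)
{
        return (void *)(unsigned long)(wreqid & ~1UL);
}

static int sketch_wreqid_is_rx (uint64_t wreqid)
{
        return (int)(wreqid & 1);
}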
1496
1497 static ptl_err_t
1498 kibnal_sendmsg(lib_nal_t    *nal, 
1499                 void         *private,
1500                 lib_msg_t    *libmsg,
1501                 ptl_hdr_t    *hdr, 
1502                 int           type, 
1503                 ptl_nid_t     nid, 
1504                 ptl_pid_t     pid,
1505                 unsigned int  payload_niov, 
1506                 struct iovec *payload_iov, 
1507                 ptl_kiov_t   *payload_kiov,
1508                 size_t        payload_offset,
1509                 size_t        payload_nob)
1510 {
1511         kib_msg_t  *ibmsg;
1512         kib_tx_t   *tx;
1513         int         nob;
1514
1515         /* NB 'private' is different depending on what we're sending.... */
1516
1517         CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid:"LPX64
1518                " pid %d\n", payload_nob, payload_niov, nid, pid);
1519
1520         LASSERT (payload_nob == 0 || payload_niov > 0);
1521         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
1522
1523         /* Thread context if we're sending payload */
1524         LASSERT (!in_interrupt() || payload_niov == 0);
1525         /* payload is either all vaddrs or all pages */
1526         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1527
1528         switch (type) {
1529         default:
1530                 LBUG();
1531                 return (PTL_FAIL);
1532                 
1533         case PTL_MSG_REPLY: {
1534                 /* reply's 'private' is the incoming receive */
1535                 kib_rx_t *rx = private;
1536
1537                 /* RDMA reply expected? */
1538                 if (rx->rx_msg->ibm_type == IBNAL_MSG_GET_RDMA) {
1539                         kibnal_start_active_rdma(IBNAL_MSG_GET_DONE, 0,
1540                                                  rx, libmsg, payload_niov, 
1541                                                  payload_iov, payload_kiov,
1542                                                  payload_offset, payload_nob);
1543                         return (PTL_OK);
1544                 }
1545                 
1546                 /* Incoming message consistent with immediate reply? */
1547                 if (rx->rx_msg->ibm_type != IBNAL_MSG_IMMEDIATE) {
1548                         CERROR ("REPLY to "LPX64" bad msg type %d!!!\n",
1549                                 nid, rx->rx_msg->ibm_type);
1550                         return (PTL_FAIL);
1551                 }
1552
1553                 /* Will it fit in a message? */
1554                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
1555                 if (nob > IBNAL_MSG_SIZE) {
1556                         CERROR("REPLY for "LPX64" too big (RDMA not requested): "LPSZ"\n",
1557                                nid, payload_nob);
1558                         return (PTL_FAIL);
1559                 }
1560                 break;
1561         }
1562
1563         case PTL_MSG_GET:
1564                 /* might the REPLY message be big enough to need RDMA? */
1565                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[libmsg->md->length]);
1566                 if (nob > IBNAL_MSG_SIZE)
1567                         return (kibnal_start_passive_rdma(IBNAL_MSG_GET_RDMA, 
1568                                                           nid, libmsg, hdr));
1569                 break;
1570
1571         case PTL_MSG_ACK:
1572                 LASSERT (payload_nob == 0);
1573                 break;
1574
1575         case PTL_MSG_PUT:
1576                 /* Is the payload big enough to need RDMA? */
1577                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
1578                 if (nob > IBNAL_MSG_SIZE)
1579                         return (kibnal_start_passive_rdma(IBNAL_MSG_PUT_RDMA,
1580                                                           nid, libmsg, hdr));
1581                 
1582                 break;
1583         }
1584
1585         tx = kibnal_get_idle_tx(!(type == PTL_MSG_ACK ||
1586                                   type == PTL_MSG_REPLY ||
1587                                   in_interrupt()));
1588         if (tx == NULL) {
1589                 CERROR ("Can't send %d to "LPX64": tx descs exhausted%s\n", 
1590                         type, nid, in_interrupt() ? " (intr)" : "");
1591                 return (PTL_NO_SPACE);
1592         }
1593
1594         ibmsg = tx->tx_msg;
1595         ibmsg->ibm_u.immediate.ibim_hdr = *hdr;
1596
1597         if (payload_nob > 0) {
1598                 if (payload_kiov != NULL)
1599                         lib_copy_kiov2buf(ibmsg->ibm_u.immediate.ibim_payload,
1600                                           payload_niov, payload_kiov,
1601                                           payload_offset, payload_nob);
1602                 else
1603                         lib_copy_iov2buf(ibmsg->ibm_u.immediate.ibim_payload,
1604                                          payload_niov, payload_iov,
1605                                          payload_offset, payload_nob);
1606         }
1607
1608         kibnal_init_tx_msg (tx, IBNAL_MSG_IMMEDIATE,
1609                             offsetof(kib_immediate_msg_t, 
1610                                      ibim_payload[payload_nob]));
1611
1612         /* libmsg gets finalized when tx completes */
1613         tx->tx_libmsg[0] = libmsg;
1614
1615         kibnal_launch_tx(tx, nid);
1616         return (PTL_OK);
1617 }
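
/*
 * A standalone illustration of the immediate-vs-RDMA test used in
 * kibnal_sendmsg() above.  The struct layout and size below are mock
 * stand-ins for kib_msg_t/IBNAL_MSG_SIZE; the point is just that
 * offsetof() on the flexible payload array accounts for every header
 * byte before comparing against the preposted buffer size.
 */
#include <stddef.h>
#include <stdio.h>

#define MOCK_MSG_SIZE 4096              /* stand-in for IBNAL_MSG_SIZE */

typedef struct {
        char hdr[96];                   /* stand-in for the wire headers */
        char payload[];                 /* like ibm_u.immediate.ibim_payload */
} mock_msg_t;

static int mock_needs_rdma (size_t payload_nob)
{
        /* same shape as offsetof(kib_msg_t, ...ibim_payload[payload_nob]) */
        return offsetof(mock_msg_t, payload) + payload_nob > MOCK_MSG_SIZE;
}

int main (void)
{
        printf ("1024 bytes -> %s\n", mock_needs_rdma(1024) ? "RDMA" : "immediate");
        printf ("8192 bytes -> %s\n", mock_needs_rdma(8192) ? "RDMA" : "immediate");
        return 0;
}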
1618
1619 static ptl_err_t
1620 kibnal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
1621                ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1622                unsigned int payload_niov, struct iovec *payload_iov,
1623                size_t payload_offset, size_t payload_len)
1624 {
1625         return (kibnal_sendmsg(nal, private, cookie,
1626                                hdr, type, nid, pid,
1627                                payload_niov, payload_iov, NULL,
1628                                payload_offset, payload_len));
1629 }
1630
1631 static ptl_err_t
1632 kibnal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie, 
1633                      ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1634                      unsigned int payload_niov, ptl_kiov_t *payload_kiov, 
1635                      size_t payload_offset, size_t payload_len)
1636 {
1637         return (kibnal_sendmsg(nal, private, cookie,
1638                                hdr, type, nid, pid,
1639                                payload_niov, NULL, payload_kiov,
1640                                payload_offset, payload_len));
1641 }
1642
1643 static ptl_err_t
1644 kibnal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
1645                  unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
1646                  size_t offset, size_t mlen, size_t rlen)
1647 {
1648         kib_rx_t    *rx = private;
1649         kib_msg_t   *rxmsg = rx->rx_msg;
1650         int          msg_nob;
1651         
1652         LASSERT (mlen <= rlen);
1653         LASSERT (!in_interrupt ());
1654         /* Either all pages or all vaddrs */
1655         LASSERT (!(kiov != NULL && iov != NULL));
1656
1657         switch (rxmsg->ibm_type) {
1658         default:
1659                 LBUG();
1660                 return (PTL_FAIL);
1661                 
1662         case IBNAL_MSG_IMMEDIATE:
1663                 msg_nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[rlen]);
1664                 if (msg_nob > IBNAL_MSG_SIZE) {
1665                         CERROR ("Immediate message from "LPX64" too big: "LPSZ"\n",
1666                                 rxmsg->ibm_u.immediate.ibim_hdr.src_nid, rlen);
1667                         return (PTL_FAIL);
1668                 }
1669
1670                 if (kiov != NULL)
1671                         lib_copy_buf2kiov(niov, kiov, offset,
1672                                           rxmsg->ibm_u.immediate.ibim_payload,
1673                                           mlen);
1674                 else
1675                         lib_copy_buf2iov(niov, iov, offset,
1676                                          rxmsg->ibm_u.immediate.ibim_payload,
1677                                          mlen);
1678
1679                 lib_finalize (nal, NULL, libmsg, PTL_OK);
1680                 return (PTL_OK);
1681
1682         case IBNAL_MSG_GET_RDMA:
1683                 /* We get called here just to discard any junk after the
1684                  * GET hdr. */
1685                 LASSERT (libmsg == NULL);
1686                 lib_finalize (nal, NULL, libmsg, PTL_OK);
1687                 return (PTL_OK);
1688
1689         case IBNAL_MSG_PUT_RDMA:
1690                 kibnal_start_active_rdma (IBNAL_MSG_PUT_DONE, 0,
1691                                           rx, libmsg, 
1692                                           niov, iov, kiov, offset, mlen);
1693                 return (PTL_OK);
1694         }
1695 }
1696
1697 static ptl_err_t
1698 kibnal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
1699               unsigned int niov, struct iovec *iov, 
1700               size_t offset, size_t mlen, size_t rlen)
1701 {
1702         return (kibnal_recvmsg (nal, private, msg, niov, iov, NULL,
1703                                 offset, mlen, rlen));
1704 }
1705
1706 static ptl_err_t
1707 kibnal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
1708                      unsigned int niov, ptl_kiov_t *kiov, 
1709                      size_t offset, size_t mlen, size_t rlen)
1710 {
1711         return (kibnal_recvmsg (nal, private, msg, niov, NULL, kiov,
1712                                 offset, mlen, rlen));
1713 }
1714
1715 /*****************************************************************************
1716  * the rest of this file concerns connection management.  active connections
1717  * start with connect_peer, passive connections start with passive_callback.
1718  * active disconnects start with conn_close, cm_callback starts passive
1719  * disconnects and contains the guts of how the disconnect state machine
1720  * progresses. 
1721  *****************************************************************************/
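
/* A rough map of the flows described above, reconstructed from the
 * handlers in this file (informational, not authoritative):
 *
 *   active:   connect_peer -> SD service query -> SD path query
 *             -> iibt_cm_connect -> FCM_CONNECT_REPLY -> connect_reply
 *             -> ESTABLISHED
 *   passive:  listen_callback -> accept -> iibt_cm_accept -> ESTABLISHED
 *   close:    close_conn_locked -> SEND_DREQ -> connd sends the DREQ
 *             -> FCM_DISCONNECT_REPLY / FCM_DISCONNECTED -> DISCONNECTED
 *             -> flush_pending -> refs drain -> connd destroys the conn
 */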
1722
1723 int
1724 kibnal_thread_start (int (*fn)(void *arg), void *arg)
1725 {
1726         long    pid = kernel_thread (fn, arg, 0);
1727
1728         if (pid < 0)
1729                 return ((int)pid);
1730
1731         atomic_inc (&kibnal_data.kib_nthreads);
1732         return (0);
1733 }
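
/* Presumed usage from module setup code elsewhere (not shown in this
 * file), e.g.:
 *
 *      rc = kibnal_thread_start (kibnal_connd, NULL);
 *
 * Each successful start bumps kib_nthreads; the thread itself is
 * expected to call kibnal_thread_fini() on exit so shutdown can wait
 * for the count to drain to zero. */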
1734
1735 static void
1736 kibnal_thread_fini (void)
1737 {
1738         atomic_dec (&kibnal_data.kib_nthreads);
1739 }
1740
1741 /* this can be called by anyone at any time to close a connection.  if
1742  * the connection is still established it heads to the connd to start
1743  * the disconnection in a safe context.  It has no effect if called
1744  * on a connection that is already disconnecting */
1745 void
1746 kibnal_close_conn_locked (kib_conn_t *conn, int error)
1747 {
1748         /* This just does the immediate housekeeping, and schedules the
1749          * connection for the connd to finish off.
1750          * Caller holds kib_global_lock exclusively in irq context */
1751         kib_peer_t   *peer = conn->ibc_peer;
1752
1753         KIB_ASSERT_CONN_STATE_RANGE(conn, IBNAL_CONN_CONNECTING,
1754                                     IBNAL_CONN_DISCONNECTED);
1755
1756         if (conn->ibc_state > IBNAL_CONN_ESTABLISHED)
1757                 return; /* already disconnecting */
1758
1759         CDEBUG (error == 0 ? D_NET : D_ERROR,
1760                 "closing conn to "LPX64": error %d\n", peer->ibp_nid, error);
1761
1762         if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
1763                 /* kib_connd_conns takes ibc_list's ref */
1764                 list_del (&conn->ibc_list);
1765         } else {
1766                 /* new ref for kib_connd_conns */
1767                 CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1768                        conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1769                        atomic_read (&conn->ibc_refcount));
1770                 atomic_inc (&conn->ibc_refcount);
1771         }
1772         
1773         if (list_empty (&peer->ibp_conns) &&
1774             peer->ibp_persistence == 0) {
1775                 /* Non-persistent peer with no more conns... */
1776                 kibnal_unlink_peer_locked (peer);
1777         }
1778
1779         conn->ibc_state = IBNAL_CONN_SEND_DREQ;
1780
1781         spin_lock (&kibnal_data.kib_connd_lock);
1782
1783         list_add_tail (&conn->ibc_list, &kibnal_data.kib_connd_conns);
1784         wake_up (&kibnal_data.kib_connd_waitq);
1785                 
1786         spin_unlock (&kibnal_data.kib_connd_lock);
1787 }
1788
1789 void
1790 kibnal_close_conn (kib_conn_t *conn, int error)
1791 {
1792         unsigned long     flags;
1793
1794         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1795
1796         kibnal_close_conn_locked (conn, error);
1797         
1798         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1799 }
1800
1801 static void
1802 kibnal_peer_connect_failed (kib_peer_t *peer, int active, int rc)
1803 {
1804         LIST_HEAD        (zombies);
1805         kib_tx_t         *tx;
1806         unsigned long     flags;
1807
1808         LASSERT (rc != 0);
1809         LASSERT (peer->ibp_reconnect_interval >= IBNAL_MIN_RECONNECT_INTERVAL);
1810
1811         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1812
1813         LASSERT (peer->ibp_connecting != 0);
1814         peer->ibp_connecting--;
1815
1816         if (peer->ibp_connecting != 0) {
1817                 /* another connection attempt under way (loopback?)... */
1818                 write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1819                 return;
1820         }
1821
1822         if (list_empty(&peer->ibp_conns)) {
1823                 /* Say when active connection can be re-attempted */
1824                 peer->ibp_reconnect_time = jiffies + peer->ibp_reconnect_interval;
1825                 /* Increase reconnection interval */
1826                 peer->ibp_reconnect_interval = MIN (peer->ibp_reconnect_interval * 2,
1827                                                     IBNAL_MAX_RECONNECT_INTERVAL);
1828         
1829                 /* Take peer's blocked transmits; I'll complete
1830                  * them with error */
1831                 while (!list_empty (&peer->ibp_tx_queue)) {
1832                         tx = list_entry (peer->ibp_tx_queue.next,
1833                                          kib_tx_t, tx_list);
1834                         
1835                         list_del (&tx->tx_list);
1836                         list_add_tail (&tx->tx_list, &zombies);
1837                 }
1838                 
1839                 if (kibnal_peer_active(peer) &&
1840                     (peer->ibp_persistence == 0)) {
1841                         /* failed connection attempt on non-persistent peer */
1842                         kibnal_unlink_peer_locked (peer);
1843                 }
1844         } else {
1845                 /* Can't have blocked transmits if there are connections */
1846                 LASSERT (list_empty(&peer->ibp_tx_queue));
1847         }
1848         
1849         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1850
1851         if (!list_empty (&zombies))
1852                 CERROR ("Deleting messages for "LPX64": connection failed\n",
1853                         peer->ibp_nid);
1854
1855         while (!list_empty (&zombies)) {
1856                 tx = list_entry (zombies.next, kib_tx_t, tx_list);
1857
1858                 list_del (&tx->tx_list);
1859                 /* complete now */
1860                 tx->tx_status = -EHOSTUNREACH;
1861                 kibnal_tx_done (tx);
1862         }
1863 }
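
/*
 * The reconnect throttling above is a capped exponential backoff.  A
 * standalone sketch of the same shape (the interval values here are
 * illustrative, not the NAL's real IBNAL_*_RECONNECT_INTERVAL tunables):
 */
#include <stdio.h>

#define MOCK_MIN_INTERVAL 1UL           /* stand-in, in seconds */
#define MOCK_MAX_INTERVAL 64UL          /* stand-in, in seconds */

int main (void)
{
        unsigned long interval = MOCK_MIN_INTERVAL;
        int           attempt;

        for (attempt = 1; attempt <= 8; attempt++) {
                printf ("attempt %d: next retry in %lus\n", attempt, interval);
                /* same shape as MIN(interval * 2, IBNAL_MAX_RECONNECT_INTERVAL) */
                interval = (interval * 2 < MOCK_MAX_INTERVAL) ?
                           interval * 2 : MOCK_MAX_INTERVAL;
        }
        return 0;
}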
1864
1865 static void
1866 kibnal_connreq_done (kib_conn_t *conn, int active, int status)
1867 {
1868         int               state = conn->ibc_state;
1869         kib_peer_t       *peer = conn->ibc_peer;
1870         kib_tx_t         *tx;
1871         unsigned long     flags;
1872         int               i;
1873
1874         /* passive connection has no connreq & vice versa */
1875         LASSERTF(!active == !(conn->ibc_connreq != NULL),
1876                  "%d %p\n", active, conn->ibc_connreq);
1877         if (active) {
1878                 PORTAL_FREE (conn->ibc_connreq, sizeof (*conn->ibc_connreq));
1879                 conn->ibc_connreq = NULL;
1880         }
1881
1882         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1883
1884         LASSERT (peer->ibp_connecting != 0);
1885         
1886         if (status == 0) {                         
1887                 /* connection established... */
1888                 KIB_ASSERT_CONN_STATE(conn, IBNAL_CONN_CONNECTING);
1889                 conn->ibc_state = IBNAL_CONN_ESTABLISHED;
1890
1891                 if (!kibnal_peer_active(peer)) {
1892                         /* ...but peer deleted meantime */
1893                         status = -ECONNABORTED;
1894                 }
1895         } else {
1896                 KIB_ASSERT_CONN_STATE_RANGE(conn, IBNAL_CONN_INIT_QP,
1897                                             IBNAL_CONN_CONNECTING);
1898         }
1899
1900         if (status == 0) {
1901                 /* Everything worked! */
1902
1903                 peer->ibp_connecting--;
1904
1905                 /* +1 ref for ibc_list; caller(== CM)'s ref remains until
1906                  * the IB_CM_IDLE callback */
1907                 CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1908                        conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1909                        atomic_read (&conn->ibc_refcount));
1910                 atomic_inc (&conn->ibc_refcount);
1911                 list_add (&conn->ibc_list, &peer->ibp_conns);
1912                 
1913                 /* reset reconnect interval for next attempt */
1914                 peer->ibp_reconnect_interval = IBNAL_MIN_RECONNECT_INTERVAL;
1915
1916                 /* post blocked sends to the new connection */
1917                 spin_lock (&conn->ibc_lock);
1918                 
1919                 while (!list_empty (&peer->ibp_tx_queue)) {
1920                         tx = list_entry (peer->ibp_tx_queue.next, 
1921                                          kib_tx_t, tx_list);
1922                         
1923                         list_del (&tx->tx_list);
1924
1925                         /* +1 ref for each tx */
1926                         CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1927                                conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1928                                atomic_read (&conn->ibc_refcount));
1929                         atomic_inc (&conn->ibc_refcount);
1930                         kibnal_queue_tx_locked (tx, conn);
1931                 }
1932                 
1933                 spin_unlock (&conn->ibc_lock);
1934
1935                 /* Nuke any dangling conns from a different peer instance... */
1936                 kibnal_close_stale_conns_locked (conn->ibc_peer,
1937                                                  conn->ibc_incarnation);
1938
1939                 write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1940
1941                 /* queue up all the receives */
1942                 for (i = 0; i < IBNAL_RX_MSGS; i++) {
1943                         /* +1 ref for rx desc */
1944                         CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1945                                conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1946                                atomic_read (&conn->ibc_refcount));
1947                         atomic_inc (&conn->ibc_refcount);
1948
1949                         CDEBUG(D_NET, "RX[%d] %p->%p - "LPX64"\n",
1950                                i, &conn->ibc_rxs[i], conn->ibc_rxs[i].rx_msg,
1951                                conn->ibc_rxs[i].rx_vaddr);
1952
1953                         kibnal_post_rx (&conn->ibc_rxs[i], 0);
1954                 }
1955
1956                 kibnal_check_sends (conn);
1957                 return;
1958         }
1959
1960         /* connection failed */
1961         if (state == IBNAL_CONN_CONNECTING) {
1962                 /* schedule for connd to close */
1963                 kibnal_close_conn_locked (conn, status);
1964         } else {
1965                 /* Don't have a CM comm_id; just wait for refs to drain */
1966                 conn->ibc_state = IBNAL_CONN_DISCONNECTED;
1967         } 
1968
1969         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1970
1971         kibnal_peer_connect_failed (conn->ibc_peer, active, status);
1972
1973         /* If we didn't establish the connection we don't have to pass
1974          * through the disconnect protocol before dropping the CM ref */
1975         if (state < IBNAL_CONN_CONNECTING) 
1976                 kibnal_put_conn (conn);
1977 }
1978
1979 static int
1980 kibnal_accept (kib_conn_t **connp, IB_HANDLE *cep,
1981                 ptl_nid_t nid, __u64 incarnation, int queue_depth)
1982 {
1983         kib_conn_t    *conn = kibnal_create_conn();
1984         kib_peer_t    *peer;
1985         kib_peer_t    *peer2;
1986         unsigned long  flags;
1987
1988         if (conn == NULL)
1989                 return (-ENOMEM);
1990
1991         if (queue_depth != IBNAL_MSG_QUEUE_SIZE) {
1992                 CERROR("Can't accept "LPX64": bad queue depth %d (%d expected)\n",
1993                        nid, queue_depth, IBNAL_MSG_QUEUE_SIZE);
1994                 atomic_dec (&conn->ibc_refcount);
1995                 kibnal_destroy_conn(conn);
1996                 return (-EPROTO);
1997         }
1998         
1999         /* assume 'nid' is a new peer */
2000         peer = kibnal_create_peer (nid);
2001         if (peer == NULL) {
2002                 CDEBUG(D_NET, "--conn[%p] state %d -> "LPX64" (%d)\n",
2003                        conn, conn->ibc_state, nid, /* ibc_peer not set yet */
2004                        atomic_read (&conn->ibc_refcount));
2005                 atomic_dec (&conn->ibc_refcount);
2006                 kibnal_destroy_conn(conn);
2007                 return (-ENOMEM);
2008         }
2009         
2010         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
2011
2012         peer2 = kibnal_find_peer_locked(nid);
2013         if (peer2 == NULL) {
2014                 /* peer table takes my ref on peer */
2015                 list_add_tail (&peer->ibp_list, kibnal_nid2peerlist(nid));
2016         } else {
2017                 kib_peer_decref (peer);
2018                 peer = peer2;
2019         }
2020
2021         kib_peer_addref(peer); /* +1 ref for conn */
2022         peer->ibp_connecting++;
2023
2024         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
2025
2026         conn->ibc_peer = peer;
2027         conn->ibc_state = IBNAL_CONN_CONNECTING;
2028         /* conn->ibc_cep is set when cm_accept is called */
2029         conn->ibc_incarnation = incarnation;
2030         conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
2031
2032         *connp = conn;
2033         return (0);
2034 }
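
/* Note the allocation discipline in kibnal_accept(): the peer is
 * created optimistically before taking the global lock, and discarded
 * if kibnal_find_peer_locked() shows another thread already installed
 * one.  Allocating outside the lock keeps the write-lock hold time
 * short at the cost of an occasional wasted allocation. */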
2035
2036 static void kibnal_set_qp_state(IB_HANDLE *qp, IB_QP_STATE state)
2037 {
2038         IB_QP_ATTRIBUTES_MODIFY modify_attr = {0,};
2039         FSTATUS frc;
2040
2041         modify_attr.RequestState = state;
2042
2043         frc = iibt_qp_modify(qp, &modify_attr, NULL);
2044         if (frc != FSUCCESS)
2045                 CERROR("couldn't set qp state to %d, error %d\n", state, frc);
2046 }
2047
2048 static void kibnal_flush_pending(kib_conn_t *conn)
2049 {
2050         LIST_HEAD        (zombies); 
2051         struct list_head *tmp;
2052         struct list_head *nxt;
2053         kib_tx_t         *tx;
2054         unsigned long     flags;
2055         int               done;
2056
2057         /* NB we wait until the connection has closed before completing
2058          * outstanding passive RDMAs so we can be sure the network can't 
2059          * touch the mapped memory any more. */
2060         KIB_ASSERT_CONN_STATE(conn, IBNAL_CONN_DISCONNECTED);
2061
2062         /* set the QP to the error state so that we get flush callbacks
2063          * on our posted receives which can then drop their conn refs */
2064         kibnal_set_qp_state(conn->ibc_qp, QPStateError);
2065
2066         spin_lock_irqsave (&conn->ibc_lock, flags);
2067
2068         /* grab passive RDMAs not waiting for the tx callback */
2069         list_for_each_safe (tmp, nxt, &conn->ibc_active_txs) {
2070                 tx = list_entry (tmp, kib_tx_t, tx_list);
2071
2072                 LASSERT (tx->tx_passive_rdma ||
2073                          !tx->tx_passive_rdma_wait);
2074
2075                 LASSERT (tx->tx_passive_rdma_wait ||
2076                          tx->tx_sending != 0);
2077
2078                 /* still waiting for tx callback? */
2079                 if (!tx->tx_passive_rdma_wait)
2080                         continue;
2081
2082                 tx->tx_status = -ECONNABORTED;
2083                 tx->tx_passive_rdma_wait = 0;
2084                 done = (tx->tx_sending == 0);
2085
2086                 if (!done)
2087                         continue;
2088
2089                 list_del (&tx->tx_list);
2090                 list_add (&tx->tx_list, &zombies);
2091         }
2092
2093         /* grab all blocked transmits */
2094         list_for_each_safe (tmp, nxt, &conn->ibc_tx_queue) {
2095                 tx = list_entry (tmp, kib_tx_t, tx_list);
2096                 
2097                 list_del (&tx->tx_list);
2098                 list_add (&tx->tx_list, &zombies);
2099         }
2100         
2101         spin_unlock_irqrestore (&conn->ibc_lock, flags);
2102
2103         while (!list_empty(&zombies)) {
2104                 tx = list_entry (zombies.next, kib_tx_t, tx_list);
2105
2106                 list_del(&tx->tx_list);
2107                 kibnal_tx_done (tx);
2108         }
2109 }
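
/*
 * kibnal_flush_pending() and kibnal_peer_connect_failed() share a
 * "zombie list" shape: claim victims onto a private list while holding
 * the lock, then complete them after dropping it, because completion
 * (kibnal_tx_done) may deregister memory and so must not run under a
 * spinlock.  Sketched in miniature; victim_t and complete_one() are
 * hypothetical stand-ins for kib_tx_t and kibnal_tx_done():
 */
typedef struct {
        struct list_head v_list;
} victim_t;

static void complete_one (victim_t *v);         /* may sleep */

static void flush_sketch (spinlock_t *lock, struct list_head *src)
{
        LIST_HEAD        (zombies);
        struct list_head *tmp;
        struct list_head *nxt;
        unsigned long     flags;

        spin_lock_irqsave (lock, flags);

        /* claim every victim while the lock still protects the list */
        list_for_each_safe (tmp, nxt, src) {
                list_del (tmp);
                list_add_tail (tmp, &zombies);
        }

        spin_unlock_irqrestore (lock, flags);

        /* complete them outside the lock, where sleeping is allowed */
        while (!list_empty (&zombies)) {
                victim_t *v = list_entry (zombies.next, victim_t, v_list);

                list_del (&v->v_list);
                complete_one (v);
        }
}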
2110
2111 static void
2112 kibnal_reject (IB_HANDLE cep, uint16_t reason)
2113 {
2114         CM_REJECT_INFO *rej;
2115
2116         PORTAL_ALLOC(rej, sizeof(*rej));
2117         if (rej == NULL) /* PORTAL_ALLOC() will CERROR on failure */
2118                 return;  
2119
2120         rej->Reason = reason;
2121         iibt_cm_reject(cep, rej);
2122         PORTAL_FREE(rej, sizeof(*rej));
2123 }
2124
2125 static FSTATUS
2126 kibnal_qp_rts(IB_HANDLE qp_handle, __u32 qpn, __u8 resp_res, 
2127               IB_PATH_RECORD *path, __u8 init_depth, __u32 send_psn)
2128 {
2129         IB_QP_ATTRIBUTES_MODIFY modify_attr;
2130         FSTATUS frc;
2131         ENTRY;
2132
2133         modify_attr = (IB_QP_ATTRIBUTES_MODIFY) {
2134                 .RequestState           = QPStateReadyToRecv,
2135                 .RecvPSN                = IBNAL_STARTING_PSN,
2136                 .DestQPNumber           = qpn,
2137                 .ResponderResources     = resp_res,
2138                 .MinRnrTimer            = UsecToRnrNakTimer(2000), /* 2 ms */
2139                 .Attrs                  = (IB_QP_ATTR_RECVPSN |
2140                                            IB_QP_ATTR_DESTQPNUMBER | 
2141                                            IB_QP_ATTR_RESPONDERRESOURCES | 
2142                                            IB_QP_ATTR_DESTAV | 
2143                                            IB_QP_ATTR_PATHMTU | 
2144                                            IB_QP_ATTR_MINRNRTIMER),
2145         };
2146         GetAVFromPath(0, path, &modify_attr.PathMTU, NULL, 
2147                       &modify_attr.DestAV);
2148
2149         frc = iibt_qp_modify(qp_handle, &modify_attr, NULL);
2150         if (frc != FSUCCESS) 
2151                 RETURN(frc);
2152
2153         modify_attr = (IB_QP_ATTRIBUTES_MODIFY) {
2154                 .RequestState           = QPStateReadyToSend,
2155                 .FlowControl            = TRUE,
2156                 .InitiatorDepth         = init_depth,
2157                 .SendPSN                = send_psn,
2158                 .LocalAckTimeout        = path->PktLifeTime + 2, /* 2 or 1? */
2159                 .RetryCount             = IBNAL_RETRY,
2160                 .RnrRetryCount          = IBNAL_RNR_RETRY,
2161                 .Attrs                  = (IB_QP_ATTR_FLOWCONTROL | 
2162                                            IB_QP_ATTR_INITIATORDEPTH | 
2163                                            IB_QP_ATTR_SENDPSN | 
2164                                            IB_QP_ATTR_LOCALACKTIMEOUT | 
2165                                            IB_QP_ATTR_RETRYCOUNT | 
2166                                            IB_QP_ATTR_RNRRETRYCOUNT),
2167         };
2168
2169         frc = iibt_qp_modify(qp_handle, &modify_attr, NULL);
2170         RETURN(frc);
2171 }
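
/* The two iibt_qp_modify() calls above walk the QP through the standard
 * InfiniBand connection sequence: ReadyToRecv (RTR) first, carrying the
 * remote QPN, starting PSN and path address vector, then ReadyToSend
 * (RTS) with the ack-timeout and retry attributes.  A QP cannot move
 * straight to RTS; both sides must pass through RTR. */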
2172
2173 static void
2174 kibnal_connect_reply (IB_HANDLE cep, CM_CONN_INFO *info, void *arg)
2175 {
2176         IB_CA_ATTRIBUTES *ca_attr = &kibnal_data.kib_hca_attrs;
2177         kib_conn_t *conn = arg;
2178         kib_wire_connreq_t *wcr;
2179         CM_REPLY_INFO *rep = &info->Info.Reply;
2180         uint16_t reason;
2181         FSTATUS frc;
2182
2183         wcr = (kib_wire_connreq_t *)info->Info.Reply.PrivateData;
2184
2185         if (wcr->wcr_magic != cpu_to_le32(IBNAL_MSG_MAGIC)) {
2186                 CERROR ("Can't connect "LPX64": bad magic %08x\n",
2187                         conn->ibc_peer->ibp_nid, le32_to_cpu(wcr->wcr_magic));
2188                 GOTO(reject, reason = RC_USER_REJ);
2189         }
2190         
2191         if (wcr->wcr_version != cpu_to_le16(IBNAL_MSG_VERSION)) {
2192                 CERROR ("Can't connect "LPX64": bad version %d\n",
2193                         conn->ibc_peer->ibp_nid, le16_to_cpu(wcr->wcr_version));
2194                 GOTO(reject, reason = RC_USER_REJ);
2195         }
2196                         
2197         if (wcr->wcr_queue_depth != cpu_to_le16(IBNAL_MSG_QUEUE_SIZE)) {
2198                 CERROR ("Can't connect "LPX64": bad queue depth %d\n",
2199                         conn->ibc_peer->ibp_nid, 
2200                         le16_to_cpu(wcr->wcr_queue_depth));
2201                 GOTO(reject, reason = RC_USER_REJ);
2202         }
2203                         
2204         if (le64_to_cpu(wcr->wcr_nid) != conn->ibc_peer->ibp_nid) {
2205                 CERROR ("Unexpected NID "LPX64" from "LPX64"\n",
2206                         le64_to_cpu(wcr->wcr_nid), conn->ibc_peer->ibp_nid);
2207                 GOTO(reject, reason = RC_USER_REJ);
2208         }
2209
2210         CDEBUG(D_NET, "Connection %p -> "LPX64" REP_RECEIVED.\n",
2211                conn, conn->ibc_peer->ibp_nid);
2212
2213         conn->ibc_incarnation = le64_to_cpu(wcr->wcr_incarnation);
2214         conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
2215
2216         frc = kibnal_qp_rts(conn->ibc_qp, rep->QPN, 
2217                             min_t(__u8, rep->ArbInitiatorDepth,
2218                                   ca_attr->MaxQPResponderResources),
2219                             &conn->ibc_connreq->cr_path, 
2220                             min_t(__u8, rep->ArbResponderResources,
2221                                   ca_attr->MaxQPInitiatorDepth),
2222                             rep->StartingPSN);
2223         if (frc != FSUCCESS) {
2224                 CERROR("Connection %p -> "LPX64" QP RTS/RTR failed: %d\n",
2225                        conn, conn->ibc_peer->ibp_nid, frc);
2226                 GOTO(reject, reason = RC_NO_QP);
2227         }
2228
2229         /* the callback arguments are ignored for an active accept */
2230         conn->ibc_connreq->cr_discarded.Status = FSUCCESS;
2231         frc = iibt_cm_accept(cep, &conn->ibc_connreq->cr_discarded, 
2232                              NULL, NULL, NULL, NULL);
2233         if (frc != FCM_CONNECT_ESTABLISHED) {
2234                 CERROR("Connection %p -> "LPX64" CMAccept failed: %d\n",
2235                        conn, conn->ibc_peer->ibp_nid, frc);
2236                 kibnal_connreq_done (conn, 1, -ECONNABORTED);
2237                 /* XXX don't call reject after accept fails? */
2238                 return;
2239         }
2240
2241         CDEBUG(D_NET, "Connection %p -> "LPX64" Established\n",
2242                conn, conn->ibc_peer->ibp_nid);
2243
2244         kibnal_connreq_done (conn, 1, 0);
2245         return;
2246
2247 reject:
2248         kibnal_reject(cep, reason);
2249         kibnal_connreq_done (conn, 1, -EPROTO);
2250 }
2251
2252 /* ib_cm.h has a wealth of information on the CM procedures */
2253 static void
2254 kibnal_cm_callback(IB_HANDLE cep, CM_CONN_INFO *info, void *arg)
2255 {
2256         kib_conn_t       *conn = arg;
2257
2258         CDEBUG(D_NET, "status 0x%x\n", info->Status);
2259
2260         /* Established Connection Notifier */
2261         switch (info->Status) {
2262         default:
2263                 CERROR("unknown status %d on Connection %p -> "LPX64"\n",
2264                        info->Status, conn, conn->ibc_peer->ibp_nid);
2265                 LBUG();
2266                 break;
2267
2268         case FCM_CONNECT_REPLY:
2269                 kibnal_connect_reply(cep, info, arg);
2270                 break;
2271
2272         case FCM_DISCONNECT_REQUEST:
2273                 /* XXX lock around these state management bits? */
2274                 if (conn->ibc_state == IBNAL_CONN_ESTABLISHED)
2275                         kibnal_close_conn (conn, 0);
2276                 conn->ibc_state = IBNAL_CONN_DREP;
2277                 iibt_cm_disconnect(conn->ibc_cep, NULL, NULL);
2278                 break;
2279
2280         /* these both guarantee that no more cm callbacks will occur */
2281         case FCM_DISCONNECTED: /* aka FCM_DISCONNECT_TIMEOUT */
2282         case FCM_DISCONNECT_REPLY:
2283                 CDEBUG(D_NET, "Connection %p -> "LPX64" disconnect done.\n",
2284                        conn, conn->ibc_peer->ibp_nid);
2285
2286                 conn->ibc_state = IBNAL_CONN_DISCONNECTED;
2287                 kibnal_flush_pending(conn);
2288                 kibnal_put_conn(conn);        /* Lose CM's ref */
2289                 break;
2290         }
2291
2292         return;
2293 }
2294
2295 static int
2296 kibnal_set_cm_flags(IB_HANDLE cep)
2297 {
2298         FSTATUS frc;
2299         uint32 value = 1;
2300
2301         frc = iibt_cm_modify_cep(cep, CM_FLAG_TIMEWAIT_CALLBACK,
2302                                  (char *)&value, sizeof(value), 0);
2303         if (frc != FSUCCESS) {
2304                 CERROR("error setting timeout callback: %d\n", frc);
2305                 return -1;
2306         }
2307
2308 #if 0
2309         frc = iibt_cm_modify_cep(cep, CM_FLAG_ASYNC_ACCEPT, (char *)&value,
2310                                  sizeof(value), 0);
2311         if (frc != FSUCCESS) {
2312                 CERROR("error setting async accept: %d\n", frc);
2313                 return -1;
2314         }
2315 #endif
2316
2317         return 0;
2318 }
2319
2320 void
2321 kibnal_listen_callback(IB_HANDLE cep, CM_CONN_INFO *info, void *arg)
2322 {
2323         IB_CA_ATTRIBUTES *ca_attr = &kibnal_data.kib_hca_attrs;
2324         IB_QP_ATTRIBUTES_QUERY *query;
2325         CM_REQUEST_INFO    *req;
2326         CM_CONN_INFO       *rep = NULL, *rcv = NULL;
2327         kib_wire_connreq_t *wcr;
2328         kib_conn_t         *conn = NULL;
2329         uint16_t            reason = 0;
2330         FSTATUS             frc;
2331         int                 rc = 0;
2332         
2333         LASSERT(cep);
2334         LASSERT(info);
2335         LASSERT(arg == NULL); /* no conn yet for passive */
2336
2337         CDEBUG(D_NET, "status 0x%x\n", info->Status);
2338
2339         req = &info->Info.Request;
2340         wcr = (kib_wire_connreq_t *)req->PrivateData;
2341
2342         CDEBUG(D_NET, "%d from "LPX64"\n", info->Status, 
2343                le64_to_cpu(wcr->wcr_nid));
2344         
2345         if (info->Status == FCM_CONNECT_CANCEL)
2346                 return;
2347         
2348         LASSERT (info->Status == FCM_CONNECT_REQUEST);
2349         
2350         if (wcr->wcr_magic != cpu_to_le32(IBNAL_MSG_MAGIC)) {
2351                 CERROR ("Can't accept: bad magic %08x\n",
2352                         le32_to_cpu(wcr->wcr_magic));
2353                 GOTO(out, reason = RC_USER_REJ);
2354         }
2355
2356         if (wcr->wcr_version != cpu_to_le16(IBNAL_MSG_VERSION)) {
2357                 CERROR ("Can't accept: bad version %d\n",
2358                         le16_to_cpu(wcr->wcr_version));
2359                 GOTO(out, reason = RC_USER_REJ);
2360         }
2361
2362         rc = kibnal_accept(&conn, cep,
2363                            le64_to_cpu(wcr->wcr_nid),
2364                            le64_to_cpu(wcr->wcr_incarnation),
2365                            le16_to_cpu(wcr->wcr_queue_depth));
2366         if (rc != 0) {
2367                 CERROR ("Can't accept "LPX64": %d\n",
2368                         le64_to_cpu(wcr->wcr_nid), rc);
2369                 GOTO(out, reason = RC_NO_RESOURCES);
2370         }
2371
2372         frc = kibnal_qp_rts(conn->ibc_qp, req->CEPInfo.QPN,
2373                             min_t(__u8, req->CEPInfo.OfferedInitiatorDepth, 
2374                                   ca_attr->MaxQPResponderResources),
2375                             &req->PathInfo.Path,
2376                             min_t(__u8, req->CEPInfo.OfferedResponderResources, 
2377                                   ca_attr->MaxQPInitiatorDepth),
2378                             req->CEPInfo.StartingPSN);
2379
2380         if (frc != FSUCCESS) {
2381                 CERROR ("Can't mark QP RTS/RTR "LPX64": %d\n",
2382                         le64_to_cpu(wcr->wcr_nid), frc);
2383                 GOTO(out, reason = RC_NO_QP);
2384         }
2385
2386         frc = iibt_qp_query(conn->ibc_qp, &conn->ibc_qp_attrs, NULL);
2387         if (frc != FSUCCESS) {
2388                 CERROR ("Couldn't query qp attributes "LPX64": %d\n",
2389                         le64_to_cpu(wcr->wcr_nid), frc);
2390                 GOTO(out, reason = RC_NO_QP);
2391         }
2392         query = &conn->ibc_qp_attrs;
2393
2394         PORTAL_ALLOC(rep, sizeof(*rep));
2395         PORTAL_ALLOC(rcv, sizeof(*rcv));
2396         if (rep == NULL || rcv == NULL) {
2397                 CERROR ("Can't allocate reply and receive buffers\n");
2398                 GOTO(out, reason = RC_INSUFFICIENT_RESP_RES);
2399         }
2400
2401         /* don't try to deref this into the incoming wcr :) */
2402         wcr = (kib_wire_connreq_t *)rep->Info.Reply.PrivateData;
2403
2404         rep->Info.Reply = (CM_REPLY_INFO) {
2405                 .QPN = query->QPNumber,
2406                 .QKey = query->Qkey,
2407                 .StartingPSN = query->RecvPSN,
2408                 .EndToEndFlowControl = query->FlowControl,
2409                 /* XXX Hmm. */
2410                 .ArbInitiatorDepth = query->InitiatorDepth,
2411                 .ArbResponderResources = query->ResponderResources,
2412                 .TargetAckDelay = 0,
2413                 .FailoverAccepted = 0,
2414                 .RnRRetryCount = req->CEPInfo.RnrRetryCount,
2415         };
2416                 
2417         *wcr = (kib_wire_connreq_t) {
2418                 .wcr_magic       = cpu_to_le32(IBNAL_MSG_MAGIC),
2419                 .wcr_version     = cpu_to_le16(IBNAL_MSG_VERSION),
2420                 .wcr_queue_depth = cpu_to_le16(IBNAL_MSG_QUEUE_SIZE),
2421                 .wcr_nid         = cpu_to_le64(kibnal_data.kib_nid),
2422                 .wcr_incarnation = cpu_to_le64(kibnal_data.kib_incarnation),
2423         };
2424
2425         frc = iibt_cm_accept(cep, rep, rcv, kibnal_cm_callback, conn, 
2426                              &conn->ibc_cep);
2427
2428         PORTAL_FREE(rep, sizeof(*rep));
2429         PORTAL_FREE(rcv, sizeof(*rcv));
2430
2431         if (frc != FCM_CONNECT_ESTABLISHED) {
2432                 /* XXX it seems we don't call reject after this point? */
2433                 CERROR("iibt_cm_accept() failed: %d, aborting\n", frc);
2434                 rc = -ECONNABORTED;
2435                 goto out;
2436         }
2437
2438         if (kibnal_set_cm_flags(conn->ibc_cep)) {
2439                 rc = -ECONNABORTED;
2440                 goto out;
2441         }
2442
2443         CDEBUG(D_WARNING, "Connection %p -> "LPX64" ESTABLISHED.\n",
2444                conn, conn->ibc_peer->ibp_nid);
2445
2446 out:
2447         if (reason) {
2448                 kibnal_reject(cep, reason);
2449                 rc = -ECONNABORTED;
2450         }
2451         if (conn != NULL) 
2452                 kibnal_connreq_done(conn, 0, rc);
2453
2454         return;
2455 }
2456
2457 static void
2458 dump_path_records(PATH_RESULTS *results)
2459 {
2460         IB_PATH_RECORD *path;
2461         int i;
2462
2463         for(i = 0; i < results->NumPathRecords; i++) {
2464                 path = &results->PathRecords[i];
2465                 CDEBUG(D_NET, "%d: sgid "LPX64":"LPX64" dgid "
2466                        LPX64":"LPX64" pkey %x\n",
2467                        i,
2468                        path->SGID.Type.Global.SubnetPrefix,
2469                        path->SGID.Type.Global.InterfaceID,
2470                        path->DGID.Type.Global.SubnetPrefix,
2471                        path->DGID.Type.Global.InterfaceID,
2472                        path->P_Key);
2473         }
2474 }
2475
2476 static void
2477 kibnal_pathreq_callback (void *arg, QUERY *query, 
2478                          QUERY_RESULT_VALUES *query_res)
2479 {
2480         IB_CA_ATTRIBUTES *ca_attr = &kibnal_data.kib_hca_attrs;
2481         kib_conn_t *conn = arg;
2482         PATH_RESULTS *path;
2483         FSTATUS frc;
2484         
2485         if (query_res->Status != FSUCCESS || query_res->ResultDataSize == 0) {
2486                 CERROR ("status %d data size %d\n", query_res->Status,
2487                         query_res->ResultDataSize);
2488                 kibnal_connreq_done (conn, 1, -EINVAL);
2489                 return;
2490         }
2491
2492         path = (PATH_RESULTS *)query_res->QueryResult;
2493
2494         if (path->NumPathRecords < 1) {
2495                 CERROR ("expected path records: %d\n", path->NumPathRecords);
2496                 kibnal_connreq_done (conn, 1, -EINVAL);
2497                 return;
2498         }
2499
2500         dump_path_records(path);
2501
2502         /* just using the first.  this is probably a horrible idea. */
2503         conn->ibc_connreq->cr_path = path->PathRecords[0];
2504
2505         conn->ibc_cep = iibt_cm_create_cep(CM_RC_TYPE);
2506         if (conn->ibc_cep == NULL) {
2507                 CERROR ("Can't create CEP\n");
2508                 kibnal_connreq_done (conn, 1, -EINVAL);
2509                 return;
2510         }
2511
2512         if (kibnal_set_cm_flags(conn->ibc_cep)) {
2513                 kibnal_connreq_done (conn, 1, -EINVAL);
2514                 return;
2515         }
2516
2517         conn->ibc_connreq->cr_wcr = (kib_wire_connreq_t) {
2518                 .wcr_magic       = cpu_to_le32(IBNAL_MSG_MAGIC),
2519                 .wcr_version     = cpu_to_le16(IBNAL_MSG_VERSION),
2520                 .wcr_queue_depth = cpu_to_le16(IBNAL_MSG_QUEUE_SIZE),
2521                 .wcr_nid         = cpu_to_le64(kibnal_data.kib_nid),
2522                 .wcr_incarnation = cpu_to_le64(kibnal_data.kib_incarnation),
2523         };
2524
2525         conn->ibc_connreq->cr_cmreq = (CM_REQUEST_INFO) {
2526                 .SID = conn->ibc_connreq->cr_service.RID.ServiceID,
2527                 .CEPInfo = (CM_CEP_INFO) { 
2528                         .CaGUID = kibnal_data.kib_hca_guids[0],
2529                         .EndToEndFlowControl = FALSE,
2530                         .PortGUID = conn->ibc_connreq->cr_path.SGID.Type.Global.InterfaceID,
2531                         .RetryCount = IBNAL_RETRY,
2532                         .RnrRetryCount = IBNAL_RNR_RETRY,
2533                         .AckTimeout = IBNAL_ACK_TIMEOUT,
2534                         .StartingPSN = IBNAL_STARTING_PSN,
2535                         .QPN = conn->ibc_qp_attrs.QPNumber,
2536                         .QKey = conn->ibc_qp_attrs.Qkey,
2537                         .OfferedResponderResources = ca_attr->MaxQPResponderResources,
2538                         .OfferedInitiatorDepth = ca_attr->MaxQPInitiatorDepth,
2539                 },
2540                 .PathInfo = (CM_CEP_PATHINFO) {
2541                         .bSubnetLocal = TRUE,
2542                         .Path = conn->ibc_connreq->cr_path,
2543                 },
2544         };
2545
2546 #if 0
2547         /* XXX set timeout just like SDP!!!*/
2548         conn->ibc_connreq->cr_path.packet_life = 13;
2549 #endif
2550         /* Flag I'm getting involved with the CM... */
2551         conn->ibc_state = IBNAL_CONN_CONNECTING;
2552
2553         CDEBUG(D_NET, "Connecting to service id "LPX64", on "LPX64"\n",
2554                conn->ibc_connreq->cr_service.RID.ServiceID, 
2555                *kibnal_service_nid_field(&conn->ibc_connreq->cr_service));
2556
2557         memset(conn->ibc_connreq->cr_cmreq.PrivateData, 0, 
2558                CM_REQUEST_INFO_USER_LEN);
2559         memcpy(conn->ibc_connreq->cr_cmreq.PrivateData, 
2560                &conn->ibc_connreq->cr_wcr, sizeof(conn->ibc_connreq->cr_wcr));
2561
2562         /* kibnal_cm_callback gets my conn ref */
2563         frc = iibt_cm_connect(conn->ibc_cep, &conn->ibc_connreq->cr_cmreq,
2564                               kibnal_cm_callback, conn);
2565         if (frc != FPENDING && frc != FSUCCESS) {
2566                 CERROR ("Connect: %d\n", frc);
2567                 /* Back out state change as connect failed */
2568                 conn->ibc_state = IBNAL_CONN_INIT_QP;
2569                 kibnal_connreq_done (conn, 1, -EINVAL);
2570         }
2571 }
2572
2573 static void
2574 dump_service_records(SERVICE_RECORD_RESULTS *results)
2575 {
2576         IB_SERVICE_RECORD *svc;
2577         int i;
2578
2579         for(i = 0; i < results->NumServiceRecords; i++) {
2580                 svc = &results->ServiceRecords[i];
2581                 CDEBUG(D_NET, "%d: sid "LPX64" gid "LPX64":"LPX64" pkey %x\n",
2582                        i,
2583                        svc->RID.ServiceID,
2584                        svc->RID.ServiceGID.Type.Global.SubnetPrefix,
2585                        svc->RID.ServiceGID.Type.Global.InterfaceID,
2586                        svc->RID.ServiceP_Key);
2587         }
2588 }
2589
2590
2591 static void
2592 kibnal_service_get_callback (void *arg, QUERY *query, 
2593                              QUERY_RESULT_VALUES *query_res)
2594 {
2595         kib_conn_t *conn = arg;
2596         SERVICE_RECORD_RESULTS *svc;
2597         COMMAND_CONTROL_PARAMETERS sd_params;
2598         QUERY   path_query;
2599         FSTATUS frc;
2600         
2601         if (query_res->Status != FSUCCESS || query_res->ResultDataSize == 0) {
2602                 CERROR ("status %d data size %d\n", query_res->Status,
2603                         query_res->ResultDataSize);
2604                 kibnal_connreq_done (conn, 1, -EINVAL);
2605                 return;
2606         }
2607
2608         svc = (SERVICE_RECORD_RESULTS *)query_res->QueryResult;
2609
2610         if (svc->NumServiceRecords < 1) {
2611                 CERROR ("%d service records\n", svc->NumServiceRecords);
2612                 kibnal_connreq_done (conn, 1, -EINVAL);
2613                 return;
2614         }
2615
2616         dump_service_records(svc);
2617
2618         conn->ibc_connreq->cr_service = svc->ServiceRecords[0];
2619
2620         CDEBUG(D_NET, "Got status %d, service id "LPX64", on "LPX64"\n",
2621                query_res->Status , conn->ibc_connreq->cr_service.RID.ServiceID, 
2622                *kibnal_service_nid_field(&conn->ibc_connreq->cr_service));
2623
2624         memset(&path_query, 0, sizeof(path_query));
2625         path_query.InputType = InputTypePortGuidPair;
2626         path_query.OutputType = OutputTypePathRecord;
2627         path_query.InputValue.PortGuidPair.SourcePortGuid = kibnal_data.kib_port_guid;
2628         path_query.InputValue.PortGuidPair.DestPortGuid  = conn->ibc_connreq->cr_service.RID.ServiceGID.Type.Global.InterfaceID;
2629
2630         memset(&sd_params, 0, sizeof(sd_params));
2631         sd_params.RetryCount = IBNAL_RETRY;
2632         sd_params.Timeout = 10 * 1000;   /* wait 10 seconds */
2633
2634         /* kibnal_pathreq_callback gets my conn ref */
2635
2636         frc = iibt_sd_query_port_fabric_information(kibnal_data.kib_sd,
2637                                                     kibnal_data.kib_port_guid,
2638                                                     &path_query, 
2639                                                     kibnal_pathreq_callback,
2640                                                     &sd_params, conn);
2641         if (frc == FPENDING)
2642                 return;
2643
2644         CERROR ("Path record request failed: %d\n", frc);
2645         kibnal_connreq_done (conn, 1, -EINVAL);
2646 }
2647
2648 static void
2649 kibnal_connect_peer (kib_peer_t *peer)
2650 {
2651         COMMAND_CONTROL_PARAMETERS sd_params;
2652         QUERY   query;
2653         FSTATUS frc;
2654         kib_conn_t  *conn = kibnal_create_conn();
2655
2656         LASSERT (peer->ibp_connecting != 0);
2657
2658         if (conn == NULL) {
2659                 CERROR ("Can't allocate conn\n");
2660                 kibnal_peer_connect_failed (peer, 1, -ENOMEM);
2661                 return;
2662         }
2663
2664         conn->ibc_peer = peer;
2665         kib_peer_addref(peer);
2666
2667         PORTAL_ALLOC (conn->ibc_connreq, sizeof (*conn->ibc_connreq));
2668         if (conn->ibc_connreq == NULL) {
2669                 CERROR ("Can't allocate connreq\n");
2670                 kibnal_connreq_done (conn, 1, -ENOMEM);
2671                 return;
2672         }
2673
2674         memset(conn->ibc_connreq, 0, sizeof (*conn->ibc_connreq));
2675
2676         kibnal_set_service_keys(&conn->ibc_connreq->cr_service, peer->ibp_nid);
2677
2678         memset(&query, 0, sizeof(query));
2679         query.InputType = InputTypeServiceRecord;
2680         query.OutputType = OutputTypeServiceRecord;
2681         query.InputValue.ServiceRecordValue.ServiceRecord = conn->ibc_connreq->cr_service;
2682         query.InputValue.ServiceRecordValue.ComponentMask = KIBNAL_SERVICE_KEY_MASK;
2683
2684         memset(&sd_params, 0, sizeof(sd_params));
2685         sd_params.RetryCount = IBNAL_RETRY;
2686         sd_params.Timeout = 10 * 1000;   /* wait 10 seconds */
2687
2688         /* kibnal_service_get_callback gets my conn ref */
2689         frc = iibt_sd_query_port_fabric_information(kibnal_data.kib_sd,
2690                                                     kibnal_data.kib_port_guid,
2691                                                     &query, 
2692                                                     kibnal_service_get_callback,
2693                                                     &sd_params, conn);
2694         if (frc == FPENDING)
2695                 return;
2696
2697         CERROR ("iibt_sd_query_port_fabric_information(): %d\n", frc);
2698         kibnal_connreq_done (conn, 1, frc);
2699 }
2700
2701 static int
2702 kibnal_conn_timed_out (kib_conn_t *conn)
2703 {
2704         kib_tx_t          *tx;
2705         struct list_head  *ttmp;
2706         unsigned long      flags;
2707
2708         spin_lock_irqsave (&conn->ibc_lock, flags);
2709
2710         list_for_each (ttmp, &conn->ibc_tx_queue) {
2711                 tx = list_entry (ttmp, kib_tx_t, tx_list);
2712
2713                 LASSERT (!tx->tx_passive_rdma_wait);
2714                 LASSERT (tx->tx_sending == 0);
2715
2716                 if (time_after_eq (jiffies, tx->tx_deadline)) {
2717                         spin_unlock_irqrestore (&conn->ibc_lock, flags);
2718                         return 1;
2719                 }
2720         }
2721
2722         list_for_each (ttmp, &conn->ibc_active_txs) {
2723                 tx = list_entry (ttmp, kib_tx_t, tx_list);
2724
2725                 LASSERT (tx->tx_passive_rdma ||
2726                          !tx->tx_passive_rdma_wait);
2727
2728                 LASSERT (tx->tx_passive_rdma_wait ||
2729                          tx->tx_sending != 0);
2730
2731                 if (time_after_eq (jiffies, tx->tx_deadline)) {
2732                         spin_unlock_irqrestore (&conn->ibc_lock, flags);
2733                         return 1;
2734                 }
2735         }
2736
2737         spin_unlock_irqrestore (&conn->ibc_lock, flags);
2738
2739         return 0;
2740 }
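
/*
 * The deadline tests above use the kernel's time_after_eq(), which is
 * wrap-safe on jiffies.  The underlying trick, sketched as a helper:
 * compare via signed subtraction so the result stays correct across
 * counter wraparound.
 */
static inline int sketch_time_after_eq (unsigned long a, unsigned long b)
{
        return (long)(a - b) >= 0;
}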
2741
2742 static void
2743 kibnal_check_conns (int idx)
2744 {
2745         struct list_head  *peers = &kibnal_data.kib_peers[idx];
2746         struct list_head  *ptmp;
2747         kib_peer_t        *peer;
2748         kib_conn_t        *conn;
2749         struct list_head  *ctmp;
2750
2751  again:
2752         /* NB. We expect to have a look at all the peers and not find any
2753          * rdmas to time out, so we just use a shared lock while we
2754          * take a look... */
2755         read_lock (&kibnal_data.kib_global_lock);
2756
2757         list_for_each (ptmp, peers) {
2758                 peer = list_entry (ptmp, kib_peer_t, ibp_list);
2759
2760                 list_for_each (ctmp, &peer->ibp_conns) {
2761                         conn = list_entry (ctmp, kib_conn_t, ibc_list);
2762
2763                         KIB_ASSERT_CONN_STATE(conn, IBNAL_CONN_ESTABLISHED);
2764
2765                         /* In case we have enough credits to return via a
2766                          * NOOP, but there were no non-blocking tx descs
2767                          * free to do it last time... */
2768                         kibnal_check_sends(conn);
2769
2770                         if (!kibnal_conn_timed_out(conn))
2771                                 continue;
2772                         
2773                         CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
2774                                conn, conn->ibc_state, peer->ibp_nid,
2775                                atomic_read (&conn->ibc_refcount));
2776
2777                         atomic_inc (&conn->ibc_refcount);
2778                         read_unlock (&kibnal_data.kib_global_lock);
2779
2780                         CERROR("Timed out RDMA with "LPX64"\n",
2781                                peer->ibp_nid);
2782
2783                         kibnal_close_conn (conn, -ETIMEDOUT);
2784                         kibnal_put_conn (conn);
2785
2786                         /* start again now I've dropped the lock */
2787                         goto again;
2788                 }
2789         }
2790
2791         read_unlock (&kibnal_data.kib_global_lock);
2792 }
2793
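     /* Advance the disconnect state machine for a conn handed to the
      * connd: free it once all refs have gone (DISCONNECTED), issue the
      * disconnect request for SEND_DREQ, then drop the ref taken by
      * close_conn. */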
2794 static void
2795 kib_connd_handle_state(kib_conn_t *conn)
2796 {
2797         FSTATUS frc;
2798
2799         switch (conn->ibc_state) {
2800                 /* all refs have gone, free and be done with it */ 
2801                 case IBNAL_CONN_DISCONNECTED:
2802                         kibnal_destroy_conn (conn);
2803                         return; /* avoid put_conn */
2804
2805                 case IBNAL_CONN_SEND_DREQ:
2806                         frc = iibt_cm_disconnect(conn->ibc_cep, NULL, NULL);
2807                         if (frc != FSUCCESS) /* XXX do real error handling */
2808                                 CERROR("disconnect failed: %d\n", frc);
2809                         conn->ibc_state = IBNAL_CONN_DREQ;
2810                         break;
2811
2812                 /* a callback got to the conn before we did */ 
2813                 case IBNAL_CONN_DREP:
2814                         break;
2815                                 
2816                 default:
2817                         CERROR ("Bad conn %p state: %d\n", conn, 
2818                                 conn->ibc_state);
2819                         LBUG();
2820                         break;
2821         }
2822
2823         /* drop ref from close_conn */
2824         kibnal_put_conn(conn);
2825 }
2826
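     /* Connection daemon: reaps conns queued for disconnect handling,
      * kicks off connection attempts to queued peers, and periodically
      * sweeps the peer table for RDMA timeouts. */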
2827 int
2828 kibnal_connd (void *arg)
2829 {
2830         wait_queue_t       wait;
2831         unsigned long      flags;
2832         kib_conn_t        *conn;
2833         kib_peer_t        *peer;
2834         int                timeout;
2835         int                i;
2836         int                peer_index = 0;
2837         unsigned long      deadline = jiffies;
2838         
2839         kportal_daemonize ("kibnal_connd");
2840         kportal_blockallsigs ();
2841
2842         init_waitqueue_entry (&wait, current);
2843
2844         spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
2845
2846         for (;;) {
2847                 if (!list_empty (&kibnal_data.kib_connd_conns)) {
2848                         conn = list_entry (kibnal_data.kib_connd_conns.next,
2849                                            kib_conn_t, ibc_list);
2850                         list_del (&conn->ibc_list);
2851                         
2852                         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
2853                         kib_connd_handle_state(conn);
2854
2855                         spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
2856                         continue;
2857                 }
2858
2859                 if (!list_empty (&kibnal_data.kib_connd_peers)) {
2860                         peer = list_entry (kibnal_data.kib_connd_peers.next,
2861                                            kib_peer_t, ibp_connd_list);
2862                         
2863                         list_del_init (&peer->ibp_connd_list);
2864                         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
2865
2866                         kibnal_connect_peer (peer);
2867                         kib_peer_decref (peer);
2868
2869                         spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
2870                 }
2871
2872                 /* shutting down and nobody left to reap... */
2873                 if (kibnal_data.kib_shutdown &&
2874                     atomic_read(&kibnal_data.kib_nconns) == 0)
2875                         break;
2876
2877                 spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
2878
2879                 /* (int)(deadline - jiffies) is signed, so this is safe across the jiffy wrap... */
2880                 while ((timeout = (int)(deadline - jiffies)) <= 0) {
2881                         const int n = 4;
2882                         const int p = 1;
2883                         int       chunk = kibnal_data.kib_peer_hash_size;
2884                         
2885                         /* Time to check for RDMA timeouts on a few more
2886                          * peers: I do checks every 'p' seconds on a
2887                          * proportion of the peer table and I need to check
2888                          * every connection 'n' times within a timeout
2889                          * interval, to ensure I detect a timeout on any
2890                          * connection within (n+1)/n times the timeout
2891                          * interval. */
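                             /* Worked example (illustrative numbers only):
                              * with p == 1, n == 4, a 101-bucket peer hash
                              * and a 60s io timeout, chunk = (101*4*1)/60
                              * == 6, so ~6 buckets are scanned each second
                              * and the whole table every ~17s, close to
                              * the intended timeout/n == 15s. */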
2892
2893                         if (kibnal_tunables.kib_io_timeout > n * p)
2894                                 chunk = (chunk * n * p) / 
2895                                         kibnal_tunables.kib_io_timeout;
2896                         if (chunk == 0)
2897                                 chunk = 1;
2898
2899                         for (i = 0; i < chunk; i++) {
2900                                 kibnal_check_conns (peer_index);
2901                                 peer_index = (peer_index + 1) % 
2902                                              kibnal_data.kib_peer_hash_size;
2903                         }
2904
2905                         deadline += p * HZ;
2906                 }
2907
2908                 kibnal_data.kib_connd_waketime = jiffies + timeout;
2909
2910                 set_current_state (TASK_INTERRUPTIBLE);
2911                 add_wait_queue (&kibnal_data.kib_connd_waitq, &wait);
2912
2913                 if (!kibnal_data.kib_shutdown &&
2914                     list_empty (&kibnal_data.kib_connd_conns) &&
2915                     list_empty (&kibnal_data.kib_connd_peers))
2916                         schedule_timeout (timeout);
2917
2918                 set_current_state (TASK_RUNNING);
2919                 remove_wait_queue (&kibnal_data.kib_connd_waitq, &wait);
2920
2921                 spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
2922         }
2923
2924         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
2925
2926         kibnal_thread_fini ();
2927         return (0);
2928 }
2929
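     /* Scheduler thread: performs the work that can't be done in
      * interrupt context, i.e. tx completions deferred onto
      * kib_sched_txq and received messages queued on kib_sched_rxq. */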
2930 int
2931 kibnal_scheduler(void *arg)
2932 {
2933         long            id = (long)arg;
2934         char            name[16];
2935         kib_rx_t       *rx;
2936         kib_tx_t       *tx;
2937         unsigned long   flags;
2938         int             rc;
2939         int             counter = 0;
2940         int             did_something;
2941
2942         snprintf(name, sizeof(name), "kibnal_sd_%02ld", id);
2943         kportal_daemonize(name);
2944         kportal_blockallsigs();
2945
2946         spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
2947
2948         for (;;) {
2949                 did_something = 0;
2950
2951                 while (!list_empty(&kibnal_data.kib_sched_txq)) {
2952                         tx = list_entry(kibnal_data.kib_sched_txq.next,
2953                                         kib_tx_t, tx_list);
2954                         list_del(&tx->tx_list);
2955                         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
2956                                                flags);
2957                         kibnal_tx_done(tx);
2958                         did_something = 1; /* count as work, as in the rx path */
2959                         spin_lock_irqsave(&kibnal_data.kib_sched_lock,
2960                                           flags);
2961                 }
2962
2963                 if (!list_empty(&kibnal_data.kib_sched_rxq)) {
2964                         rx = list_entry(kibnal_data.kib_sched_rxq.next,
2965                                         kib_rx_t, rx_list);
2966                         list_del(&rx->rx_list);
2967                         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
2968                                                flags);
2969
2970                         kibnal_rx(rx);
2971
2972                         did_something = 1;
2973                         spin_lock_irqsave(&kibnal_data.kib_sched_lock,
2974                                           flags);
2975                 }
2976
2977                 /* shutting down and no receives left to complete... */
2978                 if (kibnal_data.kib_shutdown &&
2979                     atomic_read(&kibnal_data.kib_nconns) == 0)
2980                         break;
2981
2982                 /* nothing to do, or we've hogged the CPU for IBNAL_RESCHED loops */
2983                 if (!did_something || counter++ == IBNAL_RESCHED) {
2984                         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
2985                                                flags);
2986                         counter = 0;
2987
2988                         if (!did_something) {
2989                                 rc = wait_event_interruptible(
2990                                         kibnal_data.kib_sched_waitq,
2991                                         !list_empty(&kibnal_data.kib_sched_txq) || 
2992                                         !list_empty(&kibnal_data.kib_sched_rxq) || 
2993                                         (kibnal_data.kib_shutdown &&
2994                                          atomic_read (&kibnal_data.kib_nconns) == 0));
2995                         } else {
2996                                 our_cond_resched();
2997                         }
2998
2999                         spin_lock_irqsave(&kibnal_data.kib_sched_lock,
3000                                           flags);
3001                 }
3002         }
3003
3004         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
3005
3006         kibnal_thread_fini();
3007         return (0);
3008 }
3009
3010
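     /* method table hooking this NAL into the portals library */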
3011 lib_nal_t kibnal_lib = {
3012         libnal_data:        &kibnal_data,      /* NAL private data */
3013         libnal_send:         kibnal_send,
3014         libnal_send_pages:   kibnal_send_pages,
3015         libnal_recv:         kibnal_recv,
3016         libnal_recv_pages:   kibnal_recv_pages,
3017         libnal_dist:         kibnal_dist
3018 };