Whamcloud - gitweb
4934eda392df7873fb18d2692f9b29aad843e318
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/klnds/ptllnd/ptllnd_cb.c
37  *
38  * Author: PJ Kirner <pjkirner@clusterfs.com>
39  */
40
41 #include "ptllnd.h"
42
43 #ifndef _USING_LUSTRE_PORTALS_
44 int
45 kptllnd_extract_iov (int dst_niov, ptl_md_iovec_t *dst,
46                      int src_niov, struct iovec *src,
47                      unsigned int offset, unsigned int len)
48 {
49         /* Initialise 'dst' to the subset of 'src' starting at 'offset',
50          * for exactly 'len' bytes, and return the number of entries.
51          * NB not destructive to 'src' */
52         unsigned int    frag_len;
53         unsigned int    niov;
54
55         if (len == 0)                           /* no data => */
56                 return (0);                     /* no frags */
57
58         LASSERT (src_niov > 0);
59         while (offset >= src->iov_len) {      /* skip initial frags */
60                 offset -= src->iov_len;
61                 src_niov--;
62                 src++;
63                 LASSERT (src_niov > 0);
64         }
65
66         niov = 1;
67         for (;;) {
68                 LASSERT (src_niov > 0);
69                 LASSERT (niov <= dst_niov);
70
71                 frag_len = src->iov_len - offset;
72                 dst->iov_base = ((char *)src->iov_base) + offset;
73
74                 if (len <= frag_len) {
75                         dst->iov_len = len;
76                         return (niov);
77                 }
78
79                 dst->iov_len = frag_len;
80
81                 len -= frag_len;
82                 dst++;
83                 src++;
84                 niov++;
85                 src_niov--;
86                 offset = 0;
87         }
88 }
89
90 int
91 kptllnd_extract_phys (int dst_niov, ptl_md_iovec_t *dst,
92                       int src_niov, lnet_kiov_t *src,
93                       unsigned int offset, unsigned int len)
94 {
95         /* Initialise 'dst' to the physical addresses of the subset of 'src'
96          * starting at 'offset', for exactly 'len' bytes, and return the number
97          * of entries.  NB not destructive to 'src' */
98         unsigned int    frag_len;
99         unsigned int    niov;
100         __u64           phys_page;
101         __u64           phys;
102
103         if (len == 0)                           /* no data => */
104                 return (0);                     /* no frags */
105
106         LASSERT (src_niov > 0);
107         while (offset >= src->kiov_len) {      /* skip initial frags */
108                 offset -= src->kiov_len;
109                 src_niov--;
110                 src++;
111                 LASSERT (src_niov > 0);
112         }
113
114         niov = 1;
115         for (;;) {
116                 LASSERT (src_niov > 0);
117                 LASSERT (niov <= dst_niov);
118
119                 frag_len = min(src->kiov_len - offset, len);
120                 phys_page = lnet_page2phys(src->kiov_page);
121                 phys = phys_page + src->kiov_offset + offset;
122
123                 LASSERT (sizeof(void *) > 4 || 
124                          (phys <= 0xffffffffULL &&
125                           phys + (frag_len - 1) <= 0xffffffffULL));
126
127                 dst->iov_base = (void *)((unsigned long)phys);
128                 dst->iov_len = frag_len;
129                 
130                 if (frag_len == len)
131                         return niov;
132
133                 len -= frag_len;
134                 dst++;
135                 src++;
136                 niov++;
137                 src_niov--;
138                 offset = 0;
139         }
140 }
141 #endif
142
143 void
144 kptllnd_init_rdma_md(kptl_tx_t *tx, unsigned int niov,
145                      struct iovec *iov, lnet_kiov_t *kiov,
146                      unsigned int offset, unsigned int nob)
147 {
148         LASSERT (iov == NULL || kiov == NULL);
149         
150         memset(&tx->tx_rdma_md, 0, sizeof(tx->tx_rdma_md));
151
152         tx->tx_rdma_md.start     = tx->tx_frags;
153         tx->tx_rdma_md.user_ptr  = &tx->tx_rdma_eventarg;
154         tx->tx_rdma_md.eq_handle = kptllnd_data.kptl_eqh;
155         tx->tx_rdma_md.options   = PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
156                                    PTL_MD_EVENT_START_DISABLE;
157         switch (tx->tx_type) {
158         default:
159                 LBUG();
160                 
161         case TX_TYPE_PUT_REQUEST:               /* passive: peer gets */
162                 tx->tx_rdma_md.threshold = 1;   /* GET event */
163                 tx->tx_rdma_md.options |= PTL_MD_OP_GET;
164                 break;
165
166         case TX_TYPE_GET_REQUEST:               /* passive: peer puts */
167                 tx->tx_rdma_md.threshold = 1;   /* PUT event */
168                 tx->tx_rdma_md.options |= PTL_MD_OP_PUT;
169                 break;
170                 
171         case TX_TYPE_PUT_RESPONSE:              /* active: I get */
172                 tx->tx_rdma_md.threshold = 2;   /* SEND + REPLY */
173                 break;
174                 
175         case TX_TYPE_GET_RESPONSE:              /* active: I put */
176                 tx->tx_rdma_md.threshold = tx->tx_acked ? 2 : 1;   /* SEND + ACK? */
177                 break;
178         }
179
180         if (nob == 0) {
181                 tx->tx_rdma_md.length = 0;
182                 return;
183         }
184
185 #ifdef _USING_LUSTRE_PORTALS_
186         if (iov != NULL) {
187                 tx->tx_rdma_md.options |= PTL_MD_IOVEC;
188                 tx->tx_rdma_md.length = 
189                         lnet_extract_iov(PTL_MD_MAX_IOV, tx->tx_frags->iov,
190                                          niov, iov, offset, nob);
191                 return;
192         }
193
194         /* Cheating OK since ptl_kiov_t == lnet_kiov_t */
195         CLASSERT(sizeof(ptl_kiov_t) == sizeof(lnet_kiov_t));
196         CLASSERT(offsetof(ptl_kiov_t, kiov_offset) ==
197                  offsetof(lnet_kiov_t, kiov_offset));
198         CLASSERT(offsetof(ptl_kiov_t, kiov_page) ==
199                  offsetof(lnet_kiov_t, kiov_page));
200         CLASSERT(offsetof(ptl_kiov_t, kiov_len) ==
201                  offsetof(lnet_kiov_t, kiov_len));
202         
203         tx->tx_rdma_md.options |= PTL_MD_KIOV;
204         tx->tx_rdma_md.length = 
205                 lnet_extract_kiov(PTL_MD_MAX_IOV, tx->tx_frags->kiov,
206                                   niov, kiov, offset, nob);
207 #else
208         if (iov != NULL) {
209                 tx->tx_rdma_md.options |= PTL_MD_IOVEC;
210                 tx->tx_rdma_md.length = 
211                         kptllnd_extract_iov(PTL_MD_MAX_IOV, tx->tx_frags->iov,
212                                             niov, iov, offset, nob);
213                 return;
214         }
215
216         tx->tx_rdma_md.options |= PTL_MD_IOVEC | PTL_MD_PHYS;
217         tx->tx_rdma_md.length =
218                 kptllnd_extract_phys(PTL_MD_MAX_IOV, tx->tx_frags->iov,
219                                      niov, kiov, offset, nob);
220 #endif
221 }
222
223 int
224 kptllnd_active_rdma(kptl_rx_t *rx, lnet_msg_t *lntmsg, int type,
225                     unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
226                     unsigned int offset, int nob)
227 {
228         kptl_tx_t       *tx;
229         ptl_err_t        ptlrc;
230         kptl_msg_t      *rxmsg = rx->rx_msg;
231         kptl_peer_t     *peer = rx->rx_peer;
232         unsigned long    flags;
233         ptl_handle_md_t  mdh;
234
235         LASSERT (type == TX_TYPE_PUT_RESPONSE || 
236                  type == TX_TYPE_GET_RESPONSE);
237
238         tx = kptllnd_get_idle_tx(type);
239         if (tx == NULL) {
240                 CERROR ("Can't do %s rdma to %s: can't allocate descriptor\n",
241                         type == TX_TYPE_PUT_RESPONSE ? "GET" : "PUT",
242                         libcfs_id2str(peer->peer_id));
243                 return -ENOMEM;
244         }
245
246         kptllnd_set_tx_peer(tx, peer);
247         kptllnd_init_rdma_md(tx, niov, iov, kiov, offset, nob);
248
249         ptlrc = PtlMDBind(kptllnd_data.kptl_nih, tx->tx_rdma_md, 
250                           PTL_UNLINK, &mdh);
251         if (ptlrc != PTL_OK) {
252                 CERROR("PtlMDBind(%s) failed: %s(%d)\n",
253                        libcfs_id2str(peer->peer_id),
254                        kptllnd_errtype2str(ptlrc), ptlrc);
255                 tx->tx_status = -EIO;
256                 kptllnd_tx_decref(tx);
257                 return -EIO;
258         }
259
260         cfs_spin_lock_irqsave(&peer->peer_lock, flags);
261
262         tx->tx_lnet_msg = lntmsg;
263         /* lnet_finalize() will be called when tx is torn down, so I must
264          * return success from here on... */
265
266         tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * CFS_HZ);
267         tx->tx_rdma_mdh = mdh;
268         tx->tx_active = 1;
269         cfs_list_add_tail(&tx->tx_list, &peer->peer_activeq);
270
271         /* peer has now got my ref on 'tx' */
272
273         cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
274
275         tx->tx_tposted = jiffies;
276
277         if (type == TX_TYPE_GET_RESPONSE)
278                 ptlrc = PtlPut(mdh,
279                                tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
280                                rx->rx_initiator,
281                                *kptllnd_tunables.kptl_portal,
282                                0,                     /* acl cookie */
283                                rxmsg->ptlm_u.rdma.kptlrm_matchbits,
284                                0,                     /* offset */
285                                (lntmsg != NULL) ?     /* header data */
286                                PTLLND_RDMA_OK :
287                                PTLLND_RDMA_FAIL);
288         else
289                 ptlrc = PtlGet(mdh,
290                                rx->rx_initiator,
291                                *kptllnd_tunables.kptl_portal,
292                                0,                     /* acl cookie */
293                                rxmsg->ptlm_u.rdma.kptlrm_matchbits,
294                                0);                    /* offset */
295
296         if (ptlrc != PTL_OK) {
297                 CERROR("Ptl%s failed: %s(%d)\n", 
298                        (type == TX_TYPE_GET_RESPONSE) ? "Put" : "Get",
299                        kptllnd_errtype2str(ptlrc), ptlrc);
300                 
301                 kptllnd_peer_close(peer, -EIO);
302                 /* Everything (including this RDMA) queued on the peer will
303                  * be completed with failure */
304                 kptllnd_schedule_ptltrace_dump();
305         }
306
307         return 0;
308 }
309
310 int
311 kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
312 {
313         lnet_hdr_t       *hdr = &lntmsg->msg_hdr;
314         int               type = lntmsg->msg_type;
315         lnet_process_id_t target = lntmsg->msg_target;
316         int               target_is_router = lntmsg->msg_target_is_router;
317         int               routing = lntmsg->msg_routing;
318         unsigned int      payload_niov = lntmsg->msg_niov;
319         struct iovec     *payload_iov = lntmsg->msg_iov;
320         lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
321         unsigned int      payload_offset = lntmsg->msg_offset;
322         unsigned int      payload_nob = lntmsg->msg_len;
323         kptl_net_t       *net = ni->ni_data;
324         kptl_peer_t      *peer = NULL;
325         int               mpflag = 0;
326         kptl_tx_t        *tx;
327         int               nob;
328         int               nfrag;
329         int               rc;
330
331         LASSERT (net->net_ni == ni);
332         LASSERT (!net->net_shutdown);
333         LASSERT (payload_nob == 0 || payload_niov > 0);
334         LASSERT (payload_niov <= LNET_MAX_IOV);
335         LASSERT (payload_niov <= PTL_MD_MAX_IOV); /* !!! */
336         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
337         LASSERT (!cfs_in_interrupt());
338
339         if (lntmsg->msg_vmflush)
340                 mpflag = cfs_memory_pressure_get_and_set();
341
342         rc = kptllnd_find_target(net, target, &peer);
343         if (rc != 0)
344                 goto out;
345
346         /* NB peer->peer_id does NOT always equal target, be careful with
347          * which one to use */
348         switch (type) {
349         default:
350                 LBUG();
351                 return -EINVAL;
352
353         case LNET_MSG_REPLY:
354         case LNET_MSG_PUT:
355                 /* Should the payload avoid RDMA? */
356                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[payload_nob]);
357                 if (payload_kiov == NULL && 
358                     nob <= peer->peer_max_msg_size)
359                         break;
360
361                 tx = kptllnd_get_idle_tx(TX_TYPE_PUT_REQUEST);
362                 if (tx == NULL) {
363                         CERROR("Can't send %s to %s: can't allocate descriptor\n",
364                                lnet_msgtyp2str(type),
365                                libcfs_id2str(target));
366                         rc = -ENOMEM;
367                         goto out;
368                 }
369
370                 kptllnd_init_rdma_md(tx, payload_niov, 
371                                      payload_iov, payload_kiov,
372                                      payload_offset, payload_nob);
373
374                 tx->tx_lnet_msg = lntmsg;
375                 tx->tx_msg->ptlm_u.rdma.kptlrm_hdr = *hdr;
376                 kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_PUT,
377                                  target, sizeof(kptl_rdma_msg_t));
378
379                 CDEBUG(D_NETTRACE, "%s: passive PUT p %d %p\n",
380                        libcfs_id2str(target),
381                        le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index), tx);
382
383                 kptllnd_tx_launch(peer, tx, 0);
384                 goto out;
385
386         case LNET_MSG_GET:
387                 /* routed gets don't RDMA */
388                 if (target_is_router || routing)
389                         break;
390
391                 /* Is the payload small enough not to need RDMA? */
392                 nob = lntmsg->msg_md->md_length;
393                 nob = offsetof(kptl_msg_t, 
394                                ptlm_u.immediate.kptlim_payload[nob]);
395                 if (nob <= peer->peer_max_msg_size)
396                         break;
397
398                 tx = kptllnd_get_idle_tx(TX_TYPE_GET_REQUEST);
399                 if (tx == NULL) {
400                         CERROR("Can't send GET to %s: can't allocate descriptor\n",
401                                libcfs_id2str(target));
402                         rc = -ENOMEM;
403                         goto out;
404                 }
405
406                 tx->tx_lnet_replymsg = lnet_create_reply_msg(ni, lntmsg);
407                 if (tx->tx_lnet_replymsg == NULL) {
408                         CERROR("Failed to allocate LNET reply for %s\n",
409                                libcfs_id2str(target));
410                         kptllnd_tx_decref(tx);
411                         rc = -ENOMEM;
412                         goto out;
413                 }
414
415                 if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
416                         kptllnd_init_rdma_md(tx, lntmsg->msg_md->md_niov,
417                                              lntmsg->msg_md->md_iov.iov, NULL,
418                                              0, lntmsg->msg_md->md_length);
419                 else
420                         kptllnd_init_rdma_md(tx, lntmsg->msg_md->md_niov,
421                                              NULL, lntmsg->msg_md->md_iov.kiov,
422                                              0, lntmsg->msg_md->md_length);
423
424                 tx->tx_lnet_msg = lntmsg;
425                 tx->tx_msg->ptlm_u.rdma.kptlrm_hdr = *hdr;
426                 kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_GET,
427                                  target, sizeof(kptl_rdma_msg_t));
428
429                 CDEBUG(D_NETTRACE, "%s: passive GET p %d %p\n",
430                        libcfs_id2str(target),
431                        le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index), tx);
432
433                 kptllnd_tx_launch(peer, tx, 0);
434                 goto out;
435
436         case LNET_MSG_ACK:
437                 CDEBUG(D_NET, "LNET_MSG_ACK\n");
438                 LASSERT (payload_nob == 0);
439                 break;
440         }
441
442         /* I don't have to handle kiovs */
443         LASSERT (payload_nob == 0 || payload_iov != NULL);
444
445         tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
446         if (tx == NULL) {
447                 CERROR("Can't send %s to %s: can't allocate descriptor\n",
448                        lnet_msgtyp2str(type), libcfs_id2str(target));
449                 rc = -ENOMEM;
450                 goto out;
451         }
452
453         tx->tx_lnet_msg = lntmsg;
454         tx->tx_msg->ptlm_u.immediate.kptlim_hdr = *hdr;
455
456         if (payload_nob == 0) {
457                 nfrag = 0;
458         } else {
459                 tx->tx_frags->iov[0].iov_base = tx->tx_msg;
460                 tx->tx_frags->iov[0].iov_len = offsetof(kptl_msg_t,
461                                                         ptlm_u.immediate.kptlim_payload);
462
463                 /* NB relying on lustre not asking for PTL_MD_MAX_IOV
464                  * fragments!! */
465 #ifdef _USING_LUSTRE_PORTALS_
466                 nfrag = 1 + lnet_extract_iov(PTL_MD_MAX_IOV - 1, 
467                                              &tx->tx_frags->iov[1],
468                                              payload_niov, payload_iov,
469                                              payload_offset, payload_nob);
470 #else
471                 nfrag = 1 + kptllnd_extract_iov(PTL_MD_MAX_IOV - 1,
472                                                 &tx->tx_frags->iov[1],
473                                                 payload_niov, payload_iov,
474                                                 payload_offset, payload_nob);
475 #endif
476         }
477
478         nob = offsetof(kptl_immediate_msg_t, kptlim_payload[payload_nob]);
479         kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_IMMEDIATE, target, nob);
480
481         CDEBUG(D_NETTRACE, "%s: immediate %s p %d %p\n",
482                libcfs_id2str(target),
483                lnet_msgtyp2str(lntmsg->msg_type),
484                (le32_to_cpu(lntmsg->msg_type) == LNET_MSG_PUT) ? 
485                le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index) :
486                (le32_to_cpu(lntmsg->msg_type) == LNET_MSG_GET) ? 
487                le32_to_cpu(lntmsg->msg_hdr.msg.get.ptl_index) : -1,
488                tx);
489
490         kptllnd_tx_launch(peer, tx, nfrag);
491
492  out:
493         if (lntmsg->msg_vmflush)
494                 cfs_memory_pressure_restore(mpflag);
495         if (peer)
496                 kptllnd_peer_decref(peer);
497         return rc;
498 }
499
500 int 
501 kptllnd_eager_recv(struct lnet_ni *ni, void *private,
502                    lnet_msg_t *msg, void **new_privatep)
503 {
504         kptl_rx_t        *rx = private;
505
506         CDEBUG(D_NET, "Eager RX=%p RXB=%p\n", rx, rx->rx_rxb);
507
508         /* I have to release my ref on rxb (if I have one) to ensure I'm an
509          * eager receiver, so I copy the incoming request from the buffer it
510          * landed in, into space reserved in the descriptor... */
511
512 #if (PTL_MD_LOCAL_ALIGN8 == 0)
513         if (rx->rx_rxb == NULL)                 /* already copied */
514                 return 0;                       /* to fix alignment */
515 #else
516         LASSERT(rx->rx_rxb != NULL);
517 #endif
518         LASSERT(rx->rx_nob <= *kptllnd_tunables.kptl_max_msg_size);
519
520         memcpy(rx->rx_space, rx->rx_msg, rx->rx_nob);
521         rx->rx_msg = (kptl_msg_t *)rx->rx_space;
522
523         kptllnd_rx_buffer_decref(rx->rx_rxb);
524         rx->rx_rxb = NULL;
525
526         return 0;
527 }
528
529
530 int 
531 kptllnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
532               unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
533               unsigned int offset, unsigned int mlen, unsigned int rlen)
534 {
535         kptl_rx_t    *rx = private;
536         kptl_msg_t   *rxmsg = rx->rx_msg;
537         int           nob;
538         int           rc;
539
540         CDEBUG(D_NET, "%s niov=%d offset=%d mlen=%d rlen=%d\n",
541                kptllnd_msgtype2str(rxmsg->ptlm_type),
542                niov, offset, mlen, rlen);
543
544         LASSERT (mlen <= rlen);
545         LASSERT (mlen >= 0);
546         LASSERT (!cfs_in_interrupt());
547         LASSERT (!(kiov != NULL && iov != NULL)); /* never both */
548         LASSERT (niov <= PTL_MD_MAX_IOV);       /* !!! */
549
550 #ifdef CRAY_XT3
551         if (lntmsg != NULL &&
552             rx->rx_uid != 0) {
553                 /* Set the UID if the sender's uid isn't 0; i.e. non-root
554                  * running in userspace (e.g. a catamount node; linux kernel
555                  * senders, including routers have uid 0).  If this is a lustre
556                  * RPC request, this tells lustre not to trust the creds in the
557                  * RPC message body. */
558                 lnet_set_msg_uid(ni, lntmsg, rx->rx_uid);
559         }
560 #endif
561         switch(rxmsg->ptlm_type)
562         {
563         default:
564                 LBUG();
565                 rc = -EINVAL;
566                 break;
567
568         case PTLLND_MSG_TYPE_IMMEDIATE:
569                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE %d,%d\n", mlen, rlen);
570
571                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[rlen]);
572                 if (nob > rx->rx_nob) {
573                         CERROR ("Immediate message from %s too big: %d(%d)\n",
574                                 libcfs_id2str(rx->rx_peer->peer_id), nob,
575                                 rx->rx_nob);
576                         rc = -EINVAL;
577                         break;
578                 }
579
580                 if (kiov != NULL)
581                         lnet_copy_flat2kiov(
582                                 niov, kiov, offset,
583                                 *kptllnd_tunables.kptl_max_msg_size,
584                                 rxmsg->ptlm_u.immediate.kptlim_payload,
585                                 0,
586                                 mlen);
587                 else
588                         lnet_copy_flat2iov(
589                                 niov, iov, offset,
590                                 *kptllnd_tunables.kptl_max_msg_size,
591                                 rxmsg->ptlm_u.immediate.kptlim_payload,
592                                 0,
593                                 mlen);
594
595                 lnet_finalize (ni, lntmsg, 0);
596                 rc = 0;
597                 break;
598
599         case PTLLND_MSG_TYPE_GET:
600                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_GET %d,%d\n", mlen, rlen);
601
602                 /* NB always send RDMA so the peer can complete.  I send
603                  * success/failure in the portals 'hdr_data' */
604
605                 if (lntmsg == NULL)
606                         rc = kptllnd_active_rdma(rx, NULL,
607                                                  TX_TYPE_GET_RESPONSE,
608                                                  0, NULL, NULL, 0, 0);
609                 else
610                         rc = kptllnd_active_rdma(rx, lntmsg, 
611                                                  TX_TYPE_GET_RESPONSE,
612                                                  lntmsg->msg_niov,
613                                                  lntmsg->msg_iov, 
614                                                  lntmsg->msg_kiov,
615                                                  lntmsg->msg_offset, 
616                                                  lntmsg->msg_len);
617                 break;
618
619         case PTLLND_MSG_TYPE_PUT:
620                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_PUT %d,%d\n", mlen, rlen);
621
622                 /* NB always send RDMA so the peer can complete; it'll be 0
623                  * bytes if there was no match (lntmsg == NULL). I have no way
624                  * to let my peer know this, but she's only interested in when
625                  * the net has stopped accessing her buffer in any case. */
626
627                 rc = kptllnd_active_rdma(rx, lntmsg, TX_TYPE_PUT_RESPONSE,
628                                          niov, iov, kiov, offset, mlen);
629                 break;
630         }
631
632         /*
633          * We're done with the RX
634          */
635         kptllnd_rx_done(rx, PTLLND_POSTRX_PEER_CREDIT);
636         return rc;
637 }
638
639 void
640 kptllnd_eq_callback(ptl_event_t *ev)
641 {
642         kptl_eventarg_t *eva = ev->md.user_ptr;
643
644         switch (eva->eva_type) {
645         default:
646                 LBUG();
647
648         case PTLLND_EVENTARG_TYPE_MSG:
649         case PTLLND_EVENTARG_TYPE_RDMA:
650                 kptllnd_tx_callback(ev);
651                 break;
652
653         case PTLLND_EVENTARG_TYPE_BUF:
654                 kptllnd_rx_buffer_callback(ev);
655                 break;
656         }
657 }
658
659 void
660 kptllnd_thread_fini (void)
661 {
662         cfs_atomic_dec(&kptllnd_data.kptl_nthreads);
663 }
664
665 int
666 kptllnd_thread_start (int (*fn)(void *arg), void *arg)
667 {
668         long                pid;
669
670         cfs_atomic_inc(&kptllnd_data.kptl_nthreads);
671
672         pid = cfs_create_thread (fn, arg, 0);
673         if (pid >= 0)
674                 return 0;
675
676         CERROR("Failed to start thread: error %d\n", (int)pid);
677         kptllnd_thread_fini();
678         return (int)pid;
679 }
680
681 int
682 kptllnd_watchdog(void *arg)
683 {
684         int                 id = (long)arg;
685         char                name[16];
686         cfs_waitlink_t      waitlink;
687         int                 stamp = 0;
688         int                 peer_index = 0;
689         unsigned long       deadline = jiffies;
690         int                 timeout;
691         int                 i;
692
693         snprintf(name, sizeof(name), "kptllnd_wd_%02d", id);
694         cfs_daemonize(name);
695         cfs_block_allsigs();
696
697         cfs_waitlink_init(&waitlink);
698
699         /* threads shut down in phase 2 after all peers have been destroyed */
700         while (kptllnd_data.kptl_shutdown < 2) {
701
702                 /* add a check for needs ptltrace
703                  * yes, this is blatant hijacking of this thread
704                  * we can't dump directly from tx or rx _callbacks as it
705                  * deadlocks portals and takes out the node
706                 */
707
708                 if (cfs_atomic_read(&kptllnd_data.kptl_needs_ptltrace)) {
709 #ifdef CRAY_XT3
710                         kptllnd_dump_ptltrace();
711                         /* we only dump once, no matter how many pending */
712                         cfs_atomic_set(&kptllnd_data.kptl_needs_ptltrace, 0);
713 #else
714                         LBUG();
715 #endif
716                 }
717
718                 timeout = (int)(deadline - jiffies);
719
720                 if (timeout <= 0) {
721                         const int n = 4;
722                         const int p = 1;
723                         int       chunk = kptllnd_data.kptl_peer_hash_size;
724
725
726                         /* Time to check for RDMA timeouts on a few more
727                          * peers: I do checks every 'p' seconds on a
728                          * proportion of the peer table and I need to check
729                          * every connection 'n' times within a timeout
730                          * interval, to ensure I detect a timeout on any
731                          * connection within (n+1)/n times the timeout
732                          * interval. */
733
734                         if ((*kptllnd_tunables.kptl_timeout) > n * p)
735                                 chunk = (chunk * n * p) /
736                                         (*kptllnd_tunables.kptl_timeout);
737                         if (chunk == 0)
738                                 chunk = 1;
739
740                         for (i = 0; i < chunk; i++) {
741                                 kptllnd_peer_check_bucket(peer_index, stamp);
742                                 peer_index = (peer_index + 1) %
743                                      kptllnd_data.kptl_peer_hash_size;
744                         }
745
746                         deadline += p * CFS_HZ;
747                         stamp++;
748                         continue;
749                 }
750
751                 kptllnd_handle_closing_peers();
752
753                 cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
754                 cfs_waitq_add_exclusive(&kptllnd_data.kptl_watchdog_waitq,
755                                         &waitlink);
756
757                 cfs_waitq_timedwait(&waitlink, CFS_TASK_INTERRUPTIBLE, timeout);
758
759                 cfs_set_current_state (CFS_TASK_RUNNING);
760                 cfs_waitq_del(&kptllnd_data.kptl_watchdog_waitq, &waitlink);
761         }
762
763         kptllnd_thread_fini();
764         CDEBUG(D_NET, "<<<\n");
765         return (0);
766 };
767
768 int
769 kptllnd_scheduler (void *arg)
770 {
771         int                 id = (long)arg;
772         char                name[16];
773         cfs_waitlink_t      waitlink;
774         unsigned long       flags;
775         int                 did_something;
776         int                 counter = 0;
777         kptl_rx_t          *rx;
778         kptl_rx_buffer_t   *rxb;
779         kptl_tx_t          *tx;
780
781         snprintf(name, sizeof(name), "kptllnd_sd_%02d", id);
782         cfs_daemonize(name);
783         cfs_block_allsigs();
784
785         cfs_waitlink_init(&waitlink);
786
787         cfs_spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
788
789         /* threads shut down in phase 2 after all peers have been destroyed */
790         while (kptllnd_data.kptl_shutdown < 2) {
791
792                 did_something = 0;
793
794                 if (!cfs_list_empty(&kptllnd_data.kptl_sched_rxq)) {
795                         rx = cfs_list_entry (kptllnd_data.kptl_sched_rxq.next,
796                                              kptl_rx_t, rx_list);
797                         cfs_list_del(&rx->rx_list);
798
799                         cfs_spin_unlock_irqrestore(&kptllnd_data. \
800                                                    kptl_sched_lock,
801                                                    flags);
802
803                         kptllnd_rx_parse(rx);
804                         did_something = 1;
805
806                         cfs_spin_lock_irqsave(&kptllnd_data.kptl_sched_lock,
807                                               flags);
808                 }
809
810                 if (!cfs_list_empty(&kptllnd_data.kptl_sched_rxbq)) {
811                         rxb = cfs_list_entry (kptllnd_data.kptl_sched_rxbq.next,
812                                               kptl_rx_buffer_t,
813                                               rxb_repost_list);
814                         cfs_list_del(&rxb->rxb_repost_list);
815
816                         cfs_spin_unlock_irqrestore(&kptllnd_data. \
817                                                    kptl_sched_lock,
818                                                    flags);
819
820                         kptllnd_rx_buffer_post(rxb);
821                         did_something = 1;
822
823                         cfs_spin_lock_irqsave(&kptllnd_data.kptl_sched_lock,
824                                               flags);
825                 }
826
827                 if (!cfs_list_empty(&kptllnd_data.kptl_sched_txq)) {
828                         tx = cfs_list_entry (kptllnd_data.kptl_sched_txq.next,
829                                              kptl_tx_t, tx_list);
830                         cfs_list_del_init(&tx->tx_list);
831
832                         cfs_spin_unlock_irqrestore(&kptllnd_data. \
833                                                    kptl_sched_lock, flags);
834
835                         kptllnd_tx_fini(tx);
836                         did_something = 1;
837
838                         cfs_spin_lock_irqsave(&kptllnd_data.kptl_sched_lock,
839                                               flags);
840                 }
841
842                 if (did_something) {
843                         if (++counter != *kptllnd_tunables.kptl_reschedule_loops)
844                                 continue;
845                 }
846
847                 cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
848                 cfs_waitq_add_exclusive(&kptllnd_data.kptl_sched_waitq,
849                                         &waitlink);
850                 cfs_spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,
851                                            flags);
852
853                 if (!did_something)
854                         cfs_waitq_wait(&waitlink, CFS_TASK_INTERRUPTIBLE);
855                 else
856                         cfs_cond_resched();
857
858                 cfs_set_current_state(CFS_TASK_RUNNING);
859                 cfs_waitq_del(&kptllnd_data.kptl_sched_waitq, &waitlink);
860
861                 cfs_spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
862
863                 counter = 0;
864         }
865
866         cfs_spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags);
867
868         kptllnd_thread_fini();
869         return 0;
870 }