Whamcloud - gitweb
- turn off enable_irq_affinity by default.
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
5  *   Author: PJ Kirner <pjkirner@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   This file is confidential source code owned by Cluster File Systems.
11  *   No viewing, modification, compilation, redistribution, or any other
12  *   form of use is permitted except through a signed license agreement.
13  *
14  *   If you have not signed such an agreement, then you have no rights to
15  *   this file.  Please destroy it immediately and contact CFS.
16  *
17  */
18
19 #include "ptllnd.h"
20
21 #ifndef _USING_LUSTRE_PORTALS_
22 int
23 kptllnd_extract_iov (int dst_niov, ptl_md_iovec_t *dst,
24                      int src_niov, struct iovec *src,
25                      unsigned int offset, unsigned int len)
26 {
27         /* Initialise 'dst' to the subset of 'src' starting at 'offset',
28          * for exactly 'len' bytes, and return the number of entries.
29          * NB not destructive to 'src' */
30         unsigned int    frag_len;
31         unsigned int    niov;
32
33         if (len == 0)                           /* no data => */
34                 return (0);                     /* no frags */
35
36         LASSERT (src_niov > 0);
37         while (offset >= src->iov_len) {      /* skip initial frags */
38                 offset -= src->iov_len;
39                 src_niov--;
40                 src++;
41                 LASSERT (src_niov > 0);
42         }
43
44         niov = 1;
45         for (;;) {
46                 LASSERT (src_niov > 0);
47                 LASSERT (niov <= dst_niov);
48
49                 frag_len = src->iov_len - offset;
50                 dst->iov_base = ((char *)src->iov_base) + offset;
51
52                 if (len <= frag_len) {
53                         dst->iov_len = len;
54                         return (niov);
55                 }
56
57                 dst->iov_len = frag_len;
58
59                 len -= frag_len;
60                 dst++;
61                 src++;
62                 niov++;
63                 src_niov--;
64                 offset = 0;
65         }
66 }
67
68 int
69 kptllnd_extract_phys (int dst_niov, ptl_md_iovec_t *dst,
70                       int src_niov, lnet_kiov_t *src,
71                       unsigned int offset, unsigned int len)
72 {
73         /* Initialise 'dst' to the physical addresses of the subset of 'src'
74          * starting at 'offset', for exactly 'len' bytes, and return the number
75          * of entries.  NB not destructive to 'src' */
76         unsigned int    frag_len;
77         unsigned int    niov;
78         __u64           phys_page;
79         __u64           phys;
80
81         if (len == 0)                           /* no data => */
82                 return (0);                     /* no frags */
83
84         LASSERT (src_niov > 0);
85         while (offset >= src->kiov_len) {      /* skip initial frags */
86                 offset -= src->kiov_len;
87                 src_niov--;
88                 src++;
89                 LASSERT (src_niov > 0);
90         }
91
92         niov = 1;
93         for (;;) {
94                 LASSERT (src_niov > 0);
95                 LASSERT (niov <= dst_niov);
96
97                 frag_len = min(src->kiov_len - offset, len);
98                 phys_page = lnet_page2phys(src->kiov_page);
99                 phys = phys_page + src->kiov_offset + offset;
100
101                 LASSERT (sizeof(void *) > 4 || 
102                          (phys <= 0xffffffffULL &&
103                           phys + (frag_len - 1) <= 0xffffffffULL));
104
105                 dst->iov_base = (void *)((unsigned long)phys);
106                 dst->iov_len = frag_len;
107                 
108                 if (frag_len == len)
109                         return niov;
110
111                 len -= frag_len;
112                 dst++;
113                 src++;
114                 niov++;
115                 src_niov--;
116                 offset = 0;
117         }
118 }
119 #endif
120
121 void
122 kptllnd_init_rdma_md(kptl_tx_t *tx, unsigned int niov,
123                      struct iovec *iov, lnet_kiov_t *kiov,
124                      unsigned int offset, unsigned int nob)
125 {
126         LASSERT (iov == NULL || kiov == NULL);
127         
128         memset(&tx->tx_rdma_md, 0, sizeof(tx->tx_rdma_md));
129
130         tx->tx_rdma_md.start     = tx->tx_frags;
131         tx->tx_rdma_md.user_ptr  = &tx->tx_rdma_eventarg;
132         tx->tx_rdma_md.eq_handle = kptllnd_data.kptl_eqh;
133         tx->tx_rdma_md.options   = PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
134                                    PTL_MD_EVENT_START_DISABLE;
135         switch (tx->tx_type) {
136         default:
137                 LBUG();
138                 
139         case TX_TYPE_PUT_REQUEST:               /* passive: peer gets */
140                 tx->tx_rdma_md.threshold = 1;   /* GET event */
141                 tx->tx_rdma_md.options |= PTL_MD_OP_GET;
142                 break;
143
144         case TX_TYPE_GET_REQUEST:               /* passive: peer puts */
145                 tx->tx_rdma_md.threshold = 1;   /* PUT event */
146                 tx->tx_rdma_md.options |= PTL_MD_OP_PUT;
147                 break;
148                 
149         case TX_TYPE_PUT_RESPONSE:              /* active: I get */
150                 tx->tx_rdma_md.threshold = 2;   /* SEND + REPLY */
151                 break;
152                 
153         case TX_TYPE_GET_RESPONSE:              /* active: I put */
154                 tx->tx_rdma_md.threshold = tx->tx_acked ? 2 : 1;   /* SEND + ACK? */
155                 break;
156         }
157
158         if (nob == 0) {
159                 tx->tx_rdma_md.length = 0;
160                 return;
161         }
162
163 #ifdef _USING_LUSTRE_PORTALS_
164         if (iov != NULL) {
165                 tx->tx_rdma_md.options |= PTL_MD_IOVEC;
166                 tx->tx_rdma_md.length = 
167                         lnet_extract_iov(PTL_MD_MAX_IOV, tx->tx_frags->iov,
168                                          niov, iov, offset, nob);
169                 return;
170         }
171
172         /* Cheating OK since ptl_kiov_t == lnet_kiov_t */
173         CLASSERT(sizeof(ptl_kiov_t) == sizeof(lnet_kiov_t));
174         CLASSERT(offsetof(ptl_kiov_t, kiov_offset) ==
175                  offsetof(lnet_kiov_t, kiov_offset));
176         CLASSERT(offsetof(ptl_kiov_t, kiov_page) ==
177                  offsetof(lnet_kiov_t, kiov_page));
178         CLASSERT(offsetof(ptl_kiov_t, kiov_len) ==
179                  offsetof(lnet_kiov_t, kiov_len));
180         
181         tx->tx_rdma_md.options |= PTL_MD_KIOV;
182         tx->tx_rdma_md.length = 
183                 lnet_extract_kiov(PTL_MD_MAX_IOV, tx->tx_frags->kiov,
184                                   niov, kiov, offset, nob);
185 #else
186         if (iov != NULL) {
187                 tx->tx_rdma_md.options |= PTL_MD_IOVEC;
188                 tx->tx_rdma_md.length = 
189                         kptllnd_extract_iov(PTL_MD_MAX_IOV, tx->tx_frags->iov,
190                                             niov, iov, offset, nob);
191                 return;
192         }
193
194         tx->tx_rdma_md.options |= PTL_MD_IOVEC | PTL_MD_PHYS;
195         tx->tx_rdma_md.length =
196                 kptllnd_extract_phys(PTL_MD_MAX_IOV, tx->tx_frags->iov,
197                                      niov, kiov, offset, nob);
198 #endif
199 }
200
201 int
202 kptllnd_active_rdma(kptl_rx_t *rx, lnet_msg_t *lntmsg, int type,
203                     unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
204                     unsigned int offset, int nob)
205 {
206         kptl_tx_t       *tx;
207         ptl_err_t        ptlrc;
208         kptl_msg_t      *rxmsg = rx->rx_msg;
209         kptl_peer_t     *peer = rx->rx_peer;
210         unsigned long    flags;
211         ptl_handle_md_t  mdh;
212
213         LASSERT (type == TX_TYPE_PUT_RESPONSE || 
214                  type == TX_TYPE_GET_RESPONSE);
215
216         tx = kptllnd_get_idle_tx(type);
217         if (tx == NULL) {
218                 CERROR ("Can't do %s rdma to %s: can't allocate descriptor\n",
219                         type == TX_TYPE_PUT_RESPONSE ? "GET" : "PUT",
220                         libcfs_id2str(peer->peer_id));
221                 return -ENOMEM;
222         }
223
224         kptllnd_set_tx_peer(tx, peer);
225         kptllnd_init_rdma_md(tx, niov, iov, kiov, offset, nob);
226
227         ptlrc = PtlMDBind(kptllnd_data.kptl_nih, tx->tx_rdma_md, 
228                           PTL_UNLINK, &mdh);
229         if (ptlrc != PTL_OK) {
230                 CERROR("PtlMDBind(%s) failed: %s(%d)\n",
231                        libcfs_id2str(peer->peer_id),
232                        kptllnd_errtype2str(ptlrc), ptlrc);
233                 tx->tx_status = -EIO;
234                 kptllnd_tx_decref(tx);
235                 return -EIO;
236         }
237
238         spin_lock_irqsave(&peer->peer_lock, flags);
239
240         tx->tx_lnet_msg = lntmsg;
241         /* lnet_finalize() will be called when tx is torn down, so I must
242          * return success from here on... */
243
244         tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ);
245         tx->tx_rdma_mdh = mdh;
246         tx->tx_active = 1;
247         list_add_tail(&tx->tx_list, &peer->peer_activeq);
248
249         /* peer has now got my ref on 'tx' */
250
251         spin_unlock_irqrestore(&peer->peer_lock, flags);
252
253         tx->tx_tposted = jiffies;
254
255         if (type == TX_TYPE_GET_RESPONSE)
256                 ptlrc = PtlPut(mdh,
257                                tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
258                                rx->rx_initiator,
259                                *kptllnd_tunables.kptl_portal,
260                                0,                     /* acl cookie */
261                                rxmsg->ptlm_u.rdma.kptlrm_matchbits,
262                                0,                     /* offset */
263                                (lntmsg != NULL) ?     /* header data */
264                                PTLLND_RDMA_OK :
265                                PTLLND_RDMA_FAIL);
266         else
267                 ptlrc = PtlGet(mdh,
268                                rx->rx_initiator,
269                                *kptllnd_tunables.kptl_portal,
270                                0,                     /* acl cookie */
271                                rxmsg->ptlm_u.rdma.kptlrm_matchbits,
272                                0);                    /* offset */
273
274         if (ptlrc != PTL_OK) {
275                 CERROR("Ptl%s failed: %s(%d)\n", 
276                        (type == TX_TYPE_GET_RESPONSE) ? "Put" : "Get",
277                        kptllnd_errtype2str(ptlrc), ptlrc);
278                 
279                 kptllnd_peer_close(peer, -EIO);
280                 /* Everything (including this RDMA) queued on the peer will
281                  * be completed with failure */
282         }
283
284         return 0;
285 }
286
287 int
288 kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
289 {
290         lnet_hdr_t       *hdr = &lntmsg->msg_hdr;
291         int               type = lntmsg->msg_type;
292         lnet_process_id_t target = lntmsg->msg_target;
293         int               target_is_router = lntmsg->msg_target_is_router;
294         int               routing = lntmsg->msg_routing;
295         unsigned int      payload_niov = lntmsg->msg_niov;
296         struct iovec     *payload_iov = lntmsg->msg_iov;
297         lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
298         unsigned int      payload_offset = lntmsg->msg_offset;
299         unsigned int      payload_nob = lntmsg->msg_len;
300         kptl_peer_t      *peer;
301         kptl_tx_t        *tx;
302         int               nob;
303         int               nfrag;
304         int               rc;
305
306         LASSERT (payload_nob == 0 || payload_niov > 0);
307         LASSERT (payload_niov <= LNET_MAX_IOV);
308         LASSERT (payload_niov <= PTL_MD_MAX_IOV); /* !!! */
309         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
310         LASSERT (!in_interrupt());
311
312         rc = kptllnd_find_target(&peer, target);
313         if (rc != 0)
314                 return rc;
315         
316         switch (type) {
317         default:
318                 LBUG();
319                 return -EINVAL;
320
321         case LNET_MSG_REPLY:
322         case LNET_MSG_PUT:
323                 /* Should the payload avoid RDMA? */
324                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[payload_nob]);
325                 if (payload_kiov == NULL && 
326                     nob <= peer->peer_max_msg_size)
327                         break;
328
329                 tx = kptllnd_get_idle_tx(TX_TYPE_PUT_REQUEST);
330                 if (tx == NULL) {
331                         CERROR("Can't send %s to %s: can't allocate descriptor\n",
332                                lnet_msgtyp2str(type),
333                                libcfs_id2str(target));
334                         rc = -ENOMEM;
335                         goto out;
336                 }
337
338                 kptllnd_init_rdma_md(tx, payload_niov, 
339                                      payload_iov, payload_kiov,
340                                      payload_offset, payload_nob);
341
342                 tx->tx_lnet_msg = lntmsg;
343                 tx->tx_msg->ptlm_u.rdma.kptlrm_hdr = *hdr;
344                 kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_PUT,
345                                   sizeof(kptl_rdma_msg_t));
346
347                 CDEBUG(D_NETTRACE, "%s: passive PUT p %d %p\n",
348                        libcfs_id2str(target),
349                        le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index), tx);
350
351                 kptllnd_tx_launch(peer, tx, 0);
352                 goto out;
353
354         case LNET_MSG_GET:
355                 /* routed gets don't RDMA */
356                 if (target_is_router || routing)
357                         break;
358
359                 /* Is the payload small enough not to need RDMA? */
360                 nob = lntmsg->msg_md->md_length;
361                 nob = offsetof(kptl_msg_t, 
362                                ptlm_u.immediate.kptlim_payload[nob]);
363                 if (nob <= peer->peer_max_msg_size)
364                         break;
365
366                 tx = kptllnd_get_idle_tx(TX_TYPE_GET_REQUEST);
367                 if (tx == NULL) {
368                         CERROR("Can't send GET to %s: can't allocate descriptor\n",
369                                libcfs_id2str(target));
370                         rc = -ENOMEM;
371                         goto out;
372                 }
373
374                 tx->tx_lnet_replymsg =
375                         lnet_create_reply_msg(kptllnd_data.kptl_ni, lntmsg);
376                 if (tx->tx_lnet_replymsg == NULL) {
377                         CERROR("Failed to allocate LNET reply for %s\n",
378                                libcfs_id2str(target));
379                         kptllnd_tx_decref(tx);
380                         rc = -ENOMEM;
381                         goto out;
382                 }
383
384                 if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
385                         kptllnd_init_rdma_md(tx, lntmsg->msg_md->md_niov,
386                                              lntmsg->msg_md->md_iov.iov, NULL,
387                                              0, lntmsg->msg_md->md_length);
388                 else
389                         kptllnd_init_rdma_md(tx, lntmsg->msg_md->md_niov,
390                                              NULL, lntmsg->msg_md->md_iov.kiov,
391                                              0, lntmsg->msg_md->md_length);
392                 
393                 tx->tx_lnet_msg = lntmsg;
394                 tx->tx_msg->ptlm_u.rdma.kptlrm_hdr = *hdr;
395                 kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_GET,
396                                   sizeof(kptl_rdma_msg_t));
397
398                 CDEBUG(D_NETTRACE, "%s: passive GET p %d %p\n",
399                        libcfs_id2str(target),
400                        le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index), tx);
401
402                 kptllnd_tx_launch(peer, tx, 0);
403                 goto out;
404
405         case LNET_MSG_ACK:
406                 CDEBUG(D_NET, "LNET_MSG_ACK\n");
407                 LASSERT (payload_nob == 0);
408                 break;
409         }
410
411         /* I don't have to handle kiovs */
412         LASSERT (payload_nob == 0 || payload_iov != NULL);
413
414         tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
415         if (tx == NULL) {
416                 CERROR("Can't send %s to %s: can't allocate descriptor\n",
417                        lnet_msgtyp2str(type), libcfs_id2str(target));
418                 rc = -ENOMEM;
419                 goto out;
420         }
421
422         tx->tx_lnet_msg = lntmsg;
423         tx->tx_msg->ptlm_u.immediate.kptlim_hdr = *hdr;
424
425         if (payload_nob == 0) {
426                 nfrag = 0;
427         } else {
428                 tx->tx_frags->iov[0].iov_base = tx->tx_msg;
429                 tx->tx_frags->iov[0].iov_len = offsetof(kptl_msg_t,
430                                                         ptlm_u.immediate.kptlim_payload);
431
432                 /* NB relying on lustre not asking for PTL_MD_MAX_IOV
433                  * fragments!! */
434 #ifdef _USING_LUSTRE_PORTALS_
435                 nfrag = 1 + lnet_extract_iov(PTL_MD_MAX_IOV - 1, 
436                                              &tx->tx_frags->iov[1],
437                                              payload_niov, payload_iov,
438                                              payload_offset, payload_nob);
439 #else
440                 nfrag = 1 + kptllnd_extract_iov(PTL_MD_MAX_IOV - 1,
441                                                 &tx->tx_frags->iov[1],
442                                                 payload_niov, payload_iov,
443                                                 payload_offset, payload_nob);
444 #endif
445         }
446         
447         nob = offsetof(kptl_immediate_msg_t, kptlim_payload[payload_nob]);
448         kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_IMMEDIATE, nob);
449
450         CDEBUG(D_NETTRACE, "%s: immediate %s p %d %p\n",
451                libcfs_id2str(target),
452                lnet_msgtyp2str(lntmsg->msg_type),
453                (le32_to_cpu(lntmsg->msg_type) == LNET_MSG_PUT) ? 
454                le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index) :
455                (le32_to_cpu(lntmsg->msg_type) == LNET_MSG_GET) ? 
456                le32_to_cpu(lntmsg->msg_hdr.msg.get.ptl_index) : -1,
457                tx);
458
459         kptllnd_tx_launch(peer, tx, nfrag);
460
461  out:
462         kptllnd_peer_decref(peer);
463         return rc;
464 }
465
466 int 
467 kptllnd_eager_recv(struct lnet_ni *ni, void *private,
468                    lnet_msg_t *msg, void **new_privatep)
469 {
470         kptl_rx_t        *rx = private;
471
472         CDEBUG(D_NET, "Eager RX=%p RXB=%p\n", rx, rx->rx_rxb);
473
474         /* I have to release my ref on rxb (if I have one) to ensure I'm an
475          * eager receiver, so I copy the incoming request from the buffer it
476          * landed in, into space reserved in the descriptor... */
477
478 #if (PTL_MD_LOCAL_ALIGN8 == 0)
479         if (rx->rx_rxb == NULL)                 /* already copied */
480                 return 0;                       /* to fix alignment */
481 #else
482         LASSERT(rx->rx_rxb != NULL);
483 #endif
484         LASSERT(rx->rx_nob <= *kptllnd_tunables.kptl_max_msg_size);
485
486         memcpy(rx->rx_space, rx->rx_msg, rx->rx_nob);
487         rx->rx_msg = (kptl_msg_t *)rx->rx_space;
488
489         kptllnd_rx_buffer_decref(rx->rx_rxb);
490         rx->rx_rxb = NULL;
491
492         return 0;
493 }
494
495
496 int 
497 kptllnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
498               unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
499               unsigned int offset, unsigned int mlen, unsigned int rlen)
500 {
501         kptl_rx_t    *rx = private;
502         kptl_msg_t   *rxmsg = rx->rx_msg;
503         int           nob;
504         int           rc;
505
506         CDEBUG(D_NET, "%s niov=%d offset=%d mlen=%d rlen=%d\n",
507                kptllnd_msgtype2str(rxmsg->ptlm_type),
508                niov, offset, mlen, rlen);
509
510         LASSERT (mlen <= rlen);
511         LASSERT (mlen >= 0);
512         LASSERT (!in_interrupt());
513         LASSERT (!(kiov != NULL && iov != NULL)); /* never both */
514         LASSERT (niov <= PTL_MD_MAX_IOV);       /* !!! */
515
516 #ifdef CRAY_XT3
517         if (lntmsg != NULL &&
518             rx->rx_uid != 0) {
519                 /* Set the UID if the sender's uid isn't 0; i.e. non-root
520                  * running in userspace (e.g. a catamount node; linux kernel
521                  * senders, including routers have uid 0).  If this is a lustre
522                  * RPC request, this tells lustre not to trust the creds in the
523                  * RPC message body. */
524                 lnet_set_msg_uid(ni, lntmsg, rx->rx_uid);
525         }
526 #endif
527         switch(rxmsg->ptlm_type)
528         {
529         default:
530                 LBUG();
531                 rc = -EINVAL;
532                 break;
533
534         case PTLLND_MSG_TYPE_IMMEDIATE:
535                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE %d,%d\n", mlen, rlen);
536
537                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[rlen]);
538                 if (nob > rx->rx_nob) {
539                         CERROR ("Immediate message from %s too big: %d(%d)\n",
540                                 libcfs_id2str(rx->rx_peer->peer_id), nob,
541                                 rx->rx_nob);
542                         rc = -EINVAL;
543                         break;
544                 }
545
546                 if (kiov != NULL)
547                         lnet_copy_flat2kiov(
548                                 niov, kiov, offset,
549                                 *kptllnd_tunables.kptl_max_msg_size,
550                                 rxmsg->ptlm_u.immediate.kptlim_payload,
551                                 0,
552                                 mlen);
553                 else
554                         lnet_copy_flat2iov(
555                                 niov, iov, offset,
556                                 *kptllnd_tunables.kptl_max_msg_size,
557                                 rxmsg->ptlm_u.immediate.kptlim_payload,
558                                 0,
559                                 mlen);
560
561                 lnet_finalize (ni, lntmsg, 0);
562                 rc = 0;
563                 break;
564
565         case PTLLND_MSG_TYPE_GET:
566                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_GET %d,%d\n", mlen, rlen);
567
568                 /* NB always send RDMA so the peer can complete.  I send
569                  * success/failure in the portals 'hdr_data' */
570
571                 if (lntmsg == NULL)
572                         rc = kptllnd_active_rdma(rx, NULL,
573                                                  TX_TYPE_GET_RESPONSE,
574                                                  0, NULL, NULL, 0, 0);
575                 else
576                         rc = kptllnd_active_rdma(rx, lntmsg, 
577                                                  TX_TYPE_GET_RESPONSE,
578                                                  lntmsg->msg_niov,
579                                                  lntmsg->msg_iov, 
580                                                  lntmsg->msg_kiov,
581                                                  lntmsg->msg_offset, 
582                                                  lntmsg->msg_len);
583                 break;
584
585         case PTLLND_MSG_TYPE_PUT:
586                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_PUT %d,%d\n", mlen, rlen);
587
588                 /* NB always send RDMA so the peer can complete; it'll be 0
589                  * bytes if there was no match (lntmsg == NULL). I have no way
590                  * to let my peer know this, but she's only interested in when
591                  * the net has stopped accessing her buffer in any case. */
592
593                 rc = kptllnd_active_rdma(rx, lntmsg, TX_TYPE_PUT_RESPONSE,
594                                          niov, iov, kiov, offset, mlen);
595                 break;
596         }
597
598         /*
599          * We're done with the RX
600          */
601         kptllnd_rx_done(rx);
602         return rc;
603 }
604
605 void
606 kptllnd_eq_callback(ptl_event_t *ev)
607 {
608         kptl_eventarg_t *eva = ev->md.user_ptr;
609
610         switch (eva->eva_type) {
611         default:
612                 LBUG();
613                 
614         case PTLLND_EVENTARG_TYPE_MSG:
615         case PTLLND_EVENTARG_TYPE_RDMA:
616                 kptllnd_tx_callback(ev);
617                 break;
618                 
619         case PTLLND_EVENTARG_TYPE_BUF:
620                 kptllnd_rx_buffer_callback(ev);
621                 break;
622         }
623 }
624
625 void
626 kptllnd_thread_fini (void)
627 {
628         atomic_dec(&kptllnd_data.kptl_nthreads);
629 }
630
631 int
632 kptllnd_thread_start (int (*fn)(void *arg), void *arg)
633 {
634         long                pid;
635
636         atomic_inc(&kptllnd_data.kptl_nthreads);
637
638         pid = kernel_thread (fn, arg, 0);
639         if (pid >= 0)
640                 return 0;
641         
642         CERROR("Failed to start kernel_thread: error %d\n", (int)pid);
643         kptllnd_thread_fini();
644         return (int)pid;
645 }
646
647 int
648 kptllnd_watchdog(void *arg)
649 {
650         int                 id = (long)arg;
651         char                name[16];
652         wait_queue_t        waitlink;
653         int                 stamp = 0;
654         int                 peer_index = 0;
655         unsigned long       deadline = jiffies;
656         int                 timeout;
657         int                 i;
658
659         snprintf(name, sizeof(name), "kptllnd_wd_%02d", id);
660         cfs_daemonize(name);
661         cfs_block_allsigs();
662
663         init_waitqueue_entry(&waitlink, current);
664
665         /* threads shut down in phase 2 after all peers have been destroyed */
666         while (kptllnd_data.kptl_shutdown < 2) {
667
668                 timeout = (int)(deadline - jiffies);
669
670                 if (timeout <= 0) {
671                         const int n = 4;
672                         const int p = 1;
673                         int       chunk = kptllnd_data.kptl_peer_hash_size;
674
675
676                         /* Time to check for RDMA timeouts on a few more
677                          * peers: I do checks every 'p' seconds on a
678                          * proportion of the peer table and I need to check
679                          * every connection 'n' times within a timeout
680                          * interval, to ensure I detect a timeout on any
681                          * connection within (n+1)/n times the timeout
682                          * interval. */
683
684                         if ((*kptllnd_tunables.kptl_timeout) > n * p)
685                                 chunk = (chunk * n * p) /
686                                         (*kptllnd_tunables.kptl_timeout);
687                         if (chunk == 0)
688                                 chunk = 1;
689
690                         for (i = 0; i < chunk; i++) {
691                                 kptllnd_peer_check_bucket(peer_index, stamp);
692                                 peer_index = (peer_index + 1) %
693                                      kptllnd_data.kptl_peer_hash_size;
694                         }
695
696                         deadline += p * HZ;
697                         stamp++;
698                         continue;
699                 }
700
701                 kptllnd_handle_closing_peers();
702
703                 set_current_state(TASK_INTERRUPTIBLE);
704                 add_wait_queue_exclusive(&kptllnd_data.kptl_watchdog_waitq,
705                                          &waitlink);
706
707                 schedule_timeout(timeout);
708                 
709                 set_current_state (TASK_RUNNING);
710                 remove_wait_queue(&kptllnd_data.kptl_watchdog_waitq, &waitlink);
711         }
712
713         kptllnd_thread_fini();
714         CDEBUG(D_NET, "<<<\n");
715         return (0);
716 };
717
718 int
719 kptllnd_scheduler (void *arg)
720 {
721         int                 id = (long)arg;
722         char                name[16];
723         wait_queue_t        waitlink;
724         unsigned long       flags;
725         int                 did_something;
726         int                 counter = 0;
727         kptl_rx_t          *rx;
728         kptl_rx_buffer_t   *rxb;
729         kptl_tx_t          *tx;
730
731         snprintf(name, sizeof(name), "kptllnd_sd_%02d", id);
732         cfs_daemonize(name);
733         cfs_block_allsigs();
734
735         init_waitqueue_entry(&waitlink, current);
736
737         spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
738
739         /* threads shut down in phase 2 after all peers have been destroyed */
740         while (kptllnd_data.kptl_shutdown < 2) {
741
742                 did_something = 0;
743
744                 if (!list_empty(&kptllnd_data.kptl_sched_rxq)) {
745                         rx = list_entry (kptllnd_data.kptl_sched_rxq.next,
746                                          kptl_rx_t, rx_list);
747                         list_del(&rx->rx_list);
748                         
749                         spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,
750                                                flags);
751
752                         kptllnd_rx_parse(rx);
753                         did_something = 1;
754
755                         spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
756                 }
757
758                 if (!list_empty(&kptllnd_data.kptl_sched_rxbq)) {
759                         rxb = list_entry (kptllnd_data.kptl_sched_rxbq.next,
760                                           kptl_rx_buffer_t, rxb_repost_list);
761                         list_del(&rxb->rxb_repost_list);
762
763                         spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,
764                                                flags);
765
766                         kptllnd_rx_buffer_post(rxb);
767                         did_something = 1;
768
769                         spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
770                 }
771
772                 if (!list_empty(&kptllnd_data.kptl_sched_txq)) {
773                         tx = list_entry (kptllnd_data.kptl_sched_txq.next,
774                                          kptl_tx_t, tx_list);
775                         list_del_init(&tx->tx_list);
776
777                         spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags);
778
779                         kptllnd_tx_fini(tx);
780                         did_something = 1;
781
782                         spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
783                 }
784
785                 if (did_something) {
786                         if (++counter != *kptllnd_tunables.kptl_reschedule_loops)
787                                 continue;
788                 }
789
790                 set_current_state(TASK_INTERRUPTIBLE);
791                 add_wait_queue_exclusive(&kptllnd_data.kptl_sched_waitq,
792                                          &waitlink);
793                 spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags);
794
795                 if (!did_something)
796                         schedule(); 
797                 else
798                         cond_resched();
799
800                 set_current_state(TASK_RUNNING);
801                 remove_wait_queue(&kptllnd_data.kptl_sched_waitq, &waitlink);
802
803                 spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
804
805                 counter = 0;
806         }
807
808         spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags);
809
810         kptllnd_thread_fini();
811         return 0;
812 }
813