Whamcloud - gitweb
Severity : major
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
5  *   Author: PJ Kirner <pjkirner@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   This file is confidential source code owned by Cluster File Systems.
11  *   No viewing, modification, compilation, redistribution, or any other
12  *   form of use is permitted except through a signed license agreement.
13  *
14  *   If you have not signed such an agreement, then you have no rights to
15  *   this file.  Please destroy it immediately and contact CFS.
16  *
17  */
18
19 #include "ptllnd.h"
20
21 #ifndef _USING_LUSTRE_PORTALS_
22 int
23 kptllnd_extract_iov (int dst_niov, ptl_md_iovec_t *dst,
24                      int src_niov, struct iovec *src,
25                      unsigned int offset, unsigned int len)
26 {
27         /* Initialise 'dst' to the subset of 'src' starting at 'offset',
28          * for exactly 'len' bytes, and return the number of entries.
29          * NB not destructive to 'src' */
30         unsigned int    frag_len;
31         unsigned int    niov;
32
33         if (len == 0)                           /* no data => */
34                 return (0);                     /* no frags */
35
36         LASSERT (src_niov > 0);
37         while (offset >= src->iov_len) {      /* skip initial frags */
38                 offset -= src->iov_len;
39                 src_niov--;
40                 src++;
41                 LASSERT (src_niov > 0);
42         }
43
44         niov = 1;
45         for (;;) {
46                 LASSERT (src_niov > 0);
47                 LASSERT (niov <= dst_niov);
48
49                 frag_len = src->iov_len - offset;
50                 dst->iov_base = ((char *)src->iov_base) + offset;
51
52                 if (len <= frag_len) {
53                         dst->iov_len = len;
54                         return (niov);
55                 }
56
57                 dst->iov_len = frag_len;
58
59                 len -= frag_len;
60                 dst++;
61                 src++;
62                 niov++;
63                 src_niov--;
64                 offset = 0;
65         }
66 }
67
68 int
69 kptllnd_extract_phys (int dst_niov, ptl_md_iovec_t *dst,
70                       int src_niov, lnet_kiov_t *src,
71                       unsigned int offset, unsigned int len)
72 {
73         /* Initialise 'dst' to the physical addresses of the subset of 'src'
74          * starting at 'offset', for exactly 'len' bytes, and return the number
75          * of entries.  NB not destructive to 'src' */
76         unsigned int    frag_len;
77         unsigned int    niov;
78         __u64           phys_page;
79         __u64           phys;
80
81         if (len == 0)                           /* no data => */
82                 return (0);                     /* no frags */
83
84         LASSERT (src_niov > 0);
85         while (offset >= src->kiov_len) {      /* skip initial frags */
86                 offset -= src->kiov_len;
87                 src_niov--;
88                 src++;
89                 LASSERT (src_niov > 0);
90         }
91
92         niov = 1;
93         for (;;) {
94                 LASSERT (src_niov > 0);
95                 LASSERT (niov <= dst_niov);
96
97                 frag_len = min(src->kiov_len - offset, len);
98                 phys_page = lnet_page2phys(src->kiov_page);
99                 phys = phys_page + src->kiov_offset + offset;
100
101                 LASSERT (sizeof(void *) > 4 || 
102                          (phys <= 0xffffffffULL &&
103                           phys + (frag_len - 1) <= 0xffffffffULL));
104
105                 dst->iov_base = (void *)((unsigned long)phys);
106                 dst->iov_len = frag_len;
107                 
108                 if (frag_len == len)
109                         return niov;
110
111                 len -= frag_len;
112                 dst++;
113                 src++;
114                 niov++;
115                 src_niov--;
116                 offset = 0;
117         }
118 }
119 #endif
120
121 void
122 kptllnd_init_rdma_md(kptl_tx_t *tx, unsigned int niov,
123                      struct iovec *iov, lnet_kiov_t *kiov,
124                      unsigned int offset, unsigned int nob)
125 {
126         LASSERT (iov == NULL || kiov == NULL);
127         
128         memset(&tx->tx_rdma_md, 0, sizeof(tx->tx_rdma_md));
129
130         tx->tx_rdma_md.start     = tx->tx_frags;
131         tx->tx_rdma_md.user_ptr  = &tx->tx_rdma_eventarg;
132         tx->tx_rdma_md.eq_handle = kptllnd_data.kptl_eqh;
133         tx->tx_rdma_md.options   = PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
134                                    PTL_MD_EVENT_START_DISABLE;
135         switch (tx->tx_type) {
136         default:
137                 LBUG();
138                 
139         case TX_TYPE_PUT_REQUEST:               /* passive: peer gets */
140                 tx->tx_rdma_md.threshold = 1;   /* GET event */
141                 tx->tx_rdma_md.options |= PTL_MD_OP_GET;
142                 break;
143
144         case TX_TYPE_GET_REQUEST:               /* passive: peer puts */
145                 tx->tx_rdma_md.threshold = 1;   /* PUT event */
146                 tx->tx_rdma_md.options |= PTL_MD_OP_PUT;
147                 break;
148                 
149         case TX_TYPE_PUT_RESPONSE:              /* active: I get */
150                 tx->tx_rdma_md.threshold = 2;   /* SEND + REPLY */
151                 break;
152                 
153         case TX_TYPE_GET_RESPONSE:              /* active: I put */
154                 tx->tx_rdma_md.threshold = tx->tx_acked ? 2 : 1;   /* SEND + ACK? */
155                 break;
156         }
157
158         if (nob == 0) {
159                 tx->tx_rdma_md.length = 0;
160                 return;
161         }
162
163 #ifdef _USING_LUSTRE_PORTALS_
164         if (iov != NULL) {
165                 tx->tx_rdma_md.options |= PTL_MD_IOVEC;
166                 tx->tx_rdma_md.length = 
167                         lnet_extract_iov(PTL_MD_MAX_IOV, tx->tx_frags->iov,
168                                          niov, iov, offset, nob);
169                 return;
170         }
171
172         /* Cheating OK since ptl_kiov_t == lnet_kiov_t */
173         CLASSERT(sizeof(ptl_kiov_t) == sizeof(lnet_kiov_t));
174         CLASSERT(offsetof(ptl_kiov_t, kiov_offset) ==
175                  offsetof(lnet_kiov_t, kiov_offset));
176         CLASSERT(offsetof(ptl_kiov_t, kiov_page) ==
177                  offsetof(lnet_kiov_t, kiov_page));
178         CLASSERT(offsetof(ptl_kiov_t, kiov_len) ==
179                  offsetof(lnet_kiov_t, kiov_len));
180         
181         tx->tx_rdma_md.options |= PTL_MD_KIOV;
182         tx->tx_rdma_md.length = 
183                 lnet_extract_kiov(PTL_MD_MAX_IOV, tx->tx_frags->kiov,
184                                   niov, kiov, offset, nob);
185 #else
186         if (iov != NULL) {
187                 tx->tx_rdma_md.options |= PTL_MD_IOVEC;
188                 tx->tx_rdma_md.length = 
189                         kptllnd_extract_iov(PTL_MD_MAX_IOV, tx->tx_frags->iov,
190                                             niov, iov, offset, nob);
191                 return;
192         }
193
194         tx->tx_rdma_md.options |= PTL_MD_IOVEC | PTL_MD_PHYS;
195         tx->tx_rdma_md.length =
196                 kptllnd_extract_phys(PTL_MD_MAX_IOV, tx->tx_frags->iov,
197                                      niov, kiov, offset, nob);
198 #endif
199 }
200
201 int
202 kptllnd_active_rdma(kptl_rx_t *rx, lnet_msg_t *lntmsg, int type,
203                     unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
204                     unsigned int offset, int nob)
205 {
206         kptl_tx_t       *tx;
207         ptl_err_t        ptlrc;
208         kptl_msg_t      *rxmsg = rx->rx_msg;
209         kptl_peer_t     *peer = rx->rx_peer;
210         unsigned long    flags;
211         ptl_handle_md_t  mdh;
212
213         LASSERT (type == TX_TYPE_PUT_RESPONSE || 
214                  type == TX_TYPE_GET_RESPONSE);
215
216         tx = kptllnd_get_idle_tx(type);
217         if (tx == NULL) {
218                 CERROR ("Can't do %s rdma to %s: can't allocate descriptor\n",
219                         type == TX_TYPE_PUT_RESPONSE ? "GET" : "PUT",
220                         libcfs_id2str(peer->peer_id));
221                 return -ENOMEM;
222         }
223
224         kptllnd_set_tx_peer(tx, peer);
225         kptllnd_init_rdma_md(tx, niov, iov, kiov, offset, nob);
226
227         ptlrc = PtlMDBind(kptllnd_data.kptl_nih, tx->tx_rdma_md, 
228                           PTL_UNLINK, &mdh);
229         if (ptlrc != PTL_OK) {
230                 CERROR("PtlMDBind(%s) failed: %d\n",
231                        libcfs_id2str(peer->peer_id), ptlrc);
232                 tx->tx_status = -EIO;
233                 kptllnd_tx_decref(tx);
234                 return -EIO;
235         }
236         
237         spin_lock_irqsave(&peer->peer_lock, flags);
238
239         tx->tx_lnet_msg = lntmsg;
240         /* lnet_finalize() will be called when tx is torn down, so I must
241          * return success from here on... */
242
243         tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ);
244         tx->tx_rdma_mdh = mdh;
245         tx->tx_active = 1;
246         list_add_tail(&tx->tx_list, &peer->peer_activeq);
247
248         /* peer has now got my ref on 'tx' */
249
250         spin_unlock_irqrestore(&peer->peer_lock, flags);
251
252         tx->tx_tposted = jiffies;
253
254         if (type == TX_TYPE_GET_RESPONSE)
255                 ptlrc = PtlPut(mdh,
256                                tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
257                                rx->rx_initiator,
258                                *kptllnd_tunables.kptl_portal,
259                                0,                     /* acl cookie */
260                                rxmsg->ptlm_u.rdma.kptlrm_matchbits,
261                                0,                     /* offset */
262                                (lntmsg != NULL) ?     /* header data */
263                                PTLLND_RDMA_OK :
264                                PTLLND_RDMA_FAIL);
265         else
266                 ptlrc = PtlGet(mdh,
267                                rx->rx_initiator,
268                                *kptllnd_tunables.kptl_portal,
269                                0,                     /* acl cookie */
270                                rxmsg->ptlm_u.rdma.kptlrm_matchbits,
271                                0);                    /* offset */
272
273         if (ptlrc != PTL_OK) {
274                 CERROR("Ptl%s failed: %d\n", 
275                        (type == TX_TYPE_GET_RESPONSE) ? "Put" : "Get", ptlrc);
276                 
277                 kptllnd_peer_close(peer, -EIO);
278                 /* Everything (including this RDMA) queued on the peer will
279                  * be completed with failure */
280         }
281
282         return 0;
283 }
284
285 int
286 kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
287 {
288         lnet_hdr_t       *hdr = &lntmsg->msg_hdr;
289         int               type = lntmsg->msg_type;
290         lnet_process_id_t target = lntmsg->msg_target;
291         int               target_is_router = lntmsg->msg_target_is_router;
292         int               routing = lntmsg->msg_routing;
293         unsigned int      payload_niov = lntmsg->msg_niov;
294         struct iovec     *payload_iov = lntmsg->msg_iov;
295         lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
296         unsigned int      payload_offset = lntmsg->msg_offset;
297         unsigned int      payload_nob = lntmsg->msg_len;
298         kptl_peer_t      *peer;
299         kptl_tx_t        *tx;
300         int               nob;
301         int               nfrag;
302         int               rc;
303
304         LASSERT (payload_nob == 0 || payload_niov > 0);
305         LASSERT (payload_niov <= LNET_MAX_IOV);
306         LASSERT (payload_niov <= PTL_MD_MAX_IOV); /* !!! */
307         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
308         LASSERT (!in_interrupt());
309
310         rc = kptllnd_find_target(&peer, target);
311         if (rc != 0)
312                 return rc;
313         
314         switch (type) {
315         default:
316                 LBUG();
317                 return -EINVAL;
318
319         case LNET_MSG_REPLY:
320         case LNET_MSG_PUT:
321                 /* Should the payload avoid RDMA? */
322                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[payload_nob]);
323                 if (payload_kiov == NULL && 
324                     nob <= peer->peer_max_msg_size)
325                         break;
326
327                 tx = kptllnd_get_idle_tx(TX_TYPE_PUT_REQUEST);
328                 if (tx == NULL) {
329                         CERROR("Can't send %s to %s: can't allocate descriptor\n",
330                                lnet_msgtyp2str(type),
331                                libcfs_id2str(target));
332                         rc = -ENOMEM;
333                         goto out;
334                 }
335
336                 kptllnd_init_rdma_md(tx, payload_niov, 
337                                      payload_iov, payload_kiov,
338                                      payload_offset, payload_nob);
339
340                 tx->tx_lnet_msg = lntmsg;
341                 tx->tx_msg->ptlm_u.rdma.kptlrm_hdr = *hdr;
342                 kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_PUT,
343                                   sizeof(kptl_rdma_msg_t));
344
345                 CDEBUG(D_NETTRACE, "%s: passive PUT p %d %p\n",
346                        libcfs_id2str(target),
347                        le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index), tx);
348
349                 kptllnd_tx_launch(peer, tx, 0);
350                 goto out;
351
352         case LNET_MSG_GET:
353                 /* routed gets don't RDMA */
354                 if (target_is_router || routing)
355                         break;
356
357                 /* Is the payload small enough not to need RDMA? */
358                 nob = lntmsg->msg_md->md_length;
359                 nob = offsetof(kptl_msg_t, 
360                                ptlm_u.immediate.kptlim_payload[nob]);
361                 if (nob <= peer->peer_max_msg_size)
362                         break;
363
364                 tx = kptllnd_get_idle_tx(TX_TYPE_GET_REQUEST);
365                 if (tx == NULL) {
366                         CERROR("Can't send GET to %s: can't allocate descriptor\n",
367                                libcfs_id2str(target));
368                         rc = -ENOMEM;
369                         goto out;
370                 }
371
372                 tx->tx_lnet_replymsg =
373                         lnet_create_reply_msg(kptllnd_data.kptl_ni, lntmsg);
374                 if (tx->tx_lnet_replymsg == NULL) {
375                         CERROR("Failed to allocate LNET reply for %s\n",
376                                libcfs_id2str(target));
377                         kptllnd_tx_decref(tx);
378                         rc = -ENOMEM;
379                         goto out;
380                 }
381
382                 if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
383                         kptllnd_init_rdma_md(tx, lntmsg->msg_md->md_niov,
384                                              lntmsg->msg_md->md_iov.iov, NULL,
385                                              0, lntmsg->msg_md->md_length);
386                 else
387                         kptllnd_init_rdma_md(tx, lntmsg->msg_md->md_niov,
388                                              NULL, lntmsg->msg_md->md_iov.kiov,
389                                              0, lntmsg->msg_md->md_length);
390                 
391                 tx->tx_lnet_msg = lntmsg;
392                 tx->tx_msg->ptlm_u.rdma.kptlrm_hdr = *hdr;
393                 kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_GET,
394                                   sizeof(kptl_rdma_msg_t));
395
396                 CDEBUG(D_NETTRACE, "%s: passive GET p %d %p\n",
397                        libcfs_id2str(target),
398                        le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index), tx);
399
400                 kptllnd_tx_launch(peer, tx, 0);
401                 goto out;
402
403         case LNET_MSG_ACK:
404                 CDEBUG(D_NET, "LNET_MSG_ACK\n");
405                 LASSERT (payload_nob == 0);
406                 break;
407         }
408
409         /* I don't have to handle kiovs */
410         LASSERT (payload_nob == 0 || payload_iov != NULL);
411
412         tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
413         if (tx == NULL) {
414                 CERROR("Can't send %s to %s: can't allocate descriptor\n",
415                        lnet_msgtyp2str(type), libcfs_id2str(target));
416                 rc = -ENOMEM;
417                 goto out;
418         }
419
420         tx->tx_lnet_msg = lntmsg;
421         tx->tx_msg->ptlm_u.immediate.kptlim_hdr = *hdr;
422
423         if (payload_nob == 0) {
424                 nfrag = 0;
425         } else {
426                 tx->tx_frags->iov[0].iov_base = tx->tx_msg;
427                 tx->tx_frags->iov[0].iov_len = offsetof(kptl_msg_t,
428                                                         ptlm_u.immediate.kptlim_payload);
429
430                 /* NB relying on lustre not asking for PTL_MD_MAX_IOV
431                  * fragments!! */
432 #ifdef _USING_LUSTRE_PORTALS_
433                 nfrag = 1 + lnet_extract_iov(PTL_MD_MAX_IOV - 1, 
434                                              &tx->tx_frags->iov[1],
435                                              payload_niov, payload_iov,
436                                              payload_offset, payload_nob);
437 #else
438                 nfrag = 1 + kptllnd_extract_iov(PTL_MD_MAX_IOV - 1,
439                                                 &tx->tx_frags->iov[1],
440                                                 payload_niov, payload_iov,
441                                                 payload_offset, payload_nob);
442 #endif
443         }
444         
445         nob = offsetof(kptl_immediate_msg_t, kptlim_payload[payload_nob]);
446         kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_IMMEDIATE, nob);
447
448         CDEBUG(D_NETTRACE, "%s: immediate %s p %d %p\n",
449                libcfs_id2str(target),
450                lnet_msgtyp2str(lntmsg->msg_type),
451                (le32_to_cpu(lntmsg->msg_type) == LNET_MSG_PUT) ? 
452                le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index) :
453                (le32_to_cpu(lntmsg->msg_type) == LNET_MSG_GET) ? 
454                le32_to_cpu(lntmsg->msg_hdr.msg.get.ptl_index) : -1,
455                tx);
456
457         kptllnd_tx_launch(peer, tx, nfrag);
458
459  out:
460         kptllnd_peer_decref(peer);
461         return rc;
462 }
463
464 int 
465 kptllnd_eager_recv(struct lnet_ni *ni, void *private,
466                    lnet_msg_t *msg, void **new_privatep)
467 {
468         kptl_rx_t        *rx = private;
469
470         CDEBUG(D_NET, "Eager RX=%p RXB=%p\n", rx, rx->rx_rxb);
471
472         /* I have to release my ref on rxb (if I have one) to ensure I'm an
473          * eager receiver, so I copy the incoming request from the buffer it
474          * landed in, into space reserved in the descriptor... */
475
476 #if (PTL_MD_LOCAL_ALIGN8 == 0)
477         if (rx->rx_rxb == NULL)                 /* already copied */
478                 return 0;                       /* to fix alignment */
479 #else
480         LASSERT(rx->rx_rxb != NULL);
481 #endif
482         LASSERT(rx->rx_nob <= *kptllnd_tunables.kptl_max_msg_size);
483
484         memcpy(rx->rx_space, rx->rx_msg, rx->rx_nob);
485         rx->rx_msg = (kptl_msg_t *)rx->rx_space;
486
487         kptllnd_rx_buffer_decref(rx->rx_rxb);
488         rx->rx_rxb = NULL;
489
490         return 0;
491 }
492
493
494 int 
495 kptllnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
496               unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
497               unsigned int offset, unsigned int mlen, unsigned int rlen)
498 {
499         kptl_rx_t    *rx = private;
500         kptl_msg_t   *rxmsg = rx->rx_msg;
501         int           nob;
502         int           rc;
503
504         CDEBUG(D_NET, "%s niov=%d offset=%d mlen=%d rlen=%d\n",
505                kptllnd_msgtype2str(rxmsg->ptlm_type),
506                niov, offset, mlen, rlen);
507
508         LASSERT (mlen <= rlen);
509         LASSERT (mlen >= 0);
510         LASSERT (!in_interrupt());
511         LASSERT (!(kiov != NULL && iov != NULL)); /* never both */
512         LASSERT (niov <= PTL_MD_MAX_IOV);       /* !!! */
513
514 #ifdef CRAY_XT3
515         if (lntmsg != NULL &&
516             rx->rx_uid != 0) {
517                 /* Set the UID if the sender's uid isn't 0; i.e. non-root
518                  * running in userspace (e.g. a catamount node; linux kernel
519                  * senders, including routers have uid 0).  If this is a lustre
520                  * RPC request, this tells lustre not to trust the creds in the
521                  * RPC message body. */
522                 lnet_set_msg_uid(ni, lntmsg, rx->rx_uid);
523         }
524 #endif
525         switch(rxmsg->ptlm_type)
526         {
527         default:
528                 LBUG();
529                 rc = -EINVAL;
530                 break;
531
532         case PTLLND_MSG_TYPE_IMMEDIATE:
533                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE %d,%d\n", mlen, rlen);
534
535                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[rlen]);
536                 if (nob > rx->rx_nob) {
537                         CERROR ("Immediate message from %s too big: %d(%d)\n",
538                                 libcfs_id2str(rx->rx_peer->peer_id), nob,
539                                 rx->rx_nob);
540                         rc = -EINVAL;
541                         break;
542                 }
543
544                 if (kiov != NULL)
545                         lnet_copy_flat2kiov(
546                                 niov, kiov, offset,
547                                 *kptllnd_tunables.kptl_max_msg_size,
548                                 rxmsg->ptlm_u.immediate.kptlim_payload,
549                                 0,
550                                 mlen);
551                 else
552                         lnet_copy_flat2iov(
553                                 niov, iov, offset,
554                                 *kptllnd_tunables.kptl_max_msg_size,
555                                 rxmsg->ptlm_u.immediate.kptlim_payload,
556                                 0,
557                                 mlen);
558
559                 lnet_finalize (ni, lntmsg, 0);
560                 rc = 0;
561                 break;
562
563         case PTLLND_MSG_TYPE_GET:
564                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_GET %d,%d\n", mlen, rlen);
565
566                 /* NB always send RDMA so the peer can complete.  I send
567                  * success/failure in the portals 'hdr_data' */
568
569                 if (lntmsg == NULL)
570                         rc = kptllnd_active_rdma(rx, NULL,
571                                                  TX_TYPE_GET_RESPONSE,
572                                                  0, NULL, NULL, 0, 0);
573                 else
574                         rc = kptllnd_active_rdma(rx, lntmsg, 
575                                                  TX_TYPE_GET_RESPONSE,
576                                                  lntmsg->msg_niov,
577                                                  lntmsg->msg_iov, 
578                                                  lntmsg->msg_kiov,
579                                                  lntmsg->msg_offset, 
580                                                  lntmsg->msg_len);
581                 break;
582
583         case PTLLND_MSG_TYPE_PUT:
584                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_PUT %d,%d\n", mlen, rlen);
585
586                 /* NB always send RDMA so the peer can complete; it'll be 0
587                  * bytes if there was no match (lntmsg == NULL). I have no way
588                  * to let my peer know this, but she's only interested in when
589                  * the net has stopped accessing her buffer in any case. */
590
591                 rc = kptllnd_active_rdma(rx, lntmsg, TX_TYPE_PUT_RESPONSE,
592                                          niov, iov, kiov, offset, mlen);
593                 break;
594         }
595
596         /*
597          * We're done with the RX
598          */
599         kptllnd_rx_done(rx);
600         return rc;
601 }
602
603 void
604 kptllnd_eq_callback(ptl_event_t *ev)
605 {
606         kptl_eventarg_t *eva = ev->md.user_ptr;
607
608         switch (eva->eva_type) {
609         default:
610                 LBUG();
611                 
612         case PTLLND_EVENTARG_TYPE_MSG:
613         case PTLLND_EVENTARG_TYPE_RDMA:
614                 kptllnd_tx_callback(ev);
615                 break;
616                 
617         case PTLLND_EVENTARG_TYPE_BUF:
618                 kptllnd_rx_buffer_callback(ev);
619                 break;
620         }
621 }
622
623 void
624 kptllnd_thread_fini (void)
625 {
626         atomic_dec(&kptllnd_data.kptl_nthreads);
627 }
628
629 int
630 kptllnd_thread_start (int (*fn)(void *arg), void *arg)
631 {
632         long                pid;
633
634         atomic_inc(&kptllnd_data.kptl_nthreads);
635
636         pid = kernel_thread (fn, arg, 0);
637         if (pid >= 0)
638                 return 0;
639         
640         CERROR("Failed to start kernel_thread: error %d\n", (int)pid);
641         kptllnd_thread_fini();
642         return (int)pid;
643 }
644
645 int
646 kptllnd_watchdog(void *arg)
647 {
648         int                 id = (long)arg;
649         char                name[16];
650         wait_queue_t        waitlink;
651         int                 stamp = 0;
652         int                 peer_index = 0;
653         unsigned long       deadline = jiffies;
654         int                 timeout;
655         int                 i;
656
657         snprintf(name, sizeof(name), "kptllnd_wd_%02d", id);
658         cfs_daemonize(name);
659         cfs_block_allsigs();
660
661         init_waitqueue_entry(&waitlink, current);
662
663         /* threads shut down in phase 2 after all peers have been destroyed */
664         while (kptllnd_data.kptl_shutdown < 2) {
665
666                 timeout = (int)(deadline - jiffies);
667                 
668                 if (timeout <= 0) {
669                         const int n = 4;
670                         const int p = 1;
671                         int       chunk = kptllnd_data.kptl_peer_hash_size;
672
673
674                         /* Time to check for RDMA timeouts on a few more
675                          * peers: I do checks every 'p' seconds on a
676                          * proportion of the peer table and I need to check
677                          * every connection 'n' times within a timeout
678                          * interval, to ensure I detect a timeout on any
679                          * connection within (n+1)/n times the timeout
680                          * interval. */
681
682                         if ((*kptllnd_tunables.kptl_timeout) > n * p)
683                                 chunk = (chunk * n * p) /
684                                         (*kptllnd_tunables.kptl_timeout);
685                         if (chunk == 0)
686                                 chunk = 1;
687
688                         for (i = 0; i < chunk; i++) {
689                                 kptllnd_peer_check_bucket(peer_index, stamp);
690                                 peer_index = (peer_index + 1) %
691                                      kptllnd_data.kptl_peer_hash_size;
692                         }
693
694                         deadline += p * HZ;
695                         stamp++;
696                         continue;
697                 }
698
699                 kptllnd_handle_closing_peers();
700
701                 set_current_state(TASK_INTERRUPTIBLE);
702                 add_wait_queue_exclusive(&kptllnd_data.kptl_watchdog_waitq,
703                                          &waitlink);
704
705                 schedule_timeout(timeout);
706                 
707                 set_current_state (TASK_RUNNING);
708                 remove_wait_queue(&kptllnd_data.kptl_watchdog_waitq, &waitlink);
709         }
710
711         kptllnd_thread_fini();
712         CDEBUG(D_NET, "<<<\n");
713         return (0);
714 };
715
716 int
717 kptllnd_scheduler (void *arg)
718 {
719         int                 id = (long)arg;
720         char                name[16];
721         wait_queue_t        waitlink;
722         unsigned long       flags;
723         int                 did_something;
724         int                 counter = 0;
725         kptl_rx_t          *rx;
726         kptl_rx_buffer_t   *rxb;
727         kptl_tx_t          *tx;
728
729         snprintf(name, sizeof(name), "kptllnd_sd_%02d", id);
730         cfs_daemonize(name);
731         cfs_block_allsigs();
732
733         init_waitqueue_entry(&waitlink, current);
734
735         spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
736
737         /* threads shut down in phase 2 after all peers have been destroyed */
738         while (kptllnd_data.kptl_shutdown < 2) {
739
740                 did_something = 0;
741
742                 if (!list_empty(&kptllnd_data.kptl_sched_rxq)) {
743                         rx = list_entry (kptllnd_data.kptl_sched_rxq.next,
744                                          kptl_rx_t, rx_list);
745                         list_del(&rx->rx_list);
746                         
747                         spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,
748                                                flags);
749
750                         kptllnd_rx_parse(rx);
751                         did_something = 1;
752
753                         spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
754                 }
755
756                 if (!list_empty(&kptllnd_data.kptl_sched_rxbq)) {
757                         rxb = list_entry (kptllnd_data.kptl_sched_rxbq.next,
758                                           kptl_rx_buffer_t, rxb_repost_list);
759                         list_del(&rxb->rxb_repost_list);
760
761                         spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,
762                                                flags);
763
764                         kptllnd_rx_buffer_post(rxb);
765                         did_something = 1;
766
767                         spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
768                 }
769
770                 if (!list_empty(&kptllnd_data.kptl_sched_txq)) {
771                         tx = list_entry (kptllnd_data.kptl_sched_txq.next,
772                                          kptl_tx_t, tx_list);
773                         list_del_init(&tx->tx_list);
774
775                         spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags);
776
777                         kptllnd_tx_fini(tx);
778                         did_something = 1;
779
780                         spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
781                 }
782
783                 if (did_something) {
784                         if (++counter != *kptllnd_tunables.kptl_reschedule_loops)
785                                 continue;
786                 }
787
788                 set_current_state(TASK_INTERRUPTIBLE);
789                 add_wait_queue_exclusive(&kptllnd_data.kptl_sched_waitq,
790                                          &waitlink);
791                 spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags);
792
793                 if (!did_something)
794                         schedule(); 
795                 else
796                         cond_resched();
797
798                 set_current_state(TASK_RUNNING);
799                 remove_wait_queue(&kptllnd_data.kptl_sched_waitq, &waitlink);
800
801                 spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
802
803                 counter = 0;
804         }
805
806         spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags);
807
808         kptllnd_thread_fini();
809         return 0;
810 }
811