Whamcloud - gitweb
89456ac61ef968ccdaba53e36820c9963b22715b
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
5  *   Author: PJ Kirner <pjkirner@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   This file is confidential source code owned by Cluster File Systems.
11  *   No viewing, modification, compilation, redistribution, or any other
12  *   form of use is permitted except through a signed license agreement.
13  *
14  *   If you have not signed such an agreement, then you have no rights to
15  *   this file.  Please destroy it immediately and contact CFS.
16  *
17  */
18
19 #include "ptllnd.h"
20
21 #ifndef _USING_LUSTRE_PORTALS_
22 int
23 kptllnd_extract_iov (int dst_niov, ptl_md_iovec_t *dst,
24                      int src_niov, struct iovec *src,
25                      unsigned int offset, unsigned int len)
26 {
27         /* Initialise 'dst' to the subset of 'src' starting at 'offset',
28          * for exactly 'len' bytes, and return the number of entries.
29          * NB not destructive to 'src' */
30         unsigned int    frag_len;
31         unsigned int    niov;
32
33         if (len == 0)                           /* no data => */
34                 return (0);                     /* no frags */
35
36         LASSERT (src_niov > 0);
37         while (offset >= src->iov_len) {      /* skip initial frags */
38                 offset -= src->iov_len;
39                 src_niov--;
40                 src++;
41                 LASSERT (src_niov > 0);
42         }
43
44         niov = 1;
45         for (;;) {
46                 LASSERT (src_niov > 0);
47                 LASSERT (niov <= dst_niov);
48
49                 frag_len = src->iov_len - offset;
50                 dst->iov_base = ((char *)src->iov_base) + offset;
51
52                 if (len <= frag_len) {
53                         dst->iov_len = len;
54                         return (niov);
55                 }
56
57                 dst->iov_len = frag_len;
58
59                 len -= frag_len;
60                 dst++;
61                 src++;
62                 niov++;
63                 src_niov--;
64                 offset = 0;
65         }
66 }
67
68 int
69 kptllnd_extract_phys (int dst_niov, ptl_md_iovec_t *dst,
70                       int src_niov, lnet_kiov_t *src,
71                       unsigned int offset, unsigned int len)
72 {
73         /* Initialise 'dst' to the physical addresses of the subset of 'src'
74          * starting at 'offset', for exactly 'len' bytes, and return the number
75          * of entries.  NB not destructive to 'src' */
76         unsigned int    frag_len;
77         unsigned int    niov;
78         __u64           phys_page;
79         __u64           phys;
80
81         if (len == 0)                           /* no data => */
82                 return (0);                     /* no frags */
83
84         LASSERT (src_niov > 0);
85         while (offset >= src->kiov_len) {      /* skip initial frags */
86                 offset -= src->kiov_len;
87                 src_niov--;
88                 src++;
89                 LASSERT (src_niov > 0);
90         }
91
92         niov = 1;
93         for (;;) {
94                 LASSERT (src_niov > 0);
95                 LASSERT (niov <= dst_niov);
96
97                 frag_len = min(src->kiov_len - offset, len);
98                 phys_page = lnet_page2phys(src->kiov_page);
99                 phys = phys_page + src->kiov_offset + offset;
100
101                 LASSERT (sizeof(void *) > 4 || 
102                          (phys <= 0xffffffffULL &&
103                           phys + (frag_len - 1) <= 0xffffffffULL));
104
105                 dst->iov_base = (void *)((unsigned long)phys);
106                 dst->iov_len = frag_len;
107                 
108                 if (frag_len == len)
109                         return niov;
110
111                 len -= frag_len;
112                 dst++;
113                 src++;
114                 niov++;
115                 src_niov--;
116                 offset = 0;
117         }
118 }
119 #endif
120
121 void
122 kptllnd_init_rdma_md(kptl_tx_t *tx, unsigned int niov,
123                      struct iovec *iov, lnet_kiov_t *kiov,
124                      unsigned int offset, unsigned int nob)
125 {
126         LASSERT (iov == NULL || kiov == NULL);
127         
128         memset(&tx->tx_rdma_md, 0, sizeof(tx->tx_rdma_md));
129
130         tx->tx_rdma_md.start     = tx->tx_rdma_frags;
131         tx->tx_rdma_md.user_ptr  = &tx->tx_rdma_eventarg;
132         tx->tx_rdma_md.eq_handle = kptllnd_data.kptl_eqh;
133         tx->tx_rdma_md.options   = PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
134                                    PTL_MD_EVENT_START_DISABLE;
135         switch (tx->tx_type) {
136         default:
137                 LBUG();
138                 
139         case TX_TYPE_PUT_REQUEST:               /* passive: peer gets */
140                 tx->tx_rdma_md.threshold = 1;   /* GET event */
141                 tx->tx_rdma_md.options |= PTL_MD_OP_GET;
142                 break;
143
144         case TX_TYPE_GET_REQUEST:               /* passive: peer puts */
145                 tx->tx_rdma_md.threshold = 1;   /* PUT event */
146                 tx->tx_rdma_md.options |= PTL_MD_OP_PUT;
147                 break;
148                 
149         case TX_TYPE_PUT_RESPONSE:              /* active: I get */
150                 tx->tx_rdma_md.threshold = 2;   /* SEND + REPLY */
151                 break;
152                 
153         case TX_TYPE_GET_RESPONSE:              /* active: I put */
154                 tx->tx_rdma_md.threshold = 1;   /* SEND */
155                 break;
156         }
157
158         if (nob == 0) {
159                 tx->tx_rdma_md.length = 0;
160                 return;
161         }
162
163 #ifdef _USING_LUSTRE_PORTALS_
164         if (iov != NULL) {
165                 tx->tx_rdma_md.options |= PTL_MD_IOVEC;
166                 tx->tx_rdma_md.length = 
167                         lnet_extract_iov(PTL_MD_MAX_IOV, tx->tx_rdma_frags->iov,
168                                          niov, iov, offset, nob);
169                 return;
170         }
171
172         /* Cheating OK since ptl_kiov_t == lnet_kiov_t */
173         CLASSERT(sizeof(ptl_kiov_t) == sizeof(lnet_kiov_t));
174         CLASSERT(offsetof(ptl_kiov_t, kiov_offset) ==
175                  offsetof(lnet_kiov_t, kiov_offset));
176         CLASSERT(offsetof(ptl_kiov_t, kiov_page) ==
177                  offsetof(lnet_kiov_t, kiov_page));
178         CLASSERT(offsetof(ptl_kiov_t, kiov_len) ==
179                  offsetof(lnet_kiov_t, kiov_len));
180         
181         tx->tx_rdma_md.options |= PTL_MD_KIOV;
182         tx->tx_rdma_md.length = 
183                 lnet_extract_kiov(PTL_MD_MAX_IOV, tx->tx_rdma_frags->kiov,
184                                   niov, kiov, offset, nob);
185 #else
186         if (iov != NULL) {
187                 tx->tx_rdma_md.options |= PTL_MD_IOVEC;
188                 tx->tx_rdma_md.length = 
189                         kptllnd_extract_iov(PTL_MD_MAX_IOV, tx->tx_rdma_frags->iov,
190                                             niov, iov, offset, nob);
191                 return;
192         }
193
194         tx->tx_rdma_md.options |= PTL_MD_IOVEC | PTL_MD_PHYS;
195         tx->tx_rdma_md.length =
196                 kptllnd_extract_phys(PTL_MD_MAX_IOV, tx->tx_rdma_frags->iov,
197                                      niov, kiov, offset, nob);
198 #endif
199 }
200
201 int
202 kptllnd_active_rdma(kptl_rx_t *rx, lnet_msg_t *lntmsg, int type,
203                     unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
204                     unsigned int offset, int nob)
205 {
206         kptl_tx_t       *tx;
207         ptl_err_t        ptlrc;
208         kptl_msg_t      *rxmsg = rx->rx_msg;
209         kptl_peer_t     *peer = rx->rx_peer;
210         unsigned long    flags;
211         ptl_handle_md_t  mdh;
212
213         LASSERT (type == TX_TYPE_PUT_RESPONSE || 
214                  type == TX_TYPE_GET_RESPONSE);
215
216         tx = kptllnd_get_idle_tx(type);
217         if (tx == NULL) {
218                 CERROR ("Can't do %s rdma to %s: can't allocate descriptor\n",
219                         type == TX_TYPE_PUT_RESPONSE ? "GET" : "PUT",
220                         libcfs_id2str(peer->peer_id));
221                 return -ENOMEM;
222         }
223
224         kptllnd_set_tx_peer(tx, peer);
225         kptllnd_init_rdma_md(tx, niov, iov, kiov, offset, nob);
226
227         ptlrc = PtlMDBind(kptllnd_data.kptl_nih, tx->tx_rdma_md, 
228                           PTL_UNLINK, &mdh);
229         if (ptlrc != PTL_OK) {
230                 CERROR("PtlMDBind(%s) failed: %d\n",
231                        libcfs_id2str(peer->peer_id), ptlrc);
232                 tx->tx_status = -EIO;
233                 kptllnd_tx_decref(tx);
234                 return -EIO;
235         }
236         
237         spin_lock_irqsave(&peer->peer_lock, flags);
238
239         tx->tx_lnet_msg = lntmsg;
240         /* lnet_finalize() will be called when tx is torn down, so I must
241          * return success from here on... */
242
243         tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ);
244         tx->tx_rdma_mdh = mdh;
245         tx->tx_active = 1;
246         list_add_tail(&tx->tx_list, &peer->peer_activeq);
247
248         /* peer has now got my ref on 'tx' */
249
250         spin_unlock_irqrestore(&peer->peer_lock, flags);
251
252         if (type == TX_TYPE_GET_RESPONSE)
253                 ptlrc = PtlPut(mdh,
254                                PTL_NOACK_REQ,
255                                rx->rx_initiator,
256                                *kptllnd_tunables.kptl_portal,
257                                0,                     /* acl cookie */
258                                rxmsg->ptlm_u.rdma.kptlrm_matchbits,
259                                0,                     /* offset */
260                                (lntmsg != NULL) ?     /* header data */
261                                PTLLND_RDMA_OK :
262                                PTLLND_RDMA_FAIL);
263         else
264                 ptlrc = PtlGet(mdh,
265                                rx->rx_initiator,
266                                *kptllnd_tunables.kptl_portal,
267                                0,                     /* acl cookie */
268                                rxmsg->ptlm_u.rdma.kptlrm_matchbits,
269                                0);                    /* offset */
270
271         if (ptlrc != PTL_OK) {
272                 CERROR("Ptl%s failed: %d\n", 
273                        (type == TX_TYPE_GET_RESPONSE) ? "Put" : "Get", ptlrc);
274                 
275                 kptllnd_peer_close(peer, -EIO);
276                 /* Everything (including this RDMA) queued on the peer will
277                  * be completed with failure */
278         }
279
280         return 0;
281 }
282
283 int
284 kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
285 {
286         lnet_hdr_t       *hdr = &lntmsg->msg_hdr;
287         int               type = lntmsg->msg_type;
288         lnet_process_id_t target = lntmsg->msg_target;
289         int               target_is_router = lntmsg->msg_target_is_router;
290         int               routing = lntmsg->msg_routing;
291         unsigned int      payload_niov = lntmsg->msg_niov;
292         struct iovec     *payload_iov = lntmsg->msg_iov;
293         lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
294         unsigned int      payload_offset = lntmsg->msg_offset;
295         unsigned int      payload_nob = lntmsg->msg_len;
296         kptl_tx_t        *tx;
297         int               nob;
298
299         LASSERT (payload_nob == 0 || payload_niov > 0);
300         LASSERT (payload_niov <= LNET_MAX_IOV);
301         LASSERT (payload_niov <= PTL_MD_MAX_IOV); /* !!! */
302         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
303         LASSERT (!in_interrupt());
304
305         switch (type) {
306         default:
307                 LBUG();
308                 return -EINVAL;
309
310         case LNET_MSG_REPLY:
311         case LNET_MSG_PUT:
312                 /* Is the payload small enough not to need RDMA? */
313                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[payload_nob]);
314                 if (nob <= *kptllnd_tunables.kptl_max_msg_size)
315                         break;
316
317                 tx = kptllnd_get_idle_tx(TX_TYPE_PUT_REQUEST);
318                 if (tx == NULL) {
319                         CERROR("Can't send %s to %s: can't allocate descriptor\n",
320                                lnet_msgtyp2str(type),
321                                libcfs_id2str(target));
322                         return -ENOMEM;
323                 }
324
325                 kptllnd_init_rdma_md(tx, payload_niov, 
326                                      payload_iov, payload_kiov,
327                                      payload_offset, payload_nob);
328
329                 tx->tx_lnet_msg = lntmsg;
330                 tx->tx_msg->ptlm_u.rdma.kptlrm_hdr = *hdr;
331                 kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_PUT,
332                                   sizeof(kptl_rdma_msg_t));
333                 kptllnd_tx_launch(tx, target);
334                 return 0;
335
336         case LNET_MSG_GET:
337                 /* routed gets don't RDMA */
338                 if (target_is_router || routing)
339                         break;
340
341                 /* Is the payload small enough not to need RDMA? */
342                 nob = lntmsg->msg_md->md_length;
343                 nob = offsetof(kptl_msg_t, 
344                                ptlm_u.immediate.kptlim_payload[nob]);
345                 if (nob <= *kptllnd_tunables.kptl_max_msg_size)
346                         break;
347
348                 tx = kptllnd_get_idle_tx(TX_TYPE_GET_REQUEST);
349                 if (tx == NULL) {
350                         CERROR("Can't send GET to %s: can't allocate descriptor\n",
351                                libcfs_id2str(target));
352                         return -ENOMEM;
353                 }
354
355                 tx->tx_lnet_replymsg =
356                         lnet_create_reply_msg(kptllnd_data.kptl_ni, lntmsg);
357                 if (tx->tx_lnet_replymsg == NULL) {
358                         CERROR("Failed to allocate LNET reply for %s\n",
359                                libcfs_id2str(target));
360                         kptllnd_tx_decref(tx);
361                         return -ENOMEM;
362                 }
363
364                 if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
365                         kptllnd_init_rdma_md(tx, lntmsg->msg_md->md_niov,
366                                              lntmsg->msg_md->md_iov.iov, NULL,
367                                              0, lntmsg->msg_md->md_length);
368                 else
369                         kptllnd_init_rdma_md(tx, lntmsg->msg_md->md_niov,
370                                              NULL, lntmsg->msg_md->md_iov.kiov,
371                                              0, lntmsg->msg_md->md_length);
372                 
373                 tx->tx_lnet_msg = lntmsg;
374                 tx->tx_msg->ptlm_u.rdma.kptlrm_hdr = *hdr;
375                 kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_GET,
376                                   sizeof(kptl_rdma_msg_t));
377                 kptllnd_tx_launch(tx, target);
378                 return 0;
379
380         case LNET_MSG_ACK:
381                 CDEBUG(D_NET, "LNET_MSG_ACK\n");
382                 LASSERT (payload_nob == 0);
383                 break;
384         }
385
386         tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
387         if (tx == NULL) {
388                 CERROR("Can't send %s to %s: can't allocate descriptor\n",
389                        lnet_msgtyp2str(type), libcfs_id2str(target));
390                         return -ENOMEM;
391         }
392
393         tx->tx_lnet_msg = lntmsg;
394         tx->tx_msg->ptlm_u.immediate.kptlim_hdr = *hdr;
395
396         if (payload_kiov != NULL)
397                 lnet_copy_kiov2flat(*kptllnd_tunables.kptl_max_msg_size,
398                                     tx->tx_msg->ptlm_u.immediate.kptlim_payload,
399                                     0,
400                                     payload_niov, payload_kiov,
401                                     payload_offset, payload_nob);
402         else
403                 lnet_copy_iov2flat(*kptllnd_tunables.kptl_max_msg_size,
404                                    tx->tx_msg->ptlm_u.immediate.kptlim_payload,
405                                    0,
406                                    payload_niov, payload_iov,
407                                    payload_offset, payload_nob);
408
409         nob = offsetof(kptl_immediate_msg_t, kptlim_payload[payload_nob]);
410         kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_IMMEDIATE, nob);
411         kptllnd_tx_launch(tx, target);
412         return 0;
413 }
414
415 int 
416 kptllnd_eager_recv(struct lnet_ni *ni, void *private,
417                    lnet_msg_t *msg, void **new_privatep)
418 {
419         kptl_rx_t        *rx = private;
420
421         CDEBUG(D_NET, "Eager RX=%p RXB=%p\n", rx, rx->rx_rxb);
422
423         /* I have to release my ref on rxb (if I have one) to ensure I'm an
424          * eager receiver, so I copy the incoming request from the buffer it
425          * landed in, into space reserved in the descriptor... */
426
427 #if (PTL_MD_LOCAL_ALIGN8 == 0)
428         if (rx->rx_rxb == NULL)                 /* already copied */
429                 return 0;                       /* to fix alignment */
430 #else
431         LASSERT(rx->rx_rxb != NULL);
432 #endif
433         LASSERT(rx->rx_nob <= *kptllnd_tunables.kptl_max_msg_size);
434
435         memcpy(rx->rx_space, rx->rx_msg, rx->rx_nob);
436         rx->rx_msg = (kptl_msg_t *)rx->rx_space;
437
438         kptllnd_rx_buffer_decref(rx->rx_rxb);
439         rx->rx_rxb = NULL;
440
441         return 0;
442 }
443
444
445 int 
446 kptllnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
447               unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
448               unsigned int offset, unsigned int mlen, unsigned int rlen)
449 {
450         kptl_rx_t    *rx = private;
451         kptl_msg_t   *rxmsg = rx->rx_msg;
452         int           nob;
453         int           rc;
454
455         CDEBUG(D_NET, "%s niov=%d offset=%d mlen=%d rlen=%d\n",
456                kptllnd_msgtype2str(rxmsg->ptlm_type),
457                niov, offset, mlen, rlen);
458
459         LASSERT (mlen <= rlen);
460         LASSERT (mlen >= 0);
461         LASSERT (!in_interrupt());
462         LASSERT (!(kiov != NULL && iov != NULL)); /* never both */
463         LASSERT (niov <= PTL_MD_MAX_IOV);       /* !!! */
464
465 #ifdef CRAY_XT3
466         if (lntmsg != NULL &&
467             rx->rx_uid != 0) {
468                 /* Set the UID if the sender's uid isn't 0; i.e. non-root
469                  * running in userspace (e.g. a catamount node; linux kernel
470                  * senders, including routers have uid 0).  If this is a lustre
471                  * RPC request, this tells lustre not to trust the creds in the
472                  * RPC message body. */
473                 lnet_set_msg_uid(ni, lntmsg, rx->rx_uid);
474         }
475 #endif
476         switch(rxmsg->ptlm_type)
477         {
478         default:
479                 LBUG();
480                 rc = -EINVAL;
481                 break;
482
483         case PTLLND_MSG_TYPE_IMMEDIATE:
484                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE %d,%d\n", mlen, rlen);
485
486                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[rlen]);
487                 if (nob > rx->rx_nob) {
488                         CERROR ("Immediate message from %s too big: %d(%d)\n",
489                                 libcfs_id2str(rx->rx_peer->peer_id), nob,
490                                 rx->rx_nob);
491                         rc = -EINVAL;
492                         break;
493                 }
494
495                 if (kiov != NULL)
496                         lnet_copy_flat2kiov(
497                                 niov, kiov, offset,
498                                 *kptllnd_tunables.kptl_max_msg_size,
499                                 rxmsg->ptlm_u.immediate.kptlim_payload,
500                                 0,
501                                 mlen);
502                 else
503                         lnet_copy_flat2iov(
504                                 niov, iov, offset,
505                                 *kptllnd_tunables.kptl_max_msg_size,
506                                 rxmsg->ptlm_u.immediate.kptlim_payload,
507                                 0,
508                                 mlen);
509
510                 lnet_finalize (ni, lntmsg, 0);
511                 rc = 0;
512                 break;
513
514         case PTLLND_MSG_TYPE_GET:
515                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_GET %d,%d\n", mlen, rlen);
516
517                 /* NB always send RDMA so the peer can complete.  I send
518                  * success/failure in the portals 'hdr_data' */
519
520                 if (lntmsg == NULL)
521                         rc = kptllnd_active_rdma(rx, NULL,
522                                                  TX_TYPE_GET_RESPONSE,
523                                                  0, NULL, NULL, 0, 0);
524                 else
525                         rc = kptllnd_active_rdma(rx, lntmsg, 
526                                                  TX_TYPE_GET_RESPONSE,
527                                                  lntmsg->msg_niov,
528                                                  lntmsg->msg_iov, 
529                                                  lntmsg->msg_kiov,
530                                                  lntmsg->msg_offset, 
531                                                  lntmsg->msg_len);
532                 break;
533
534         case PTLLND_MSG_TYPE_PUT:
535                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_PUT %d,%d\n", mlen, rlen);
536
537                 /* NB always send RDMA so the peer can complete; it'll be 0
538                  * bytes if there was no match (lntmsg == NULL). I have no way
539                  * to let my peer know this, but she's only interested in when
540                  * the net has stopped accessing her buffer in any case. */
541
542                 rc = kptllnd_active_rdma(rx, lntmsg, TX_TYPE_PUT_RESPONSE,
543                                          niov, iov, kiov, offset, mlen);
544                 break;
545         }
546
547         /*
548          * We're done with the RX
549          */
550         kptllnd_rx_done(rx);
551         return rc;
552 }
553
554 void
555 kptllnd_eq_callback(ptl_event_t *ev)
556 {
557         kptl_eventarg_t *eva = ev->md.user_ptr;
558
559         switch (eva->eva_type) {
560         default:
561                 LBUG();
562                 
563         case PTLLND_EVENTARG_TYPE_MSG:
564         case PTLLND_EVENTARG_TYPE_RDMA:
565                 kptllnd_tx_callback(ev);
566                 break;
567                 
568         case PTLLND_EVENTARG_TYPE_BUF:
569                 kptllnd_rx_buffer_callback(ev);
570                 break;
571         }
572 }
573
574 void
575 kptllnd_thread_fini (void)
576 {
577         atomic_dec(&kptllnd_data.kptl_nthreads);
578 }
579
580 int
581 kptllnd_thread_start (int (*fn)(void *arg), void *arg)
582 {
583         long                pid;
584
585         atomic_inc(&kptllnd_data.kptl_nthreads);
586
587         pid = kernel_thread (fn, arg, 0);
588         if (pid >= 0)
589                 return 0;
590         
591         CERROR("Failed to start kernel_thread: error %d\n", (int)pid);
592         kptllnd_thread_fini();
593         return (int)pid;
594 }
595
596 int
597 kptllnd_watchdog(void *arg)
598 {
599         int                 id = (long)arg;
600         char                name[16];
601         wait_queue_t        waitlink;
602         int                 peer_index = 0;
603         unsigned long       deadline = jiffies;
604         int                 timeout;
605         int                 i;
606
607         snprintf(name, sizeof(name), "kptllnd_wd_%02d", id);
608         cfs_daemonize(name);
609         cfs_block_allsigs();
610
611         init_waitqueue_entry(&waitlink, current);
612
613         /* threads shut down in phase 2 after all peers have been destroyed */
614         while (kptllnd_data.kptl_shutdown < 2) {
615
616                 timeout = (int)(deadline - jiffies);
617                 
618                 if (timeout <= 0) {
619                         const int n = 4;
620                         const int p = 1;
621                         int       chunk = kptllnd_data.kptl_peer_hash_size;
622
623
624                         /* Time to check for RDMA timeouts on a few more
625                          * peers: I do checks every 'p' seconds on a
626                          * proportion of the peer table and I need to check
627                          * every connection 'n' times within a timeout
628                          * interval, to ensure I detect a timeout on any
629                          * connection within (n+1)/n times the timeout
630                          * interval. */
631
632                         if ((*kptllnd_tunables.kptl_timeout) > n * p)
633                                 chunk = (chunk * n * p) /
634                                         (*kptllnd_tunables.kptl_timeout);
635                         if (chunk == 0)
636                                 chunk = 1;
637
638                         for (i = 0; i < chunk; i++) {
639                                 kptllnd_peer_check_bucket(peer_index);
640                                 peer_index = (peer_index + 1) %
641                                      kptllnd_data.kptl_peer_hash_size;
642                         }
643
644                         deadline += p * HZ;
645                         continue;
646                 }
647
648                 kptllnd_handle_closing_peers();
649
650                 set_current_state(TASK_INTERRUPTIBLE);
651                 add_wait_queue_exclusive(&kptllnd_data.kptl_watchdog_waitq,
652                                          &waitlink);
653
654                 schedule_timeout(timeout);
655                 
656                 set_current_state (TASK_RUNNING);
657                 remove_wait_queue(&kptllnd_data.kptl_watchdog_waitq, &waitlink);
658         }
659
660         kptllnd_thread_fini();
661         CDEBUG(D_NET, "<<<\n");
662         return (0);
663 };
664
665 int
666 kptllnd_scheduler (void *arg)
667 {
668         int                 id = (long)arg;
669         char                name[16];
670         wait_queue_t        waitlink;
671         unsigned long       flags;
672         int                 did_something;
673         int                 counter = 0;
674         kptl_rx_t          *rx;
675         kptl_rx_buffer_t   *rxb;
676         kptl_tx_t          *tx;
677
678         snprintf(name, sizeof(name), "kptllnd_sd_%02d", id);
679         cfs_daemonize(name);
680         cfs_block_allsigs();
681
682         init_waitqueue_entry(&waitlink, current);
683
684         spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
685
686         /* threads shut down in phase 2 after all peers have been destroyed */
687         while (kptllnd_data.kptl_shutdown < 2) {
688
689                 did_something = 0;
690
691                 if (!list_empty(&kptllnd_data.kptl_sched_rxq)) {
692                         rx = list_entry (kptllnd_data.kptl_sched_rxq.next,
693                                          kptl_rx_t, rx_list);
694                         list_del(&rx->rx_list);
695                         
696                         spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,
697                                                flags);
698
699                         kptllnd_rx_parse(rx);
700                         did_something = 1;
701
702                         spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
703                 }
704
705                 if (!list_empty(&kptllnd_data.kptl_sched_rxbq)) {
706                         rxb = list_entry (kptllnd_data.kptl_sched_rxbq.next,
707                                           kptl_rx_buffer_t, rxb_repost_list);
708                         list_del(&rxb->rxb_repost_list);
709
710                         spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,
711                                                flags);
712
713                         kptllnd_rx_buffer_post(rxb);
714                         did_something = 1;
715
716                         spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
717                 }
718
719                 if (!list_empty(&kptllnd_data.kptl_sched_txq)) {
720                         tx = list_entry (kptllnd_data.kptl_sched_txq.next,
721                                          kptl_tx_t, tx_list);
722                         list_del_init(&tx->tx_list);
723
724                         spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags);
725
726                         kptllnd_tx_fini(tx);
727                         did_something = 1;
728
729                         spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
730                 }
731
732                 if (did_something) {
733                         if (++counter != *kptllnd_tunables.kptl_reschedule_loops)
734                                 continue;
735                 }
736
737                 set_current_state(TASK_INTERRUPTIBLE);
738                 add_wait_queue_exclusive(&kptllnd_data.kptl_sched_waitq,
739                                          &waitlink);
740                 spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags);
741
742                 if (!did_something)
743                         schedule(); 
744                 else
745                         cond_resched();
746
747                 set_current_state(TASK_RUNNING);
748                 remove_wait_queue(&kptllnd_data.kptl_sched_waitq, &waitlink);
749
750                 spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
751
752                 counter = 0;
753         }
754
755         spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags);
756
757         kptllnd_thread_fini();
758         return 0;
759 }
760