Whamcloud - gitweb
ff93136630b735568c7c096867580ba4ff76d37d
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd_cb.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * This file is part of Lustre, http://www.lustre.org/
32  * Lustre is a trademark of Sun Microsystems, Inc.
33  *
34  * lnet/klnds/ptllnd/ptllnd_cb.c
35  *
36  * Author: PJ Kirner <pjkirner@clusterfs.com>
37  */
38
39 #include "ptllnd.h"
40
41 #ifndef _USING_LUSTRE_PORTALS_
42 int
43 kptllnd_extract_iov (int dst_niov, ptl_md_iovec_t *dst,
44                      int src_niov, struct iovec *src,
45                      unsigned int offset, unsigned int len)
46 {
47         /* Initialise 'dst' to the subset of 'src' starting at 'offset',
48          * for exactly 'len' bytes, and return the number of entries.
49          * NB not destructive to 'src' */
50         unsigned int    frag_len;
51         unsigned int    niov;
52
53         if (len == 0)                           /* no data => */
54                 return (0);                     /* no frags */
55
56         LASSERT (src_niov > 0);
57         while (offset >= src->iov_len) {      /* skip initial frags */
58                 offset -= src->iov_len;
59                 src_niov--;
60                 src++;
61                 LASSERT (src_niov > 0);
62         }
63
64         niov = 1;
65         for (;;) {
66                 LASSERT (src_niov > 0);
67                 LASSERT (niov <= dst_niov);
68
69                 frag_len = src->iov_len - offset;
70                 dst->iov_base = ((char *)src->iov_base) + offset;
71
72                 if (len <= frag_len) {
73                         dst->iov_len = len;
74                         return (niov);
75                 }
76
77                 dst->iov_len = frag_len;
78
79                 len -= frag_len;
80                 dst++;
81                 src++;
82                 niov++;
83                 src_niov--;
84                 offset = 0;
85         }
86 }
87
88 int
89 kptllnd_extract_phys (int dst_niov, ptl_md_iovec_t *dst,
90                       int src_niov, lnet_kiov_t *src,
91                       unsigned int offset, unsigned int len)
92 {
93         /* Initialise 'dst' to the physical addresses of the subset of 'src'
94          * starting at 'offset', for exactly 'len' bytes, and return the number
95          * of entries.  NB not destructive to 'src' */
96         unsigned int    frag_len;
97         unsigned int    niov;
98         __u64           phys_page;
99         __u64           phys;
100
101         if (len == 0)                           /* no data => */
102                 return (0);                     /* no frags */
103
104         LASSERT (src_niov > 0);
105         while (offset >= src->kiov_len) {      /* skip initial frags */
106                 offset -= src->kiov_len;
107                 src_niov--;
108                 src++;
109                 LASSERT (src_niov > 0);
110         }
111
112         niov = 1;
113         for (;;) {
114                 LASSERT (src_niov > 0);
115                 LASSERT (niov <= dst_niov);
116
117                 frag_len = min(src->kiov_len - offset, len);
118                 phys_page = lnet_page2phys(src->kiov_page);
119                 phys = phys_page + src->kiov_offset + offset;
120
121                 LASSERT (sizeof(void *) > 4 || 
122                          (phys <= 0xffffffffULL &&
123                           phys + (frag_len - 1) <= 0xffffffffULL));
124
125                 dst->iov_base = (void *)((unsigned long)phys);
126                 dst->iov_len = frag_len;
127                 
128                 if (frag_len == len)
129                         return niov;
130
131                 len -= frag_len;
132                 dst++;
133                 src++;
134                 niov++;
135                 src_niov--;
136                 offset = 0;
137         }
138 }
139 #endif
140
141 void
142 kptllnd_init_rdma_md(kptl_tx_t *tx, unsigned int niov,
143                      struct iovec *iov, lnet_kiov_t *kiov,
144                      unsigned int offset, unsigned int nob)
145 {
146         LASSERT (iov == NULL || kiov == NULL);
147         
148         memset(&tx->tx_rdma_md, 0, sizeof(tx->tx_rdma_md));
149
150         tx->tx_rdma_md.start     = tx->tx_frags;
151         tx->tx_rdma_md.user_ptr  = &tx->tx_rdma_eventarg;
152         tx->tx_rdma_md.eq_handle = kptllnd_data.kptl_eqh;
153         tx->tx_rdma_md.options   = PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
154                                    PTL_MD_EVENT_START_DISABLE;
155         switch (tx->tx_type) {
156         default:
157                 LBUG();
158                 
159         case TX_TYPE_PUT_REQUEST:               /* passive: peer gets */
160                 tx->tx_rdma_md.threshold = 1;   /* GET event */
161                 tx->tx_rdma_md.options |= PTL_MD_OP_GET;
162                 break;
163
164         case TX_TYPE_GET_REQUEST:               /* passive: peer puts */
165                 tx->tx_rdma_md.threshold = 1;   /* PUT event */
166                 tx->tx_rdma_md.options |= PTL_MD_OP_PUT;
167                 break;
168                 
169         case TX_TYPE_PUT_RESPONSE:              /* active: I get */
170                 tx->tx_rdma_md.threshold = 2;   /* SEND + REPLY */
171                 break;
172                 
173         case TX_TYPE_GET_RESPONSE:              /* active: I put */
174                 tx->tx_rdma_md.threshold = tx->tx_acked ? 2 : 1;   /* SEND + ACK? */
175                 break;
176         }
177
178         if (nob == 0) {
179                 tx->tx_rdma_md.length = 0;
180                 return;
181         }
182
183 #ifdef _USING_LUSTRE_PORTALS_
184         if (iov != NULL) {
185                 tx->tx_rdma_md.options |= PTL_MD_IOVEC;
186                 tx->tx_rdma_md.length = 
187                         lnet_extract_iov(PTL_MD_MAX_IOV, tx->tx_frags->iov,
188                                          niov, iov, offset, nob);
189                 return;
190         }
191
192         /* Cheating OK since ptl_kiov_t == lnet_kiov_t */
193         CLASSERT(sizeof(ptl_kiov_t) == sizeof(lnet_kiov_t));
194         CLASSERT(offsetof(ptl_kiov_t, kiov_offset) ==
195                  offsetof(lnet_kiov_t, kiov_offset));
196         CLASSERT(offsetof(ptl_kiov_t, kiov_page) ==
197                  offsetof(lnet_kiov_t, kiov_page));
198         CLASSERT(offsetof(ptl_kiov_t, kiov_len) ==
199                  offsetof(lnet_kiov_t, kiov_len));
200         
201         tx->tx_rdma_md.options |= PTL_MD_KIOV;
202         tx->tx_rdma_md.length = 
203                 lnet_extract_kiov(PTL_MD_MAX_IOV, tx->tx_frags->kiov,
204                                   niov, kiov, offset, nob);
205 #else
206         if (iov != NULL) {
207                 tx->tx_rdma_md.options |= PTL_MD_IOVEC;
208                 tx->tx_rdma_md.length = 
209                         kptllnd_extract_iov(PTL_MD_MAX_IOV, tx->tx_frags->iov,
210                                             niov, iov, offset, nob);
211                 return;
212         }
213
214         tx->tx_rdma_md.options |= PTL_MD_IOVEC | PTL_MD_PHYS;
215         tx->tx_rdma_md.length =
216                 kptllnd_extract_phys(PTL_MD_MAX_IOV, tx->tx_frags->iov,
217                                      niov, kiov, offset, nob);
218 #endif
219 }
220
221 int
222 kptllnd_active_rdma(kptl_rx_t *rx, lnet_msg_t *lntmsg, int type,
223                     unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
224                     unsigned int offset, int nob)
225 {
226         kptl_tx_t       *tx;
227         ptl_err_t        ptlrc;
228         kptl_msg_t      *rxmsg = rx->rx_msg;
229         kptl_peer_t     *peer = rx->rx_peer;
230         unsigned long    flags;
231         ptl_handle_md_t  mdh;
232
233         LASSERT (type == TX_TYPE_PUT_RESPONSE || 
234                  type == TX_TYPE_GET_RESPONSE);
235
236         tx = kptllnd_get_idle_tx(type);
237         if (tx == NULL) {
238                 CERROR ("Can't do %s rdma to %s: can't allocate descriptor\n",
239                         type == TX_TYPE_PUT_RESPONSE ? "GET" : "PUT",
240                         libcfs_id2str(peer->peer_id));
241                 return -ENOMEM;
242         }
243
244         kptllnd_set_tx_peer(tx, peer);
245         kptllnd_init_rdma_md(tx, niov, iov, kiov, offset, nob);
246
247         ptlrc = PtlMDBind(kptllnd_data.kptl_nih, tx->tx_rdma_md, 
248                           PTL_UNLINK, &mdh);
249         if (ptlrc != PTL_OK) {
250                 CERROR("PtlMDBind(%s) failed: %s(%d)\n",
251                        libcfs_id2str(peer->peer_id),
252                        kptllnd_errtype2str(ptlrc), ptlrc);
253                 tx->tx_status = -EIO;
254                 kptllnd_tx_decref(tx);
255                 return -EIO;
256         }
257
258         cfs_spin_lock_irqsave(&peer->peer_lock, flags);
259
260         tx->tx_lnet_msg = lntmsg;
261         /* lnet_finalize() will be called when tx is torn down, so I must
262          * return success from here on... */
263
264         tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * CFS_HZ);
265         tx->tx_rdma_mdh = mdh;
266         tx->tx_active = 1;
267         cfs_list_add_tail(&tx->tx_list, &peer->peer_activeq);
268
269         /* peer has now got my ref on 'tx' */
270
271         cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);
272
273         tx->tx_tposted = jiffies;
274
275         if (type == TX_TYPE_GET_RESPONSE)
276                 ptlrc = PtlPut(mdh,
277                                tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
278                                rx->rx_initiator,
279                                *kptllnd_tunables.kptl_portal,
280                                0,                     /* acl cookie */
281                                rxmsg->ptlm_u.rdma.kptlrm_matchbits,
282                                0,                     /* offset */
283                                (lntmsg != NULL) ?     /* header data */
284                                PTLLND_RDMA_OK :
285                                PTLLND_RDMA_FAIL);
286         else
287                 ptlrc = PtlGet(mdh,
288                                rx->rx_initiator,
289                                *kptllnd_tunables.kptl_portal,
290                                0,                     /* acl cookie */
291                                rxmsg->ptlm_u.rdma.kptlrm_matchbits,
292                                0);                    /* offset */
293
294         if (ptlrc != PTL_OK) {
295                 CERROR("Ptl%s failed: %s(%d)\n", 
296                        (type == TX_TYPE_GET_RESPONSE) ? "Put" : "Get",
297                        kptllnd_errtype2str(ptlrc), ptlrc);
298                 
299                 kptllnd_peer_close(peer, -EIO);
300                 /* Everything (including this RDMA) queued on the peer will
301                  * be completed with failure */
302                 kptllnd_schedule_ptltrace_dump();
303         }
304
305         return 0;
306 }
307
308 int
309 kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
310 {
311         lnet_hdr_t       *hdr = &lntmsg->msg_hdr;
312         int               type = lntmsg->msg_type;
313         lnet_process_id_t target = lntmsg->msg_target;
314         int               target_is_router = lntmsg->msg_target_is_router;
315         int               routing = lntmsg->msg_routing;
316         unsigned int      payload_niov = lntmsg->msg_niov;
317         struct iovec     *payload_iov = lntmsg->msg_iov;
318         lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
319         unsigned int      payload_offset = lntmsg->msg_offset;
320         unsigned int      payload_nob = lntmsg->msg_len;
321         kptl_net_t       *net = ni->ni_data;
322         kptl_peer_t      *peer = NULL;
323         int               mpflag = 0;
324         kptl_tx_t        *tx;
325         int               nob;
326         int               nfrag;
327         int               rc;
328
329         LASSERT (net->net_ni == ni);
330         LASSERT (!net->net_shutdown);
331         LASSERT (payload_nob == 0 || payload_niov > 0);
332         LASSERT (payload_niov <= LNET_MAX_IOV);
333         LASSERT (payload_niov <= PTL_MD_MAX_IOV); /* !!! */
334         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
335         LASSERT (!cfs_in_interrupt());
336
337         if (lntmsg->msg_vmflush)
338                 mpflag = cfs_memory_pressure_get_and_set();
339
340         rc = kptllnd_find_target(net, target, &peer);
341         if (rc != 0)
342                 goto out;
343
344         /* NB peer->peer_id does NOT always equal target, be careful with
345          * which one to use */
346         switch (type) {
347         default:
348                 LBUG();
349                 return -EINVAL;
350
351         case LNET_MSG_REPLY:
352         case LNET_MSG_PUT:
353                 /* Should the payload avoid RDMA? */
354                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[payload_nob]);
355                 if (payload_kiov == NULL && 
356                     nob <= peer->peer_max_msg_size)
357                         break;
358
359                 tx = kptllnd_get_idle_tx(TX_TYPE_PUT_REQUEST);
360                 if (tx == NULL) {
361                         CERROR("Can't send %s to %s: can't allocate descriptor\n",
362                                lnet_msgtyp2str(type),
363                                libcfs_id2str(target));
364                         rc = -ENOMEM;
365                         goto out;
366                 }
367
368                 kptllnd_init_rdma_md(tx, payload_niov, 
369                                      payload_iov, payload_kiov,
370                                      payload_offset, payload_nob);
371
372                 tx->tx_lnet_msg = lntmsg;
373                 tx->tx_msg->ptlm_u.rdma.kptlrm_hdr = *hdr;
374                 kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_PUT,
375                                  target, sizeof(kptl_rdma_msg_t));
376
377                 CDEBUG(D_NETTRACE, "%s: passive PUT p %d %p\n",
378                        libcfs_id2str(target),
379                        le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index), tx);
380
381                 kptllnd_tx_launch(peer, tx, 0);
382                 goto out;
383
384         case LNET_MSG_GET:
385                 /* routed gets don't RDMA */
386                 if (target_is_router || routing)
387                         break;
388
389                 /* Is the payload small enough not to need RDMA? */
390                 nob = lntmsg->msg_md->md_length;
391                 nob = offsetof(kptl_msg_t, 
392                                ptlm_u.immediate.kptlim_payload[nob]);
393                 if (nob <= peer->peer_max_msg_size)
394                         break;
395
396                 tx = kptllnd_get_idle_tx(TX_TYPE_GET_REQUEST);
397                 if (tx == NULL) {
398                         CERROR("Can't send GET to %s: can't allocate descriptor\n",
399                                libcfs_id2str(target));
400                         rc = -ENOMEM;
401                         goto out;
402                 }
403
404                 tx->tx_lnet_replymsg = lnet_create_reply_msg(ni, lntmsg);
405                 if (tx->tx_lnet_replymsg == NULL) {
406                         CERROR("Failed to allocate LNET reply for %s\n",
407                                libcfs_id2str(target));
408                         kptllnd_tx_decref(tx);
409                         rc = -ENOMEM;
410                         goto out;
411                 }
412
413                 if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
414                         kptllnd_init_rdma_md(tx, lntmsg->msg_md->md_niov,
415                                              lntmsg->msg_md->md_iov.iov, NULL,
416                                              0, lntmsg->msg_md->md_length);
417                 else
418                         kptllnd_init_rdma_md(tx, lntmsg->msg_md->md_niov,
419                                              NULL, lntmsg->msg_md->md_iov.kiov,
420                                              0, lntmsg->msg_md->md_length);
421
422                 tx->tx_lnet_msg = lntmsg;
423                 tx->tx_msg->ptlm_u.rdma.kptlrm_hdr = *hdr;
424                 kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_GET,
425                                  target, sizeof(kptl_rdma_msg_t));
426
427                 CDEBUG(D_NETTRACE, "%s: passive GET p %d %p\n",
428                        libcfs_id2str(target),
429                        le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index), tx);
430
431                 kptllnd_tx_launch(peer, tx, 0);
432                 goto out;
433
434         case LNET_MSG_ACK:
435                 CDEBUG(D_NET, "LNET_MSG_ACK\n");
436                 LASSERT (payload_nob == 0);
437                 break;
438         }
439
440         /* I don't have to handle kiovs */
441         LASSERT (payload_nob == 0 || payload_iov != NULL);
442
443         tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
444         if (tx == NULL) {
445                 CERROR("Can't send %s to %s: can't allocate descriptor\n",
446                        lnet_msgtyp2str(type), libcfs_id2str(target));
447                 rc = -ENOMEM;
448                 goto out;
449         }
450
451         tx->tx_lnet_msg = lntmsg;
452         tx->tx_msg->ptlm_u.immediate.kptlim_hdr = *hdr;
453
454         if (payload_nob == 0) {
455                 nfrag = 0;
456         } else {
457                 tx->tx_frags->iov[0].iov_base = tx->tx_msg;
458                 tx->tx_frags->iov[0].iov_len = offsetof(kptl_msg_t,
459                                                         ptlm_u.immediate.kptlim_payload);
460
461                 /* NB relying on lustre not asking for PTL_MD_MAX_IOV
462                  * fragments!! */
463 #ifdef _USING_LUSTRE_PORTALS_
464                 nfrag = 1 + lnet_extract_iov(PTL_MD_MAX_IOV - 1, 
465                                              &tx->tx_frags->iov[1],
466                                              payload_niov, payload_iov,
467                                              payload_offset, payload_nob);
468 #else
469                 nfrag = 1 + kptllnd_extract_iov(PTL_MD_MAX_IOV - 1,
470                                                 &tx->tx_frags->iov[1],
471                                                 payload_niov, payload_iov,
472                                                 payload_offset, payload_nob);
473 #endif
474         }
475
476         nob = offsetof(kptl_immediate_msg_t, kptlim_payload[payload_nob]);
477         kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_IMMEDIATE, target, nob);
478
479         CDEBUG(D_NETTRACE, "%s: immediate %s p %d %p\n",
480                libcfs_id2str(target),
481                lnet_msgtyp2str(lntmsg->msg_type),
482                (le32_to_cpu(lntmsg->msg_type) == LNET_MSG_PUT) ? 
483                le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index) :
484                (le32_to_cpu(lntmsg->msg_type) == LNET_MSG_GET) ? 
485                le32_to_cpu(lntmsg->msg_hdr.msg.get.ptl_index) : -1,
486                tx);
487
488         kptllnd_tx_launch(peer, tx, nfrag);
489
490  out:
491         if (lntmsg->msg_vmflush)
492                 cfs_memory_pressure_restore(mpflag);
493         if (peer)
494                 kptllnd_peer_decref(peer);
495         return rc;
496 }
497
498 int 
499 kptllnd_eager_recv(struct lnet_ni *ni, void *private,
500                    lnet_msg_t *msg, void **new_privatep)
501 {
502         kptl_rx_t        *rx = private;
503
504         CDEBUG(D_NET, "Eager RX=%p RXB=%p\n", rx, rx->rx_rxb);
505
506         /* I have to release my ref on rxb (if I have one) to ensure I'm an
507          * eager receiver, so I copy the incoming request from the buffer it
508          * landed in, into space reserved in the descriptor... */
509
510 #if (PTL_MD_LOCAL_ALIGN8 == 0)
511         if (rx->rx_rxb == NULL)                 /* already copied */
512                 return 0;                       /* to fix alignment */
513 #else
514         LASSERT(rx->rx_rxb != NULL);
515 #endif
516         LASSERT(rx->rx_nob <= *kptllnd_tunables.kptl_max_msg_size);
517
518         memcpy(rx->rx_space, rx->rx_msg, rx->rx_nob);
519         rx->rx_msg = (kptl_msg_t *)rx->rx_space;
520
521         kptllnd_rx_buffer_decref(rx->rx_rxb);
522         rx->rx_rxb = NULL;
523
524         return 0;
525 }
526
527
528 int 
529 kptllnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
530               unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
531               unsigned int offset, unsigned int mlen, unsigned int rlen)
532 {
533         kptl_rx_t    *rx = private;
534         kptl_msg_t   *rxmsg = rx->rx_msg;
535         int           nob;
536         int           rc;
537
538         CDEBUG(D_NET, "%s niov=%d offset=%d mlen=%d rlen=%d\n",
539                kptllnd_msgtype2str(rxmsg->ptlm_type),
540                niov, offset, mlen, rlen);
541
542         LASSERT (mlen <= rlen);
543         LASSERT (mlen >= 0);
544         LASSERT (!cfs_in_interrupt());
545         LASSERT (!(kiov != NULL && iov != NULL)); /* never both */
546         LASSERT (niov <= PTL_MD_MAX_IOV);       /* !!! */
547
548 #ifdef CRAY_XT3
549         if (lntmsg != NULL &&
550             rx->rx_uid != 0) {
551                 /* Set the UID if the sender's uid isn't 0; i.e. non-root
552                  * running in userspace (e.g. a catamount node; linux kernel
553                  * senders, including routers have uid 0).  If this is a lustre
554                  * RPC request, this tells lustre not to trust the creds in the
555                  * RPC message body. */
556                 lnet_set_msg_uid(ni, lntmsg, rx->rx_uid);
557         }
558 #endif
559         switch(rxmsg->ptlm_type)
560         {
561         default:
562                 LBUG();
563                 rc = -EINVAL;
564                 break;
565
566         case PTLLND_MSG_TYPE_IMMEDIATE:
567                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE %d,%d\n", mlen, rlen);
568
569                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[rlen]);
570                 if (nob > rx->rx_nob) {
571                         CERROR ("Immediate message from %s too big: %d(%d)\n",
572                                 libcfs_id2str(rx->rx_peer->peer_id), nob,
573                                 rx->rx_nob);
574                         rc = -EINVAL;
575                         break;
576                 }
577
578                 if (kiov != NULL)
579                         lnet_copy_flat2kiov(
580                                 niov, kiov, offset,
581                                 *kptllnd_tunables.kptl_max_msg_size,
582                                 rxmsg->ptlm_u.immediate.kptlim_payload,
583                                 0,
584                                 mlen);
585                 else
586                         lnet_copy_flat2iov(
587                                 niov, iov, offset,
588                                 *kptllnd_tunables.kptl_max_msg_size,
589                                 rxmsg->ptlm_u.immediate.kptlim_payload,
590                                 0,
591                                 mlen);
592
593                 lnet_finalize (ni, lntmsg, 0);
594                 rc = 0;
595                 break;
596
597         case PTLLND_MSG_TYPE_GET:
598                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_GET %d,%d\n", mlen, rlen);
599
600                 /* NB always send RDMA so the peer can complete.  I send
601                  * success/failure in the portals 'hdr_data' */
602
603                 if (lntmsg == NULL)
604                         rc = kptllnd_active_rdma(rx, NULL,
605                                                  TX_TYPE_GET_RESPONSE,
606                                                  0, NULL, NULL, 0, 0);
607                 else
608                         rc = kptllnd_active_rdma(rx, lntmsg, 
609                                                  TX_TYPE_GET_RESPONSE,
610                                                  lntmsg->msg_niov,
611                                                  lntmsg->msg_iov, 
612                                                  lntmsg->msg_kiov,
613                                                  lntmsg->msg_offset, 
614                                                  lntmsg->msg_len);
615                 break;
616
617         case PTLLND_MSG_TYPE_PUT:
618                 CDEBUG(D_NET, "PTLLND_MSG_TYPE_PUT %d,%d\n", mlen, rlen);
619
620                 /* NB always send RDMA so the peer can complete; it'll be 0
621                  * bytes if there was no match (lntmsg == NULL). I have no way
622                  * to let my peer know this, but she's only interested in when
623                  * the net has stopped accessing her buffer in any case. */
624
625                 rc = kptllnd_active_rdma(rx, lntmsg, TX_TYPE_PUT_RESPONSE,
626                                          niov, iov, kiov, offset, mlen);
627                 break;
628         }
629
630         /*
631          * We're done with the RX
632          */
633         kptllnd_rx_done(rx, PTLLND_POSTRX_PEER_CREDIT);
634         return rc;
635 }
636
637 void
638 kptllnd_eq_callback(ptl_event_t *ev)
639 {
640         kptl_eventarg_t *eva = ev->md.user_ptr;
641
642         switch (eva->eva_type) {
643         default:
644                 LBUG();
645
646         case PTLLND_EVENTARG_TYPE_MSG:
647         case PTLLND_EVENTARG_TYPE_RDMA:
648                 kptllnd_tx_callback(ev);
649                 break;
650
651         case PTLLND_EVENTARG_TYPE_BUF:
652                 kptllnd_rx_buffer_callback(ev);
653                 break;
654         }
655 }
656
657 void
658 kptllnd_thread_fini (void)
659 {
660         cfs_atomic_dec(&kptllnd_data.kptl_nthreads);
661 }
662
663 int
664 kptllnd_thread_start (int (*fn)(void *arg), void *arg)
665 {
666         long                pid;
667
668         cfs_atomic_inc(&kptllnd_data.kptl_nthreads);
669
670         pid = cfs_create_thread (fn, arg, 0);
671         if (pid >= 0)
672                 return 0;
673
674         CERROR("Failed to start thread: error %d\n", (int)pid);
675         kptllnd_thread_fini();
676         return (int)pid;
677 }
678
679 int
680 kptllnd_watchdog(void *arg)
681 {
682         int                 id = (long)arg;
683         char                name[16];
684         cfs_waitlink_t      waitlink;
685         int                 stamp = 0;
686         int                 peer_index = 0;
687         unsigned long       deadline = jiffies;
688         int                 timeout;
689         int                 i;
690
691         snprintf(name, sizeof(name), "kptllnd_wd_%02d", id);
692         cfs_daemonize(name);
693         cfs_block_allsigs();
694
695         cfs_waitlink_init(&waitlink);
696
697         /* threads shut down in phase 2 after all peers have been destroyed */
698         while (kptllnd_data.kptl_shutdown < 2) {
699
700                 /* add a check for needs ptltrace
701                  * yes, this is blatant hijacking of this thread
702                  * we can't dump directly from tx or rx _callbacks as it
703                  * deadlocks portals and takes out the node
704                 */
705
706                 if (cfs_atomic_read(&kptllnd_data.kptl_needs_ptltrace)) {
707 #ifdef CRAY_XT3
708                         kptllnd_dump_ptltrace();
709                         /* we only dump once, no matter how many pending */
710                         cfs_atomic_set(&kptllnd_data.kptl_needs_ptltrace, 0);
711 #else
712                         LBUG();
713 #endif
714                 }
715
716                 timeout = (int)(deadline - jiffies);
717
718                 if (timeout <= 0) {
719                         const int n = 4;
720                         const int p = 1;
721                         int       chunk = kptllnd_data.kptl_peer_hash_size;
722
723
724                         /* Time to check for RDMA timeouts on a few more
725                          * peers: I do checks every 'p' seconds on a
726                          * proportion of the peer table and I need to check
727                          * every connection 'n' times within a timeout
728                          * interval, to ensure I detect a timeout on any
729                          * connection within (n+1)/n times the timeout
730                          * interval. */
731
732                         if ((*kptllnd_tunables.kptl_timeout) > n * p)
733                                 chunk = (chunk * n * p) /
734                                         (*kptllnd_tunables.kptl_timeout);
735                         if (chunk == 0)
736                                 chunk = 1;
737
738                         for (i = 0; i < chunk; i++) {
739                                 kptllnd_peer_check_bucket(peer_index, stamp);
740                                 peer_index = (peer_index + 1) %
741                                      kptllnd_data.kptl_peer_hash_size;
742                         }
743
744                         deadline += p * CFS_HZ;
745                         stamp++;
746                         continue;
747                 }
748
749                 kptllnd_handle_closing_peers();
750
751                 cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
752                 cfs_waitq_add_exclusive(&kptllnd_data.kptl_watchdog_waitq,
753                                         &waitlink);
754
755                 cfs_waitq_timedwait(&waitlink, CFS_TASK_INTERRUPTIBLE, timeout);
756
757                 cfs_set_current_state (CFS_TASK_RUNNING);
758                 cfs_waitq_del(&kptllnd_data.kptl_watchdog_waitq, &waitlink);
759         }
760
761         kptllnd_thread_fini();
762         CDEBUG(D_NET, "<<<\n");
763         return (0);
764 };
765
766 int
767 kptllnd_scheduler (void *arg)
768 {
769         int                 id = (long)arg;
770         char                name[16];
771         cfs_waitlink_t      waitlink;
772         unsigned long       flags;
773         int                 did_something;
774         int                 counter = 0;
775         kptl_rx_t          *rx;
776         kptl_rx_buffer_t   *rxb;
777         kptl_tx_t          *tx;
778
779         snprintf(name, sizeof(name), "kptllnd_sd_%02d", id);
780         cfs_daemonize(name);
781         cfs_block_allsigs();
782
783         cfs_waitlink_init(&waitlink);
784
785         cfs_spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
786
787         /* threads shut down in phase 2 after all peers have been destroyed */
788         while (kptllnd_data.kptl_shutdown < 2) {
789
790                 did_something = 0;
791
792                 if (!cfs_list_empty(&kptllnd_data.kptl_sched_rxq)) {
793                         rx = cfs_list_entry (kptllnd_data.kptl_sched_rxq.next,
794                                              kptl_rx_t, rx_list);
795                         cfs_list_del(&rx->rx_list);
796
797                         cfs_spin_unlock_irqrestore(&kptllnd_data. \
798                                                    kptl_sched_lock,
799                                                    flags);
800
801                         kptllnd_rx_parse(rx);
802                         did_something = 1;
803
804                         cfs_spin_lock_irqsave(&kptllnd_data.kptl_sched_lock,
805                                               flags);
806                 }
807
808                 if (!cfs_list_empty(&kptllnd_data.kptl_sched_rxbq)) {
809                         rxb = cfs_list_entry (kptllnd_data.kptl_sched_rxbq.next,
810                                               kptl_rx_buffer_t,
811                                               rxb_repost_list);
812                         cfs_list_del(&rxb->rxb_repost_list);
813
814                         cfs_spin_unlock_irqrestore(&kptllnd_data. \
815                                                    kptl_sched_lock,
816                                                    flags);
817
818                         kptllnd_rx_buffer_post(rxb);
819                         did_something = 1;
820
821                         cfs_spin_lock_irqsave(&kptllnd_data.kptl_sched_lock,
822                                               flags);
823                 }
824
825                 if (!cfs_list_empty(&kptllnd_data.kptl_sched_txq)) {
826                         tx = cfs_list_entry (kptllnd_data.kptl_sched_txq.next,
827                                              kptl_tx_t, tx_list);
828                         cfs_list_del_init(&tx->tx_list);
829
830                         cfs_spin_unlock_irqrestore(&kptllnd_data. \
831                                                    kptl_sched_lock, flags);
832
833                         kptllnd_tx_fini(tx);
834                         did_something = 1;
835
836                         cfs_spin_lock_irqsave(&kptllnd_data.kptl_sched_lock,
837                                               flags);
838                 }
839
840                 if (did_something) {
841                         if (++counter != *kptllnd_tunables.kptl_reschedule_loops)
842                                 continue;
843                 }
844
845                 cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
846                 cfs_waitq_add_exclusive(&kptllnd_data.kptl_sched_waitq,
847                                         &waitlink);
848                 cfs_spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,
849                                            flags);
850
851                 if (!did_something)
852                         cfs_waitq_wait(&waitlink, CFS_TASK_INTERRUPTIBLE);
853                 else
854                         cfs_cond_resched();
855
856                 cfs_set_current_state(CFS_TASK_RUNNING);
857                 cfs_waitq_del(&kptllnd_data.kptl_sched_waitq, &waitlink);
858
859                 cfs_spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
860
861                 counter = 0;
862         }
863
864         cfs_spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags);
865
866         kptllnd_thread_fini();
867         return 0;
868 }