/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
 *   Author: PJ Kirner <pjkirner@clusterfs.com>
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   This file is confidential source code owned by Cluster File Systems.
 *   No viewing, modification, compilation, redistribution, or any other
 *   form of use is permitted except through a signed license agreement.
 *
 *   If you have not signed such an agreement, then you have no rights to
 *   this file.  Please destroy it immediately and contact CFS.
 */

#include "ptllnd.h"
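/* When the LND is not built against Lustre's own portals library it cannot
 * use lnet_extract_iov()/lnet_extract_kiov() directly, so it carries its own
 * helpers to translate LNET iovecs and kiovs into the ptl_md_iovec_t
 * fragments that a portals memory descriptor wants. */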
#ifndef _USING_LUSTRE_PORTALS_
int
kptllnd_extract_iov (int dst_niov, ptl_md_iovec_t *dst,
                     int src_niov, struct iovec *src,
                     unsigned int offset, unsigned int len)
{
        /* Initialise 'dst' to the subset of 'src' starting at 'offset',
         * for exactly 'len' bytes, and return the number of entries.
         * NB not destructive to 'src' */
        unsigned int    frag_len;
        unsigned int    niov;

        if (len == 0)                           /* no data => */
                return (0);                     /* no frags */

        LASSERT (src_niov > 0);
        while (offset >= src->iov_len) {        /* skip initial frags */
                offset -= src->iov_len;
                src_niov--;
                src++;
                LASSERT (src_niov > 0);
        }

        niov = 1;
        for (;;) {
                LASSERT (src_niov > 0);
                LASSERT (niov <= dst_niov);

                frag_len = src->iov_len - offset;
                dst->iov_base = ((char *)src->iov_base) + offset;

                if (len <= frag_len) {          /* last frag takes what's left */
                        dst->iov_len = len;
                        return (niov);
                }

                dst->iov_len = frag_len;

                len -= frag_len;
                dst++;
                src++;
                niov++;
                src_niov--;
                offset = 0;
        }
}
int
kptllnd_extract_phys (int dst_niov, ptl_md_iovec_t *dst,
                      int src_niov, lnet_kiov_t *src,
                      unsigned int offset, unsigned int len)
{
        /* Initialise 'dst' to the physical addresses of the subset of 'src'
         * starting at 'offset', for exactly 'len' bytes, and return the number
         * of entries.  NB not destructive to 'src' */
        unsigned int    frag_len;
        unsigned int    niov;
        __u64           phys_page;
        __u64           phys;

        if (len == 0)                           /* no data => */
                return (0);                     /* no frags */

        LASSERT (src_niov > 0);
        while (offset >= src->kiov_len) {       /* skip initial frags */
                offset -= src->kiov_len;
                src_niov--;
                src++;
                LASSERT (src_niov > 0);
        }

        niov = 1;
        for (;;) {
                LASSERT (src_niov > 0);
                LASSERT (niov <= dst_niov);

                frag_len = min(src->kiov_len - offset, len);
                phys_page = lnet_page2phys(src->kiov_page);
                phys = phys_page + src->kiov_offset + offset;

                LASSERT (sizeof(void *) > 4 ||
                         (phys <= 0xffffffffULL &&
                          phys + (frag_len - 1) <= 0xffffffffULL));

                dst->iov_base = (void *)((unsigned long)phys);
                dst->iov_len = frag_len;

                if (frag_len == len)            /* consumed all of 'len' */
                        return (niov);

                len -= frag_len;
                dst++;
                src++;
                niov++;
                src_niov--;
                offset = 0;
        }
}
#endif
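/* Build the memory descriptor that covers the local buffer of an RDMA.
 * Passive MDs (the peer gets from or puts into us) only have to absorb the
 * single GET or PUT event; active MDs must also absorb our own SEND (plus
 * REPLY or optional ACK) events, hence the larger thresholds below.  The
 * buffer itself is described as an iovec, a kiov or a list of physical
 * addresses, depending on the portals flavour in use. */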
static void
kptllnd_init_rdma_md(kptl_tx_t *tx, unsigned int niov,
                     struct iovec *iov, lnet_kiov_t *kiov,
                     unsigned int offset, unsigned int nob)
{
        LASSERT (iov == NULL || kiov == NULL);

        memset(&tx->tx_rdma_md, 0, sizeof(tx->tx_rdma_md));

        tx->tx_rdma_md.start = tx->tx_frags;
        tx->tx_rdma_md.user_ptr = &tx->tx_rdma_eventarg;
        tx->tx_rdma_md.eq_handle = kptllnd_data.kptl_eqh;
        tx->tx_rdma_md.options = PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
                                 PTL_MD_EVENT_START_DISABLE;
        switch (tx->tx_type) {
        default:
                LBUG();

        case TX_TYPE_PUT_REQUEST:               /* passive: peer gets */
                tx->tx_rdma_md.threshold = 1;   /* GET event */
                tx->tx_rdma_md.options |= PTL_MD_OP_GET;
                break;

        case TX_TYPE_GET_REQUEST:               /* passive: peer puts */
                tx->tx_rdma_md.threshold = 1;   /* PUT event */
                tx->tx_rdma_md.options |= PTL_MD_OP_PUT;
                break;

        case TX_TYPE_PUT_RESPONSE:              /* active: I get */
                tx->tx_rdma_md.threshold = 2;   /* SEND + REPLY */
                break;

        case TX_TYPE_GET_RESPONSE:              /* active: I put */
                tx->tx_rdma_md.threshold = tx->tx_acked ? 2 : 1; /* SEND + ACK? */
                break;
        }

        if (nob == 0) {
                tx->tx_rdma_md.length = 0;
                return;
        }

#ifdef _USING_LUSTRE_PORTALS_
        if (iov != NULL) {
                tx->tx_rdma_md.options |= PTL_MD_IOVEC;
                tx->tx_rdma_md.length =
                        lnet_extract_iov(PTL_MD_MAX_IOV, tx->tx_frags->iov,
                                         niov, iov, offset, nob);
                return;
        }

        /* Cheating OK since ptl_kiov_t == lnet_kiov_t */
        CLASSERT(sizeof(ptl_kiov_t) == sizeof(lnet_kiov_t));
        CLASSERT(offsetof(ptl_kiov_t, kiov_offset) ==
                 offsetof(lnet_kiov_t, kiov_offset));
        CLASSERT(offsetof(ptl_kiov_t, kiov_page) ==
                 offsetof(lnet_kiov_t, kiov_page));
        CLASSERT(offsetof(ptl_kiov_t, kiov_len) ==
                 offsetof(lnet_kiov_t, kiov_len));

        tx->tx_rdma_md.options |= PTL_MD_KIOV;
        tx->tx_rdma_md.length =
                lnet_extract_kiov(PTL_MD_MAX_IOV, tx->tx_frags->kiov,
                                  niov, kiov, offset, nob);
#else
        if (iov != NULL) {
                tx->tx_rdma_md.options |= PTL_MD_IOVEC;
                tx->tx_rdma_md.length =
                        kptllnd_extract_iov(PTL_MD_MAX_IOV, tx->tx_frags->iov,
                                            niov, iov, offset, nob);
                return;
        }

        tx->tx_rdma_md.options |= PTL_MD_IOVEC | PTL_MD_PHYS;
        tx->tx_rdma_md.length =
                kptllnd_extract_phys(PTL_MD_MAX_IOV, tx->tx_frags->iov,
                                     niov, kiov, offset, nob);
#endif
}
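/* Start an active RDMA: bind an MD over the local buffer and issue the
 * PtlGet (to fetch the peer's PUT source) or PtlPut (to fill the peer's GET
 * sink) against the matchbits the peer advertised in its request message.
 * Completion is reported through the event queue; lnet_finalize() happens
 * when the tx is eventually torn down. */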
int
kptllnd_active_rdma(kptl_rx_t *rx, lnet_msg_t *lntmsg, int type,
                    unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
                    unsigned int offset, int nob)
{
        kptl_tx_t       *tx;
        ptl_err_t        ptlrc;
        kptl_msg_t      *rxmsg = rx->rx_msg;
        kptl_peer_t     *peer = rx->rx_peer;
        unsigned long    flags;
        ptl_handle_md_t  mdh;

        LASSERT (type == TX_TYPE_PUT_RESPONSE ||
                 type == TX_TYPE_GET_RESPONSE);

        tx = kptllnd_get_idle_tx(type);
        if (tx == NULL) {
                CERROR ("Can't do %s rdma to %s: can't allocate descriptor\n",
                        type == TX_TYPE_PUT_RESPONSE ? "GET" : "PUT",
                        libcfs_id2str(peer->peer_id));
                return -ENOMEM;
        }

        kptllnd_set_tx_peer(tx, peer);
        kptllnd_init_rdma_md(tx, niov, iov, kiov, offset, nob);

        ptlrc = PtlMDBind(kptllnd_data.kptl_nih, tx->tx_rdma_md,
                          PTL_UNLINK, &mdh);
        if (ptlrc != PTL_OK) {
                CERROR("PtlMDBind(%s) failed: %s(%d)\n",
                       libcfs_id2str(peer->peer_id),
                       kptllnd_errtype2str(ptlrc), ptlrc);
                tx->tx_status = -EIO;
                kptllnd_tx_decref(tx);
                return -EIO;
        }

        spin_lock_irqsave(&peer->peer_lock, flags);

        tx->tx_lnet_msg = lntmsg;
        /* lnet_finalize() will be called when tx is torn down, so I must
         * return success from here on... */

        tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ);
        tx->tx_rdma_mdh = mdh;
        tx->tx_active = 1;
        list_add_tail(&tx->tx_list, &peer->peer_activeq);

        /* peer has now got my ref on 'tx' */

        spin_unlock_irqrestore(&peer->peer_lock, flags);

        tx->tx_tposted = jiffies;

        if (type == TX_TYPE_GET_RESPONSE)
                ptlrc = PtlPut(mdh,
                               tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
                               rx->rx_initiator,
                               *kptllnd_tunables.kptl_portal,
                               0,                     /* acl cookie */
                               rxmsg->ptlm_u.rdma.kptlrm_matchbits,
                               0,                     /* offset */
                               (lntmsg != NULL) ?     /* header data */
                               PTLLND_RDMA_OK :
                               PTLLND_RDMA_FAIL);
        else
                ptlrc = PtlGet(mdh,
                               rx->rx_initiator,
                               *kptllnd_tunables.kptl_portal,
                               0,                     /* acl cookie */
                               rxmsg->ptlm_u.rdma.kptlrm_matchbits,
                               0);                    /* offset */

        if (ptlrc != PTL_OK) {
                CERROR("Ptl%s failed: %s(%d)\n",
                       (type == TX_TYPE_GET_RESPONSE) ? "Put" : "Get",
                       kptllnd_errtype2str(ptlrc), ptlrc);

                kptllnd_peer_close(peer, -EIO);
                /* Everything (including this RDMA) queued on the peer will
                 * be completed with failure */
        }

        return 0;
}
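/* Send path: a payload that fits within the peer's max message size is
 * copied inline into an IMMEDIATE message; bigger PUTs and GETs send only
 * a request carrying matchbits and let the peer move the bulk data by RDMA
 * against the MD initialised above. */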
int
kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
{
        lnet_hdr_t       *hdr = &lntmsg->msg_hdr;
        int               type = lntmsg->msg_type;
        lnet_process_id_t target = lntmsg->msg_target;
        int               target_is_router = lntmsg->msg_target_is_router;
        int               routing = lntmsg->msg_routing;
        unsigned int      payload_niov = lntmsg->msg_niov;
        struct iovec     *payload_iov = lntmsg->msg_iov;
        lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
        unsigned int      payload_offset = lntmsg->msg_offset;
        unsigned int      payload_nob = lntmsg->msg_len;
        kptl_peer_t      *peer;
        kptl_tx_t        *tx;
        int               nob;
        int               nfrag;
        int               rc;

        LASSERT (payload_nob == 0 || payload_niov > 0);
        LASSERT (payload_niov <= LNET_MAX_IOV);
        LASSERT (payload_niov <= PTL_MD_MAX_IOV); /* !!! */
        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
        LASSERT (!in_interrupt());

        rc = kptllnd_find_target(&peer, target);
        if (rc != 0)
                return rc;

        switch (type) {
        default:
                LBUG();
                return -EINVAL;

        case LNET_MSG_REPLY:
        case LNET_MSG_PUT:
                /* Should the payload avoid RDMA? */
                nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[payload_nob]);
                if (payload_kiov == NULL &&
                    nob <= peer->peer_max_msg_size)
                        break;

                tx = kptllnd_get_idle_tx(TX_TYPE_PUT_REQUEST);
                if (tx == NULL) {
                        CERROR("Can't send %s to %s: can't allocate descriptor\n",
                               lnet_msgtyp2str(type),
                               libcfs_id2str(target));
                        rc = -ENOMEM;
                        goto out;
                }

                kptllnd_init_rdma_md(tx, payload_niov,
                                     payload_iov, payload_kiov,
                                     payload_offset, payload_nob);

                tx->tx_lnet_msg = lntmsg;
                tx->tx_msg->ptlm_u.rdma.kptlrm_hdr = *hdr;
                kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_PUT,
                                  sizeof(kptl_rdma_msg_t));

                CDEBUG(D_NETTRACE, "%s: passive PUT p %d %p\n",
                       libcfs_id2str(target),
                       le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index), tx);

                kptllnd_tx_launch(peer, tx, 0);
                goto out;
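        /* A large GET is likewise sent as a request-only message: the REPLY
         * message is created now and its sink buffer described in the MD, so
         * the peer can RDMA the reply data straight into it. */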
        case LNET_MSG_GET:
                /* routed gets don't RDMA */
                if (target_is_router || routing)
                        break;

                /* Is the payload small enough not to need RDMA? */
                nob = lntmsg->msg_md->md_length;
                nob = offsetof(kptl_msg_t,
                               ptlm_u.immediate.kptlim_payload[nob]);
                if (nob <= peer->peer_max_msg_size)
                        break;

                tx = kptllnd_get_idle_tx(TX_TYPE_GET_REQUEST);
                if (tx == NULL) {
                        CERROR("Can't send GET to %s: can't allocate descriptor\n",
                               libcfs_id2str(target));
                        rc = -ENOMEM;
                        goto out;
                }

                tx->tx_lnet_replymsg =
                        lnet_create_reply_msg(kptllnd_data.kptl_ni, lntmsg);
                if (tx->tx_lnet_replymsg == NULL) {
                        CERROR("Failed to allocate LNET reply for %s\n",
                               libcfs_id2str(target));
                        kptllnd_tx_decref(tx);
                        rc = -ENOMEM;
                        goto out;
                }

                if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
                        kptllnd_init_rdma_md(tx, lntmsg->msg_md->md_niov,
                                             lntmsg->msg_md->md_iov.iov, NULL,
                                             0, lntmsg->msg_md->md_length);
                else
                        kptllnd_init_rdma_md(tx, lntmsg->msg_md->md_niov,
                                             NULL, lntmsg->msg_md->md_iov.kiov,
                                             0, lntmsg->msg_md->md_length);

                tx->tx_lnet_msg = lntmsg;
                tx->tx_msg->ptlm_u.rdma.kptlrm_hdr = *hdr;
                kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_GET,
                                  sizeof(kptl_rdma_msg_t));

                CDEBUG(D_NETTRACE, "%s: passive GET p %d %p\n",
                       libcfs_id2str(target),
                       le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index), tx);

                kptllnd_tx_launch(peer, tx, 0);
                goto out;
        case LNET_MSG_ACK:
                CDEBUG(D_NET, "LNET_MSG_ACK\n");
                LASSERT (payload_nob == 0);
                break;
        }

        /* I don't have to handle kiovs */
        LASSERT (payload_nob == 0 || payload_iov != NULL);

        tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
        if (tx == NULL) {
                CERROR("Can't send %s to %s: can't allocate descriptor\n",
                       lnet_msgtyp2str(type), libcfs_id2str(target));
                rc = -ENOMEM;
                goto out;
        }

        tx->tx_lnet_msg = lntmsg;
        tx->tx_msg->ptlm_u.immediate.kptlim_hdr = *hdr;

        if (payload_nob == 0) {
                nfrag = 0;
        } else {
                tx->tx_frags->iov[0].iov_base = tx->tx_msg;
                tx->tx_frags->iov[0].iov_len = offsetof(kptl_msg_t,
                                                        ptlm_u.immediate.kptlim_payload);

                /* NB relying on lustre not asking for PTL_MD_MAX_IOV
                 * fragments! */
#ifdef _USING_LUSTRE_PORTALS_
                nfrag = 1 + lnet_extract_iov(PTL_MD_MAX_IOV - 1,
                                             &tx->tx_frags->iov[1],
                                             payload_niov, payload_iov,
                                             payload_offset, payload_nob);
#else
                nfrag = 1 + kptllnd_extract_iov(PTL_MD_MAX_IOV - 1,
                                                &tx->tx_frags->iov[1],
                                                payload_niov, payload_iov,
                                                payload_offset, payload_nob);
#endif
        }

        nob = offsetof(kptl_immediate_msg_t, kptlim_payload[payload_nob]);
        kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_IMMEDIATE, nob);

        CDEBUG(D_NETTRACE, "%s: immediate %s p %d %p\n",
               libcfs_id2str(target),
               lnet_msgtyp2str(lntmsg->msg_type),
               (le32_to_cpu(lntmsg->msg_type) == LNET_MSG_PUT) ?
               le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index) :
               (le32_to_cpu(lntmsg->msg_type) == LNET_MSG_GET) ?
               le32_to_cpu(lntmsg->msg_hdr.msg.get.ptl_index) : -1,
               tx);

        kptllnd_tx_launch(peer, tx, nfrag);

 out:
        kptllnd_peer_decref(peer);
        return rc;
}
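/* lnd_eager_recv() handler: LNET calls this when it cannot consume the
 * message inside lnd_recv() right away, so the request must stop tying up
 * a shared receive buffer. */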
int
kptllnd_eager_recv(struct lnet_ni *ni, void *private,
                   lnet_msg_t *msg, void **new_privatep)
{
        kptl_rx_t        *rx = private;

        CDEBUG(D_NET, "Eager RX=%p RXB=%p\n", rx, rx->rx_rxb);

        /* I have to release my ref on rxb (if I have one) to ensure I'm an
         * eager receiver, so I copy the incoming request from the buffer it
         * landed in, into space reserved in the descriptor... */

#if (PTL_MD_LOCAL_ALIGN8 == 0)
        if (rx->rx_rxb == NULL)                 /* already copied */
                return 0;                       /* to fix alignment */
#else
        LASSERT(rx->rx_rxb != NULL);
#endif
        LASSERT(rx->rx_nob <= *kptllnd_tunables.kptl_max_msg_size);

        memcpy(rx->rx_space, rx->rx_msg, rx->rx_nob);
        rx->rx_msg = (kptl_msg_t *)rx->rx_space;

        kptllnd_rx_buffer_decref(rx->rx_rxb);
        rx->rx_rxb = NULL;

        return 0;
}
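/* lnd_recv() handler: deliver the message described by 'rxmsg' into the
 * sink described by iov/kiov.  IMMEDIATE payloads are simply copied out of
 * the receive buffer; PUT and GET requests trigger the matching active RDMA
 * back to the peer. */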
int
kptllnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
              unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
              unsigned int offset, unsigned int mlen, unsigned int rlen)
{
        kptl_rx_t    *rx = private;
        kptl_msg_t   *rxmsg = rx->rx_msg;
        int           nob;
        int           rc = 0;

        CDEBUG(D_NET, "%s niov=%d offset=%d mlen=%d rlen=%d\n",
               kptllnd_msgtype2str(rxmsg->ptlm_type),
               niov, offset, mlen, rlen);

        LASSERT (mlen <= rlen);
        LASSERT (!in_interrupt());
        LASSERT (!(kiov != NULL && iov != NULL)); /* never both */
        LASSERT (niov <= PTL_MD_MAX_IOV);       /* !!! */

        if (lntmsg != NULL &&
            rx->rx_uid != 0) {
                /* Set the UID if the sender's uid isn't 0; i.e. non-root
                 * running in userspace (e.g. a catamount node; linux kernel
                 * senders, including routers have uid 0).  If this is a lustre
                 * RPC request, this tells lustre not to trust the creds in the
                 * RPC message body. */
                lnet_set_msg_uid(ni, lntmsg, rx->rx_uid);
        }

        switch(rxmsg->ptlm_type)
        {
        default:
                LBUG();

        case PTLLND_MSG_TYPE_IMMEDIATE:
                CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE %d,%d\n", mlen, rlen);

                nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[rlen]);
                if (nob > rx->rx_nob) {
                        CERROR ("Immediate message from %s too big: %d(%d)\n",
                                libcfs_id2str(rx->rx_peer->peer_id), nob,
                                rx->rx_nob);
                        rc = -EINVAL;
                        break;
                }

                if (kiov != NULL)
                        lnet_copy_flat2kiov(niov, kiov, offset,
                                            *kptllnd_tunables.kptl_max_msg_size,
                                            rxmsg->ptlm_u.immediate.kptlim_payload,
                                            0, mlen);
                else
                        lnet_copy_flat2iov(niov, iov, offset,
                                           *kptllnd_tunables.kptl_max_msg_size,
                                           rxmsg->ptlm_u.immediate.kptlim_payload,
                                           0, mlen);

                lnet_finalize (ni, lntmsg, 0);
                break;

        case PTLLND_MSG_TYPE_GET:
                CDEBUG(D_NET, "PTLLND_MSG_TYPE_GET %d,%d\n", mlen, rlen);

                /* NB always send RDMA so the peer can complete.  I send
                 * success/failure in the portals 'hdr_data' */

                if (lntmsg == NULL)
                        rc = kptllnd_active_rdma(rx, NULL,
                                                 TX_TYPE_GET_RESPONSE,
                                                 0, NULL, NULL, 0, 0);
                else
                        rc = kptllnd_active_rdma(rx, lntmsg,
                                                 TX_TYPE_GET_RESPONSE,
                                                 lntmsg->msg_niov,
                                                 lntmsg->msg_iov,
                                                 lntmsg->msg_kiov,
                                                 lntmsg->msg_offset,
                                                 lntmsg->msg_len);
                break;

        case PTLLND_MSG_TYPE_PUT:
                CDEBUG(D_NET, "PTLLND_MSG_TYPE_PUT %d,%d\n", mlen, rlen);

                /* NB always send RDMA so the peer can complete; it'll be 0
                 * bytes if there was no match (lntmsg == NULL). I have no way
                 * to let my peer know this, but she's only interested in when
                 * the net has stopped accessing her buffer in any case. */

                rc = kptllnd_active_rdma(rx, lntmsg, TX_TYPE_PUT_RESPONSE,
                                         niov, iov, kiov, offset, mlen);
                break;
        }

        /*
         * We're done with the RX
         */
        kptllnd_rx_done(rx);
        return rc;
}
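/* Event queue callback: every MD is created with user_ptr pointing at a
 * kptl_eventarg_t, so events can be demultiplexed here to the tx or
 * rx-buffer handler according to eva_type. */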
void
kptllnd_eq_callback(ptl_event_t *ev)
{
        kptl_eventarg_t *eva = ev->md.user_ptr;

        switch (eva->eva_type) {
        default:
                LBUG();

        case PTLLND_EVENTARG_TYPE_MSG:
        case PTLLND_EVENTARG_TYPE_RDMA:
                kptllnd_tx_callback(ev);
                break;

        case PTLLND_EVENTARG_TYPE_BUF:
                kptllnd_rx_buffer_callback(ev);
                break;
        }
}
void
kptllnd_thread_fini (void)
{
        atomic_dec(&kptllnd_data.kptl_nthreads);
}
int
kptllnd_thread_start (int (*fn)(void *arg), void *arg)
{
        long    pid;

        atomic_inc(&kptllnd_data.kptl_nthreads);

        pid = kernel_thread (fn, arg, 0);
        if (pid >= 0)
                return 0;

        CERROR("Failed to start kernel_thread: error %d\n", (int)pid);
        kptllnd_thread_fini();
        return (int)pid;
}
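/* Watchdog thread: wakes every few seconds to scan a slice of the peer hash
 * table for timed-out RDMAs and to retire peers queued for close, sleeping
 * on kptl_watchdog_waitq in between. */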
int
kptllnd_watchdog(void *arg)
{
        int                 id = (int)(long)arg;
        char                name[16];
        wait_queue_t        waitlink;
        int                 stamp = 0;
        int                 peer_index = 0;
        unsigned long       deadline = jiffies;
        int                 timeout;
        int                 i;

        snprintf(name, sizeof(name), "kptllnd_wd_%02d", id);
        cfs_daemonize(name);
        cfs_block_allsigs();

        init_waitqueue_entry(&waitlink, current);

        /* threads shut down in phase 2 after all peers have been destroyed */
        while (kptllnd_data.kptl_shutdown < 2) {

                timeout = (int)(deadline - jiffies);

                if (timeout <= 0) {
                        const int n = 4;
                        const int p = 1;
                        int       chunk = kptllnd_data.kptl_peer_hash_size;

                        /* Time to check for RDMA timeouts on a few more
                         * peers: I do checks every 'p' seconds on a
                         * proportion of the peer table and I need to check
                         * every connection 'n' times within a timeout
                         * interval, to ensure I detect a timeout on any
                         * connection within (n+1)/n times the timeout
                         * interval. */

                        if ((*kptllnd_tunables.kptl_timeout) > n * p)
                                chunk = (chunk * n * p) /
                                        (*kptllnd_tunables.kptl_timeout);
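                        /* Worked example with illustrative numbers: a
                         * 64-bucket peer table, p = 1, n = 4 and a 40s
                         * timeout gives chunk = (64 * 4 * 1) / 40 = 6
                         * buckets per wakeup, so the whole table is swept
                         * in roughly 11s, i.e. about timeout/n, and every
                         * peer gets checked ~n times per timeout interval. */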
                        if (chunk == 0)
                                chunk = 1;

                        for (i = 0; i < chunk; i++) {
                                kptllnd_peer_check_bucket(peer_index, stamp);
                                peer_index = (peer_index + 1) %
                                             kptllnd_data.kptl_peer_hash_size;
                        }

                        deadline += p * HZ;
                        stamp++;
                        continue;
                }

                kptllnd_handle_closing_peers();

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue_exclusive(&kptllnd_data.kptl_watchdog_waitq,
                                         &waitlink);

                schedule_timeout(timeout);

                set_current_state (TASK_RUNNING);
                remove_wait_queue(&kptllnd_data.kptl_watchdog_waitq, &waitlink);
        }

        kptllnd_thread_fini();
        CDEBUG(D_NET, "<<<\n");
        return (0);
}
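/* Scheduler thread: drains the three work queues (received messages to
 * parse, rx buffers to re-post, tx descriptors to finalise), dropping the
 * scheduler lock while it handles each item and yielding after
 * 'reschedule_loops' consecutive work items so it doesn't hog the CPU. */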
int
kptllnd_scheduler (void *arg)
{
        int                 id = (int)(long)arg;
        char                name[16];
        wait_queue_t        waitlink;
        unsigned long       flags;
        int                 did_something;
        int                 counter = 0;
        kptl_rx_t          *rx;
        kptl_rx_buffer_t   *rxb;
        kptl_tx_t          *tx;

        snprintf(name, sizeof(name), "kptllnd_sd_%02d", id);
        cfs_daemonize(name);
        cfs_block_allsigs();

        init_waitqueue_entry(&waitlink, current);

        spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);

        /* threads shut down in phase 2 after all peers have been destroyed */
        while (kptllnd_data.kptl_shutdown < 2) {

                did_something = 0;

                if (!list_empty(&kptllnd_data.kptl_sched_rxq)) {
                        rx = list_entry (kptllnd_data.kptl_sched_rxq.next,
                                         kptl_rx_t, rx_list);
                        list_del(&rx->rx_list);

                        spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,
                                               flags);

                        kptllnd_rx_parse(rx);
                        did_something = 1;

                        spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
                }

                if (!list_empty(&kptllnd_data.kptl_sched_rxbq)) {
                        rxb = list_entry (kptllnd_data.kptl_sched_rxbq.next,
                                          kptl_rx_buffer_t, rxb_repost_list);
                        list_del(&rxb->rxb_repost_list);

                        spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,
                                               flags);

                        kptllnd_rx_buffer_post(rxb);
                        did_something = 1;

                        spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
                }

                if (!list_empty(&kptllnd_data.kptl_sched_txq)) {
                        tx = list_entry (kptllnd_data.kptl_sched_txq.next,
                                         kptl_tx_t, tx_list);
                        list_del_init(&tx->tx_list);

                        spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags);

                        kptllnd_tx_fini(tx);
                        did_something = 1;

                        spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
                }

                if (did_something) {
                        if (++counter != *kptllnd_tunables.kptl_reschedule_loops)
                                continue;
                }

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue_exclusive(&kptllnd_data.kptl_sched_waitq,
                                         &waitlink);
                spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags);

                if (!did_something)
                        schedule();
                else
                        cond_resched();

                set_current_state(TASK_RUNNING);
                remove_wait_queue(&kptllnd_data.kptl_sched_waitq, &waitlink);

                spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);

                counter = 0;
        }

        spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags);

        kptllnd_thread_fini();
        return 0;
}