X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;ds=sidebyside;f=lnet%2Fklnds%2Fptllnd%2Fptllnd_cb.c;h=ff93136630b735568c7c096867580ba4ff76d37d;hb=e3a7c58aebafce40323db54bf6056029e5af4a70;hp=8acf9d00e4ab800f17d88d7e62dd209b673dfa57;hpb=d2f2601baff3fb8add503ecab932c20c26f9a118;p=fs%2Flustre-release.git diff --git a/lnet/klnds/ptllnd/ptllnd_cb.c b/lnet/klnds/ptllnd/ptllnd_cb.c index 8acf9d0..ff93136 100644 --- a/lnet/klnds/ptllnd/ptllnd_cb.c +++ b/lnet/klnds/ptllnd/ptllnd_cb.c @@ -1,6 +1,4 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * +/* * GPL HEADER START * * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. @@ -26,7 +24,7 @@ * GPL HEADER END */ /* - * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. */ /* @@ -257,20 +255,20 @@ kptllnd_active_rdma(kptl_rx_t *rx, lnet_msg_t *lntmsg, int type, return -EIO; } - spin_lock_irqsave(&peer->peer_lock, flags); + cfs_spin_lock_irqsave(&peer->peer_lock, flags); tx->tx_lnet_msg = lntmsg; /* lnet_finalize() will be called when tx is torn down, so I must * return success from here on... */ - tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ); + tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * CFS_HZ); tx->tx_rdma_mdh = mdh; tx->tx_active = 1; - list_add_tail(&tx->tx_list, &peer->peer_activeq); + cfs_list_add_tail(&tx->tx_list, &peer->peer_activeq); /* peer has now got my ref on 'tx' */ - spin_unlock_irqrestore(&peer->peer_lock, flags); + cfs_spin_unlock_irqrestore(&peer->peer_lock, flags); tx->tx_tposted = jiffies; @@ -320,22 +318,31 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) lnet_kiov_t *payload_kiov = lntmsg->msg_kiov; unsigned int payload_offset = lntmsg->msg_offset; unsigned int payload_nob = lntmsg->msg_len; - kptl_peer_t *peer; + kptl_net_t *net = ni->ni_data; + kptl_peer_t *peer = NULL; + int mpflag = 0; kptl_tx_t *tx; int nob; int nfrag; int rc; + LASSERT (net->net_ni == ni); + LASSERT (!net->net_shutdown); LASSERT (payload_nob == 0 || payload_niov > 0); LASSERT (payload_niov <= LNET_MAX_IOV); LASSERT (payload_niov <= PTL_MD_MAX_IOV); /* !!! */ LASSERT (!(payload_kiov != NULL && payload_iov != NULL)); - LASSERT (!in_interrupt()); + LASSERT (!cfs_in_interrupt()); + + if (lntmsg->msg_vmflush) + mpflag = cfs_memory_pressure_get_and_set(); - rc = kptllnd_find_target(&peer, target); + rc = kptllnd_find_target(net, target, &peer); if (rc != 0) - return rc; - + goto out; + + /* NB peer->peer_id does NOT always equal target, be careful with + * which one to use */ switch (type) { default: LBUG(); @@ -365,7 +372,7 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) tx->tx_lnet_msg = lntmsg; tx->tx_msg->ptlm_u.rdma.kptlrm_hdr = *hdr; kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_PUT, - sizeof(kptl_rdma_msg_t)); + target, sizeof(kptl_rdma_msg_t)); CDEBUG(D_NETTRACE, "%s: passive PUT p %d %p\n", libcfs_id2str(target), @@ -394,8 +401,7 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) goto out; } - tx->tx_lnet_replymsg = - lnet_create_reply_msg(kptllnd_data.kptl_ni, lntmsg); + tx->tx_lnet_replymsg = lnet_create_reply_msg(ni, lntmsg); if (tx->tx_lnet_replymsg == NULL) { CERROR("Failed to allocate LNET reply for %s\n", libcfs_id2str(target)); @@ -412,11 +418,11 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) kptllnd_init_rdma_md(tx, lntmsg->msg_md->md_niov, NULL, lntmsg->msg_md->md_iov.kiov, 0, lntmsg->msg_md->md_length); - + tx->tx_lnet_msg = lntmsg; tx->tx_msg->ptlm_u.rdma.kptlrm_hdr = *hdr; kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_GET, - sizeof(kptl_rdma_msg_t)); + target, sizeof(kptl_rdma_msg_t)); CDEBUG(D_NETTRACE, "%s: passive GET p %d %p\n", libcfs_id2str(target), @@ -466,9 +472,9 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) payload_offset, payload_nob); #endif } - + nob = offsetof(kptl_immediate_msg_t, kptlim_payload[payload_nob]); - kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_IMMEDIATE, nob); + kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_IMMEDIATE, target, nob); CDEBUG(D_NETTRACE, "%s: immediate %s p %d %p\n", libcfs_id2str(target), @@ -482,7 +488,10 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) kptllnd_tx_launch(peer, tx, nfrag); out: - kptllnd_peer_decref(peer); + if (lntmsg->msg_vmflush) + cfs_memory_pressure_restore(mpflag); + if (peer) + kptllnd_peer_decref(peer); return rc; } @@ -532,7 +541,7 @@ kptllnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed, LASSERT (mlen <= rlen); LASSERT (mlen >= 0); - LASSERT (!in_interrupt()); + LASSERT (!cfs_in_interrupt()); LASSERT (!(kiov != NULL && iov != NULL)); /* never both */ LASSERT (niov <= PTL_MD_MAX_IOV); /* !!! */ @@ -633,12 +642,12 @@ kptllnd_eq_callback(ptl_event_t *ev) switch (eva->eva_type) { default: LBUG(); - + case PTLLND_EVENTARG_TYPE_MSG: case PTLLND_EVENTARG_TYPE_RDMA: kptllnd_tx_callback(ev); break; - + case PTLLND_EVENTARG_TYPE_BUF: kptllnd_rx_buffer_callback(ev); break; @@ -648,7 +657,7 @@ kptllnd_eq_callback(ptl_event_t *ev) void kptllnd_thread_fini (void) { - atomic_dec(&kptllnd_data.kptl_nthreads); + cfs_atomic_dec(&kptllnd_data.kptl_nthreads); } int @@ -656,13 +665,13 @@ kptllnd_thread_start (int (*fn)(void *arg), void *arg) { long pid; - atomic_inc(&kptllnd_data.kptl_nthreads); + cfs_atomic_inc(&kptllnd_data.kptl_nthreads); - pid = kernel_thread (fn, arg, 0); + pid = cfs_create_thread (fn, arg, 0); if (pid >= 0) return 0; - - CERROR("Failed to start kernel_thread: error %d\n", (int)pid); + + CERROR("Failed to start thread: error %d\n", (int)pid); kptllnd_thread_fini(); return (int)pid; } @@ -672,7 +681,7 @@ kptllnd_watchdog(void *arg) { int id = (long)arg; char name[16]; - wait_queue_t waitlink; + cfs_waitlink_t waitlink; int stamp = 0; int peer_index = 0; unsigned long deadline = jiffies; @@ -683,22 +692,22 @@ kptllnd_watchdog(void *arg) cfs_daemonize(name); cfs_block_allsigs(); - init_waitqueue_entry(&waitlink, current); + cfs_waitlink_init(&waitlink); /* threads shut down in phase 2 after all peers have been destroyed */ while (kptllnd_data.kptl_shutdown < 2) { /* add a check for needs ptltrace * yes, this is blatant hijacking of this thread - * we can't dump directly from tx or rx _callbacks as it deadlocks portals - * and takes out the node + * we can't dump directly from tx or rx _callbacks as it + * deadlocks portals and takes out the node */ - if (atomic_read(&kptllnd_data.kptl_needs_ptltrace)) { + if (cfs_atomic_read(&kptllnd_data.kptl_needs_ptltrace)) { #ifdef CRAY_XT3 kptllnd_dump_ptltrace(); /* we only dump once, no matter how many pending */ - atomic_set(&kptllnd_data.kptl_needs_ptltrace, 0); + cfs_atomic_set(&kptllnd_data.kptl_needs_ptltrace, 0); #else LBUG(); #endif @@ -732,21 +741,21 @@ kptllnd_watchdog(void *arg) kptllnd_data.kptl_peer_hash_size; } - deadline += p * HZ; + deadline += p * CFS_HZ; stamp++; continue; } kptllnd_handle_closing_peers(); - set_current_state(TASK_INTERRUPTIBLE); - add_wait_queue_exclusive(&kptllnd_data.kptl_watchdog_waitq, - &waitlink); + cfs_set_current_state(CFS_TASK_INTERRUPTIBLE); + cfs_waitq_add_exclusive(&kptllnd_data.kptl_watchdog_waitq, + &waitlink); - schedule_timeout(timeout); - - set_current_state (TASK_RUNNING); - remove_wait_queue(&kptllnd_data.kptl_watchdog_waitq, &waitlink); + cfs_waitq_timedwait(&waitlink, CFS_TASK_INTERRUPTIBLE, timeout); + + cfs_set_current_state (CFS_TASK_RUNNING); + cfs_waitq_del(&kptllnd_data.kptl_watchdog_waitq, &waitlink); } kptllnd_thread_fini(); @@ -759,7 +768,7 @@ kptllnd_scheduler (void *arg) { int id = (long)arg; char name[16]; - wait_queue_t waitlink; + cfs_waitlink_t waitlink; unsigned long flags; int did_something; int counter = 0; @@ -771,54 +780,61 @@ kptllnd_scheduler (void *arg) cfs_daemonize(name); cfs_block_allsigs(); - init_waitqueue_entry(&waitlink, current); + cfs_waitlink_init(&waitlink); - spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags); + cfs_spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags); /* threads shut down in phase 2 after all peers have been destroyed */ while (kptllnd_data.kptl_shutdown < 2) { did_something = 0; - if (!list_empty(&kptllnd_data.kptl_sched_rxq)) { - rx = list_entry (kptllnd_data.kptl_sched_rxq.next, - kptl_rx_t, rx_list); - list_del(&rx->rx_list); - - spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, - flags); + if (!cfs_list_empty(&kptllnd_data.kptl_sched_rxq)) { + rx = cfs_list_entry (kptllnd_data.kptl_sched_rxq.next, + kptl_rx_t, rx_list); + cfs_list_del(&rx->rx_list); + + cfs_spin_unlock_irqrestore(&kptllnd_data. \ + kptl_sched_lock, + flags); kptllnd_rx_parse(rx); did_something = 1; - spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags); + cfs_spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, + flags); } - if (!list_empty(&kptllnd_data.kptl_sched_rxbq)) { - rxb = list_entry (kptllnd_data.kptl_sched_rxbq.next, - kptl_rx_buffer_t, rxb_repost_list); - list_del(&rxb->rxb_repost_list); + if (!cfs_list_empty(&kptllnd_data.kptl_sched_rxbq)) { + rxb = cfs_list_entry (kptllnd_data.kptl_sched_rxbq.next, + kptl_rx_buffer_t, + rxb_repost_list); + cfs_list_del(&rxb->rxb_repost_list); - spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, - flags); + cfs_spin_unlock_irqrestore(&kptllnd_data. \ + kptl_sched_lock, + flags); kptllnd_rx_buffer_post(rxb); did_something = 1; - spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags); + cfs_spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, + flags); } - if (!list_empty(&kptllnd_data.kptl_sched_txq)) { - tx = list_entry (kptllnd_data.kptl_sched_txq.next, - kptl_tx_t, tx_list); - list_del_init(&tx->tx_list); + if (!cfs_list_empty(&kptllnd_data.kptl_sched_txq)) { + tx = cfs_list_entry (kptllnd_data.kptl_sched_txq.next, + kptl_tx_t, tx_list); + cfs_list_del_init(&tx->tx_list); - spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags); + cfs_spin_unlock_irqrestore(&kptllnd_data. \ + kptl_sched_lock, flags); kptllnd_tx_fini(tx); did_something = 1; - spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags); + cfs_spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, + flags); } if (did_something) { @@ -826,25 +842,26 @@ kptllnd_scheduler (void *arg) continue; } - set_current_state(TASK_INTERRUPTIBLE); - add_wait_queue_exclusive(&kptllnd_data.kptl_sched_waitq, - &waitlink); - spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags); + cfs_set_current_state(CFS_TASK_INTERRUPTIBLE); + cfs_waitq_add_exclusive(&kptllnd_data.kptl_sched_waitq, + &waitlink); + cfs_spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, + flags); if (!did_something) - schedule(); + cfs_waitq_wait(&waitlink, CFS_TASK_INTERRUPTIBLE); else - cond_resched(); + cfs_cond_resched(); - set_current_state(TASK_RUNNING); - remove_wait_queue(&kptllnd_data.kptl_sched_waitq, &waitlink); + cfs_set_current_state(CFS_TASK_RUNNING); + cfs_waitq_del(&kptllnd_data.kptl_sched_waitq, &waitlink); - spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags); + cfs_spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags); counter = 0; } - spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags); + cfs_spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags); kptllnd_thread_fini(); return 0;