From e2a2fab993d01597010cb2b44df44a522af0eec8 Mon Sep 17 00:00:00 2001 From: Isaac Huang Date: Fri, 2 Jul 2010 15:15:52 -0600 Subject: [PATCH] b=21776 Use atomic memory allocation when VM is flushing to free memory. - Added rq_memalloc flag to RPC and set memory pressure when sending such RPCs - Set memory pressure flag when osc creates RPC for flushing dirty pages. - Added msg_vmflush to lnet_msg_t so that LNDs can set memory pressure flag when lnd_send called asynchronously. The o2iblnd probably doesn't need it as its TX are preallocated and TX pools grown asynchronously. i=johann i=dmitry.zoguine i=maxim.patlasov --- libcfs/include/libcfs/libcfs_prim.h | 31 +++++++++++++++++++++++++++++++ libcfs/include/libcfs/linux/linux-mem.h | 6 +++--- libcfs/libcfs/tracefile.c | 4 ++++ lnet/include/lnet/lib-types.h | 1 + lnet/klnds/ptllnd/ptllnd_cb.c | 19 +++++++++++++------ lnet/klnds/ptllnd/ptllnd_peer.c | 12 ++++++------ lnet/klnds/socklnd/socklnd_cb.c | 7 +++++++ lnet/lnet/lib-move.c | 5 +++-- lustre/include/liblustre.h | 4 ---- lustre/include/lustre_net.h | 1 + lustre/osc/osc_io.c | 2 +- lustre/osc/osc_page.c | 2 +- lustre/osc/osc_request.c | 31 ++++++++++++++++++++++--------- lustre/ost/ost_handler.c | 4 ++-- lustre/ptlrpc/niobuf.c | 15 +++++++++++---- 15 files changed, 106 insertions(+), 38 deletions(-) diff --git a/libcfs/include/libcfs/libcfs_prim.h b/libcfs/include/libcfs/libcfs_prim.h index 1bbf943..2aa7422 100644 --- a/libcfs/include/libcfs/libcfs_prim.h +++ b/libcfs/include/libcfs/libcfs_prim.h @@ -86,4 +86,35 @@ void cfs_timer_arm(cfs_timer_t *t, cfs_time_t deadline); void cfs_timer_disarm(cfs_timer_t *t); int cfs_timer_is_armed(cfs_timer_t *t); cfs_time_t cfs_timer_deadline(cfs_timer_t *t); + +/* + * Memory + */ +#ifndef cfs_memory_pressure_get +#define cfs_memory_pressure_get() (0) +#endif +#ifndef cfs_memory_pressure_set +#define cfs_memory_pressure_set() do {} while (0) +#endif +#ifndef cfs_memory_pressure_clr +#define cfs_memory_pressure_clr() do {} while (0) +#endif + +static inline int cfs_memory_pressure_get_and_set(void) +{ + int old = cfs_memory_pressure_get(); + + if (!old) + cfs_memory_pressure_set(); + return old; +} + +static inline void cfs_memory_pressure_restore(int old) +{ + if (old) + cfs_memory_pressure_set(); + else + cfs_memory_pressure_clr(); + return; +} #endif diff --git a/libcfs/include/libcfs/linux/linux-mem.h b/libcfs/include/libcfs/linux/linux-mem.h index f5043bc..18c64e7 100644 --- a/libcfs/include/libcfs/linux/linux-mem.h +++ b/libcfs/include/libcfs/linux/linux-mem.h @@ -115,9 +115,9 @@ extern void __cfs_free_pages(cfs_page_t *page, unsigned int order); #define __cfs_free_page(page) __cfs_free_pages(page, 0) #define cfs_free_page(p) __free_pages(p, 0) -#define libcfs_memory_pressure_get() (current->flags & PF_MEMALLOC) -#define libcfs_memory_pressure_set() do { current->flags |= PF_MEMALLOC; } while (0) -#define libcfs_memory_pressure_clr() do { current->flags &= ~PF_MEMALLOC; } while (0) +#define cfs_memory_pressure_get() (current->flags & PF_MEMALLOC) +#define cfs_memory_pressure_set() do { current->flags |= PF_MEMALLOC; } while (0) +#define cfs_memory_pressure_clr() do { current->flags &= ~PF_MEMALLOC; } while (0) #if BITS_PER_LONG == 32 /* limit to lowmem on 32-bit systems */ diff --git a/libcfs/libcfs/tracefile.c b/libcfs/libcfs/tracefile.c index 0402dc2..dbdb896 100644 --- a/libcfs/libcfs/tracefile.c +++ b/libcfs/libcfs/tracefile.c @@ -71,6 +71,10 @@ static struct cfs_trace_page *cfs_tage_alloc(int gfp) cfs_page_t *page; struct cfs_trace_page *tage; + /* My caller is trying to free memory */ + if (!cfs_in_interrupt() && cfs_memory_pressure_get()) + return NULL; + /* * Don't spam console with allocation failures: they will be reported * by upper layer anyway. diff --git a/lnet/include/lnet/lib-types.h b/lnet/include/lnet/lib-types.h index 2e0a66f..63d497a 100644 --- a/lnet/include/lnet/lib-types.h +++ b/lnet/include/lnet/lib-types.h @@ -194,6 +194,7 @@ typedef struct lnet_msg { lnet_process_id_t msg_target; __u32 msg_type; + unsigned int msg_vmflush:1; /* VM trying to free memory */ unsigned int msg_target_is_router:1; /* sending to a router */ unsigned int msg_routing:1; /* being forwarded */ unsigned int msg_ack:1; /* ack on finalize (PUT) */ diff --git a/lnet/klnds/ptllnd/ptllnd_cb.c b/lnet/klnds/ptllnd/ptllnd_cb.c index 2e6e4ab..ad6a2bd 100644 --- a/lnet/klnds/ptllnd/ptllnd_cb.c +++ b/lnet/klnds/ptllnd/ptllnd_cb.c @@ -321,7 +321,8 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) unsigned int payload_offset = lntmsg->msg_offset; unsigned int payload_nob = lntmsg->msg_len; kptl_net_t *net = ni->ni_data; - kptl_peer_t *peer; + kptl_peer_t *peer = NULL; + int mpflag = 0; kptl_tx_t *tx; int nob; int nfrag; @@ -335,10 +336,13 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) LASSERT (!(payload_kiov != NULL && payload_iov != NULL)); LASSERT (!cfs_in_interrupt()); + if (lntmsg->msg_vmflush) + mpflag = cfs_memory_pressure_get_and_set(); + rc = kptllnd_find_target(net, target, &peer); if (rc != 0) - return rc; - + goto out; + /* NB peer->peer_id does NOT always equal target, be careful with * which one to use */ switch (type) { @@ -416,7 +420,7 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) kptllnd_init_rdma_md(tx, lntmsg->msg_md->md_niov, NULL, lntmsg->msg_md->md_iov.kiov, 0, lntmsg->msg_md->md_length); - + tx->tx_lnet_msg = lntmsg; tx->tx_msg->ptlm_u.rdma.kptlrm_hdr = *hdr; kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_GET, @@ -470,7 +474,7 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) payload_offset, payload_nob); #endif } - + nob = offsetof(kptl_immediate_msg_t, kptlim_payload[payload_nob]); kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_IMMEDIATE, target, nob); @@ -486,7 +490,10 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) kptllnd_tx_launch(peer, tx, nfrag); out: - kptllnd_peer_decref(peer); + if (lntmsg->msg_vmflush) + cfs_memory_pressure_restore(mpflag); + if (peer) + kptllnd_peer_decref(peer); return rc; } diff --git a/lnet/klnds/ptllnd/ptllnd_peer.c b/lnet/klnds/ptllnd/ptllnd_peer.c index 3fe60c8..bf663f8 100644 --- a/lnet/klnds/ptllnd/ptllnd_peer.c +++ b/lnet/klnds/ptllnd/ptllnd_peer.c @@ -1438,26 +1438,26 @@ kptllnd_find_target(kptl_net_t *net, lnet_process_id_t target, hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen; hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size = *kptllnd_tunables.kptl_max_msg_size; - + new_peer->peer_state = PEER_STATE_WAITING_HELLO; new_peer->peer_last_matchbits_seen = last_matchbits_seen; - + kptllnd_peer_add_peertable_locked(new_peer); cfs_write_unlock_irqrestore(g_lock, flags); - /* NB someone else could get in now and post a message before I post - * the HELLO, but post_tx/check_sends take care of that! */ + /* NB someone else could get in now and post a message before I post + * the HELLO, but post_tx/check_sends take care of that! */ CDEBUG(D_NETTRACE, "%s: post initial hello %p\n", libcfs_id2str(new_peer->peer_id), hello_tx); kptllnd_post_tx(new_peer, hello_tx, 0); kptllnd_peer_check_sends(new_peer); - + *peerp = new_peer; return 0; - + unwind_2: kptllnd_peer_unreserve_buffers(); unwind_1: diff --git a/lnet/klnds/socklnd/socklnd_cb.c b/lnet/klnds/socklnd/socklnd_cb.c index 453bd0e..39dbbee 100644 --- a/lnet/klnds/socklnd/socklnd_cb.c +++ b/lnet/klnds/socklnd/socklnd_cb.c @@ -927,6 +927,7 @@ ksocknal_launch_packet (lnet_ni_t *ni, ksock_tx_t *tx, lnet_process_id_t id) int ksocknal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) { + int mpflag = 0; int type = lntmsg->msg_type; lnet_process_id_t target = lntmsg->msg_target; unsigned int payload_niov = lntmsg->msg_niov; @@ -957,10 +958,14 @@ ksocknal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) desc_size = offsetof(ksock_tx_t, tx_frags.paged.kiov[payload_niov]); + if (lntmsg->msg_vmflush) + mpflag = cfs_memory_pressure_get_and_set(); tx = ksocknal_alloc_tx(KSOCK_MSG_LNET, desc_size); if (tx == NULL) { CERROR("Can't allocate tx desc type %d size %d\n", type, desc_size); + if (lntmsg->msg_vmflush) + cfs_memory_pressure_restore(mpflag); return (-ENOMEM); } @@ -991,6 +996,8 @@ ksocknal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) /* The first fragment will be set later in pro_pack */ rc = ksocknal_launch_packet(ni, tx, target); + if (lntmsg->msg_vmflush) + cfs_memory_pressure_restore(mpflag); if (rc == 0) return (0); diff --git a/lnet/lnet/lib-move.c b/lnet/lnet/lib-move.c index 5ed574c..e46256b5 100644 --- a/lnet/lnet/lib-move.c +++ b/lnet/lnet/lib-move.c @@ -1046,7 +1046,7 @@ lnet_post_send_locked (lnet_msg_t *msg, int do_send) if (lp->lp_txcredits < 0) { msg->msg_delayed = 1; - cfs_list_add_tail (&msg->msg_list, &lp->lp_txq); + cfs_list_add_tail(&msg->msg_list, &lp->lp_txq); return EAGAIN; } } @@ -1063,7 +1063,7 @@ lnet_post_send_locked (lnet_msg_t *msg, int do_send) if (ni->ni_txcredits < 0) { msg->msg_delayed = 1; - cfs_list_add_tail (&msg->msg_list, &ni->ni_txq); + cfs_list_add_tail(&msg->msg_list, &ni->ni_txq); return EAGAIN; } } @@ -2383,6 +2383,7 @@ LNetPut(lnet_nid_t self, lnet_handle_md_t mdh, lnet_ack_req_t ack, libcfs_id2str(target)); return -ENOMEM; } + msg->msg_vmflush = !!cfs_memory_pressure_get(); LNET_LOCK(); diff --git a/lustre/include/liblustre.h b/lustre/include/liblustre.h index 2ebe835..ff63241 100644 --- a/lustre/include/liblustre.h +++ b/lustre/include/liblustre.h @@ -338,10 +338,6 @@ int cfs_curproc_is_in_groups(gid_t gid); #define might_sleep_if(c) #define smp_mb() -#define libcfs_memory_pressure_get() (0) -#define libcfs_memory_pressure_put() do {} while (0) -#define libcfs_memory_pressure_clr() do {} while (0) - /* FIXME sys/capability will finally included linux/fs.h thus * cause numerous trouble on x86-64. as temporary solution for * build broken at Cray, we copy definition we need from capability.h diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index 1a380dc..aebfb8b 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -506,6 +506,7 @@ struct ptlrpc_request { rq_no_delay:1, rq_net_err:1, rq_wait_ctx:1, rq_early:1, rq_must_unlink:1, rq_fake:1, /* this fake req */ + rq_memalloc:1, /* req originated from "kswapd" */ /* server-side flags */ rq_packed_final:1, /* packed final reply */ rq_hp:1, /* high priority RPC */ diff --git a/lustre/osc/osc_io.c b/lustre/osc/osc_io.c index 53366f8..fc1e38c 100644 --- a/lustre/osc/osc_io.c +++ b/lustre/osc/osc_io.c @@ -181,7 +181,7 @@ static int osc_io_submit(const struct lu_env *env, OSC_FLAGS); /* * bug 18881: we can't just break out here when - * error occurrs after cl_page_prep has been + * error occurs after cl_page_prep has been * called against the page. The correct * way is to call page's completion routine, * as in osc_oap_interrupted. For simplicity, diff --git a/lustre/osc/osc_page.c b/lustre/osc/osc_page.c index 3081093..29568a7 100644 --- a/lustre/osc/osc_page.c +++ b/lustre/osc/osc_page.c @@ -607,7 +607,7 @@ void osc_io_submit_page(const struct lu_env *env, oap->oap_page_off = opg->ops_from; oap->oap_count = opg->ops_to - opg->ops_from; /* Give a hint to OST that requests are coming from kswapd - bug19529 */ - if (libcfs_memory_pressure_get()) + if (cfs_memory_pressure_get()) oap->oap_brw_flags |= OBD_BRW_MEMALLOC; oap->oap_brw_flags |= OBD_BRW_SYNC; if (osc_io_srvlock(oio)) diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index b81d8d3..12edc7e 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -1943,7 +1943,7 @@ static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop, if (cmd & OBD_BRW_WRITE) { /* trigger a write rpc stream as long as there are dirtiers * waiting for space. as they're waiting, they're not going to - * create more pages to coallesce with what's waiting.. */ + * create more pages to coalesce with what's waiting.. */ if (!cfs_list_empty(&cli->cl_cache_waiters)) { CDEBUG(D_CACHE, "cache waiters forcing RPC\n"); RETURN(1); @@ -2235,11 +2235,14 @@ static struct ptlrpc_request *osc_build_req(const struct lu_env *env, enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ; struct ldlm_lock *lock = NULL; struct cl_req_attr crattr; - int i, rc; + int i, rc, mpflag = 0; ENTRY; LASSERT(!cfs_list_empty(rpc_list)); + if (cmd & OBD_BRW_MEMALLOC) + mpflag = cfs_memory_pressure_get_and_set(); + memset(&crattr, 0, sizeof crattr); OBD_ALLOC(pga, sizeof(*pga) * page_count); if (pga == NULL) @@ -2295,6 +2298,9 @@ static struct ptlrpc_request *osc_build_req(const struct lu_env *env, GOTO(out, req = ERR_PTR(rc)); } + if (cmd & OBD_BRW_MEMALLOC) + req->rq_memalloc = 1; + /* Need to update the timestamps after the request is built in case * we race with setattr (locally or in queue at OST). If OST gets * later setattr before earlier BRW (as determined by the request xid), @@ -2311,6 +2317,9 @@ static struct ptlrpc_request *osc_build_req(const struct lu_env *env, CFS_INIT_LIST_HEAD(rpc_list); aa->aa_clerq = clerq; out: + if (cmd & OBD_BRW_MEMALLOC) + cfs_memory_pressure_restore(mpflag); + capa_put(crattr.cra_capa); if (IS_ERR(req)) { if (oa) @@ -2345,8 +2354,9 @@ out: * \param cmd OBD_BRW_* macroses * \param lop pending pages * - * \return zero if pages successfully add to send queue. - * \return not zere if error occurring. + * \return zero if no page added to send queue. + * \return 1 if pages successfully added to send queue. + * \return negative on errors. */ static int osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, @@ -2362,7 +2372,7 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, CFS_LIST_HEAD(tmp_list); unsigned int ending_offset; unsigned starting_offset = 0; - int srvlock = 0; + int srvlock = 0, mem_tight = 0; struct cl_object *clob = NULL; ENTRY; @@ -2414,7 +2424,7 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, * until completion unlocks it. commit_write submits a page * as not ready because its unlock will happen unconditionally * as the call returns. if we race with commit_write giving - * us that page we dont' want to create a hole in the page + * us that page we don't want to create a hole in the page * stream, so we stop and leave the rpc to be fired by * another dirtier or kupdated interval (the not ready page * will still be on the dirty list). we could call in @@ -2506,6 +2516,8 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, /* now put the page back in our accounting */ cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list); + if (oap->oap_brw_flags & OBD_BRW_MEMALLOC) + mem_tight = 1; if (page_count == 0) srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK); if (++page_count >= cli->cl_max_pages_per_rpc) @@ -2541,7 +2553,8 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, RETURN(0); } - req = osc_build_req(env, cli, &rpc_list, page_count, cmd); + req = osc_build_req(env, cli, &rpc_list, page_count, + mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd); if (IS_ERR(req)) { LASSERT(cfs_list_empty(&rpc_list)); loi_list_maint(cli, loi); @@ -2725,7 +2738,7 @@ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli) race_counter++; } - /* attempt some inter-object balancing by issueing rpcs + /* attempt some inter-object balancing by issuing rpcs * for each object in turn */ if (!cfs_list_empty(&loi->loi_hp_ready_item)) cfs_list_del_init(&loi->loi_hp_ready_item); @@ -2951,7 +2964,7 @@ int osc_queue_async_io(const struct lu_env *env, oap->oap_count = count; oap->oap_brw_flags = brw_flags; /* Give a hint to OST that requests are coming from kswapd - bug19529 */ - if (libcfs_memory_pressure_get()) + if (cfs_memory_pressure_get()) oap->oap_brw_flags |= OBD_BRW_MEMALLOC; cfs_spin_lock(&oap->oap_lock); oap->oap_async_flags = async_flags; diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index 47254d3..1d32e38 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -1039,7 +1039,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) if ((body->oa.o_flags & OBD_BRW_MEMALLOC) && (exp->exp_connection->c_peer.nid == exp->exp_connection->c_self)) - libcfs_memory_pressure_set(); + cfs_memory_pressure_set(); objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ, RCL_CLIENT) / sizeof(*ioo); @@ -1347,7 +1347,7 @@ out: exp->exp_connection->c_remote_uuid.uuid, libcfs_id2str(req->rq_peer)); } - libcfs_memory_pressure_clr(); + cfs_memory_pressure_clr(); RETURN(rc); } diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c index 508f0e2..537be4f 100644 --- a/lustre/ptlrpc/niobuf.c +++ b/lustre/ptlrpc/niobuf.c @@ -529,6 +529,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply) { int rc; int rc2; + int mpflag = 0; struct ptlrpc_connection *connection; lnet_handle_me_t reply_me_h; lnet_md_t reply_md; @@ -568,15 +569,18 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply) if (request->rq_resend) lustre_msg_add_flags(request->rq_reqmsg, MSG_RESENT); + if (request->rq_memalloc) + mpflag = cfs_memory_pressure_get_and_set(); + rc = sptlrpc_cli_wrap_request(request); if (rc) - RETURN(rc); + GOTO(out, rc); /* bulk register should be done after wrap_request() */ if (request->rq_bulk != NULL) { rc = ptlrpc_register_bulk (request); if (rc != 0) - RETURN(rc); + GOTO(out, rc); } if (!noreply) { @@ -681,11 +685,11 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply) request->rq_request_portal, request->rq_xid, 0); if (rc == 0) - RETURN(rc); + GOTO(out, rc); ptlrpc_req_finished(request); if (noreply) - RETURN(rc); + GOTO(out, rc); cleanup_me: /* MEUnlink is safe; the PUT didn't even get off the ground, and @@ -700,6 +704,9 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply) /* We do sync unlink here as there was no real transfer here so * the chance to have long unlink to sluggish net is smaller here. */ ptlrpc_unregister_bulk(request, 0); + out: + if (request->rq_memalloc) + cfs_memory_pressure_restore(mpflag); return rc; } -- 1.8.3.1