Whamcloud - gitweb
b=21776 Use atomic memory allocation when VM is flushing to free memory.
authorIsaac Huang <he.huang@sun.com>
Fri, 2 Jul 2010 21:15:52 +0000 (15:15 -0600)
committerRobert Read <robert.read@oracle.com>
Fri, 2 Jul 2010 21:33:48 +0000 (14:33 -0700)
- Added rq_memalloc flag to RPC and set memory pressure when sending such RPCs
- Set memory pressure flag when osc creates RPC for flushing dirty pages.
- Added msg_vmflush to lnet_msg_t so that LNDs can set memory pressure
  flag when lnd_send called asynchronously. The o2iblnd probably doesn't
  need it as its TX are preallocated and TX pools grown asynchronously.

i=johann
i=dmitry.zoguine
i=maxim.patlasov

15 files changed:
libcfs/include/libcfs/libcfs_prim.h
libcfs/include/libcfs/linux/linux-mem.h
libcfs/libcfs/tracefile.c
lnet/include/lnet/lib-types.h
lnet/klnds/ptllnd/ptllnd_cb.c
lnet/klnds/ptllnd/ptllnd_peer.c
lnet/klnds/socklnd/socklnd_cb.c
lnet/lnet/lib-move.c
lustre/include/liblustre.h
lustre/include/lustre_net.h
lustre/osc/osc_io.c
lustre/osc/osc_page.c
lustre/osc/osc_request.c
lustre/ost/ost_handler.c
lustre/ptlrpc/niobuf.c

index 1bbf943..2aa7422 100644 (file)
@@ -86,4 +86,35 @@ void cfs_timer_arm(cfs_timer_t *t, cfs_time_t deadline);
 void cfs_timer_disarm(cfs_timer_t *t);
 int  cfs_timer_is_armed(cfs_timer_t *t);
 cfs_time_t cfs_timer_deadline(cfs_timer_t *t);
+
+/*
+ * Memory
+ */
+#ifndef cfs_memory_pressure_get
+#define cfs_memory_pressure_get() (0)
+#endif
+#ifndef cfs_memory_pressure_set
+#define cfs_memory_pressure_set() do {} while (0)
+#endif
+#ifndef cfs_memory_pressure_clr
+#define cfs_memory_pressure_clr() do {} while (0)
+#endif
+
+static inline int cfs_memory_pressure_get_and_set(void)
+{
+        int old = cfs_memory_pressure_get();
+
+        if (!old)
+                cfs_memory_pressure_set();
+        return old;
+}
+
+static inline void cfs_memory_pressure_restore(int old)
+{
+        if (old)
+                cfs_memory_pressure_set();
+        else
+                cfs_memory_pressure_clr();
+        return;
+}
 #endif
index f5043bc..18c64e7 100644 (file)
@@ -115,9 +115,9 @@ extern void __cfs_free_pages(cfs_page_t *page, unsigned int order);
 #define __cfs_free_page(page)  __cfs_free_pages(page, 0)
 #define cfs_free_page(p)       __free_pages(p, 0)
 
-#define libcfs_memory_pressure_get() (current->flags & PF_MEMALLOC)
-#define libcfs_memory_pressure_set() do { current->flags |= PF_MEMALLOC; } while (0)
-#define libcfs_memory_pressure_clr() do { current->flags &= ~PF_MEMALLOC; } while (0)
+#define cfs_memory_pressure_get() (current->flags & PF_MEMALLOC)
+#define cfs_memory_pressure_set() do { current->flags |= PF_MEMALLOC; } while (0)
+#define cfs_memory_pressure_clr() do { current->flags &= ~PF_MEMALLOC; } while (0)
 
 #if BITS_PER_LONG == 32
 /* limit to lowmem on 32-bit systems */
index 0402dc2..dbdb896 100644 (file)
@@ -71,6 +71,10 @@ static struct cfs_trace_page *cfs_tage_alloc(int gfp)
         cfs_page_t            *page;
         struct cfs_trace_page *tage;
 
+        /* My caller is trying to free memory */
+        if (!cfs_in_interrupt() && cfs_memory_pressure_get())
+                return NULL;
+
         /*
          * Don't spam console with allocation failures: they will be reported
          * by upper layer anyway.
index 2e0a66f..63d497a 100644 (file)
@@ -194,6 +194,7 @@ typedef struct lnet_msg {
         lnet_process_id_t     msg_target;
         __u32                 msg_type;
 
+        unsigned int          msg_vmflush:1;      /* VM trying to free memory */
         unsigned int          msg_target_is_router:1; /* sending to a router */
         unsigned int          msg_routing:1;      /* being forwarded */
         unsigned int          msg_ack:1;          /* ack on finalize (PUT) */
index 2e6e4ab..ad6a2bd 100644 (file)
@@ -321,7 +321,8 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
         unsigned int      payload_offset = lntmsg->msg_offset;
         unsigned int      payload_nob = lntmsg->msg_len;
         kptl_net_t       *net = ni->ni_data;
-        kptl_peer_t      *peer;
+        kptl_peer_t      *peer = NULL;
+        int               mpflag = 0;
         kptl_tx_t        *tx;
         int               nob;
         int               nfrag;
@@ -335,10 +336,13 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
         LASSERT (!cfs_in_interrupt());
 
+        if (lntmsg->msg_vmflush)
+                mpflag = cfs_memory_pressure_get_and_set();
+
         rc = kptllnd_find_target(net, target, &peer);
         if (rc != 0)
-                return rc;
-        
+                goto out;
+
         /* NB peer->peer_id does NOT always equal target, be careful with
          * which one to use */
         switch (type) {
@@ -416,7 +420,7 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                         kptllnd_init_rdma_md(tx, lntmsg->msg_md->md_niov,
                                              NULL, lntmsg->msg_md->md_iov.kiov,
                                              0, lntmsg->msg_md->md_length);
-                
+
                 tx->tx_lnet_msg = lntmsg;
                 tx->tx_msg->ptlm_u.rdma.kptlrm_hdr = *hdr;
                 kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_GET,
@@ -470,7 +474,7 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                                                 payload_offset, payload_nob);
 #endif
         }
-        
+
         nob = offsetof(kptl_immediate_msg_t, kptlim_payload[payload_nob]);
         kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_IMMEDIATE, target, nob);
 
@@ -486,7 +490,10 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
         kptllnd_tx_launch(peer, tx, nfrag);
 
  out:
-        kptllnd_peer_decref(peer);
+        if (lntmsg->msg_vmflush)
+                cfs_memory_pressure_restore(mpflag);
+        if (peer)
+                kptllnd_peer_decref(peer);
         return rc;
 }
 
index 3fe60c8..bf663f8 100644 (file)
@@ -1438,26 +1438,26 @@ kptllnd_find_target(kptl_net_t *net, lnet_process_id_t target,
         hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;
         hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =
                 *kptllnd_tunables.kptl_max_msg_size;
-                
+
         new_peer->peer_state = PEER_STATE_WAITING_HELLO;
         new_peer->peer_last_matchbits_seen = last_matchbits_seen;
-        
+
         kptllnd_peer_add_peertable_locked(new_peer);
 
         cfs_write_unlock_irqrestore(g_lock, flags);
 
-       /* NB someone else could get in now and post a message before I post
-        * the HELLO, but post_tx/check_sends take care of that! */
+        /* NB someone else could get in now and post a message before I post
+         * the HELLO, but post_tx/check_sends take care of that! */
 
         CDEBUG(D_NETTRACE, "%s: post initial hello %p\n",
                libcfs_id2str(new_peer->peer_id), hello_tx);
 
         kptllnd_post_tx(new_peer, hello_tx, 0);
         kptllnd_peer_check_sends(new_peer);
-       
+
         *peerp = new_peer;
         return 0;
-        
+
  unwind_2:
         kptllnd_peer_unreserve_buffers();
  unwind_1:
index 453bd0e..39dbbee 100644 (file)
@@ -927,6 +927,7 @@ ksocknal_launch_packet (lnet_ni_t *ni, ksock_tx_t *tx, lnet_process_id_t id)
 int
 ksocknal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
 {
+        int               mpflag = 0;
         int               type = lntmsg->msg_type;
         lnet_process_id_t target = lntmsg->msg_target;
         unsigned int      payload_niov = lntmsg->msg_niov;
@@ -957,10 +958,14 @@ ksocknal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                 desc_size = offsetof(ksock_tx_t,
                                      tx_frags.paged.kiov[payload_niov]);
 
+        if (lntmsg->msg_vmflush)
+                mpflag = cfs_memory_pressure_get_and_set();
         tx = ksocknal_alloc_tx(KSOCK_MSG_LNET, desc_size);
         if (tx == NULL) {
                 CERROR("Can't allocate tx desc type %d size %d\n",
                        type, desc_size);
+                if (lntmsg->msg_vmflush)
+                        cfs_memory_pressure_restore(mpflag);
                 return (-ENOMEM);
         }
 
@@ -991,6 +996,8 @@ ksocknal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
 
         /* The first fragment will be set later in pro_pack */
         rc = ksocknal_launch_packet(ni, tx, target);
+        if (lntmsg->msg_vmflush)
+                cfs_memory_pressure_restore(mpflag);
         if (rc == 0)
                 return (0);
 
index 5ed574c..e46256b 100644 (file)
@@ -1046,7 +1046,7 @@ lnet_post_send_locked (lnet_msg_t *msg, int do_send)
 
                 if (lp->lp_txcredits < 0) {
                         msg->msg_delayed = 1;
-                        cfs_list_add_tail (&msg->msg_list, &lp->lp_txq);
+                        cfs_list_add_tail(&msg->msg_list, &lp->lp_txq);
                         return EAGAIN;
                 }
         }
@@ -1063,7 +1063,7 @@ lnet_post_send_locked (lnet_msg_t *msg, int do_send)
 
                 if (ni->ni_txcredits < 0) {
                         msg->msg_delayed = 1;
-                        cfs_list_add_tail (&msg->msg_list, &ni->ni_txq);
+                        cfs_list_add_tail(&msg->msg_list, &ni->ni_txq);
                         return EAGAIN;
                 }
         }
@@ -2383,6 +2383,7 @@ LNetPut(lnet_nid_t self, lnet_handle_md_t mdh, lnet_ack_req_t ack,
                        libcfs_id2str(target));
                 return -ENOMEM;
         }
+        msg->msg_vmflush = !!cfs_memory_pressure_get();
 
         LNET_LOCK();
 
index 2ebe835..ff63241 100644 (file)
@@ -338,10 +338,6 @@ int cfs_curproc_is_in_groups(gid_t gid);
 #define might_sleep_if(c)
 #define smp_mb()
 
-#define libcfs_memory_pressure_get() (0)
-#define libcfs_memory_pressure_put() do {} while (0)
-#define libcfs_memory_pressure_clr() do {} while (0)
-
 /* FIXME sys/capability will finally included linux/fs.h thus
  * cause numerous trouble on x86-64. as temporary solution for
  * build broken at Cray, we copy definition we need from capability.h
index 1a380dc..aebfb8b 100644 (file)
@@ -506,6 +506,7 @@ struct ptlrpc_request {
                 rq_no_delay:1, rq_net_err:1, rq_wait_ctx:1,
                 rq_early:1, rq_must_unlink:1,
                 rq_fake:1,          /* this fake req */
+                rq_memalloc:1,      /* req originated from "kswapd" */
                 /* server-side flags */
                 rq_packed_final:1,  /* packed final reply */
                 rq_hp:1,            /* high priority RPC */
index 53366f8..fc1e38c 100644 (file)
@@ -181,7 +181,7 @@ static int osc_io_submit(const struct lu_env *env,
                                                                   OSC_FLAGS);
                                 /*
                                  * bug 18881: we can't just break out here when
-                                 * error occurrs after cl_page_prep has been
+                                 * error occurs after cl_page_prep has been
                                  * called against the page. The correct
                                  * way is to call page's completion routine,
                                  * as in osc_oap_interrupted.  For simplicity,
index 3081093..29568a7 100644 (file)
@@ -607,7 +607,7 @@ void osc_io_submit_page(const struct lu_env *env,
         oap->oap_page_off   = opg->ops_from;
         oap->oap_count      = opg->ops_to - opg->ops_from;
         /* Give a hint to OST that requests are coming from kswapd - bug19529 */
-        if (libcfs_memory_pressure_get())
+        if (cfs_memory_pressure_get())
                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
         oap->oap_brw_flags |= OBD_BRW_SYNC;
         if (osc_io_srvlock(oio))
index b81d8d3..12edc7e 100644 (file)
@@ -1943,7 +1943,7 @@ static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
         if (cmd & OBD_BRW_WRITE) {
                 /* trigger a write rpc stream as long as there are dirtiers
                  * waiting for space.  as they're waiting, they're not going to
-                 * create more pages to coallesce with what's waiting.. */
+                 * create more pages to coalesce with what's waiting.. */
                 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
                         RETURN(1);
@@ -2235,11 +2235,14 @@ static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
         struct ldlm_lock *lock = NULL;
         struct cl_req_attr crattr;
-        int i, rc;
+        int i, rc, mpflag = 0;
 
         ENTRY;
         LASSERT(!cfs_list_empty(rpc_list));
 
+        if (cmd & OBD_BRW_MEMALLOC)
+                mpflag = cfs_memory_pressure_get_and_set();
+
         memset(&crattr, 0, sizeof crattr);
         OBD_ALLOC(pga, sizeof(*pga) * page_count);
         if (pga == NULL)
@@ -2295,6 +2298,9 @@ static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
                 GOTO(out, req = ERR_PTR(rc));
         }
 
+        if (cmd & OBD_BRW_MEMALLOC)
+                req->rq_memalloc = 1;
+
         /* Need to update the timestamps after the request is built in case
          * we race with setattr (locally or in queue at OST).  If OST gets
          * later setattr before earlier BRW (as determined by the request xid),
@@ -2311,6 +2317,9 @@ static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
         CFS_INIT_LIST_HEAD(rpc_list);
         aa->aa_clerq = clerq;
 out:
+        if (cmd & OBD_BRW_MEMALLOC)
+                cfs_memory_pressure_restore(mpflag);
+
         capa_put(crattr.cra_capa);
         if (IS_ERR(req)) {
                 if (oa)
@@ -2345,8 +2354,9 @@ out:
  * \param cmd OBD_BRW_* macroses
  * \param lop pending pages
  *
- * \return zero if pages successfully add to send queue.
- * \return not zere if error occurring.
+ * \return zero if no page added to send queue.
+ * \return 1 if pages successfully added to send queue.
+ * \return negative on errors.
  */
 static int
 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
@@ -2362,7 +2372,7 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
         CFS_LIST_HEAD(tmp_list);
         unsigned int ending_offset;
         unsigned  starting_offset = 0;
-        int srvlock = 0;
+        int srvlock = 0, mem_tight = 0;
         struct cl_object *clob = NULL;
         ENTRY;
 
@@ -2414,7 +2424,7 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
                  * until completion unlocks it.  commit_write submits a page
                  * as not ready because its unlock will happen unconditionally
                  * as the call returns.  if we race with commit_write giving
-                 * us that page we dont' want to create a hole in the page
+                 * us that page we don't want to create a hole in the page
                  * stream, so we stop and leave the rpc to be fired by
                  * another dirtier or kupdated interval (the not ready page
                  * will still be on the dirty list).  we could call in
@@ -2506,6 +2516,8 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
 
                 /* now put the page back in our accounting */
                 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
+                if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
+                        mem_tight = 1;
                 if (page_count == 0)
                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
                 if (++page_count >= cli->cl_max_pages_per_rpc)
@@ -2541,7 +2553,8 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
                 RETURN(0);
         }
 
-        req = osc_build_req(env, cli, &rpc_list, page_count, cmd);
+        req = osc_build_req(env, cli, &rpc_list, page_count,
+                            mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
         if (IS_ERR(req)) {
                 LASSERT(cfs_list_empty(&rpc_list));
                 loi_list_maint(cli, loi);
@@ -2725,7 +2738,7 @@ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
                                 race_counter++;
                 }
 
-                /* attempt some inter-object balancing by issueing rpcs
+                /* attempt some inter-object balancing by issuing rpcs
                  * for each object in turn */
                 if (!cfs_list_empty(&loi->loi_hp_ready_item))
                         cfs_list_del_init(&loi->loi_hp_ready_item);
@@ -2951,7 +2964,7 @@ int osc_queue_async_io(const struct lu_env *env,
         oap->oap_count = count;
         oap->oap_brw_flags = brw_flags;
         /* Give a hint to OST that requests are coming from kswapd - bug19529 */
-        if (libcfs_memory_pressure_get())
+        if (cfs_memory_pressure_get())
                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
         cfs_spin_lock(&oap->oap_lock);
         oap->oap_async_flags = async_flags;
index 47254d3..1d32e38 100644 (file)
@@ -1039,7 +1039,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
 
         if ((body->oa.o_flags & OBD_BRW_MEMALLOC) &&
             (exp->exp_connection->c_peer.nid == exp->exp_connection->c_self))
-                libcfs_memory_pressure_set();
+                cfs_memory_pressure_set();
 
         objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ,
                                         RCL_CLIENT) / sizeof(*ioo);
@@ -1347,7 +1347,7 @@ out:
                       exp->exp_connection->c_remote_uuid.uuid,
                       libcfs_id2str(req->rq_peer));
         }
-        libcfs_memory_pressure_clr();
+        cfs_memory_pressure_clr();
         RETURN(rc);
 }
 
index 508f0e2..537be4f 100644 (file)
@@ -529,6 +529,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
 {
         int rc;
         int rc2;
+        int mpflag = 0;
         struct ptlrpc_connection *connection;
         lnet_handle_me_t  reply_me_h;
         lnet_md_t         reply_md;
@@ -568,15 +569,18 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
         if (request->rq_resend)
                 lustre_msg_add_flags(request->rq_reqmsg, MSG_RESENT);
 
+        if (request->rq_memalloc)
+                mpflag = cfs_memory_pressure_get_and_set();
+
         rc = sptlrpc_cli_wrap_request(request);
         if (rc)
-                RETURN(rc);
+                GOTO(out, rc);
 
         /* bulk register should be done after wrap_request() */
         if (request->rq_bulk != NULL) {
                 rc = ptlrpc_register_bulk (request);
                 if (rc != 0)
-                        RETURN(rc);
+                        GOTO(out, rc);
         }
 
         if (!noreply) {
@@ -681,11 +685,11 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
                           request->rq_request_portal,
                           request->rq_xid, 0);
         if (rc == 0)
-                RETURN(rc);
+                GOTO(out, rc);
 
         ptlrpc_req_finished(request);
         if (noreply)
-                RETURN(rc);
+                GOTO(out, rc);
 
  cleanup_me:
         /* MEUnlink is safe; the PUT didn't even get off the ground, and
@@ -700,6 +704,9 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
         /* We do sync unlink here as there was no real transfer here so
          * the chance to have long unlink to sluggish net is smaller here. */
         ptlrpc_unregister_bulk(request, 0);
+ out:
+        if (request->rq_memalloc)
+                cfs_memory_pressure_restore(mpflag);
         return rc;
 }