Whamcloud - gitweb
branch: HEAD
[fs/lustre-release.git] / lustre / osc / osc_page.c
index d42e4f9..5f5179c 100644 (file)
@@ -75,6 +75,10 @@ static int osc_page_is_dlocked(const struct lu_env *env,
                               dlmmode, &flags, NULL, lockh, unref);
 }
 
+/**
+ * Checks an invariant that a page in the cache is covered by a lock, as
+ * needed.
+ */
 static int osc_page_protected(const struct lu_env *env,
                               const struct osc_page *opg,
                               enum cl_lock_mode mode, int unref)
@@ -87,11 +91,20 @@ static int osc_page_protected(const struct lu_env *env,
 
         LINVRNT(!opg->ops_temp);
 
+        page = opg->ops_cl.cpl_page;
+        if (page->cp_owner != NULL &&
+            cl_io_top(page->cp_owner)->ci_lockreq == CILR_NEVER)
+                /*
+                 * If IO is done without locks (liblustre, or lloop), lock is
+                 * not required.
+                 */
+                result = 1;
+        else
+                /* otherwise check for a DLM lock */
         result = osc_page_is_dlocked(env, opg, mode, 1, unref);
         if (result == 0) {
                 /* maybe this page is a part of a lockless io? */
                 hdr = cl_object_header(opg->ops_cl.cpl_obj);
-                page = opg->ops_cl.cpl_page;
                 descr = &osc_env_info(env)->oti_descr;
                 descr->cld_mode = mode;
                 descr->cld_start = page->cp_index;
@@ -164,6 +177,8 @@ static void osc_page_transfer_add(const struct lu_env *env,
 {
         struct osc_object *obj;
 
+        LINVRNT(cl_page_is_vmlocked(env, opg->ops_cl.cpl_page));
+
         obj = cl2osc(opg->ops_cl.cpl_obj);
         spin_lock(&obj->oo_seatbelt);
         list_add(&opg->ops_inflight, &obj->oo_inflight[crt]);
@@ -180,16 +195,22 @@ static int osc_page_cache_add(const struct lu_env *env,
         struct osc_io     *oio = osc_env_io(env);
         int result;
         int brw_flags;
+        int noquota = 0;
 
         LINVRNT(osc_page_protected(env, opg, CLM_WRITE, 0));
         ENTRY;
 
         /* Set the OBD_BRW_SRVLOCK before the page is queued. */
-        brw_flags = oio->oi_lockless ? OBD_BRW_SRVLOCK : 0;
+        brw_flags = osc_io_srvlock(oio) ? OBD_BRW_SRVLOCK : 0;
+        if (!client_is_remote(osc_export(obj)) &&
+            cfs_capable(CFS_CAP_SYS_RESOURCE)) {
+                brw_flags |= OBD_BRW_NOQUOTA;
+                noquota = OBD_BRW_NOQUOTA;
+        }
 
         osc_page_transfer_get(opg, "transfer\0cache");
         result = osc_queue_async_io(env, osc_export(obj), NULL, obj->oo_oinfo,
-                                    &opg->ops_oap, OBD_BRW_WRITE,
+                                    &opg->ops_oap, OBD_BRW_WRITE | noquota,
                                     0, 0, brw_flags, 0);
         if (result != 0)
                 osc_page_transfer_put(env, opg);
@@ -389,6 +410,7 @@ static int osc_completion(const struct lu_env *env,
         enum cl_req_type crt;
 
         LINVRNT(osc_page_protected(env, opg, CLM_READ, 1));
+        LINVRNT(cl_page_is_vmlocked(env, page));
 
         ENTRY;
 
@@ -461,13 +483,12 @@ struct cl_page *osc_page_init(const struct lu_env *env,
         struct osc_page   *opg;
         int result;
 
-        OBD_SLAB_ALLOC_PTR(opg, osc_page_kmem);
+        OBD_SLAB_ALLOC_PTR_GFP(opg, osc_page_kmem, CFS_ALLOC_IO);
         if (opg != NULL) {
                 void *oap = &opg->ops_oap;
 
                 opg->ops_from = 0;
                 opg->ops_to   = CFS_PAGE_SIZE;
-                opg->ops_ignore_quota = !!cfs_capable(CFS_CAP_SYS_RESOURCE);
 
                 result = osc_prep_async_page(osc_export(osc),
                                              NULL, osc->oo_oinfo, vmpage,
@@ -490,6 +511,10 @@ struct cl_page *osc_page_init(const struct lu_env *env,
         return ERR_PTR(result);
 }
 
+/**
+ * Helper function called by osc_io_submit() for every page in an immediate
+ * transfer (i.e., transferred synchronously).
+ */
 void osc_io_submit_page(const struct lu_env *env,
                         struct osc_io *oio, struct osc_page *opg,
                         enum cl_req_type crt)
@@ -500,9 +525,18 @@ void osc_io_submit_page(const struct lu_env *env,
         LINVRNT(osc_page_protected(env, opg,
                                    crt == CRT_WRITE ? CLM_WRITE : CLM_READ, 1));
 
+        oap->oap_page_off   = opg->ops_from;
+        oap->oap_count      = opg->ops_to - opg->ops_from;
+        oap->oap_brw_flags |= OBD_BRW_SYNC;
+        if (osc_io_srvlock(oio))
+                oap->oap_brw_flags |= OBD_BRW_SRVLOCK;
+
         oap->oap_cmd = crt == CRT_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ;
-        if (opg->ops_ignore_quota)
+        if (!client_is_remote(osc_export(cl2osc(opg->ops_cl.cpl_obj))) &&
+            cfs_capable(CFS_CAP_SYS_RESOURCE)) {
+                oap->oap_brw_flags |= OBD_BRW_NOQUOTA;
                 oap->oap_cmd |= OBD_BRW_NOQUOTA;
+        }
 
         oap->oap_async_flags |= OSC_FLAGS;
         if (oap->oap_cmd & OBD_BRW_READ)
@@ -510,10 +544,6 @@ void osc_io_submit_page(const struct lu_env *env,
         else if (!(oap->oap_brw_page.flag & OBD_BRW_FROM_GRANT))
                 osc_enter_cache_try(env, cli, oap->oap_loi, oap, 1);
 
-        oap->oap_page_off   = opg->ops_from;
-        oap->oap_count      = opg->ops_to - opg->ops_from;
-        oap->oap_brw_flags |= oio->oi_lockless ? OBD_BRW_SRVLOCK : 0;
-
         osc_oap_to_pending(oap);
         osc_page_transfer_get(opg, "transfer\0imm");
         osc_page_transfer_add(env, opg, crt);