Whamcloud - gitweb
LU-14739 quota: nodemap squashed root cannot bypass quota
[fs/lustre-release.git] / lustre / ofd / ofd_io.c
index 90d1807..c9811ff 100644 (file)
@@ -27,7 +27,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * lustre/ofd/ofd_io.c
  *
@@ -147,6 +146,17 @@ static void ofd_inconsistency_verify_one(const struct lu_env *env,
        OBD_FREE_PTR(oii);
 }
 
+struct oivm_args {
+       struct ofd_device       *od_ofd;
+       struct lu_env           od_env;
+       struct lfsck_req_local  od_lrl;
+       struct completion       *od_started;
+};
+
+#ifndef TASK_IDLE
+#define TASK_IDLE TASK_INTERRUPTIBLE
+#endif
+
 /**
  * Verification thread to check parent FID consistency.
  *
@@ -158,52 +168,39 @@ static void ofd_inconsistency_verify_one(const struct lu_env *env,
  * \retval             0 on successful thread termination
  * \retval             negative value if thread can't start
  */
-static int ofd_inconsistency_verification_main(void *args)
+static int ofd_inconsistency_verification_main(void *_args)
 {
-       struct lu_env env;
-       struct ofd_device *ofd = args;
-       struct ptlrpc_thread *thread = &ofd->ofd_inconsistency_thread;
+       struct oivm_args *args = _args;
+       struct lu_env *env = &args->od_env;
+       struct ofd_device *ofd = args->od_ofd;
        struct ofd_inconsistency_item *oii;
-       struct lfsck_req_local *lrl = NULL;
-       int rc;
+       struct lfsck_req_local *lrl = &args->od_lrl;
        ENTRY;
 
-       rc = lu_env_init(&env, LCT_DT_THREAD);
-       spin_lock(&ofd->ofd_inconsistency_lock);
-       thread_set_flags(thread, rc ? SVC_STOPPED : SVC_RUNNING);
-       wake_up_all(&thread->t_ctl_waitq);
-       spin_unlock(&ofd->ofd_inconsistency_lock);
-       if (rc)
-               RETURN(rc);
-
-       OBD_ALLOC_PTR(lrl);
-       if (unlikely(!lrl))
-               GOTO(out_unlocked, rc = -ENOMEM);
-
        lrl->lrl_event = LEL_PAIRS_VERIFY_LOCAL;
        lrl->lrl_active = LFSCK_TYPE_LAYOUT;
+       complete(args->od_started);
 
        spin_lock(&ofd->ofd_inconsistency_lock);
-       while (1) {
-               if (unlikely(!thread_is_running(thread)))
-                       break;
+       while (({set_current_state(TASK_IDLE);
+                !kthread_should_stop(); })) {
 
                while (!list_empty(&ofd->ofd_inconsistency_list)) {
+                       __set_current_state(TASK_RUNNING);
                        oii = list_entry(ofd->ofd_inconsistency_list.next,
                                         struct ofd_inconsistency_item,
                                         oii_list);
                        list_del_init(&oii->oii_list);
                        spin_unlock(&ofd->ofd_inconsistency_lock);
-                       ofd_inconsistency_verify_one(&env, ofd, oii, lrl);
+                       ofd_inconsistency_verify_one(env, ofd, oii, lrl);
                        spin_lock(&ofd->ofd_inconsistency_lock);
                }
 
                spin_unlock(&ofd->ofd_inconsistency_lock);
-               wait_event_idle(thread->t_ctl_waitq,
-                               !list_empty(&ofd->ofd_inconsistency_list) ||
-                               !thread_is_running(thread));
+               schedule();
                spin_lock(&ofd->ofd_inconsistency_lock);
        }
+       __set_current_state(TASK_RUNNING);
 
        while (!list_empty(&ofd->ofd_inconsistency_list)) {
                struct ofd_object *fo;
@@ -215,28 +212,20 @@ static int ofd_inconsistency_verification_main(void *args)
                fo = oii->oii_obj;
                spin_unlock(&ofd->ofd_inconsistency_lock);
 
-               ofd_write_lock(&env, fo);
+               ofd_write_lock(env, fo);
                fo->ofo_pfid_checking = 0;
-               ofd_write_unlock(&env, fo);
+               ofd_write_unlock(env, fo);
 
-               ofd_object_put(&env, fo);
+               ofd_object_put(env, fo);
                OBD_FREE_PTR(oii);
                spin_lock(&ofd->ofd_inconsistency_lock);
        }
 
-       OBD_FREE_PTR(lrl);
-
-       GOTO(out, rc = 0);
-
-out_unlocked:
-       spin_lock(&ofd->ofd_inconsistency_lock);
-out:
-       thread_set_flags(thread, SVC_STOPPED);
        spin_unlock(&ofd->ofd_inconsistency_lock);
-       wake_up_all(&thread->t_ctl_waitq);
-       lu_env_fini(&env);
 
-       return rc;
+       lu_env_fini(&args->od_env);
+       OBD_FREE_PTR(args);
+       return 0;
 }
 
 /**
@@ -251,30 +240,50 @@ out:
  */
 int ofd_start_inconsistency_verification_thread(struct ofd_device *ofd)
 {
-       struct ptlrpc_thread    *thread = &ofd->ofd_inconsistency_thread;
        struct task_struct      *task;
+       struct oivm_args        *args;
+       DECLARE_COMPLETION_ONSTACK(started);
        int                      rc;
 
-       spin_lock(&ofd->ofd_inconsistency_lock);
-       if (unlikely(thread_is_running(thread))) {
-               spin_unlock(&ofd->ofd_inconsistency_lock);
-
+       if (ofd->ofd_inconsistency_task)
                return -EALREADY;
+
+       OBD_ALLOC_PTR(args);
+       if (!args)
+               return -ENOMEM;
+       rc = lu_env_init(&args->od_env, LCT_DT_THREAD);
+       if (rc) {
+               OBD_FREE_PTR(args);
+               return rc;
        }
 
-       thread_set_flags(thread, 0);
-       spin_unlock(&ofd->ofd_inconsistency_lock);
-       task = kthread_run(ofd_inconsistency_verification_main, ofd,
-                          "inconsistency_verification");
+       args->od_ofd = ofd;
+       args->od_started = &started;
+       task = kthread_create(ofd_inconsistency_verification_main, args,
+                             "inconsistency_verification");
        if (IS_ERR(task)) {
                rc = PTR_ERR(task);
                CERROR("%s: cannot start self_repair thread: rc = %d\n",
                       ofd_name(ofd), rc);
        } else {
                rc = 0;
-               wait_event_idle(thread->t_ctl_waitq,
-                               thread_is_running(thread) ||
-                               thread_is_stopped(thread));
+               spin_lock(&ofd->ofd_inconsistency_lock);
+               if (ofd->ofd_inconsistency_task)
+                       rc = -EALREADY;
+               else
+                       ofd->ofd_inconsistency_task = task;
+               spin_unlock(&ofd->ofd_inconsistency_lock);
+
+               if (rc)
+                       kthread_stop(task);
+               else {
+                       wake_up_process(task);
+                       wait_for_completion(&started);
+               }
+       }
+       if (rc) {
+               lu_env_fini(&args->od_env);
+               OBD_FREE_PTR(args);
        }
 
        return rc;
@@ -290,20 +299,16 @@ int ofd_start_inconsistency_verification_thread(struct ofd_device *ofd)
  */
 int ofd_stop_inconsistency_verification_thread(struct ofd_device *ofd)
 {
-       struct ptlrpc_thread    *thread = &ofd->ofd_inconsistency_thread;
+       struct task_struct *task;
 
        spin_lock(&ofd->ofd_inconsistency_lock);
-       if (thread_is_init(thread) || thread_is_stopped(thread)) {
-               spin_unlock(&ofd->ofd_inconsistency_lock);
+       task = ofd->ofd_inconsistency_task;
+       ofd->ofd_inconsistency_task = NULL;
+       spin_unlock(&ofd->ofd_inconsistency_lock);
 
+       if (!task)
                return -EALREADY;
-       }
-
-       thread_set_flags(thread, SVC_STOPPING);
-       spin_unlock(&ofd->ofd_inconsistency_lock);
-       wake_up_all(&thread->t_ctl_waitq);
-       wait_event_idle(thread->t_ctl_waitq,
-                       thread_is_stopped(thread));
+       kthread_stop(task);
 
        return 0;
 }
@@ -351,9 +356,9 @@ static void ofd_add_inconsistency_item(const struct lu_env *env,
        if (list_empty(&ofd->ofd_inconsistency_list))
                wakeup = true;
        list_add_tail(&oii->oii_list, &ofd->ofd_inconsistency_list);
+       if (wakeup && ofd->ofd_inconsistency_task)
+               wake_up_process(ofd->ofd_inconsistency_task);
        spin_unlock(&ofd->ofd_inconsistency_lock);
-       if (wakeup)
-               wake_up_all(&ofd->ofd_inconsistency_thread.t_ctl_waitq);
 
        /* XXX: When the found inconsistency exceeds some threshold,
         *      we can trigger the LFSCK to scan part of the system
@@ -688,14 +693,15 @@ static int ofd_preprw_write(const struct lu_env *env, struct obd_export *exp,
        int maxlnb = *nr_local;
        __u64 begin, end;
        ktime_t kstart = ktime_get();
+       struct range_lock *range = &ofd_info(env)->fti_write_range;
 
        ENTRY;
        LASSERT(env != NULL);
        LASSERT(objcount == 1);
 
        if (unlikely(exp->exp_obd->obd_recovering)) {
-               u64 seq = fid_seq(fid);
-               u64 oid = fid_oid(fid);
+               u64 seq = ostid_seq(&oa->o_oi);
+               u64 oid = ostid_id(&oa->o_oi);
                struct ofd_seq *oseq;
 
                oseq = ofd_seq_load(env, ofd, seq);
@@ -842,6 +848,26 @@ static int ofd_preprw_write(const struct lu_env *env, struct obd_export *exp,
                obj->ioo_bufcnt,
                WRITE);
 
+       /*
+        * Reordering precautions: make sure that a request which has already
+        * received its bulk data does not get reordered with overlapping BRW
+        * requests, e.g.
+        *  1) BRW1 sent, bulk data received, but disk I/O delayed
+        *  2) BRW1 resent and fully processed
+        *  3) the page was unlocked on the client and its writeback bit reset
+        *  4) BRW2 sent and fully processed
+        *  5) BRW1 processing wakes up and writes stale data to disk
+        * If the bulk data was not received in step 1, the client resend will
+        * invalidate its bulk descriptor and the RPC will be dropped due to
+        * the failed bulk transfer, which is just fine.
+        */
+       range_lock_init(range,
+                       rnb[0].rnb_offset,
+                       rnb[obj->ioo_bufcnt - 1].rnb_offset +
+                       rnb[obj->ioo_bufcnt - 1].rnb_len - 1);
+       range_lock(&fo->ofo_write_tree, range);
+       ofd_info(env)->fti_range_locked = 1;
+
        ofd_counter_incr(exp, LPROC_OFD_STATS_WRITE_BYTES, jobid, tot_bytes);
        ofd_counter_incr(exp, LPROC_OFD_STATS_WRITE, jobid,
                         ktime_us_delta(ktime_get(), kstart));
@@ -1220,6 +1246,7 @@ ofd_commitrw_write(const struct lu_env *env, struct obd_export *exp,
        bool soft_sync = false;
        bool cb_registered = false;
        bool fake_write = false;
+       struct range_lock *range = &ofd_info(env)->fti_write_range;
 
        ENTRY;
 
@@ -1384,6 +1411,10 @@ out_stop:
                dt_commit_async(env, ofd->ofd_osd);
 
 out:
+       if (info->fti_range_locked) {
+               range_unlock(&fo->ofo_write_tree, range);
+               info->fti_range_locked = 0;
+       }
        dt_bufs_put(env, o, lnb, niocount);
        ofd_object_put(env, fo);
        if (granted > 0)
@@ -1430,6 +1461,26 @@ int ofd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp,
 
        if (cmd == OBD_BRW_WRITE) {
                struct lu_nodemap *nodemap;
+               __u32 mapped_uid, mapped_gid;
+
+               nodemap = nodemap_get_from_exp(exp);
+               mapped_uid = nodemap_map_id(nodemap, NODEMAP_UID,
+                                           NODEMAP_FS_TO_CLIENT,
+                                           oa->o_uid);
+               mapped_gid = nodemap_map_id(nodemap, NODEMAP_GID,
+                                           NODEMAP_FS_TO_CLIENT,
+                                           oa->o_gid);
+
+               if (!IS_ERR_OR_NULL(nodemap)) {
+                       /* do not bypass quota enforcement if squashed uid */
+                       if (unlikely(mapped_uid == nodemap->nm_squash_uid)) {
+                               int idx;
+
+                               for (idx = 0; idx < npages; idx++)
+                                       lnb[idx].lnb_flags &= ~OBD_BRW_NOQUOTA;
+                       }
+                       nodemap_putref(nodemap);
+               }
 
                valid = OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLPROJID |
                        OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME;
@@ -1494,16 +1545,8 @@ int ofd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp,
                /* Convert back to client IDs. LU-9671.
                 * nodemap_get_from_exp() may fail due to nodemap deactivated,
                 * server ID will be returned back to client in that case. */
-               nodemap = nodemap_get_from_exp(exp);
-               if (nodemap != NULL && !IS_ERR(nodemap)) {
-                       oa->o_uid = nodemap_map_id(nodemap, NODEMAP_UID,
-                                                  NODEMAP_FS_TO_CLIENT,
-                                                  oa->o_uid);
-                       oa->o_gid = nodemap_map_id(nodemap, NODEMAP_GID,
-                                                  NODEMAP_FS_TO_CLIENT,
-                                                  oa->o_gid);
-                       nodemap_putref(nodemap);
-               }
+               oa->o_uid = mapped_uid;
+               oa->o_gid = mapped_gid;
        } else if (cmd == OBD_BRW_READ) {
                rc = ofd_commitrw_read(env, ofd, fid, objcount,
                                       npages, lnb);