Whamcloud - gitweb
LU-5519 lfsck: repair master LMV for striped directory 47/11847/12
authorFan Yong <fan.yong@intel.com>
Wed, 27 Aug 2014 21:35:00 +0000 (05:35 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Fri, 31 Oct 2014 02:26:51 +0000 (02:26 +0000)
If the master MDT-object of a striped directory has lost its LMV EA,
then some users may have created files directly under the master
MDT-object. In such a case, the LFSCK cannot re-generate the LMV EA
for the master MDT-object, because the existing files must remain
visible to clients. Instead, the LFSCK will mark the striped
directory as read-only and leave it there to be handled manually
by the administrator.

If nobody has created files under the master MDT-object of the
striped directory, then we will set the master LMV EA and
generate a new request to rescan the striped directory, which
will be handled later by the LFSCK instance on that MDT.

Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: I4d604cbd346c2cb044503f193ab4745e7fd2c2a2
Reviewed-on: http://review.whamcloud.com/11847
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
24 files changed:
lustre/include/lustre/lustre_idl.h
lustre/include/obd_support.h
lustre/lfsck/lfsck_engine.c
lustre/lfsck/lfsck_internal.h
lustre/lfsck/lfsck_lib.c
lustre/lfsck/lfsck_namespace.c
lustre/lfsck/lfsck_striped_dir.c
lustre/lod/lod_object.c
lustre/mdd/mdd_dir.c
lustre/mdd/mdd_internal.h
lustre/mdd/mdd_object.c
lustre/mdd/mdd_permission.c
lustre/osd-ldiskfs/osd_handler.c
lustre/osd-ldiskfs/osd_internal.h
lustre/osd-ldiskfs/osd_quota.c
lustre/osd-zfs/osd_index.c
lustre/osd-zfs/osd_internal.h
lustre/osd-zfs/osd_quota.c
lustre/osp/osp_internal.h
lustre/osp/osp_object.c
lustre/ptlrpc/wiretest.c
lustre/tests/sanity-lfsck.sh
lustre/utils/wirecheck.c
lustre/utils/wiretest.c

index 9d4663a..307b0a2 100644 (file)
@@ -2757,6 +2757,11 @@ struct lmv_mds_md_v1 {
 #define LMV_HASH_FLAG_DEAD     0x40000000
 #define LMV_HASH_FLAG_BAD_TYPE 0x20000000
 
+/* The striped directory once lost its master LMV EA, and the LFSCK
+ * re-generated it. This flag is used to indicate such a case. It is an
+ * on-disk flag. */
+#define LMV_HASH_FLAG_LOST_LMV 0x10000000
+
 /**
  * The FNV-1a hash algorithm is as follows:
  *     hash = FNV_offset_basis
@@ -3595,12 +3600,14 @@ enum lfsck_events {
        LE_CREATE_ORPHAN        = 12,
        LE_SKIP_NLINK_DECLARE   = 13,
        LE_SKIP_NLINK           = 14,
+       LE_SET_LMV_MASTER       = 15,
 };
 
 enum lfsck_event_flags {
        LEF_TO_OST              = 0x00000001,
        LEF_FROM_OST            = 0x00000002,
        LEF_SET_LMV_HASH        = 0x00000004,
+       LEF_SET_LMV_ALL         = 0x00000008,
 };
 
 static inline void lustre_set_wire_obdo(const struct obd_connect_data *ocd,
index ab48413..6a156ff 100644 (file)
@@ -537,6 +537,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_LFSCK_LESS_NLINK      0x1626
 #define OBD_FAIL_LFSCK_LINKEA_OVERFLOW 0x1627
 #define OBD_FAIL_LFSCK_BAD_NAME_HASH   0x1628
+#define OBD_FAIL_LFSCK_LOST_MASTER_LMV 0x1629
 
 #define OBD_FAIL_LFSCK_NOTIFY_NET      0x16f0
 #define OBD_FAIL_LFSCK_QUERY_NET       0x16f1
index 322c122..d53f0d3 100644 (file)
@@ -368,9 +368,8 @@ static void lfsck_fail(const struct lu_env *env, struct lfsck_instance *lfsck,
        }
 }
 
-static void lfsck_close_dir(const struct lu_env *env,
-                           struct lfsck_instance *lfsck,
-                           int result)
+void lfsck_close_dir(const struct lu_env *env,
+                    struct lfsck_instance *lfsck, int result)
 {
        struct lfsck_component *com;
        ENTRY;
@@ -407,8 +406,8 @@ static void lfsck_close_dir(const struct lu_env *env,
        EXIT;
 }
 
-static int lfsck_open_dir(const struct lu_env *env,
-                         struct lfsck_instance *lfsck, __u64 cookie)
+int lfsck_open_dir(const struct lu_env *env,
+                  struct lfsck_instance *lfsck, __u64 cookie)
 {
        struct dt_object        *obj    = lfsck->li_obj_dir;
        struct dt_it            *di     = lfsck->li_di_dir;
@@ -633,20 +632,46 @@ static int lfsck_exec_dir(const struct lu_env *env,
        return 0;
 }
 
+static int lfsck_master_dir_engine(const struct lu_env *env,
+                                  struct lfsck_instance *lfsck);
+
 static int lfsck_post(const struct lu_env *env, struct lfsck_instance *lfsck,
                      int result)
 {
        struct lfsck_component *com;
        struct lfsck_component *next;
-       int                     rc  = 0;
-       int                     rc1 = 0;
+       int                     rc  = result;
 
        lfsck_pos_fill(env, lfsck, &lfsck->li_pos_checkpoint, false);
        lfsck_close_dir(env, lfsck, result);
+
+       while (thread_is_running(&lfsck->li_thread) && rc > 0 &&
+              !list_empty(&lfsck->li_list_lmv)) {
+               struct lfsck_lmv_unit *llu;
+
+               spin_lock(&lfsck->li_lock);
+               llu = list_entry(lfsck->li_list_lmv.next,
+                                struct lfsck_lmv_unit, llu_link);
+               list_del_init(&llu->llu_link);
+               spin_unlock(&lfsck->li_lock);
+
+               lfsck->li_lmv = &llu->llu_lmv;
+               lfsck->li_obj_dir = lfsck_object_get(llu->llu_obj);
+               rc = lfsck_open_dir(env, lfsck, 0);
+               if (rc == 0) {
+                       rc = lfsck_master_dir_engine(env, lfsck);
+                       lfsck_close_dir(env, lfsck, result);
+               }
+       }
+
+       result = rc;
+
        list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
                rc = com->lc_ops->lfsck_post(env, com, result, false);
                if (rc != 0)
-                       rc1 = rc;
+                       CDEBUG(D_LFSCK, "%s: lfsck_post at the component %u: "
+                              "rc = %d\n", lfsck_lfsck2name(lfsck),
+                              (__u32)com->lc_type, rc);
        }
 
        lfsck->li_time_last_checkpoint = cfs_time_current();
@@ -893,6 +918,26 @@ static int lfsck_master_oit_engine(const struct lu_env *env,
                        RETURN(0);
 
                lfsck->li_current_oit_processed = 1;
+
+               if (!list_empty(&lfsck->li_list_lmv)) {
+                       struct lfsck_lmv_unit *llu;
+
+                       spin_lock(&lfsck->li_lock);
+                       llu = list_entry(lfsck->li_list_lmv.next,
+                                        struct lfsck_lmv_unit, llu_link);
+                       list_del_init(&llu->llu_link);
+                       spin_unlock(&lfsck->li_lock);
+
+                       lfsck->li_lmv = &llu->llu_lmv;
+                       lfsck->li_obj_dir = lfsck_object_get(llu->llu_obj);
+                       rc = lfsck_open_dir(env, lfsck, 0);
+                       if (rc == 0)
+                               rc = lfsck_master_dir_engine(env, lfsck);
+
+                       if (rc <= 0)
+                               RETURN(rc);
+               }
+
                lfsck->li_new_scanned++;
                lfsck->li_pos_current.lp_oit_cookie = iops->store(env, di);
                rc = iops->rec(env, di, (struct dt_rec *)fid, 0);
index 1f23337..5032d15 100644 (file)
@@ -519,6 +519,13 @@ struct lfsck_component {
 #define LFSCK_LMV_MAX_STRIPES  LMV_MAX_STRIPE_COUNT
 #define LFSCK_LMV_DEF_STRIPES  4
 
+/* When the namespace LFSCK scans a striped directory, it will record all
+ * the known shards' information in the structure "lfsck_slave_lmv_rec",
+ * including the shard's FID, index, slave LMV EA, and so on. Each shard
+ * will take one lfsck_slave_lmv_rec slot. After the 1st scanning cycle
+ * of the striped directory, the LFSCK will know whether there are any
+ * inconsistencies, and then it can repair them in the 2nd scanning
+ * cycle. */
 struct lfsck_slave_lmv_rec {
        struct lu_fid   lslr_fid;
        __u32           lslr_stripe_count;
@@ -546,6 +553,20 @@ struct lfsck_lmv {
        struct lfsck_slave_lmv_rec      *ll_lslr;
 };
 
+/* If the namespace LFSCK finds that the master MDT-object of a striped
+ * directory lost its master LMV EA, it will re-generate the master LMV
+ * EA and notify the LFSCK instance on the MDT on which the striped dir
+ * master MDT-object resides to rescan the striped directory. To do that,
+ * the notify handler will insert a "lfsck_lmv_unit" structure into the
+ * lfsck::li_list_lmv. The LFSCK instance will scan this list from time
+ * to time to check whether it needs to rescan some striped directories. */
+struct lfsck_lmv_unit {
+       struct list_head         llu_link;
+       struct lfsck_lmv         llu_lmv;
+       struct dt_object        *llu_obj;
+       struct lfsck_instance   *llu_lfsck;
+};
+
 struct lfsck_instance {
        struct mutex              li_mutex;
        spinlock_t                li_lock;
@@ -567,6 +588,9 @@ struct lfsck_instance {
        /* For the components those are not scanning now. */
        struct list_head          li_list_idle;
 
+       /* For the lfsck_lmv_unit to be handled. */
+       struct list_head          li_list_lmv;
+
        atomic_t                  li_ref;
        atomic_t                  li_double_scan_count;
        struct ptlrpc_thread      li_thread;
@@ -779,6 +803,7 @@ struct lfsck_thread_info {
        struct lfsck_start      lti_start;
        struct lfsck_stop       lti_stop;
        ldlm_policy_data_t      lti_policy;
+       struct ldlm_enqueue_info lti_einfo;
        struct ldlm_res_id      lti_resid;
        union {
                struct filter_fid_old   lti_old_pfid;
@@ -852,6 +877,10 @@ void lfsck_quit_generic(const struct lu_env *env,
 
 /* lfsck_engine.c */
 int lfsck_unpack_ent(struct lu_dirent *ent, __u64 *cookie, __u16 *type);
+void lfsck_close_dir(const struct lu_env *env,
+                    struct lfsck_instance *lfsck, int result);
+int lfsck_open_dir(const struct lu_env *env,
+                  struct lfsck_instance *lfsck, __u64 cookie);
 int lfsck_master_engine(void *args);
 int lfsck_assistant_engine(void *args);
 
@@ -924,6 +953,12 @@ int lfsck_namespace_verify_stripe_slave(const struct lu_env *env,
                                        struct lfsck_component *com,
                                        struct dt_object *obj,
                                        struct lfsck_lmv *llmv);
+int lfsck_namespace_scan_shard(const struct lu_env *env,
+                              struct lfsck_component *com,
+                              struct dt_object *child);
+int lfsck_namespace_notify_lmv_master_local(const struct lu_env *env,
+                                           struct lfsck_component *com,
+                                           struct dt_object *obj);
 int lfsck_namespace_repair_bad_name_hash(const struct lu_env *env,
                                         struct lfsck_component *com,
                                         struct dt_object *shard,
index 12dbeb5..42143f1 100644 (file)
@@ -391,10 +391,25 @@ int lfsck_ibits_lock(const struct lu_env *env, struct lfsck_instance *lfsck,
        memset(policy, 0, sizeof(*policy));
        policy->l_inodebits.bits = bits;
        fid_build_reg_res_name(lfsck_dto2fid(obj), resid);
-       rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid, LDLM_IBITS,
-                                   policy, mode, &flags, ldlm_blocking_ast,
-                                   ldlm_completion_ast, NULL, NULL, 0,
-                                   LVB_T_NONE, NULL, lh);
+       if (dt_object_remote(obj)) {
+               struct ldlm_enqueue_info *einfo = &info->lti_einfo;
+
+               memset(einfo, 0, sizeof(*einfo));
+               einfo->ei_type = LDLM_IBITS;
+               einfo->ei_mode = mode;
+               einfo->ei_cb_bl = ldlm_blocking_ast;
+               einfo->ei_cb_cp = ldlm_completion_ast;
+               einfo->ei_res_id = resid;
+
+               rc = dt_object_lock(env, obj, lh, einfo, policy);
+       } else {
+               rc = ldlm_cli_enqueue_local(lfsck->li_namespace, resid,
+                                           LDLM_IBITS, policy, mode,
+                                           &flags, ldlm_blocking_ast,
+                                           ldlm_completion_ast, NULL, NULL,
+                                           0, LVB_T_NONE, NULL, lh);
+       }
+
        if (rc == ELDLM_OK) {
                rc = 0;
        } else {
@@ -1423,6 +1438,9 @@ void lfsck_instance_cleanup(const struct lu_env *env,
        struct ptlrpc_thread    *thread = &lfsck->li_thread;
        struct lfsck_component  *com;
        struct lfsck_component  *next;
+       struct lfsck_lmv_unit   *llu;
+       struct lfsck_lmv_unit   *llu_next;
+       struct lfsck_lmv        *llmv;
        ENTRY;
 
        LASSERT(list_empty(&lfsck->li_link));
@@ -1434,6 +1452,17 @@ void lfsck_instance_cleanup(const struct lu_env *env,
        }
 
        LASSERT(lfsck->li_obj_dir == NULL);
+       LASSERT(lfsck->li_lmv == NULL);
+
+       list_for_each_entry_safe(llu, llu_next, &lfsck->li_list_lmv, llu_link) {
+               llmv = &llu->llu_lmv;
+
+               LASSERTF(atomic_read(&llmv->ll_ref) == 1,
+                        "still in using: %u\n",
+                        atomic_read(&llmv->ll_ref));
+
+               lfsck_lmv_put(env, llmv);
+       }
 
        list_for_each_entry_safe(com, next, &lfsck->li_list_scan, lc_link) {
                lfsck_component_cleanup(env, com);
@@ -2911,6 +2940,7 @@ int lfsck_in_notify(const struct lu_env *env, struct dt_device *key,
        case LE_CREATE_ORPHAN:
        case LE_SKIP_NLINK_DECLARE:
        case LE_SKIP_NLINK:
+       case LE_SET_LMV_MASTER:
        case LE_PAIRS_VERIFY: {
                struct lfsck_instance  *lfsck;
                struct lfsck_component *com;
@@ -3005,6 +3035,7 @@ int lfsck_register(const struct lu_env *env, struct dt_device *key,
        INIT_LIST_HEAD(&lfsck->li_list_dir);
        INIT_LIST_HEAD(&lfsck->li_list_double_scan);
        INIT_LIST_HEAD(&lfsck->li_list_idle);
+       INIT_LIST_HEAD(&lfsck->li_list_lmv);
        atomic_set(&lfsck->li_ref, 1);
        atomic_set(&lfsck->li_double_scan_count, 0);
        init_waitqueue_head(&lfsck->li_thread.t_ctl_waitq);
index f4308bc..cfbda5d 100644 (file)
@@ -3003,6 +3003,22 @@ static int lfsck_namespace_double_scan_dir(const struct lu_env *env,
 
        LASSERT(!dt_object_remote(child));
 
+       if (flags & LNTF_UNCERTAIN_LMV) {
+               if (flags & LNTF_RECHECK_NAME_HASH) {
+                       rc = lfsck_namespace_scan_shard(env, com, child);
+                       if (rc < 0)
+                               RETURN(rc);
+
+                       ns->ln_striped_shards_scanned++;
+               } else {
+                       ns->ln_striped_shards_skipped++;
+               }
+       }
+
+       flags &= ~(LNTF_RECHECK_NAME_HASH | LNTF_UNCERTAIN_LMV);
+       if (flags == 0)
+               RETURN(0);
+
        if (flags & (LNTF_CHECK_LINKEA | LNTF_CHECK_PARENT) &&
            !(lfsck->li_bookmark_ram.lb_param & LPF_ALL_TGT)) {
                CDEBUG(D_LFSCK, "%s: some MDT(s) maybe NOT take part in the"
@@ -3622,6 +3638,29 @@ static void lfsck_namespace_dump_statistics(struct seq_file *m,
                      time_phase2);
 }
 
+static void lfsck_namespace_release_lmv(const struct lu_env *env,
+                                       struct lfsck_component *com)
+{
+       struct lfsck_instance           *lfsck  = com->lc_lfsck;
+       struct lfsck_namespace          *ns     = com->lc_file_ram;
+
+       while (!list_empty(&lfsck->li_list_lmv)) {
+               struct lfsck_lmv_unit   *llu;
+               struct lfsck_lmv        *llmv;
+
+               llu = list_entry(lfsck->li_list_lmv.next,
+                                struct lfsck_lmv_unit, llu_link);
+               llmv = &llu->llu_lmv;
+
+               LASSERTF(atomic_read(&llmv->ll_ref) == 1,
+                        "still in using: %u\n",
+                        atomic_read(&llmv->ll_ref));
+
+               ns->ln_striped_dirs_skipped++;
+               lfsck_lmv_put(env, llmv);
+       }
+}
+
 /* namespace APIs */
 
 static int lfsck_namespace_reset(const struct lu_env *env,
@@ -4100,6 +4139,8 @@ static int lfsck_namespace_post(const struct lu_env *env,
        lfsck_post_generic(env, com, &result);
 
        down_write(&com->lc_sem);
+       lfsck_namespace_release_lmv(env, com);
+
        spin_lock(&lfsck->li_lock);
        if (!init)
                ns->ln_pos_last_checkpoint = lfsck->li_pos_checkpoint;
@@ -4348,6 +4389,7 @@ static void lfsck_namespace_data_release(const struct lu_env *env,
        LASSERT(list_empty(&lad->lad_req_list));
 
        com->lc_data = NULL;
+       lfsck_namespace_release_lmv(env, com);
 
        spin_lock(&ltds->ltd_lock);
        list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase1_list,
@@ -4386,6 +4428,8 @@ static void lfsck_namespace_quit(const struct lu_env *env,
                thread_is_stopped(&lad->lad_thread));
        LASSERT(list_empty(&lad->lad_req_list));
 
+       lfsck_namespace_release_lmv(env, com);
+
        spin_lock(&ltds->ltd_lock);
        list_for_each_entry_safe(ltd, next, &lad->lad_mdt_phase1_list,
                                 ltd_namespace_phase_list) {
@@ -4521,6 +4565,19 @@ log:
 
                return 0;
        }
+       case LE_SET_LMV_MASTER: {
+               struct dt_object        *obj;
+
+               obj = lfsck_object_find_by_dev(env, lfsck->li_bottom,
+                                              &lr->lr_fid);
+               if (IS_ERR(obj))
+                       RETURN(PTR_ERR(obj));
+
+               rc = lfsck_namespace_notify_lmv_master_local(env, com, obj);
+               lfsck_object_put(env, obj);
+
+               RETURN(rc > 0 ? 0 : rc);
+       }
        case LE_PHASE1_DONE:
        case LE_PHASE2_DONE:
        case LE_PEER_EXIT:
@@ -5648,6 +5705,117 @@ out:
        lu_object_put(env, &parent->do_lu);
 }
 
+/**
+ * Rescan the striped directory after the master LMV EA reset.
+ *
+ * Sometimes, the master LMV EA of the striped directory may be lost, so when
+ * the namespace LFSCK engine scans the striped directory for the first time,
+ * it will be regarded as a normal directory. As the LFSCK proceeds, some
+ * other LFSCK instance on another MDT will find a shard of this striped dir,
+ * and find that the master MDT-object of the striped directory lost its LMV
+ * EA; then such remote LFSCK instance will regenerate the master LMV EA and
+ * notify the LFSCK instance on this MDT to rescan the striped directory.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] com      pointer to the lfsck component
+ * \param[in] llu      the lfsck_lmv_unit that contains the striped directory
+ *                     to be rescanned.
+ *
+ * \retval             positive number for success
+ * \retval             0 for LFSCK stopped/paused
+ * \retval             negative error number on failure
+ */
+static int lfsck_namespace_rescan_striped_dir(const struct lu_env *env,
+                                             struct lfsck_component *com,
+                                             struct lfsck_lmv_unit *llu)
+{
+       struct lfsck_thread_info        *info   = lfsck_env_info(env);
+       struct lfsck_instance           *lfsck  = com->lc_lfsck;
+       struct lfsck_assistant_data     *lad    = com->lc_data;
+       struct dt_object                *dir;
+       const struct dt_it_ops          *iops;
+       struct dt_it                    *di;
+       struct lu_dirent                *ent    =
+                       (struct lu_dirent *)info->lti_key;
+       struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
+       struct ptlrpc_thread            *thread = &lfsck->li_thread;
+       struct lfsck_namespace_req      *lnr;
+       struct lfsck_assistant_req      *lar;
+       int                              rc;
+       __u16                            type;
+       ENTRY;
+
+       LASSERT(list_empty(&lad->lad_req_list));
+
+       lfsck->li_lmv = &llu->llu_lmv;
+       lfsck->li_obj_dir = lfsck_object_get(llu->llu_obj);
+       rc = lfsck_open_dir(env, lfsck, 0);
+       if (rc != 0)
+               RETURN(rc);
+
+       dir = lfsck->li_obj_dir;
+       di = lfsck->li_di_dir;
+       iops = &dir->do_index_ops->dio_it;
+       do {
+               rc = iops->rec(env, di, (struct dt_rec *)ent,
+                              lfsck->li_args_dir);
+               if (rc == 0)
+                       rc = lfsck_unpack_ent(ent, &lfsck->li_cookie_dir,
+                                             &type);
+
+               if (rc != 0) {
+                       if (bk->lb_param & LPF_FAILOUT)
+                               GOTO(out, rc);
+
+                       goto next;
+               }
+
+               if (ent->lde_attrs & LUDA_IGNORE &&
+                   strcmp(ent->lde_name, dotdot) != 0)
+                       goto next;
+
+               lnr = lfsck_namespace_assistant_req_init(lfsck, ent, type);
+               if (IS_ERR(lnr)) {
+                       if (bk->lb_param & LPF_FAILOUT)
+                               GOTO(out, rc = PTR_ERR(lnr));
+
+                       goto next;
+               }
+
+               lar = &lnr->lnr_lar;
+               rc = lfsck_namespace_assistant_handler_p1(env, com, lar);
+               lfsck_namespace_assistant_req_fini(env, lar);
+               if (rc != 0 && bk->lb_param & LPF_FAILOUT)
+                       GOTO(out, rc);
+
+               if (unlikely(!thread_is_running(thread)))
+                       GOTO(out, rc = 0);
+
+next:
+               rc = iops->next(env, di);
+       } while (rc == 0);
+
+out:
+       lfsck_close_dir(env, lfsck, rc);
+       if (rc <= 0)
+               RETURN(rc);
+
+       /* The close_dir() may insert a dummy lnr in the lad->lad_req_list. */
+       if (list_empty(&lad->lad_req_list))
+               RETURN(1);
+
+       spin_lock(&lad->lad_lock);
+       lar = list_entry(lad->lad_req_list.next, struct lfsck_assistant_req,
+                         lar_list);
+       list_del_init(&lar->lar_list);
+       spin_unlock(&lad->lad_lock);
+
+       rc = lfsck_namespace_assistant_handler_p1(env, com, lar);
+       lfsck_namespace_assistant_req_fini(env, lar);
+
+       RETURN(rc == 0 ? 1 : rc);
+}
+
 static int lfsck_namespace_assistant_handler_p2(const struct lu_env *env,
                                                struct lfsck_component *com)
 {
@@ -5665,6 +5833,20 @@ static int lfsck_namespace_assistant_handler_p2(const struct lu_env *env,
        __u8                     flags  = 0;
        ENTRY;
 
+       while (!list_empty(&lfsck->li_list_lmv)) {
+               struct lfsck_lmv_unit *llu;
+
+               spin_lock(&lfsck->li_lock);
+               llu = list_entry(lfsck->li_list_lmv.next,
+                                struct lfsck_lmv_unit, llu_link);
+               list_del_init(&llu->llu_link);
+               spin_unlock(&lfsck->li_lock);
+
+               rc = lfsck_namespace_rescan_striped_dir(env, com, llu);
+               if (rc <= 0)
+                       RETURN(rc);
+       }
+
        CDEBUG(D_LFSCK, "%s: namespace LFSCK phase2 scan start\n",
               lfsck_lfsck2name(lfsck));
 
index c6da886..7a4c09f 100644 (file)
 void lfsck_lmv_put(const struct lu_env *env, struct lfsck_lmv *llmv)
 {
        if (llmv != NULL && atomic_dec_and_test(&llmv->ll_ref)) {
-               if (llmv->ll_lslr != NULL)
+               if (llmv->ll_inline) {
+                       struct lfsck_lmv_unit   *llu;
+                       struct lfsck_instance   *lfsck;
+
+                       llu = list_entry(llmv, struct lfsck_lmv_unit, llu_lmv);
+                       lfsck = llu->llu_lfsck;
+
+                       spin_lock(&lfsck->li_lock);
+                       list_del(&llu->llu_link);
+                       spin_unlock(&lfsck->li_lock);
+
+                       lfsck_object_put(env, llu->llu_obj);
+
+                       LASSERT(llmv->ll_lslr != NULL);
+
                        OBD_FREE_LARGE(llmv->ll_lslr,
-                               sizeof(struct lfsck_slave_lmv_rec) *
-                               llmv->ll_stripes_allocated);
+                                      sizeof(*llmv->ll_lslr) *
+                                      llmv->ll_stripes_allocated);
+                       OBD_FREE_PTR(llu);
+               } else {
+                       if (llmv->ll_lslr != NULL)
+                               OBD_FREE_LARGE(llmv->ll_lslr,
+                                       sizeof(*llmv->ll_lslr) *
+                                       llmv->ll_stripes_allocated);
+
+                       OBD_FREE_PTR(llmv);
+               }
+       }
+}
+
+/**
+ * Mark the specified directory as read-only by setting LUSTRE_IMMUTABLE_FL.
+ *
+ * The caller has taken the ldlm lock on the @obj already.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] com      pointer to the lfsck component
+ * \param[in] obj      pointer to the object to be handled
+ * \param[in] del_lmv  true if need to drop the LMV EA
+ *
+ * \retval             positive number if nothing to be done
+ * \retval             zero on success
+ * \retval             negative error number on failure
+ */
+static int lfsck_disable_master_lmv(const struct lu_env *env,
+                                   struct lfsck_component *com,
+                                   struct dt_object *obj, bool del_lmv)
+{
+       struct lfsck_thread_info        *info   = lfsck_env_info(env);
+       struct lu_attr                  *la     = &info->lti_la;
+       struct lfsck_instance           *lfsck  = com->lc_lfsck;
+       struct dt_device                *dev    = lfsck_obj2dt_dev(obj);
+       struct thandle                  *th     = NULL;
+       int                              rc     = 0;
+       ENTRY;
+
+       th = dt_trans_create(env, dev);
+       if (IS_ERR(th))
+               GOTO(log, rc = PTR_ERR(th));
+
+       if (del_lmv) {
+               rc = dt_declare_xattr_del(env, obj, XATTR_NAME_LMV, th);
+               if (rc != 0)
+                       GOTO(stop, rc);
+       }
+
+       la->la_valid = LA_FLAGS;
+       rc = dt_declare_attr_set(env, obj, la, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       rc = dt_trans_start_local(env, dev, th);
+       if (rc != 0)
+               GOTO(stop, rc);
+
+       dt_write_lock(env, obj, 0);
+       if (unlikely(lfsck_is_dead_obj(obj)))
+               GOTO(unlock, rc = 1);
+
+       if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
+               GOTO(unlock, rc = 0);
+
+       if (del_lmv) {
+               rc = dt_xattr_del(env, obj, XATTR_NAME_LMV, th, BYPASS_CAPA);
+               if (rc != 0)
+                       GOTO(unlock, rc);
+       }
+
+       rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
+       if (rc == 0 && !(la->la_flags & LUSTRE_IMMUTABLE_FL)) {
+               la->la_valid = LA_FLAGS;
+               la->la_flags |= LUSTRE_IMMUTABLE_FL;
+               rc = dt_attr_set(env, obj, la, th, BYPASS_CAPA);
+       }
+
+       GOTO(unlock, rc);
 
-               OBD_FREE_PTR(llmv);
+unlock:
+       dt_write_unlock(env, obj);
+
+stop:
+       dt_trans_stop(env, dev, th);
+
+log:
+       CDEBUG(D_LFSCK, "%s: namespace LFSCK set the master MDT-object of "
+              "the striped directory "DFID" as read-only: rc = %d\n",
+              lfsck_lfsck2name(lfsck), PFID(lfsck_dto2fid(obj)), rc);
+
+       if (rc <= 0) {
+               struct lfsck_namespace *ns = com->lc_file_ram;
+
+               ns->ln_flags |= LF_INCONSISTENT;
+               if (rc == 0)
+                       ns->ln_striped_dirs_disabled++;
        }
+
+       return rc;
 }
 
 static inline bool lfsck_is_valid_slave_lmv(struct lmv_mds_md_v1 *lmv)
@@ -411,8 +521,281 @@ log:
 }
 
 /**
+ * Check whether there are non-shard objects under the striped directory.
+ *
+ * If the master MDT-object of the striped directory lost its master LMV EA,
+ * then before the LFSCK repaired the striped directory, some ones may have
+ * created some non-shard objects under the master MDT-object. If such case
+ * happend, then the LFSCK cannot re-generate the lost master LMV EA to keep
+ * those non-shard objects to be visible to client.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] com      pointer to the lfsck component
+ * \param[in] obj      pointer to the master MDT-object to be checked
+ * \param[in] cfid     the shard's FID used for verification
+ * \param[in] cidx     the shard's index used for verification
+ *
+ * \retval             positive number if not allow to re-generate LMV EA
+ * \retval             zero if allow to re-generate LMV EA
+ * \retval             negative error number on failure
+ */
+static int lfsck_allow_set_master_lmv(const struct lu_env *env,
+                                     struct lfsck_component *com,
+                                     struct dt_object *obj,
+                                     const struct lu_fid *cfid, __u32 cidx)
+{
+       struct lfsck_thread_info        *info   = lfsck_env_info(env);
+       struct lu_fid                   *tfid   = &info->lti_fid3;
+       struct lfsck_instance           *lfsck  = com->lc_lfsck;
+       struct lu_dirent                *ent    =
+                       (struct lu_dirent *)info->lti_key;
+       const struct dt_it_ops          *iops;
+       struct dt_it                    *di;
+       __u64                            cookie;
+       __u32                            args;
+       int                              rc;
+       __u16                            type;
+       ENTRY;
+
+       if (unlikely(!dt_try_as_dir(env, obj)))
+               RETURN(-ENOTDIR);
+
+       /* Check whether the shard and the master MDT-object matches or not. */
+       snprintf(info->lti_tmpbuf, sizeof(info->lti_tmpbuf), DFID":%u",
+                PFID(cfid), cidx);
+       rc = dt_lookup(env, obj, (struct dt_rec *)tfid,
+                      (const struct dt_key *)info->lti_tmpbuf, BYPASS_CAPA);
+       if (rc != 0)
+               RETURN(rc);
+
+       if (!lu_fid_eq(tfid, cfid))
+               RETURN(-ENOENT);
+
+       args = lfsck->li_args_dir & ~(LUDA_VERIFY | LUDA_VERIFY_DRYRUN);
+       iops = &obj->do_index_ops->dio_it;
+       di = iops->init(env, obj, args, BYPASS_CAPA);
+       if (IS_ERR(di))
+               RETURN(PTR_ERR(di));
+
+       rc = iops->load(env, di, 0);
+       if (rc == 0)
+               rc = iops->next(env, di);
+       else if (rc > 0)
+               rc = 0;
+
+       if (rc != 0)
+               GOTO(out, rc);
+
+       do {
+               rc = iops->rec(env, di, (struct dt_rec *)ent, args);
+               if (rc == 0)
+                       rc = lfsck_unpack_ent(ent, &cookie, &type);
+
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               /* skip dot and dotdot entries */
+               if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen))
+                       goto next;
+
+               /* If the subdir name does not match the shard name rule, then
+                * it is quite possible that it is NOT a shard, but created by
+                * someone after the master MDT-object lost the master LMV EA.
+                * But it is also possible that the subdir name entry crashed,
+                * under such double failure cases, the LFSCK cannot know how
+                * to repair the inconsistency. For data safe, the LFSCK will
+                * mark the master MDT-object as read-only. The administrator
+                * can fix the bad shard name manually, then run LFSCK again.
+                *
+                * XXX: If the subdir name matches the shard name rule, but it
+                *      is not a real shard of the striped directory, instead,
+                *      it was created by someone after the master MDT-object
+                *      lost the LMV EA, then re-generating the master LMV EA
+                *      will cause such subdir to be invisible to client, and
+                *      if its index occupies some lost shard index, then the
+                *      LFSCK will use it to replace the bad shard, and cause
+                *      the subdir (itself) to be invisible for ever. */
+               if (lfsck_shard_name_to_index(env, ent->lde_name,
+                               ent->lde_namelen, type, &ent->lde_fid) < 0)
+                       GOTO(out, rc = 1);
+
+next:
+               rc = iops->next(env, di);
+       } while (rc == 0);
+
+       GOTO(out, rc = 0);
+
+out:
+       iops->put(env, di);
+       iops->fini(env, di);
+
+       return rc;
+}
+
+/**
+ * Notify remote LFSCK instance that the object's LMV EA has been updated.
+ *
+ * An LFSCK_NOTIFY request is built for the MDT \a index, filled with the
+ * event, the flags, the object's FID and the index returned by
+ * lfsck_dev_idx() for the local bottom device, then sent through
+ * ptlrpc_queue_wait(). An -ENOENT result of the RPC is converted to 1.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] com      pointer to the lfsck component
+ * \param[in] obj      pointer to the object on which the LMV EA will be set
+ * \param[in] event    indicate either master or slave LMV EA has been updated
+ * \param[in] flags    indicate which element(s) in the LMV EA has been updated
+ * \param[in] index    the MDT index on which the LFSCK instance to be notified
+ *
+ * \retval             positive number if nothing to be done
+ * \retval             zero for succeed
+ * \retval             negative error number on failure
+ */
+static int lfsck_namespace_notify_lmv_remote(const struct lu_env *env,
+                                            struct lfsck_component *com,
+                                            struct dt_object *obj,
+                                            __u32 event, __u32 flags,
+                                            __u32 index)
+{
+       struct lfsck_request            *lr     = &lfsck_env_info(env)->lti_lr;
+       const struct lu_fid             *fid    = lfsck_dto2fid(obj);
+       struct lfsck_instance           *lfsck  = com->lc_lfsck;
+       struct lfsck_tgt_desc           *ltd    = NULL;
+       struct ptlrpc_request           *req    = NULL;
+       int                              rc;
+       ENTRY;
+
+       /* The reference taken here is dropped under the "out" label. */
+       ltd = lfsck_tgt_get(&lfsck->li_mdt_descs, index);
+       if (ltd == NULL)
+               GOTO(out, rc = -ENODEV);
+
+       req = ptlrpc_request_alloc(class_exp2cliimp(ltd->ltd_exp),
+                                  &RQF_LFSCK_NOTIFY);
+       if (req == NULL)
+               GOTO(out, rc = -ENOMEM);
+
+       rc = ptlrpc_request_pack(req, LUSTRE_OBD_VERSION, LFSCK_NOTIFY);
+       if (rc != 0) {
+               ptlrpc_request_free(req);
+
+               GOTO(out, rc);
+       }
+
+       /* From here on "lr" refers to the buffer obtained from the request
+        * capsule; the thread-info value assigned at declaration is never
+        * read. */
+       lr = req_capsule_client_get(&req->rq_pill, &RMF_LFSCK_REQUEST);
+       memset(lr, 0, sizeof(*lr));
+       lr->lr_event = event;
+       lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
+       lr->lr_active = LFSCK_TYPE_NAMESPACE;
+       lr->lr_fid = *fid;
+       lr->lr_flags = flags;
+
+       ptlrpc_request_set_replen(req);
+       rc = ptlrpc_queue_wait(req);
+       ptlrpc_req_finished(req);
+
+       /* -ENOENT is reported to the caller as 1 ("nothing to be done"). */
+       GOTO(out, rc = (rc == -ENOENT ? 1 : rc));
+
+out:
+       CDEBUG(D_LFSCK, "%s: namespace LFSCK notify LMV EA updated for the "
+              "object "DFID" on MDT %x remotely with event %u, flags %u: "
+              "rc = %d\n", lfsck_lfsck2name(lfsck), PFID(fid), index,
+              event, flags, rc);
+
+       if (ltd != NULL)
+               lfsck_tgt_put(ltd);
+
+       return rc;
+}
+
+/**
+ * Generate request for local LFSCK instance to rescan the striped directory.
+ *
+ * The request is a lfsck_lmv_unit that embeds the lfsck_lmv descriptor and
+ * holds a reference on \a obj. It is appended to lfsck->li_list_lmv only
+ * while the namespace LFSCK is in scanning phase 1 or phase 2; otherwise
+ * the striped directory is counted as skipped. Nothing is done in dryrun
+ * mode.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] com      pointer to the lfsck component
+ * \param[in] obj      pointer to the striped directory to be rescanned
+ *
+ * \retval             positive number if nothing to be done
+ * \retval             zero for succeed
+ * \retval             negative error number on failure
+ */
+int lfsck_namespace_notify_lmv_master_local(const struct lu_env *env,
+                                           struct lfsck_component *com,
+                                           struct dt_object *obj)
+{
+       struct lfsck_instance      *lfsck = com->lc_lfsck;
+       struct lfsck_namespace     *ns    = com->lc_file_ram;
+       struct lmv_mds_md_v1       *lmv4  = &lfsck_env_info(env)->lti_lmv4;
+       struct lfsck_lmv_unit      *llu;
+       struct lfsck_lmv           *llmv;
+       struct lfsck_slave_lmv_rec *lslr;
+       int                         count = 0;
+       int                         rc;
+       ENTRY;
+
+       if (lfsck->li_bookmark_ram.lb_param & LPF_DRYRUN)
+               RETURN(0);
+
+       /* Any failure to read the LMV EA, including a missing EA, is
+        * returned to the caller as is. */
+       rc = lfsck_read_stripe_lmv(env, obj, lmv4);
+       if (rc != 0)
+               RETURN(rc);
+
+       OBD_ALLOC_PTR(llu);
+       if (unlikely(llu == NULL))
+               RETURN(-ENOMEM);
+
+       /* Clamp the number of slave records to allocate: use the default
+        * when the EA claims no stripe, and never exceed the maximum. */
+       if (lmv4->lmv_stripe_count < 1)
+               count = LFSCK_LMV_DEF_STRIPES;
+       else if (lmv4->lmv_stripe_count > LFSCK_LMV_MAX_STRIPES)
+               count = LFSCK_LMV_MAX_STRIPES;
+       else
+               count = lmv4->lmv_stripe_count;
+
+       OBD_ALLOC_LARGE(lslr, sizeof(struct lfsck_slave_lmv_rec) * count);
+       if (lslr == NULL) {
+               OBD_FREE_PTR(llu);
+
+               RETURN(-ENOMEM);
+       }
+
+       /* llmv lives inside llu (hence ll_inline) and starts with a single
+        * reference owned by this function. */
+       INIT_LIST_HEAD(&llu->llu_link);
+       llu->llu_lfsck = lfsck;
+       llu->llu_obj = lfsck_object_get(obj);
+       llmv = &llu->llu_lmv;
+       llmv->ll_lmv_master = 1;
+       llmv->ll_inline = 1;
+       atomic_set(&llmv->ll_ref, 1);
+       llmv->ll_stripes_allocated = count;
+       llmv->ll_hash_type = LMV_HASH_TYPE_UNKNOWN;
+       llmv->ll_lslr = lslr;
+       llmv->ll_lmv = *lmv4;
+
+       /* The LFSCK status check and the statistics update are done with
+        * com->lc_sem held for write. */
+       down_write(&com->lc_sem);
+       if (ns->ln_status != LS_SCANNING_PHASE1 &&
+           ns->ln_status != LS_SCANNING_PHASE2) {
+               ns->ln_striped_dirs_skipped++;
+               up_write(&com->lc_sem);
+               /* NOTE(review): presumably dropping the last reference of an
+                * inline llmv also releases llu, lslr and the object
+                * reference - confirm against lfsck_lmv_put(). */
+               lfsck_lmv_put(env, llmv);
+       } else {
+               ns->ln_striped_dirs_repaired++;
+               /* The rescan list is protected by lfsck->li_lock. */
+               spin_lock(&lfsck->li_lock);
+               list_add_tail(&llu->llu_link, &lfsck->li_list_lmv);
+               spin_unlock(&lfsck->li_lock);
+               up_write(&com->lc_sem);
+       }
+
+       RETURN(0);
+}
+
+/**
  * Set master LMV EA for the specified striped directory.
  *
+ * First, if the master MDT-object of a striped directory lost its LMV EA,
+ * then some users may have created some files under the master MDT-object
+ * directly. Under such case, the LFSCK cannot re-generate LMV EA for the
+ * master MDT-object, because we should keep the existing files visible to
+ * client. Then the LFSCK will mark the striped directory as read-only and
+ * keep it there to be handled by administrator manually.
+ *
+ * If nobody has created files under the master MDT-object of the striped
+ * directory, then we will set the master LMV EA and generate a new rescan
+ * (the striped directory) request that will be handled later by the LFSCK
+ * instance on the MDT.
+ *
  * \param[in] env      pointer to the thread context
  * \param[in] com      pointer to the lfsck component
  * \param[in] dir      pointer to the object on which the LMV EA will be set
@@ -463,8 +846,6 @@ static int lfsck_namespace_set_lmv_master(const struct lu_env *env,
                pidx = lfsck_dev_idx(lfsck->li_bottom);
        }
 
-       /* XXX: it will be improved with subsequent patches landed. */
-
        rc = lfsck_ibits_lock(env, lfsck, obj, &lh,
                              MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR,
                              LCK_EX);
@@ -472,14 +853,48 @@ static int lfsck_namespace_set_lmv_master(const struct lu_env *env,
                GOTO(log, rc);
 
        rc = lfsck_read_stripe_lmv(env, obj, lmv3);
-       if (rc != 0)
+       if (rc == -ENODATA) {
+               if (!(flags & LEF_SET_LMV_ALL))
+                       GOTO(log, rc);
+
+               *lmv3 = *lmv;
+       } else if (rc == 0) {
+               if (flags & LEF_SET_LMV_ALL)
+                       GOTO(log, rc = 1);
+
+               if (flags & LEF_SET_LMV_HASH)
+                       lmv3->lmv_hash_type = lmv->lmv_hash_type;
+       } else {
                GOTO(log, rc);
+       }
 
-       lmv3->lmv_hash_type = lmv->lmv_hash_type;
        lmv3->lmv_magic = LMV_MAGIC;
        lmv3->lmv_master_mdt_index = pidx;
 
+       if (flags & LEF_SET_LMV_ALL) {
+               rc = lfsck_allow_set_master_lmv(env, com, obj, cfid, cidx);
+               if (rc > 0) {
+                       rc = lfsck_disable_master_lmv(env, com, obj, false);
+
+                       GOTO(log, rc = (rc == 0 ? 1 : rc));
+               }
+
+               if (rc < 0)
+                       GOTO(log, rc);
+
+               /* To indicate that the master has ever lost LMV EA. */
+               lmv3->lmv_hash_type |= LMV_HASH_FLAG_LOST_LMV;
+       }
+
        rc = lfsck_namespace_update_lmv(env, com, obj, lmv3, true);
+       if (rc == 0 && flags & LEF_SET_LMV_ALL) {
+               if (dt_object_remote(obj))
+                       rc = lfsck_namespace_notify_lmv_remote(env, com, obj,
+                                               LE_SET_LMV_MASTER, 0, pidx);
+               else
+                       rc = lfsck_namespace_notify_lmv_master_local(env, com,
+                                                                    obj);
+       }
 
        GOTO(log, rc);
 
@@ -567,6 +982,147 @@ log:
 }
 
 /**
+ * Scan the shard of a striped directory for name hash verification.
+ *
+ * During the first-stage scanning, if the LFSCK cannot make sure whether
+ * the shard of a stripe directory contains valid slave LMV EA or not, then
+ * it will skip the name hash verification for this shard temporarily, and
+ * record the shard's FID in the LFSCK tracing file. As the LFSCK processing,
+ * the slave LMV EA may has been verified/fixed by LFSCK instance on master.
+ * Then in the second-stage scanning, the shard will be re-scanned, and for
+ * every name entry under the shard, the name hash will be verified, and for
+ * unmatched name entry, the LFSCK will try to fix it.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] com      pointer to the lfsck component
+ * \param[in] child    pointer to the directory object to be handled
+ *
+ * \retval             positive number for scanning successfully
+ * \retval             zero for the scanning is paused
+ * \retval             negative error number on failure
+ */
+int lfsck_namespace_scan_shard(const struct lu_env *env,
+                              struct lfsck_component *com,
+                              struct dt_object *child)
+{
+       struct lfsck_thread_info        *info   = lfsck_env_info(env);
+       struct lmv_mds_md_v1            *lmv    = &info->lti_lmv;
+       struct lfsck_instance           *lfsck  = com->lc_lfsck;
+       struct lfsck_namespace          *ns     = com->lc_file_ram;
+       struct ptlrpc_thread            *thread = &lfsck->li_thread;
+       struct lu_dirent                *ent    =
+                       (struct lu_dirent *)info->lti_key;
+       struct lfsck_bookmark           *bk     = &lfsck->li_bookmark_ram;
+       struct lfsck_lmv                *llmv   = NULL;
+       const struct dt_it_ops          *iops;
+       struct dt_it                    *di;
+       __u64                            cookie;
+       __u32                            args;
+       int                              rc;
+       __u16                            type;
+       ENTRY;
+
+       rc = lfsck_read_stripe_lmv(env, child, lmv);
+       if (rc != 0)
+               RETURN(rc == -ENODATA ? 1 : rc);
+
+       if (lmv->lmv_magic != LMV_MAGIC_STRIPE)
+               RETURN(1);
+
+       if (unlikely(!dt_try_as_dir(env, child)))
+               RETURN(-ENOTDIR);
+
+       OBD_ALLOC_PTR(llmv);
+       if (llmv == NULL)
+               RETURN(-ENOMEM);
+
+       llmv->ll_lmv_slave = 1;
+       llmv->ll_lmv_verified = 1;
+       llmv->ll_lmv = *lmv;
+       atomic_set(&llmv->ll_ref, 1);
+
+       args = lfsck->li_args_dir & ~(LUDA_VERIFY | LUDA_VERIFY_DRYRUN);
+       iops = &child->do_index_ops->dio_it;
+       di = iops->init(env, child, args, BYPASS_CAPA);
+       if (IS_ERR(di))
+               GOTO(out, rc = PTR_ERR(di));
+
+       rc = iops->load(env, di, 0);
+       if (rc == 0)
+               rc = iops->next(env, di);
+       else if (rc > 0)
+               rc = 0;
+
+       while (rc == 0) {
+               if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DELAY3) &&
+                   cfs_fail_val > 0) {
+                       struct l_wait_info lwi;
+
+                       lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val),
+                                         NULL, NULL);
+                       l_wait_event(thread->t_ctl_waitq,
+                                    !thread_is_running(thread),
+                                    &lwi);
+
+                       if (unlikely(!thread_is_running(thread)))
+                               GOTO(out, rc = 0);
+               }
+
+               rc = iops->rec(env, di, (struct dt_rec *)ent, args);
+               if (rc == 0)
+                       rc = lfsck_unpack_ent(ent, &cookie, &type);
+
+               if (rc != 0) {
+                       if (bk->lb_param & LPF_FAILOUT)
+                               GOTO(out, rc);
+
+                       goto next;
+               }
+
+               /* skip dot and dotdot entries */
+               if (name_is_dot_or_dotdot(ent->lde_name, ent->lde_namelen))
+                       goto next;
+
+               if (!lfsck_is_valid_slave_name_entry(env, llmv, ent->lde_name,
+                                                    ent->lde_namelen)) {
+                       ns->ln_flags |= LF_INCONSISTENT;
+                       rc = lfsck_namespace_repair_bad_name_hash(env, com,
+                                               child, llmv, ent->lde_name);
+                       if (rc >= 0)
+                               ns->ln_name_hash_repaired++;
+               }
+
+               if (rc < 0 && bk->lb_param & LPF_FAILOUT)
+                       GOTO(out, rc);
+
+               /* Rate control. */
+               lfsck_control_speed(lfsck);
+               if (unlikely(!thread_is_running(thread)))
+                       GOTO(out, rc = 0);
+
+               if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_FATAL2)) {
+                       spin_lock(&lfsck->li_lock);
+                       thread_set_flags(thread, SVC_STOPPING);
+                       spin_unlock(&lfsck->li_lock);
+
+                       GOTO(out, rc = -EINVAL);
+               }
+
+next:
+               rc = iops->next(env, di);
+       }
+
+       GOTO(out, rc);
+
+out:
+       iops->put(env, di);
+       iops->fini(env, di);
+       lfsck_lmv_put(env, llmv);
+
+       return rc;
+}
+
+/**
  * Verify the slave object's (of striped directory) LMV EA.
  *
  * For the slave object of a striped directory, before traversing the shard
@@ -630,9 +1186,12 @@ int lfsck_namespace_verify_stripe_slave(const struct lu_env *env,
                /* If the parent has no LMV EA, then it may be because:
                 * 1) The parent lost the LMV EA.
                 * 2) The child claims a wrong (slave) LMV EA. */
-
-               /* XXX: to be improved. */
-               rc = 0;
+               if (rc == -ENODATA)
+                       rc = lfsck_namespace_set_lmv_master(env, com, parent,
+                                       clmv, cfid, clmv->lmv_master_mdt_index,
+                                       LEF_SET_LMV_ALL);
+               else
+                       rc = 0;
 
                rc1 = lfsck_namespace_trace_update(env, com, cfid,
                                                   LNTF_UNCERTAIN_LMV, true);
index c1e25bd..5e535f0 100644 (file)
@@ -2513,8 +2513,9 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
                        GOTO(out, rc);
        }
 
-       rc = dt_xattr_set(env, dt_object_child(dt), &lmv_buf, XATTR_NAME_LMV,
-                         fl, th, capa);
+       if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MASTER_LMV))
+               rc = dt_xattr_set(env, dt_object_child(dt), &lmv_buf,
+                                 XATTR_NAME_LMV, fl, th, capa);
 
 out:
        if (slave_lmm != NULL)
index 8899fc5..d6bb84b 100644 (file)
@@ -380,17 +380,16 @@ int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
        if (mdd_is_dead_obj(pobj))
                RETURN(-ENOENT);
 
-       if ((attr->la_valid & LA_FLAGS) &&
-           (attr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL)))
+       if (attr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
                RETURN(-EPERM);
 
        rc = mdd_permission_internal_locked(env, pobj, pattr,
                                            MAY_WRITE | MAY_EXEC,
                                            MOR_TGT_PARENT);
-       if (rc)
+       if (rc != 0)
                RETURN(rc);
 
-       if (mdd_is_append(pobj))
+       if (pattr->la_flags & LUSTRE_APPEND_FL)
                RETURN(-EPERM);
 
        RETURN(rc);
@@ -445,7 +444,7 @@ static int mdd_may_delete_entry(const struct lu_env *env,
                        RETURN(rc);
        }
 
-       if (mdd_is_append(pobj))
+       if (pattr->la_flags & LUSTRE_APPEND_FL)
                RETURN(-EPERM);
 
        RETURN(0);
@@ -482,11 +481,7 @@ int mdd_may_delete(const struct lu_env *env, struct mdd_object *tpobj,
        if (mdd_is_sticky(env, tpobj, tpattr, tobj, tattr))
                RETURN(-EPERM);
 
-       if (mdd_is_immutable(tobj) || mdd_is_append(tobj))
-               RETURN(-EPERM);
-
-       if ((tattr->la_valid & LA_FLAGS) &&
-           (tattr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL)))
+       if (tattr->la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
                RETURN(-EPERM);
 
        /* additional check the rename case */
@@ -546,11 +541,11 @@ static int mdd_link_sanity_check(const struct lu_env *env,
        if (rc < 0)
                RETURN(rc);
 
-        if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
-                RETURN(-EPERM);
+       if (cattr->la_flags & (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL))
+               RETURN(-EPERM);
 
-        if (S_ISDIR(mdd_object_type(src_obj)))
-                RETURN(-EPERM);
+       if (S_ISDIR(mdd_object_type(src_obj)))
+               RETURN(-EPERM);
 
        LASSERT(src_obj != tgt_obj);
        rc = mdd_may_create(env, tgt_obj, tattr, NULL, true);
@@ -3351,7 +3346,6 @@ static int mdd_declare_migrate_create(const struct lu_env *env,
 
        la_flag->la_valid = LA_FLAGS;
        la_flag->la_flags = la->la_flags | LUSTRE_IMMUTABLE_FL;
-       mdd_flags_xlate(mdd_sobj, la_flag->la_flags);
        rc = mdo_declare_attr_set(env, mdd_sobj, la_flag, handle);
 
        return rc;
@@ -3461,7 +3455,6 @@ static int mdd_migrate_create(const struct lu_env *env,
         * flag and approve the migration */
        la_flag->la_valid = LA_FLAGS;
        la_flag->la_flags = la->la_flags | LUSTRE_IMMUTABLE_FL;
-       mdd_flags_xlate(mdd_sobj, la_flag->la_flags);
        rc = mdo_attr_set(env, mdd_sobj, la_flag, handle,
                          mdd_object_capa(env, mdd_sobj));
 stop_trans:
@@ -3719,7 +3712,6 @@ static int mdd_declare_migrate_update_name(const struct lu_env *env,
        /* Revert IMMUTABLE flag */
        la_flag->la_valid = LA_FLAGS;
        la_flag->la_flags = la->la_flags & ~LUSTRE_IMMUTABLE_FL;
-       mdd_flags_xlate(mdd_sobj, la_flag->la_flags);
        rc = mdo_declare_attr_set(env, mdd_sobj, la_flag, handle);
        if (rc != 0)
                return rc;
@@ -3838,7 +3830,6 @@ static int mdd_migrate_update_name(const struct lu_env *env,
        /* Revert IMMUTABLE flag */
        la_flag->la_valid = LA_FLAGS;
        la_flag->la_flags = so_attr->la_flags & ~LUSTRE_IMMUTABLE_FL;
-       mdd_flags_xlate(mdd_sobj, la_flag->la_flags);
        rc = mdo_attr_set(env, mdd_sobj, la_flag, handle,
                          mdd_object_capa(env, mdd_pobj));
        if (rc != 0)
@@ -3949,7 +3940,6 @@ static int mdd_migrate_sanity_check(const struct lu_env *env,
                        struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
 
                        sattr->la_flags &= ~LUSTRE_IMMUTABLE_FL;
-                       sobj->mod_flags &= ~IMMUTE_OBJ;
                        CDEBUG(D_HA, "%s: "DFID" override IMMUTE FLAG\n",
                               mdd2obd_dev(mdd)->obd_name,
                               PFID(mdd_object_fid(sobj)));
index eff2098..c6b76f5 100644 (file)
@@ -119,9 +119,7 @@ struct mdd_device {
 enum mod_flags {
        /* The dir object has been unlinked */
        DEAD_OBJ   = 1 << 0,
-       APPEND_OBJ = 1 << 1,
-       IMMUTE_OBJ = 1 << 2,
-       ORPHAN_OBJ = 1 << 3,
+       ORPHAN_OBJ = 1 << 1,
 };
 
 struct mdd_object {
@@ -173,7 +171,6 @@ extern const char orph_index_name[];
 
 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
                struct lu_attr *la, struct lustre_capa *capa);
-void mdd_flags_xlate(struct mdd_object *obj, __u32 flags);
 int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
                 struct md_attr *ma);
 int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
@@ -437,21 +434,11 @@ static inline umode_t mdd_object_type(const struct mdd_object *obj)
         return lu_object_attr(&obj->mod_obj.mo_lu);
 }
 
-static inline int mdd_is_immutable(struct mdd_object *obj)
-{
-        return obj->mod_flags & IMMUTE_OBJ;
-}
-
 static inline int mdd_is_dead_obj(struct mdd_object *obj)
 {
         return obj && obj->mod_flags & DEAD_OBJ;
 }
 
-static inline int mdd_is_append(struct mdd_object *obj)
-{
-        return obj->mod_flags & APPEND_OBJ;
-}
-
 static inline int mdd_object_exists(struct mdd_object *obj)
 {
         return lu_object_exists(mdd2lu_obj(obj));
index b3c9e0f..6504da2 100644 (file)
@@ -74,17 +74,6 @@ int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
         return mdo_attr_get(env, obj, la, capa);
 }
 
-void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
-{
-        obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
-
-        if (flags & LUSTRE_APPEND_FL)
-                obj->mod_flags |= APPEND_OBJ;
-
-        if (flags & LUSTRE_IMMUTABLE_FL)
-                obj->mod_flags |= IMMUTE_OBJ;
-}
-
 struct mdd_thread_info *mdd_env_info(const struct lu_env *env)
 {
         struct mdd_thread_info *info;
@@ -167,8 +156,6 @@ static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
                struct lu_attr *attr = MDD_ENV_VAR(env, la_for_start);
 
                rc = mdd_la_get(env, mdd_obj, attr, BYPASS_CAPA);
-               if (rc == 0)
-                       mdd_flags_xlate(mdd_obj, attr->la_flags);
        }
 
        return rc;
@@ -455,7 +442,8 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
 
        /* Check if flags change. */
        if (la->la_valid & LA_FLAGS) {
-               unsigned int oldflags = 0;
+               unsigned int oldflags = oattr->la_flags &
+                               (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
                unsigned int newflags = la->la_flags &
                                (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
 
@@ -463,12 +451,8 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
                    !md_capable(uc, CFS_CAP_FOWNER))
                        RETURN(-EPERM);
 
-               /* XXX: the IMMUTABLE and APPEND_ONLY flags can
+               /* The IMMUTABLE and APPEND_ONLY flags can
                 * only be changed by the relevant capability. */
-               if (mdd_is_immutable(obj))
-                       oldflags |= LUSTRE_IMMUTABLE_FL;
-               if (mdd_is_append(obj))
-                       oldflags |= LUSTRE_APPEND_FL;
                if ((oldflags ^ newflags) &&
                    !md_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
                        RETURN(-EPERM);
@@ -477,7 +461,7 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
                        la->la_flags &= ~LUSTRE_DIRSYNC_FL;
        }
 
-       if ((mdd_is_immutable(obj) || mdd_is_append(obj)) &&
+       if (oattr->la_flags & (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL) &&
            (la->la_valid & ~LA_FLAGS) &&
            !(flags & MDS_PERM_BYPASS))
                RETURN(-EPERM);
@@ -875,13 +859,10 @@ int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
                       la->la_mtime, la->la_ctime);
 
        mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
-       if (la_copy->la_valid & LA_FLAGS) {
+       if (la_copy->la_valid & LA_FLAGS)
                rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
-               if (rc == 0)
-                       mdd_flags_xlate(mdd_obj, la_copy->la_flags);
-       } else if (la_copy->la_valid) { /* setattr */
+       else if (la_copy->la_valid) /* setattr */
                rc = mdd_attr_set_internal(env, mdd_obj, la_copy, handle, 1);
-       }
        mdd_write_unlock(env, mdd_obj);
 
        if (rc == 0)
@@ -901,7 +882,7 @@ static int mdd_xattr_sanity_check(const struct lu_env *env,
        struct lu_ucred *uc     = lu_ucred_assert(env);
        ENTRY;
 
-       if (mdd_is_immutable(obj) || mdd_is_append(obj))
+       if (attr->la_flags & (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL))
                RETURN(-EPERM);
 
        if ((uc->uc_fsuid != attr->la_uid) && !md_capable(uc, CFS_CAP_FOWNER))
@@ -1650,7 +1631,7 @@ static int mdd_open_sanity_check(const struct lu_env *env,
                flag &= ~MDS_OPEN_TRUNC;
 
        /* For writing append-only file must open it with append mode. */
-       if (mdd_is_append(obj)) {
+       if (attr->la_flags & LUSTRE_APPEND_FL) {
                if ((flag & FMODE_WRITE) && !(flag & MDS_OPEN_APPEND))
                        RETURN(-EPERM);
                if (flag & MDS_OPEN_TRUNC)
index 0bbd5e0..f137d41 100644 (file)
@@ -241,15 +241,15 @@ static int mdd_check_acl(const struct lu_env *env, struct mdd_object *obj,
 }
 
 int __mdd_permission_internal(const struct lu_env *env, struct mdd_object *obj,
-                               const struct lu_attr *la, int mask, int role)
+                             const struct lu_attr *la, int mask, int role)
 {
        struct lu_ucred *uc = lu_ucred(env);
-        __u32 mode;
-        int rc;
-        ENTRY;
+       __u32 mode;
+       int rc;
+       ENTRY;
 
-        if (mask == 0)
-                RETURN(0);
+       if (mask == 0)
+               RETURN(0);
 
        /* These means unnecessary for permission check */
        if ((uc == NULL) || (uc->uc_valid == UCRED_INIT))
@@ -259,15 +259,15 @@ int __mdd_permission_internal(const struct lu_env *env, struct mdd_object *obj,
        if (uc->uc_valid == UCRED_INVALID)
                RETURN(-EACCES);
 
-        /*
-         * Nobody gets write access to an immutable file.
-         */
-        if ((mask & MAY_WRITE) && mdd_is_immutable(obj))
-                RETURN(-EACCES);
+       /*
+        * Nobody gets write access to an immutable file.
+        */
+       if (mask & MAY_WRITE && la->la_flags & LUSTRE_IMMUTABLE_FL)
+               RETURN(-EACCES);
 
        LASSERT(la != NULL);
 
-        mode = la->la_mode;
+       mode = la->la_mode;
        if (uc->uc_fsuid == la->la_uid) {
                mode >>= 6;
         } else {
index 23ce9fb..14786ee 100644 (file)
@@ -4601,16 +4601,30 @@ static struct dt_it *osd_it_iam_init(const struct lu_env *env,
         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
                 return ERR_PTR(-EACCES);
 
-        it = &oti->oti_it;
-        ipd = osd_it_ipd_get(env, bag);
-        if (likely(ipd != NULL)) {
-                it->oi_obj = obj;
-                it->oi_ipd = ipd;
-                lu_object_get(lo);
-                iam_it_init(&it->oi_it, bag, IAM_IT_MOVE, ipd);
-                return (struct dt_it *)it;
-        }
-        return ERR_PTR(-ENOMEM);
+       if (oti->oti_it_inline) {
+               OBD_ALLOC_PTR(it);
+               if (it == NULL)
+                       return ERR_PTR(-ENOMEM);
+       } else {
+               it = &oti->oti_it;
+               oti->oti_it_inline = 1;
+       }
+
+       ipd = osd_it_ipd_get(env, bag);
+       if (likely(ipd != NULL)) {
+               it->oi_obj = obj;
+               it->oi_ipd = ipd;
+               lu_object_get(lo);
+               iam_it_init(&it->oi_it, bag, IAM_IT_MOVE, ipd);
+               return (struct dt_it *)it;
+       } else {
+               if (it != &oti->oti_it)
+                       OBD_FREE_PTR(it);
+               else
+                       oti->oti_it_inline = 0;
+
+               return ERR_PTR(-ENOMEM);
+       }
 }
 
 /**
@@ -4619,12 +4633,17 @@ static struct dt_it *osd_it_iam_init(const struct lu_env *env,
 
 static void osd_it_iam_fini(const struct lu_env *env, struct dt_it *di)
 {
-        struct osd_it_iam *it = (struct osd_it_iam *)di;
-        struct osd_object *obj = it->oi_obj;
-
-        iam_it_fini(&it->oi_it);
-        osd_ipd_put(env, &obj->oo_dir->od_container, it->oi_ipd);
-        lu_object_put(env, &obj->oo_dt.do_lu);
+       struct osd_thread_info  *oti = osd_oti_get(env);
+       struct osd_it_iam       *it  = (struct osd_it_iam *)di;
+       struct osd_object       *obj = it->oi_obj;
+
+       iam_it_fini(&it->oi_it);
+       osd_ipd_put(env, &obj->oo_dir->od_container, it->oi_ipd);
+       lu_object_put(env, &obj->oo_dt.do_lu);
+       if (it != &oti->oti_it)
+               OBD_FREE_PTR(it);
+       else
+               oti->oti_it_inline = 0;
 }
 
 /**
@@ -4862,29 +4881,40 @@ static const struct dt_index_operations osd_index_iam_ops = {
  *
  */
 static struct dt_it *osd_it_ea_init(const struct lu_env *env,
-                                    struct dt_object *dt,
-                                    __u32 attr,
-                                    struct lustre_capa *capa)
-{
-        struct osd_object       *obj  = osd_dt_obj(dt);
-        struct osd_thread_info  *info = osd_oti_get(env);
-        struct osd_it_ea        *it   = &info->oti_it_ea;
-       struct file             *file = &it->oie_file;
-        struct lu_object        *lo   = &dt->do_lu;
-        struct dentry           *obj_dentry = &info->oti_it_dentry;
-        ENTRY;
-        LASSERT(lu_object_exists(lo));
+                                   struct dt_object *dt,
+                                   __u32 attr,
+                                   struct lustre_capa *capa)
+{
+       struct osd_object       *obj  = osd_dt_obj(dt);
+       struct osd_thread_info  *info = osd_oti_get(env);
+       struct osd_it_ea        *it;
+       struct file             *file;
+       struct lu_object        *lo   = &dt->do_lu;
+       struct dentry           *obj_dentry = &info->oti_it_dentry;
+       ENTRY;
 
-        obj_dentry->d_inode = obj->oo_inode;
-        obj_dentry->d_sb = osd_sb(osd_obj2dev(obj));
-        obj_dentry->d_name.hash = 0;
+       LASSERT(lu_object_exists(lo));
 
-        it->oie_rd_dirent       = 0;
-        it->oie_it_dirent       = 0;
-        it->oie_dirent          = NULL;
-        it->oie_buf             = info->oti_it_ea_buf;
-        it->oie_obj             = obj;
+       if (info->oti_it_inline) {
+               OBD_ALLOC_PTR(it);
+               if (it == NULL)
+                       RETURN(ERR_PTR(-ENOMEM));
+       } else {
+               it = &info->oti_it_ea;
+               info->oti_it_inline = 1;
+       }
+
+       obj_dentry->d_inode = obj->oo_inode;
+       obj_dentry->d_sb = osd_sb(osd_obj2dev(obj));
+       obj_dentry->d_name.hash = 0;
+
+       it->oie_rd_dirent       = 0;
+       it->oie_it_dirent       = 0;
+       it->oie_dirent          = NULL;
+       it->oie_buf             = info->oti_it_ea_buf;
+       it->oie_obj             = obj;
 
+       file = &it->oie_file;
        /* Reset the "file" totally to avoid to reuse any old value from
         * former readdir handling, the "file->f_pos" should be zero. */
        memset(file, 0, sizeof(*file));
@@ -4909,14 +4939,19 @@ static struct dt_it *osd_it_ea_init(const struct lu_env *env,
  */
 static void osd_it_ea_fini(const struct lu_env *env, struct dt_it *di)
 {
-        struct osd_it_ea     *it   = (struct osd_it_ea *)di;
-        struct osd_object    *obj  = it->oie_obj;
-        struct inode       *inode  = obj->oo_inode;
+        struct osd_thread_info  *info  = osd_oti_get(env);
+        struct osd_it_ea       *it     = (struct osd_it_ea *)di;
+        struct osd_object      *obj    = it->oie_obj;
+        struct inode           *inode  = obj->oo_inode;
 
         ENTRY;
         it->oie_file.f_op->release(inode, &it->oie_file);
         lu_object_put(env, &obj->oo_dt.do_lu);
-        EXIT;
+       if (it != &info->oti_it_ea)
+               OBD_FREE_PTR(it);
+       else
+               info->oti_it_inline = 0;
+       EXIT;
 }
 
 /**
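diff --git a/lustre/osd-ldiskfs/osd_internal.h b/lustre/osd-ldiskfs/osd_internal.h
index af14812..12e759f 100644
--- a/lustre/osd-ldiskfs/osd_internal.h
+++ b/lustre/osd-ldiskfs/osd_internal.h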
@@ -544,6 +544,7 @@ struct osd_thread_info {
 
        struct osd_idmap_cache oti_cache;
 
+       unsigned int           oti_it_inline:1;
         int                    oti_r_locks;
         int                    oti_w_locks;
         int                    oti_txns;
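diff --git a/lustre/osd-ldiskfs/osd_quota.c b/lustre/osd-ldiskfs/osd_quota.c
index 38cf125..37e8d9b 100644
--- a/lustre/osd-ldiskfs/osd_quota.c
+++ b/lustre/osd-ldiskfs/osd_quota.c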
@@ -166,7 +166,15 @@ static struct dt_it *osd_it_acct_init(const struct lu_env *env,
        if (info == NULL)
                RETURN(ERR_PTR(-ENOMEM));
 
-       it = &info->oti_it_quota;
+       if (info->oti_it_inline) {
+               OBD_ALLOC_PTR(it);
+               if (it == NULL)
+                       RETURN(ERR_PTR(-ENOMEM));
+       } else {
+               it = &info->oti_it_quota;
+               info->oti_it_inline = 1;
+       }
+
        memset(it, 0, sizeof(*it));
        lu_object_get(lo);
        it->oiq_obj = obj;
@@ -188,6 +196,7 @@ static struct dt_it *osd_it_acct_init(const struct lu_env *env,
  */
 static void osd_it_acct_fini(const struct lu_env *env, struct dt_it *di)
 {
+       struct osd_thread_info *info = osd_oti_get(env);
        struct osd_it_quota *it = (struct osd_it_quota *)di;
        struct osd_quota_leaf *leaf, *tmp;
        ENTRY;
@@ -198,6 +207,12 @@ static void osd_it_acct_fini(const struct lu_env *env, struct dt_it *di)
                list_del_init(&leaf->oql_link);
                OBD_FREE_PTR(leaf);
        }
+
+       if (it != &info->oti_it_quota)
+               OBD_FREE_PTR(it);
+       else
+               info->oti_it_inline = 0;
+
        EXIT;
 }
 
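diff --git a/lustre/osd-zfs/osd_index.c b/lustre/osd-zfs/osd_index.c
index 2f420a9..fd6f2de 100644
--- a/lustre/osd-zfs/osd_index.c
+++ b/lustre/osd-zfs/osd_index.c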
@@ -169,11 +169,24 @@ static struct dt_it *osd_index_it_init(const struct lu_env *env,
        LASSERT(osd_object_is_zap(obj->oo_db));
        LASSERT(info);
 
-       it = &info->oti_it_zap;
+       if (info->oti_it_inline) {
+               OBD_ALLOC_PTR(it);
+               if (it == NULL)
+                       RETURN(ERR_PTR(-ENOMEM));
+       } else {
+               it = &info->oti_it_zap;
+               info->oti_it_inline = 1;
+       }
 
        rc = osd_obj_cursor_init(&it->ozi_zc, obj, 0);
-       if (rc != 0)
+       if (rc != 0) {
+               if (it != &info->oti_it_zap)
+                       OBD_FREE_PTR(it);
+               else
+                       info->oti_it_inline = 0;
+
                RETURN(ERR_PTR(rc));
+       }
 
        it->ozi_obj   = obj;
        it->ozi_capa  = capa;
@@ -185,8 +198,9 @@ static struct dt_it *osd_index_it_init(const struct lu_env *env,
 
 static void osd_index_it_fini(const struct lu_env *env, struct dt_it *di)
 {
-       struct osd_zap_it *it = (struct osd_zap_it *)di;
-       struct osd_object *obj;
+       struct osd_thread_info  *info   = osd_oti_get(env);
+       struct osd_zap_it       *it     = (struct osd_zap_it *)di;
+       struct osd_object       *obj;
        ENTRY;
 
        LASSERT(it);
@@ -196,6 +210,10 @@ static void osd_index_it_fini(const struct lu_env *env, struct dt_it *di)
 
        osd_zap_cursor_fini(it->ozi_zc);
        lu_object_put(env, &obj->oo_dt.do_lu);
+       if (it != &info->oti_it_zap)
+               OBD_FREE_PTR(it);
+       else
+               info->oti_it_inline = 0;
 
        EXIT;
 }
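diff --git a/lustre/osd-zfs/osd_internal.h b/lustre/osd-zfs/osd_internal.h
index 3807fe1..90c8366 100644
--- a/lustre/osd-zfs/osd_internal.h
+++ b/lustre/osd-zfs/osd_internal.h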
@@ -190,6 +190,7 @@ struct osd_thread_info {
 
        struct lquota_id_info    oti_qi;
        struct lu_seq_range      oti_seq_range;
+       unsigned int             oti_it_inline:1;
 };
 
 extern struct lu_context_key osd_key;
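diff --git a/lustre/osd-zfs/osd_quota.c b/lustre/osd-zfs/osd_quota.c
index ee2293f..efedc3a 100644
--- a/lustre/osd-zfs/osd_quota.c
+++ b/lustre/osd-zfs/osd_quota.c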
@@ -167,14 +167,28 @@ static struct dt_it *osd_it_acct_init(const struct lu_env *env,
        if (info == NULL)
                RETURN(ERR_PTR(-ENOMEM));
 
-       it = &info->oti_it_quota;
+       if (info->oti_it_inline) {
+               OBD_ALLOC_PTR(it);
+               if (it == NULL)
+                       RETURN(ERR_PTR(-ENOMEM));
+       } else {
+               it = &info->oti_it_quota;
+               info->oti_it_inline = 1;
+       }
+
        memset(it, 0, sizeof(*it));
        it->oiq_oid = osd_quota_fid2dmu(lu_object_fid(lo));
 
        /* initialize zap cursor */
        rc = osd_zap_cursor_init(&it->oiq_zc, osd->od_os, it->oiq_oid, 0);
-       if (rc)
+       if (rc != 0) {
+               if (it != &info->oti_it_quota)
+                       OBD_FREE_PTR(it);
+               else
+                       info->oti_it_inline = 0;
+
                RETURN(ERR_PTR(rc));
+       }
 
        /* take object reference */
        lu_object_get(lo);
@@ -191,10 +205,17 @@ static struct dt_it *osd_it_acct_init(const struct lu_env *env,
  */
 static void osd_it_acct_fini(const struct lu_env *env, struct dt_it *di)
 {
-       struct osd_it_quota *it = (struct osd_it_quota *)di;
+       struct osd_thread_info  *info   = osd_oti_get(env);
+       struct osd_it_quota     *it     = (struct osd_it_quota *)di;
        ENTRY;
+
        osd_zap_cursor_fini(it->oiq_zc);
        lu_object_put(env, &it->oiq_obj->oo_dt.do_lu);
+       if (it != &info->oti_it_quota)
+               OBD_FREE_PTR(it);
+       else
+               info->oti_it_inline = 0;
+
        EXIT;
 }
 
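diff --git a/lustre/osp/osp_internal.h b/lustre/osp/osp_internal.h
index 9441a5f..16bca57 100644
--- a/lustre/osp/osp_internal.h
+++ b/lustre/osp/osp_internal.h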
@@ -280,6 +280,7 @@ struct osp_thread_info {
 struct osp_it {
        __u32                     ooi_pos_page;
        __u32                     ooi_pos_lu_page;
+       __u32                     ooi_attr;
        int                       ooi_pos_ent;
        int                       ooi_total_npages;
        int                       ooi_valid_npages;
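diff --git a/lustre/osp/osp_object.c b/lustre/osp/osp_object.c
index ed26ee1..8d9f51a 100644
--- a/lustre/osp/osp_object.c
+++ b/lustre/osp/osp_object.c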
@@ -1724,6 +1724,7 @@ struct dt_it *osp_it_init(const struct lu_env *env, struct dt_object *dt,
 
        it->ooi_pos_ent = -1;
        it->ooi_obj = dt;
+       it->ooi_attr = attr;
 
        return (struct dt_it *)it;
 }
@@ -1819,15 +1820,16 @@ static int osp_it_fetch(const struct lu_env *env, struct osp_it *it)
                ii->ii_fid.f_oid = osp->opd_index;
                ii->ii_fid.f_ver = 0;
                ii->ii_flags = II_FL_NOHASH;
+               ii->ii_attrs = osp_dev2node(osp);
        } else {
                ii->ii_fid = *lu_object_fid(&it->ooi_obj->do_lu);
                ii->ii_flags = II_FL_NOHASH | II_FL_NOKEY | II_FL_VARKEY |
                               II_FL_VARREC;
+               ii->ii_attrs = it->ooi_attr;
        }
        ii->ii_magic = IDX_INFO_MAGIC;
        ii->ii_count = npages * LU_PAGE_COUNT;
        ii->ii_hash_start = it->ooi_next;
-       ii->ii_attrs = osp_dev2node(osp);
 
        ptlrpc_at_set_req_timeout(req);
 
@@ -1890,6 +1892,7 @@ out:
  * \param[in] env      pointer to the thread context
  * \param[in] di       pointer to the iteration structure
  *
+ * \retval             positive for end of the directory
  * \retval             0 for success
  * \retval             negative error number on failure
  */
@@ -1915,6 +1918,7 @@ again2:
                }
                it->ooi_cur_idxpage = NULL;
                it->ooi_pos_lu_page++;
+
 again1:
                if (it->ooi_pos_lu_page < LU_PAGE_COUNT) {
                        it->ooi_cur_idxpage = (void *)it->ooi_cur_page +
@@ -1987,6 +1991,7 @@ again0:
  * \param[in] env      pointer to the thread context
  * \param[in] di       pointer to the iteration structure
  *
+ * \retval             positive for end of the directory
  * \retval             0 for success
  * \retval             negative error number on failure
  */
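diff --git a/lustre/ptlrpc/wiretest.c b/lustre/ptlrpc/wiretest.c
index e75d0f0..c202ef7 100644
--- a/lustre/ptlrpc/wiretest.c
+++ b/lustre/ptlrpc/wiretest.c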
@@ -1562,6 +1562,7 @@ void lustre_assert_wire_constants(void)
        CLASSERT(LMV_HASH_FLAG_MIGRATION == 0x80000000);
        CLASSERT(LMV_HASH_FLAG_DEAD == 0x40000000);
        CLASSERT(LMV_HASH_FLAG_BAD_TYPE == 0x20000000);
+       CLASSERT(LMV_HASH_FLAG_LOST_LMV == 0x10000000);
 
        /* Checks for struct obd_statfs */
        LASSERTF((int)sizeof(struct obd_statfs) == 144, "found %lld\n",
@@ -4724,12 +4725,16 @@ void lustre_assert_wire_constants(void)
                 (long long)LE_SKIP_NLINK_DECLARE);
        LASSERTF(LE_SKIP_NLINK == 14, "found %lld\n",
                 (long long)LE_SKIP_NLINK);
+       LASSERTF(LE_SET_LMV_MASTER == 15, "found %lld\n",
+                (long long)LE_SET_LMV_MASTER);
        LASSERTF(LEF_TO_OST == 0x00000001UL, "found 0x%.8xUL\n",
                (unsigned)LEF_TO_OST);
        LASSERTF(LEF_FROM_OST == 0x00000002UL, "found 0x%.8xUL\n",
                (unsigned)LEF_FROM_OST);
        LASSERTF(LEF_SET_LMV_HASH == 0x00000004UL, "found 0x%.8xUL\n",
                (unsigned)LEF_SET_LMV_HASH);
+       LASSERTF(LEF_SET_LMV_ALL == 0x00000008UL, "found 0x%.8xUL\n",
+               (unsigned)LEF_SET_LMV_ALL);
 
        /* Checks for struct lfsck_reply */
        LASSERTF((int)sizeof(struct lfsck_reply) == 16, "found %lld\n",
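diff --git a/lustre/tests/sanity-lfsck.sh b/lustre/tests/sanity-lfsck.sh
index 150d1d2..1862039 100644
--- a/lustre/tests/sanity-lfsck.sh
+++ b/lustre/tests/sanity-lfsck.sh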
@@ -3834,6 +3834,115 @@ test_31b() {
 }
 run_test 31b "The LFSCK can find/repair the name entry with bad name hash (2)"
 
+test_31c() {
+       [ $MDSCOUNT -lt 2 ] &&
+               skip "The test needs at least 2 MDTs" && return
+
+       echo "#####"
+       echo "For some reason, the master MDT-object of the striped directory"
+       echo "may lost its master LMV EA. If nobody created files under the"
+       echo "master directly after the master LMV EA lost, then the LFSCK"
+       echo "should re-generate the master LMV EA."
+       echo "#####"
+
+       check_mount_and_prep
+
+       echo "Inject failure stub on MDT0 to simulate the case that the"
+       echo "master MDT-object of the striped directory lost the LMV EA."
+
+       #define OBD_FAIL_LFSCK_LOST_MASTER_LMV  0x1629
+       do_facet $SINGLEMDS $LCTL set_param fail_loc=0x1629
+       $LFS setdirstripe -i 0 -c $MDSCOUNT $DIR/$tdir/striped_dir ||
+               error "(1) Fail to create striped directory"
+       do_facet $SINGLEMDS $LCTL set_param fail_loc=0
+
+       echo "Trigger namespace LFSCK to re-generate master LMV EA"
+       $START_NAMESPACE -r -A ||
+               error "(2) Fail to start LFSCK for namespace"
+
+       wait_update_facet $SINGLEMDS "$LCTL get_param -n \
+               mdd.${MDT_DEV}.lfsck_namespace |
+               awk '/^status/ { print \\\$2 }'" "completed" 32 || {
+               $SHOW_NAMESPACE
+               error "(3) unexpected status"
+       }
+
+       local repaired=$($SHOW_NAMESPACE |
+                        awk '/^striped_dirs_repaired/ { print $2 }')
+       [ $repaired -eq 1 ] ||
+               error "(4) Fail to re-generate master LMV EA: $repaired"
+
+       umount_client $MOUNT || error "(5) umount failed"
+       mount_client $MOUNT || error "(6) mount failed"
+
+       local empty=$(ls $DIR/$tdir/striped_dir/)
+       [ -z "$empty" ] || error "(7) The master LMV EA is not repaired: $empty"
+
+       rmdir $DIR/$tdir/striped_dir ||
+               error "(8) Fail to remove the striped directory after LFSCK"
+}
+run_test 31c "Re-generate the lost master LMV EA for striped directory"
+
+test_31d() {
+       [ $MDSCOUNT -lt 2 ] &&
+               skip "The test needs at least 2 MDTs" && return
+
+       echo "#####"
+       echo "For some reason, the master MDT-object of the striped directory"
+       echo "may lost its master LMV EA. If somebody created files under the"
+       echo "master directly after the master LMV EA lost, then the LFSCK"
+       echo "should NOT re-generate the master LMV EA, instead, it should"
+       echo "change the broken striped dirctory as read-only to prevent"
+       echo "further damage"
+       echo "#####"
+
+       check_mount_and_prep
+
+       echo "Inject failure stub on MDT0 to simulate the case that the"
+       echo "master MDT-object of the striped directory lost the LMV EA."
+
+       #define OBD_FAIL_LFSCK_LOST_MASTER_LMV  0x1629
+       do_facet $SINGLEMDS $LCTL set_param fail_loc=0x1629
+       $LFS setdirstripe -i 0 -c $MDSCOUNT $DIR/$tdir/striped_dir ||
+               error "(1) Fail to create striped directory"
+       do_facet $SINGLEMDS $LCTL set_param fail_loc=0x0
+
+       umount_client $MOUNT || error "(2) umount failed"
+       mount_client $MOUNT || error "(3) mount failed"
+
+       touch $DIR/$tdir/striped_dir/dummy ||
+               error "(4) Fail to touch under broken striped directory"
+
+       echo "Trigger namespace LFSCK to find out the inconsistency"
+       $START_NAMESPACE -r -A ||
+               error "(5) Fail to start LFSCK for namespace"
+
+       wait_update_facet $SINGLEMDS "$LCTL get_param -n \
+               mdd.${MDT_DEV}.lfsck_namespace |
+               awk '/^status/ { print \\\$2 }'" "completed" 32 || {
+               $SHOW_NAMESPACE
+               error "(6) unexpected status"
+       }
+
+       local repaired=$($SHOW_NAMESPACE |
+                        awk '/^striped_dirs_repaired/ { print $2 }')
+       [ $repaired -eq 0 ] ||
+               error "(7) Re-generate master LMV EA unexpected: $repaired"
+
+       stat $DIR/$tdir/striped_dir/dummy ||
+               error "(8) Fail to stat $DIR/$tdir/striped_dir/dummy"
+
+       touch $DIR/$tdir/striped_dir/foo &&
+               error "(9) The broken striped directory should be read-only"
+
+       chattr -i $DIR/$tdir/striped_dir ||
+               error "(10) Fail to chattr on the broken striped directory"
+
+       rmdir $DIR/$tdir/striped_dir ||
+               error "(11) Fail to remove the striped directory after LFSCK"
+}
+run_test 31d "Set broken striped directory (modified after broken) as read-only"
+
 $LCTL set_param debug=-lfsck > /dev/null || true
 
 # restore MDS/OST size
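diff --git a/lustre/utils/wirecheck.c b/lustre/utils/wirecheck.c
index 0dfbe40..8870fa5 100644
--- a/lustre/utils/wirecheck.c
+++ b/lustre/utils/wirecheck.c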
@@ -712,6 +712,7 @@ check_lmv_mds_md_v1(void)
        CHECK_CDEFINE(LMV_HASH_FLAG_MIGRATION);
        CHECK_CDEFINE(LMV_HASH_FLAG_DEAD);
        CHECK_CDEFINE(LMV_HASH_FLAG_BAD_TYPE);
+       CHECK_CDEFINE(LMV_HASH_FLAG_LOST_LMV);
 }
 
 static void
@@ -2153,10 +2154,12 @@ static void check_lfsck_request(void)
        CHECK_VALUE(LE_CREATE_ORPHAN);
        CHECK_VALUE(LE_SKIP_NLINK_DECLARE);
        CHECK_VALUE(LE_SKIP_NLINK);
+       CHECK_VALUE(LE_SET_LMV_MASTER);
 
        CHECK_VALUE_X(LEF_TO_OST);
        CHECK_VALUE_X(LEF_FROM_OST);
        CHECK_VALUE_X(LEF_SET_LMV_HASH);
+       CHECK_VALUE_X(LEF_SET_LMV_ALL);
 }
 
 static void check_lfsck_reply(void)
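diff --git a/lustre/utils/wiretest.c b/lustre/utils/wiretest.c
index 5cbf8cc..0219eec 100644
--- a/lustre/utils/wiretest.c
+++ b/lustre/utils/wiretest.c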
@@ -1574,6 +1574,7 @@ void lustre_assert_wire_constants(void)
        CLASSERT(LMV_HASH_FLAG_MIGRATION == 0x80000000);
        CLASSERT(LMV_HASH_FLAG_DEAD == 0x40000000);
        CLASSERT(LMV_HASH_FLAG_BAD_TYPE == 0x20000000);
+       CLASSERT(LMV_HASH_FLAG_LOST_LMV == 0x10000000);
 
        /* Checks for struct obd_statfs */
        LASSERTF((int)sizeof(struct obd_statfs) == 144, "found %lld\n",
@@ -4736,12 +4737,16 @@ void lustre_assert_wire_constants(void)
                 (long long)LE_SKIP_NLINK_DECLARE);
        LASSERTF(LE_SKIP_NLINK == 14, "found %lld\n",
                 (long long)LE_SKIP_NLINK);
+       LASSERTF(LE_SET_LMV_MASTER == 15, "found %lld\n",
+                (long long)LE_SET_LMV_MASTER);
        LASSERTF(LEF_TO_OST == 0x00000001UL, "found 0x%.8xUL\n",
                (unsigned)LEF_TO_OST);
        LASSERTF(LEF_FROM_OST == 0x00000002UL, "found 0x%.8xUL\n",
                (unsigned)LEF_FROM_OST);
        LASSERTF(LEF_SET_LMV_HASH == 0x00000004UL, "found 0x%.8xUL\n",
                (unsigned)LEF_SET_LMV_HASH);
+       LASSERTF(LEF_SET_LMV_ALL == 0x00000008UL, "found 0x%.8xUL\n",
+               (unsigned)LEF_SET_LMV_ALL);
 
        /* Checks for struct lfsck_reply */
        LASSERTF((int)sizeof(struct lfsck_reply) == 16, "found %lld\n",